Skip to main content

systemprompt_cloud/api_client/
streams.rs

1use anyhow::{anyhow, Context, Result};
2use futures::stream::{Stream, StreamExt};
3use reqwest::Client;
4use reqwest_eventsource::{Event, EventSource};
5use std::pin::Pin;
6use systemprompt_models::modules::ApiPaths;
7
8use super::types::{CheckoutEvent, ProvisioningEvent};
9use super::CloudApiClient;
10
11impl CloudApiClient {
12    pub fn subscribe_provisioning_events(
13        &self,
14        tenant_id: &str,
15    ) -> Pin<Box<dyn Stream<Item = Result<ProvisioningEvent>> + Send + '_>> {
16        let url = format!("{}{}", self.api_url(), ApiPaths::tenant_events(tenant_id));
17        let token = self.token().to_string();
18
19        let stream = async_stream::stream! {
20            let request = Client::new()
21                .get(&url)
22                .header("Authorization", format!("Bearer {}", token))
23                .header("Accept", "text/event-stream");
24
25            let mut es = EventSource::new(request).context("Failed to create SSE connection")?;
26
27            while let Some(event) = es.next().await {
28                match event {
29                    Ok(Event::Open) => {
30                        tracing::debug!("SSE connection opened");
31                    }
32                    Ok(Event::Message(message)) => {
33                        if message.event == "provisioning" || message.event == "message" {
34                            match serde_json::from_str::<ProvisioningEvent>(&message.data) {
35                                Ok(event) => yield Ok(event),
36                                Err(e) => {
37                                    tracing::warn!(error = %e, data = %message.data, "Failed to parse SSE event");
38                                }
39                            }
40                        } else if message.event == "heartbeat" {
41                            tracing::trace!("SSE heartbeat received");
42                        }
43                    }
44                    Err(reqwest_eventsource::Error::StreamEnded) => {
45                        tracing::debug!("SSE stream ended normally");
46                        break;
47                    }
48                    Err(e) => {
49                        tracing::warn!(error = %e, "SSE stream error");
50                        yield Err(anyhow!("SSE stream error: {}", e));
51                        break;
52                    }
53                }
54            }
55        };
56
57        Box::pin(stream)
58    }
59
60    pub fn subscribe_checkout_events(
61        &self,
62        checkout_session_id: &str,
63    ) -> Pin<Box<dyn Stream<Item = Result<CheckoutEvent>> + Send + '_>> {
64        let url = format!(
65            "{}/api/v1/checkout/{}/events",
66            self.api_url(),
67            checkout_session_id
68        );
69        let token = self.token().to_string();
70
71        let stream = async_stream::stream! {
72            tracing::debug!(url = %url, "Building SSE request");
73            let request = Client::new()
74                .get(&url)
75                .header("Authorization", format!("Bearer {}", token))
76                .header("Accept", "text/event-stream");
77
78            let mut es = match EventSource::new(request) {
79                Ok(es) => es,
80                Err(e) => {
81                    tracing::error!(error = %e, "Failed to create EventSource");
82                    yield Err(anyhow!("Failed to create SSE connection: {}", e));
83                    return;
84                }
85            };
86
87            while let Some(event) = es.next().await {
88                match event {
89                    Ok(Event::Open) => {
90                        tracing::debug!("SSE connection opened");
91                    }
92                    Ok(Event::Message(message)) => {
93                        tracing::debug!(event_type = %message.event, "SSE message received");
94                        if message.event == "provisioning" {
95                            match serde_json::from_str::<CheckoutEvent>(&message.data) {
96                                Ok(event) => yield Ok(event),
97                                Err(e) => {
98                                    tracing::warn!(error = %e, "Failed to parse checkout event");
99                                }
100                            }
101                        }
102                    }
103                    Err(reqwest_eventsource::Error::StreamEnded) => {
104                        tracing::debug!("SSE stream ended");
105                        break;
106                    }
107                    Err(e) => {
108                        tracing::warn!(error = %e, "SSE stream error");
109                        yield Err(anyhow!("SSE stream error: {}", e));
110                        break;
111                    }
112                }
113            }
114        };
115
116        Box::pin(stream)
117    }
118}