Skip to main content

systemprompt_cloud/api_client/
streams.rs

1use anyhow::{Context, Result, anyhow};
2use futures::stream::{Stream, StreamExt};
3use reqwest_eventsource::{Event, EventSource};
4use std::pin::Pin;
5use systemprompt_models::modules::ApiPaths;
6
7use super::CloudApiClient;
8use super::types::{CheckoutEvent, ProvisioningEvent};
9
10impl CloudApiClient {
11    pub fn subscribe_provisioning_events(
12        &self,
13        tenant_id: &str,
14    ) -> Pin<Box<dyn Stream<Item = Result<ProvisioningEvent>> + Send + '_>> {
15        let url = format!("{}{}", self.api_url(), ApiPaths::tenant_events(tenant_id));
16        let token = self.token().to_string();
17        let client = self.client.clone();
18
19        let stream = async_stream::stream! {
20            let request = client
21                .get(&url)
22                .header("Authorization", format!("Bearer {}", token))
23                .header("Accept", "text/event-stream");
24
25            let mut es = EventSource::new(request).context("Failed to create SSE connection")?;
26
27            while let Some(event) = es.next().await {
28                match event {
29                    Ok(Event::Open) => {
30                        tracing::debug!("SSE connection opened");
31                    }
32                    Ok(Event::Message(message)) => {
33                        if message.event == "provisioning" || message.event == "message" {
34                            match serde_json::from_str::<ProvisioningEvent>(&message.data) {
35                                Ok(event) => yield Ok(event),
36                                Err(e) => {
37                                    tracing::warn!(error = %e, data = %message.data, "Failed to parse SSE event");
38                                }
39                            }
40                        } else if message.event == "heartbeat" {
41                            tracing::trace!("SSE heartbeat received");
42                        }
43                    }
44                    Err(reqwest_eventsource::Error::StreamEnded) => {
45                        tracing::debug!("SSE stream ended normally");
46                        break;
47                    }
48                    Err(e) => {
49                        tracing::warn!(error = %e, "SSE stream error");
50                        yield Err(anyhow!("SSE stream error: {}", e));
51                        break;
52                    }
53                }
54            }
55        };
56
57        Box::pin(stream)
58    }
59
60    pub fn subscribe_checkout_events(
61        &self,
62        checkout_session_id: &str,
63    ) -> Pin<Box<dyn Stream<Item = Result<CheckoutEvent>> + Send + '_>> {
64        let url = format!(
65            "{}/api/v1/checkout/{}/events",
66            self.api_url(),
67            checkout_session_id
68        );
69        let token = self.token().to_string();
70        let client = self.client.clone();
71
72        let stream = async_stream::stream! {
73            tracing::debug!(url = %url, "Building SSE request");
74            let request = client
75                .get(&url)
76                .header("Authorization", format!("Bearer {}", token))
77                .header("Accept", "text/event-stream");
78
79            let mut es = match EventSource::new(request) {
80                Ok(es) => es,
81                Err(e) => {
82                    tracing::error!(error = %e, "Failed to create EventSource");
83                    yield Err(anyhow!("Failed to create SSE connection: {}", e));
84                    return;
85                }
86            };
87
88            while let Some(event) = es.next().await {
89                match event {
90                    Ok(Event::Open) => {
91                        tracing::debug!("SSE connection opened");
92                    }
93                    Ok(Event::Message(message)) => {
94                        tracing::debug!(event_type = %message.event, "SSE message received");
95                        if message.event == "provisioning" {
96                            match serde_json::from_str::<CheckoutEvent>(&message.data) {
97                                Ok(event) => yield Ok(event),
98                                Err(e) => {
99                                    tracing::warn!(error = %e, "Failed to parse checkout event");
100                                }
101                            }
102                        }
103                    }
104                    Err(reqwest_eventsource::Error::StreamEnded) => {
105                        tracing::debug!("SSE stream ended");
106                        break;
107                    }
108                    Err(e) => {
109                        tracing::warn!(error = %e, "SSE stream error");
110                        yield Err(anyhow!("SSE stream error: {}", e));
111                        break;
112                    }
113                }
114            }
115        };
116
117        Box::pin(stream)
118    }
119}