Skip to main content

systemprompt_cloud/api_client/
streams.rs

1//! Server-Sent Events (SSE) subscriptions for tenant provisioning
2//! and Paddle checkout pipelines.
3
4use std::pin::Pin;
5
6use futures::stream::{Stream, StreamExt};
7use reqwest_eventsource::{Event, EventSource};
8use systemprompt_models::modules::ApiPaths;
9
10use super::CloudApiClient;
11use super::types::{CheckoutEvent, ProvisioningEvent};
12use crate::error::{CloudError, CloudResult};
13
14impl CloudApiClient {
15    pub fn subscribe_provisioning_events(
16        &self,
17        tenant_id: &str,
18    ) -> Pin<Box<dyn Stream<Item = CloudResult<ProvisioningEvent>> + Send + '_>> {
19        let url = format!("{}{}", self.api_url(), ApiPaths::tenant_events(tenant_id));
20        let token = self.token().to_string();
21        let client = self.client.clone();
22
23        let stream = async_stream::stream! {
24            let request = client
25                .get(&url)
26                .header("Authorization", format!("Bearer {}", token))
27                .header("Accept", "text/event-stream");
28
29            let mut es = match EventSource::new(request) {
30                Ok(es) => es,
31                Err(e) => {
32                    yield Err(CloudError::SseStream { message: format!("Failed to create SSE connection: {e}") });
33                    return;
34                }
35            };
36
37            while let Some(event) = es.next().await {
38                match event {
39                    Ok(Event::Open) => {
40                        tracing::debug!("SSE connection opened");
41                    }
42                    Ok(Event::Message(message)) => {
43                        if message.event == "provisioning" || message.event == "message" {
44                            match serde_json::from_str::<ProvisioningEvent>(&message.data) {
45                                Ok(event) => yield Ok(event),
46                                Err(e) => {
47                                    tracing::warn!(error = %e, data = %message.data, "Failed to parse SSE event");
48                                }
49                            }
50                        } else if message.event == "heartbeat" {
51                            tracing::trace!("SSE heartbeat received");
52                        }
53                    }
54                    Err(reqwest_eventsource::Error::StreamEnded) => {
55                        tracing::debug!("SSE stream ended normally");
56                        break;
57                    }
58                    Err(e) => {
59                        tracing::warn!(error = %e, "SSE stream error");
60                        yield Err(CloudError::SseStream { message: e.to_string() });
61                        break;
62                    }
63                }
64            }
65        };
66
67        Box::pin(stream)
68    }
69
70    pub fn subscribe_checkout_events(
71        &self,
72        checkout_session_id: &str,
73    ) -> Pin<Box<dyn Stream<Item = CloudResult<CheckoutEvent>> + Send + '_>> {
74        let url = format!(
75            "{}/api/v1/checkout/{}/events",
76            self.api_url(),
77            checkout_session_id
78        );
79        let token = self.token().to_string();
80        let client = self.client.clone();
81
82        let stream = async_stream::stream! {
83            tracing::debug!(url = %url, "Building SSE request");
84            let request = client
85                .get(&url)
86                .header("Authorization", format!("Bearer {}", token))
87                .header("Accept", "text/event-stream");
88
89            let mut es = match EventSource::new(request) {
90                Ok(es) => es,
91                Err(e) => {
92                    tracing::error!(error = %e, "Failed to create EventSource");
93                    yield Err(CloudError::SseStream { message: format!("Failed to create SSE connection: {e}") });
94                    return;
95                }
96            };
97
98            while let Some(event) = es.next().await {
99                match event {
100                    Ok(Event::Open) => {
101                        tracing::debug!("SSE connection opened");
102                    }
103                    Ok(Event::Message(message)) => {
104                        tracing::debug!(event_type = %message.event, "SSE message received");
105                        if message.event == "provisioning" {
106                            match serde_json::from_str::<CheckoutEvent>(&message.data) {
107                                Ok(event) => yield Ok(event),
108                                Err(e) => {
109                                    tracing::warn!(error = %e, "Failed to parse checkout event");
110                                }
111                            }
112                        }
113                    }
114                    Err(reqwest_eventsource::Error::StreamEnded) => {
115                        tracing::debug!("SSE stream ended");
116                        break;
117                    }
118                    Err(e) => {
119                        tracing::warn!(error = %e, "SSE stream error");
120                        yield Err(CloudError::SseStream { message: e.to_string() });
121                        break;
122                    }
123                }
124            }
125        };
126
127        Box::pin(stream)
128    }
129}