Skip to main content

systemprompt_cloud/api_client/
streams.rs

1//! Server-Sent Events (SSE) subscriptions for tenant provisioning
2//! and Paddle checkout pipelines.
3
4use std::pin::Pin;
5
6use futures::stream::{Stream, StreamExt};
7use reqwest_eventsource::{Event, EventSource};
8use systemprompt_identifiers::TenantId;
9use systemprompt_models::modules::ApiPaths;
10
11use super::CloudApiClient;
12use super::types::{CheckoutEvent, ProvisioningEvent};
13use crate::error::{CloudError, CloudResult};
14
15impl CloudApiClient {
16    pub fn subscribe_provisioning_events(
17        &self,
18        tenant_id: &TenantId,
19    ) -> Pin<Box<dyn Stream<Item = CloudResult<ProvisioningEvent>> + Send + '_>> {
20        let url = format!("{}{}", self.api_url(), ApiPaths::tenant_events(tenant_id));
21        let token = self.token().to_string();
22        let client = self.client.clone();
23
24        let stream = async_stream::stream! {
25            let request = client
26                .get(&url)
27                .header("Authorization", format!("Bearer {}", token))
28                .header("Accept", "text/event-stream");
29
30            let mut es = match EventSource::new(request) {
31                Ok(es) => es,
32                Err(e) => {
33                    yield Err(CloudError::SseStream { message: format!("Failed to create SSE connection: {e}") });
34                    return;
35                }
36            };
37
38            while let Some(event) = es.next().await {
39                match event {
40                    Ok(Event::Open) => {
41                        tracing::debug!("SSE connection opened");
42                    }
43                    Ok(Event::Message(message)) => {
44                        if message.event == "provisioning" || message.event == "message" {
45                            match serde_json::from_str::<ProvisioningEvent>(&message.data) {
46                                Ok(event) => yield Ok(event),
47                                Err(e) => {
48                                    tracing::warn!(error = %e, data = %message.data, "Failed to parse SSE event");
49                                }
50                            }
51                        } else if message.event == "heartbeat" {
52                            tracing::trace!("SSE heartbeat received");
53                        }
54                    }
55                    Err(reqwest_eventsource::Error::StreamEnded) => {
56                        tracing::debug!("SSE stream ended normally");
57                        break;
58                    }
59                    Err(e) => {
60                        tracing::warn!(error = %e, "SSE stream error");
61                        yield Err(CloudError::SseStream { message: e.to_string() });
62                        break;
63                    }
64                }
65            }
66        };
67
68        Box::pin(stream)
69    }
70
71    pub fn subscribe_checkout_events(
72        &self,
73        checkout_session_id: &systemprompt_identifiers::CheckoutSessionId,
74    ) -> Pin<Box<dyn Stream<Item = CloudResult<CheckoutEvent>> + Send + '_>> {
75        let url = format!(
76            "{}/api/v1/checkout/{}/events",
77            self.api_url(),
78            checkout_session_id.as_str()
79        );
80        let token = self.token().to_string();
81        let client = self.client.clone();
82
83        let stream = async_stream::stream! {
84            tracing::debug!(url = %url, "Building SSE request");
85            let request = client
86                .get(&url)
87                .header("Authorization", format!("Bearer {}", token))
88                .header("Accept", "text/event-stream");
89
90            let mut es = match EventSource::new(request) {
91                Ok(es) => es,
92                Err(e) => {
93                    tracing::error!(error = %e, "Failed to create EventSource");
94                    yield Err(CloudError::SseStream { message: format!("Failed to create SSE connection: {e}") });
95                    return;
96                }
97            };
98
99            while let Some(event) = es.next().await {
100                match event {
101                    Ok(Event::Open) => {
102                        tracing::debug!("SSE connection opened");
103                    }
104                    Ok(Event::Message(message)) => {
105                        tracing::debug!(event_type = %message.event, "SSE message received");
106                        if message.event == "provisioning" {
107                            match serde_json::from_str::<CheckoutEvent>(&message.data) {
108                                Ok(event) => yield Ok(event),
109                                Err(e) => {
110                                    tracing::warn!(error = %e, "Failed to parse checkout event");
111                                }
112                            }
113                        }
114                    }
115                    Err(reqwest_eventsource::Error::StreamEnded) => {
116                        tracing::debug!("SSE stream ended");
117                        break;
118                    }
119                    Err(e) => {
120                        tracing::warn!(error = %e, "SSE stream error");
121                        yield Err(CloudError::SseStream { message: e.to_string() });
122                        break;
123                    }
124                }
125            }
126        };
127
128        Box::pin(stream)
129    }
130}