systemprompt_cloud/api_client/
streams.rs1use std::pin::Pin;
5
6use futures::stream::{Stream, StreamExt};
7use reqwest_eventsource::{Event, EventSource};
8use systemprompt_identifiers::TenantId;
9use systemprompt_models::modules::ApiPaths;
10
11use super::CloudApiClient;
12use super::types::{CheckoutEvent, ProvisioningEvent};
13use crate::error::{CloudError, CloudResult};
14
15impl CloudApiClient {
16 pub fn subscribe_provisioning_events(
17 &self,
18 tenant_id: &TenantId,
19 ) -> Pin<Box<dyn Stream<Item = CloudResult<ProvisioningEvent>> + Send + '_>> {
20 let url = format!("{}{}", self.api_url(), ApiPaths::tenant_events(tenant_id));
21 let token = self.token().to_string();
22 let client = self.client.clone();
23
24 let stream = async_stream::stream! {
25 let request = client
26 .get(&url)
27 .header("Authorization", format!("Bearer {}", token))
28 .header("Accept", "text/event-stream");
29
30 let mut es = match EventSource::new(request) {
31 Ok(es) => es,
32 Err(e) => {
33 yield Err(CloudError::SseStream { message: format!("Failed to create SSE connection: {e}") });
34 return;
35 }
36 };
37
38 while let Some(event) = es.next().await {
39 match event {
40 Ok(Event::Open) => {
41 tracing::debug!("SSE connection opened");
42 }
43 Ok(Event::Message(message)) => {
44 if message.event == "provisioning" || message.event == "message" {
45 match serde_json::from_str::<ProvisioningEvent>(&message.data) {
46 Ok(event) => yield Ok(event),
47 Err(e) => {
48 tracing::warn!(error = %e, data = %message.data, "Failed to parse SSE event");
49 }
50 }
51 } else if message.event == "heartbeat" {
52 tracing::trace!("SSE heartbeat received");
53 }
54 }
55 Err(reqwest_eventsource::Error::StreamEnded) => {
56 tracing::debug!("SSE stream ended normally");
57 break;
58 }
59 Err(e) => {
60 tracing::warn!(error = %e, "SSE stream error");
61 yield Err(CloudError::SseStream { message: e.to_string() });
62 break;
63 }
64 }
65 }
66 };
67
68 Box::pin(stream)
69 }
70
71 pub fn subscribe_checkout_events(
72 &self,
73 checkout_session_id: &systemprompt_identifiers::CheckoutSessionId,
74 ) -> Pin<Box<dyn Stream<Item = CloudResult<CheckoutEvent>> + Send + '_>> {
75 let url = format!(
76 "{}/api/v1/checkout/{}/events",
77 self.api_url(),
78 checkout_session_id.as_str()
79 );
80 let token = self.token().to_string();
81 let client = self.client.clone();
82
83 let stream = async_stream::stream! {
84 tracing::debug!(url = %url, "Building SSE request");
85 let request = client
86 .get(&url)
87 .header("Authorization", format!("Bearer {}", token))
88 .header("Accept", "text/event-stream");
89
90 let mut es = match EventSource::new(request) {
91 Ok(es) => es,
92 Err(e) => {
93 tracing::error!(error = %e, "Failed to create EventSource");
94 yield Err(CloudError::SseStream { message: format!("Failed to create SSE connection: {e}") });
95 return;
96 }
97 };
98
99 while let Some(event) = es.next().await {
100 match event {
101 Ok(Event::Open) => {
102 tracing::debug!("SSE connection opened");
103 }
104 Ok(Event::Message(message)) => {
105 tracing::debug!(event_type = %message.event, "SSE message received");
106 if message.event == "provisioning" {
107 match serde_json::from_str::<CheckoutEvent>(&message.data) {
108 Ok(event) => yield Ok(event),
109 Err(e) => {
110 tracing::warn!(error = %e, "Failed to parse checkout event");
111 }
112 }
113 }
114 }
115 Err(reqwest_eventsource::Error::StreamEnded) => {
116 tracing::debug!("SSE stream ended");
117 break;
118 }
119 Err(e) => {
120 tracing::warn!(error = %e, "SSE stream error");
121 yield Err(CloudError::SseStream { message: e.to_string() });
122 break;
123 }
124 }
125 }
126 };
127
128 Box::pin(stream)
129 }
130}