systemprompt_cloud/api_client/
streams.rs1use std::pin::Pin;
5
6use futures::stream::{Stream, StreamExt};
7use reqwest_eventsource::{Event, EventSource};
8use systemprompt_models::modules::ApiPaths;
9
10use super::CloudApiClient;
11use super::types::{CheckoutEvent, ProvisioningEvent};
12use crate::error::{CloudError, CloudResult};
13
14impl CloudApiClient {
15 pub fn subscribe_provisioning_events(
16 &self,
17 tenant_id: &str,
18 ) -> Pin<Box<dyn Stream<Item = CloudResult<ProvisioningEvent>> + Send + '_>> {
19 let url = format!("{}{}", self.api_url(), ApiPaths::tenant_events(tenant_id));
20 let token = self.token().to_string();
21 let client = self.client.clone();
22
23 let stream = async_stream::stream! {
24 let request = client
25 .get(&url)
26 .header("Authorization", format!("Bearer {}", token))
27 .header("Accept", "text/event-stream");
28
29 let mut es = match EventSource::new(request) {
30 Ok(es) => es,
31 Err(e) => {
32 yield Err(CloudError::SseStream { message: format!("Failed to create SSE connection: {e}") });
33 return;
34 }
35 };
36
37 while let Some(event) = es.next().await {
38 match event {
39 Ok(Event::Open) => {
40 tracing::debug!("SSE connection opened");
41 }
42 Ok(Event::Message(message)) => {
43 if message.event == "provisioning" || message.event == "message" {
44 match serde_json::from_str::<ProvisioningEvent>(&message.data) {
45 Ok(event) => yield Ok(event),
46 Err(e) => {
47 tracing::warn!(error = %e, data = %message.data, "Failed to parse SSE event");
48 }
49 }
50 } else if message.event == "heartbeat" {
51 tracing::trace!("SSE heartbeat received");
52 }
53 }
54 Err(reqwest_eventsource::Error::StreamEnded) => {
55 tracing::debug!("SSE stream ended normally");
56 break;
57 }
58 Err(e) => {
59 tracing::warn!(error = %e, "SSE stream error");
60 yield Err(CloudError::SseStream { message: e.to_string() });
61 break;
62 }
63 }
64 }
65 };
66
67 Box::pin(stream)
68 }
69
70 pub fn subscribe_checkout_events(
71 &self,
72 checkout_session_id: &str,
73 ) -> Pin<Box<dyn Stream<Item = CloudResult<CheckoutEvent>> + Send + '_>> {
74 let url = format!(
75 "{}/api/v1/checkout/{}/events",
76 self.api_url(),
77 checkout_session_id
78 );
79 let token = self.token().to_string();
80 let client = self.client.clone();
81
82 let stream = async_stream::stream! {
83 tracing::debug!(url = %url, "Building SSE request");
84 let request = client
85 .get(&url)
86 .header("Authorization", format!("Bearer {}", token))
87 .header("Accept", "text/event-stream");
88
89 let mut es = match EventSource::new(request) {
90 Ok(es) => es,
91 Err(e) => {
92 tracing::error!(error = %e, "Failed to create EventSource");
93 yield Err(CloudError::SseStream { message: format!("Failed to create SSE connection: {e}") });
94 return;
95 }
96 };
97
98 while let Some(event) = es.next().await {
99 match event {
100 Ok(Event::Open) => {
101 tracing::debug!("SSE connection opened");
102 }
103 Ok(Event::Message(message)) => {
104 tracing::debug!(event_type = %message.event, "SSE message received");
105 if message.event == "provisioning" {
106 match serde_json::from_str::<CheckoutEvent>(&message.data) {
107 Ok(event) => yield Ok(event),
108 Err(e) => {
109 tracing::warn!(error = %e, "Failed to parse checkout event");
110 }
111 }
112 }
113 }
114 Err(reqwest_eventsource::Error::StreamEnded) => {
115 tracing::debug!("SSE stream ended");
116 break;
117 }
118 Err(e) => {
119 tracing::warn!(error = %e, "SSE stream error");
120 yield Err(CloudError::SseStream { message: e.to_string() });
121 break;
122 }
123 }
124 }
125 };
126
127 Box::pin(stream)
128 }
129}