systemprompt_cloud/api_client/
streams.rs1use anyhow::{anyhow, Context, Result};
2use futures::stream::{Stream, StreamExt};
3use reqwest::Client;
4use reqwest_eventsource::{Event, EventSource};
5use std::pin::Pin;
6use systemprompt_models::modules::ApiPaths;
7
8use super::types::{CheckoutEvent, ProvisioningEvent};
9use super::CloudApiClient;
10
11impl CloudApiClient {
12 pub fn subscribe_provisioning_events(
13 &self,
14 tenant_id: &str,
15 ) -> Pin<Box<dyn Stream<Item = Result<ProvisioningEvent>> + Send + '_>> {
16 let url = format!("{}{}", self.api_url(), ApiPaths::tenant_events(tenant_id));
17 let token = self.token().to_string();
18
19 let stream = async_stream::stream! {
20 let request = Client::new()
21 .get(&url)
22 .header("Authorization", format!("Bearer {}", token))
23 .header("Accept", "text/event-stream");
24
25 let mut es = EventSource::new(request).context("Failed to create SSE connection")?;
26
27 while let Some(event) = es.next().await {
28 match event {
29 Ok(Event::Open) => {
30 tracing::debug!("SSE connection opened");
31 }
32 Ok(Event::Message(message)) => {
33 if message.event == "provisioning" || message.event == "message" {
34 match serde_json::from_str::<ProvisioningEvent>(&message.data) {
35 Ok(event) => yield Ok(event),
36 Err(e) => {
37 tracing::warn!(error = %e, data = %message.data, "Failed to parse SSE event");
38 }
39 }
40 } else if message.event == "heartbeat" {
41 tracing::trace!("SSE heartbeat received");
42 }
43 }
44 Err(reqwest_eventsource::Error::StreamEnded) => {
45 tracing::debug!("SSE stream ended normally");
46 break;
47 }
48 Err(e) => {
49 tracing::warn!(error = %e, "SSE stream error");
50 yield Err(anyhow!("SSE stream error: {}", e));
51 break;
52 }
53 }
54 }
55 };
56
57 Box::pin(stream)
58 }
59
60 pub fn subscribe_checkout_events(
61 &self,
62 checkout_session_id: &str,
63 ) -> Pin<Box<dyn Stream<Item = Result<CheckoutEvent>> + Send + '_>> {
64 let url = format!(
65 "{}/api/v1/checkout/{}/events",
66 self.api_url(),
67 checkout_session_id
68 );
69 let token = self.token().to_string();
70
71 let stream = async_stream::stream! {
72 tracing::debug!(url = %url, "Building SSE request");
73 let request = Client::new()
74 .get(&url)
75 .header("Authorization", format!("Bearer {}", token))
76 .header("Accept", "text/event-stream");
77
78 let mut es = match EventSource::new(request) {
79 Ok(es) => es,
80 Err(e) => {
81 tracing::error!(error = %e, "Failed to create EventSource");
82 yield Err(anyhow!("Failed to create SSE connection: {}", e));
83 return;
84 }
85 };
86
87 while let Some(event) = es.next().await {
88 match event {
89 Ok(Event::Open) => {
90 tracing::debug!("SSE connection opened");
91 }
92 Ok(Event::Message(message)) => {
93 tracing::debug!(event_type = %message.event, "SSE message received");
94 if message.event == "provisioning" {
95 match serde_json::from_str::<CheckoutEvent>(&message.data) {
96 Ok(event) => yield Ok(event),
97 Err(e) => {
98 tracing::warn!(error = %e, "Failed to parse checkout event");
99 }
100 }
101 }
102 }
103 Err(reqwest_eventsource::Error::StreamEnded) => {
104 tracing::debug!("SSE stream ended");
105 break;
106 }
107 Err(e) => {
108 tracing::warn!(error = %e, "SSE stream error");
109 yield Err(anyhow!("SSE stream error: {}", e));
110 break;
111 }
112 }
113 }
114 };
115
116 Box::pin(stream)
117 }
118}