Skip to main content

stygian_graph/adapters/
stream.rs

1//! Server-Sent Events (SSE) stream adapter.
2//!
3//! Implements [`StreamSourcePort`](crate::ports::stream_source::StreamSourcePort) and [`ScrapingService`](crate::ports::ScrapingService) for consuming
4//! SSE event streams via HTTP.  Uses `reqwest` for the underlying connection.
5//!
6//! # Example
7//!
8//! ```no_run
9//! use stygian_graph::adapters::stream::SseSource;
10//! use stygian_graph::ports::stream_source::StreamSourcePort;
11//!
12//! # async fn example() {
13//! let source = SseSource::new(None);
14//! let events = source.subscribe("https://api.example.com/events", Some(5)).await.unwrap();
15//! println!("received {} events", events.len());
16//! # }
17//! ```
18
19use async_trait::async_trait;
20use reqwest::Client;
21use serde_json::json;
22
23use crate::domain::error::{Result, ServiceError, StygianError};
24use crate::ports::stream_source::{StreamEvent, StreamSourcePort};
25use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
26
27// ─────────────────────────────────────────────────────────────────────────────
28// SseSource
29// ─────────────────────────────────────────────────────────────────────────────
30
31/// Adapter: Server-Sent Events stream source.
32///
33/// Connects to an SSE endpoint and collects events until `max_events`
34/// is reached or the stream closes.
35pub struct SseSource {
36    client: Client,
37}
38
39impl SseSource {
40    /// Create a new SSE stream source.
41    ///
42    /// # Arguments
43    ///
44    /// * `client` - Optional pre-configured `reqwest::Client`.  If `None`,
45    ///   a default client is created.
46    #[must_use]
47    pub fn new(client: Option<Client>) -> Self {
48        Self {
49            client: client.unwrap_or_default(),
50        }
51    }
52
53    /// Parse a single SSE frame from accumulated field lines.
54    fn parse_event(lines: &[String]) -> Option<StreamEvent> {
55        let mut id = None;
56        let mut event_type = None;
57        let mut data_lines: Vec<&str> = Vec::new();
58
59        for line in lines {
60            if let Some(value) = line.strip_prefix("id:") {
61                id = Some(value.trim().to_string());
62            } else if let Some(value) = line.strip_prefix("event:") {
63                event_type = Some(value.trim().to_string());
64            } else if let Some(value) = line.strip_prefix("data:") {
65                data_lines.push(value.trim());
66            }
67        }
68
69        if data_lines.is_empty() {
70            return None;
71        }
72
73        Some(StreamEvent {
74            id,
75            event_type,
76            data: data_lines.join("\n"),
77        })
78    }
79}
80
81// ─────────────────────────────────────────────────────────────────────────────
82// StreamSourcePort
83// ─────────────────────────────────────────────────────────────────────────────
84
85#[async_trait]
86impl StreamSourcePort for SseSource {
87    async fn subscribe(&self, url: &str, max_events: Option<usize>) -> Result<Vec<StreamEvent>> {
88        let response = self
89            .client
90            .get(url)
91            .header("Accept", "text/event-stream")
92            .send()
93            .await
94            .map_err(|e| {
95                StygianError::Service(ServiceError::Unavailable(format!(
96                    "SSE connection to {url} failed: {e}"
97                )))
98            })?;
99
100        let text = response.text().await.map_err(|e| {
101            StygianError::Service(ServiceError::InvalidResponse(format!(
102                "failed to read SSE body: {e}"
103            )))
104        })?;
105
106        let mut events = Vec::new();
107        let mut current_frame: Vec<String> = Vec::new();
108
109        for line in text.lines() {
110            if line.is_empty() {
111                // Empty line = event boundary in SSE
112                if let Some(event) = Self::parse_event(&current_frame) {
113                    events.push(event);
114                    if let Some(max) = max_events
115                        && events.len() >= max
116                    {
117                        break;
118                    }
119                }
120                current_frame.clear();
121            } else if !line.starts_with(':') {
122                // Lines starting with ':' are SSE comments — skip them
123                current_frame.push(line.to_string());
124            }
125        }
126
127        // Handle final event if no trailing blank line
128        if !current_frame.is_empty()
129            && let Some(event) = Self::parse_event(&current_frame)
130        {
131            events.push(event);
132        }
133
134        Ok(events)
135    }
136
137    fn source_name(&self) -> &'static str {
138        "sse"
139    }
140}
141
142// ─────────────────────────────────────────────────────────────────────────────
143// ScrapingService (DAG integration)
144// ─────────────────────────────────────────────────────────────────────────────
145
146#[async_trait]
147impl ScrapingService for SseSource {
148    /// Connect to an SSE endpoint and collect events.
149    ///
150    /// Expected params:
151    /// ```json
152    /// { "max_events": 10 }
153    /// ```
154    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
155        let max_events = input
156            .params
157            .get("max_events")
158            .and_then(serde_json::Value::as_u64)
159            .and_then(|n| usize::try_from(n).ok());
160
161        let events = self.subscribe(&input.url, max_events).await?;
162        let event_count = events.len();
163
164        Ok(ServiceOutput {
165            data: serde_json::to_string(&events).unwrap_or_default(),
166            metadata: json!({
167                "source": "sse",
168                "event_count": event_count,
169            }),
170        })
171    }
172
173    fn name(&self) -> &'static str {
174        "stream"
175    }
176}
177
178// ─────────────────────────────────────────────────────────────────────────────
179// Tests
180// ─────────────────────────────────────────────────────────────────────────────
181
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_event_basic() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let lines = vec![
            "event:message".to_string(),
            "data:{\"price\":29.99}".to_string(),
        ];
        let event = SseSource::parse_event(&lines)
            .ok_or_else(|| std::io::Error::other("expected parse_event to return Some"))?;
        assert_eq!(event.event_type.as_deref(), Some("message"));
        assert_eq!(event.data, r#"{"price":29.99}"#);
        assert!(event.id.is_none());
        Ok(())
    }

    #[test]
    fn parse_event_with_id() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let lines = vec![
            "id:42".to_string(),
            "event:update".to_string(),
            "data:hello".to_string(),
        ];
        let event = SseSource::parse_event(&lines)
            .ok_or_else(|| std::io::Error::other("expected parse_event to return Some"))?;
        assert_eq!(event.id.as_deref(), Some("42"));
        assert_eq!(event.event_type.as_deref(), Some("update"));
        assert_eq!(event.data, "hello");
        Ok(())
    }

    #[test]
    fn parse_event_multiline_data() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let lines = vec!["data:line one".to_string(), "data:line two".to_string()];
        let event = SseSource::parse_event(&lines)
            .ok_or_else(|| std::io::Error::other("expected parse_event to return Some"))?;
        assert_eq!(event.data, "line one\nline two");
        Ok(())
    }

    #[test]
    fn parse_event_no_data_returns_none() {
        let lines = vec!["event:ping".to_string()];
        assert!(SseSource::parse_event(&lines).is_none());
    }

    /// The common `field: value` form (one space after the colon) must not
    /// leak the separator space into the value.
    #[test]
    fn parse_event_space_after_colon() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let lines = vec![
            "id: 7".to_string(),
            "event: tick".to_string(),
            "data: hello".to_string(),
        ];
        let event = SseSource::parse_event(&lines)
            .ok_or_else(|| std::io::Error::other("expected parse_event to return Some"))?;
        assert_eq!(event.id.as_deref(), Some("7"));
        assert_eq!(event.event_type.as_deref(), Some("tick"));
        assert_eq!(event.data, "hello");
        Ok(())
    }

    /// Comment lines and fields without event payload (`retry`, unknown
    /// names) are ignored.
    #[test]
    fn parse_event_ignores_comments_and_unknown_fields()
    -> std::result::Result<(), Box<dyn std::error::Error>> {
        let lines = vec![
            ":keep-alive".to_string(),
            "retry:3000".to_string(),
            "foo:bar".to_string(),
            "data:payload".to_string(),
        ];
        let event = SseSource::parse_event(&lines)
            .ok_or_else(|| std::io::Error::other("expected parse_event to return Some"))?;
        assert!(event.id.is_none());
        assert!(event.event_type.is_none());
        assert_eq!(event.data, "payload");
        Ok(())
    }

    /// An empty `data:` line still produces an event (with empty data).
    #[test]
    fn parse_event_empty_data_line() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let lines = vec!["data:".to_string()];
        let event = SseSource::parse_event(&lines)
            .ok_or_else(|| std::io::Error::other("expected parse_event to return Some"))?;
        assert_eq!(event.data, "");
        Ok(())
    }
}