Skip to main content

stygian_graph/adapters/
stream.rs

1//! Server-Sent Events (SSE) stream adapter.
2//!
3//! Implements [`StreamSourcePort`](crate::ports::stream_source::StreamSourcePort) and [`ScrapingService`](crate::ports::ScrapingService) for consuming
4//! SSE event streams via HTTP.  Uses `reqwest` for the underlying connection.
5//!
6//! # Example
7//!
8//! ```no_run
9//! use stygian_graph::adapters::stream::SseSource;
10//! use stygian_graph::ports::stream_source::StreamSourcePort;
11//!
12//! # async fn example() {
13//! let source = SseSource::new(None);
14//! let events = source.subscribe("https://api.example.com/events", Some(5)).await.unwrap();
15//! println!("received {} events", events.len());
16//! # }
17//! ```
18
19use async_trait::async_trait;
20use reqwest::Client;
21use serde_json::json;
22
23use crate::domain::error::{Result, ServiceError, StygianError};
24use crate::ports::stream_source::{StreamEvent, StreamSourcePort};
25use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
26
27// ─────────────────────────────────────────────────────────────────────────────
28// SseSource
29// ─────────────────────────────────────────────────────────────────────────────
30
31/// Adapter: Server-Sent Events stream source.
32///
33/// Connects to an SSE endpoint and collects events until `max_events`
34/// is reached or the stream closes.
35pub struct SseSource {
36    client: Client,
37}
38
39impl SseSource {
40    /// Create a new SSE stream source.
41    ///
42    /// # Arguments
43    ///
44    /// * `client` - Optional pre-configured `reqwest::Client`.  If `None`,
45    ///   a default client is created.
46    #[must_use]
47    pub fn new(client: Option<Client>) -> Self {
48        Self {
49            client: client.unwrap_or_default(),
50        }
51    }
52
53    /// Parse a single SSE frame from accumulated field lines.
54    fn parse_event(lines: &[String]) -> Option<StreamEvent> {
55        let mut id = None;
56        let mut event_type = None;
57        let mut data_lines: Vec<&str> = Vec::new();
58
59        for line in lines {
60            if let Some(value) = line.strip_prefix("id:") {
61                id = Some(value.trim().to_string());
62            } else if let Some(value) = line.strip_prefix("event:") {
63                event_type = Some(value.trim().to_string());
64            } else if let Some(value) = line.strip_prefix("data:") {
65                data_lines.push(value.trim());
66            }
67        }
68
69        if data_lines.is_empty() {
70            return None;
71        }
72
73        Some(StreamEvent {
74            id,
75            event_type,
76            data: data_lines.join("\n"),
77        })
78    }
79}
80
81// ─────────────────────────────────────────────────────────────────────────────
82// StreamSourcePort
83// ─────────────────────────────────────────────────────────────────────────────
84
85#[async_trait]
86impl StreamSourcePort for SseSource {
87    async fn subscribe(&self, url: &str, max_events: Option<usize>) -> Result<Vec<StreamEvent>> {
88        let response = self
89            .client
90            .get(url)
91            .header("Accept", "text/event-stream")
92            .send()
93            .await
94            .map_err(|e| {
95                StygianError::Service(ServiceError::Unavailable(format!(
96                    "SSE connection to {url} failed: {e}"
97                )))
98            })?;
99
100        let text = response.text().await.map_err(|e| {
101            StygianError::Service(ServiceError::InvalidResponse(format!(
102                "failed to read SSE body: {e}"
103            )))
104        })?;
105
106        let mut events = Vec::new();
107        let mut current_frame: Vec<String> = Vec::new();
108
109        for line in text.lines() {
110            if line.is_empty() {
111                // Empty line = event boundary in SSE
112                if let Some(event) = Self::parse_event(&current_frame) {
113                    events.push(event);
114                    if let Some(max) = max_events
115                        && events.len() >= max
116                    {
117                        break;
118                    }
119                }
120                current_frame.clear();
121            } else if !line.starts_with(':') {
122                // Lines starting with ':' are SSE comments — skip them
123                current_frame.push(line.to_string());
124            }
125        }
126
127        // Handle final event if no trailing blank line
128        if !current_frame.is_empty()
129            && let Some(event) = Self::parse_event(&current_frame)
130        {
131            events.push(event);
132        }
133
134        Ok(events)
135    }
136
137    fn source_name(&self) -> &str {
138        "sse"
139    }
140}
141
142// ─────────────────────────────────────────────────────────────────────────────
143// ScrapingService (DAG integration)
144// ─────────────────────────────────────────────────────────────────────────────
145
146#[async_trait]
147impl ScrapingService for SseSource {
148    /// Connect to an SSE endpoint and collect events.
149    ///
150    /// Expected params:
151    /// ```json
152    /// { "max_events": 10 }
153    /// ```
154    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
155        let max_events = input.params["max_events"].as_u64().map(|n| n as usize);
156
157        let events = self.subscribe(&input.url, max_events).await?;
158        let event_count = events.len();
159
160        Ok(ServiceOutput {
161            data: serde_json::to_string(&events).unwrap_or_default(),
162            metadata: json!({
163                "source": "sse",
164                "event_count": event_count,
165            }),
166        })
167    }
168
169    fn name(&self) -> &'static str {
170        "stream"
171    }
172}
173
174// ─────────────────────────────────────────────────────────────────────────────
175// Tests
176// ─────────────────────────────────────────────────────────────────────────────
177
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an owned frame from borrowed lines.
    fn frame(lines: &[&str]) -> Vec<String> {
        lines.iter().map(|line| (*line).to_owned()).collect()
    }

    #[test]
    fn parse_event_basic() {
        let event = SseSource::parse_event(&frame(&["event:message", "data:{\"price\":29.99}"]))
            .unwrap();
        assert_eq!(event.event_type.as_deref(), Some("message"));
        assert_eq!(event.data, r#"{"price":29.99}"#);
        assert!(event.id.is_none());
    }

    #[test]
    fn parse_event_with_id() {
        let event =
            SseSource::parse_event(&frame(&["id:42", "event:update", "data:hello"])).unwrap();
        assert_eq!(event.id.as_deref(), Some("42"));
        assert_eq!(event.event_type.as_deref(), Some("update"));
        assert_eq!(event.data, "hello");
    }

    #[test]
    fn parse_event_multiline_data() {
        let event = SseSource::parse_event(&frame(&["data:line one", "data:line two"])).unwrap();
        assert_eq!(event.data, "line one\nline two");
    }

    #[test]
    fn parse_event_no_data_returns_none() {
        assert!(SseSource::parse_event(&frame(&["event:ping"])).is_none());
    }

    #[test]
    fn parse_event_space_after_colon() {
        // The common wire form puts one space after the colon.
        let event =
            SseSource::parse_event(&frame(&["id: 7", "event: tick", "data: payload"])).unwrap();
        assert_eq!(event.id.as_deref(), Some("7"));
        assert_eq!(event.event_type.as_deref(), Some("tick"));
        assert_eq!(event.data, "payload");
    }

    #[test]
    fn parse_event_multiline_data_with_spaces() {
        let event = SseSource::parse_event(&frame(&["data: first", "data: second"])).unwrap();
        assert_eq!(event.data, "first\nsecond");
    }

    #[test]
    fn parse_event_empty_data_field_dispatches_empty_event() {
        let event = SseSource::parse_event(&frame(&["data:"])).unwrap();
        assert_eq!(event.data, "");
    }

    #[test]
    fn parse_event_ignores_retry_and_unknown_fields() {
        let event = SseSource::parse_event(&frame(&["retry:1000", "foo:bar", "data:x"])).unwrap();
        assert_eq!(event.data, "x");
        assert!(event.id.is_none());
        assert!(event.event_type.is_none());
    }

    #[test]
    fn parse_event_last_event_type_wins() {
        let event = SseSource::parse_event(&frame(&["event:a", "event:b", "data:x"])).unwrap();
        assert_eq!(event.event_type.as_deref(), Some("b"));
    }

    #[test]
    fn parse_event_empty_frame_returns_none() {
        assert!(SseSource::parse_event(&[]).is_none());
    }
}