stygian_graph/adapters/
stream.rs1use async_trait::async_trait;
20use reqwest::Client;
21use serde_json::json;
22
23use crate::domain::error::{Result, ServiceError, StygianError};
24use crate::ports::stream_source::{StreamEvent, StreamSourcePort};
25use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
26
27pub struct SseSource {
36 client: Client,
37}
38
39impl SseSource {
40 #[must_use]
47 pub fn new(client: Option<Client>) -> Self {
48 Self {
49 client: client.unwrap_or_default(),
50 }
51 }
52
53 fn parse_event(lines: &[String]) -> Option<StreamEvent> {
55 let mut id = None;
56 let mut event_type = None;
57 let mut data_lines: Vec<&str> = Vec::new();
58
59 for line in lines {
60 if let Some(value) = line.strip_prefix("id:") {
61 id = Some(value.trim().to_string());
62 } else if let Some(value) = line.strip_prefix("event:") {
63 event_type = Some(value.trim().to_string());
64 } else if let Some(value) = line.strip_prefix("data:") {
65 data_lines.push(value.trim());
66 }
67 }
68
69 if data_lines.is_empty() {
70 return None;
71 }
72
73 Some(StreamEvent {
74 id,
75 event_type,
76 data: data_lines.join("\n"),
77 })
78 }
79}
80
81#[async_trait]
86impl StreamSourcePort for SseSource {
87 async fn subscribe(&self, url: &str, max_events: Option<usize>) -> Result<Vec<StreamEvent>> {
88 let response = self
89 .client
90 .get(url)
91 .header("Accept", "text/event-stream")
92 .send()
93 .await
94 .map_err(|e| {
95 StygianError::Service(ServiceError::Unavailable(format!(
96 "SSE connection to {url} failed: {e}"
97 )))
98 })?;
99
100 let text = response.text().await.map_err(|e| {
101 StygianError::Service(ServiceError::InvalidResponse(format!(
102 "failed to read SSE body: {e}"
103 )))
104 })?;
105
106 let mut events = Vec::new();
107 let mut current_frame: Vec<String> = Vec::new();
108
109 for line in text.lines() {
110 if line.is_empty() {
111 if let Some(event) = Self::parse_event(¤t_frame) {
113 events.push(event);
114 if let Some(max) = max_events
115 && events.len() >= max
116 {
117 break;
118 }
119 }
120 current_frame.clear();
121 } else if !line.starts_with(':') {
122 current_frame.push(line.to_string());
124 }
125 }
126
127 if !current_frame.is_empty()
129 && let Some(event) = Self::parse_event(¤t_frame)
130 {
131 events.push(event);
132 }
133
134 Ok(events)
135 }
136
137 fn source_name(&self) -> &'static str {
138 "sse"
139 }
140}
141
142#[async_trait]
147impl ScrapingService for SseSource {
148 async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
155 let max_events = input
156 .params
157 .get("max_events")
158 .and_then(serde_json::Value::as_u64)
159 .and_then(|n| usize::try_from(n).ok());
160
161 let events = self.subscribe(&input.url, max_events).await?;
162 let event_count = events.len();
163
164 Ok(ServiceOutput {
165 data: serde_json::to_string(&events).unwrap_or_default(),
166 metadata: json!({
167 "source": "sse",
168 "event_count": event_count,
169 }),
170 })
171 }
172
173 fn name(&self) -> &'static str {
174 "stream"
175 }
176}
177
178#[cfg(test)]
183mod tests {
184 use super::*;
185
186 #[test]
187 fn parse_event_basic() -> std::result::Result<(), Box<dyn std::error::Error>> {
188 let lines = vec![
189 "event:message".to_string(),
190 "data:{\"price\":29.99}".to_string(),
191 ];
192 let event = SseSource::parse_event(&lines)
193 .ok_or_else(|| std::io::Error::other("expected parse_event to return Some"))?;
194 assert_eq!(event.event_type.as_deref(), Some("message"));
195 assert_eq!(event.data, r#"{"price":29.99}"#);
196 assert!(event.id.is_none());
197 Ok(())
198 }
199
200 #[test]
201 fn parse_event_with_id() -> std::result::Result<(), Box<dyn std::error::Error>> {
202 let lines = vec![
203 "id:42".to_string(),
204 "event:update".to_string(),
205 "data:hello".to_string(),
206 ];
207 let event = SseSource::parse_event(&lines)
208 .ok_or_else(|| std::io::Error::other("expected parse_event to return Some"))?;
209 assert_eq!(event.id.as_deref(), Some("42"));
210 assert_eq!(event.event_type.as_deref(), Some("update"));
211 assert_eq!(event.data, "hello");
212 Ok(())
213 }
214
215 #[test]
216 fn parse_event_multiline_data() -> std::result::Result<(), Box<dyn std::error::Error>> {
217 let lines = vec!["data:line one".to_string(), "data:line two".to_string()];
218 let event = SseSource::parse_event(&lines)
219 .ok_or_else(|| std::io::Error::other("expected parse_event to return Some"))?;
220 assert_eq!(event.data, "line one\nline two");
221 Ok(())
222 }
223
224 #[test]
225 fn parse_event_no_data_returns_none() {
226 let lines = vec!["event:ping".to_string()];
227 assert!(SseSource::parse_event(&lines).is_none());
228 }
229}