stygian_graph/adapters/
stream.rs1use async_trait::async_trait;
20use reqwest::Client;
21use serde_json::json;
22
23use crate::domain::error::{Result, ServiceError, StygianError};
24use crate::ports::stream_source::{StreamEvent, StreamSourcePort};
25use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
26
27pub struct SseSource {
36 client: Client,
37}
38
39impl SseSource {
40 #[must_use]
47 pub fn new(client: Option<Client>) -> Self {
48 Self {
49 client: client.unwrap_or_default(),
50 }
51 }
52
53 fn parse_event(lines: &[String]) -> Option<StreamEvent> {
55 let mut id = None;
56 let mut event_type = None;
57 let mut data_lines: Vec<&str> = Vec::new();
58
59 for line in lines {
60 if let Some(value) = line.strip_prefix("id:") {
61 id = Some(value.trim().to_string());
62 } else if let Some(value) = line.strip_prefix("event:") {
63 event_type = Some(value.trim().to_string());
64 } else if let Some(value) = line.strip_prefix("data:") {
65 data_lines.push(value.trim());
66 }
67 }
68
69 if data_lines.is_empty() {
70 return None;
71 }
72
73 Some(StreamEvent {
74 id,
75 event_type,
76 data: data_lines.join("\n"),
77 })
78 }
79}
80
81#[async_trait]
86impl StreamSourcePort for SseSource {
87 async fn subscribe(&self, url: &str, max_events: Option<usize>) -> Result<Vec<StreamEvent>> {
88 let response = self
89 .client
90 .get(url)
91 .header("Accept", "text/event-stream")
92 .send()
93 .await
94 .map_err(|e| {
95 StygianError::Service(ServiceError::Unavailable(format!(
96 "SSE connection to {url} failed: {e}"
97 )))
98 })?;
99
100 let text = response.text().await.map_err(|e| {
101 StygianError::Service(ServiceError::InvalidResponse(format!(
102 "failed to read SSE body: {e}"
103 )))
104 })?;
105
106 let mut events = Vec::new();
107 let mut current_frame: Vec<String> = Vec::new();
108
109 for line in text.lines() {
110 if line.is_empty() {
111 if let Some(event) = Self::parse_event(¤t_frame) {
113 events.push(event);
114 if let Some(max) = max_events
115 && events.len() >= max
116 {
117 break;
118 }
119 }
120 current_frame.clear();
121 } else if !line.starts_with(':') {
122 current_frame.push(line.to_string());
124 }
125 }
126
127 if !current_frame.is_empty()
129 && let Some(event) = Self::parse_event(¤t_frame)
130 {
131 events.push(event);
132 }
133
134 Ok(events)
135 }
136
137 fn source_name(&self) -> &str {
138 "sse"
139 }
140}
141
142#[async_trait]
147impl ScrapingService for SseSource {
148 async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
155 let max_events = input.params["max_events"].as_u64().map(|n| n as usize);
156
157 let events = self.subscribe(&input.url, max_events).await?;
158 let event_count = events.len();
159
160 Ok(ServiceOutput {
161 data: serde_json::to_string(&events).unwrap_or_default(),
162 metadata: json!({
163 "source": "sse",
164 "event_count": event_count,
165 }),
166 })
167 }
168
169 fn name(&self) -> &'static str {
170 "stream"
171 }
172}
173
#[cfg(test)]
mod tests {
    use super::*;

    /// Converts borrowed literals into the owned frame lines `parse_event` takes.
    fn frame(raw: &[&str]) -> Vec<String> {
        raw.iter().map(|line| (*line).to_owned()).collect()
    }

    #[test]
    fn parse_event_basic() {
        let input = frame(&["event:message", r#"data:{"price":29.99}"#]);
        let event = SseSource::parse_event(&input).expect("a frame with data yields an event");

        assert_eq!(event.id, None);
        assert_eq!(event.event_type.as_deref(), Some("message"));
        assert_eq!(event.data, r#"{"price":29.99}"#);
    }

    #[test]
    fn parse_event_with_id() {
        let input = frame(&["id:42", "event:update", "data:hello"]);
        let event = SseSource::parse_event(&input).expect("a frame with data yields an event");

        assert_eq!(event.id.as_deref(), Some("42"));
        assert_eq!(event.event_type.as_deref(), Some("update"));
        assert_eq!(event.data, "hello");
    }

    #[test]
    fn parse_event_multiline_data() {
        let input = frame(&["data:line one", "data:line two"]);
        let event = SseSource::parse_event(&input).expect("a frame with data yields an event");

        assert_eq!(event.data, "line one\nline two");
    }

    #[test]
    fn parse_event_no_data_returns_none() {
        let input = frame(&["event:ping"]);

        assert!(SseSource::parse_event(&input).is_none());
    }
}