use async_trait::async_trait;
use reqwest::Client;
use serde_json::json;
use crate::domain::error::{Result, ServiceError, StygianError};
use crate::ports::stream_source::{StreamEvent, StreamSourcePort};
use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
/// Stream source that retrieves Server-Sent Events (SSE) over HTTP.
///
/// Implements both `StreamSourcePort` (event collection) and
/// `ScrapingService` (JSON-serialised output) further down in this file.
pub struct SseSource {
/// HTTP client used to issue the `text/event-stream` request.
client: Client,
}
impl SseSource {
    /// Creates an SSE source.
    ///
    /// Uses `client` when one is supplied; otherwise falls back to the
    /// client type's `Default` implementation.
    #[must_use]
    pub fn new(client: Option<Client>) -> Self {
        Self {
            client: client.unwrap_or_default(),
        }
    }

    /// Builds one [`StreamEvent`] from the field lines of a single SSE frame.
    ///
    /// `lines` holds the non-blank lines that appeared between two blank
    /// lines of the stream. Each line is interpreted as `field:value`:
    ///
    /// * `id` and `event` set the event id / type (the last occurrence wins).
    /// * every `data` line contributes one line of payload; the lines are
    ///   joined with `\n`.
    /// * any other field name (including the empty name of a `:comment`
    ///   line) is ignored.
    ///
    /// Following the SSE wire format, only a *single* leading space is
    /// removed from a value. All other whitespace is payload and is kept, so
    /// `data:  x` yields `" x"`. A line without a colon is a field whose
    /// value is the empty string.
    ///
    /// Returns `None` when the frame contains no `data` field at all.
    fn parse_event(lines: &[String]) -> Option<StreamEvent> {
        let mut id = None;
        let mut event_type = None;
        let mut data_lines: Vec<&str> = Vec::new();

        for line in lines {
            let line = line.as_str();
            // CR is a line terminator in SSE and can never be part of a
            // value; drop one that a line splitter left behind (e.g. a final
            // line cut off in the middle of a CRLF pair).
            let line = line.strip_suffix('\r').unwrap_or(line);

            // No colon at all means "field name with an empty value".
            let (field, value) = line.split_once(':').unwrap_or((line, ""));
            // Exactly one optional space separates the colon from the value.
            let value = value.strip_prefix(' ').unwrap_or(value);

            match field {
                "id" => id = Some(value.to_string()),
                "event" => event_type = Some(value.to_string()),
                "data" => data_lines.push(value),
                _ => {}
            }
        }

        if data_lines.is_empty() {
            return None;
        }

        Some(StreamEvent {
            id,
            event_type,
            data: data_lines.join("\n"),
        })
    }
}
#[async_trait]
impl StreamSourcePort for SseSource {
    /// Connects to `url` and collects the events of its SSE stream.
    ///
    /// The response body is consumed chunk by chunk, so the call returns as
    /// soon as `max_events` events have been collected instead of waiting
    /// for the server to close the connection. With `max_events == None`
    /// the stream is read until the server closes it.
    ///
    /// Blank lines terminate a frame, lines starting with `:` are comments
    /// and are skipped, and a frame that is still open when the stream ends
    /// is emitted as well (provided the cap has not been reached). At most
    /// `max_events` events are ever returned.
    ///
    /// # Errors
    ///
    /// * `ServiceError::Unavailable` when the request cannot be sent.
    /// * `ServiceError::InvalidResponse` when reading the body fails.
    async fn subscribe(&self, url: &str, max_events: Option<usize>) -> Result<Vec<StreamEvent>> {
        let mut response = self
            .client
            .get(url)
            .header("Accept", "text/event-stream")
            .send()
            .await
            .map_err(|e| {
                StygianError::Service(ServiceError::Unavailable(format!(
                    "SSE connection to {url} failed: {e}"
                )))
            })?;

        let limit_reached = |count: usize| max_events.is_some_and(|max| count >= max);

        let mut events = Vec::new();
        let mut current_frame: Vec<String> = Vec::new();
        // Raw bytes received so far that do not yet form a complete line.
        // Decoding happens per complete line, so a multi-byte UTF-8 sequence
        // split across two chunks is never decoded half-way.
        let mut pending: Vec<u8> = Vec::new();
        let mut stream_open = true;

        while stream_open && !limit_reached(events.len()) {
            let chunk = response.chunk().await.map_err(|e| {
                StygianError::Service(ServiceError::InvalidResponse(format!(
                    "failed to read SSE body: {e}"
                )))
            })?;
            match chunk {
                Some(bytes) => pending.extend_from_slice(&bytes),
                None => {
                    // End of stream: terminate a possibly unfinished last
                    // line and add a blank line so the frame that is still
                    // open gets dispatched by the regular path below.
                    pending.extend_from_slice(b"\n\n");
                    stream_open = false;
                }
            }

            let mut consumed = 0;
            while let Some(offset) = pending[consumed..].iter().position(|&b| b == b'\n') {
                let line_end = consumed + offset;
                // SSE streams are UTF-8; invalid sequences are replaced.
                let decoded = String::from_utf8_lossy(&pending[consumed..line_end]);
                let line = decoded.strip_suffix('\r').unwrap_or(&*decoded);
                consumed = line_end + 1;

                if line.is_empty() {
                    if let Some(event) = Self::parse_event(&current_frame) {
                        events.push(event);
                    }
                    // Clear before checking the cap so a dispatched frame
                    // can never be parsed and emitted a second time.
                    current_frame.clear();
                    if limit_reached(events.len()) {
                        break;
                    }
                } else if !line.starts_with(':') {
                    current_frame.push(line.to_string());
                }
            }
            pending.drain(..consumed);
        }

        Ok(events)
    }

    /// Returns the identifier of this stream source, `"sse"`.
    fn source_name(&self) -> &'static str {
        "sse"
    }
}
#[async_trait]
impl ScrapingService for SseSource {
    /// Subscribes to `input.url` and returns the collected events as JSON.
    ///
    /// The optional `max_events` entry of `input.params` caps the number of
    /// events; it is ignored (no cap) when it is absent, is not an unsigned
    /// integer, or does not fit into `usize`.
    ///
    /// The output's `data` is the JSON serialisation of the events (an empty
    /// string if serialisation fails) and its `metadata` records the source
    /// name and the number of events.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `subscribe`.
    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
        let limit = match input.params.get("max_events") {
            Some(raw) => raw.as_u64().and_then(|count| usize::try_from(count).ok()),
            None => None,
        };

        let collected = self.subscribe(&input.url, limit).await?;

        let metadata = json!({
            "source": "sse",
            "event_count": collected.len(),
        });
        let data = serde_json::to_string(&collected).unwrap_or_default();

        Ok(ServiceOutput { data, metadata })
    }

    /// Returns the service name, `"stream"`.
    fn name(&self) -> &'static str {
        "stream"
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Result type shared by the fallible tests in this module.
    type TestResult = std::result::Result<(), Box<dyn std::error::Error>>;

    /// Parses `raw` as the lines of one SSE frame, turning a `None` from
    /// `parse_event` into an error so tests can use `?` instead of unwrapping.
    fn parse_frame(raw: &[&str]) -> std::result::Result<StreamEvent, std::io::Error> {
        let frame: Vec<String> = raw.iter().map(|line| (*line).to_string()).collect();
        SseSource::parse_event(&frame)
            .ok_or_else(|| std::io::Error::other("expected parse_event to return Some"))
    }

    /// An `event` plus a `data` line produce a typed event without an id.
    #[test]
    fn parse_event_basic() -> TestResult {
        let parsed = parse_frame(&["event:message", "data:{\"price\":29.99}"])?;
        assert_eq!(parsed.event_type.as_deref(), Some("message"));
        assert_eq!(parsed.data, r#"{"price":29.99}"#);
        assert!(parsed.id.is_none());
        Ok(())
    }

    /// The `id` field is captured alongside type and data.
    #[test]
    fn parse_event_with_id() -> TestResult {
        let parsed = parse_frame(&["id:42", "event:update", "data:hello"])?;
        assert_eq!(parsed.id.as_deref(), Some("42"));
        assert_eq!(parsed.event_type.as_deref(), Some("update"));
        assert_eq!(parsed.data, "hello");
        Ok(())
    }

    /// Several `data` lines are joined with a newline.
    #[test]
    fn parse_event_multiline_data() -> TestResult {
        let parsed = parse_frame(&["data:line one", "data:line two"])?;
        assert_eq!(parsed.data, "line one\nline two");
        Ok(())
    }

    /// A frame without any `data` line yields no event.
    #[test]
    fn parse_event_no_data_returns_none() {
        let frame = [String::from("event:ping")];
        assert!(SseSource::parse_event(&frame).is_none());
    }
}