Skip to main content

st/inputs/
sse.rs

1//! Server-Sent Events (SSE) input adapter
2//!
3//! Visualizes real-time event streams as living trees
4
5use super::*;
6use anyhow::Result;
7use async_trait::async_trait;
8use reqwest;
9use std::time::{Duration, SystemTime};
10
/// Input adapter for Server-Sent Events (SSE) sources.
///
/// A stateless unit struct: all behaviour lives in its `InputAdapter`
/// implementation and in its inherent helper methods.
pub struct SseAdapter;
12
13#[async_trait]
14impl InputAdapter for SseAdapter {
15    fn name(&self) -> &'static str {
16        "SSE"
17    }
18
19    fn supported_formats(&self) -> Vec<&'static str> {
20        vec!["sse", "events", "stream"]
21    }
22
23    async fn can_handle(&self, input: &InputSource) -> bool {
24        match input {
25            InputSource::Url(url) => {
26                url.contains("/events") || url.contains("/stream") || url.contains("sse")
27            }
28            InputSource::Raw { format_hint, .. } => {
29                format_hint.as_ref().map(|h| h == "sse").unwrap_or(false)
30            }
31            _ => false,
32        }
33    }
34
35    async fn parse(&self, input: InputSource) -> Result<ContextNode> {
36        match input {
37            InputSource::Url(url) => self.parse_sse_stream(&url).await,
38            InputSource::Raw { data, .. } => self.parse_sse_data(&data).await,
39            _ => anyhow::bail!("SSE adapter requires URL or raw data"),
40        }
41    }
42}
43
44impl SseAdapter {
45    async fn parse_sse_stream(&self, url: &str) -> Result<ContextNode> {
46        let _client = reqwest::Client::new();
47
48        // Create root node for event stream
49        let mut root = ContextNode {
50            id: url.to_string(),
51            name: "Event Stream".to_string(),
52            node_type: NodeType::EventSource,
53            quantum_state: Some(QuantumState {
54                amplitude: 1.0,
55                frequency: 1.0, // 1 Hz update rate
56                phase: 0.0,
57                collapse_probability: 0.0, // Never collapses, always streaming
58            }),
59            children: vec![],
60            metadata: serde_json::json!({
61                "url": url,
62                "status": "connecting",
63                "event_count": 0
64            }),
65            entanglements: vec![],
66        };
67
68        // For demo, just show structure
69        // In real implementation, would stream events
70        root.children = vec![
71            ContextNode {
72                id: format!("{}/types", url),
73                name: "Event Types".to_string(),
74                node_type: NodeType::Directory,
75                quantum_state: None,
76                children: vec![
77                    self.create_event_type_node("user_action", 10.5),
78                    self.create_event_type_node("system_update", 2.0),
79                    self.create_event_type_node("error", 0.1),
80                ],
81                metadata: serde_json::json!({}),
82                entanglements: vec![],
83            },
84            ContextNode {
85                id: format!("{}/timeline", url),
86                name: "Timeline".to_string(),
87                node_type: NodeType::Directory,
88                quantum_state: None,
89                children: self.create_timeline_nodes(),
90                metadata: serde_json::json!({}),
91                entanglements: vec![],
92            },
93        ];
94
95        Ok(root)
96    }
97
98    async fn parse_sse_data(&self, data: &[u8]) -> Result<ContextNode> {
99        let content = String::from_utf8_lossy(data);
100        let mut events = Vec::new();
101
102        // Parse SSE format
103        for line in content.lines() {
104            if let Some(event_data) = line.strip_prefix("data: ") {
105                if let Ok(json) = serde_json::from_str::<serde_json::Value>(event_data) {
106                    events.push(json);
107                }
108            }
109        }
110
111        Ok(ContextNode {
112            id: "sse_data".to_string(),
113            name: "SSE Events".to_string(),
114            node_type: NodeType::EventSource,
115            quantum_state: None,
116            children: events
117                .iter()
118                .enumerate()
119                .map(|(i, event)| ContextNode {
120                    id: format!("event_{}", i),
121                    name: event
122                        .get("type")
123                        .and_then(|t| t.as_str())
124                        .unwrap_or("event")
125                        .to_string(),
126                    node_type: NodeType::EventType,
127                    quantum_state: None,
128                    children: vec![],
129                    metadata: event.clone(),
130                    entanglements: vec![],
131                })
132                .collect(),
133            metadata: serde_json::json!({
134                "event_count": events.len()
135            }),
136            entanglements: vec![],
137        })
138    }
139
140    fn create_event_type_node(&self, event_type: &str, frequency: f64) -> ContextNode {
141        ContextNode {
142            id: format!("type_{}", event_type),
143            name: event_type.to_string(),
144            node_type: NodeType::EventType,
145            quantum_state: Some(QuantumState {
146                amplitude: frequency / 10.0, // Normalize
147                frequency,
148                phase: 0.0,
149                collapse_probability: 0.5,
150            }),
151            children: vec![],
152            metadata: serde_json::json!({
153                "average_per_second": frequency,
154                "total_count": (frequency * 3600.0) as u64, // Last hour
155            }),
156            entanglements: vec![],
157        }
158    }
159
160    fn create_timeline_nodes(&self) -> Vec<ContextNode> {
161        let now = SystemTime::now();
162        let mut nodes = Vec::new();
163
164        // Create nodes for last 5 time buckets
165        for i in 0..5 {
166            let bucket_time = now - Duration::from_secs(i * 60);
167            nodes.push(ContextNode {
168                id: format!("bucket_{}", i),
169                name: format!("{} min ago", i),
170                node_type: NodeType::Directory,
171                quantum_state: Some(QuantumState {
172                    amplitude: 1.0 / (i as f64 + 1.0), // Decay over time
173                    frequency: 1.0,
174                    phase: i as f64 * std::f64::consts::PI / 5.0,
175                    collapse_probability: i as f64 / 5.0,
176                }),
177                children: vec![],
178                metadata: serde_json::json!({
179                    "timestamp": bucket_time,
180                    "event_count": 42 - (i * 8), // Simulated decay
181                }),
182                entanglements: if i > 0 {
183                    vec![Entanglement {
184                        target_id: format!("bucket_{}", i - 1),
185                        strength: 0.8,
186                        relationship: "temporal_sequence".to_string(),
187                    }]
188                } else {
189                    vec![]
190                },
191            });
192        }
193
194        nodes
195    }
196}