1use super::*;
6use anyhow::Result;
7use async_trait::async_trait;
8use reqwest;
9use std::time::{Duration, SystemTime};
10
11pub struct SseAdapter;
12
13#[async_trait]
14impl InputAdapter for SseAdapter {
15 fn name(&self) -> &'static str {
16 "SSE"
17 }
18
19 fn supported_formats(&self) -> Vec<&'static str> {
20 vec!["sse", "events", "stream"]
21 }
22
23 async fn can_handle(&self, input: &InputSource) -> bool {
24 match input {
25 InputSource::Url(url) => {
26 url.contains("/events") || url.contains("/stream") || url.contains("sse")
27 }
28 InputSource::Raw { format_hint, .. } => {
29 format_hint.as_ref().map(|h| h == "sse").unwrap_or(false)
30 }
31 _ => false,
32 }
33 }
34
35 async fn parse(&self, input: InputSource) -> Result<ContextNode> {
36 match input {
37 InputSource::Url(url) => self.parse_sse_stream(&url).await,
38 InputSource::Raw { data, .. } => self.parse_sse_data(&data).await,
39 _ => anyhow::bail!("SSE adapter requires URL or raw data"),
40 }
41 }
42}
43
44impl SseAdapter {
45 async fn parse_sse_stream(&self, url: &str) -> Result<ContextNode> {
46 let _client = reqwest::Client::new();
47
48 let mut root = ContextNode {
50 id: url.to_string(),
51 name: "Event Stream".to_string(),
52 node_type: NodeType::EventSource,
53 quantum_state: Some(QuantumState {
54 amplitude: 1.0,
55 frequency: 1.0, phase: 0.0,
57 collapse_probability: 0.0, }),
59 children: vec![],
60 metadata: serde_json::json!({
61 "url": url,
62 "status": "connecting",
63 "event_count": 0
64 }),
65 entanglements: vec![],
66 };
67
68 root.children = vec![
71 ContextNode {
72 id: format!("{}/types", url),
73 name: "Event Types".to_string(),
74 node_type: NodeType::Directory,
75 quantum_state: None,
76 children: vec![
77 self.create_event_type_node("user_action", 10.5),
78 self.create_event_type_node("system_update", 2.0),
79 self.create_event_type_node("error", 0.1),
80 ],
81 metadata: serde_json::json!({}),
82 entanglements: vec![],
83 },
84 ContextNode {
85 id: format!("{}/timeline", url),
86 name: "Timeline".to_string(),
87 node_type: NodeType::Directory,
88 quantum_state: None,
89 children: self.create_timeline_nodes(),
90 metadata: serde_json::json!({}),
91 entanglements: vec![],
92 },
93 ];
94
95 Ok(root)
96 }
97
98 async fn parse_sse_data(&self, data: &[u8]) -> Result<ContextNode> {
99 let content = String::from_utf8_lossy(data);
100 let mut events = Vec::new();
101
102 for line in content.lines() {
104 if let Some(event_data) = line.strip_prefix("data: ") {
105 if let Ok(json) = serde_json::from_str::<serde_json::Value>(event_data) {
106 events.push(json);
107 }
108 }
109 }
110
111 Ok(ContextNode {
112 id: "sse_data".to_string(),
113 name: "SSE Events".to_string(),
114 node_type: NodeType::EventSource,
115 quantum_state: None,
116 children: events
117 .iter()
118 .enumerate()
119 .map(|(i, event)| ContextNode {
120 id: format!("event_{}", i),
121 name: event
122 .get("type")
123 .and_then(|t| t.as_str())
124 .unwrap_or("event")
125 .to_string(),
126 node_type: NodeType::EventType,
127 quantum_state: None,
128 children: vec![],
129 metadata: event.clone(),
130 entanglements: vec![],
131 })
132 .collect(),
133 metadata: serde_json::json!({
134 "event_count": events.len()
135 }),
136 entanglements: vec![],
137 })
138 }
139
140 fn create_event_type_node(&self, event_type: &str, frequency: f64) -> ContextNode {
141 ContextNode {
142 id: format!("type_{}", event_type),
143 name: event_type.to_string(),
144 node_type: NodeType::EventType,
145 quantum_state: Some(QuantumState {
146 amplitude: frequency / 10.0, frequency,
148 phase: 0.0,
149 collapse_probability: 0.5,
150 }),
151 children: vec![],
152 metadata: serde_json::json!({
153 "average_per_second": frequency,
154 "total_count": (frequency * 3600.0) as u64, }),
156 entanglements: vec![],
157 }
158 }
159
160 fn create_timeline_nodes(&self) -> Vec<ContextNode> {
161 let now = SystemTime::now();
162 let mut nodes = Vec::new();
163
164 for i in 0..5 {
166 let bucket_time = now - Duration::from_secs(i * 60);
167 nodes.push(ContextNode {
168 id: format!("bucket_{}", i),
169 name: format!("{} min ago", i),
170 node_type: NodeType::Directory,
171 quantum_state: Some(QuantumState {
172 amplitude: 1.0 / (i as f64 + 1.0), frequency: 1.0,
174 phase: i as f64 * std::f64::consts::PI / 5.0,
175 collapse_probability: i as f64 / 5.0,
176 }),
177 children: vec![],
178 metadata: serde_json::json!({
179 "timestamp": bucket_time,
180 "event_count": 42 - (i * 8), }),
182 entanglements: if i > 0 {
183 vec![Entanglement {
184 target_id: format!("bucket_{}", i - 1),
185 strength: 0.8,
186 relationship: "temporal_sequence".to_string(),
187 }]
188 } else {
189 vec![]
190 },
191 });
192 }
193
194 nodes
195 }
196}