1use anyhow::Result;
7use eventsource_client::{self as es, Client};
8use futures::stream::StreamExt;
9use serde::Deserialize;
10use tokio::sync::mpsc;
11
12#[derive(Debug, Clone)]
14pub enum OpenCodeEvent {
15 Connected,
17
18 MessageStart {
20 session_id: String,
21 message_id: String,
22 },
23
24 TextDelta { session_id: String, text: String },
26
27 ToolStart {
29 session_id: String,
30 tool_id: String,
31 tool_name: String,
32 input: serde_json::Value,
33 },
34
35 ToolResult {
37 session_id: String,
38 tool_id: String,
39 tool_name: String,
40 output: String,
41 success: bool,
42 },
43
44 MessageComplete { session_id: String, success: bool },
46
47 SessionError { session_id: String, error: String },
49
50 Unknown { event_type: String, data: String },
52}
53
54#[derive(Debug, Deserialize)]
56struct RawEvent {
57 #[serde(rename = "type")]
58 event_type: String,
59 #[serde(default)]
60 session_id: Option<String>,
61 #[serde(flatten)]
62 data: serde_json::Value,
63}
64
65impl OpenCodeEvent {
66 pub fn parse(event_type: &str, data: &str) -> Self {
68 let parsed: Result<RawEvent, _> = serde_json::from_str(data);
70
71 match parsed {
72 Ok(raw) => Self::from_raw(&raw),
73 Err(_) => {
74 match event_type {
76 "server.connected" | "connected" => OpenCodeEvent::Connected,
77 _ => OpenCodeEvent::Unknown {
78 event_type: event_type.to_string(),
79 data: data.to_string(),
80 },
81 }
82 }
83 }
84 }
85
86 fn from_raw(raw: &RawEvent) -> Self {
87 let session_id = raw.session_id.clone().unwrap_or_default();
88
89 match raw.event_type.as_str() {
90 "message.start" => OpenCodeEvent::MessageStart {
91 session_id,
92 message_id: raw
93 .data
94 .get("message_id")
95 .and_then(|v| v.as_str())
96 .unwrap_or("")
97 .to_string(),
98 },
99
100 "text.delta" | "content.delta" => OpenCodeEvent::TextDelta {
101 session_id,
102 text: raw
103 .data
104 .get("text")
105 .or_else(|| raw.data.get("delta"))
106 .and_then(|v| v.as_str())
107 .unwrap_or("")
108 .to_string(),
109 },
110
111 "tool.start" | "tool_use.start" => OpenCodeEvent::ToolStart {
112 session_id,
113 tool_id: raw
114 .data
115 .get("tool_id")
116 .or_else(|| raw.data.get("id"))
117 .and_then(|v| v.as_str())
118 .unwrap_or("")
119 .to_string(),
120 tool_name: raw
121 .data
122 .get("tool")
123 .or_else(|| raw.data.get("name"))
124 .and_then(|v| v.as_str())
125 .unwrap_or("")
126 .to_string(),
127 input: raw
128 .data
129 .get("input")
130 .cloned()
131 .unwrap_or(serde_json::Value::Null),
132 },
133
134 "tool.result" | "tool_use.result" => OpenCodeEvent::ToolResult {
135 session_id,
136 tool_id: raw
137 .data
138 .get("tool_id")
139 .or_else(|| raw.data.get("id"))
140 .and_then(|v| v.as_str())
141 .unwrap_or("")
142 .to_string(),
143 tool_name: raw
144 .data
145 .get("tool")
146 .or_else(|| raw.data.get("name"))
147 .and_then(|v| v.as_str())
148 .unwrap_or("")
149 .to_string(),
150 output: raw
151 .data
152 .get("output")
153 .and_then(|v| v.as_str())
154 .unwrap_or("")
155 .to_string(),
156 success: raw
157 .data
158 .get("success")
159 .and_then(|v| v.as_bool())
160 .unwrap_or(true),
161 },
162
163 "message.complete" | "message.done" => OpenCodeEvent::MessageComplete {
164 session_id,
165 success: raw
166 .data
167 .get("success")
168 .and_then(|v| v.as_bool())
169 .unwrap_or(true),
170 },
171
172 "session.error" | "error" => OpenCodeEvent::SessionError {
173 session_id,
174 error: raw
175 .data
176 .get("error")
177 .and_then(|v| v.as_str())
178 .unwrap_or("Unknown error")
179 .to_string(),
180 },
181
182 _ => OpenCodeEvent::Unknown {
183 event_type: raw.event_type.clone(),
184 data: serde_json::to_string(&raw.data).unwrap_or_default(),
185 },
186 }
187 }
188
189 pub fn session_id(&self) -> Option<&str> {
191 match self {
192 OpenCodeEvent::MessageStart { session_id, .. } => Some(session_id),
193 OpenCodeEvent::TextDelta { session_id, .. } => Some(session_id),
194 OpenCodeEvent::ToolStart { session_id, .. } => Some(session_id),
195 OpenCodeEvent::ToolResult { session_id, .. } => Some(session_id),
196 OpenCodeEvent::MessageComplete { session_id, .. } => Some(session_id),
197 OpenCodeEvent::SessionError { session_id, .. } => Some(session_id),
198 _ => None,
199 }
200 }
201
202 pub fn is_terminal(&self) -> bool {
204 matches!(
205 self,
206 OpenCodeEvent::MessageComplete { .. } | OpenCodeEvent::SessionError { .. }
207 )
208 }
209}
210
211pub struct EventStream {
213 rx: mpsc::Receiver<OpenCodeEvent>,
214 _handle: tokio::task::JoinHandle<()>,
215}
216
217impl EventStream {
218 pub async fn connect(url: &str) -> Result<Self> {
220 let (tx, rx) = mpsc::channel(1000);
221 let url = url.to_string();
222
223 let handle = tokio::spawn(async move {
224 let client = match es::ClientBuilder::for_url(&url) {
225 Ok(builder) => builder.build(),
226 Err(e) => {
227 eprintln!("Failed to create SSE client: {}", e);
228 return;
229 }
230 };
231
232 let mut stream = Box::pin(client.stream());
233
234 while let Some(event) = stream.next().await {
235 match event {
236 Ok(es::SSE::Event(ev)) => {
237 let parsed = OpenCodeEvent::parse(&ev.event_type, &ev.data);
238 if tx.send(parsed).await.is_err() {
239 break; }
241 }
242 Ok(es::SSE::Comment(_)) => continue,
243 Ok(es::SSE::Connected(_)) => {
244 let _ = tx.send(OpenCodeEvent::Connected).await;
245 }
246 Err(e) => {
247 eprintln!("SSE error: {}", e);
248 break;
249 }
250 }
251 }
252 });
253
254 Ok(Self {
255 rx,
256 _handle: handle,
257 })
258 }
259
260 pub async fn recv(&mut self) -> Option<OpenCodeEvent> {
262 self.rx.recv().await
263 }
264
265 pub fn try_recv(&mut self) -> Option<OpenCodeEvent> {
267 self.rx.try_recv().ok()
268 }
269}
270
271#[cfg(test)]
272mod tests {
273 use super::*;
274
275 #[test]
276 fn test_parse_message_start() {
277 let data = r#"{"type": "message.start", "session_id": "abc123", "message_id": "msg1"}"#;
278 let event = OpenCodeEvent::parse("message", data);
279
280 match event {
281 OpenCodeEvent::MessageStart {
282 session_id,
283 message_id,
284 } => {
285 assert_eq!(session_id, "abc123");
286 assert_eq!(message_id, "msg1");
287 }
288 _ => panic!("Expected MessageStart, got {:?}", event),
289 }
290 }
291
292 #[test]
293 fn test_parse_text_delta() {
294 let data = r#"{"type": "text.delta", "session_id": "abc", "text": "Hello world"}"#;
295 let event = OpenCodeEvent::parse("text", data);
296
297 match event {
298 OpenCodeEvent::TextDelta { session_id, text } => {
299 assert_eq!(session_id, "abc");
300 assert_eq!(text, "Hello world");
301 }
302 _ => panic!("Expected TextDelta, got {:?}", event),
303 }
304 }
305
306 #[test]
307 fn test_parse_text_delta_with_delta_field() {
308 let data = r#"{"type": "content.delta", "session_id": "abc", "delta": "content"}"#;
310 let event = OpenCodeEvent::parse("content", data);
311
312 match event {
313 OpenCodeEvent::TextDelta { text, .. } => {
314 assert_eq!(text, "content");
315 }
316 _ => panic!("Expected TextDelta"),
317 }
318 }
319
320 #[test]
321 fn test_parse_tool_start() {
322 let data = r#"{"type": "tool.start", "session_id": "abc123", "tool": "read_file", "input": {"path": "src/main.rs"}}"#;
323 let event = OpenCodeEvent::parse("tool", data);
324
325 match event {
326 OpenCodeEvent::ToolStart {
327 session_id,
328 tool_name,
329 input,
330 ..
331 } => {
332 assert_eq!(session_id, "abc123");
333 assert_eq!(tool_name, "read_file");
334 assert_eq!(input["path"], "src/main.rs");
335 }
336 _ => panic!("Expected ToolStart, got {:?}", event),
337 }
338 }
339
340 #[test]
341 fn test_parse_tool_result() {
342 let data = r#"{"type": "tool.result", "session_id": "abc", "tool": "bash", "output": "done", "success": true}"#;
343 let event = OpenCodeEvent::parse("tool", data);
344
345 match event {
346 OpenCodeEvent::ToolResult {
347 tool_name,
348 output,
349 success,
350 ..
351 } => {
352 assert_eq!(tool_name, "bash");
353 assert_eq!(output, "done");
354 assert!(success);
355 }
356 _ => panic!("Expected ToolResult"),
357 }
358 }
359
360 #[test]
361 fn test_parse_message_complete() {
362 let data = r#"{"type": "message.complete", "session_id": "xyz", "success": true}"#;
363 let event = OpenCodeEvent::parse("message", data);
364
365 match event {
366 OpenCodeEvent::MessageComplete {
367 session_id,
368 success,
369 } => {
370 assert_eq!(session_id, "xyz");
371 assert!(success);
372 }
373 _ => panic!("Expected MessageComplete"),
374 }
375 }
376
377 #[test]
378 fn test_parse_session_error() {
379 let data =
380 r#"{"type": "session.error", "session_id": "err1", "error": "Connection failed"}"#;
381 let event = OpenCodeEvent::parse("error", data);
382
383 match event {
384 OpenCodeEvent::SessionError { session_id, error } => {
385 assert_eq!(session_id, "err1");
386 assert_eq!(error, "Connection failed");
387 }
388 _ => panic!("Expected SessionError"),
389 }
390 }
391
392 #[test]
393 fn test_parse_unknown_event() {
394 let data = r#"{"type": "custom.event", "foo": "bar"}"#;
395 let event = OpenCodeEvent::parse("custom", data);
396
397 match event {
398 OpenCodeEvent::Unknown { event_type, .. } => {
399 assert_eq!(event_type, "custom.event");
400 }
401 _ => panic!("Expected Unknown"),
402 }
403 }
404
405 #[test]
406 fn test_parse_connected() {
407 let event = OpenCodeEvent::parse("server.connected", "");
408 assert!(matches!(event, OpenCodeEvent::Connected));
409
410 let event = OpenCodeEvent::parse("connected", "");
411 assert!(matches!(event, OpenCodeEvent::Connected));
412 }
413
414 #[test]
415 fn test_session_id_extraction() {
416 let event = OpenCodeEvent::TextDelta {
417 session_id: "test123".to_string(),
418 text: "hi".to_string(),
419 };
420 assert_eq!(event.session_id(), Some("test123"));
421
422 let event = OpenCodeEvent::Connected;
423 assert_eq!(event.session_id(), None);
424 }
425
426 #[test]
427 fn test_is_terminal() {
428 let complete = OpenCodeEvent::MessageComplete {
429 session_id: "a".to_string(),
430 success: true,
431 };
432 assert!(complete.is_terminal());
433
434 let error = OpenCodeEvent::SessionError {
435 session_id: "b".to_string(),
436 error: "fail".to_string(),
437 };
438 assert!(error.is_terminal());
439
440 let delta = OpenCodeEvent::TextDelta {
441 session_id: "c".to_string(),
442 text: "hi".to_string(),
443 };
444 assert!(!delta.is_terminal());
445 }
446}