1use anyhow::Result;
7use eventsource_client::{self as es, Client};
8use futures::stream::StreamExt;
9use serde::Deserialize;
10use tokio::sync::mpsc;
11
/// Events read from the OpenCode server-sent-event stream, as classified by
/// [`OpenCodeEvent::parse`].
#[derive(Debug, Clone)]
pub enum OpenCodeEvent {
    /// The event stream reported that it is connected.
    Connected,

    /// A `message.start` event.
    MessageStart {
        /// Session the event belongs to; empty when the payload carried no `session_id`.
        session_id: String,
        /// Value of the payload's `message_id` field; empty when absent.
        message_id: String,
    },

    /// A `text.delta` / `content.delta` event. `text` is taken from the payload's `text`
    /// field, falling back to `delta`; it is empty when neither is usable.
    TextDelta { session_id: String, text: String },

    /// A `tool.start` / `tool_use.start` event.
    ToolStart {
        /// Session the event belongs to; empty when the payload carried no `session_id`.
        session_id: String,
        /// Taken from the payload's `tool_id` field, falling back to `id`.
        tool_id: String,
        /// Taken from the payload's `tool` field, falling back to `name`.
        tool_name: String,
        /// The payload's `input` value verbatim, or JSON `null` when absent.
        input: serde_json::Value,
    },

    /// A `tool.result` / `tool_use.result` event.
    ToolResult {
        /// Session the event belongs to; empty when the payload carried no `session_id`.
        session_id: String,
        /// Taken from the payload's `tool_id` field, falling back to `id`.
        tool_id: String,
        /// Taken from the payload's `tool` field, falling back to `name`.
        tool_name: String,
        /// The payload's `output` field when it is a string; empty otherwise.
        output: String,
        /// The payload's boolean `success` field; `true` when it is absent.
        success: bool,
    },

    /// A `message.complete` / `message.done` event. `success` is `true` when the payload
    /// has no boolean `success` field.
    MessageComplete { session_id: String, success: bool },

    /// A `session.error` / `error` event. `error` is `"Unknown error"` when the payload
    /// has no string `error` field.
    SessionError { session_id: String, error: String },

    /// Any event that could not be classified; carries the event type and its data as text.
    Unknown { event_type: String, data: String },
}
53
/// Loosely typed view of an event payload, used as the intermediate step between the raw
/// JSON text and an [`OpenCodeEvent`].
#[derive(Debug, Deserialize)]
struct RawEvent {
    /// The payload's `type` field. It is required: deserialization fails without it.
    #[serde(rename = "type")]
    event_type: String,
    /// The payload's `session_id` field, `None` when it is missing.
    #[serde(default)]
    session_id: Option<String>,
    /// Every remaining field of the payload object, collected by `flatten`.
    #[serde(flatten)]
    data: serde_json::Value,
}
64
65impl OpenCodeEvent {
66 pub fn parse(event_type: &str, data: &str) -> Self {
68 let parsed: Result<RawEvent, _> = serde_json::from_str(data);
70
71 match parsed {
72 Ok(raw) => Self::from_raw(&raw),
73 Err(_) => {
74 match event_type {
76 "server.connected" | "connected" => OpenCodeEvent::Connected,
77 _ => OpenCodeEvent::Unknown {
78 event_type: event_type.to_string(),
79 data: data.to_string(),
80 },
81 }
82 }
83 }
84 }
85
86 fn from_raw(raw: &RawEvent) -> Self {
87 let session_id = raw.session_id.clone().unwrap_or_default();
88
89 match raw.event_type.as_str() {
90 "message.start" => OpenCodeEvent::MessageStart {
91 session_id,
92 message_id: raw
93 .data
94 .get("message_id")
95 .and_then(|v| v.as_str())
96 .unwrap_or("")
97 .to_string(),
98 },
99
100 "text.delta" | "content.delta" => OpenCodeEvent::TextDelta {
101 session_id,
102 text: raw
103 .data
104 .get("text")
105 .or_else(|| raw.data.get("delta"))
106 .and_then(|v| v.as_str())
107 .unwrap_or("")
108 .to_string(),
109 },
110
111 "tool.start" | "tool_use.start" => OpenCodeEvent::ToolStart {
112 session_id,
113 tool_id: raw
114 .data
115 .get("tool_id")
116 .or_else(|| raw.data.get("id"))
117 .and_then(|v| v.as_str())
118 .unwrap_or("")
119 .to_string(),
120 tool_name: raw
121 .data
122 .get("tool")
123 .or_else(|| raw.data.get("name"))
124 .and_then(|v| v.as_str())
125 .unwrap_or("")
126 .to_string(),
127 input: raw
128 .data
129 .get("input")
130 .cloned()
131 .unwrap_or(serde_json::Value::Null),
132 },
133
134 "tool.result" | "tool_use.result" => OpenCodeEvent::ToolResult {
135 session_id,
136 tool_id: raw
137 .data
138 .get("tool_id")
139 .or_else(|| raw.data.get("id"))
140 .and_then(|v| v.as_str())
141 .unwrap_or("")
142 .to_string(),
143 tool_name: raw
144 .data
145 .get("tool")
146 .or_else(|| raw.data.get("name"))
147 .and_then(|v| v.as_str())
148 .unwrap_or("")
149 .to_string(),
150 output: raw
151 .data
152 .get("output")
153 .and_then(|v| v.as_str())
154 .unwrap_or("")
155 .to_string(),
156 success: raw
157 .data
158 .get("success")
159 .and_then(|v| v.as_bool())
160 .unwrap_or(true),
161 },
162
163 "message.complete" | "message.done" => OpenCodeEvent::MessageComplete {
164 session_id,
165 success: raw
166 .data
167 .get("success")
168 .and_then(|v| v.as_bool())
169 .unwrap_or(true),
170 },
171
172 "session.error" | "error" => OpenCodeEvent::SessionError {
173 session_id,
174 error: raw
175 .data
176 .get("error")
177 .and_then(|v| v.as_str())
178 .unwrap_or("Unknown error")
179 .to_string(),
180 },
181
182 _ => OpenCodeEvent::Unknown {
183 event_type: raw.event_type.clone(),
184 data: serde_json::to_string(&raw.data).unwrap_or_default(),
185 },
186 }
187 }
188
189 pub fn session_id(&self) -> Option<&str> {
191 match self {
192 OpenCodeEvent::MessageStart { session_id, .. } => Some(session_id),
193 OpenCodeEvent::TextDelta { session_id, .. } => Some(session_id),
194 OpenCodeEvent::ToolStart { session_id, .. } => Some(session_id),
195 OpenCodeEvent::ToolResult { session_id, .. } => Some(session_id),
196 OpenCodeEvent::MessageComplete { session_id, .. } => Some(session_id),
197 OpenCodeEvent::SessionError { session_id, .. } => Some(session_id),
198 _ => None,
199 }
200 }
201
202 pub fn is_terminal(&self) -> bool {
204 matches!(
205 self,
206 OpenCodeEvent::MessageComplete { .. } | OpenCodeEvent::SessionError { .. }
207 )
208 }
209}
210
/// Receiving end of an OpenCode event subscription created by [`EventStream::connect`].
pub struct EventStream {
    /// Channel on which the background task delivers parsed events.
    rx: mpsc::Receiver<OpenCodeEvent>,
    /// Join handle of the background task spawned by `connect`.
    _handle: tokio::task::JoinHandle<()>,
}
216
217impl EventStream {
218 pub async fn connect(url: &str) -> Result<Self> {
220 let (tx, rx) = mpsc::channel(1000);
221 let url = url.to_string();
222
223 let handle = tokio::spawn(async move {
224 let client = match es::ClientBuilder::for_url(&url) {
225 Ok(builder) => builder.build(),
226 Err(e) => {
227 eprintln!("Failed to create SSE client: {}", e);
228 return;
229 }
230 };
231
232 let mut stream = Box::pin(client.stream());
233
234 while let Some(event) = stream.next().await {
235 match event {
236 Ok(es::SSE::Event(ev)) => {
237 let parsed = OpenCodeEvent::parse(&ev.event_type, &ev.data);
238 if tx.send(parsed).await.is_err() {
239 break; }
241 }
242 Ok(es::SSE::Comment(_)) => continue,
243 Ok(es::SSE::Connected(_)) => {
244 let _ = tx.send(OpenCodeEvent::Connected).await;
245 }
246 Err(e) => {
247 eprintln!("SSE error: {}", e);
248 break;
249 }
250 }
251 }
252 });
253
254 Ok(Self { rx, _handle: handle })
255 }
256
257 pub async fn recv(&mut self) -> Option<OpenCodeEvent> {
259 self.rx.recv().await
260 }
261
262 pub fn try_recv(&mut self) -> Option<OpenCodeEvent> {
264 self.rx.try_recv().ok()
265 }
266}
267
#[cfg(test)]
mod tests {
    //! Unit tests for event classification and the `OpenCodeEvent` helper methods.
    use super::*;

    /// `message.start` payloads expose both the session id and the message id.
    #[test]
    fn test_parse_message_start() {
        let payload = r#"{"type": "message.start", "session_id": "abc123", "message_id": "msg1"}"#;
        let parsed = OpenCodeEvent::parse("message", payload);

        if let OpenCodeEvent::MessageStart {
            session_id,
            message_id,
        } = &parsed
        {
            assert_eq!(session_id, "abc123");
            assert_eq!(message_id, "msg1");
        } else {
            panic!("Expected MessageStart, got {:?}", parsed);
        }
    }

    /// `text.delta` payloads take their text from the `text` field.
    #[test]
    fn test_parse_text_delta() {
        let payload = r#"{"type": "text.delta", "session_id": "abc", "text": "Hello world"}"#;
        let parsed = OpenCodeEvent::parse("text", payload);

        if let OpenCodeEvent::TextDelta { session_id, text } = &parsed {
            assert_eq!(session_id, "abc");
            assert_eq!(text, "Hello world");
        } else {
            panic!("Expected TextDelta, got {:?}", parsed);
        }
    }

    /// `content.delta` payloads may carry their text in the `delta` field instead.
    #[test]
    fn test_parse_text_delta_with_delta_field() {
        let payload = r#"{"type": "content.delta", "session_id": "abc", "delta": "content"}"#;
        let parsed = OpenCodeEvent::parse("content", payload);

        if let OpenCodeEvent::TextDelta { text, .. } = &parsed {
            assert_eq!(text, "content");
        } else {
            panic!("Expected TextDelta");
        }
    }

    /// `tool.start` payloads expose the tool name and the structured input.
    #[test]
    fn test_parse_tool_start() {
        let payload = r#"{"type": "tool.start", "session_id": "abc123", "tool": "read_file", "input": {"path": "src/main.rs"}}"#;
        let parsed = OpenCodeEvent::parse("tool", payload);

        if let OpenCodeEvent::ToolStart {
            session_id,
            tool_name,
            input,
            ..
        } = &parsed
        {
            assert_eq!(session_id, "abc123");
            assert_eq!(tool_name, "read_file");
            assert_eq!(input["path"], "src/main.rs");
        } else {
            panic!("Expected ToolStart, got {:?}", parsed);
        }
    }

    /// `tool.result` payloads expose the tool name, output and success flag.
    #[test]
    fn test_parse_tool_result() {
        let payload = r#"{"type": "tool.result", "session_id": "abc", "tool": "bash", "output": "done", "success": true}"#;
        let parsed = OpenCodeEvent::parse("tool", payload);

        if let OpenCodeEvent::ToolResult {
            tool_name,
            output,
            success,
            ..
        } = &parsed
        {
            assert_eq!(tool_name, "bash");
            assert_eq!(output, "done");
            assert!(*success);
        } else {
            panic!("Expected ToolResult");
        }
    }

    /// `message.complete` payloads expose the session id and success flag.
    #[test]
    fn test_parse_message_complete() {
        let payload = r#"{"type": "message.complete", "session_id": "xyz", "success": true}"#;
        let parsed = OpenCodeEvent::parse("message", payload);

        if let OpenCodeEvent::MessageComplete { session_id, success } = &parsed {
            assert_eq!(session_id, "xyz");
            assert!(*success);
        } else {
            panic!("Expected MessageComplete");
        }
    }

    /// `session.error` payloads expose the session id and the error text.
    #[test]
    fn test_parse_session_error() {
        let payload = r#"{"type": "session.error", "session_id": "err1", "error": "Connection failed"}"#;
        let parsed = OpenCodeEvent::parse("error", payload);

        if let OpenCodeEvent::SessionError { session_id, error } = &parsed {
            assert_eq!(session_id, "err1");
            assert_eq!(error, "Connection failed");
        } else {
            panic!("Expected SessionError");
        }
    }

    /// Unrecognised types are reported as `Unknown` with the embedded type preserved.
    #[test]
    fn test_parse_unknown_event() {
        let payload = r#"{"type": "custom.event", "foo": "bar"}"#;
        let parsed = OpenCodeEvent::parse("custom", payload);

        if let OpenCodeEvent::Unknown { event_type, .. } = &parsed {
            assert_eq!(event_type, "custom.event");
        } else {
            panic!("Expected Unknown");
        }
    }

    /// With an empty payload, both connected event names map to `Connected`.
    #[test]
    fn test_parse_connected() {
        for name in ["server.connected", "connected"].iter() {
            let parsed = OpenCodeEvent::parse(name, "");
            assert!(matches!(parsed, OpenCodeEvent::Connected));
        }
    }

    /// `session_id()` yields the id for session-bound events and `None` for `Connected`.
    #[test]
    fn test_session_id_extraction() {
        let with_session = OpenCodeEvent::TextDelta {
            session_id: "test123".to_string(),
            text: "hi".to_string(),
        };
        assert_eq!(with_session.session_id(), Some("test123"));

        assert_eq!(OpenCodeEvent::Connected.session_id(), None);
    }

    /// Only `MessageComplete` and `SessionError` are terminal.
    #[test]
    fn test_is_terminal() {
        assert!(OpenCodeEvent::MessageComplete {
            session_id: "a".to_string(),
            success: true,
        }
        .is_terminal());

        assert!(OpenCodeEvent::SessionError {
            session_id: "b".to_string(),
            error: "fail".to_string(),
        }
        .is_terminal());

        assert!(!OpenCodeEvent::TextDelta {
            session_id: "c".to_string(),
            text: "hi".to_string(),
        }
        .is_terminal());
    }
}