Skip to main content

scud/opencode/
events.rs

1//! SSE event streaming for OpenCode Server
2//!
3//! Provides real-time event streaming via Server-Sent Events (SSE) for
4//! monitoring agent execution, tool calls, and completion status.
5
6use anyhow::Result;
7use eventsource_client::{self as es, Client};
8use futures::stream::StreamExt;
9use serde::Deserialize;
10use tokio::sync::mpsc;
11
12/// Events received from OpenCode Server SSE stream
13#[derive(Debug, Clone)]
14pub enum OpenCodeEvent {
15    /// Server connected
16    Connected,
17
18    /// Message started
19    MessageStart {
20        session_id: String,
21        message_id: String,
22    },
23
24    /// Text delta (streaming output)
25    TextDelta { session_id: String, text: String },
26
27    /// Tool execution started
28    ToolStart {
29        session_id: String,
30        tool_id: String,
31        tool_name: String,
32        input: serde_json::Value,
33    },
34
35    /// Tool execution completed
36    ToolResult {
37        session_id: String,
38        tool_id: String,
39        tool_name: String,
40        output: String,
41        success: bool,
42    },
43
44    /// Message completed
45    MessageComplete { session_id: String, success: bool },
46
47    /// Session error
48    SessionError { session_id: String, error: String },
49
50    /// Unknown event type
51    Unknown { event_type: String, data: String },
52}
53
54/// Raw SSE event from server
55#[derive(Debug, Deserialize)]
56struct RawEvent {
57    #[serde(rename = "type")]
58    event_type: String,
59    #[serde(default)]
60    session_id: Option<String>,
61    #[serde(flatten)]
62    data: serde_json::Value,
63}
64
65impl OpenCodeEvent {
66    /// Parse from SSE event data
67    pub fn parse(event_type: &str, data: &str) -> Self {
68        // Try to parse as JSON
69        let parsed: Result<RawEvent, _> = serde_json::from_str(data);
70
71        match parsed {
72            Ok(raw) => Self::from_raw(&raw),
73            Err(_) => {
74                // Fallback for non-JSON events
75                match event_type {
76                    "server.connected" | "connected" => OpenCodeEvent::Connected,
77                    _ => OpenCodeEvent::Unknown {
78                        event_type: event_type.to_string(),
79                        data: data.to_string(),
80                    },
81                }
82            }
83        }
84    }
85
86    fn from_raw(raw: &RawEvent) -> Self {
87        let session_id = raw.session_id.clone().unwrap_or_default();
88
89        match raw.event_type.as_str() {
90            "message.start" => OpenCodeEvent::MessageStart {
91                session_id,
92                message_id: raw
93                    .data
94                    .get("message_id")
95                    .and_then(|v| v.as_str())
96                    .unwrap_or("")
97                    .to_string(),
98            },
99
100            "text.delta" | "content.delta" => OpenCodeEvent::TextDelta {
101                session_id,
102                text: raw
103                    .data
104                    .get("text")
105                    .or_else(|| raw.data.get("delta"))
106                    .and_then(|v| v.as_str())
107                    .unwrap_or("")
108                    .to_string(),
109            },
110
111            "tool.start" | "tool_use.start" => OpenCodeEvent::ToolStart {
112                session_id,
113                tool_id: raw
114                    .data
115                    .get("tool_id")
116                    .or_else(|| raw.data.get("id"))
117                    .and_then(|v| v.as_str())
118                    .unwrap_or("")
119                    .to_string(),
120                tool_name: raw
121                    .data
122                    .get("tool")
123                    .or_else(|| raw.data.get("name"))
124                    .and_then(|v| v.as_str())
125                    .unwrap_or("")
126                    .to_string(),
127                input: raw
128                    .data
129                    .get("input")
130                    .cloned()
131                    .unwrap_or(serde_json::Value::Null),
132            },
133
134            "tool.result" | "tool_use.result" => OpenCodeEvent::ToolResult {
135                session_id,
136                tool_id: raw
137                    .data
138                    .get("tool_id")
139                    .or_else(|| raw.data.get("id"))
140                    .and_then(|v| v.as_str())
141                    .unwrap_or("")
142                    .to_string(),
143                tool_name: raw
144                    .data
145                    .get("tool")
146                    .or_else(|| raw.data.get("name"))
147                    .and_then(|v| v.as_str())
148                    .unwrap_or("")
149                    .to_string(),
150                output: raw
151                    .data
152                    .get("output")
153                    .and_then(|v| v.as_str())
154                    .unwrap_or("")
155                    .to_string(),
156                success: raw
157                    .data
158                    .get("success")
159                    .and_then(|v| v.as_bool())
160                    .unwrap_or(true),
161            },
162
163            "message.complete" | "message.done" => OpenCodeEvent::MessageComplete {
164                session_id,
165                success: raw
166                    .data
167                    .get("success")
168                    .and_then(|v| v.as_bool())
169                    .unwrap_or(true),
170            },
171
172            "session.error" | "error" => OpenCodeEvent::SessionError {
173                session_id,
174                error: raw
175                    .data
176                    .get("error")
177                    .and_then(|v| v.as_str())
178                    .unwrap_or("Unknown error")
179                    .to_string(),
180            },
181
182            _ => OpenCodeEvent::Unknown {
183                event_type: raw.event_type.clone(),
184                data: serde_json::to_string(&raw.data).unwrap_or_default(),
185            },
186        }
187    }
188
189    /// Get session ID if event is session-specific
190    pub fn session_id(&self) -> Option<&str> {
191        match self {
192            OpenCodeEvent::MessageStart { session_id, .. } => Some(session_id),
193            OpenCodeEvent::TextDelta { session_id, .. } => Some(session_id),
194            OpenCodeEvent::ToolStart { session_id, .. } => Some(session_id),
195            OpenCodeEvent::ToolResult { session_id, .. } => Some(session_id),
196            OpenCodeEvent::MessageComplete { session_id, .. } => Some(session_id),
197            OpenCodeEvent::SessionError { session_id, .. } => Some(session_id),
198            _ => None,
199        }
200    }
201
202    /// Check if this event indicates a session has ended
203    pub fn is_terminal(&self) -> bool {
204        matches!(
205            self,
206            OpenCodeEvent::MessageComplete { .. } | OpenCodeEvent::SessionError { .. }
207        )
208    }
209}
210
211/// Event stream subscription
212pub struct EventStream {
213    rx: mpsc::Receiver<OpenCodeEvent>,
214    _handle: tokio::task::JoinHandle<()>,
215}
216
217impl EventStream {
218    /// Create a new event stream connected to the given URL
219    pub async fn connect(url: &str) -> Result<Self> {
220        let (tx, rx) = mpsc::channel(1000);
221        let url = url.to_string();
222
223        let handle = tokio::spawn(async move {
224            let client = match es::ClientBuilder::for_url(&url) {
225                Ok(builder) => builder.build(),
226                Err(e) => {
227                    eprintln!("Failed to create SSE client: {}", e);
228                    return;
229                }
230            };
231
232            let mut stream = Box::pin(client.stream());
233
234            while let Some(event) = stream.next().await {
235                match event {
236                    Ok(es::SSE::Event(ev)) => {
237                        let parsed = OpenCodeEvent::parse(&ev.event_type, &ev.data);
238                        if tx.send(parsed).await.is_err() {
239                            break; // Receiver dropped
240                        }
241                    }
242                    Ok(es::SSE::Comment(_)) => continue,
243                    Ok(es::SSE::Connected(_)) => {
244                        let _ = tx.send(OpenCodeEvent::Connected).await;
245                    }
246                    Err(e) => {
247                        eprintln!("SSE error: {}", e);
248                        break;
249                    }
250                }
251            }
252        });
253
254        Ok(Self {
255            rx,
256            _handle: handle,
257        })
258    }
259
260    /// Receive next event
261    pub async fn recv(&mut self) -> Option<OpenCodeEvent> {
262        self.rx.recv().await
263    }
264
265    /// Try to receive without blocking
266    pub fn try_recv(&mut self) -> Option<OpenCodeEvent> {
267        self.rx.try_recv().ok()
268    }
269}
270
#[cfg(test)]
mod tests {
    use super::*;

    /// `message.start` payloads map to `MessageStart` with both identifiers.
    #[test]
    fn test_parse_message_start() {
        let payload = r#"{"type": "message.start", "session_id": "abc123", "message_id": "msg1"}"#;
        let parsed = OpenCodeEvent::parse("message", payload);

        if let OpenCodeEvent::MessageStart {
            session_id,
            message_id,
        } = parsed
        {
            assert_eq!(session_id, "abc123");
            assert_eq!(message_id, "msg1");
        } else {
            panic!("Expected MessageStart, got {:?}", parsed);
        }
    }

    /// `text.delta` payloads map to `TextDelta` using the `text` key.
    #[test]
    fn test_parse_text_delta() {
        let payload = r#"{"type": "text.delta", "session_id": "abc", "text": "Hello world"}"#;
        let parsed = OpenCodeEvent::parse("text", payload);

        if let OpenCodeEvent::TextDelta { session_id, text } = parsed {
            assert_eq!(session_id, "abc");
            assert_eq!(text, "Hello world");
        } else {
            panic!("Expected TextDelta, got {:?}", parsed);
        }
    }

    /// Some servers use "delta" instead of "text" for the fragment.
    #[test]
    fn test_parse_text_delta_with_delta_field() {
        let payload = r#"{"type": "content.delta", "session_id": "abc", "delta": "content"}"#;
        let parsed = OpenCodeEvent::parse("content", payload);

        if let OpenCodeEvent::TextDelta { text, .. } = parsed {
            assert_eq!(text, "content");
        } else {
            panic!("Expected TextDelta");
        }
    }

    /// `tool.start` payloads expose the tool name and the raw JSON input.
    #[test]
    fn test_parse_tool_start() {
        let payload = r#"{"type": "tool.start", "session_id": "abc123", "tool": "read_file", "input": {"path": "src/main.rs"}}"#;
        let parsed = OpenCodeEvent::parse("tool", payload);

        if let OpenCodeEvent::ToolStart {
            session_id,
            tool_name,
            input,
            ..
        } = parsed
        {
            assert_eq!(session_id, "abc123");
            assert_eq!(tool_name, "read_file");
            assert_eq!(input["path"], "src/main.rs");
        } else {
            panic!("Expected ToolStart, got {:?}", parsed);
        }
    }

    /// `tool.result` payloads expose the tool name, output and success flag.
    #[test]
    fn test_parse_tool_result() {
        let payload = r#"{"type": "tool.result", "session_id": "abc", "tool": "bash", "output": "done", "success": true}"#;
        let parsed = OpenCodeEvent::parse("tool", payload);

        if let OpenCodeEvent::ToolResult {
            tool_name,
            output,
            success,
            ..
        } = parsed
        {
            assert_eq!(tool_name, "bash");
            assert_eq!(output, "done");
            assert!(success);
        } else {
            panic!("Expected ToolResult");
        }
    }

    /// `message.complete` payloads map to `MessageComplete`.
    #[test]
    fn test_parse_message_complete() {
        let payload = r#"{"type": "message.complete", "session_id": "xyz", "success": true}"#;
        let parsed = OpenCodeEvent::parse("message", payload);

        if let OpenCodeEvent::MessageComplete {
            session_id,
            success,
        } = parsed
        {
            assert_eq!(session_id, "xyz");
            assert!(success);
        } else {
            panic!("Expected MessageComplete");
        }
    }

    /// `session.error` payloads map to `SessionError` with the error text.
    #[test]
    fn test_parse_session_error() {
        let payload =
            r#"{"type": "session.error", "session_id": "err1", "error": "Connection failed"}"#;
        let parsed = OpenCodeEvent::parse("error", payload);

        if let OpenCodeEvent::SessionError { session_id, error } = parsed {
            assert_eq!(session_id, "err1");
            assert_eq!(error, "Connection failed");
        } else {
            panic!("Expected SessionError");
        }
    }

    /// Unrecognised JSON types are preserved in `Unknown`.
    #[test]
    fn test_parse_unknown_event() {
        let payload = r#"{"type": "custom.event", "foo": "bar"}"#;
        let parsed = OpenCodeEvent::parse("custom", payload);

        if let OpenCodeEvent::Unknown { event_type, .. } = parsed {
            assert_eq!(event_type, "custom.event");
        } else {
            panic!("Expected Unknown");
        }
    }

    /// Both connection event names are recognised when the data is not JSON.
    #[test]
    fn test_parse_connected() {
        for name in ["server.connected", "connected"].iter() {
            let parsed = OpenCodeEvent::parse(name, "");
            assert!(matches!(parsed, OpenCodeEvent::Connected));
        }
    }

    /// `session_id()` is `Some` for session events and `None` for `Connected`.
    #[test]
    fn test_session_id_extraction() {
        let with_session = OpenCodeEvent::TextDelta {
            session_id: "test123".to_string(),
            text: "hi".to_string(),
        };
        assert_eq!(with_session.session_id(), Some("test123"));

        let without_session = OpenCodeEvent::Connected;
        assert_eq!(without_session.session_id(), None);
    }

    /// Only completion and error events are terminal.
    #[test]
    fn test_is_terminal() {
        let finished = OpenCodeEvent::MessageComplete {
            session_id: "a".to_string(),
            success: true,
        };
        let failed = OpenCodeEvent::SessionError {
            session_id: "b".to_string(),
            error: "fail".to_string(),
        };
        let streaming = OpenCodeEvent::TextDelta {
            session_id: "c".to_string(),
            text: "hi".to_string(),
        };

        assert!(finished.is_terminal());
        assert!(failed.is_terminal());
        assert!(!streaming.is_terminal());
    }
}