Skip to main content

scud/opencode/
events.rs

1//! SSE event streaming for OpenCode Server
2//!
3//! Provides real-time event streaming via Server-Sent Events (SSE) for
4//! monitoring agent execution, tool calls, and completion status.
5
6use anyhow::Result;
7use eventsource_client::{self as es, Client};
8use futures::stream::StreamExt;
9use serde::Deserialize;
10use tokio::sync::mpsc;
11
12/// Events received from OpenCode Server SSE stream
13#[derive(Debug, Clone)]
14pub enum OpenCodeEvent {
15    /// Server connected
16    Connected,
17
18    /// Message started
19    MessageStart {
20        session_id: String,
21        message_id: String,
22    },
23
24    /// Text delta (streaming output)
25    TextDelta { session_id: String, text: String },
26
27    /// Tool execution started
28    ToolStart {
29        session_id: String,
30        tool_id: String,
31        tool_name: String,
32        input: serde_json::Value,
33    },
34
35    /// Tool execution completed
36    ToolResult {
37        session_id: String,
38        tool_id: String,
39        tool_name: String,
40        output: String,
41        success: bool,
42    },
43
44    /// Message completed
45    MessageComplete { session_id: String, success: bool },
46
47    /// Session error
48    SessionError { session_id: String, error: String },
49
50    /// Unknown event type
51    Unknown { event_type: String, data: String },
52}
53
54/// Raw SSE event from server
55#[derive(Debug, Deserialize)]
56struct RawEvent {
57    #[serde(rename = "type")]
58    event_type: String,
59    #[serde(default)]
60    session_id: Option<String>,
61    #[serde(flatten)]
62    data: serde_json::Value,
63}
64
65impl OpenCodeEvent {
66    /// Parse from SSE event data
67    pub fn parse(event_type: &str, data: &str) -> Self {
68        // Try to parse as JSON
69        let parsed: Result<RawEvent, _> = serde_json::from_str(data);
70
71        match parsed {
72            Ok(raw) => Self::from_raw(&raw),
73            Err(_) => {
74                // Fallback for non-JSON events
75                match event_type {
76                    "server.connected" | "connected" => OpenCodeEvent::Connected,
77                    _ => OpenCodeEvent::Unknown {
78                        event_type: event_type.to_string(),
79                        data: data.to_string(),
80                    },
81                }
82            }
83        }
84    }
85
86    fn from_raw(raw: &RawEvent) -> Self {
87        let session_id = raw.session_id.clone().unwrap_or_default();
88
89        match raw.event_type.as_str() {
90            "message.start" => OpenCodeEvent::MessageStart {
91                session_id,
92                message_id: raw
93                    .data
94                    .get("message_id")
95                    .and_then(|v| v.as_str())
96                    .unwrap_or("")
97                    .to_string(),
98            },
99
100            "text.delta" | "content.delta" => OpenCodeEvent::TextDelta {
101                session_id,
102                text: raw
103                    .data
104                    .get("text")
105                    .or_else(|| raw.data.get("delta"))
106                    .and_then(|v| v.as_str())
107                    .unwrap_or("")
108                    .to_string(),
109            },
110
111            "tool.start" | "tool_use.start" => OpenCodeEvent::ToolStart {
112                session_id,
113                tool_id: raw
114                    .data
115                    .get("tool_id")
116                    .or_else(|| raw.data.get("id"))
117                    .and_then(|v| v.as_str())
118                    .unwrap_or("")
119                    .to_string(),
120                tool_name: raw
121                    .data
122                    .get("tool")
123                    .or_else(|| raw.data.get("name"))
124                    .and_then(|v| v.as_str())
125                    .unwrap_or("")
126                    .to_string(),
127                input: raw
128                    .data
129                    .get("input")
130                    .cloned()
131                    .unwrap_or(serde_json::Value::Null),
132            },
133
134            "tool.result" | "tool_use.result" => OpenCodeEvent::ToolResult {
135                session_id,
136                tool_id: raw
137                    .data
138                    .get("tool_id")
139                    .or_else(|| raw.data.get("id"))
140                    .and_then(|v| v.as_str())
141                    .unwrap_or("")
142                    .to_string(),
143                tool_name: raw
144                    .data
145                    .get("tool")
146                    .or_else(|| raw.data.get("name"))
147                    .and_then(|v| v.as_str())
148                    .unwrap_or("")
149                    .to_string(),
150                output: raw
151                    .data
152                    .get("output")
153                    .and_then(|v| v.as_str())
154                    .unwrap_or("")
155                    .to_string(),
156                success: raw
157                    .data
158                    .get("success")
159                    .and_then(|v| v.as_bool())
160                    .unwrap_or(true),
161            },
162
163            "message.complete" | "message.done" => OpenCodeEvent::MessageComplete {
164                session_id,
165                success: raw
166                    .data
167                    .get("success")
168                    .and_then(|v| v.as_bool())
169                    .unwrap_or(true),
170            },
171
172            "session.error" | "error" => OpenCodeEvent::SessionError {
173                session_id,
174                error: raw
175                    .data
176                    .get("error")
177                    .and_then(|v| v.as_str())
178                    .unwrap_or("Unknown error")
179                    .to_string(),
180            },
181
182            _ => OpenCodeEvent::Unknown {
183                event_type: raw.event_type.clone(),
184                data: serde_json::to_string(&raw.data).unwrap_or_default(),
185            },
186        }
187    }
188
189    /// Get session ID if event is session-specific
190    pub fn session_id(&self) -> Option<&str> {
191        match self {
192            OpenCodeEvent::MessageStart { session_id, .. } => Some(session_id),
193            OpenCodeEvent::TextDelta { session_id, .. } => Some(session_id),
194            OpenCodeEvent::ToolStart { session_id, .. } => Some(session_id),
195            OpenCodeEvent::ToolResult { session_id, .. } => Some(session_id),
196            OpenCodeEvent::MessageComplete { session_id, .. } => Some(session_id),
197            OpenCodeEvent::SessionError { session_id, .. } => Some(session_id),
198            _ => None,
199        }
200    }
201
202    /// Check if this event indicates a session has ended
203    pub fn is_terminal(&self) -> bool {
204        matches!(
205            self,
206            OpenCodeEvent::MessageComplete { .. } | OpenCodeEvent::SessionError { .. }
207        )
208    }
209}
210
211/// Event stream subscription
212pub struct EventStream {
213    rx: mpsc::Receiver<OpenCodeEvent>,
214    _handle: tokio::task::JoinHandle<()>,
215}
216
217impl EventStream {
218    /// Create a new event stream connected to the given URL
219    pub async fn connect(url: &str) -> Result<Self> {
220        let (tx, rx) = mpsc::channel(1000);
221        let url = url.to_string();
222
223        let handle = tokio::spawn(async move {
224            let client = match es::ClientBuilder::for_url(&url) {
225                Ok(builder) => builder.build(),
226                Err(e) => {
227                    eprintln!("Failed to create SSE client: {}", e);
228                    return;
229                }
230            };
231
232            let mut stream = Box::pin(client.stream());
233
234            while let Some(event) = stream.next().await {
235                match event {
236                    Ok(es::SSE::Event(ev)) => {
237                        let parsed = OpenCodeEvent::parse(&ev.event_type, &ev.data);
238                        if tx.send(parsed).await.is_err() {
239                            break; // Receiver dropped
240                        }
241                    }
242                    Ok(es::SSE::Comment(_)) => continue,
243                    Ok(es::SSE::Connected(_)) => {
244                        let _ = tx.send(OpenCodeEvent::Connected).await;
245                    }
246                    Err(e) => {
247                        eprintln!("SSE error: {}", e);
248                        break;
249                    }
250                }
251            }
252        });
253
254        Ok(Self { rx, _handle: handle })
255    }
256
257    /// Receive next event
258    pub async fn recv(&mut self) -> Option<OpenCodeEvent> {
259        self.rx.recv().await
260    }
261
262    /// Try to receive without blocking
263    pub fn try_recv(&mut self) -> Option<OpenCodeEvent> {
264        self.rx.try_recv().ok()
265    }
266}
267
268#[cfg(test)]
269mod tests {
270    use super::*;
271
272    #[test]
273    fn test_parse_message_start() {
274        let data =
275            r#"{"type": "message.start", "session_id": "abc123", "message_id": "msg1"}"#;
276        let event = OpenCodeEvent::parse("message", data);
277
278        match event {
279            OpenCodeEvent::MessageStart {
280                session_id,
281                message_id,
282            } => {
283                assert_eq!(session_id, "abc123");
284                assert_eq!(message_id, "msg1");
285            }
286            _ => panic!("Expected MessageStart, got {:?}", event),
287        }
288    }
289
290    #[test]
291    fn test_parse_text_delta() {
292        let data = r#"{"type": "text.delta", "session_id": "abc", "text": "Hello world"}"#;
293        let event = OpenCodeEvent::parse("text", data);
294
295        match event {
296            OpenCodeEvent::TextDelta { session_id, text } => {
297                assert_eq!(session_id, "abc");
298                assert_eq!(text, "Hello world");
299            }
300            _ => panic!("Expected TextDelta, got {:?}", event),
301        }
302    }
303
304    #[test]
305    fn test_parse_text_delta_with_delta_field() {
306        // Some servers use "delta" instead of "text"
307        let data = r#"{"type": "content.delta", "session_id": "abc", "delta": "content"}"#;
308        let event = OpenCodeEvent::parse("content", data);
309
310        match event {
311            OpenCodeEvent::TextDelta { text, .. } => {
312                assert_eq!(text, "content");
313            }
314            _ => panic!("Expected TextDelta"),
315        }
316    }
317
318    #[test]
319    fn test_parse_tool_start() {
320        let data = r#"{"type": "tool.start", "session_id": "abc123", "tool": "read_file", "input": {"path": "src/main.rs"}}"#;
321        let event = OpenCodeEvent::parse("tool", data);
322
323        match event {
324            OpenCodeEvent::ToolStart {
325                session_id,
326                tool_name,
327                input,
328                ..
329            } => {
330                assert_eq!(session_id, "abc123");
331                assert_eq!(tool_name, "read_file");
332                assert_eq!(input["path"], "src/main.rs");
333            }
334            _ => panic!("Expected ToolStart, got {:?}", event),
335        }
336    }
337
338    #[test]
339    fn test_parse_tool_result() {
340        let data = r#"{"type": "tool.result", "session_id": "abc", "tool": "bash", "output": "done", "success": true}"#;
341        let event = OpenCodeEvent::parse("tool", data);
342
343        match event {
344            OpenCodeEvent::ToolResult {
345                tool_name,
346                output,
347                success,
348                ..
349            } => {
350                assert_eq!(tool_name, "bash");
351                assert_eq!(output, "done");
352                assert!(success);
353            }
354            _ => panic!("Expected ToolResult"),
355        }
356    }
357
358    #[test]
359    fn test_parse_message_complete() {
360        let data = r#"{"type": "message.complete", "session_id": "xyz", "success": true}"#;
361        let event = OpenCodeEvent::parse("message", data);
362
363        match event {
364            OpenCodeEvent::MessageComplete { session_id, success } => {
365                assert_eq!(session_id, "xyz");
366                assert!(success);
367            }
368            _ => panic!("Expected MessageComplete"),
369        }
370    }
371
372    #[test]
373    fn test_parse_session_error() {
374        let data = r#"{"type": "session.error", "session_id": "err1", "error": "Connection failed"}"#;
375        let event = OpenCodeEvent::parse("error", data);
376
377        match event {
378            OpenCodeEvent::SessionError { session_id, error } => {
379                assert_eq!(session_id, "err1");
380                assert_eq!(error, "Connection failed");
381            }
382            _ => panic!("Expected SessionError"),
383        }
384    }
385
386    #[test]
387    fn test_parse_unknown_event() {
388        let data = r#"{"type": "custom.event", "foo": "bar"}"#;
389        let event = OpenCodeEvent::parse("custom", data);
390
391        match event {
392            OpenCodeEvent::Unknown { event_type, .. } => {
393                assert_eq!(event_type, "custom.event");
394            }
395            _ => panic!("Expected Unknown"),
396        }
397    }
398
399    #[test]
400    fn test_parse_connected() {
401        let event = OpenCodeEvent::parse("server.connected", "");
402        assert!(matches!(event, OpenCodeEvent::Connected));
403
404        let event = OpenCodeEvent::parse("connected", "");
405        assert!(matches!(event, OpenCodeEvent::Connected));
406    }
407
408    #[test]
409    fn test_session_id_extraction() {
410        let event = OpenCodeEvent::TextDelta {
411            session_id: "test123".to_string(),
412            text: "hi".to_string(),
413        };
414        assert_eq!(event.session_id(), Some("test123"));
415
416        let event = OpenCodeEvent::Connected;
417        assert_eq!(event.session_id(), None);
418    }
419
420    #[test]
421    fn test_is_terminal() {
422        let complete = OpenCodeEvent::MessageComplete {
423            session_id: "a".to_string(),
424            success: true,
425        };
426        assert!(complete.is_terminal());
427
428        let error = OpenCodeEvent::SessionError {
429            session_id: "b".to_string(),
430            error: "fail".to_string(),
431        };
432        assert!(error.is_terminal());
433
434        let delta = OpenCodeEvent::TextDelta {
435            session_id: "c".to_string(),
436            text: "hi".to_string(),
437        };
438        assert!(!delta.is_terminal());
439    }
440}