// opencode_voice/bridge/events.rs

1//! SSE event stream client for OpenCode's /event endpoint.
2
3use anyhow::Result;
4use base64::{engine::general_purpose::STANDARD, Engine as _};
5use reqwest::Client;
6use std::time::Duration;
7use tokio::time::sleep;
8use tokio_util::sync::CancellationToken;
9
10use crate::approval::types::{PermissionRequest, QuestionRequest};
11
12/// Events received from the OpenCode SSE stream.
13#[derive(Debug, Clone)]
14pub enum SseEvent {
15    PermissionAsked(PermissionRequest),
16    PermissionReplied {
17        session_id: String,
18        request_id: String,
19        reply: String,
20    },
21    QuestionAsked(QuestionRequest),
22    QuestionReplied {
23        session_id: String,
24        request_id: String,
25        answers: Vec<Vec<String>>,
26    },
27    QuestionRejected {
28        session_id: String,
29        request_id: String,
30    },
31    /// Session status changed (busy/idle/retry).
32    ///
33    /// When the session becomes busy it means the AI has resumed work — any
34    /// pending permissions or questions must have been answered (possibly by
35    /// the user interacting directly with the OpenCode TUI).
36    SessionStatus {
37        session_id: String,
38        /// `true` when the session is actively processing ("busy"),
39        /// `false` when idle.
40        busy: bool,
41    },
42    Connected,
43    Disconnected(Option<String>),
44}
45
46/// SSE stream client with automatic reconnection.
47pub struct OpenCodeEvents {
48    base_url: String,
49    password: Option<String>,
50    sender: tokio::sync::mpsc::UnboundedSender<SseEvent>,
51}
52
53impl OpenCodeEvents {
54    pub fn new(
55        base_url: String,
56        password: Option<String>,
57        sender: tokio::sync::mpsc::UnboundedSender<SseEvent>,
58    ) -> Self {
59        OpenCodeEvents {
60            base_url,
61            password,
62            sender,
63        }
64    }
65
66    /// Spawns the reconnecting SSE listener as a background tokio task.
67    ///
68    /// The task runs until the CancellationToken is cancelled.
69    pub fn start(&self, cancel: CancellationToken) -> tokio::task::JoinHandle<()> {
70        let base_url = self.base_url.clone();
71        let password = self.password.clone();
72        let sender = self.sender.clone();
73
74        tokio::spawn(async move {
75            let mut delay_secs: u64 = 1;
76
77            loop {
78                if cancel.is_cancelled() {
79                    break;
80                }
81
82                match connect_and_stream(&base_url, &password, &sender, &cancel).await {
83                    Ok(()) => {
84                        // Clean disconnect (cancel token fired)
85                        break;
86                    }
87                    Err(e) => {
88                        let _ = sender.send(SseEvent::Disconnected(Some(e.to_string())));
89
90                        if cancel.is_cancelled() {
91                            break;
92                        }
93
94                        // Exponential backoff: 1s, 2s, 4s, ..., 30s max.
95                        // Use select! so cancellation wakes us immediately
96                        // instead of waiting for the full sleep to elapse.
97                        tokio::select! {
98                            _ = cancel.cancelled() => break,
99                            _ = sleep(Duration::from_secs(delay_secs)) => {}
100                        }
101                        delay_secs = next_reconnect_delay(delay_secs);
102                    }
103                }
104            }
105        })
106    }
107}
108
109/// Connects to the /event endpoint and streams events until error or cancellation.
110async fn connect_and_stream(
111    base_url: &str,
112    password: &Option<String>,
113    sender: &tokio::sync::mpsc::UnboundedSender<SseEvent>,
114    cancel: &CancellationToken,
115) -> Result<()> {
116    let url = format!("{}/event", base_url);
117    let client = Client::new();
118
119    let mut req = client
120        .get(&url)
121        .header("Accept", "text/event-stream")
122        .header("Cache-Control", "no-cache");
123
124    if let Some(pw) = password {
125        let creds = format!(":{}", pw);
126        req = req.header("Authorization", format!("Basic {}", STANDARD.encode(creds)));
127    }
128
129    let response = req.send().await?;
130
131    if !response.status().is_success() {
132        anyhow::bail!("SSE connection failed with status {}", response.status());
133    }
134
135    // Signal successful connection and reset backoff on the caller side
136    let _ = sender.send(SseEvent::Connected);
137
138    use futures::StreamExt;
139    let mut stream = response.bytes_stream();
140
141    let mut buffer = String::new();
142
143    loop {
144        tokio::select! {
145            _ = cancel.cancelled() => {
146                return Ok(());
147            }
148            chunk = stream.next() => {
149                match chunk {
150                    None => {
151                        // Stream ended
152                        anyhow::bail!("SSE stream ended unexpectedly");
153                    }
154                    Some(Err(e)) => {
155                        anyhow::bail!("SSE stream error: {}", e);
156                    }
157                    Some(Ok(bytes)) => {
158                        let text = String::from_utf8_lossy(&bytes);
159                        buffer.push_str(&text);
160
161                        // Process complete SSE blocks (terminated by \n\n)
162                        while let Some(pos) = buffer.find("\n\n") {
163                            let block = buffer[..pos].to_string();
164                            buffer = buffer[pos + 2..].to_string();
165                            if let Some(event) = parse_sse_block(&block) {
166                                let _ = sender.send(event);
167                            }
168                        }
169                    }
170                }
171            }
172        }
173    }
174}
175
176/// Extracts the data field from an SSE block and returns the parsed event.
177///
178/// Returns `None` for heartbeats, malformed JSON, missing data lines,
179/// unknown event types, and other non-actionable blocks.
180pub fn parse_sse_block(block: &str) -> Option<SseEvent> {
181    // Find "data:" line
182    let data = block
183        .lines()
184        .find(|line| line.starts_with("data:"))
185        .map(|line| line.trim_start_matches("data:").trim());
186
187    let data = match data {
188        Some(d) if !d.is_empty() => d,
189        _ => return None,
190    };
191
192    // Parse JSON
193    let json: serde_json::Value = match serde_json::from_str(data) {
194        Ok(v) => v,
195        Err(_) => return None, // Skip malformed JSON
196    };
197
198    let event_type = json.get("type").and_then(|v| v.as_str())?;
199
200    let props = json
201        .get("properties")
202        .cloned()
203        .unwrap_or(serde_json::Value::Null);
204
205    match event_type {
206        "server.connected" => Some(SseEvent::Connected),
207        "server.heartbeat" => None,
208        "permission.asked" => {
209            serde_json::from_value::<PermissionRequest>(props)
210                .ok()
211                .map(SseEvent::PermissionAsked)
212        }
213        "permission.replied" => {
214            let session_id = props
215                .get("session_id")
216                .and_then(|v| v.as_str())
217                .unwrap_or("")
218                .to_string();
219            let request_id = props
220                .get("request_id")
221                .and_then(|v| v.as_str())
222                .unwrap_or("")
223                .to_string();
224            let reply = props
225                .get("reply")
226                .and_then(|v| v.as_str())
227                .unwrap_or("")
228                .to_string();
229            Some(SseEvent::PermissionReplied {
230                session_id,
231                request_id,
232                reply,
233            })
234        }
235        "question.asked" => {
236            serde_json::from_value::<QuestionRequest>(props)
237                .ok()
238                .map(SseEvent::QuestionAsked)
239        }
240        "question.replied" => {
241            let session_id = props
242                .get("session_id")
243                .and_then(|v| v.as_str())
244                .unwrap_or("")
245                .to_string();
246            let request_id = props
247                .get("request_id")
248                .and_then(|v| v.as_str())
249                .unwrap_or("")
250                .to_string();
251            let answers = props
252                .get("answers")
253                .and_then(|v| serde_json::from_value::<Vec<Vec<String>>>(v.clone()).ok())
254                .unwrap_or_default();
255            Some(SseEvent::QuestionReplied {
256                session_id,
257                request_id,
258                answers,
259            })
260        }
261        "question.rejected" => {
262            let session_id = props
263                .get("session_id")
264                .and_then(|v| v.as_str())
265                .unwrap_or("")
266                .to_string();
267            let request_id = props
268                .get("request_id")
269                .and_then(|v| v.as_str())
270                .unwrap_or("")
271                .to_string();
272            Some(SseEvent::QuestionRejected {
273                session_id,
274                request_id,
275            })
276        }
277        // Events we intentionally ignore (high-frequency or informational).
278        "session.updated" | "session.created" | "session.deleted" | "session.diff"
279        | "session.error" | "session.idle" | "session.compacted"
280        | "message.updated" | "message.removed" | "message.part.updated"
281        | "message.part.delta" | "message.part.removed"
282        | "file.edited" | "file.watcher.updated"
283        | "project.updated" | "vcs.branch.updated"
284        | "todo.updated" | "mcp.tools.changed" | "lsp.updated"
285        | "pty.created" | "pty.updated" | "pty.exited" | "pty.deleted"
286        | "permission.updated"
287        | "installation.updated" | "installation.update-available" => None,
288        "session.status" => {
289            let session_id = props
290                .get("sessionID")
291                .or_else(|| props.get("session_id"))
292                .and_then(|v| v.as_str())
293                .unwrap_or("")
294                .to_string();
295            // The status field is an object like { "type": "busy" } or { "type": "idle" }.
296            let status_type = props
297                .get("status")
298                .and_then(|v| v.get("type"))
299                .and_then(|v| v.as_str())
300                .unwrap_or("");
301            let busy = status_type == "busy";
302            Some(SseEvent::SessionStatus { session_id, busy })
303        }
304        other => {
305            if is_debug() {
306                eprintln!("[voice:debug] Unknown SSE event type: {}", other);
307            }
308            None
309        }
310    }
311}
312
/// Reports whether verbose debug logging is switched on.
///
/// Logging is enabled when the `VOICE_DEBUG` environment variable is set to
/// exactly `1` or `true`; an unset, unreadable or differently valued variable
/// leaves it off.
fn is_debug() -> bool {
    let flag = std::env::var("VOICE_DEBUG");
    matches!(flag.as_deref(), Ok("1") | Ok("true"))
}
317
/// Computes the next reconnect delay (in seconds) using exponential backoff.
///
/// The delay doubles on every call and is capped at 30 seconds. The doubling
/// saturates instead of overflowing, so arbitrarily large inputs still yield
/// the cap. The result is never below 1 second: a `current` of 0 yields 1, so
/// the backoff cannot get stuck at a zero delay.
pub fn next_reconnect_delay(current: u64) -> u64 {
    const MIN_DELAY_SECS: u64 = 1;
    const MAX_DELAY_SECS: u64 = 30;

    current
        .saturating_mul(2)
        .clamp(MIN_DELAY_SECS, MAX_DELAY_SECS)
}
322
#[cfg(test)]
mod tests {
    use super::*;

    // ── helpers ───────────────────────────────────────────────────────

    /// Builds a single-line SSE block for `event_type` with empty properties.
    fn block_with_empty_props(event_type: &str) -> String {
        format!(r#"data: {{"type":"{}","properties":{{}}}}"#, event_type)
    }

    /// Asserts that every listed event type is dropped by the parser.
    fn assert_all_ignored(event_types: &[&str]) {
        for event_type in event_types {
            assert!(
                parse_sse_block(&block_with_empty_props(event_type)).is_none(),
                "{} should be explicitly ignored",
                event_type
            );
        }
    }

    /// Parses `block` and returns the `(session_id, busy)` pair of the
    /// resulting `SessionStatus`, panicking on any other outcome.
    fn expect_session_status(block: &str) -> (String, bool) {
        match parse_sse_block(block) {
            Some(SseEvent::SessionStatus { session_id, busy }) => (session_id, busy),
            other => panic!("expected SessionStatus, got {:?}", other),
        }
    }

    // ── basic parsing ─────────────────────────────────────────────────

    #[test]
    fn test_parse_connected() {
        let parsed = parse_sse_block(&block_with_empty_props("server.connected"));
        assert!(matches!(parsed, Some(SseEvent::Connected)));
    }

    #[test]
    fn test_parse_heartbeat_ignored() {
        let parsed = parse_sse_block(&block_with_empty_props("server.heartbeat"));
        assert!(parsed.is_none());
    }

    #[test]
    fn test_parse_malformed_json() {
        let parsed = parse_sse_block("data: not-valid-json");
        assert!(parsed.is_none());
    }

    #[test]
    fn test_parse_empty_data() {
        let parsed = parse_sse_block("event: ping\n");
        assert!(parsed.is_none());
    }

    #[test]
    fn test_parse_unknown_type() {
        let parsed = parse_sse_block(&block_with_empty_props("unknown.event"));
        assert!(parsed.is_none());
    }

    // ── permission / question events ──────────────────────────────────

    #[test]
    fn test_parse_permission_asked() {
        let block = r#"data: {"type":"permission.asked","properties":{"id":"test-id","session_id":"sess","permission":"bash","patterns":[],"metadata":{},"always":[],"tool":null}}"#;
        match parse_sse_block(block) {
            Some(SseEvent::PermissionAsked(req)) => assert!(req.id == "test-id"),
            other => panic!("expected PermissionAsked, got {:?}", other),
        }
    }

    #[test]
    fn test_parse_question_asked() {
        let block = r#"data: {"type":"question.asked","properties":{"id":"q1","session_id":"s1","questions":[{"question":"What?","header":"H","options":[],"multiple":false,"custom":true}]}}"#;
        match parse_sse_block(block) {
            Some(SseEvent::QuestionAsked(req)) => assert!(req.id == "q1"),
            other => panic!("expected QuestionAsked, got {:?}", other),
        }
    }

    #[test]
    fn test_parse_permission_replied() {
        let block = r#"data: {"type":"permission.replied","properties":{"session_id":"s1","request_id":"r1","reply":"once"}}"#;
        match parse_sse_block(block) {
            Some(SseEvent::PermissionReplied {
                session_id,
                request_id,
                reply,
            }) => {
                assert_eq!(session_id, "s1");
                assert_eq!(request_id, "r1");
                assert_eq!(reply, "once");
            }
            other => panic!("expected PermissionReplied, got {:?}", other),
        }
    }

    #[test]
    fn test_parse_question_replied() {
        let block = r#"data: {"type":"question.replied","properties":{"session_id":"s1","request_id":"r1","answers":[["yes","no"]]}}"#;
        match parse_sse_block(block) {
            Some(SseEvent::QuestionReplied {
                session_id,
                request_id,
                answers,
            }) => {
                assert_eq!(session_id, "s1");
                assert_eq!(request_id, "r1");
                assert_eq!(answers, vec![vec!["yes".to_string(), "no".to_string()]]);
            }
            other => panic!("expected QuestionReplied, got {:?}", other),
        }
    }

    #[test]
    fn test_parse_question_rejected() {
        let block = r#"data: {"type":"question.rejected","properties":{"session_id":"s1","request_id":"r1"}}"#;
        match parse_sse_block(block) {
            Some(SseEvent::QuestionRejected {
                session_id,
                request_id,
            }) => {
                assert_eq!(session_id, "s1");
                assert_eq!(request_id, "r1");
            }
            other => panic!("expected QuestionRejected, got {:?}", other),
        }
    }

    // ── backoff ───────────────────────────────────────────────────────

    #[test]
    fn test_backoff_calculation() {
        // Start at 1s and feed each delay back into the backoff function.
        let sequence: Vec<u64> =
            std::iter::successors(Some(1u64), |&delay| Some(next_reconnect_delay(delay)))
                .take(8)
                .collect();
        assert_eq!(sequence, vec![1, 2, 4, 8, 16, 30, 30, 30]);
    }

    // ── session.status ────────────────────────────────────────────────

    #[test]
    fn test_parse_session_status_busy() {
        let block = r#"data: {"type":"session.status","properties":{"sessionID":"s1","status":{"type":"busy"}}}"#;
        let (session_id, busy) = expect_session_status(block);
        assert_eq!(session_id, "s1");
        assert!(busy);
    }

    #[test]
    fn test_parse_session_status_idle() {
        let block = r#"data: {"type":"session.status","properties":{"sessionID":"s1","status":{"type":"idle"}}}"#;
        let (session_id, busy) = expect_session_status(block);
        assert_eq!(session_id, "s1");
        assert!(!busy);
    }

    #[test]
    fn test_parse_session_status_retry() {
        // "retry" is not "busy", so it must be reported as not busy.
        let block = r#"data: {"type":"session.status","properties":{"sessionID":"s1","status":{"type":"retry"}}}"#;
        let (_, busy) = expect_session_status(block);
        assert!(!busy);
    }

    #[test]
    fn test_parse_session_status_missing_status_field() {
        // Without a status object the event defaults to not busy.
        let block = r#"data: {"type":"session.status","properties":{"sessionID":"s1"}}"#;
        let (_, busy) = expect_session_status(block);
        assert!(!busy);
    }

    #[test]
    fn test_parse_session_status_snake_case_session_id() {
        // "session_id" must work as well as "sessionID".
        let block = r#"data: {"type":"session.status","properties":{"session_id":"s2","status":{"type":"busy"}}}"#;
        let (session_id, busy) = expect_session_status(block);
        assert_eq!(session_id, "s2");
        assert!(busy);
    }

    // ── explicitly ignored events ─────────────────────────────────────

    #[test]
    fn test_parse_ignored_session_events_return_none() {
        assert_all_ignored(&[
            "session.updated",
            "session.created",
            "session.deleted",
            "session.diff",
            "session.error",
            "session.idle",
            "session.compacted",
        ]);
    }

    #[test]
    fn test_parse_ignored_message_events_return_none() {
        assert_all_ignored(&[
            "message.updated",
            "message.removed",
            "message.part.updated",
            "message.part.delta",
            "message.part.removed",
        ]);
    }

    #[test]
    fn test_parse_ignored_misc_events_return_none() {
        assert_all_ignored(&[
            "file.edited",
            "file.watcher.updated",
            "project.updated",
            "vcs.branch.updated",
            "todo.updated",
            "mcp.tools.changed",
            "lsp.updated",
            "pty.created",
            "pty.updated",
            "pty.exited",
            "pty.deleted",
            "permission.updated",
            "installation.updated",
            "installation.update-available",
        ]);
    }

    // ── truly unknown events ──────────────────────────────────────────

    #[test]
    fn test_parse_truly_unknown_event_returns_none() {
        // Neither handled nor on the ignore list: still yields None.
        let parsed = parse_sse_block(&block_with_empty_props("some.future.event"));
        assert!(parsed.is_none());
    }

    #[test]
    fn test_parse_no_type_field() {
        let parsed = parse_sse_block(r#"data: {"properties":{}}"#);
        assert!(parsed.is_none());
    }

    #[test]
    fn test_parse_missing_properties() {
        let parsed = parse_sse_block(r#"data: {"type":"server.connected"}"#);
        assert!(matches!(parsed, Some(SseEvent::Connected)));
    }
}