1use anyhow::Result;
4use base64::{engine::general_purpose::STANDARD, Engine as _};
5use reqwest::Client;
6use std::time::Duration;
7use tokio::time::sleep;
8use tokio_util::sync::CancellationToken;
9
10use crate::approval::types::{PermissionRequest, QuestionRequest};
11
/// Events forwarded to the consumer of the SSE connection.
///
/// Most variants are produced by `parse_sse_block` from the `type` field of an
/// incoming JSON payload; `Connected` and `Disconnected` are additionally sent
/// by the connection code itself.
#[derive(Debug, Clone)]
pub enum SseEvent {
    /// A `permission.asked` event whose `properties` deserialized into a
    /// [`PermissionRequest`].
    PermissionAsked(PermissionRequest),
    /// A `permission.replied` event.
    PermissionReplied {
        /// Session identifier read from the event properties; empty when the
        /// parser found none.
        session_id: String,
        /// Request identifier read from the event properties; empty when the
        /// parser found none.
        request_id: String,
        /// Value of the `reply` property; empty when missing or not a string.
        reply: String,
    },
    /// A `question.asked` event whose `properties` deserialized into a
    /// [`QuestionRequest`].
    QuestionAsked(QuestionRequest),
    /// A `question.replied` event.
    QuestionReplied {
        /// Session identifier read from the event properties; empty when the
        /// parser found none.
        session_id: String,
        /// Request identifier read from the event properties; empty when the
        /// parser found none.
        request_id: String,
        /// Value of the `answers` property; empty when it is missing or is not
        /// a list of string lists.
        answers: Vec<Vec<String>>,
    },
    /// A `question.rejected` event.
    QuestionRejected {
        /// Session identifier read from the event properties; empty when the
        /// parser found none.
        session_id: String,
        /// Request identifier read from the event properties; empty when the
        /// parser found none.
        request_id: String,
    },
    /// A `session.status` event.
    SessionStatus {
        /// Session identifier read from the event properties; empty when the
        /// parser found none.
        session_id: String,
        /// `true` only when the event's `status.type` is the string `"busy"`.
        busy: bool,
    },
    /// Sent after the HTTP request for the stream succeeded, and also when the
    /// stream delivers a `server.connected` event.
    Connected,
    /// Sent when a connection attempt or an established stream failed; the
    /// payload carries the error text when one is available.
    Disconnected(Option<String>),
}
45
/// Configuration for the background task that listens to the server's SSE
/// event stream and forwards parsed events over a channel.
pub struct OpenCodeEvents {
    /// Server base URL; the stream is requested from `{base_url}/event`.
    base_url: String,
    /// Optional password; when set it is sent via HTTP Basic auth with an
    /// empty user name.
    password: Option<String>,
    /// Channel on which every [`SseEvent`] is delivered.
    sender: tokio::sync::mpsc::UnboundedSender<SseEvent>,
}
52
53impl OpenCodeEvents {
54 pub fn new(
55 base_url: String,
56 password: Option<String>,
57 sender: tokio::sync::mpsc::UnboundedSender<SseEvent>,
58 ) -> Self {
59 OpenCodeEvents {
60 base_url,
61 password,
62 sender,
63 }
64 }
65
66 pub fn start(&self, cancel: CancellationToken) -> tokio::task::JoinHandle<()> {
70 let base_url = self.base_url.clone();
71 let password = self.password.clone();
72 let sender = self.sender.clone();
73
74 tokio::spawn(async move {
75 let mut delay_secs: u64 = 1;
76
77 loop {
78 if cancel.is_cancelled() {
79 break;
80 }
81
82 match connect_and_stream(&base_url, &password, &sender, &cancel).await {
83 Ok(()) => {
84 break;
86 }
87 Err(e) => {
88 let _ = sender.send(SseEvent::Disconnected(Some(e.to_string())));
89
90 if cancel.is_cancelled() {
91 break;
92 }
93
94 tokio::select! {
98 _ = cancel.cancelled() => break,
99 _ = sleep(Duration::from_secs(delay_secs)) => {}
100 }
101 delay_secs = next_reconnect_delay(delay_secs);
102 }
103 }
104 }
105 })
106 }
107}
108
109async fn connect_and_stream(
111 base_url: &str,
112 password: &Option<String>,
113 sender: &tokio::sync::mpsc::UnboundedSender<SseEvent>,
114 cancel: &CancellationToken,
115) -> Result<()> {
116 let url = format!("{}/event", base_url);
117 let client = Client::new();
118
119 let mut req = client
120 .get(&url)
121 .header("Accept", "text/event-stream")
122 .header("Cache-Control", "no-cache");
123
124 if let Some(pw) = password {
125 let creds = format!(":{}", pw);
126 req = req.header("Authorization", format!("Basic {}", STANDARD.encode(creds)));
127 }
128
129 let response = req.send().await?;
130
131 if !response.status().is_success() {
132 anyhow::bail!("SSE connection failed with status {}", response.status());
133 }
134
135 let _ = sender.send(SseEvent::Connected);
137
138 use futures::StreamExt;
139 let mut stream = response.bytes_stream();
140
141 let mut buffer = String::new();
142
143 loop {
144 tokio::select! {
145 _ = cancel.cancelled() => {
146 return Ok(());
147 }
148 chunk = stream.next() => {
149 match chunk {
150 None => {
151 anyhow::bail!("SSE stream ended unexpectedly");
153 }
154 Some(Err(e)) => {
155 anyhow::bail!("SSE stream error: {}", e);
156 }
157 Some(Ok(bytes)) => {
158 let text = String::from_utf8_lossy(&bytes);
159 buffer.push_str(&text);
160
161 while let Some(pos) = buffer.find("\n\n") {
163 let block = buffer[..pos].to_string();
164 buffer = buffer[pos + 2..].to_string();
165 if let Some(event) = parse_sse_block(&block) {
166 let _ = sender.send(event);
167 }
168 }
169 }
170 }
171 }
172 }
173 }
174}
175
176pub fn parse_sse_block(block: &str) -> Option<SseEvent> {
181 let data = block
183 .lines()
184 .find(|line| line.starts_with("data:"))
185 .map(|line| line.trim_start_matches("data:").trim());
186
187 let data = match data {
188 Some(d) if !d.is_empty() => d,
189 _ => return None,
190 };
191
192 let json: serde_json::Value = match serde_json::from_str(data) {
194 Ok(v) => v,
195 Err(_) => return None, };
197
198 let event_type = json.get("type").and_then(|v| v.as_str())?;
199
200 let props = json
201 .get("properties")
202 .cloned()
203 .unwrap_or(serde_json::Value::Null);
204
205 match event_type {
206 "server.connected" => Some(SseEvent::Connected),
207 "server.heartbeat" => None,
208 "permission.asked" => {
209 serde_json::from_value::<PermissionRequest>(props)
210 .ok()
211 .map(SseEvent::PermissionAsked)
212 }
213 "permission.replied" => {
214 let session_id = props
215 .get("session_id")
216 .and_then(|v| v.as_str())
217 .unwrap_or("")
218 .to_string();
219 let request_id = props
220 .get("request_id")
221 .and_then(|v| v.as_str())
222 .unwrap_or("")
223 .to_string();
224 let reply = props
225 .get("reply")
226 .and_then(|v| v.as_str())
227 .unwrap_or("")
228 .to_string();
229 Some(SseEvent::PermissionReplied {
230 session_id,
231 request_id,
232 reply,
233 })
234 }
235 "question.asked" => {
236 serde_json::from_value::<QuestionRequest>(props)
237 .ok()
238 .map(SseEvent::QuestionAsked)
239 }
240 "question.replied" => {
241 let session_id = props
242 .get("session_id")
243 .and_then(|v| v.as_str())
244 .unwrap_or("")
245 .to_string();
246 let request_id = props
247 .get("request_id")
248 .and_then(|v| v.as_str())
249 .unwrap_or("")
250 .to_string();
251 let answers = props
252 .get("answers")
253 .and_then(|v| serde_json::from_value::<Vec<Vec<String>>>(v.clone()).ok())
254 .unwrap_or_default();
255 Some(SseEvent::QuestionReplied {
256 session_id,
257 request_id,
258 answers,
259 })
260 }
261 "question.rejected" => {
262 let session_id = props
263 .get("session_id")
264 .and_then(|v| v.as_str())
265 .unwrap_or("")
266 .to_string();
267 let request_id = props
268 .get("request_id")
269 .and_then(|v| v.as_str())
270 .unwrap_or("")
271 .to_string();
272 Some(SseEvent::QuestionRejected {
273 session_id,
274 request_id,
275 })
276 }
277 "session.updated" | "session.created" | "session.deleted" | "session.diff"
279 | "session.error" | "session.idle" | "session.compacted"
280 | "message.updated" | "message.removed" | "message.part.updated"
281 | "message.part.delta" | "message.part.removed"
282 | "file.edited" | "file.watcher.updated"
283 | "project.updated" | "vcs.branch.updated"
284 | "todo.updated" | "mcp.tools.changed" | "lsp.updated"
285 | "pty.created" | "pty.updated" | "pty.exited" | "pty.deleted"
286 | "permission.updated"
287 | "installation.updated" | "installation.update-available" => None,
288 "session.status" => {
289 let session_id = props
290 .get("sessionID")
291 .or_else(|| props.get("session_id"))
292 .and_then(|v| v.as_str())
293 .unwrap_or("")
294 .to_string();
295 let status_type = props
297 .get("status")
298 .and_then(|v| v.get("type"))
299 .and_then(|v| v.as_str())
300 .unwrap_or("");
301 let busy = status_type == "busy";
302 Some(SseEvent::SessionStatus { session_id, busy })
303 }
304 other => {
305 if is_debug() {
306 eprintln!("[voice:debug] Unknown SSE event type: {}", other);
307 }
308 None
309 }
310 }
311}
312
/// Reports whether debug logging is enabled.
///
/// True only when the `VOICE_DEBUG` environment variable is set to exactly
/// `1` or `true`; an unset or unreadable variable counts as disabled.
fn is_debug() -> bool {
    match std::env::var("VOICE_DEBUG") {
        Ok(value) => matches!(value.as_str(), "1" | "true"),
        Err(_) => false,
    }
}
317
/// Computes the delay, in seconds, to wait before the next reconnect attempt.
///
/// The delay doubles on every call and is capped at 30 seconds. The doubling
/// saturates instead of overflowing, so arbitrarily large inputs still yield
/// the cap, and the result is never below one second: a zero delay could not
/// grow by doubling and would make the caller reconnect without pausing.
pub fn next_reconnect_delay(current: u64) -> u64 {
    const MIN_DELAY_SECS: u64 = 1;
    const MAX_DELAY_SECS: u64 = 30;

    current
        .saturating_mul(2)
        .min(MAX_DELAY_SECS)
        .max(MIN_DELAY_SECS)
}
322
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a one-line SSE block for `event_type` with empty properties.
    fn empty_props_block(event_type: &str) -> String {
        format!(r#"data: {{"type":"{}","properties":{{}}}}"#, event_type)
    }

    /// Asserts that each of `event_types` parses to `None`.
    fn assert_explicitly_ignored(event_types: &[&str]) {
        for event_type in event_types {
            let block = empty_props_block(event_type);
            assert!(
                parse_sse_block(&block).is_none(),
                "{} should be explicitly ignored",
                event_type
            );
        }
    }

    /// Parses `block` and returns the fields of the resulting `SessionStatus`.
    fn session_status(block: &str) -> (String, bool) {
        match parse_sse_block(block) {
            Some(SseEvent::SessionStatus { session_id, busy }) => (session_id, busy),
            other => panic!("expected SessionStatus, got {:?}", other),
        }
    }

    #[test]
    fn test_parse_connected() {
        match parse_sse_block(r#"data: {"type":"server.connected","properties":{}}"#) {
            Some(SseEvent::Connected) => {}
            other => panic!("expected Connected, got {:?}", other),
        }
    }

    #[test]
    fn test_parse_heartbeat_ignored() {
        let parsed = parse_sse_block(r#"data: {"type":"server.heartbeat","properties":{}}"#);
        assert!(parsed.is_none());
    }

    #[test]
    fn test_parse_malformed_json() {
        let parsed = parse_sse_block("data: not-valid-json");
        assert!(parsed.is_none());
    }

    #[test]
    fn test_parse_empty_data() {
        let parsed = parse_sse_block("event: ping\n");
        assert!(parsed.is_none());
    }

    #[test]
    fn test_parse_unknown_type() {
        let parsed = parse_sse_block(r#"data: {"type":"unknown.event","properties":{}}"#);
        assert!(parsed.is_none());
    }

    #[test]
    fn test_parse_permission_asked() {
        let block = r#"data: {"type":"permission.asked","properties":{"id":"test-id","session_id":"sess","permission":"bash","patterns":[],"metadata":{},"always":[],"tool":null}}"#;
        match parse_sse_block(block) {
            Some(SseEvent::PermissionAsked(req)) => assert!(req.id == "test-id"),
            other => panic!("expected PermissionAsked, got {:?}", other),
        }
    }

    #[test]
    fn test_parse_question_asked() {
        let block = r#"data: {"type":"question.asked","properties":{"id":"q1","session_id":"s1","questions":[{"question":"What?","header":"H","options":[],"multiple":false,"custom":true}]}}"#;
        match parse_sse_block(block) {
            Some(SseEvent::QuestionAsked(req)) => assert!(req.id == "q1"),
            other => panic!("expected QuestionAsked, got {:?}", other),
        }
    }

    #[test]
    fn test_parse_permission_replied() {
        let block = r#"data: {"type":"permission.replied","properties":{"session_id":"s1","request_id":"r1","reply":"once"}}"#;
        match parse_sse_block(block) {
            Some(SseEvent::PermissionReplied {
                session_id,
                request_id,
                reply,
            }) => {
                assert_eq!(session_id, "s1");
                assert_eq!(request_id, "r1");
                assert_eq!(reply, "once");
            }
            other => panic!("expected PermissionReplied, got {:?}", other),
        }
    }

    #[test]
    fn test_parse_question_replied() {
        let block = r#"data: {"type":"question.replied","properties":{"session_id":"s1","request_id":"r1","answers":[["yes","no"]]}}"#;
        match parse_sse_block(block) {
            Some(SseEvent::QuestionReplied {
                session_id,
                request_id,
                answers,
            }) => {
                assert_eq!(session_id, "s1");
                assert_eq!(request_id, "r1");
                assert_eq!(answers, vec![vec!["yes".to_string(), "no".to_string()]]);
            }
            other => panic!("expected QuestionReplied, got {:?}", other),
        }
    }

    #[test]
    fn test_parse_question_rejected() {
        let block = r#"data: {"type":"question.rejected","properties":{"session_id":"s1","request_id":"r1"}}"#;
        match parse_sse_block(block) {
            Some(SseEvent::QuestionRejected {
                session_id,
                request_id,
            }) => {
                assert_eq!(session_id, "s1");
                assert_eq!(request_id, "r1");
            }
            other => panic!("expected QuestionRejected, got {:?}", other),
        }
    }

    #[test]
    fn test_backoff_calculation() {
        let mut observed: Vec<u64> = Vec::with_capacity(8);
        let mut delay: u64 = 1;
        for _ in 0..8 {
            observed.push(delay);
            delay = next_reconnect_delay(delay);
        }
        assert_eq!(observed, vec![1, 2, 4, 8, 16, 30, 30, 30]);
    }

    #[test]
    fn test_parse_session_status_busy() {
        let (session_id, busy) = session_status(
            r#"data: {"type":"session.status","properties":{"sessionID":"s1","status":{"type":"busy"}}}"#,
        );
        assert_eq!(session_id, "s1");
        assert!(busy);
    }

    #[test]
    fn test_parse_session_status_idle() {
        let (session_id, busy) = session_status(
            r#"data: {"type":"session.status","properties":{"sessionID":"s1","status":{"type":"idle"}}}"#,
        );
        assert_eq!(session_id, "s1");
        assert!(!busy);
    }

    #[test]
    fn test_parse_session_status_retry() {
        let (_, busy) = session_status(
            r#"data: {"type":"session.status","properties":{"sessionID":"s1","status":{"type":"retry"}}}"#,
        );
        assert!(!busy);
    }

    #[test]
    fn test_parse_session_status_missing_status_field() {
        let (_, busy) = session_status(
            r#"data: {"type":"session.status","properties":{"sessionID":"s1"}}"#,
        );
        assert!(!busy);
    }

    #[test]
    fn test_parse_session_status_snake_case_session_id() {
        let (session_id, busy) = session_status(
            r#"data: {"type":"session.status","properties":{"session_id":"s2","status":{"type":"busy"}}}"#,
        );
        assert_eq!(session_id, "s2");
        assert!(busy);
    }

    #[test]
    fn test_parse_ignored_session_events_return_none() {
        assert_explicitly_ignored(&[
            "session.updated",
            "session.created",
            "session.deleted",
            "session.diff",
            "session.error",
            "session.idle",
            "session.compacted",
        ]);
    }

    #[test]
    fn test_parse_ignored_message_events_return_none() {
        assert_explicitly_ignored(&[
            "message.updated",
            "message.removed",
            "message.part.updated",
            "message.part.delta",
            "message.part.removed",
        ]);
    }

    #[test]
    fn test_parse_ignored_misc_events_return_none() {
        assert_explicitly_ignored(&[
            "file.edited",
            "file.watcher.updated",
            "project.updated",
            "vcs.branch.updated",
            "todo.updated",
            "mcp.tools.changed",
            "lsp.updated",
            "pty.created",
            "pty.updated",
            "pty.exited",
            "pty.deleted",
            "permission.updated",
            "installation.updated",
            "installation.update-available",
        ]);
    }

    #[test]
    fn test_parse_truly_unknown_event_returns_none() {
        let parsed = parse_sse_block(r#"data: {"type":"some.future.event","properties":{}}"#);
        assert!(parsed.is_none());
    }

    #[test]
    fn test_parse_no_type_field() {
        let parsed = parse_sse_block(r#"data: {"properties":{}}"#);
        assert!(parsed.is_none());
    }

    #[test]
    fn test_parse_missing_properties() {
        match parse_sse_block(r#"data: {"type":"server.connected"}"#) {
            Some(SseEvent::Connected) => {}
            other => panic!("expected Connected, got {:?}", other),
        }
    }
}