Skip to main content

codex_helper_core/
notify.rs

1use std::collections::HashMap;
2use std::io::Read;
3use std::path::{Path, PathBuf};
4use std::process::{Command, Stdio};
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6
7use serde::{Deserialize, Serialize};
8use tokio::time::sleep;
9
10use crate::config::{NotifyConfig, NotifyPolicyConfig, load_config, proxy_home_dir};
11
12#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
13#[serde(rename_all = "kebab-case")]
14enum CodexNotificationType {
15    AgentTurnComplete,
16    #[serde(other)]
17    Unknown,
18}
19
20#[derive(Debug, Clone, Deserialize, PartialEq)]
21#[serde(rename_all = "kebab-case")]
22struct CodexNotificationInput {
23    r#type: CodexNotificationType,
24    #[serde(default)]
25    thread_id: Option<String>,
26    #[serde(default)]
27    turn_id: Option<String>,
28    #[serde(default)]
29    cwd: Option<String>,
30    #[serde(default)]
31    input_messages: Option<Vec<String>>,
32    #[serde(default)]
33    last_assistant_message: Option<String>,
34}
35
36#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
37struct FinishedRequestLite {
38    #[serde(default)]
39    session_id: Option<String>,
40    #[serde(default)]
41    cwd: Option<String>,
42    service: String,
43    method: String,
44    path: String,
45    status_code: u16,
46    duration_ms: u64,
47    ended_at_ms: u64,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
51struct QueuedEvent {
52    thread_id: String,
53    #[serde(skip_serializing_if = "Option::is_none")]
54    turn_id: Option<String>,
55    #[serde(skip_serializing_if = "Option::is_none")]
56    cwd: Option<String>,
57    duration_ms: u64,
58    ended_at_ms: u64,
59    queued_at_ms: u64,
60    #[serde(skip_serializing_if = "Option::is_none")]
61    last_assistant_preview: Option<String>,
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize, Default)]
65struct NotifyState {
66    version: u32,
67    #[serde(default)]
68    pending: Vec<QueuedEvent>,
69    #[serde(default)]
70    last_toast_ms: Option<u64>,
71    #[serde(default)]
72    per_thread_last_toast_ms: HashMap<String, u64>,
73    #[serde(default)]
74    suppressed_since_last_toast: u64,
75}
76
77fn now_ms() -> u64 {
78    SystemTime::now()
79        .duration_since(UNIX_EPOCH)
80        .map(|d| d.as_millis() as u64)
81        .unwrap_or(0)
82}
83
84fn read_payload(notification_json: Option<String>) -> std::io::Result<Option<String>> {
85    if let Some(s) = notification_json {
86        return Ok(Some(s));
87    }
88
89    if atty::is(atty::Stream::Stdin) {
90        return Ok(None);
91    }
92
93    let mut buf = String::new();
94    std::io::stdin().read_to_string(&mut buf)?;
95    let buf = buf.trim().to_string();
96    if buf.is_empty() {
97        Ok(None)
98    } else {
99        Ok(Some(buf))
100    }
101}
102
103fn shorten(input: &str, max_chars: usize) -> String {
104    let s = input.trim();
105    if s.chars().count() <= max_chars {
106        return s.to_string();
107    }
108    let mut out: String = s.chars().take(max_chars).collect();
109    out.push_str("...");
110    out
111}
112
113fn notify_state_path() -> PathBuf {
114    proxy_home_dir().join("notify_state.json")
115}
116
117fn notify_lock_path() -> PathBuf {
118    proxy_home_dir().join("notify_state.lock")
119}
120
121fn codex_proxy_base_url_from_codex_config_text(text: &str) -> Option<String> {
122    let value: toml::Value = text.parse().ok()?;
123    let table = value.as_table()?;
124    let providers = table.get("model_providers")?.as_table()?;
125    let proxy = providers.get("codex_proxy")?.as_table()?;
126    proxy
127        .get("base_url")
128        .and_then(|v| v.as_str())
129        .map(|s| s.to_string())
130}
131
132async fn get_proxy_base_url() -> Option<String> {
133    if let Ok(v) = std::env::var("CODEX_HELPER_NOTIFY_PROXY_BASE_URL")
134        && !v.trim().is_empty()
135    {
136        return Some(v);
137    }
138
139    let codex_cfg_path = crate::config::codex_config_path();
140    let text = tokio::fs::read_to_string(codex_cfg_path).await.ok()?;
141    codex_proxy_base_url_from_codex_config_text(&text)
142}
143
144fn pick_best_recent_request(
145    thread_id: &str,
146    cwd: Option<&str>,
147    now_ms: u64,
148    policy: &NotifyPolicyConfig,
149    recent: &[FinishedRequestLite],
150) -> Option<FinishedRequestLite> {
151    let min_ended_at = now_ms.saturating_sub(policy.recent_search_window_ms);
152
153    let mut candidates = recent
154        .iter()
155        .filter(|r| r.service == "codex")
156        .filter(|r| r.ended_at_ms >= min_ended_at)
157        .filter(|r| r.session_id.as_deref() == Some(thread_id))
158        .cloned()
159        .collect::<Vec<_>>();
160
161    if candidates.is_empty()
162        && let Some(cwd) = cwd
163    {
164        candidates = recent
165            .iter()
166            .filter(|r| r.service == "codex")
167            .filter(|r| r.ended_at_ms >= min_ended_at)
168            .filter(|r| r.cwd.as_deref() == Some(cwd))
169            .cloned()
170            .collect::<Vec<_>>();
171    }
172
173    candidates
174        .into_iter()
175        .max_by_key(|r| (request_path_score(&r.path), r.ended_at_ms))
176}
177
178fn request_path_score(path: &str) -> u8 {
179    let p = path.to_ascii_lowercase();
180    if p.contains("responses") {
181        2
182    } else if p.contains("chat") {
183        1
184    } else {
185        0
186    }
187}
188
189async fn fetch_recent_finished(
190    proxy_base_url: &str,
191    timeout_ms: u64,
192) -> anyhow::Result<Vec<FinishedRequestLite>> {
193    let url = format!(
194        "{}/__codex_helper/status/recent?limit=200",
195        proxy_base_url.trim_end_matches('/')
196    );
197    let client = reqwest::Client::builder()
198        .timeout(Duration::from_millis(timeout_ms))
199        .build()?;
200    let resp = client.get(url).send().await?;
201    let status = resp.status();
202    if !status.is_success() {
203        anyhow::bail!("proxy status/recent returned {}", status.as_u16());
204    }
205    let items = resp.json::<Vec<FinishedRequestLite>>().await?;
206    Ok(items)
207}
208
209async fn queue_event_and_spawn_flush(
210    cfg: &NotifyConfig,
211    event: QueuedEvent,
212    force_toast: bool,
213) -> anyhow::Result<()> {
214    let _lock = acquire_notify_lock().await?;
215
216    let mut state = load_state().await.unwrap_or_default();
217    if state.version == 0 {
218        state.version = 1;
219    }
220
221    // Drop very old pending items to avoid unbounded growth.
222    let cutoff = now_ms().saturating_sub(30 * 60_000);
223    state.pending.retain(|e| e.queued_at_ms >= cutoff);
224    state.pending.push(event);
225    save_state(&state).await?;
226
227    if cfg.enabled && (cfg.system.enabled || (cfg.exec.enabled && !cfg.exec.command.is_empty())) {
228        spawn_flush_process(force_toast)?;
229    }
230    Ok(())
231}
232
233pub async fn handle_codex_notify(
234    notification_json: Option<String>,
235    no_toast: bool,
236    force_toast: bool,
237) -> anyhow::Result<()> {
238    let Some(payload) = read_payload(notification_json)? else {
239        return Ok(());
240    };
241
242    let cfg = load_config().await?;
243    let notify_cfg = cfg.notify;
244    let system_enabled =
245        force_toast || (notify_cfg.enabled && notify_cfg.system.enabled && !no_toast);
246    let exec_enabled =
247        notify_cfg.enabled && notify_cfg.exec.enabled && !notify_cfg.exec.command.is_empty();
248
249    if !system_enabled && !exec_enabled {
250        return Ok(());
251    }
252
253    let payload: CodexNotificationInput = match serde_json::from_str(&payload) {
254        Ok(v) => v,
255        Err(err) => {
256            eprintln!("codex-helper notify: failed to parse notification JSON: {err}");
257            return Ok(());
258        }
259    };
260
261    if payload.r#type != CodexNotificationType::AgentTurnComplete {
262        return Ok(());
263    }
264
265    let Some(thread_id) = payload
266        .thread_id
267        .as_deref()
268        .filter(|s| !s.trim().is_empty())
269    else {
270        return Ok(());
271    };
272
273    let proxy_base_url = match get_proxy_base_url().await {
274        Some(v) => v,
275        None => {
276            // Without proxy access we cannot compute duration_ms reliably, so skip (D strategy).
277            return Ok(());
278        }
279    };
280
281    let recent = match fetch_recent_finished(
282        &proxy_base_url,
283        notify_cfg.policy.recent_endpoint_timeout_ms,
284    )
285    .await
286    {
287        Ok(v) => v,
288        Err(_) => return Ok(()),
289    };
290
291    let now = now_ms();
292    let best = pick_best_recent_request(
293        thread_id,
294        payload.cwd.as_deref(),
295        now,
296        &notify_cfg.policy,
297        &recent,
298    );
299    let Some(best) = best else {
300        return Ok(());
301    };
302
303    if best.duration_ms < notify_cfg.policy.min_duration_ms {
304        return Ok(());
305    }
306
307    let preview = payload
308        .last_assistant_message
309        .as_deref()
310        .map(|s| shorten(s, 160))
311        .filter(|s| !s.trim().is_empty());
312
313    let event = QueuedEvent {
314        thread_id: thread_id.to_string(),
315        turn_id: payload.turn_id.clone(),
316        cwd: payload.cwd.clone(),
317        duration_ms: best.duration_ms,
318        ended_at_ms: best.ended_at_ms,
319        queued_at_ms: now,
320        last_assistant_preview: preview,
321    };
322
323    // If user forces toast for this invocation, we still rely on config for policy.
324    // We reuse cfg.notify for queue/flush; system notifications can be enabled only for this run.
325    let mut cfg_for_queue = notify_cfg.clone();
326    if force_toast {
327        cfg_for_queue.enabled = true;
328        cfg_for_queue.system.enabled = true;
329    }
330    if no_toast {
331        cfg_for_queue.system.enabled = false;
332    }
333
334    queue_event_and_spawn_flush(&cfg_for_queue, event, force_toast).await
335}
336
337pub async fn handle_codex_flush() -> anyhow::Result<()> {
338    let cfg = load_config().await?;
339    let notify_cfg = cfg.notify;
340    let force_toast = matches!(
341        std::env::var("CODEX_HELPER_NOTIFY_FORCE_TOAST"),
342        Ok(v) if v == "1" || v.eq_ignore_ascii_case("true")
343    );
344
345    if !notify_cfg.enabled && !force_toast {
346        return Ok(());
347    }
348
349    for _ in 0..20 {
350        let _lock = acquire_notify_lock().await?;
351        let mut state = load_state().await.unwrap_or_default();
352        if state.pending.is_empty() {
353            return Ok(());
354        }
355
356        let now = now_ms();
357        let first_pending = state
358            .pending
359            .iter()
360            .map(|e| e.queued_at_ms)
361            .min()
362            .unwrap_or(now);
363        let due_ms = first_pending.saturating_add(notify_cfg.policy.merge_window_ms);
364
365        if now < due_ms {
366            drop(state);
367            sleep(Duration::from_millis((due_ms - now).min(60_000))).await;
368            continue;
369        }
370
371        if let Some(last) = state.last_toast_ms
372            && now.saturating_sub(last) < notify_cfg.policy.global_cooldown_ms
373        {
374            let wait = notify_cfg.policy.global_cooldown_ms - now.saturating_sub(last);
375            drop(state);
376            sleep(Duration::from_millis(wait.min(60_000))).await;
377            continue;
378        }
379
380        // Apply per-thread cooldown and prepare toast batch.
381        state.pending.sort_by_key(|e| e.ended_at_ms);
382        let mut send: Vec<QueuedEvent> = Vec::new();
383        let mut suppressed = 0u64;
384        for e in state.pending.iter() {
385            let last = state.per_thread_last_toast_ms.get(&e.thread_id).copied();
386            if let Some(last) = last
387                && now.saturating_sub(last) < notify_cfg.policy.per_thread_cooldown_ms
388            {
389                suppressed = suppressed.saturating_add(1);
390                continue;
391            }
392            send.push(e.clone());
393        }
394
395        if send.is_empty() {
396            state.pending.clear();
397            state.suppressed_since_last_toast =
398                state.suppressed_since_last_toast.saturating_add(suppressed);
399            save_state(&state).await?;
400            return Ok(());
401        }
402
403        let system_enabled = notify_cfg.system.enabled || force_toast;
404        let exec_enabled = notify_cfg.exec.enabled && !notify_cfg.exec.command.is_empty();
405        if !system_enabled && !exec_enabled {
406            state.pending.clear();
407            save_state(&state).await?;
408            return Ok(());
409        }
410
411        let title = render_title(send.len(), suppressed, state.suppressed_since_last_toast);
412        let body = render_body(&send);
413        let aggregated = serde_json::json!({
414            "type": "codex-helper-merged-agent-turn-complete",
415            "count": send.len(),
416            "suppressed_in_batch": suppressed,
417            "suppressed_since_last_toast": state.suppressed_since_last_toast,
418            "generated_at_ms": now,
419            "events": send,
420        })
421        .to_string();
422
423        if system_enabled && let Err(err) = send_system_notification(&title, &body) {
424            eprintln!("codex-helper notify: failed to show system notification: {err}");
425        }
426        if exec_enabled && let Err(err) = run_exec_callback(&notify_cfg.exec.command, &aggregated) {
427            eprintln!("codex-helper notify: exec callback failed: {err}");
428        }
429
430        state.last_toast_ms = Some(now);
431        for e in send.iter() {
432            state
433                .per_thread_last_toast_ms
434                .insert(e.thread_id.clone(), now);
435        }
436        state.suppressed_since_last_toast = 0;
437        state.pending.clear();
438        save_state(&state).await?;
439        return Ok(());
440    }
441
442    Ok(())
443}
444
445fn render_title(count: usize, suppressed_in_batch: u64, suppressed_since_last: u64) -> String {
446    let mut title = if count == 1 {
447        "Codex: turn complete".to_string()
448    } else {
449        format!("Codex: {count} turns complete")
450    };
451    let total_suppressed = suppressed_in_batch.saturating_add(suppressed_since_last);
452    if total_suppressed > 0 {
453        title.push_str(&format!(" (+{total_suppressed} suppressed)"));
454    }
455    title
456}
457
458fn render_body(events: &[QueuedEvent]) -> String {
459    let mut lines: Vec<String> = Vec::new();
460    for e in events.iter().rev().take(3) {
461        let dur_s = (e.duration_ms as f64 / 1000.0).max(0.0);
462        let cwd = e
463            .cwd
464            .as_deref()
465            .and_then(|p| Path::new(p).file_name().and_then(|s| s.to_str()))
466            .unwrap_or("-");
467        if let Some(preview) = e.last_assistant_preview.as_deref() {
468            lines.push(format!("{cwd} ({dur_s:.1}s): {}", shorten(preview, 90)));
469        } else {
470            lines.push(format!("{cwd} ({dur_s:.1}s)"));
471        }
472    }
473    if events.len() > 3 {
474        lines.push(format!("+{} more", events.len() - 3));
475    }
476    lines.join("\n")
477}
478
479fn run_exec_callback(command: &[String], input_json: &str) -> anyhow::Result<()> {
480    if command.is_empty() {
481        return Ok(());
482    }
483    let mut cmd = Command::new(&command[0]);
484    if command.len() > 1 {
485        cmd.args(&command[1..]);
486    }
487    cmd.stdin(Stdio::piped())
488        .stdout(Stdio::null())
489        .stderr(Stdio::null());
490    let mut child = cmd.spawn()?;
491    if let Some(mut stdin) = child.stdin.take() {
492        use std::io::Write;
493        stdin.write_all(input_json.as_bytes())?;
494    }
495    let _ = child.wait();
496    Ok(())
497}
498
499fn spawn_flush_process(force_toast: bool) -> anyhow::Result<()> {
500    let exe = std::env::current_exe()?;
501    let mut cmd = Command::new(exe);
502    cmd.arg("notify").arg("flush-codex");
503    if force_toast {
504        cmd.env("CODEX_HELPER_NOTIFY_FORCE_TOAST", "1");
505    }
506    cmd.stdin(Stdio::null())
507        .stdout(Stdio::null())
508        .stderr(Stdio::null());
509
510    #[cfg(windows)]
511    {
512        use std::os::windows::process::CommandExt;
513        const DETACHED_PROCESS: u32 = 0x00000008;
514        const CREATE_NEW_PROCESS_GROUP: u32 = 0x00000200;
515        cmd.creation_flags(DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP);
516    }
517
518    let _ = cmd.spawn()?;
519    Ok(())
520}
521
522async fn load_state() -> anyhow::Result<NotifyState> {
523    let path = notify_state_path();
524    if !path.exists() {
525        return Ok(NotifyState {
526            version: 1,
527            ..Default::default()
528        });
529    }
530    let bytes = tokio::fs::read(path).await?;
531    let mut state = serde_json::from_slice::<NotifyState>(&bytes)?;
532    if state.version == 0 {
533        state.version = 1;
534    }
535    Ok(state)
536}
537
538async fn save_state(state: &NotifyState) -> anyhow::Result<()> {
539    let dir = proxy_home_dir();
540    tokio::fs::create_dir_all(&dir).await?;
541    let path = notify_state_path();
542    let tmp = dir.join("notify_state.json.tmp");
543    let data = serde_json::to_vec_pretty(state)?;
544    tokio::fs::write(&tmp, &data).await?;
545    tokio::fs::rename(&tmp, &path).await?;
546    Ok(())
547}
548
549struct NotifyLockGuard {
550    path: PathBuf,
551}
552
553impl Drop for NotifyLockGuard {
554    fn drop(&mut self) {
555        let _ = std::fs::remove_file(&self.path);
556    }
557}
558
559async fn acquire_notify_lock() -> anyhow::Result<NotifyLockGuard> {
560    let path = notify_lock_path();
561    let dir = proxy_home_dir();
562    tokio::fs::create_dir_all(&dir).await?;
563
564    for _ in 0..200 {
565        match std::fs::OpenOptions::new()
566            .write(true)
567            .create_new(true)
568            .open(&path)
569        {
570            Ok(mut f) => {
571                use std::io::Write;
572                let _ = writeln!(f, "{}", now_ms());
573                return Ok(NotifyLockGuard { path });
574            }
575            Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
576                // Best-effort stale lock cleanup (2 minutes).
577                if let Ok(meta) = std::fs::metadata(&path)
578                    && let Ok(modified) = meta.modified()
579                    && let Ok(age) = SystemTime::now().duration_since(modified)
580                    && age > Duration::from_secs(120)
581                {
582                    let _ = std::fs::remove_file(&path);
583                    continue;
584                }
585                sleep(Duration::from_millis(10)).await;
586            }
587            Err(err) => return Err(err.into()),
588        }
589    }
590
591    anyhow::bail!("failed to acquire notify lock: {:?}", path);
592}
593
594fn send_system_notification(title: &str, body: &str) -> anyhow::Result<()> {
595    #[cfg(windows)]
596    {
597        windows_toast::notify(title, body)?;
598        Ok(())
599    }
600    #[cfg(target_os = "macos")]
601    {
602        macos_notification::notify(title, body)?;
603        Ok(())
604    }
605    #[cfg(not(any(windows, target_os = "macos")))]
606    {
607        // No-op fallback: print a short line for non-supported platforms.
608        println!("{title}: {body}");
609        Ok(())
610    }
611}
612
613#[cfg(windows)]
614mod windows_toast {
615    use std::io;
616    use std::process::{Command, Stdio};
617
618    use base64::Engine as _;
619    use base64::engine::general_purpose::STANDARD as BASE64;
620
621    const APP_ID: &str = "codex-helper";
622    const POWERSHELL_EXE: &str = "powershell.exe";
623
624    pub fn notify(title: &str, body: &str) -> io::Result<()> {
625        let encoded_title = encode_argument(title);
626        let encoded_body = encode_argument(body);
627        let encoded_command = build_encoded_command(&encoded_title, &encoded_body);
628
629        let mut command = Command::new(POWERSHELL_EXE);
630        command
631            .arg("-NoProfile")
632            .arg("-NoLogo")
633            .arg("-EncodedCommand")
634            .arg(encoded_command)
635            .stdin(Stdio::null())
636            .stdout(Stdio::null())
637            .stderr(Stdio::null());
638
639        let status = command.status()?;
640        if status.success() {
641            Ok(())
642        } else {
643            Err(io::Error::other(format!(
644                "{POWERSHELL_EXE} exited with status {status}"
645            )))
646        }
647    }
648
649    fn build_encoded_command(encoded_title: &str, encoded_body: &str) -> String {
650        let script = build_ps_script(encoded_title, encoded_body);
651        encode_script_for_powershell(&script)
652    }
653
654    fn build_ps_script(encoded_title: &str, encoded_body: &str) -> String {
655        format!(
656            r#"
657$encoding = [System.Text.Encoding]::UTF8
658$titleText = $encoding.GetString([System.Convert]::FromBase64String("{encoded_title}"))
659$bodyText = $encoding.GetString([System.Convert]::FromBase64String("{encoded_body}"))
660[Windows.UI.Notifications.ToastNotificationManager, Windows.UI.Notifications, ContentType = WindowsRuntime] | Out-Null
661$doc = [Windows.UI.Notifications.ToastNotificationManager]::GetTemplateContent([Windows.UI.Notifications.ToastTemplateType]::ToastText02)
662$textNodes = $doc.GetElementsByTagName("text")
663$textNodes.Item(0).AppendChild($doc.CreateTextNode($titleText)) | Out-Null
664$textNodes.Item(1).AppendChild($doc.CreateTextNode($bodyText)) | Out-Null
665$toast = [Windows.UI.Notifications.ToastNotification]::new($doc)
666[Windows.UI.Notifications.ToastNotificationManager]::CreateToastNotifier('{app_id}').Show($toast)
667"#,
668            app_id = APP_ID
669        )
670    }
671
672    fn encode_script_for_powershell(script: &str) -> String {
673        let mut wide: Vec<u8> = Vec::with_capacity((script.len() + 1) * 2);
674        for unit in script.encode_utf16() {
675            wide.extend_from_slice(&unit.to_le_bytes());
676        }
677        BASE64.encode(wide)
678    }
679
680    fn encode_argument(value: &str) -> String {
681        BASE64.encode(escape_for_xml(value))
682    }
683
684    fn escape_for_xml(input: &str) -> String {
685        let mut escaped = String::with_capacity(input.len());
686        for ch in input.chars() {
687            match ch {
688                '&' => escaped.push_str("&amp;"),
689                '<' => escaped.push_str("&lt;"),
690                '>' => escaped.push_str("&gt;"),
691                '"' => escaped.push_str("&quot;"),
692                '\'' => escaped.push_str("&apos;"),
693                _ => escaped.push(ch),
694            }
695        }
696        escaped
697    }
698
699    #[cfg(test)]
700    mod tests {
701        use super::escape_for_xml;
702
703        #[test]
704        fn escapes_xml_entities() {
705            assert_eq!(escape_for_xml("a & b"), "a &amp; b");
706            assert_eq!(escape_for_xml("5 > 3"), "5 &gt; 3");
707            assert_eq!(escape_for_xml("<tag>"), "&lt;tag&gt;");
708            assert_eq!(escape_for_xml("\"quoted\""), "&quot;quoted&quot;");
709            assert_eq!(escape_for_xml("single 'quote'"), "single &apos;quote&apos;");
710        }
711    }
712}
713
714#[cfg(target_os = "macos")]
715mod macos_notification {
716    use std::io;
717    use std::process::{Command, Stdio};
718
719    pub fn notify(title: &str, body: &str) -> io::Result<()> {
720        let script = format!(
721            "display notification {} with title {}",
722            apple_quote(body),
723            apple_quote(title)
724        );
725        let status = Command::new("osascript")
726            .arg("-e")
727            .arg(script)
728            .stdin(Stdio::null())
729            .stdout(Stdio::null())
730            .stderr(Stdio::null())
731            .status()?;
732        if status.success() {
733            Ok(())
734        } else {
735            Err(io::Error::other(format!(
736                "osascript exited with status {status}"
737            )))
738        }
739    }
740
741    fn apple_quote(s: &str) -> String {
742        format!("\"{}\"", s.replace('\\', "\\\\").replace('\"', "\\\""))
743    }
744}
745
746#[cfg(test)]
747mod tests {
748    use super::*;
749
750    #[test]
751    fn parses_agent_turn_complete_payload_with_thread_id() {
752        let payload = r#"{
753            "type": "agent-turn-complete",
754            "thread-id": "th1",
755            "turn-id": "t1",
756            "cwd": "/tmp/x",
757            "input-messages": ["run tests"],
758            "last-assistant-message": "ok"
759        }"#;
760        let parsed: CodexNotificationInput = serde_json::from_str(payload).expect("parse");
761        assert_eq!(parsed.r#type, CodexNotificationType::AgentTurnComplete);
762        assert_eq!(parsed.thread_id.as_deref(), Some("th1"));
763        assert_eq!(parsed.turn_id.as_deref(), Some("t1"));
764    }
765
766    #[test]
767    fn picks_best_recent_request_prefers_responses_path() {
768        let policy = NotifyPolicyConfig::default();
769        let now = 1_000_000u64;
770        let recent = vec![
771            FinishedRequestLite {
772                session_id: Some("th1".to_string()),
773                cwd: Some("/p".to_string()),
774                service: "codex".to_string(),
775                method: "POST".to_string(),
776                path: "/v1/chat/completions".to_string(),
777                status_code: 200,
778                duration_ms: 10_000,
779                ended_at_ms: now - 1_000,
780            },
781            FinishedRequestLite {
782                session_id: Some("th1".to_string()),
783                cwd: Some("/p".to_string()),
784                service: "codex".to_string(),
785                method: "POST".to_string(),
786                path: "/v1/responses".to_string(),
787                status_code: 200,
788                duration_ms: 20_000,
789                ended_at_ms: now - 10_000,
790            },
791        ];
792        let best =
793            pick_best_recent_request("th1", Some("/p"), now, &policy, &recent).expect("best");
794        assert_eq!(best.path, "/v1/responses");
795    }
796}