1use std::collections::HashMap;
2use std::io::Read;
3use std::path::{Path, PathBuf};
4use std::process::{Command, Stdio};
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6
7use serde::{Deserialize, Serialize};
8use tokio::time::sleep;
9
10use crate::config::{NotifyConfig, NotifyPolicyConfig, load_config, proxy_home_dir};
11
12#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
13#[serde(rename_all = "kebab-case")]
14enum CodexNotificationType {
15 AgentTurnComplete,
16 #[serde(other)]
17 Unknown,
18}
19
20#[derive(Debug, Clone, Deserialize, PartialEq)]
21#[serde(rename_all = "kebab-case")]
22struct CodexNotificationInput {
23 r#type: CodexNotificationType,
24 #[serde(default)]
25 thread_id: Option<String>,
26 #[serde(default)]
27 turn_id: Option<String>,
28 #[serde(default)]
29 cwd: Option<String>,
30 #[serde(default)]
31 input_messages: Option<Vec<String>>,
32 #[serde(default)]
33 last_assistant_message: Option<String>,
34}
35
36#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
37struct FinishedRequestLite {
38 #[serde(default)]
39 session_id: Option<String>,
40 #[serde(default)]
41 cwd: Option<String>,
42 service: String,
43 method: String,
44 path: String,
45 status_code: u16,
46 duration_ms: u64,
47 ended_at_ms: u64,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
51struct QueuedEvent {
52 thread_id: String,
53 #[serde(skip_serializing_if = "Option::is_none")]
54 turn_id: Option<String>,
55 #[serde(skip_serializing_if = "Option::is_none")]
56 cwd: Option<String>,
57 duration_ms: u64,
58 ended_at_ms: u64,
59 queued_at_ms: u64,
60 #[serde(skip_serializing_if = "Option::is_none")]
61 last_assistant_preview: Option<String>,
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize, Default)]
65struct NotifyState {
66 version: u32,
67 #[serde(default)]
68 pending: Vec<QueuedEvent>,
69 #[serde(default)]
70 last_toast_ms: Option<u64>,
71 #[serde(default)]
72 per_thread_last_toast_ms: HashMap<String, u64>,
73 #[serde(default)]
74 suppressed_since_last_toast: u64,
75}
76
77fn now_ms() -> u64 {
78 SystemTime::now()
79 .duration_since(UNIX_EPOCH)
80 .map(|d| d.as_millis() as u64)
81 .unwrap_or(0)
82}
83
84fn read_payload(notification_json: Option<String>) -> std::io::Result<Option<String>> {
85 if let Some(s) = notification_json {
86 return Ok(Some(s));
87 }
88
89 if atty::is(atty::Stream::Stdin) {
90 return Ok(None);
91 }
92
93 let mut buf = String::new();
94 std::io::stdin().read_to_string(&mut buf)?;
95 let buf = buf.trim().to_string();
96 if buf.is_empty() {
97 Ok(None)
98 } else {
99 Ok(Some(buf))
100 }
101}
102
103fn shorten(input: &str, max_chars: usize) -> String {
104 let s = input.trim();
105 if s.chars().count() <= max_chars {
106 return s.to_string();
107 }
108 let mut out: String = s.chars().take(max_chars).collect();
109 out.push_str("...");
110 out
111}
112
113fn notify_state_path() -> PathBuf {
114 proxy_home_dir().join("notify_state.json")
115}
116
117fn notify_lock_path() -> PathBuf {
118 proxy_home_dir().join("notify_state.lock")
119}
120
121fn codex_proxy_base_url_from_codex_config_text(text: &str) -> Option<String> {
122 let value: toml::Value = text.parse().ok()?;
123 let table = value.as_table()?;
124 let providers = table.get("model_providers")?.as_table()?;
125 let proxy = providers.get("codex_proxy")?.as_table()?;
126 proxy
127 .get("base_url")
128 .and_then(|v| v.as_str())
129 .map(|s| s.to_string())
130}
131
132async fn get_proxy_base_url() -> Option<String> {
133 if let Ok(v) = std::env::var("CODEX_HELPER_NOTIFY_PROXY_BASE_URL")
134 && !v.trim().is_empty()
135 {
136 return Some(v);
137 }
138
139 let codex_cfg_path = crate::config::codex_config_path();
140 let text = tokio::fs::read_to_string(codex_cfg_path).await.ok()?;
141 codex_proxy_base_url_from_codex_config_text(&text)
142}
143
144fn pick_best_recent_request(
145 thread_id: &str,
146 cwd: Option<&str>,
147 now_ms: u64,
148 policy: &NotifyPolicyConfig,
149 recent: &[FinishedRequestLite],
150) -> Option<FinishedRequestLite> {
151 let min_ended_at = now_ms.saturating_sub(policy.recent_search_window_ms);
152
153 let mut candidates = recent
154 .iter()
155 .filter(|r| r.service == "codex")
156 .filter(|r| r.ended_at_ms >= min_ended_at)
157 .filter(|r| r.session_id.as_deref() == Some(thread_id))
158 .cloned()
159 .collect::<Vec<_>>();
160
161 if candidates.is_empty()
162 && let Some(cwd) = cwd
163 {
164 candidates = recent
165 .iter()
166 .filter(|r| r.service == "codex")
167 .filter(|r| r.ended_at_ms >= min_ended_at)
168 .filter(|r| r.cwd.as_deref() == Some(cwd))
169 .cloned()
170 .collect::<Vec<_>>();
171 }
172
173 candidates
174 .into_iter()
175 .max_by_key(|r| (request_path_score(&r.path), r.ended_at_ms))
176}
177
178fn request_path_score(path: &str) -> u8 {
179 let p = path.to_ascii_lowercase();
180 if p.contains("responses") {
181 2
182 } else if p.contains("chat") {
183 1
184 } else {
185 0
186 }
187}
188
189async fn fetch_recent_finished(
190 proxy_base_url: &str,
191 timeout_ms: u64,
192) -> anyhow::Result<Vec<FinishedRequestLite>> {
193 let url = format!(
194 "{}/__codex_helper/status/recent?limit=200",
195 proxy_base_url.trim_end_matches('/')
196 );
197 let client = reqwest::Client::builder()
198 .timeout(Duration::from_millis(timeout_ms))
199 .build()?;
200 let resp = client.get(url).send().await?;
201 let status = resp.status();
202 if !status.is_success() {
203 anyhow::bail!("proxy status/recent returned {}", status.as_u16());
204 }
205 let items = resp.json::<Vec<FinishedRequestLite>>().await?;
206 Ok(items)
207}
208
209async fn queue_event_and_spawn_flush(
210 cfg: &NotifyConfig,
211 event: QueuedEvent,
212 force_toast: bool,
213) -> anyhow::Result<()> {
214 let _lock = acquire_notify_lock().await?;
215
216 let mut state = load_state().await.unwrap_or_default();
217 if state.version == 0 {
218 state.version = 1;
219 }
220
221 let cutoff = now_ms().saturating_sub(30 * 60_000);
223 state.pending.retain(|e| e.queued_at_ms >= cutoff);
224 state.pending.push(event);
225 save_state(&state).await?;
226
227 if cfg.enabled && (cfg.system.enabled || (cfg.exec.enabled && !cfg.exec.command.is_empty())) {
228 spawn_flush_process(force_toast)?;
229 }
230 Ok(())
231}
232
233pub async fn handle_codex_notify(
234 notification_json: Option<String>,
235 no_toast: bool,
236 force_toast: bool,
237) -> anyhow::Result<()> {
238 let Some(payload) = read_payload(notification_json)? else {
239 return Ok(());
240 };
241
242 let cfg = load_config().await?;
243 let notify_cfg = cfg.notify;
244 let system_enabled =
245 force_toast || (notify_cfg.enabled && notify_cfg.system.enabled && !no_toast);
246 let exec_enabled =
247 notify_cfg.enabled && notify_cfg.exec.enabled && !notify_cfg.exec.command.is_empty();
248
249 if !system_enabled && !exec_enabled {
250 return Ok(());
251 }
252
253 let payload: CodexNotificationInput = match serde_json::from_str(&payload) {
254 Ok(v) => v,
255 Err(err) => {
256 eprintln!("codex-helper notify: failed to parse notification JSON: {err}");
257 return Ok(());
258 }
259 };
260
261 if payload.r#type != CodexNotificationType::AgentTurnComplete {
262 return Ok(());
263 }
264
265 let Some(thread_id) = payload
266 .thread_id
267 .as_deref()
268 .filter(|s| !s.trim().is_empty())
269 else {
270 return Ok(());
271 };
272
273 let proxy_base_url = match get_proxy_base_url().await {
274 Some(v) => v,
275 None => {
276 return Ok(());
278 }
279 };
280
281 let recent = match fetch_recent_finished(
282 &proxy_base_url,
283 notify_cfg.policy.recent_endpoint_timeout_ms,
284 )
285 .await
286 {
287 Ok(v) => v,
288 Err(_) => return Ok(()),
289 };
290
291 let now = now_ms();
292 let best = pick_best_recent_request(
293 thread_id,
294 payload.cwd.as_deref(),
295 now,
296 ¬ify_cfg.policy,
297 &recent,
298 );
299 let Some(best) = best else {
300 return Ok(());
301 };
302
303 if best.duration_ms < notify_cfg.policy.min_duration_ms {
304 return Ok(());
305 }
306
307 let preview = payload
308 .last_assistant_message
309 .as_deref()
310 .map(|s| shorten(s, 160))
311 .filter(|s| !s.trim().is_empty());
312
313 let event = QueuedEvent {
314 thread_id: thread_id.to_string(),
315 turn_id: payload.turn_id.clone(),
316 cwd: payload.cwd.clone(),
317 duration_ms: best.duration_ms,
318 ended_at_ms: best.ended_at_ms,
319 queued_at_ms: now,
320 last_assistant_preview: preview,
321 };
322
323 let mut cfg_for_queue = notify_cfg.clone();
326 if force_toast {
327 cfg_for_queue.enabled = true;
328 cfg_for_queue.system.enabled = true;
329 }
330 if no_toast {
331 cfg_for_queue.system.enabled = false;
332 }
333
334 queue_event_and_spawn_flush(&cfg_for_queue, event, force_toast).await
335}
336
337pub async fn handle_codex_flush() -> anyhow::Result<()> {
338 let cfg = load_config().await?;
339 let notify_cfg = cfg.notify;
340 let force_toast = matches!(
341 std::env::var("CODEX_HELPER_NOTIFY_FORCE_TOAST"),
342 Ok(v) if v == "1" || v.eq_ignore_ascii_case("true")
343 );
344
345 if !notify_cfg.enabled && !force_toast {
346 return Ok(());
347 }
348
349 for _ in 0..20 {
350 let _lock = acquire_notify_lock().await?;
351 let mut state = load_state().await.unwrap_or_default();
352 if state.pending.is_empty() {
353 return Ok(());
354 }
355
356 let now = now_ms();
357 let first_pending = state
358 .pending
359 .iter()
360 .map(|e| e.queued_at_ms)
361 .min()
362 .unwrap_or(now);
363 let due_ms = first_pending.saturating_add(notify_cfg.policy.merge_window_ms);
364
365 if now < due_ms {
366 drop(state);
367 sleep(Duration::from_millis((due_ms - now).min(60_000))).await;
368 continue;
369 }
370
371 if let Some(last) = state.last_toast_ms
372 && now.saturating_sub(last) < notify_cfg.policy.global_cooldown_ms
373 {
374 let wait = notify_cfg.policy.global_cooldown_ms - now.saturating_sub(last);
375 drop(state);
376 sleep(Duration::from_millis(wait.min(60_000))).await;
377 continue;
378 }
379
380 state.pending.sort_by_key(|e| e.ended_at_ms);
382 let mut send: Vec<QueuedEvent> = Vec::new();
383 let mut suppressed = 0u64;
384 for e in state.pending.iter() {
385 let last = state.per_thread_last_toast_ms.get(&e.thread_id).copied();
386 if let Some(last) = last
387 && now.saturating_sub(last) < notify_cfg.policy.per_thread_cooldown_ms
388 {
389 suppressed = suppressed.saturating_add(1);
390 continue;
391 }
392 send.push(e.clone());
393 }
394
395 if send.is_empty() {
396 state.pending.clear();
397 state.suppressed_since_last_toast =
398 state.suppressed_since_last_toast.saturating_add(suppressed);
399 save_state(&state).await?;
400 return Ok(());
401 }
402
403 let system_enabled = notify_cfg.system.enabled || force_toast;
404 let exec_enabled = notify_cfg.exec.enabled && !notify_cfg.exec.command.is_empty();
405 if !system_enabled && !exec_enabled {
406 state.pending.clear();
407 save_state(&state).await?;
408 return Ok(());
409 }
410
411 let title = render_title(send.len(), suppressed, state.suppressed_since_last_toast);
412 let body = render_body(&send);
413 let aggregated = serde_json::json!({
414 "type": "codex-helper-merged-agent-turn-complete",
415 "count": send.len(),
416 "suppressed_in_batch": suppressed,
417 "suppressed_since_last_toast": state.suppressed_since_last_toast,
418 "generated_at_ms": now,
419 "events": send,
420 })
421 .to_string();
422
423 if system_enabled && let Err(err) = send_system_notification(&title, &body) {
424 eprintln!("codex-helper notify: failed to show system notification: {err}");
425 }
426 if exec_enabled && let Err(err) = run_exec_callback(¬ify_cfg.exec.command, &aggregated) {
427 eprintln!("codex-helper notify: exec callback failed: {err}");
428 }
429
430 state.last_toast_ms = Some(now);
431 for e in send.iter() {
432 state
433 .per_thread_last_toast_ms
434 .insert(e.thread_id.clone(), now);
435 }
436 state.suppressed_since_last_toast = 0;
437 state.pending.clear();
438 save_state(&state).await?;
439 return Ok(());
440 }
441
442 Ok(())
443}
444
445fn render_title(count: usize, suppressed_in_batch: u64, suppressed_since_last: u64) -> String {
446 let mut title = if count == 1 {
447 "Codex: turn complete".to_string()
448 } else {
449 format!("Codex: {count} turns complete")
450 };
451 let total_suppressed = suppressed_in_batch.saturating_add(suppressed_since_last);
452 if total_suppressed > 0 {
453 title.push_str(&format!(" (+{total_suppressed} suppressed)"));
454 }
455 title
456}
457
458fn render_body(events: &[QueuedEvent]) -> String {
459 let mut lines: Vec<String> = Vec::new();
460 for e in events.iter().rev().take(3) {
461 let dur_s = (e.duration_ms as f64 / 1000.0).max(0.0);
462 let cwd = e
463 .cwd
464 .as_deref()
465 .and_then(|p| Path::new(p).file_name().and_then(|s| s.to_str()))
466 .unwrap_or("-");
467 if let Some(preview) = e.last_assistant_preview.as_deref() {
468 lines.push(format!("{cwd} ({dur_s:.1}s): {}", shorten(preview, 90)));
469 } else {
470 lines.push(format!("{cwd} ({dur_s:.1}s)"));
471 }
472 }
473 if events.len() > 3 {
474 lines.push(format!("+{} more", events.len() - 3));
475 }
476 lines.join("\n")
477}
478
479fn run_exec_callback(command: &[String], input_json: &str) -> anyhow::Result<()> {
480 if command.is_empty() {
481 return Ok(());
482 }
483 let mut cmd = Command::new(&command[0]);
484 if command.len() > 1 {
485 cmd.args(&command[1..]);
486 }
487 cmd.stdin(Stdio::piped())
488 .stdout(Stdio::null())
489 .stderr(Stdio::null());
490 let mut child = cmd.spawn()?;
491 if let Some(mut stdin) = child.stdin.take() {
492 use std::io::Write;
493 stdin.write_all(input_json.as_bytes())?;
494 }
495 let _ = child.wait();
496 Ok(())
497}
498
499fn spawn_flush_process(force_toast: bool) -> anyhow::Result<()> {
500 let exe = std::env::current_exe()?;
501 let mut cmd = Command::new(exe);
502 cmd.arg("notify").arg("flush-codex");
503 if force_toast {
504 cmd.env("CODEX_HELPER_NOTIFY_FORCE_TOAST", "1");
505 }
506 cmd.stdin(Stdio::null())
507 .stdout(Stdio::null())
508 .stderr(Stdio::null());
509
510 #[cfg(windows)]
511 {
512 use std::os::windows::process::CommandExt;
513 const DETACHED_PROCESS: u32 = 0x00000008;
514 const CREATE_NEW_PROCESS_GROUP: u32 = 0x00000200;
515 cmd.creation_flags(DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP);
516 }
517
518 let _ = cmd.spawn()?;
519 Ok(())
520}
521
522async fn load_state() -> anyhow::Result<NotifyState> {
523 let path = notify_state_path();
524 if !path.exists() {
525 return Ok(NotifyState {
526 version: 1,
527 ..Default::default()
528 });
529 }
530 let bytes = tokio::fs::read(path).await?;
531 let mut state = serde_json::from_slice::<NotifyState>(&bytes)?;
532 if state.version == 0 {
533 state.version = 1;
534 }
535 Ok(state)
536}
537
538async fn save_state(state: &NotifyState) -> anyhow::Result<()> {
539 let dir = proxy_home_dir();
540 tokio::fs::create_dir_all(&dir).await?;
541 let path = notify_state_path();
542 let tmp = dir.join("notify_state.json.tmp");
543 let data = serde_json::to_vec_pretty(state)?;
544 tokio::fs::write(&tmp, &data).await?;
545 tokio::fs::rename(&tmp, &path).await?;
546 Ok(())
547}
548
549struct NotifyLockGuard {
550 path: PathBuf,
551}
552
553impl Drop for NotifyLockGuard {
554 fn drop(&mut self) {
555 let _ = std::fs::remove_file(&self.path);
556 }
557}
558
559async fn acquire_notify_lock() -> anyhow::Result<NotifyLockGuard> {
560 let path = notify_lock_path();
561 let dir = proxy_home_dir();
562 tokio::fs::create_dir_all(&dir).await?;
563
564 for _ in 0..200 {
565 match std::fs::OpenOptions::new()
566 .write(true)
567 .create_new(true)
568 .open(&path)
569 {
570 Ok(mut f) => {
571 use std::io::Write;
572 let _ = writeln!(f, "{}", now_ms());
573 return Ok(NotifyLockGuard { path });
574 }
575 Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
576 if let Ok(meta) = std::fs::metadata(&path)
578 && let Ok(modified) = meta.modified()
579 && let Ok(age) = SystemTime::now().duration_since(modified)
580 && age > Duration::from_secs(120)
581 {
582 let _ = std::fs::remove_file(&path);
583 continue;
584 }
585 sleep(Duration::from_millis(10)).await;
586 }
587 Err(err) => return Err(err.into()),
588 }
589 }
590
591 anyhow::bail!("failed to acquire notify lock: {:?}", path);
592}
593
594fn send_system_notification(title: &str, body: &str) -> anyhow::Result<()> {
595 #[cfg(windows)]
596 {
597 windows_toast::notify(title, body)?;
598 Ok(())
599 }
600 #[cfg(target_os = "macos")]
601 {
602 macos_notification::notify(title, body)?;
603 Ok(())
604 }
605 #[cfg(not(any(windows, target_os = "macos")))]
606 {
607 println!("{title}: {body}");
609 Ok(())
610 }
611}
612
613#[cfg(windows)]
614mod windows_toast {
615 use std::io;
616 use std::process::{Command, Stdio};
617
618 use base64::Engine as _;
619 use base64::engine::general_purpose::STANDARD as BASE64;
620
621 const APP_ID: &str = "codex-helper";
622 const POWERSHELL_EXE: &str = "powershell.exe";
623
624 pub fn notify(title: &str, body: &str) -> io::Result<()> {
625 let encoded_title = encode_argument(title);
626 let encoded_body = encode_argument(body);
627 let encoded_command = build_encoded_command(&encoded_title, &encoded_body);
628
629 let mut command = Command::new(POWERSHELL_EXE);
630 command
631 .arg("-NoProfile")
632 .arg("-NoLogo")
633 .arg("-EncodedCommand")
634 .arg(encoded_command)
635 .stdin(Stdio::null())
636 .stdout(Stdio::null())
637 .stderr(Stdio::null());
638
639 let status = command.status()?;
640 if status.success() {
641 Ok(())
642 } else {
643 Err(io::Error::other(format!(
644 "{POWERSHELL_EXE} exited with status {status}"
645 )))
646 }
647 }
648
649 fn build_encoded_command(encoded_title: &str, encoded_body: &str) -> String {
650 let script = build_ps_script(encoded_title, encoded_body);
651 encode_script_for_powershell(&script)
652 }
653
654 fn build_ps_script(encoded_title: &str, encoded_body: &str) -> String {
655 format!(
656 r#"
657$encoding = [System.Text.Encoding]::UTF8
658$titleText = $encoding.GetString([System.Convert]::FromBase64String("{encoded_title}"))
659$bodyText = $encoding.GetString([System.Convert]::FromBase64String("{encoded_body}"))
660[Windows.UI.Notifications.ToastNotificationManager, Windows.UI.Notifications, ContentType = WindowsRuntime] | Out-Null
661$doc = [Windows.UI.Notifications.ToastNotificationManager]::GetTemplateContent([Windows.UI.Notifications.ToastTemplateType]::ToastText02)
662$textNodes = $doc.GetElementsByTagName("text")
663$textNodes.Item(0).AppendChild($doc.CreateTextNode($titleText)) | Out-Null
664$textNodes.Item(1).AppendChild($doc.CreateTextNode($bodyText)) | Out-Null
665$toast = [Windows.UI.Notifications.ToastNotification]::new($doc)
666[Windows.UI.Notifications.ToastNotificationManager]::CreateToastNotifier('{app_id}').Show($toast)
667"#,
668 app_id = APP_ID
669 )
670 }
671
672 fn encode_script_for_powershell(script: &str) -> String {
673 let mut wide: Vec<u8> = Vec::with_capacity((script.len() + 1) * 2);
674 for unit in script.encode_utf16() {
675 wide.extend_from_slice(&unit.to_le_bytes());
676 }
677 BASE64.encode(wide)
678 }
679
680 fn encode_argument(value: &str) -> String {
681 BASE64.encode(escape_for_xml(value))
682 }
683
684 fn escape_for_xml(input: &str) -> String {
685 let mut escaped = String::with_capacity(input.len());
686 for ch in input.chars() {
687 match ch {
688 '&' => escaped.push_str("&"),
689 '<' => escaped.push_str("<"),
690 '>' => escaped.push_str(">"),
691 '"' => escaped.push_str("""),
692 '\'' => escaped.push_str("'"),
693 _ => escaped.push(ch),
694 }
695 }
696 escaped
697 }
698
699 #[cfg(test)]
700 mod tests {
701 use super::escape_for_xml;
702
703 #[test]
704 fn escapes_xml_entities() {
705 assert_eq!(escape_for_xml("a & b"), "a & b");
706 assert_eq!(escape_for_xml("5 > 3"), "5 > 3");
707 assert_eq!(escape_for_xml("<tag>"), "<tag>");
708 assert_eq!(escape_for_xml("\"quoted\""), ""quoted"");
709 assert_eq!(escape_for_xml("single 'quote'"), "single 'quote'");
710 }
711 }
712}
713
714#[cfg(target_os = "macos")]
715mod macos_notification {
716 use std::io;
717 use std::process::{Command, Stdio};
718
719 pub fn notify(title: &str, body: &str) -> io::Result<()> {
720 let script = format!(
721 "display notification {} with title {}",
722 apple_quote(body),
723 apple_quote(title)
724 );
725 let status = Command::new("osascript")
726 .arg("-e")
727 .arg(script)
728 .stdin(Stdio::null())
729 .stdout(Stdio::null())
730 .stderr(Stdio::null())
731 .status()?;
732 if status.success() {
733 Ok(())
734 } else {
735 Err(io::Error::other(format!(
736 "osascript exited with status {status}"
737 )))
738 }
739 }
740
741 fn apple_quote(s: &str) -> String {
742 format!("\"{}\"", s.replace('\\', "\\\\").replace('\"', "\\\""))
743 }
744}
745
746#[cfg(test)]
747mod tests {
748 use super::*;
749
750 #[test]
751 fn parses_agent_turn_complete_payload_with_thread_id() {
752 let payload = r#"{
753 "type": "agent-turn-complete",
754 "thread-id": "th1",
755 "turn-id": "t1",
756 "cwd": "/tmp/x",
757 "input-messages": ["run tests"],
758 "last-assistant-message": "ok"
759 }"#;
760 let parsed: CodexNotificationInput = serde_json::from_str(payload).expect("parse");
761 assert_eq!(parsed.r#type, CodexNotificationType::AgentTurnComplete);
762 assert_eq!(parsed.thread_id.as_deref(), Some("th1"));
763 assert_eq!(parsed.turn_id.as_deref(), Some("t1"));
764 }
765
766 #[test]
767 fn picks_best_recent_request_prefers_responses_path() {
768 let policy = NotifyPolicyConfig::default();
769 let now = 1_000_000u64;
770 let recent = vec![
771 FinishedRequestLite {
772 session_id: Some("th1".to_string()),
773 cwd: Some("/p".to_string()),
774 service: "codex".to_string(),
775 method: "POST".to_string(),
776 path: "/v1/chat/completions".to_string(),
777 status_code: 200,
778 duration_ms: 10_000,
779 ended_at_ms: now - 1_000,
780 },
781 FinishedRequestLite {
782 session_id: Some("th1".to_string()),
783 cwd: Some("/p".to_string()),
784 service: "codex".to_string(),
785 method: "POST".to_string(),
786 path: "/v1/responses".to_string(),
787 status_code: 200,
788 duration_ms: 20_000,
789 ended_at_ms: now - 10_000,
790 },
791 ];
792 let best =
793 pick_best_recent_request("th1", Some("/p"), now, &policy, &recent).expect("best");
794 assert_eq!(best.path, "/v1/responses");
795 }
796}