Skip to main content

codex_runtime/
automation.rs

1//! Minimal session-scoped automation above [`crate::runtime::Session`].
2//!
3//! V1 keeps the boundary intentionally small:
4//! - automate one prepared `Session`
5//! - schedule with absolute `SystemTime` boundaries plus one fixed `Duration`
6//! - require `every > Duration::ZERO`
7//! - run at most one prompt turn at a time
//! - skip missed ticks: after a late run, the next run is scheduled at the first `every`
//!   boundary after that run finished (no catch-up burst)
9//! - stop permanently on `stop()`, `stop_at`, `max_runs`, closed session, or any
10//!   [`crate::runtime::PromptRunError`]
11//!
12//! Non-goals for this module:
13//! - no cron or human-time parsing
14//! - no session creation or resume orchestration
15//! - no persistence or restart recovery
16//! - no retry or downgrade policy for prompt failures
17
18use std::future::Future;
19use std::pin::Pin;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::Arc;
22use std::time::{Duration, SystemTime};
23
24use tokio::sync::{Mutex, Notify};
25use tokio::task::JoinHandle;
26
27use crate::runtime::{PromptRunError, PromptRunResult, Session};
28
/// Schedule and prompt for one session automation.
///
/// All boundaries are wall-clock [`SystemTime`] values; the interval is a fixed [`Duration`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AutomationSpec {
    /// Prompt text sent, unchanged, on every scheduled turn.
    pub prompt: String,
    /// Earliest time the first run may start; `None` (or a time in the past) means "now".
    pub start_at: Option<SystemTime>,
    /// Fixed spacing between due times. A zero interval is rejected as `Failed`.
    pub every: Duration,
    /// Exclusive deadline: no run starts at or after this instant.
    pub stop_at: Option<SystemTime>,
    /// Upper bound on successfully completed runs; `Some(0)` never runs at all.
    pub max_runs: Option<u32>,
}
42
43#[derive(Clone, Debug, PartialEq, Eq)]
44pub struct AutomationStatus {
45    pub thread_id: String,
46    pub runs_completed: u32,
47    pub next_due_at: Option<SystemTime>,
48    pub last_started_at: Option<SystemTime>,
49    pub last_finished_at: Option<SystemTime>,
50    pub state: AutomationState,
51    pub last_error: Option<String>,
52}
53
/// Lifecycle state of an automation.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AutomationState {
    /// Scheduled and waiting for `next_due_at`.
    Waiting,
    /// A prompt turn is currently in flight.
    Running,
    /// Ended without an error (`stop()`, `stop_at`, or `max_runs`).
    Stopped,
    /// Ended because of an error; see `AutomationStatus::last_error`.
    Failed,
}

impl AutomationState {
    /// Returns `true` for the final states, `Stopped` and `Failed`.
    fn is_terminal(self) -> bool {
        match self {
            Self::Stopped | Self::Failed => true,
            Self::Waiting | Self::Running => false,
        }
    }
}
67
68pub struct AutomationHandle {
69    shared: Arc<SharedState>,
70    join: Mutex<Option<JoinHandle<AutomationStatus>>>,
71}
72
73impl AutomationHandle {
74    pub async fn stop(&self) {
75        self.shared.stop_requested.store(true, Ordering::Release);
76        self.shared.stop_notify.notify_waiters();
77        {
78            let mut status = self.shared.status.lock().await;
79            if !status.state.is_terminal() {
80                status.state = AutomationState::Stopped;
81                status.next_due_at = None;
82            }
83        }
84        if let Some(join) = self.join.lock().await.as_ref() {
85            join.abort();
86        }
87    }
88
89    pub async fn wait(self) -> AutomationStatus {
90        let join = self.join.lock().await.take();
91        match join {
92            Some(join) => match join.await {
93                Ok(status) => status,
94                Err(err) => {
95                    let status = self.shared.snapshot().await;
96                    if err.is_cancelled() && status.state == AutomationState::Stopped {
97                        return status;
98                    }
99                    let mut status = self.shared.snapshot().await;
100                    status.state = AutomationState::Failed;
101                    status.next_due_at = None;
102                    status.last_error = Some(format!("automation task join error: {err}"));
103                    status
104                }
105            },
106            None => self.shared.snapshot().await,
107        }
108    }
109
110    pub async fn status(&self) -> AutomationStatus {
111        self.shared.snapshot().await
112    }
113}
114
115/// Start one background automation loop for one prepared session.
116///
117/// Invalid specs do not panic: for example, `every == Duration::ZERO` returns a handle whose
118/// status is immediately terminal with `AutomationState::Failed`.
119pub fn spawn(session: Session, spec: AutomationSpec) -> AutomationHandle {
120    spawn_runner(Arc::new(SessionRunner(session)), spec)
121}
122
123type TurnFuture<'a> =
124    Pin<Box<dyn Future<Output = Result<PromptRunResult, PromptRunError>> + Send + 'a>>;
125
126trait TurnRunner: Send + Sync + 'static {
127    fn thread_id(&self) -> &str;
128    fn is_closed(&self) -> bool;
129    fn run_prompt<'a>(&'a self, prompt: &'a str) -> TurnFuture<'a>;
130}
131
132struct SessionRunner(Session);
133
134impl TurnRunner for SessionRunner {
135    fn thread_id(&self) -> &str {
136        &self.0.thread_id
137    }
138
139    fn is_closed(&self) -> bool {
140        self.0.is_closed()
141    }
142
143    fn run_prompt<'a>(&'a self, prompt: &'a str) -> TurnFuture<'a> {
144        Box::pin(self.0.ask(prompt.to_owned()))
145    }
146}
147
148struct SharedState {
149    status: Mutex<AutomationStatus>,
150    stop_requested: AtomicBool,
151    stop_notify: Notify,
152}
153
154impl SharedState {
155    fn new(status: AutomationStatus) -> Self {
156        Self {
157            status: Mutex::new(status),
158            stop_requested: AtomicBool::new(false),
159            stop_notify: Notify::new(),
160        }
161    }
162
163    async fn snapshot(&self) -> AutomationStatus {
164        self.status.lock().await.clone()
165    }
166}
167
168fn spawn_runner<R>(runner: Arc<R>, spec: AutomationSpec) -> AutomationHandle
169where
170    R: TurnRunner,
171{
172    let initial = initial_status(runner.thread_id(), runner.is_closed(), &spec);
173    let shared = Arc::new(SharedState::new(initial));
174    let task_shared = Arc::clone(&shared);
175    let task_runner = Arc::clone(&runner);
176    let join = tokio::spawn(async move { run_loop(task_runner, spec, task_shared).await });
177    AutomationHandle {
178        shared,
179        join: Mutex::new(Some(join)),
180    }
181}
182
183async fn run_loop<R>(
184    runner: Arc<R>,
185    spec: AutomationSpec,
186    shared: Arc<SharedState>,
187) -> AutomationStatus
188where
189    R: TurnRunner,
190{
191    {
192        let status = shared.status.lock().await;
193        if status.state.is_terminal() {
194            return status.clone();
195        }
196    }
197
198    let mut due_at = {
199        let status = shared.status.lock().await;
200        match status.next_due_at {
201            Some(due_at) => due_at,
202            None => {
203                drop(status);
204                return mark_failed(
205                    &shared,
206                    None,
207                    "automation status invariant violated: missing next due time".to_owned(),
208                )
209                .await;
210            }
211        }
212    };
213
214    loop {
215        if shared.stop_requested.load(Ordering::Acquire) {
216            return mark_stopped(&shared, None).await;
217        }
218
219        if runner.is_closed() {
220            return mark_failed(&shared, None, "session is closed".to_owned()).await;
221        }
222
223        if let Some(stop_at) = spec.stop_at {
224            if SystemTime::now() >= stop_at || due_at >= stop_at {
225                return mark_stopped(&shared, None).await;
226            }
227        }
228
229        wait_until_due_or_stop(&shared, due_at).await;
230        if shared.stop_requested.load(Ordering::Acquire) {
231            return mark_stopped(&shared, None).await;
232        }
233        if runner.is_closed() {
234            return mark_failed(&shared, None, "session is closed".to_owned()).await;
235        }
236
237        let started_at = SystemTime::now();
238        if let Some(stop_at) = spec.stop_at {
239            if started_at >= stop_at {
240                return mark_stopped(&shared, None).await;
241            }
242        }
243
244        {
245            let mut status = shared.status.lock().await;
246            status.state = AutomationState::Running;
247            status.next_due_at = None;
248            status.last_started_at = Some(started_at);
249        }
250
251        let result = runner.run_prompt(spec.prompt.as_str()).await;
252        let finished_at = SystemTime::now();
253
254        match result {
255            Ok(_) => {
256                let status = {
257                    let mut status = shared.status.lock().await;
258                    status.runs_completed = status.runs_completed.saturating_add(1);
259                    status.last_finished_at = Some(finished_at);
260                    status.last_error = None;
261
262                    if spec
263                        .max_runs
264                        .is_some_and(|limit| status.runs_completed >= limit)
265                    {
266                        status.state = AutomationState::Stopped;
267                        status.next_due_at = None;
268                        return status.clone();
269                    }
270
271                    let Some(next_due) = collapse_next_due(due_at, spec.every, finished_at) else {
272                        status.state = AutomationState::Failed;
273                        status.next_due_at = None;
274                        status.last_error =
275                            Some("automation schedule overflowed next due timestamp".to_owned());
276                        return status.clone();
277                    };
278
279                    if spec.stop_at.is_some_and(|stop_at| next_due >= stop_at) {
280                        status.state = AutomationState::Stopped;
281                        status.next_due_at = None;
282                        return status.clone();
283                    }
284
285                    status.state = AutomationState::Waiting;
286                    status.next_due_at = Some(next_due);
287                    due_at = next_due;
288                    status.clone()
289                };
290
291                if status.state.is_terminal() {
292                    return status;
293                }
294            }
295            Err(err) => {
296                return mark_failed(&shared, Some(finished_at), err.to_string()).await;
297            }
298        }
299    }
300}
301
302fn initial_status(
303    thread_id: &str,
304    session_closed: bool,
305    spec: &AutomationSpec,
306) -> AutomationStatus {
307    let now = SystemTime::now();
308    let mut status = AutomationStatus {
309        thread_id: thread_id.to_owned(),
310        runs_completed: 0,
311        next_due_at: None,
312        last_started_at: None,
313        last_finished_at: None,
314        state: AutomationState::Waiting,
315        last_error: None,
316    };
317
318    if spec.every.is_zero() {
319        status.state = AutomationState::Failed;
320        status.last_error = Some("automation interval must be greater than zero".to_owned());
321        return status;
322    }
323
324    if session_closed {
325        status.state = AutomationState::Failed;
326        status.last_error = Some("session is closed".to_owned());
327        return status;
328    }
329
330    if spec.max_runs == Some(0) {
331        status.state = AutomationState::Stopped;
332        return status;
333    }
334
335    let due_at = initial_due_at(spec.start_at, now);
336    if spec
337        .stop_at
338        .is_some_and(|stop_at| due_at >= stop_at || now >= stop_at)
339    {
340        status.state = AutomationState::Stopped;
341        return status;
342    }
343
344    status.next_due_at = Some(due_at);
345    status
346}
347
/// Returns `start_at` when it lies strictly in the future, otherwise `now`.
fn initial_due_at(start_at: Option<SystemTime>, now: SystemTime) -> SystemTime {
    start_at.filter(|&candidate| candidate > now).unwrap_or(now)
}
354
355fn collapse_next_due(
356    last_due_at: SystemTime,
357    every: Duration,
358    now: SystemTime,
359) -> Option<SystemTime> {
360    let next_due = checked_add_system_time(last_due_at, every)?;
361    if next_due > now {
362        return Some(next_due);
363    }
364
365    let overdue = now.duration_since(last_due_at).unwrap_or_default();
366    let every_nanos = every.as_nanos();
367    if every_nanos == 0 {
368        return None;
369    }
370    let steps = (overdue.as_nanos() / every_nanos).saturating_add(1);
371    checked_add_system_time_by_factor(last_due_at, every, steps)
372}
373
/// `base + delta`, or `None` if the result is not representable as a `SystemTime`.
fn checked_add_system_time(base: SystemTime, delta: Duration) -> Option<SystemTime> {
    SystemTime::checked_add(&base, delta)
}
377
378fn checked_add_system_time_by_factor(
379    base: SystemTime,
380    delta: Duration,
381    factor: u128,
382) -> Option<SystemTime> {
383    let scaled = checked_mul_duration(delta, factor)?;
384    base.checked_add(scaled)
385}
386
/// Multiplies `delta` by a `u128` factor (`Duration::checked_mul` only accepts `u32`).
///
/// Returns `None` when the product does not fit in a `Duration` (more than `u64::MAX` seconds).
fn checked_mul_duration(delta: Duration, factor: u128) -> Option<Duration> {
    const NANOS_PER_SEC: u128 = 1_000_000_000;
    // If the u128 nanosecond product overflows, the seconds part would exceed u64 anyway.
    let total_nanos = delta.as_nanos().checked_mul(factor)?;
    let whole_secs = u64::try_from(total_nanos / NANOS_PER_SEC).ok()?;
    let sub_nanos = (total_nanos % NANOS_PER_SEC) as u32;
    Some(Duration::new(whole_secs, sub_nanos))
}
396
397async fn wait_until_due_or_stop(shared: &SharedState, due_at: SystemTime) {
398    let now = SystemTime::now();
399    let Some(delay) = due_at.duration_since(now).ok() else {
400        return;
401    };
402    tokio::select! {
403        _ = tokio::time::sleep(delay) => {}
404        _ = shared.stop_notify.notified() => {}
405    }
406}
407
408async fn mark_stopped(shared: &SharedState, finished_at: Option<SystemTime>) -> AutomationStatus {
409    let mut status = shared.status.lock().await;
410    status.state = AutomationState::Stopped;
411    status.next_due_at = None;
412    if let Some(finished_at) = finished_at {
413        status.last_finished_at = Some(finished_at);
414    }
415    status.clone()
416}
417
418async fn mark_failed(
419    shared: &SharedState,
420    finished_at: Option<SystemTime>,
421    last_error: String,
422) -> AutomationStatus {
423    let mut status = shared.status.lock().await;
424    status.state = AutomationState::Failed;
425    status.next_due_at = None;
426    if let Some(finished_at) = finished_at {
427        status.last_finished_at = Some(finished_at);
428    }
429    status.last_error = Some(last_error);
430    status.clone()
431}
432
// Unit tests for the scheduling helpers, loop tests driven by an in-process `FakeRunner`,
// and end-to-end tests against a mock Python CLI spoken to through `Client`.
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::fs;
    use std::path::{Path, PathBuf};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex as StdMutex};

    use super::*;
    use crate::runtime::{Client, ClientConfig, SessionConfig};

    // Finishing 35ms after a due time with a 10ms interval skips the missed +10/+20/+30
    // boundaries and lands on +40.
    #[test]
    fn collapse_next_due_skips_missed_ticks() {
        let base = SystemTime::UNIX_EPOCH + Duration::from_secs(100);
        let now = base + Duration::from_millis(35);
        let next_due = collapse_next_due(base, Duration::from_millis(10), now).expect("next due");
        assert_eq!(next_due, base + Duration::from_millis(40));
    }

    // `stop_at` is exclusive: a first due time equal to it means no run is ever started.
    #[test]
    fn initial_status_stops_when_start_hits_stop_boundary() {
        let now = SystemTime::now();
        let at = now + Duration::from_millis(30);
        let spec = AutomationSpec {
            prompt: "night run".to_owned(),
            start_at: Some(at),
            every: Duration::from_millis(10),
            stop_at: Some(at),
            max_runs: None,
        };
        let status = initial_status("thr_test", false, &spec);
        assert_eq!(status.state, AutomationState::Stopped);
        assert_eq!(status.next_due_at, None);
    }

    // A zero interval is reported as a terminal failure instead of panicking.
    #[test]
    fn initial_status_fails_when_interval_is_zero() {
        let spec = AutomationSpec {
            prompt: "night run".to_owned(),
            start_at: None,
            every: Duration::ZERO,
            stop_at: None,
            max_runs: None,
        };
        let status = initial_status("thr_test", false, &spec);
        assert_eq!(status.state, AutomationState::Failed);
        assert_eq!(
            status.last_error.as_deref(),
            Some("automation interval must be greater than zero")
        );
    }

    // Stop arrives while the first 40ms turn is in flight (shorter interval than the turn).
    // The turn is aborted, so nothing completes, and at most one turn was ever active.
    #[tokio::test(flavor = "current_thread")]
    async fn runner_stop_signal_keeps_single_flight() {
        let state = Arc::new(FakeRunnerState::new(
            Duration::from_millis(40),
            vec![Ok(()), Ok(())],
        ));
        let handle = spawn_runner(
            Arc::new(FakeRunner::new("thr_stop", Arc::clone(&state))),
            AutomationSpec {
                prompt: "keep going".to_owned(),
                start_at: None,
                every: Duration::from_millis(5),
                stop_at: None,
                max_runs: None,
            },
        );

        // On the current-thread runtime the spawned loop only runs once this test task
        // awaits, so the waiter is registered before `notify_waiters` fires.
        state.first_run_started.notified().await;
        handle.stop().await;
        let status = handle.wait().await;

        assert_eq!(status.state, AutomationState::Stopped);
        assert_eq!(status.runs_completed, 0);
        assert_eq!(state.max_active.load(Ordering::SeqCst), 1);
    }

    // A 60s turn would block `wait()` unless `stop()` aborts it; the 1s timeout turns a
    // hang into a test failure.
    #[tokio::test(flavor = "current_thread")]
    async fn stop_aborts_in_flight_run_prompt() {
        let state = Arc::new(FakeRunnerState::new(Duration::from_secs(60), vec![Ok(())]));
        let handle = spawn_runner(
            Arc::new(FakeRunner::new("thr_abort", Arc::clone(&state))),
            AutomationSpec {
                prompt: "block".to_owned(),
                start_at: None,
                every: Duration::from_millis(5),
                stop_at: None,
                max_runs: None,
            },
        );

        state.first_run_started.notified().await;
        handle.stop().await;
        let status = tokio::time::timeout(Duration::from_secs(1), handle.wait())
            .await
            .expect("wait should not hang after stop");

        assert_eq!(status.state, AutomationState::Stopped);
        assert_eq!(status.runs_completed, 0);
    }

    // The first turn errors: the loop must end `Failed` and surface the error's Display text
    // (the assertion expects `PromptRunError::TurnFailed` to display as "turn failed").
    #[tokio::test(flavor = "current_thread")]
    async fn runner_marks_failure_on_prompt_error() {
        let state = Arc::new(FakeRunnerState::new(
            Duration::ZERO,
            vec![Err(PromptRunError::TurnFailed)],
        ));

        let handle = spawn_runner(
            Arc::new(FakeRunner::new("thr_failed", Arc::clone(&state))),
            AutomationSpec {
                prompt: "fail once".to_owned(),
                start_at: None,
                every: Duration::from_millis(10),
                stop_at: None,
                max_runs: Some(3),
            },
        );

        let status = handle.wait().await;
        assert_eq!(status.state, AutomationState::Failed);
        assert_eq!(status.runs_completed, 0);
        assert_eq!(status.last_error.as_deref(), Some("turn failed"));
    }

    // Before the delayed start the handle reports `Waiting` with the requested start time;
    // afterwards exactly `max_runs` turns complete and the automation ends `Stopped`.
    #[tokio::test(flavor = "current_thread")]
    async fn runner_respects_delayed_start_and_max_runs() {
        let state = Arc::new(FakeRunnerState::new(Duration::ZERO, vec![Ok(()), Ok(())]));
        let start_at = SystemTime::now() + Duration::from_millis(30);

        let handle = spawn_runner(
            Arc::new(FakeRunner::new("thr_delayed", Arc::clone(&state))),
            AutomationSpec {
                prompt: "delayed".to_owned(),
                start_at: Some(start_at),
                every: Duration::from_millis(15),
                stop_at: None,
                max_runs: Some(2),
            },
        );

        let waiting = handle.status().await;
        assert_eq!(waiting.state, AutomationState::Waiting);
        assert_eq!(waiting.next_due_at, Some(start_at));

        let status = handle.wait().await;
        assert_eq!(status.state, AutomationState::Stopped);
        assert_eq!(status.runs_completed, 2);
    }

    // The second due time (start + 20ms) equals `stop_at`, so only one run happens.
    #[tokio::test(flavor = "current_thread")]
    async fn runner_stops_at_stop_at_boundary_after_completed_run() {
        let state = Arc::new(FakeRunnerState::new(Duration::ZERO, vec![Ok(()), Ok(())]));
        let start_at = SystemTime::now() + Duration::from_millis(10);
        let stop_at = start_at + Duration::from_millis(20);

        let handle = spawn_runner(
            Arc::new(FakeRunner::new("thr_stop_at", Arc::clone(&state))),
            AutomationSpec {
                prompt: "bounded".to_owned(),
                start_at: Some(start_at),
                every: Duration::from_millis(20),
                stop_at: Some(stop_at),
                max_runs: None,
            },
        );

        let status = handle.wait().await;
        assert_eq!(status.state, AutomationState::Stopped);
        assert_eq!(status.runs_completed, 1);
        assert_eq!(status.next_due_at, None);
    }

    // The mock CLI answers every `thread/resume` with an error (0 resumes allowed).
    // Completing two runs therefore indicates the automation reuses the already-loaded
    // thread instead of resuming it. NOTE(review): this assumes `Session::ask` surfaces
    // a failed resume as an error — confirm.
    #[tokio::test(flavor = "current_thread")]
    async fn automation_reuses_loaded_session_thread_for_repeated_runs() {
        let temp = TempDir::new("automation_reuse");
        let cli = write_resume_sensitive_cli_script(&temp.root, 0);
        let client = Client::connect(
            ClientConfig::new()
                .with_cli_bin(cli)
                .without_compatibility_guard(),
        )
        .await
        .expect("connect client");
        let session = client
            .start_session(SessionConfig::new(temp.root.to_string_lossy().to_string()))
            .await
            .expect("start session");
        let thread_id = session.thread_id.clone();

        let handle = spawn(
            session,
            AutomationSpec {
                prompt: "repeat".to_owned(),
                start_at: None,
                every: Duration::from_millis(10),
                stop_at: None,
                max_runs: Some(2),
            },
        );

        let status = handle.wait().await;
        assert_eq!(status.state, AutomationState::Stopped);
        assert_eq!(status.runs_completed, 2);
        assert_eq!(status.thread_id, thread_id);

        client.shutdown().await.expect("shutdown client");
    }

    // Closing a clone of the session before the delayed first run must end the automation
    // `Failed` with "session is closed" and no completed runs.
    #[tokio::test(flavor = "current_thread")]
    async fn automation_fails_when_session_is_closed_before_due_run() {
        let temp = TempDir::new("automation_closed");
        let cli = write_resume_sensitive_cli_script(&temp.root, 0);
        let client = Client::connect(
            ClientConfig::new()
                .with_cli_bin(cli)
                .without_compatibility_guard(),
        )
        .await
        .expect("connect client");
        let session = client
            .start_session(SessionConfig::new(temp.root.to_string_lossy().to_string()))
            .await
            .expect("start session");
        let session_to_close = session.clone();

        let handle = spawn(
            session,
            AutomationSpec {
                prompt: "repeat".to_owned(),
                start_at: Some(SystemTime::now() + Duration::from_millis(30)),
                every: Duration::from_millis(10),
                stop_at: None,
                max_runs: Some(2),
            },
        );

        session_to_close.close().await.expect("close session");
        let status = handle.wait().await;

        assert_eq!(status.state, AutomationState::Failed);
        assert_eq!(status.runs_completed, 0);
        assert!(status
            .last_error
            .as_deref()
            .is_some_and(|error| error.contains("session is closed")));

        client.shutdown().await.expect("shutdown client");
    }

    /// Test double for `TurnRunner`: never closed. It replays scripted results (defaulting
    /// to `Ok` once they run out) and records how many turns were active at once.
    struct FakeRunner {
        thread_id: String,
        state: Arc<FakeRunnerState>,
    }

    impl FakeRunner {
        fn new(thread_id: &str, state: Arc<FakeRunnerState>) -> Self {
            Self {
                thread_id: thread_id.to_owned(),
                state,
            }
        }
    }

    impl TurnRunner for FakeRunner {
        fn thread_id(&self) -> &str {
            &self.thread_id
        }

        fn is_closed(&self) -> bool {
            false
        }

        fn run_prompt<'a>(&'a self, _prompt: &'a str) -> TurnFuture<'a> {
            Box::pin(async move {
                // Track concurrency so tests can assert single-flight execution.
                let active = self.state.active.fetch_add(1, Ordering::SeqCst) + 1;
                self.state.max_active.fetch_max(active, Ordering::SeqCst);
                // Only wakes waiters already registered; no permit is stored.
                self.state.first_run_started.notify_waiters();
                if !self.state.delay.is_zero() {
                    tokio::time::sleep(self.state.delay).await;
                }
                // Not reached if the turn is aborted during the sleep above.
                self.state.active.fetch_sub(1, Ordering::SeqCst);
                let next = self
                    .state
                    .results
                    .lock()
                    .expect("results lock")
                    .pop_front()
                    .unwrap_or(Ok(()));
                next.map(|_| PromptRunResult {
                    thread_id: self.thread_id.clone(),
                    turn_id: format!(
                        "turn_{}",
                        self.state.turn_counter.fetch_add(1, Ordering::SeqCst)
                    ),
                    assistant_text: "ok".to_owned(),
                })
            })
        }
    }

    /// Shared, inspectable state behind a `FakeRunner`.
    struct FakeRunnerState {
        // Simulated duration of each turn.
        delay: Duration,
        // Scripted outcomes consumed front-to-back, one per turn.
        results: StdMutex<VecDeque<Result<(), PromptRunError>>>,
        // Turns currently inside `run_prompt`.
        active: AtomicUsize,
        // Highest value `active` ever reached.
        max_active: AtomicUsize,
        // Source of distinct fake turn ids.
        turn_counter: AtomicUsize,
        // Signalled each time a turn starts.
        first_run_started: Notify,
    }

    impl FakeRunnerState {
        fn new(delay: Duration, results: Vec<Result<(), PromptRunError>>) -> Self {
            Self {
                delay,
                results: StdMutex::new(VecDeque::from(results)),
                active: AtomicUsize::new(0),
                max_active: AtomicUsize::new(0),
                turn_counter: AtomicUsize::new(0),
                first_run_started: Notify::new(),
            }
        }
    }

    /// Uniquely named directory under the system temp dir, removed (best effort) on drop.
    #[derive(Debug)]
    struct TempDir {
        root: PathBuf,
    }

    impl TempDir {
        fn new(prefix: &str) -> Self {
            let root = std::env::temp_dir().join(format!("{prefix}_{}", uuid::Uuid::new_v4()));
            fs::create_dir_all(&root).expect("create temp root");
            Self { root }
        }
    }

    impl Drop for TempDir {
        fn drop(&mut self) {
            let _ = fs::remove_dir_all(&self.root);
        }
    }

    /// Writes a mock Codex CLI (a Python script run via its `python3` shebang) into `root`.
    ///
    /// The script speaks line-delimited JSON-RPC on stdin/stdout. It answers any request
    /// with an `id` and returns a JSON-RPC error for every `thread/resume` call beyond
    /// `allowed_resume_calls`. A `turn/start` emits started/delta/completed notifications
    /// that echo the first input item's text. The executable bit is only set on unix.
    fn write_resume_sensitive_cli_script(root: &Path, allowed_resume_calls: usize) -> PathBuf {
        let path = root.join("mock_codex_cli_resume_sensitive.py");
        let script = r#"#!/usr/bin/env python3
import json
import sys

allowed_resume_calls = __ALLOWED_RESUME_CALLS__
resume_calls = 0

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    try:
        msg = json.loads(line)
    except Exception:
        continue

    rpc_id = msg.get("id")
    method = msg.get("method")
    params = msg.get("params") or {}

    if rpc_id is None:
        continue

    if method == "initialize":
        sys.stdout.write(json.dumps({
            "id": rpc_id,
            "result": {"ready": True, "userAgent": "Codex Desktop/0.104.0"}
        }) + "\n")
        sys.stdout.flush()
        continue

    if method == "thread/start":
        sys.stdout.write(json.dumps({"id": rpc_id, "result": {"thread": {"id": "thr_automation"}}}) + "\n")
        sys.stdout.flush()
        continue

    if method == "thread/resume":
        resume_calls += 1
        if resume_calls > allowed_resume_calls:
            sys.stdout.write(json.dumps({
                "id": rpc_id,
                "error": {"code": -32002, "message": f"unexpected thread/resume call #{resume_calls}"}
            }) + "\n")
        else:
            thread_id = params.get("threadId") or "thr_automation"
            sys.stdout.write(json.dumps({"id": rpc_id, "result": {"thread": {"id": thread_id}}}) + "\n")
        sys.stdout.flush()
        continue

    if method == "turn/start":
        thread_id = params.get("threadId") or "thr_automation"
        turn_id = "turn_" + str(resume_calls)
        text = "ok"
        input_items = params.get("input") or []
        if input_items and isinstance(input_items[0], dict):
            text = input_items[0].get("text") or "ok"

        sys.stdout.write(json.dumps({"method":"turn/started","params":{"threadId":thread_id,"turnId":turn_id}}) + "\n")
        sys.stdout.write(json.dumps({"method":"item/started","params":{"threadId":thread_id,"turnId":turn_id,"itemId":"item_1","itemType":"agentMessage"}}) + "\n")
        sys.stdout.write(json.dumps({"method":"item/agentMessage/delta","params":{"threadId":thread_id,"turnId":turn_id,"itemId":"item_1","delta":text}}) + "\n")
        sys.stdout.write(json.dumps({"method":"turn/completed","params":{"threadId":thread_id,"turnId":turn_id}}) + "\n")
        sys.stdout.write(json.dumps({"id": rpc_id, "result": {"turn": {"id": turn_id}}}) + "\n")
        sys.stdout.flush()
        continue

    if method == "thread/archive":
        sys.stdout.write(json.dumps({"id": rpc_id, "result": {"ok": True}}) + "\n")
        sys.stdout.flush()
        continue

    sys.stdout.write(json.dumps({"id": rpc_id, "result": {"ok": True}}) + "\n")
    sys.stdout.flush()
"#;
        // Bake the resume allowance into the script text.
        let script = script.replace(
            "__ALLOWED_RESUME_CALLS__",
            &allowed_resume_calls.to_string(),
        );
        fs::write(&path, script).expect("write cli script");
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;

            let mut permissions = fs::metadata(&path).expect("metadata").permissions();
            permissions.set_mode(0o755);
            fs::set_permissions(&path, permissions).expect("set executable");
        }
        path
    }
}