Skip to main content

codex_runtime/
automation.rs

1//! Minimal session-scoped automation above [`crate::runtime::Session`].
2//!
3//! V1 keeps the boundary intentionally small:
4//! - automate one prepared `Session`
5//! - schedule with absolute `SystemTime` boundaries plus one fixed `Duration`
6//! - require `every > Duration::ZERO`
7//! - run at most one prompt turn at a time
8//! - collapse missed ticks into one next eligible run
9//! - stop permanently on `stop()`, `stop_at`, `max_runs`, closed session, or any
10//!   [`crate::runtime::PromptRunError`]
11//!
12//! Non-goals for this module:
13//! - no cron or human-time parsing
14//! - no session creation or resume orchestration
15//! - no persistence or restart recovery
16//! - no retry or downgrade policy for prompt failures
17
18use std::future::Future;
19use std::pin::Pin;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::Arc;
22use std::time::{Duration, SystemTime};
23
24use tokio::sync::{Mutex, Notify};
25use tokio::task::JoinHandle;
26
27use crate::runtime::{PromptRunError, PromptRunResult, Session};
28
29#[derive(Clone, Debug, PartialEq, Eq)]
30pub struct AutomationSpec {
31    /// Prompt reused for every scheduled turn.
32    pub prompt: String,
33    /// First eligible run time. `None` starts immediately.
34    pub start_at: Option<SystemTime>,
35    /// Fixed interval between due times. Must be greater than zero.
36    pub every: Duration,
37    /// Exclusive upper bound for starting new runs.
38    pub stop_at: Option<SystemTime>,
39    /// Optional hard cap on completed runs. `Some(0)` stops immediately.
40    pub max_runs: Option<u32>,
41}
42
43#[derive(Clone, Debug, PartialEq, Eq)]
44pub struct AutomationStatus {
45    pub thread_id: String,
46    pub runs_completed: u32,
47    pub next_due_at: Option<SystemTime>,
48    pub last_started_at: Option<SystemTime>,
49    pub last_finished_at: Option<SystemTime>,
50    pub state: AutomationState,
51    pub last_error: Option<String>,
52}
53
54#[derive(Clone, Copy, Debug, PartialEq, Eq)]
55pub enum AutomationState {
56    Waiting,
57    Running,
58    Stopped,
59    Failed,
60}
61
62impl AutomationState {
63    fn is_terminal(self) -> bool {
64        matches!(self, Self::Stopped | Self::Failed)
65    }
66}
67
68pub struct AutomationHandle {
69    shared: Arc<SharedState>,
70    join: Mutex<Option<JoinHandle<AutomationStatus>>>,
71}
72
73impl AutomationHandle {
74    pub async fn stop(&self) {
75        self.shared.stop_requested.store(true, Ordering::Release);
76        self.shared.stop_notify.notify_waiters();
77    }
78
79    pub async fn wait(self) -> AutomationStatus {
80        let join = self.join.lock().await.take();
81        match join {
82            Some(join) => match join.await {
83                Ok(status) => status,
84                Err(err) => {
85                    let mut status = self.shared.snapshot().await;
86                    status.state = AutomationState::Failed;
87                    status.next_due_at = None;
88                    status.last_error = Some(format!("automation task join error: {err}"));
89                    status
90                }
91            },
92            None => self.shared.snapshot().await,
93        }
94    }
95
96    pub async fn status(&self) -> AutomationStatus {
97        self.shared.snapshot().await
98    }
99}
100
101/// Start one background automation loop for one prepared session.
102///
103/// Invalid specs do not panic: for example, `every == Duration::ZERO` returns a handle whose
104/// status is immediately terminal with `AutomationState::Failed`.
105pub fn spawn(session: Session, spec: AutomationSpec) -> AutomationHandle {
106    spawn_runner(Arc::new(SessionRunner(session)), spec)
107}
108
109type TurnFuture<'a> =
110    Pin<Box<dyn Future<Output = Result<PromptRunResult, PromptRunError>> + Send + 'a>>;
111
112trait TurnRunner: Send + Sync + 'static {
113    fn thread_id(&self) -> &str;
114    fn is_closed(&self) -> bool;
115    fn run_prompt<'a>(&'a self, prompt: &'a str) -> TurnFuture<'a>;
116}
117
118struct SessionRunner(Session);
119
120impl TurnRunner for SessionRunner {
121    fn thread_id(&self) -> &str {
122        &self.0.thread_id
123    }
124
125    fn is_closed(&self) -> bool {
126        self.0.is_closed()
127    }
128
129    fn run_prompt<'a>(&'a self, prompt: &'a str) -> TurnFuture<'a> {
130        Box::pin(self.0.ask(prompt.to_owned()))
131    }
132}
133
134struct SharedState {
135    status: Mutex<AutomationStatus>,
136    stop_requested: AtomicBool,
137    stop_notify: Notify,
138}
139
140impl SharedState {
141    fn new(status: AutomationStatus) -> Self {
142        Self {
143            status: Mutex::new(status),
144            stop_requested: AtomicBool::new(false),
145            stop_notify: Notify::new(),
146        }
147    }
148
149    async fn snapshot(&self) -> AutomationStatus {
150        self.status.lock().await.clone()
151    }
152}
153
154fn spawn_runner<R>(runner: Arc<R>, spec: AutomationSpec) -> AutomationHandle
155where
156    R: TurnRunner,
157{
158    let initial = initial_status(runner.thread_id(), runner.is_closed(), &spec);
159    let shared = Arc::new(SharedState::new(initial));
160    let task_shared = Arc::clone(&shared);
161    let task_runner = Arc::clone(&runner);
162    let join = tokio::spawn(async move { run_loop(task_runner, spec, task_shared).await });
163    AutomationHandle {
164        shared,
165        join: Mutex::new(Some(join)),
166    }
167}
168
169async fn run_loop<R>(
170    runner: Arc<R>,
171    spec: AutomationSpec,
172    shared: Arc<SharedState>,
173) -> AutomationStatus
174where
175    R: TurnRunner,
176{
177    {
178        let status = shared.status.lock().await;
179        if status.state.is_terminal() {
180            return status.clone();
181        }
182    }
183
184    let mut due_at = {
185        let status = shared.status.lock().await;
186        match status.next_due_at {
187            Some(due_at) => due_at,
188            None => {
189                drop(status);
190                return mark_failed(
191                    &shared,
192                    None,
193                    "automation status invariant violated: missing next due time".to_owned(),
194                )
195                .await;
196            }
197        }
198    };
199
200    loop {
201        if shared.stop_requested.load(Ordering::Acquire) {
202            return mark_stopped(&shared, None).await;
203        }
204
205        if runner.is_closed() {
206            return mark_failed(&shared, None, "session is closed".to_owned()).await;
207        }
208
209        if let Some(stop_at) = spec.stop_at {
210            if SystemTime::now() >= stop_at || due_at >= stop_at {
211                return mark_stopped(&shared, None).await;
212            }
213        }
214
215        wait_until_due_or_stop(&shared, due_at).await;
216        if shared.stop_requested.load(Ordering::Acquire) {
217            return mark_stopped(&shared, None).await;
218        }
219        if runner.is_closed() {
220            return mark_failed(&shared, None, "session is closed".to_owned()).await;
221        }
222
223        let started_at = SystemTime::now();
224        if let Some(stop_at) = spec.stop_at {
225            if started_at >= stop_at {
226                return mark_stopped(&shared, None).await;
227            }
228        }
229
230        {
231            let mut status = shared.status.lock().await;
232            status.state = AutomationState::Running;
233            status.next_due_at = None;
234            status.last_started_at = Some(started_at);
235        }
236
237        let result = runner.run_prompt(spec.prompt.as_str()).await;
238        let finished_at = SystemTime::now();
239
240        match result {
241            Ok(_) => {
242                let status = {
243                    let mut status = shared.status.lock().await;
244                    status.runs_completed = status.runs_completed.saturating_add(1);
245                    status.last_finished_at = Some(finished_at);
246                    status.last_error = None;
247
248                    if spec
249                        .max_runs
250                        .is_some_and(|limit| status.runs_completed >= limit)
251                    {
252                        status.state = AutomationState::Stopped;
253                        status.next_due_at = None;
254                        return status.clone();
255                    }
256
257                    let Some(next_due) = collapse_next_due(due_at, spec.every, finished_at) else {
258                        status.state = AutomationState::Failed;
259                        status.next_due_at = None;
260                        status.last_error =
261                            Some("automation schedule overflowed next due timestamp".to_owned());
262                        return status.clone();
263                    };
264
265                    if spec.stop_at.is_some_and(|stop_at| next_due >= stop_at) {
266                        status.state = AutomationState::Stopped;
267                        status.next_due_at = None;
268                        return status.clone();
269                    }
270
271                    status.state = AutomationState::Waiting;
272                    status.next_due_at = Some(next_due);
273                    due_at = next_due;
274                    status.clone()
275                };
276
277                if status.state.is_terminal() {
278                    return status;
279                }
280            }
281            Err(err) => {
282                return mark_failed(&shared, Some(finished_at), err.to_string()).await;
283            }
284        }
285    }
286}
287
288fn initial_status(
289    thread_id: &str,
290    session_closed: bool,
291    spec: &AutomationSpec,
292) -> AutomationStatus {
293    let now = SystemTime::now();
294    let mut status = AutomationStatus {
295        thread_id: thread_id.to_owned(),
296        runs_completed: 0,
297        next_due_at: None,
298        last_started_at: None,
299        last_finished_at: None,
300        state: AutomationState::Waiting,
301        last_error: None,
302    };
303
304    if spec.every.is_zero() {
305        status.state = AutomationState::Failed;
306        status.last_error = Some("automation interval must be greater than zero".to_owned());
307        return status;
308    }
309
310    if session_closed {
311        status.state = AutomationState::Failed;
312        status.last_error = Some("session is closed".to_owned());
313        return status;
314    }
315
316    if spec.max_runs == Some(0) {
317        status.state = AutomationState::Stopped;
318        return status;
319    }
320
321    let due_at = initial_due_at(spec.start_at, now);
322    if spec
323        .stop_at
324        .is_some_and(|stop_at| due_at >= stop_at || now >= stop_at)
325    {
326        status.state = AutomationState::Stopped;
327        return status;
328    }
329
330    status.next_due_at = Some(due_at);
331    status
332}
333
334fn initial_due_at(start_at: Option<SystemTime>, now: SystemTime) -> SystemTime {
335    match start_at {
336        Some(start_at) if start_at > now => start_at,
337        _ => now,
338    }
339}
340
341fn collapse_next_due(
342    last_due_at: SystemTime,
343    every: Duration,
344    now: SystemTime,
345) -> Option<SystemTime> {
346    let next_due = checked_add_system_time(last_due_at, every)?;
347    if next_due > now {
348        return Some(next_due);
349    }
350
351    let overdue = now.duration_since(last_due_at).unwrap_or_default();
352    let every_nanos = every.as_nanos();
353    if every_nanos == 0 {
354        return None;
355    }
356    let steps = (overdue.as_nanos() / every_nanos).saturating_add(1);
357    checked_add_system_time_by_factor(last_due_at, every, steps)
358}
359
360fn checked_add_system_time(base: SystemTime, delta: Duration) -> Option<SystemTime> {
361    base.checked_add(delta)
362}
363
364fn checked_add_system_time_by_factor(
365    base: SystemTime,
366    delta: Duration,
367    factor: u128,
368) -> Option<SystemTime> {
369    let scaled = checked_mul_duration(delta, factor)?;
370    base.checked_add(scaled)
371}
372
373fn checked_mul_duration(delta: Duration, factor: u128) -> Option<Duration> {
374    let secs = (delta.as_secs() as u128).checked_mul(factor)?;
375    let nanos = (delta.subsec_nanos() as u128).checked_mul(factor)?;
376    let carry_secs = nanos / 1_000_000_000;
377    let secs = secs.checked_add(carry_secs)?;
378    let nanos = (nanos % 1_000_000_000) as u32;
379    let secs = u64::try_from(secs).ok()?;
380    Some(Duration::new(secs, nanos))
381}
382
383async fn wait_until_due_or_stop(shared: &SharedState, due_at: SystemTime) {
384    let now = SystemTime::now();
385    let Some(delay) = due_at.duration_since(now).ok() else {
386        return;
387    };
388    tokio::select! {
389        _ = tokio::time::sleep(delay) => {}
390        _ = shared.stop_notify.notified() => {}
391    }
392}
393
394async fn mark_stopped(shared: &SharedState, finished_at: Option<SystemTime>) -> AutomationStatus {
395    let mut status = shared.status.lock().await;
396    status.state = AutomationState::Stopped;
397    status.next_due_at = None;
398    if let Some(finished_at) = finished_at {
399        status.last_finished_at = Some(finished_at);
400    }
401    status.clone()
402}
403
404async fn mark_failed(
405    shared: &SharedState,
406    finished_at: Option<SystemTime>,
407    last_error: String,
408) -> AutomationStatus {
409    let mut status = shared.status.lock().await;
410    status.state = AutomationState::Failed;
411    status.next_due_at = None;
412    if let Some(finished_at) = finished_at {
413        status.last_finished_at = Some(finished_at);
414    }
415    status.last_error = Some(last_error);
416    status.clone()
417}
418
419#[cfg(test)]
420mod tests {
421    use std::collections::VecDeque;
422    use std::fs;
423    use std::path::{Path, PathBuf};
424    use std::sync::atomic::{AtomicUsize, Ordering};
425    use std::sync::{Arc, Mutex as StdMutex};
426
427    use super::*;
428    use crate::runtime::{Client, ClientConfig, SessionConfig};
429
430    #[test]
431    fn collapse_next_due_skips_missed_ticks() {
432        let base = SystemTime::UNIX_EPOCH + Duration::from_secs(100);
433        let now = base + Duration::from_millis(35);
434        let next_due = collapse_next_due(base, Duration::from_millis(10), now).expect("next due");
435        assert_eq!(next_due, base + Duration::from_millis(40));
436    }
437
438    #[test]
439    fn initial_status_stops_when_start_hits_stop_boundary() {
440        let now = SystemTime::now();
441        let at = now + Duration::from_millis(30);
442        let spec = AutomationSpec {
443            prompt: "night run".to_owned(),
444            start_at: Some(at),
445            every: Duration::from_millis(10),
446            stop_at: Some(at),
447            max_runs: None,
448        };
449        let status = initial_status("thr_test", false, &spec);
450        assert_eq!(status.state, AutomationState::Stopped);
451        assert_eq!(status.next_due_at, None);
452    }
453
454    #[test]
455    fn initial_status_fails_when_interval_is_zero() {
456        let spec = AutomationSpec {
457            prompt: "night run".to_owned(),
458            start_at: None,
459            every: Duration::ZERO,
460            stop_at: None,
461            max_runs: None,
462        };
463        let status = initial_status("thr_test", false, &spec);
464        assert_eq!(status.state, AutomationState::Failed);
465        assert_eq!(
466            status.last_error.as_deref(),
467            Some("automation interval must be greater than zero")
468        );
469    }
470
471    #[tokio::test(flavor = "current_thread")]
472    async fn runner_stop_signal_keeps_single_flight() {
473        let state = Arc::new(FakeRunnerState::new(
474            Duration::from_millis(40),
475            vec![Ok(()), Ok(())],
476        ));
477        let handle = spawn_runner(
478            Arc::new(FakeRunner::new("thr_stop", Arc::clone(&state))),
479            AutomationSpec {
480                prompt: "keep going".to_owned(),
481                start_at: None,
482                every: Duration::from_millis(5),
483                stop_at: None,
484                max_runs: None,
485            },
486        );
487
488        state.first_run_started.notified().await;
489        handle.stop().await;
490        let status = handle.wait().await;
491
492        assert_eq!(status.state, AutomationState::Stopped);
493        assert_eq!(status.runs_completed, 1);
494        assert_eq!(state.max_active.load(Ordering::SeqCst), 1);
495    }
496
497    #[tokio::test(flavor = "current_thread")]
498    async fn runner_marks_failure_on_prompt_error() {
499        let state = Arc::new(FakeRunnerState::new(
500            Duration::ZERO,
501            vec![Err(PromptRunError::TurnFailed)],
502        ));
503
504        let handle = spawn_runner(
505            Arc::new(FakeRunner::new("thr_failed", Arc::clone(&state))),
506            AutomationSpec {
507                prompt: "fail once".to_owned(),
508                start_at: None,
509                every: Duration::from_millis(10),
510                stop_at: None,
511                max_runs: Some(3),
512            },
513        );
514
515        let status = handle.wait().await;
516        assert_eq!(status.state, AutomationState::Failed);
517        assert_eq!(status.runs_completed, 0);
518        assert_eq!(status.last_error.as_deref(), Some("turn failed"));
519    }
520
521    #[tokio::test(flavor = "current_thread")]
522    async fn runner_respects_delayed_start_and_max_runs() {
523        let state = Arc::new(FakeRunnerState::new(Duration::ZERO, vec![Ok(()), Ok(())]));
524        let start_at = SystemTime::now() + Duration::from_millis(30);
525
526        let handle = spawn_runner(
527            Arc::new(FakeRunner::new("thr_delayed", Arc::clone(&state))),
528            AutomationSpec {
529                prompt: "delayed".to_owned(),
530                start_at: Some(start_at),
531                every: Duration::from_millis(15),
532                stop_at: None,
533                max_runs: Some(2),
534            },
535        );
536
537        let waiting = handle.status().await;
538        assert_eq!(waiting.state, AutomationState::Waiting);
539        assert_eq!(waiting.next_due_at, Some(start_at));
540
541        let status = handle.wait().await;
542        assert_eq!(status.state, AutomationState::Stopped);
543        assert_eq!(status.runs_completed, 2);
544    }
545
546    #[tokio::test(flavor = "current_thread")]
547    async fn runner_stops_at_stop_at_boundary_after_completed_run() {
548        let state = Arc::new(FakeRunnerState::new(Duration::ZERO, vec![Ok(()), Ok(())]));
549        let start_at = SystemTime::now() + Duration::from_millis(10);
550        let stop_at = start_at + Duration::from_millis(20);
551
552        let handle = spawn_runner(
553            Arc::new(FakeRunner::new("thr_stop_at", Arc::clone(&state))),
554            AutomationSpec {
555                prompt: "bounded".to_owned(),
556                start_at: Some(start_at),
557                every: Duration::from_millis(20),
558                stop_at: Some(stop_at),
559                max_runs: None,
560            },
561        );
562
563        let status = handle.wait().await;
564        assert_eq!(status.state, AutomationState::Stopped);
565        assert_eq!(status.runs_completed, 1);
566        assert_eq!(status.next_due_at, None);
567    }
568
569    #[tokio::test(flavor = "current_thread")]
570    async fn automation_reuses_loaded_session_thread_for_repeated_runs() {
571        let temp = TempDir::new("automation_reuse");
572        let cli = write_resume_sensitive_cli_script(&temp.root, 0);
573        let client = Client::connect(
574            ClientConfig::new()
575                .with_cli_bin(cli)
576                .without_compatibility_guard(),
577        )
578        .await
579        .expect("connect client");
580        let session = client
581            .start_session(SessionConfig::new(temp.root.to_string_lossy().to_string()))
582            .await
583            .expect("start session");
584        let thread_id = session.thread_id.clone();
585
586        let handle = spawn(
587            session,
588            AutomationSpec {
589                prompt: "repeat".to_owned(),
590                start_at: None,
591                every: Duration::from_millis(10),
592                stop_at: None,
593                max_runs: Some(2),
594            },
595        );
596
597        let status = handle.wait().await;
598        assert_eq!(status.state, AutomationState::Stopped);
599        assert_eq!(status.runs_completed, 2);
600        assert_eq!(status.thread_id, thread_id);
601
602        client.shutdown().await.expect("shutdown client");
603    }
604
605    #[tokio::test(flavor = "current_thread")]
606    async fn automation_fails_when_session_is_closed_before_due_run() {
607        let temp = TempDir::new("automation_closed");
608        let cli = write_resume_sensitive_cli_script(&temp.root, 0);
609        let client = Client::connect(
610            ClientConfig::new()
611                .with_cli_bin(cli)
612                .without_compatibility_guard(),
613        )
614        .await
615        .expect("connect client");
616        let session = client
617            .start_session(SessionConfig::new(temp.root.to_string_lossy().to_string()))
618            .await
619            .expect("start session");
620        let session_to_close = session.clone();
621
622        let handle = spawn(
623            session,
624            AutomationSpec {
625                prompt: "repeat".to_owned(),
626                start_at: Some(SystemTime::now() + Duration::from_millis(30)),
627                every: Duration::from_millis(10),
628                stop_at: None,
629                max_runs: Some(2),
630            },
631        );
632
633        session_to_close.close().await.expect("close session");
634        let status = handle.wait().await;
635
636        assert_eq!(status.state, AutomationState::Failed);
637        assert_eq!(status.runs_completed, 0);
638        assert!(status
639            .last_error
640            .as_deref()
641            .is_some_and(|error| error.contains("session is closed")));
642
643        client.shutdown().await.expect("shutdown client");
644    }
645
646    struct FakeRunner {
647        thread_id: String,
648        state: Arc<FakeRunnerState>,
649    }
650
651    impl FakeRunner {
652        fn new(thread_id: &str, state: Arc<FakeRunnerState>) -> Self {
653            Self {
654                thread_id: thread_id.to_owned(),
655                state,
656            }
657        }
658    }
659
660    impl TurnRunner for FakeRunner {
661        fn thread_id(&self) -> &str {
662            &self.thread_id
663        }
664
665        fn is_closed(&self) -> bool {
666            false
667        }
668
669        fn run_prompt<'a>(&'a self, _prompt: &'a str) -> TurnFuture<'a> {
670            Box::pin(async move {
671                let active = self.state.active.fetch_add(1, Ordering::SeqCst) + 1;
672                self.state.max_active.fetch_max(active, Ordering::SeqCst);
673                self.state.first_run_started.notify_waiters();
674                if !self.state.delay.is_zero() {
675                    tokio::time::sleep(self.state.delay).await;
676                }
677                self.state.active.fetch_sub(1, Ordering::SeqCst);
678                let next = self
679                    .state
680                    .results
681                    .lock()
682                    .expect("results lock")
683                    .pop_front()
684                    .unwrap_or(Ok(()));
685                next.map(|_| PromptRunResult {
686                    thread_id: self.thread_id.clone(),
687                    turn_id: format!(
688                        "turn_{}",
689                        self.state.turn_counter.fetch_add(1, Ordering::SeqCst)
690                    ),
691                    assistant_text: "ok".to_owned(),
692                })
693            })
694        }
695    }
696
697    struct FakeRunnerState {
698        delay: Duration,
699        results: StdMutex<VecDeque<Result<(), PromptRunError>>>,
700        active: AtomicUsize,
701        max_active: AtomicUsize,
702        turn_counter: AtomicUsize,
703        first_run_started: Notify,
704    }
705
706    impl FakeRunnerState {
707        fn new(delay: Duration, results: Vec<Result<(), PromptRunError>>) -> Self {
708            Self {
709                delay,
710                results: StdMutex::new(VecDeque::from(results)),
711                active: AtomicUsize::new(0),
712                max_active: AtomicUsize::new(0),
713                turn_counter: AtomicUsize::new(0),
714                first_run_started: Notify::new(),
715            }
716        }
717    }
718
719    #[derive(Debug)]
720    struct TempDir {
721        root: PathBuf,
722    }
723
724    impl TempDir {
725        fn new(prefix: &str) -> Self {
726            let root = std::env::temp_dir().join(format!("{prefix}_{}", uuid::Uuid::new_v4()));
727            fs::create_dir_all(&root).expect("create temp root");
728            Self { root }
729        }
730    }
731
732    impl Drop for TempDir {
733        fn drop(&mut self) {
734            let _ = fs::remove_dir_all(&self.root);
735        }
736    }
737
738    fn write_resume_sensitive_cli_script(root: &Path, allowed_resume_calls: usize) -> PathBuf {
739        let path = root.join("mock_codex_cli_resume_sensitive.py");
740        let script = r#"#!/usr/bin/env python3
741import json
742import sys
743
744allowed_resume_calls = __ALLOWED_RESUME_CALLS__
745resume_calls = 0
746
747for line in sys.stdin:
748    line = line.strip()
749    if not line:
750        continue
751    try:
752        msg = json.loads(line)
753    except Exception:
754        continue
755
756    rpc_id = msg.get("id")
757    method = msg.get("method")
758    params = msg.get("params") or {}
759
760    if rpc_id is None:
761        continue
762
763    if method == "initialize":
764        sys.stdout.write(json.dumps({
765            "id": rpc_id,
766            "result": {"ready": True, "userAgent": "Codex Desktop/0.104.0"}
767        }) + "\n")
768        sys.stdout.flush()
769        continue
770
771    if method == "thread/start":
772        sys.stdout.write(json.dumps({"id": rpc_id, "result": {"thread": {"id": "thr_automation"}}}) + "\n")
773        sys.stdout.flush()
774        continue
775
776    if method == "thread/resume":
777        resume_calls += 1
778        if resume_calls > allowed_resume_calls:
779            sys.stdout.write(json.dumps({
780                "id": rpc_id,
781                "error": {"code": -32002, "message": f"unexpected thread/resume call #{resume_calls}"}
782            }) + "\n")
783        else:
784            thread_id = params.get("threadId") or "thr_automation"
785            sys.stdout.write(json.dumps({"id": rpc_id, "result": {"thread": {"id": thread_id}}}) + "\n")
786        sys.stdout.flush()
787        continue
788
789    if method == "turn/start":
790        thread_id = params.get("threadId") or "thr_automation"
791        turn_id = "turn_" + str(resume_calls)
792        text = "ok"
793        input_items = params.get("input") or []
794        if input_items and isinstance(input_items[0], dict):
795            text = input_items[0].get("text") or "ok"
796
797        sys.stdout.write(json.dumps({"method":"turn/started","params":{"threadId":thread_id,"turnId":turn_id}}) + "\n")
798        sys.stdout.write(json.dumps({"method":"item/started","params":{"threadId":thread_id,"turnId":turn_id,"itemId":"item_1","itemType":"agentMessage"}}) + "\n")
799        sys.stdout.write(json.dumps({"method":"item/agentMessage/delta","params":{"threadId":thread_id,"turnId":turn_id,"itemId":"item_1","delta":text}}) + "\n")
800        sys.stdout.write(json.dumps({"method":"turn/completed","params":{"threadId":thread_id,"turnId":turn_id}}) + "\n")
801        sys.stdout.write(json.dumps({"id": rpc_id, "result": {"turn": {"id": turn_id}}}) + "\n")
802        sys.stdout.flush()
803        continue
804
805    if method == "thread/archive":
806        sys.stdout.write(json.dumps({"id": rpc_id, "result": {"ok": True}}) + "\n")
807        sys.stdout.flush()
808        continue
809
810    sys.stdout.write(json.dumps({"id": rpc_id, "result": {"ok": True}}) + "\n")
811    sys.stdout.flush()
812"#;
813        let script = script.replace(
814            "__ALLOWED_RESUME_CALLS__",
815            &allowed_resume_calls.to_string(),
816        );
817        fs::write(&path, script).expect("write cli script");
818        #[cfg(unix)]
819        {
820            use std::os::unix::fs::PermissionsExt;
821
822            let mut permissions = fs::metadata(&path).expect("metadata").permissions();
823            permissions.set_mode(0o755);
824            fs::set_permissions(&path, permissions).expect("set executable");
825        }
826        path
827    }
828}