1use std::future::Future;
19use std::pin::Pin;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::Arc;
22use std::time::{Duration, SystemTime};
23
24use tokio::sync::{Mutex, Notify};
25use tokio::task::JoinHandle;
26
27use crate::runtime::{PromptRunError, PromptRunResult, Session};
28
/// Schedule description for running one prompt repeatedly against a session.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AutomationSpec {
    /// Prompt text handed to the runner on every run.
    pub prompt: String,
    /// Earliest time of the first run. `None`, or a time that is not in the
    /// future, makes the first run due immediately.
    pub start_at: Option<SystemTime>,
    /// Interval between due times. A zero interval is rejected: the automation
    /// starts out in the `Failed` state.
    pub every: Duration,
    /// Exclusive end of the schedule: no run starts once the due time or the
    /// current time has reached this instant.
    pub stop_at: Option<SystemTime>,
    /// Maximum number of successful runs. `Some(0)` stops before any run;
    /// `None` places no limit.
    pub max_runs: Option<u32>,
}
42
/// Point-in-time view of an automation's progress.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AutomationStatus {
    /// Thread identifier reported by the runner when the automation was spawned.
    pub thread_id: String,
    /// Number of runs that returned successfully.
    pub runs_completed: u32,
    /// Due time of the next run; `None` while a run is in flight or once the
    /// automation has stopped or failed.
    pub next_due_at: Option<SystemTime>,
    /// Wall-clock time at which the most recent run was started.
    pub last_started_at: Option<SystemTime>,
    /// Wall-clock time at which the most recent run returned (success or error).
    pub last_finished_at: Option<SystemTime>,
    /// Current lifecycle phase.
    pub state: AutomationState,
    /// Description of the failure that ended the automation; cleared after
    /// every successful run.
    pub last_error: Option<String>,
}
53
/// Lifecycle phase of an automation.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AutomationState {
    /// Idle until the next due time.
    Waiting,
    /// A prompt run is currently in flight.
    Running,
    /// Ended without error (stop requested, run limit reached, or `stop_at` hit).
    Stopped,
    /// Ended because of an error; see `AutomationStatus::last_error`.
    Failed,
}

impl AutomationState {
    /// Reports whether the state is final, i.e. `Stopped` or `Failed`.
    fn is_terminal(self) -> bool {
        match self {
            Self::Stopped | Self::Failed => true,
            Self::Waiting | Self::Running => false,
        }
    }
}
67
/// Handle to a spawned automation task: stop it, await it, or read its status.
pub struct AutomationHandle {
    /// Status and stop signalling shared with the background task.
    shared: Arc<SharedState>,
    /// Join handle of the background task; `wait` takes it out of the option.
    join: Mutex<Option<JoinHandle<AutomationStatus>>>,
}
72
73impl AutomationHandle {
74 pub async fn stop(&self) {
75 self.shared.stop_requested.store(true, Ordering::Release);
76 self.shared.stop_notify.notify_waiters();
77 {
78 let mut status = self.shared.status.lock().await;
79 if !status.state.is_terminal() {
80 status.state = AutomationState::Stopped;
81 status.next_due_at = None;
82 }
83 }
84 if let Some(join) = self.join.lock().await.as_ref() {
85 join.abort();
86 }
87 }
88
89 pub async fn wait(self) -> AutomationStatus {
90 let join = self.join.lock().await.take();
91 match join {
92 Some(join) => match join.await {
93 Ok(status) => status,
94 Err(err) => {
95 let status = self.shared.snapshot().await;
96 if err.is_cancelled() && status.state == AutomationState::Stopped {
97 return status;
98 }
99 let mut status = self.shared.snapshot().await;
100 status.state = AutomationState::Failed;
101 status.next_due_at = None;
102 status.last_error = Some(format!("automation task join error: {err}"));
103 status
104 }
105 },
106 None => self.shared.snapshot().await,
107 }
108 }
109
110 pub async fn status(&self) -> AutomationStatus {
111 self.shared.snapshot().await
112 }
113}
114
115pub fn spawn(session: Session, spec: AutomationSpec) -> AutomationHandle {
120 spawn_runner(Arc::new(SessionRunner(session)), spec)
121}
122
/// Boxed, sendable future produced by [`TurnRunner::run_prompt`]; it may borrow
/// from the runner and the prompt for `'a`.
type TurnFuture<'a> =
    Pin<Box<dyn Future<Output = Result<PromptRunResult, PromptRunError>> + Send + 'a>>;
125
/// Minimal interface the run loop needs from a session, so that the loop can be
/// driven either by a real session or by a test double.
trait TurnRunner: Send + Sync + 'static {
    /// Identifier of the thread this runner operates on; copied into the
    /// automation status when the automation is spawned.
    fn thread_id(&self) -> &str;
    /// Whether the underlying session is closed; the run loop fails the
    /// automation with "session is closed" when this returns `true`.
    fn is_closed(&self) -> bool;
    /// Runs one turn with `prompt`, resolving to the turn result or its error.
    fn run_prompt<'a>(&'a self, prompt: &'a str) -> TurnFuture<'a>;
}
131
132struct SessionRunner(Session);
133
134impl TurnRunner for SessionRunner {
135 fn thread_id(&self) -> &str {
136 &self.0.thread_id
137 }
138
139 fn is_closed(&self) -> bool {
140 self.0.is_closed()
141 }
142
143 fn run_prompt<'a>(&'a self, prompt: &'a str) -> TurnFuture<'a> {
144 Box::pin(self.0.ask(prompt.to_owned()))
145 }
146}
147
148struct SharedState {
149 status: Mutex<AutomationStatus>,
150 stop_requested: AtomicBool,
151 stop_notify: Notify,
152}
153
154impl SharedState {
155 fn new(status: AutomationStatus) -> Self {
156 Self {
157 status: Mutex::new(status),
158 stop_requested: AtomicBool::new(false),
159 stop_notify: Notify::new(),
160 }
161 }
162
163 async fn snapshot(&self) -> AutomationStatus {
164 self.status.lock().await.clone()
165 }
166}
167
168fn spawn_runner<R>(runner: Arc<R>, spec: AutomationSpec) -> AutomationHandle
169where
170 R: TurnRunner,
171{
172 let initial = initial_status(runner.thread_id(), runner.is_closed(), &spec);
173 let shared = Arc::new(SharedState::new(initial));
174 let task_shared = Arc::clone(&shared);
175 let task_runner = Arc::clone(&runner);
176 let join = tokio::spawn(async move { run_loop(task_runner, spec, task_shared).await });
177 AutomationHandle {
178 shared,
179 join: Mutex::new(Some(join)),
180 }
181}
182
183async fn run_loop<R>(
184 runner: Arc<R>,
185 spec: AutomationSpec,
186 shared: Arc<SharedState>,
187) -> AutomationStatus
188where
189 R: TurnRunner,
190{
191 {
192 let status = shared.status.lock().await;
193 if status.state.is_terminal() {
194 return status.clone();
195 }
196 }
197
198 let mut due_at = {
199 let status = shared.status.lock().await;
200 match status.next_due_at {
201 Some(due_at) => due_at,
202 None => {
203 drop(status);
204 return mark_failed(
205 &shared,
206 None,
207 "automation status invariant violated: missing next due time".to_owned(),
208 )
209 .await;
210 }
211 }
212 };
213
214 loop {
215 if shared.stop_requested.load(Ordering::Acquire) {
216 return mark_stopped(&shared, None).await;
217 }
218
219 if runner.is_closed() {
220 return mark_failed(&shared, None, "session is closed".to_owned()).await;
221 }
222
223 if let Some(stop_at) = spec.stop_at {
224 if SystemTime::now() >= stop_at || due_at >= stop_at {
225 return mark_stopped(&shared, None).await;
226 }
227 }
228
229 wait_until_due_or_stop(&shared, due_at).await;
230 if shared.stop_requested.load(Ordering::Acquire) {
231 return mark_stopped(&shared, None).await;
232 }
233 if runner.is_closed() {
234 return mark_failed(&shared, None, "session is closed".to_owned()).await;
235 }
236
237 let started_at = SystemTime::now();
238 if let Some(stop_at) = spec.stop_at {
239 if started_at >= stop_at {
240 return mark_stopped(&shared, None).await;
241 }
242 }
243
244 {
245 let mut status = shared.status.lock().await;
246 status.state = AutomationState::Running;
247 status.next_due_at = None;
248 status.last_started_at = Some(started_at);
249 }
250
251 let result = runner.run_prompt(spec.prompt.as_str()).await;
252 let finished_at = SystemTime::now();
253
254 match result {
255 Ok(_) => {
256 let status = {
257 let mut status = shared.status.lock().await;
258 status.runs_completed = status.runs_completed.saturating_add(1);
259 status.last_finished_at = Some(finished_at);
260 status.last_error = None;
261
262 if spec
263 .max_runs
264 .is_some_and(|limit| status.runs_completed >= limit)
265 {
266 status.state = AutomationState::Stopped;
267 status.next_due_at = None;
268 return status.clone();
269 }
270
271 let Some(next_due) = collapse_next_due(due_at, spec.every, finished_at) else {
272 status.state = AutomationState::Failed;
273 status.next_due_at = None;
274 status.last_error =
275 Some("automation schedule overflowed next due timestamp".to_owned());
276 return status.clone();
277 };
278
279 if spec.stop_at.is_some_and(|stop_at| next_due >= stop_at) {
280 status.state = AutomationState::Stopped;
281 status.next_due_at = None;
282 return status.clone();
283 }
284
285 status.state = AutomationState::Waiting;
286 status.next_due_at = Some(next_due);
287 due_at = next_due;
288 status.clone()
289 };
290
291 if status.state.is_terminal() {
292 return status;
293 }
294 }
295 Err(err) => {
296 return mark_failed(&shared, Some(finished_at), err.to_string()).await;
297 }
298 }
299 }
300}
301
302fn initial_status(
303 thread_id: &str,
304 session_closed: bool,
305 spec: &AutomationSpec,
306) -> AutomationStatus {
307 let now = SystemTime::now();
308 let mut status = AutomationStatus {
309 thread_id: thread_id.to_owned(),
310 runs_completed: 0,
311 next_due_at: None,
312 last_started_at: None,
313 last_finished_at: None,
314 state: AutomationState::Waiting,
315 last_error: None,
316 };
317
318 if spec.every.is_zero() {
319 status.state = AutomationState::Failed;
320 status.last_error = Some("automation interval must be greater than zero".to_owned());
321 return status;
322 }
323
324 if session_closed {
325 status.state = AutomationState::Failed;
326 status.last_error = Some("session is closed".to_owned());
327 return status;
328 }
329
330 if spec.max_runs == Some(0) {
331 status.state = AutomationState::Stopped;
332 return status;
333 }
334
335 let due_at = initial_due_at(spec.start_at, now);
336 if spec
337 .stop_at
338 .is_some_and(|stop_at| due_at >= stop_at || now >= stop_at)
339 {
340 status.state = AutomationState::Stopped;
341 return status;
342 }
343
344 status.next_due_at = Some(due_at);
345 status
346}
347
/// Picks the first due time: `start_at` when it lies strictly in the future,
/// otherwise `now`.
fn initial_due_at(start_at: Option<SystemTime>, now: SystemTime) -> SystemTime {
    start_at.filter(|start| *start > now).unwrap_or(now)
}
354
355fn collapse_next_due(
356 last_due_at: SystemTime,
357 every: Duration,
358 now: SystemTime,
359) -> Option<SystemTime> {
360 let next_due = checked_add_system_time(last_due_at, every)?;
361 if next_due > now {
362 return Some(next_due);
363 }
364
365 let overdue = now.duration_since(last_due_at).unwrap_or_default();
366 let every_nanos = every.as_nanos();
367 if every_nanos == 0 {
368 return None;
369 }
370 let steps = (overdue.as_nanos() / every_nanos).saturating_add(1);
371 checked_add_system_time_by_factor(last_due_at, every, steps)
372}
373
/// Adds `delta` to `base`, yielding `None` if the result cannot be represented.
fn checked_add_system_time(base: SystemTime, delta: Duration) -> Option<SystemTime> {
    SystemTime::checked_add(&base, delta)
}
377
378fn checked_add_system_time_by_factor(
379 base: SystemTime,
380 delta: Duration,
381 factor: u128,
382) -> Option<SystemTime> {
383 let scaled = checked_mul_duration(delta, factor)?;
384 base.checked_add(scaled)
385}
386
/// Multiplies `delta` by `factor`, yielding `None` when the product does not
/// fit in a `Duration`.
///
/// Seconds and sub-second nanoseconds are scaled separately in `u128`, and
/// whole seconds produced by the nanosecond part are carried over.
fn checked_mul_duration(delta: Duration, factor: u128) -> Option<Duration> {
    const NANOS_PER_SEC: u128 = 1_000_000_000;

    let whole_secs = u128::from(delta.as_secs()).checked_mul(factor)?;
    let scaled_nanos = u128::from(delta.subsec_nanos()).checked_mul(factor)?;
    let total_secs = whole_secs.checked_add(scaled_nanos / NANOS_PER_SEC)?;

    let secs = u64::try_from(total_secs).ok()?;
    // The remainder is always below one billion, so this conversion succeeds.
    let subsec_nanos = u32::try_from(scaled_nanos % NANOS_PER_SEC).ok()?;
    Some(Duration::new(secs, subsec_nanos))
}
396
397async fn wait_until_due_or_stop(shared: &SharedState, due_at: SystemTime) {
398 let now = SystemTime::now();
399 let Some(delay) = due_at.duration_since(now).ok() else {
400 return;
401 };
402 tokio::select! {
403 _ = tokio::time::sleep(delay) => {}
404 _ = shared.stop_notify.notified() => {}
405 }
406}
407
408async fn mark_stopped(shared: &SharedState, finished_at: Option<SystemTime>) -> AutomationStatus {
409 let mut status = shared.status.lock().await;
410 status.state = AutomationState::Stopped;
411 status.next_due_at = None;
412 if let Some(finished_at) = finished_at {
413 status.last_finished_at = Some(finished_at);
414 }
415 status.clone()
416}
417
418async fn mark_failed(
419 shared: &SharedState,
420 finished_at: Option<SystemTime>,
421 last_error: String,
422) -> AutomationStatus {
423 let mut status = shared.status.lock().await;
424 status.state = AutomationState::Failed;
425 status.next_due_at = None;
426 if let Some(finished_at) = finished_at {
427 status.last_finished_at = Some(finished_at);
428 }
429 status.last_error = Some(last_error);
430 status.clone()
431}
432
433#[cfg(test)]
434mod tests {
435 use std::collections::VecDeque;
436 use std::fs;
437 use std::path::{Path, PathBuf};
438 use std::sync::atomic::{AtomicUsize, Ordering};
439 use std::sync::{Arc, Mutex as StdMutex};
440
441 use super::*;
442 use crate::runtime::{Client, ClientConfig, SessionConfig};
443
444 #[test]
445 fn collapse_next_due_skips_missed_ticks() {
446 let base = SystemTime::UNIX_EPOCH + Duration::from_secs(100);
447 let now = base + Duration::from_millis(35);
448 let next_due = collapse_next_due(base, Duration::from_millis(10), now).expect("next due");
449 assert_eq!(next_due, base + Duration::from_millis(40));
450 }
451
452 #[test]
453 fn initial_status_stops_when_start_hits_stop_boundary() {
454 let now = SystemTime::now();
455 let at = now + Duration::from_millis(30);
456 let spec = AutomationSpec {
457 prompt: "night run".to_owned(),
458 start_at: Some(at),
459 every: Duration::from_millis(10),
460 stop_at: Some(at),
461 max_runs: None,
462 };
463 let status = initial_status("thr_test", false, &spec);
464 assert_eq!(status.state, AutomationState::Stopped);
465 assert_eq!(status.next_due_at, None);
466 }
467
468 #[test]
469 fn initial_status_fails_when_interval_is_zero() {
470 let spec = AutomationSpec {
471 prompt: "night run".to_owned(),
472 start_at: None,
473 every: Duration::ZERO,
474 stop_at: None,
475 max_runs: None,
476 };
477 let status = initial_status("thr_test", false, &spec);
478 assert_eq!(status.state, AutomationState::Failed);
479 assert_eq!(
480 status.last_error.as_deref(),
481 Some("automation interval must be greater than zero")
482 );
483 }
484
    /// Stopping while the first 40 ms prompt is in flight (interval 5 ms) must
    /// end in `Stopped` with no completed runs and never more than one prompt
    /// active at a time.
    #[tokio::test(flavor = "current_thread")]
    async fn runner_stop_signal_keeps_single_flight() {
        let state = Arc::new(FakeRunnerState::new(
            Duration::from_millis(40),
            vec![Ok(()), Ok(())],
        ));
        let handle = spawn_runner(
            Arc::new(FakeRunner::new("thr_stop", Arc::clone(&state))),
            AutomationSpec {
                prompt: "keep going".to_owned(),
                start_at: None,
                every: Duration::from_millis(5),
                stop_at: None,
                max_runs: None,
            },
        );

        // Resumes once the fake runner signals that a prompt has started.
        state.first_run_started.notified().await;
        handle.stop().await;
        let status = handle.wait().await;

        assert_eq!(status.state, AutomationState::Stopped);
        assert_eq!(status.runs_completed, 0);
        assert_eq!(state.max_active.load(Ordering::SeqCst), 1);
    }
510
    /// `stop` must not wait for a long-running prompt: with a 60 s fake prompt
    /// in flight, `wait` has to return within one second and report `Stopped`.
    #[tokio::test(flavor = "current_thread")]
    async fn stop_aborts_in_flight_run_prompt() {
        let state = Arc::new(FakeRunnerState::new(Duration::from_secs(60), vec![Ok(())]));
        let handle = spawn_runner(
            Arc::new(FakeRunner::new("thr_abort", Arc::clone(&state))),
            AutomationSpec {
                prompt: "block".to_owned(),
                start_at: None,
                every: Duration::from_millis(5),
                stop_at: None,
                max_runs: None,
            },
        );

        // Resumes once the fake runner signals that a prompt has started.
        state.first_run_started.notified().await;
        handle.stop().await;
        // The timeout turns a hang into a test failure instead of a stuck run.
        let status = tokio::time::timeout(Duration::from_secs(1), handle.wait())
            .await
            .expect("wait should not hang after stop");

        assert_eq!(status.state, AutomationState::Stopped);
        assert_eq!(status.runs_completed, 0);
    }
534
    /// A prompt error on the first run ends the automation as `Failed`, with
    /// the error text in `last_error`, even though more runs were allowed.
    #[tokio::test(flavor = "current_thread")]
    async fn runner_marks_failure_on_prompt_error() {
        let state = Arc::new(FakeRunnerState::new(
            Duration::ZERO,
            vec![Err(PromptRunError::TurnFailed)],
        ));

        let handle = spawn_runner(
            Arc::new(FakeRunner::new("thr_failed", Arc::clone(&state))),
            AutomationSpec {
                prompt: "fail once".to_owned(),
                start_at: None,
                every: Duration::from_millis(10),
                stop_at: None,
                max_runs: Some(3),
            },
        );

        let status = handle.wait().await;
        assert_eq!(status.state, AutomationState::Failed);
        assert_eq!(status.runs_completed, 0);
        assert_eq!(status.last_error.as_deref(), Some("turn failed"));
    }
558
    /// With a start time 30 ms in the future the automation first reports
    /// `Waiting` with that due time, then stops after exactly `max_runs` runs.
    #[tokio::test(flavor = "current_thread")]
    async fn runner_respects_delayed_start_and_max_runs() {
        let state = Arc::new(FakeRunnerState::new(Duration::ZERO, vec![Ok(()), Ok(())]));
        let start_at = SystemTime::now() + Duration::from_millis(30);

        let handle = spawn_runner(
            Arc::new(FakeRunner::new("thr_delayed", Arc::clone(&state))),
            AutomationSpec {
                prompt: "delayed".to_owned(),
                start_at: Some(start_at),
                every: Duration::from_millis(15),
                stop_at: None,
                max_runs: Some(2),
            },
        );

        // Read right after spawning, while the first run is still pending.
        let waiting = handle.status().await;
        assert_eq!(waiting.state, AutomationState::Waiting);
        assert_eq!(waiting.next_due_at, Some(start_at));

        let status = handle.wait().await;
        assert_eq!(status.state, AutomationState::Stopped);
        assert_eq!(status.runs_completed, 2);
    }
583
    /// When the second due time would land exactly on `stop_at`, the
    /// automation stops after the first completed run.
    #[tokio::test(flavor = "current_thread")]
    async fn runner_stops_at_stop_at_boundary_after_completed_run() {
        let state = Arc::new(FakeRunnerState::new(Duration::ZERO, vec![Ok(()), Ok(())]));
        let start_at = SystemTime::now() + Duration::from_millis(10);
        // One interval after the start: the boundary coincides with the second tick.
        let stop_at = start_at + Duration::from_millis(20);

        let handle = spawn_runner(
            Arc::new(FakeRunner::new("thr_stop_at", Arc::clone(&state))),
            AutomationSpec {
                prompt: "bounded".to_owned(),
                start_at: Some(start_at),
                every: Duration::from_millis(20),
                stop_at: Some(stop_at),
                max_runs: None,
            },
        );

        let status = handle.wait().await;
        assert_eq!(status.state, AutomationState::Stopped);
        assert_eq!(status.runs_completed, 1);
        assert_eq!(status.next_due_at, None);
    }
606
    /// Runs two automation turns against a mock CLI that rejects every
    /// `thread/resume` call (zero allowed) and expects both runs to complete on
    /// the session's original thread id.
    // NOTE(review): presumably a resume attempt would surface as a prompt error
    // and fail the automation — confirm against the client implementation.
    #[tokio::test(flavor = "current_thread")]
    async fn automation_reuses_loaded_session_thread_for_repeated_runs() {
        let temp = TempDir::new("automation_reuse");
        let cli = write_resume_sensitive_cli_script(&temp.root, 0);
        let client = Client::connect(
            ClientConfig::new()
                .with_cli_bin(cli)
                .without_compatibility_guard(),
        )
        .await
        .expect("connect client");
        let session = client
            .start_session(SessionConfig::new(temp.root.to_string_lossy().to_string()))
            .await
            .expect("start session");
        let thread_id = session.thread_id.clone();

        let handle = spawn(
            session,
            AutomationSpec {
                prompt: "repeat".to_owned(),
                start_at: None,
                every: Duration::from_millis(10),
                stop_at: None,
                max_runs: Some(2),
            },
        );

        let status = handle.wait().await;
        assert_eq!(status.state, AutomationState::Stopped);
        assert_eq!(status.runs_completed, 2);
        assert_eq!(status.thread_id, thread_id);

        client.shutdown().await.expect("shutdown client");
    }
642
    /// Closing the session before the delayed first run (due 30 ms after
    /// spawning) must end the automation as `Failed` with a "session is closed"
    /// error and no completed runs.
    #[tokio::test(flavor = "current_thread")]
    async fn automation_fails_when_session_is_closed_before_due_run() {
        let temp = TempDir::new("automation_closed");
        let cli = write_resume_sensitive_cli_script(&temp.root, 0);
        let client = Client::connect(
            ClientConfig::new()
                .with_cli_bin(cli)
                .without_compatibility_guard(),
        )
        .await
        .expect("connect client");
        let session = client
            .start_session(SessionConfig::new(temp.root.to_string_lossy().to_string()))
            .await
            .expect("start session");
        // Keep a second handle so the session can be closed after `spawn`
        // has taken ownership of the first one.
        let session_to_close = session.clone();

        let handle = spawn(
            session,
            AutomationSpec {
                prompt: "repeat".to_owned(),
                start_at: Some(SystemTime::now() + Duration::from_millis(30)),
                every: Duration::from_millis(10),
                stop_at: None,
                max_runs: Some(2),
            },
        );

        session_to_close.close().await.expect("close session");
        let status = handle.wait().await;

        assert_eq!(status.state, AutomationState::Failed);
        assert_eq!(status.runs_completed, 0);
        assert!(status
            .last_error
            .as_deref()
            .is_some_and(|error| error.contains("session is closed")));

        client.shutdown().await.expect("shutdown client");
    }
683
684 struct FakeRunner {
685 thread_id: String,
686 state: Arc<FakeRunnerState>,
687 }
688
689 impl FakeRunner {
690 fn new(thread_id: &str, state: Arc<FakeRunnerState>) -> Self {
691 Self {
692 thread_id: thread_id.to_owned(),
693 state,
694 }
695 }
696 }
697
    impl TurnRunner for FakeRunner {
        fn thread_id(&self) -> &str {
            &self.thread_id
        }

        // The fake session is never closed.
        fn is_closed(&self) -> bool {
            false
        }

        // Simulates one prompt: records concurrency, signals the start,
        // optionally sleeps, then yields the next scripted result.
        fn run_prompt<'a>(&'a self, _prompt: &'a str) -> TurnFuture<'a> {
            Box::pin(async move {
                // Count this prompt as active and remember the highest number
                // of simultaneously active prompts seen so far.
                let active = self.state.active.fetch_add(1, Ordering::SeqCst) + 1;
                self.state.max_active.fetch_max(active, Ordering::SeqCst);
                // Wake tests that are waiting for a prompt to start.
                self.state.first_run_started.notify_waiters();
                if !self.state.delay.is_zero() {
                    tokio::time::sleep(self.state.delay).await;
                }
                self.state.active.fetch_sub(1, Ordering::SeqCst);
                // Take the next scripted result; succeed once the script is exhausted.
                let next = self
                    .state
                    .results
                    .lock()
                    .expect("results lock")
                    .pop_front()
                    .unwrap_or(Ok(()));
                next.map(|_| PromptRunResult {
                    thread_id: self.thread_id.clone(),
                    turn_id: format!(
                        "turn_{}",
                        self.state.turn_counter.fetch_add(1, Ordering::SeqCst)
                    ),
                    assistant_text: "ok".to_owned(),
                })
            })
        }
    }
734
735 struct FakeRunnerState {
736 delay: Duration,
737 results: StdMutex<VecDeque<Result<(), PromptRunError>>>,
738 active: AtomicUsize,
739 max_active: AtomicUsize,
740 turn_counter: AtomicUsize,
741 first_run_started: Notify,
742 }
743
744 impl FakeRunnerState {
745 fn new(delay: Duration, results: Vec<Result<(), PromptRunError>>) -> Self {
746 Self {
747 delay,
748 results: StdMutex::new(VecDeque::from(results)),
749 active: AtomicUsize::new(0),
750 max_active: AtomicUsize::new(0),
751 turn_counter: AtomicUsize::new(0),
752 first_run_started: Notify::new(),
753 }
754 }
755 }
756
757 #[derive(Debug)]
758 struct TempDir {
759 root: PathBuf,
760 }
761
762 impl TempDir {
763 fn new(prefix: &str) -> Self {
764 let root = std::env::temp_dir().join(format!("{prefix}_{}", uuid::Uuid::new_v4()));
765 fs::create_dir_all(&root).expect("create temp root");
766 Self { root }
767 }
768 }
769
770 impl Drop for TempDir {
771 fn drop(&mut self) {
772 let _ = fs::remove_dir_all(&self.root);
773 }
774 }
775
    /// Writes a mock CLI (a Python 3 script) into `root` and returns its path.
    ///
    /// The script reads JSON messages line by line from stdin and answers
    /// `initialize`, `thread/start`, `thread/resume`, `turn/start` and
    /// `thread/archive`; any other request with an id gets a generic OK result.
    /// `thread/resume` succeeds for the first `allowed_resume_calls` calls and
    /// returns an error for every call after that.
    fn write_resume_sensitive_cli_script(root: &Path, allowed_resume_calls: usize) -> PathBuf {
        let path = root.join("mock_codex_cli_resume_sensitive.py");
        let script = r#"#!/usr/bin/env python3
import json
import sys

allowed_resume_calls = __ALLOWED_RESUME_CALLS__
resume_calls = 0

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    try:
        msg = json.loads(line)
    except Exception:
        continue

    rpc_id = msg.get("id")
    method = msg.get("method")
    params = msg.get("params") or {}

    if rpc_id is None:
        continue

    if method == "initialize":
        sys.stdout.write(json.dumps({
            "id": rpc_id,
            "result": {"ready": True, "userAgent": "Codex Desktop/0.104.0"}
        }) + "\n")
        sys.stdout.flush()
        continue

    if method == "thread/start":
        sys.stdout.write(json.dumps({"id": rpc_id, "result": {"thread": {"id": "thr_automation"}}}) + "\n")
        sys.stdout.flush()
        continue

    if method == "thread/resume":
        resume_calls += 1
        if resume_calls > allowed_resume_calls:
            sys.stdout.write(json.dumps({
                "id": rpc_id,
                "error": {"code": -32002, "message": f"unexpected thread/resume call #{resume_calls}"}
            }) + "\n")
        else:
            thread_id = params.get("threadId") or "thr_automation"
            sys.stdout.write(json.dumps({"id": rpc_id, "result": {"thread": {"id": thread_id}}}) + "\n")
        sys.stdout.flush()
        continue

    if method == "turn/start":
        thread_id = params.get("threadId") or "thr_automation"
        turn_id = "turn_" + str(resume_calls)
        text = "ok"
        input_items = params.get("input") or []
        if input_items and isinstance(input_items[0], dict):
            text = input_items[0].get("text") or "ok"

        sys.stdout.write(json.dumps({"method":"turn/started","params":{"threadId":thread_id,"turnId":turn_id}}) + "\n")
        sys.stdout.write(json.dumps({"method":"item/started","params":{"threadId":thread_id,"turnId":turn_id,"itemId":"item_1","itemType":"agentMessage"}}) + "\n")
        sys.stdout.write(json.dumps({"method":"item/agentMessage/delta","params":{"threadId":thread_id,"turnId":turn_id,"itemId":"item_1","delta":text}}) + "\n")
        sys.stdout.write(json.dumps({"method":"turn/completed","params":{"threadId":thread_id,"turnId":turn_id}}) + "\n")
        sys.stdout.write(json.dumps({"id": rpc_id, "result": {"turn": {"id": turn_id}}}) + "\n")
        sys.stdout.flush()
        continue

    if method == "thread/archive":
        sys.stdout.write(json.dumps({"id": rpc_id, "result": {"ok": True}}) + "\n")
        sys.stdout.flush()
        continue

    sys.stdout.write(json.dumps({"id": rpc_id, "result": {"ok": True}}) + "\n")
    sys.stdout.flush()
"#;
        // Substitute the placeholder with the concrete resume budget.
        let script = script.replace(
            "__ALLOWED_RESUME_CALLS__",
            &allowed_resume_calls.to_string(),
        );
        fs::write(&path, script).expect("write cli script");
        // On Unix the script must be executable to be launched via its shebang.
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;

            let mut permissions = fs::metadata(&path).expect("metadata").permissions();
            permissions.set_mode(0o755);
            fs::set_permissions(&path, permissions).expect("set executable");
        }
        path
    }
866}