1use std::future::Future;
19use std::pin::Pin;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::Arc;
22use std::time::{Duration, SystemTime};
23
24use tokio::sync::{Mutex, Notify};
25use tokio::task::JoinHandle;
26
27use crate::runtime::{PromptRunError, PromptRunResult, Session};
28
29#[derive(Clone, Debug, PartialEq, Eq)]
30pub struct AutomationSpec {
31 pub prompt: String,
33 pub start_at: Option<SystemTime>,
35 pub every: Duration,
37 pub stop_at: Option<SystemTime>,
39 pub max_runs: Option<u32>,
41}
42
43#[derive(Clone, Debug, PartialEq, Eq)]
44pub struct AutomationStatus {
45 pub thread_id: String,
46 pub runs_completed: u32,
47 pub next_due_at: Option<SystemTime>,
48 pub last_started_at: Option<SystemTime>,
49 pub last_finished_at: Option<SystemTime>,
50 pub state: AutomationState,
51 pub last_error: Option<String>,
52}
53
54#[derive(Clone, Copy, Debug, PartialEq, Eq)]
55pub enum AutomationState {
56 Waiting,
57 Running,
58 Stopped,
59 Failed,
60}
61
62impl AutomationState {
63 fn is_terminal(self) -> bool {
64 matches!(self, Self::Stopped | Self::Failed)
65 }
66}
67
68pub struct AutomationHandle {
69 shared: Arc<SharedState>,
70 join: Mutex<Option<JoinHandle<AutomationStatus>>>,
71}
72
73impl AutomationHandle {
74 pub async fn stop(&self) {
75 self.shared.stop_requested.store(true, Ordering::Release);
76 self.shared.stop_notify.notify_waiters();
77 }
78
79 pub async fn wait(self) -> AutomationStatus {
80 let join = self.join.lock().await.take();
81 match join {
82 Some(join) => match join.await {
83 Ok(status) => status,
84 Err(err) => {
85 let mut status = self.shared.snapshot().await;
86 status.state = AutomationState::Failed;
87 status.next_due_at = None;
88 status.last_error = Some(format!("automation task join error: {err}"));
89 status
90 }
91 },
92 None => self.shared.snapshot().await,
93 }
94 }
95
96 pub async fn status(&self) -> AutomationStatus {
97 self.shared.snapshot().await
98 }
99}
100
101pub fn spawn(session: Session, spec: AutomationSpec) -> AutomationHandle {
106 spawn_runner(Arc::new(SessionRunner(session)), spec)
107}
108
109type TurnFuture<'a> =
110 Pin<Box<dyn Future<Output = Result<PromptRunResult, PromptRunError>> + Send + 'a>>;
111
112trait TurnRunner: Send + Sync + 'static {
113 fn thread_id(&self) -> &str;
114 fn is_closed(&self) -> bool;
115 fn run_prompt<'a>(&'a self, prompt: &'a str) -> TurnFuture<'a>;
116}
117
118struct SessionRunner(Session);
119
120impl TurnRunner for SessionRunner {
121 fn thread_id(&self) -> &str {
122 &self.0.thread_id
123 }
124
125 fn is_closed(&self) -> bool {
126 self.0.is_closed()
127 }
128
129 fn run_prompt<'a>(&'a self, prompt: &'a str) -> TurnFuture<'a> {
130 Box::pin(self.0.ask(prompt.to_owned()))
131 }
132}
133
134struct SharedState {
135 status: Mutex<AutomationStatus>,
136 stop_requested: AtomicBool,
137 stop_notify: Notify,
138}
139
140impl SharedState {
141 fn new(status: AutomationStatus) -> Self {
142 Self {
143 status: Mutex::new(status),
144 stop_requested: AtomicBool::new(false),
145 stop_notify: Notify::new(),
146 }
147 }
148
149 async fn snapshot(&self) -> AutomationStatus {
150 self.status.lock().await.clone()
151 }
152}
153
154fn spawn_runner<R>(runner: Arc<R>, spec: AutomationSpec) -> AutomationHandle
155where
156 R: TurnRunner,
157{
158 let initial = initial_status(runner.thread_id(), runner.is_closed(), &spec);
159 let shared = Arc::new(SharedState::new(initial));
160 let task_shared = Arc::clone(&shared);
161 let task_runner = Arc::clone(&runner);
162 let join = tokio::spawn(async move { run_loop(task_runner, spec, task_shared).await });
163 AutomationHandle {
164 shared,
165 join: Mutex::new(Some(join)),
166 }
167}
168
169async fn run_loop<R>(
170 runner: Arc<R>,
171 spec: AutomationSpec,
172 shared: Arc<SharedState>,
173) -> AutomationStatus
174where
175 R: TurnRunner,
176{
177 {
178 let status = shared.status.lock().await;
179 if status.state.is_terminal() {
180 return status.clone();
181 }
182 }
183
184 let mut due_at = {
185 let status = shared.status.lock().await;
186 match status.next_due_at {
187 Some(due_at) => due_at,
188 None => {
189 drop(status);
190 return mark_failed(
191 &shared,
192 None,
193 "automation status invariant violated: missing next due time".to_owned(),
194 )
195 .await;
196 }
197 }
198 };
199
200 loop {
201 if shared.stop_requested.load(Ordering::Acquire) {
202 return mark_stopped(&shared, None).await;
203 }
204
205 if runner.is_closed() {
206 return mark_failed(&shared, None, "session is closed".to_owned()).await;
207 }
208
209 if let Some(stop_at) = spec.stop_at {
210 if SystemTime::now() >= stop_at || due_at >= stop_at {
211 return mark_stopped(&shared, None).await;
212 }
213 }
214
215 wait_until_due_or_stop(&shared, due_at).await;
216 if shared.stop_requested.load(Ordering::Acquire) {
217 return mark_stopped(&shared, None).await;
218 }
219 if runner.is_closed() {
220 return mark_failed(&shared, None, "session is closed".to_owned()).await;
221 }
222
223 let started_at = SystemTime::now();
224 if let Some(stop_at) = spec.stop_at {
225 if started_at >= stop_at {
226 return mark_stopped(&shared, None).await;
227 }
228 }
229
230 {
231 let mut status = shared.status.lock().await;
232 status.state = AutomationState::Running;
233 status.next_due_at = None;
234 status.last_started_at = Some(started_at);
235 }
236
237 let result = runner.run_prompt(spec.prompt.as_str()).await;
238 let finished_at = SystemTime::now();
239
240 match result {
241 Ok(_) => {
242 let status = {
243 let mut status = shared.status.lock().await;
244 status.runs_completed = status.runs_completed.saturating_add(1);
245 status.last_finished_at = Some(finished_at);
246 status.last_error = None;
247
248 if spec
249 .max_runs
250 .is_some_and(|limit| status.runs_completed >= limit)
251 {
252 status.state = AutomationState::Stopped;
253 status.next_due_at = None;
254 return status.clone();
255 }
256
257 let Some(next_due) = collapse_next_due(due_at, spec.every, finished_at) else {
258 status.state = AutomationState::Failed;
259 status.next_due_at = None;
260 status.last_error =
261 Some("automation schedule overflowed next due timestamp".to_owned());
262 return status.clone();
263 };
264
265 if spec.stop_at.is_some_and(|stop_at| next_due >= stop_at) {
266 status.state = AutomationState::Stopped;
267 status.next_due_at = None;
268 return status.clone();
269 }
270
271 status.state = AutomationState::Waiting;
272 status.next_due_at = Some(next_due);
273 due_at = next_due;
274 status.clone()
275 };
276
277 if status.state.is_terminal() {
278 return status;
279 }
280 }
281 Err(err) => {
282 return mark_failed(&shared, Some(finished_at), err.to_string()).await;
283 }
284 }
285 }
286}
287
288fn initial_status(
289 thread_id: &str,
290 session_closed: bool,
291 spec: &AutomationSpec,
292) -> AutomationStatus {
293 let now = SystemTime::now();
294 let mut status = AutomationStatus {
295 thread_id: thread_id.to_owned(),
296 runs_completed: 0,
297 next_due_at: None,
298 last_started_at: None,
299 last_finished_at: None,
300 state: AutomationState::Waiting,
301 last_error: None,
302 };
303
304 if spec.every.is_zero() {
305 status.state = AutomationState::Failed;
306 status.last_error = Some("automation interval must be greater than zero".to_owned());
307 return status;
308 }
309
310 if session_closed {
311 status.state = AutomationState::Failed;
312 status.last_error = Some("session is closed".to_owned());
313 return status;
314 }
315
316 if spec.max_runs == Some(0) {
317 status.state = AutomationState::Stopped;
318 return status;
319 }
320
321 let due_at = initial_due_at(spec.start_at, now);
322 if spec
323 .stop_at
324 .is_some_and(|stop_at| due_at >= stop_at || now >= stop_at)
325 {
326 status.state = AutomationState::Stopped;
327 return status;
328 }
329
330 status.next_due_at = Some(due_at);
331 status
332}
333
334fn initial_due_at(start_at: Option<SystemTime>, now: SystemTime) -> SystemTime {
335 match start_at {
336 Some(start_at) if start_at > now => start_at,
337 _ => now,
338 }
339}
340
341fn collapse_next_due(
342 last_due_at: SystemTime,
343 every: Duration,
344 now: SystemTime,
345) -> Option<SystemTime> {
346 let next_due = checked_add_system_time(last_due_at, every)?;
347 if next_due > now {
348 return Some(next_due);
349 }
350
351 let overdue = now.duration_since(last_due_at).unwrap_or_default();
352 let every_nanos = every.as_nanos();
353 if every_nanos == 0 {
354 return None;
355 }
356 let steps = (overdue.as_nanos() / every_nanos).saturating_add(1);
357 checked_add_system_time_by_factor(last_due_at, every, steps)
358}
359
360fn checked_add_system_time(base: SystemTime, delta: Duration) -> Option<SystemTime> {
361 base.checked_add(delta)
362}
363
364fn checked_add_system_time_by_factor(
365 base: SystemTime,
366 delta: Duration,
367 factor: u128,
368) -> Option<SystemTime> {
369 let scaled = checked_mul_duration(delta, factor)?;
370 base.checked_add(scaled)
371}
372
373fn checked_mul_duration(delta: Duration, factor: u128) -> Option<Duration> {
374 let secs = (delta.as_secs() as u128).checked_mul(factor)?;
375 let nanos = (delta.subsec_nanos() as u128).checked_mul(factor)?;
376 let carry_secs = nanos / 1_000_000_000;
377 let secs = secs.checked_add(carry_secs)?;
378 let nanos = (nanos % 1_000_000_000) as u32;
379 let secs = u64::try_from(secs).ok()?;
380 Some(Duration::new(secs, nanos))
381}
382
383async fn wait_until_due_or_stop(shared: &SharedState, due_at: SystemTime) {
384 let now = SystemTime::now();
385 let Some(delay) = due_at.duration_since(now).ok() else {
386 return;
387 };
388 tokio::select! {
389 _ = tokio::time::sleep(delay) => {}
390 _ = shared.stop_notify.notified() => {}
391 }
392}
393
394async fn mark_stopped(shared: &SharedState, finished_at: Option<SystemTime>) -> AutomationStatus {
395 let mut status = shared.status.lock().await;
396 status.state = AutomationState::Stopped;
397 status.next_due_at = None;
398 if let Some(finished_at) = finished_at {
399 status.last_finished_at = Some(finished_at);
400 }
401 status.clone()
402}
403
404async fn mark_failed(
405 shared: &SharedState,
406 finished_at: Option<SystemTime>,
407 last_error: String,
408) -> AutomationStatus {
409 let mut status = shared.status.lock().await;
410 status.state = AutomationState::Failed;
411 status.next_due_at = None;
412 if let Some(finished_at) = finished_at {
413 status.last_finished_at = Some(finished_at);
414 }
415 status.last_error = Some(last_error);
416 status.clone()
417}
418
419#[cfg(test)]
420mod tests {
421 use std::collections::VecDeque;
422 use std::fs;
423 use std::path::{Path, PathBuf};
424 use std::sync::atomic::{AtomicUsize, Ordering};
425 use std::sync::{Arc, Mutex as StdMutex};
426
427 use super::*;
428 use crate::runtime::{Client, ClientConfig, SessionConfig};
429
430 #[test]
431 fn collapse_next_due_skips_missed_ticks() {
432 let base = SystemTime::UNIX_EPOCH + Duration::from_secs(100);
433 let now = base + Duration::from_millis(35);
434 let next_due = collapse_next_due(base, Duration::from_millis(10), now).expect("next due");
435 assert_eq!(next_due, base + Duration::from_millis(40));
436 }
437
438 #[test]
439 fn initial_status_stops_when_start_hits_stop_boundary() {
440 let now = SystemTime::now();
441 let at = now + Duration::from_millis(30);
442 let spec = AutomationSpec {
443 prompt: "night run".to_owned(),
444 start_at: Some(at),
445 every: Duration::from_millis(10),
446 stop_at: Some(at),
447 max_runs: None,
448 };
449 let status = initial_status("thr_test", false, &spec);
450 assert_eq!(status.state, AutomationState::Stopped);
451 assert_eq!(status.next_due_at, None);
452 }
453
454 #[test]
455 fn initial_status_fails_when_interval_is_zero() {
456 let spec = AutomationSpec {
457 prompt: "night run".to_owned(),
458 start_at: None,
459 every: Duration::ZERO,
460 stop_at: None,
461 max_runs: None,
462 };
463 let status = initial_status("thr_test", false, &spec);
464 assert_eq!(status.state, AutomationState::Failed);
465 assert_eq!(
466 status.last_error.as_deref(),
467 Some("automation interval must be greater than zero")
468 );
469 }
470
471 #[tokio::test(flavor = "current_thread")]
472 async fn runner_stop_signal_keeps_single_flight() {
473 let state = Arc::new(FakeRunnerState::new(
474 Duration::from_millis(40),
475 vec![Ok(()), Ok(())],
476 ));
477 let handle = spawn_runner(
478 Arc::new(FakeRunner::new("thr_stop", Arc::clone(&state))),
479 AutomationSpec {
480 prompt: "keep going".to_owned(),
481 start_at: None,
482 every: Duration::from_millis(5),
483 stop_at: None,
484 max_runs: None,
485 },
486 );
487
488 state.first_run_started.notified().await;
489 handle.stop().await;
490 let status = handle.wait().await;
491
492 assert_eq!(status.state, AutomationState::Stopped);
493 assert_eq!(status.runs_completed, 1);
494 assert_eq!(state.max_active.load(Ordering::SeqCst), 1);
495 }
496
497 #[tokio::test(flavor = "current_thread")]
498 async fn runner_marks_failure_on_prompt_error() {
499 let state = Arc::new(FakeRunnerState::new(
500 Duration::ZERO,
501 vec![Err(PromptRunError::TurnFailed)],
502 ));
503
504 let handle = spawn_runner(
505 Arc::new(FakeRunner::new("thr_failed", Arc::clone(&state))),
506 AutomationSpec {
507 prompt: "fail once".to_owned(),
508 start_at: None,
509 every: Duration::from_millis(10),
510 stop_at: None,
511 max_runs: Some(3),
512 },
513 );
514
515 let status = handle.wait().await;
516 assert_eq!(status.state, AutomationState::Failed);
517 assert_eq!(status.runs_completed, 0);
518 assert_eq!(status.last_error.as_deref(), Some("turn failed"));
519 }
520
521 #[tokio::test(flavor = "current_thread")]
522 async fn runner_respects_delayed_start_and_max_runs() {
523 let state = Arc::new(FakeRunnerState::new(Duration::ZERO, vec![Ok(()), Ok(())]));
524 let start_at = SystemTime::now() + Duration::from_millis(30);
525
526 let handle = spawn_runner(
527 Arc::new(FakeRunner::new("thr_delayed", Arc::clone(&state))),
528 AutomationSpec {
529 prompt: "delayed".to_owned(),
530 start_at: Some(start_at),
531 every: Duration::from_millis(15),
532 stop_at: None,
533 max_runs: Some(2),
534 },
535 );
536
537 let waiting = handle.status().await;
538 assert_eq!(waiting.state, AutomationState::Waiting);
539 assert_eq!(waiting.next_due_at, Some(start_at));
540
541 let status = handle.wait().await;
542 assert_eq!(status.state, AutomationState::Stopped);
543 assert_eq!(status.runs_completed, 2);
544 }
545
546 #[tokio::test(flavor = "current_thread")]
547 async fn runner_stops_at_stop_at_boundary_after_completed_run() {
548 let state = Arc::new(FakeRunnerState::new(Duration::ZERO, vec![Ok(()), Ok(())]));
549 let start_at = SystemTime::now() + Duration::from_millis(10);
550 let stop_at = start_at + Duration::from_millis(20);
551
552 let handle = spawn_runner(
553 Arc::new(FakeRunner::new("thr_stop_at", Arc::clone(&state))),
554 AutomationSpec {
555 prompt: "bounded".to_owned(),
556 start_at: Some(start_at),
557 every: Duration::from_millis(20),
558 stop_at: Some(stop_at),
559 max_runs: None,
560 },
561 );
562
563 let status = handle.wait().await;
564 assert_eq!(status.state, AutomationState::Stopped);
565 assert_eq!(status.runs_completed, 1);
566 assert_eq!(status.next_due_at, None);
567 }
568
569 #[tokio::test(flavor = "current_thread")]
570 async fn automation_reuses_loaded_session_thread_for_repeated_runs() {
571 let temp = TempDir::new("automation_reuse");
572 let cli = write_resume_sensitive_cli_script(&temp.root, 0);
573 let client = Client::connect(
574 ClientConfig::new()
575 .with_cli_bin(cli)
576 .without_compatibility_guard(),
577 )
578 .await
579 .expect("connect client");
580 let session = client
581 .start_session(SessionConfig::new(temp.root.to_string_lossy().to_string()))
582 .await
583 .expect("start session");
584 let thread_id = session.thread_id.clone();
585
586 let handle = spawn(
587 session,
588 AutomationSpec {
589 prompt: "repeat".to_owned(),
590 start_at: None,
591 every: Duration::from_millis(10),
592 stop_at: None,
593 max_runs: Some(2),
594 },
595 );
596
597 let status = handle.wait().await;
598 assert_eq!(status.state, AutomationState::Stopped);
599 assert_eq!(status.runs_completed, 2);
600 assert_eq!(status.thread_id, thread_id);
601
602 client.shutdown().await.expect("shutdown client");
603 }
604
605 #[tokio::test(flavor = "current_thread")]
606 async fn automation_fails_when_session_is_closed_before_due_run() {
607 let temp = TempDir::new("automation_closed");
608 let cli = write_resume_sensitive_cli_script(&temp.root, 0);
609 let client = Client::connect(
610 ClientConfig::new()
611 .with_cli_bin(cli)
612 .without_compatibility_guard(),
613 )
614 .await
615 .expect("connect client");
616 let session = client
617 .start_session(SessionConfig::new(temp.root.to_string_lossy().to_string()))
618 .await
619 .expect("start session");
620 let session_to_close = session.clone();
621
622 let handle = spawn(
623 session,
624 AutomationSpec {
625 prompt: "repeat".to_owned(),
626 start_at: Some(SystemTime::now() + Duration::from_millis(30)),
627 every: Duration::from_millis(10),
628 stop_at: None,
629 max_runs: Some(2),
630 },
631 );
632
633 session_to_close.close().await.expect("close session");
634 let status = handle.wait().await;
635
636 assert_eq!(status.state, AutomationState::Failed);
637 assert_eq!(status.runs_completed, 0);
638 assert!(status
639 .last_error
640 .as_deref()
641 .is_some_and(|error| error.contains("session is closed")));
642
643 client.shutdown().await.expect("shutdown client");
644 }
645
646 struct FakeRunner {
647 thread_id: String,
648 state: Arc<FakeRunnerState>,
649 }
650
651 impl FakeRunner {
652 fn new(thread_id: &str, state: Arc<FakeRunnerState>) -> Self {
653 Self {
654 thread_id: thread_id.to_owned(),
655 state,
656 }
657 }
658 }
659
660 impl TurnRunner for FakeRunner {
661 fn thread_id(&self) -> &str {
662 &self.thread_id
663 }
664
665 fn is_closed(&self) -> bool {
666 false
667 }
668
669 fn run_prompt<'a>(&'a self, _prompt: &'a str) -> TurnFuture<'a> {
670 Box::pin(async move {
671 let active = self.state.active.fetch_add(1, Ordering::SeqCst) + 1;
672 self.state.max_active.fetch_max(active, Ordering::SeqCst);
673 self.state.first_run_started.notify_waiters();
674 if !self.state.delay.is_zero() {
675 tokio::time::sleep(self.state.delay).await;
676 }
677 self.state.active.fetch_sub(1, Ordering::SeqCst);
678 let next = self
679 .state
680 .results
681 .lock()
682 .expect("results lock")
683 .pop_front()
684 .unwrap_or(Ok(()));
685 next.map(|_| PromptRunResult {
686 thread_id: self.thread_id.clone(),
687 turn_id: format!(
688 "turn_{}",
689 self.state.turn_counter.fetch_add(1, Ordering::SeqCst)
690 ),
691 assistant_text: "ok".to_owned(),
692 })
693 })
694 }
695 }
696
697 struct FakeRunnerState {
698 delay: Duration,
699 results: StdMutex<VecDeque<Result<(), PromptRunError>>>,
700 active: AtomicUsize,
701 max_active: AtomicUsize,
702 turn_counter: AtomicUsize,
703 first_run_started: Notify,
704 }
705
706 impl FakeRunnerState {
707 fn new(delay: Duration, results: Vec<Result<(), PromptRunError>>) -> Self {
708 Self {
709 delay,
710 results: StdMutex::new(VecDeque::from(results)),
711 active: AtomicUsize::new(0),
712 max_active: AtomicUsize::new(0),
713 turn_counter: AtomicUsize::new(0),
714 first_run_started: Notify::new(),
715 }
716 }
717 }
718
719 #[derive(Debug)]
720 struct TempDir {
721 root: PathBuf,
722 }
723
724 impl TempDir {
725 fn new(prefix: &str) -> Self {
726 let root = std::env::temp_dir().join(format!("{prefix}_{}", uuid::Uuid::new_v4()));
727 fs::create_dir_all(&root).expect("create temp root");
728 Self { root }
729 }
730 }
731
732 impl Drop for TempDir {
733 fn drop(&mut self) {
734 let _ = fs::remove_dir_all(&self.root);
735 }
736 }
737
738 fn write_resume_sensitive_cli_script(root: &Path, allowed_resume_calls: usize) -> PathBuf {
739 let path = root.join("mock_codex_cli_resume_sensitive.py");
740 let script = r#"#!/usr/bin/env python3
741import json
742import sys
743
744allowed_resume_calls = __ALLOWED_RESUME_CALLS__
745resume_calls = 0
746
747for line in sys.stdin:
748 line = line.strip()
749 if not line:
750 continue
751 try:
752 msg = json.loads(line)
753 except Exception:
754 continue
755
756 rpc_id = msg.get("id")
757 method = msg.get("method")
758 params = msg.get("params") or {}
759
760 if rpc_id is None:
761 continue
762
763 if method == "initialize":
764 sys.stdout.write(json.dumps({
765 "id": rpc_id,
766 "result": {"ready": True, "userAgent": "Codex Desktop/0.104.0"}
767 }) + "\n")
768 sys.stdout.flush()
769 continue
770
771 if method == "thread/start":
772 sys.stdout.write(json.dumps({"id": rpc_id, "result": {"thread": {"id": "thr_automation"}}}) + "\n")
773 sys.stdout.flush()
774 continue
775
776 if method == "thread/resume":
777 resume_calls += 1
778 if resume_calls > allowed_resume_calls:
779 sys.stdout.write(json.dumps({
780 "id": rpc_id,
781 "error": {"code": -32002, "message": f"unexpected thread/resume call #{resume_calls}"}
782 }) + "\n")
783 else:
784 thread_id = params.get("threadId") or "thr_automation"
785 sys.stdout.write(json.dumps({"id": rpc_id, "result": {"thread": {"id": thread_id}}}) + "\n")
786 sys.stdout.flush()
787 continue
788
789 if method == "turn/start":
790 thread_id = params.get("threadId") or "thr_automation"
791 turn_id = "turn_" + str(resume_calls)
792 text = "ok"
793 input_items = params.get("input") or []
794 if input_items and isinstance(input_items[0], dict):
795 text = input_items[0].get("text") or "ok"
796
797 sys.stdout.write(json.dumps({"method":"turn/started","params":{"threadId":thread_id,"turnId":turn_id}}) + "\n")
798 sys.stdout.write(json.dumps({"method":"item/started","params":{"threadId":thread_id,"turnId":turn_id,"itemId":"item_1","itemType":"agentMessage"}}) + "\n")
799 sys.stdout.write(json.dumps({"method":"item/agentMessage/delta","params":{"threadId":thread_id,"turnId":turn_id,"itemId":"item_1","delta":text}}) + "\n")
800 sys.stdout.write(json.dumps({"method":"turn/completed","params":{"threadId":thread_id,"turnId":turn_id}}) + "\n")
801 sys.stdout.write(json.dumps({"id": rpc_id, "result": {"turn": {"id": turn_id}}}) + "\n")
802 sys.stdout.flush()
803 continue
804
805 if method == "thread/archive":
806 sys.stdout.write(json.dumps({"id": rpc_id, "result": {"ok": True}}) + "\n")
807 sys.stdout.flush()
808 continue
809
810 sys.stdout.write(json.dumps({"id": rpc_id, "result": {"ok": True}}) + "\n")
811 sys.stdout.flush()
812"#;
813 let script = script.replace(
814 "__ALLOWED_RESUME_CALLS__",
815 &allowed_resume_calls.to_string(),
816 );
817 fs::write(&path, script).expect("write cli script");
818 #[cfg(unix)]
819 {
820 use std::os::unix::fs::PermissionsExt;
821
822 let mut permissions = fs::metadata(&path).expect("metadata").permissions();
823 permissions.set_mode(0o755);
824 fs::set_permissions(&path, permissions).expect("set executable");
825 }
826 path
827 }
828}