1mod emit;
2
3use crate::adapters::get_adapter;
4use crate::discovery::discover_first;
5use crate::error::{Error, Result};
6use crate::events::StreamEvent;
7use crate::types::{RunOptions, RunResult};
8use std::sync::Arc;
9use tokio_util::sync::CancellationToken;
10
11use emit::EmitWrapper;
12
/// Idle timeout, in milliseconds, used when `RunOptions::idle_timeout_ms` is `None`
/// (300 000 ms = 5 minutes).
const DEFAULT_IDLE_TIMEOUT_MS: u64 = 300_000;

/// Consecutive tool-failure limit used when
/// `RunOptions::max_consecutive_tool_failures` is `None`.
const DEFAULT_MAX_CONSECUTIVE_TOOL_FAILURES: u32 = 3;
15
16#[must_use = "the run will be abandoned if the handle is dropped"]
18pub struct RunHandle {
19 pub result: tokio::task::JoinHandle<Result<RunResult>>,
20 pub cancel: CancellationToken,
21}
22
23impl RunHandle {
24 pub fn abort(&self) {
26 self.cancel.cancel();
27 }
28}
29
30pub fn run(
32 opts: RunOptions,
33 on_event: Option<Arc<dyn Fn(StreamEvent) + Send + Sync>>,
34) -> RunHandle {
35 let cancel = CancellationToken::new();
36 let cancel_clone = cancel.clone();
37
38 let result = tokio::spawn(async move { run_internal(opts, on_event, cancel_clone).await });
39
40 RunHandle { result, cancel }
41}
42
43async fn run_internal(
44 opts: RunOptions,
45 on_event: Option<Arc<dyn Fn(StreamEvent) + Send + Sync>>,
46 cancel: CancellationToken,
47) -> Result<RunResult> {
48 let cli_name = match opts.cli {
50 Some(cli) => cli,
51 None => {
52 if opts.executable_path.is_some() {
53 return Err(Error::CliRequiredWithExecutable);
54 }
55 let (name, _path) = discover_first().await.ok_or(Error::NoCli)?;
56 name
57 }
58 };
59
60 let adapter = get_adapter(cli_name);
61
62 let wrapper = EmitWrapper::new(
64 on_event.clone(),
65 opts.idle_timeout_ms.unwrap_or(DEFAULT_IDLE_TIMEOUT_MS),
66 opts.total_timeout_ms,
67 opts.max_consecutive_tool_failures
68 .unwrap_or(DEFAULT_MAX_CONSECUTIVE_TOOL_FAILURES),
69 cancel.clone(),
70 );
71
72 let emit_fn = wrapper.make_emit_fn();
73
74 let result = adapter.run_boxed(&opts, &emit_fn, cancel.clone()).await;
75
76 wrapper.cleanup();
77
78 match result {
79 Ok(mut result) => {
80 if cancel.is_cancelled() {
81 result.success = false;
82 result.text = Some("Cancelled.".into());
83 }
84 if let Some(ref cb) = on_event {
85 cb(StreamEvent::Done {
86 result: result.clone(),
87 });
88 }
89 Ok(result)
90 }
91 Err(e) => {
92 if cancel.is_cancelled() {
93 let result = RunResult {
94 success: false,
95 text: Some("Cancelled.".into()),
96 ..Default::default()
97 };
98 if let Some(ref cb) = on_event {
99 cb(StreamEvent::Done {
100 result: result.clone(),
101 });
102 }
103 Ok(result)
104 } else {
105 Err(e)
106 }
107 }
108 }
109}