Skip to main content

cli_agents/runner/
mod.rs

1mod emit;
2
3use crate::adapters::get_adapter;
4use crate::discovery::discover_first;
5use crate::error::{Error, Result};
6use crate::events::StreamEvent;
7use crate::types::{RunOptions, RunResult};
8use std::sync::Arc;
9use tokio_util::sync::CancellationToken;
10
11use emit::EmitWrapper;
12
13const DEFAULT_IDLE_TIMEOUT_MS: u64 = 300_000; // 5 minutes
14const DEFAULT_MAX_CONSECUTIVE_TOOL_FAILURES: u32 = 3;
15
16/// Handle returned by [`run()`] — allows awaiting the result or aborting.
17#[must_use = "the run will be abandoned if the handle is dropped"]
18pub struct RunHandle {
19    pub result: tokio::task::JoinHandle<Result<RunResult>>,
20    pub cancel: CancellationToken,
21}
22
23impl RunHandle {
24    /// Abort the running agent.
25    pub fn abort(&self) {
26        self.cancel.cancel();
27    }
28}
29
30/// Launch an agent run. Returns a [`RunHandle`] with the result future and abort handle.
31pub fn run(
32    opts: RunOptions,
33    on_event: Option<Arc<dyn Fn(StreamEvent) + Send + Sync>>,
34) -> RunHandle {
35    let cancel = CancellationToken::new();
36    let cancel_clone = cancel.clone();
37
38    let result = tokio::spawn(async move { run_internal(opts, on_event, cancel_clone).await });
39
40    RunHandle { result, cancel }
41}
42
43async fn run_internal(
44    opts: RunOptions,
45    on_event: Option<Arc<dyn Fn(StreamEvent) + Send + Sync>>,
46    cancel: CancellationToken,
47) -> Result<RunResult> {
48    // Resolve adapter
49    let cli_name = match opts.cli {
50        Some(cli) => cli,
51        None => {
52            if opts.executable_path.is_some() {
53                return Err(Error::CliRequiredWithExecutable);
54            }
55            let (name, _path) = discover_first().await.ok_or(Error::NoCli)?;
56            name
57        }
58    };
59
60    let adapter = get_adapter(cli_name);
61
62    // Wrap emit with tool failure tracking + timeouts
63    let wrapper = EmitWrapper::new(
64        on_event.clone(),
65        opts.idle_timeout_ms.unwrap_or(DEFAULT_IDLE_TIMEOUT_MS),
66        opts.total_timeout_ms,
67        opts.max_consecutive_tool_failures
68            .unwrap_or(DEFAULT_MAX_CONSECUTIVE_TOOL_FAILURES),
69        cancel.clone(),
70    );
71
72    let emit_fn = wrapper.make_emit_fn();
73
74    let result = adapter.run_boxed(&opts, &emit_fn, cancel.clone()).await;
75
76    wrapper.cleanup();
77
78    match result {
79        Ok(mut result) => {
80            if cancel.is_cancelled() {
81                result.success = false;
82                result.text = Some("Cancelled.".into());
83            }
84            if let Some(ref cb) = on_event {
85                cb(StreamEvent::Done {
86                    result: result.clone(),
87                });
88            }
89            Ok(result)
90        }
91        Err(e) => {
92            if cancel.is_cancelled() {
93                let result = RunResult {
94                    success: false,
95                    text: Some("Cancelled.".into()),
96                    ..Default::default()
97                };
98                if let Some(ref cb) = on_event {
99                    cb(StreamEvent::Done {
100                        result: result.clone(),
101                    });
102                }
103                Ok(result)
104            } else {
105                Err(e)
106            }
107        }
108    }
109}