Skip to main content

atomr_agents_coding_cli_harness/
harness.rs

1//! The harness — composes a `VendorRegistry`, an `Isolator`, and a
2//! `CliRunStore` behind a single async surface plus a `Callable` impl.
3
4use std::sync::atomic::AtomicBool;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use tokio::sync::broadcast;
9use tracing::info;
10
11use atomr_agents_callable::Callable;
12use atomr_agents_coding_cli_core::{
13    CliRequest, CliResult, CliRunId, CliSessionId, CliVendorKind, CodingCliEvent,
14    CodingCliEventStream, RunMode,
15};
16use atomr_agents_coding_cli_isolator::Isolator;
17use atomr_agents_core::{CallCtx, Result as CoreResult, Value};
18use atomr_agents_observability::EventBus;
19
20use crate::dispatch::{encode_result, parse_request};
21use crate::error::{HarnessError, Result};
22use crate::headless;
23use crate::interactive;
24use crate::registry::VendorRegistry;
25use crate::session::{InteractiveSessionHandle, SessionRegistry};
26use crate::spec::CodingCliHarnessSpec;
27use crate::store::{CliRunStore, InMemoryRunStore};
28
29pub struct CodingCliHarness {
30    pub spec: CodingCliHarnessSpec,
31    pub vendors: VendorRegistry,
32    pub isolator: Arc<dyn Isolator>,
33    pub store: Arc<dyn CliRunStore>,
34    pub bus: EventBus,
35    pub(crate) event_tx: broadcast::Sender<CodingCliEvent>,
36    pub(crate) sessions: SessionRegistry,
37    cancel: Arc<AtomicBool>,
38}
39
40impl CodingCliHarness {
41    pub fn new(
42        spec: CodingCliHarnessSpec,
43        vendors: VendorRegistry,
44        isolator: Arc<dyn Isolator>,
45        store: Arc<dyn CliRunStore>,
46    ) -> Self {
47        let (event_tx, _) = broadcast::channel(spec.event_channel_capacity);
48        Self {
49            spec,
50            vendors,
51            isolator,
52            store,
53            bus: EventBus::new(),
54            event_tx,
55            sessions: SessionRegistry::new(),
56            cancel: Arc::new(AtomicBool::new(false)),
57        }
58    }
59
60    /// Shortcut: in-memory store + default vendors + local isolator.
61    pub fn local_default() -> Self {
62        use atomr_agents_coding_cli_isolator::LocalIsolator;
63        Self::new(
64            CodingCliHarnessSpec::default(),
65            VendorRegistry::default_vendors(),
66            Arc::new(LocalIsolator::new()),
67            Arc::new(InMemoryRunStore::new()),
68        )
69    }
70
71    /// Subscribe to the normalized event stream.
72    pub fn events(&self) -> CodingCliEventStream {
73        CodingCliEventStream::new(self.event_tx.subscribe())
74    }
75
76    /// Clone the broadcast sender — the web companion does this so its
77    /// SSE handler can subscribe.
78    pub fn event_sender(&self) -> broadcast::Sender<CodingCliEvent> {
79        self.event_tx.clone()
80    }
81
82    /// List vendor kinds wired into this harness.
83    pub fn available_vendors(&self) -> Vec<CliVendorKind> {
84        self.vendors.kinds().cloned().collect()
85    }
86
87    pub fn sessions(&self) -> &SessionRegistry {
88        &self.sessions
89    }
90
91    /// Cancel any in-flight headless run (cooperative).
92    pub fn cancel(&self) {
93        self.cancel.store(true, std::sync::atomic::Ordering::Relaxed);
94    }
95
96    /// Drive one request to completion.
97    pub async fn run(&self, mut req: CliRequest) -> Result<CliResult> {
98        validate_workdir(&req)?;
99        if req.mode != RunMode::Headless {
100            return Err(HarnessError::InvalidRequest(
101                "run() drives headless mode only; use start_interactive() for interactive runs".into(),
102            ));
103        }
104        if req.model.is_none() {
105            req.model = self.spec.default_model.clone();
106        }
107        let vendor = self
108            .vendors
109            .get(&req.vendor)
110            .ok_or_else(|| HarnessError::UnknownVendor(req.vendor.clone()))?;
111
112        let run_id = CliRunId::new();
113        info!(run_id = %run_id, vendor = %req.vendor, "headless run starting");
114
115        // Persist a placeholder result so the store has an entry as
116        // soon as the run is known (UI can poll).
117        let mut placeholder = CliResult::new(run_id.clone(), req.vendor.clone());
118        placeholder.started_at = chrono::Utc::now();
119        self.store.put(&placeholder).await?;
120
121        let cancel = self.cancel.clone();
122        let event_tx = self.event_tx.clone();
123
124        let result = headless::run_one(run_id.clone(), vendor, self.isolator.clone(), req, event_tx, cancel).await?;
125        self.store.put(&result).await?;
126        Ok(result)
127    }
128
129    /// Spawn an interactive session and register it. Returns the new
130    /// `CliSessionId`; clients should connect to
131    /// `WS /api/cli/sessions/{id}/io` (in the web companion) to drive it.
132    pub async fn start_interactive(&self, mut req: CliRequest) -> Result<Arc<InteractiveSessionHandle>> {
133        validate_workdir(&req)?;
134        req.mode = RunMode::Interactive;
135        if req.model.is_none() {
136            req.model = self.spec.default_model.clone();
137        }
138        if self.sessions.len() >= self.spec.max_concurrent_sessions {
139            return Err(HarnessError::InvalidRequest(format!(
140                "max_concurrent_sessions reached ({})",
141                self.spec.max_concurrent_sessions
142            )));
143        }
144        let vendor = self
145            .vendors
146            .get(&req.vendor)
147            .ok_or_else(|| HarnessError::UnknownVendor(req.vendor.clone()))?;
148        let id = CliSessionId::new();
149        let handle = interactive::start_session(id, vendor, self.isolator.clone(), req).await?;
150        self.sessions.insert(handle.clone());
151        Ok(handle)
152    }
153
154    /// Stop an interactive session and remove it from the registry.
155    pub async fn stop_interactive(&self, id: &CliSessionId) -> Result<()> {
156        let h = self
157            .sessions
158            .get(id)
159            .ok_or_else(|| HarnessError::SessionNotFound(id.to_string()))?;
160        let _ = h.detach().await;
161        interactive::stop_session(self.isolator.clone(), &h.tmux_session, h.request.workdir.clone()).await?;
162        self.sessions.remove(id);
163        Ok(())
164    }
165}
166
167fn validate_workdir(req: &CliRequest) -> Result<()> {
168    if !req.workdir.is_dir() {
169        return Err(HarnessError::InvalidWorkdir(req.workdir.display().to_string()));
170    }
171    Ok(())
172}
173
174#[async_trait]
175impl Callable for CodingCliHarness {
176    async fn call(&self, input: Value, _ctx: CallCtx) -> CoreResult<Value> {
177        let req = parse_request(input)?;
178        let result = self.run(req).await.map_err(atomr_agents_core::AgentError::from)?;
179        encode_result(&result)
180    }
181
182    fn label(&self) -> &str {
183        "coding-cli-harness"
184    }
185}