atomr_agents_coding_cli_harness/
harness.rs1use std::sync::atomic::AtomicBool;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use tokio::sync::broadcast;
9use tracing::info;
10
11use atomr_agents_callable::Callable;
12use atomr_agents_coding_cli_core::{
13 CliRequest, CliResult, CliRunId, CliSessionId, CliVendorKind, CodingCliEvent,
14 CodingCliEventStream, RunMode,
15};
16use atomr_agents_coding_cli_isolator::Isolator;
17use atomr_agents_core::{CallCtx, Result as CoreResult, Value};
18use atomr_agents_observability::EventBus;
19
20use crate::dispatch::{encode_result, parse_request};
21use crate::error::{HarnessError, Result};
22use crate::headless;
23use crate::interactive;
24use crate::registry::VendorRegistry;
25use crate::session::{InteractiveSessionHandle, SessionRegistry};
26use crate::spec::CodingCliHarnessSpec;
27use crate::store::{CliRunStore, InMemoryRunStore};
28
29pub struct CodingCliHarness {
30 pub spec: CodingCliHarnessSpec,
31 pub vendors: VendorRegistry,
32 pub isolator: Arc<dyn Isolator>,
33 pub store: Arc<dyn CliRunStore>,
34 pub bus: EventBus,
35 pub(crate) event_tx: broadcast::Sender<CodingCliEvent>,
36 pub(crate) sessions: SessionRegistry,
37 cancel: Arc<AtomicBool>,
38}
39
40impl CodingCliHarness {
41 pub fn new(
42 spec: CodingCliHarnessSpec,
43 vendors: VendorRegistry,
44 isolator: Arc<dyn Isolator>,
45 store: Arc<dyn CliRunStore>,
46 ) -> Self {
47 let (event_tx, _) = broadcast::channel(spec.event_channel_capacity);
48 Self {
49 spec,
50 vendors,
51 isolator,
52 store,
53 bus: EventBus::new(),
54 event_tx,
55 sessions: SessionRegistry::new(),
56 cancel: Arc::new(AtomicBool::new(false)),
57 }
58 }
59
60 pub fn local_default() -> Self {
62 use atomr_agents_coding_cli_isolator::LocalIsolator;
63 Self::new(
64 CodingCliHarnessSpec::default(),
65 VendorRegistry::default_vendors(),
66 Arc::new(LocalIsolator::new()),
67 Arc::new(InMemoryRunStore::new()),
68 )
69 }
70
71 pub fn events(&self) -> CodingCliEventStream {
73 CodingCliEventStream::new(self.event_tx.subscribe())
74 }
75
76 pub fn event_sender(&self) -> broadcast::Sender<CodingCliEvent> {
79 self.event_tx.clone()
80 }
81
82 pub fn available_vendors(&self) -> Vec<CliVendorKind> {
84 self.vendors.kinds().cloned().collect()
85 }
86
87 pub fn sessions(&self) -> &SessionRegistry {
88 &self.sessions
89 }
90
91 pub fn cancel(&self) {
93 self.cancel.store(true, std::sync::atomic::Ordering::Relaxed);
94 }
95
96 pub async fn run(&self, mut req: CliRequest) -> Result<CliResult> {
98 validate_workdir(&req)?;
99 if req.mode != RunMode::Headless {
100 return Err(HarnessError::InvalidRequest(
101 "run() drives headless mode only; use start_interactive() for interactive runs".into(),
102 ));
103 }
104 if req.model.is_none() {
105 req.model = self.spec.default_model.clone();
106 }
107 let vendor = self
108 .vendors
109 .get(&req.vendor)
110 .ok_or_else(|| HarnessError::UnknownVendor(req.vendor.clone()))?;
111
112 let run_id = CliRunId::new();
113 info!(run_id = %run_id, vendor = %req.vendor, "headless run starting");
114
115 let mut placeholder = CliResult::new(run_id.clone(), req.vendor.clone());
118 placeholder.started_at = chrono::Utc::now();
119 self.store.put(&placeholder).await?;
120
121 let cancel = self.cancel.clone();
122 let event_tx = self.event_tx.clone();
123
124 let result = headless::run_one(run_id.clone(), vendor, self.isolator.clone(), req, event_tx, cancel).await?;
125 self.store.put(&result).await?;
126 Ok(result)
127 }
128
129 pub async fn start_interactive(&self, mut req: CliRequest) -> Result<Arc<InteractiveSessionHandle>> {
133 validate_workdir(&req)?;
134 req.mode = RunMode::Interactive;
135 if req.model.is_none() {
136 req.model = self.spec.default_model.clone();
137 }
138 if self.sessions.len() >= self.spec.max_concurrent_sessions {
139 return Err(HarnessError::InvalidRequest(format!(
140 "max_concurrent_sessions reached ({})",
141 self.spec.max_concurrent_sessions
142 )));
143 }
144 let vendor = self
145 .vendors
146 .get(&req.vendor)
147 .ok_or_else(|| HarnessError::UnknownVendor(req.vendor.clone()))?;
148 let id = CliSessionId::new();
149 let handle = interactive::start_session(id, vendor, self.isolator.clone(), req).await?;
150 self.sessions.insert(handle.clone());
151 Ok(handle)
152 }
153
154 pub async fn stop_interactive(&self, id: &CliSessionId) -> Result<()> {
156 let h = self
157 .sessions
158 .get(id)
159 .ok_or_else(|| HarnessError::SessionNotFound(id.to_string()))?;
160 let _ = h.detach().await;
161 interactive::stop_session(self.isolator.clone(), &h.tmux_session, h.request.workdir.clone()).await?;
162 self.sessions.remove(id);
163 Ok(())
164 }
165}
166
167fn validate_workdir(req: &CliRequest) -> Result<()> {
168 if !req.workdir.is_dir() {
169 return Err(HarnessError::InvalidWorkdir(req.workdir.display().to_string()));
170 }
171 Ok(())
172}
173
174#[async_trait]
175impl Callable for CodingCliHarness {
176 async fn call(&self, input: Value, _ctx: CallCtx) -> CoreResult<Value> {
177 let req = parse_request(input)?;
178 let result = self.run(req).await.map_err(atomr_agents_core::AgentError::from)?;
179 encode_result(&result)
180 }
181
182 fn label(&self) -> &str {
183 "coding-cli-harness"
184 }
185}