Skip to main content

a3s_code_core/
run.rs

//! Durable run primitives for agent executions.
//!
//! This module is intentionally small: it records runtime events and maintains a
//! stable run status snapshot that can be persisted by session stores.

6use crate::agent::AgentEvent;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{Mutex, RwLock};
11use tokio_util::sync::CancellationToken;
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
14#[serde(rename_all = "snake_case")]
15pub enum RunStatus {
16    Created,
17    Planning,
18    Executing,
19    Verifying,
20    Completed,
21    Failed,
22    Cancelled,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct RunEventRecord {
27    pub sequence: usize,
28    pub timestamp_ms: u64,
29    pub event: AgentEvent,
30}
31
32#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
33pub struct ActiveToolSnapshot {
34    pub id: String,
35    pub name: String,
36    pub started_at_ms: u64,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct RunSnapshot {
41    pub id: String,
42    pub session_id: String,
43    pub status: RunStatus,
44    pub prompt: String,
45    pub created_at_ms: u64,
46    pub updated_at_ms: u64,
47    #[serde(skip_serializing_if = "Option::is_none")]
48    pub result_text: Option<String>,
49    #[serde(skip_serializing_if = "Option::is_none")]
50    pub error: Option<String>,
51    pub event_count: usize,
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct RunRecord {
56    pub snapshot: RunSnapshot,
57    pub events: Vec<RunEventRecord>,
58}
59
60impl RunSnapshot {
61    fn new(id: String, session_id: String, prompt: String) -> Self {
62        let now = now_ms();
63        Self {
64            id,
65            session_id,
66            status: RunStatus::Created,
67            prompt,
68            created_at_ms: now,
69            updated_at_ms: now,
70            result_text: None,
71            error: None,
72            event_count: 0,
73        }
74    }
75}
76
77#[derive(Debug, Default)]
78pub struct InMemoryRunStore {
79    runs: RwLock<HashMap<String, RunSnapshot>>,
80    events: RwLock<HashMap<String, Vec<RunEventRecord>>>,
81}
82
83impl InMemoryRunStore {
84    pub fn new() -> Self {
85        Self::default()
86    }
87
88    pub async fn create_run(&self, session_id: &str, prompt: &str) -> RunSnapshot {
89        let id = format!("run-{}", uuid::Uuid::new_v4());
90        let snapshot = RunSnapshot::new(id.clone(), session_id.to_string(), prompt.to_string());
91        self.runs.write().await.insert(id.clone(), snapshot.clone());
92        self.events.write().await.insert(id, Vec::new());
93        snapshot
94    }
95
96    pub async fn record_event(&self, run_id: &str, event: AgentEvent) -> Option<RunSnapshot> {
97        let mut events = self.events.write().await;
98        let run_events = events.get_mut(run_id)?;
99        let sequence = run_events.len();
100        run_events.push(RunEventRecord {
101            sequence,
102            timestamp_ms: now_ms(),
103            event: event.clone(),
104        });
105        drop(events);
106
107        let mut runs = self.runs.write().await;
108        let run = runs.get_mut(run_id)?;
109        apply_event_to_snapshot(run, &event);
110        run.event_count += 1;
111        run.updated_at_ms = now_ms();
112        Some(run.clone())
113    }
114
115    pub async fn mark_failed(&self, run_id: &str, error: impl Into<String>) -> Option<RunSnapshot> {
116        let mut runs = self.runs.write().await;
117        let run = runs.get_mut(run_id)?;
118        if run.status == RunStatus::Cancelled {
119            return Some(run.clone());
120        }
121        run.status = RunStatus::Failed;
122        run.error = Some(error.into());
123        run.updated_at_ms = now_ms();
124        Some(run.clone())
125    }
126
127    pub async fn mark_cancelled(&self, run_id: &str) -> Option<RunSnapshot> {
128        let mut runs = self.runs.write().await;
129        let run = runs.get_mut(run_id)?;
130        run.status = RunStatus::Cancelled;
131        run.updated_at_ms = now_ms();
132        Some(run.clone())
133    }
134
135    pub async fn snapshot(&self, run_id: &str) -> Option<RunSnapshot> {
136        self.runs.read().await.get(run_id).cloned()
137    }
138
139    pub async fn events(&self, run_id: &str) -> Vec<RunEventRecord> {
140        self.events
141            .read()
142            .await
143            .get(run_id)
144            .cloned()
145            .unwrap_or_default()
146    }
147
148    pub async fn list(&self) -> Vec<RunSnapshot> {
149        let mut runs = self.runs.read().await.values().cloned().collect::<Vec<_>>();
150        runs.sort_by_key(|run| run.created_at_ms);
151        runs
152    }
153
154    pub async fn records(&self) -> Vec<RunRecord> {
155        let snapshots = self.runs.read().await.values().cloned().collect::<Vec<_>>();
156        let events = self.events.read().await;
157        let mut records = snapshots
158            .into_iter()
159            .map(|snapshot| RunRecord {
160                events: events.get(&snapshot.id).cloned().unwrap_or_default(),
161                snapshot,
162            })
163            .collect::<Vec<_>>();
164        records.sort_by_key(|record| record.snapshot.created_at_ms);
165        records
166    }
167
168    pub async fn replace_records(&self, records: Vec<RunRecord>) {
169        let mut run_map = HashMap::new();
170        let mut event_map = HashMap::new();
171
172        for mut record in records {
173            record.snapshot.event_count = record.events.len();
174            event_map.insert(record.snapshot.id.clone(), record.events);
175            run_map.insert(record.snapshot.id.clone(), record.snapshot);
176        }
177
178        *self.runs.write().await = run_map;
179        *self.events.write().await = event_map;
180    }
181}
182
183#[derive(Clone)]
184pub struct RunHandle {
185    id: String,
186    session_id: String,
187    store: Arc<InMemoryRunStore>,
188    cancel_token: Arc<Mutex<Option<CancellationToken>>>,
189    current_run_id: Arc<Mutex<Option<String>>>,
190    hook_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
191}
192
193impl RunHandle {
194    pub(crate) fn new(
195        id: String,
196        session_id: String,
197        store: Arc<InMemoryRunStore>,
198        cancel_token: Arc<Mutex<Option<CancellationToken>>>,
199        current_run_id: Arc<Mutex<Option<String>>>,
200        hook_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
201    ) -> Self {
202        Self {
203            id,
204            session_id,
205            store,
206            cancel_token,
207            current_run_id,
208            hook_executor,
209        }
210    }
211
212    pub fn id(&self) -> &str {
213        &self.id
214    }
215
216    pub fn session_id(&self) -> &str {
217        &self.session_id
218    }
219
220    pub async fn snapshot(&self) -> Option<RunSnapshot> {
221        self.store.snapshot(&self.id).await
222    }
223
224    pub async fn events(&self) -> Vec<RunEventRecord> {
225        self.store.events(&self.id).await
226    }
227
228    pub async fn status(&self) -> Option<RunStatus> {
229        self.snapshot().await.map(|snapshot| snapshot.status)
230    }
231
232    pub async fn cancel(&self) -> bool {
233        let current_run_id = self.current_run_id.lock().await.clone();
234        if current_run_id.as_deref() != Some(self.id.as_str()) {
235            return false;
236        }
237
238        let token = self.cancel_token.lock().await.clone();
239        if let Some(token) = token {
240            token.cancel();
241            let _ = self.store.mark_cancelled(&self.id).await;
242            if let Some(executor) = &self.hook_executor {
243                executor
244                    .record_run_cancelled(&self.id, &self.session_id, Some("cancelled by host"))
245                    .await;
246            }
247            true
248        } else {
249            false
250        }
251    }
252}
253
254fn apply_event_to_snapshot(run: &mut RunSnapshot, event: &AgentEvent) {
255    match event {
256        AgentEvent::Start { prompt } => {
257            run.status = RunStatus::Executing;
258            if run.prompt.is_empty() {
259                run.prompt = prompt.clone();
260            }
261        }
262        AgentEvent::PlanningStart { .. } => {
263            run.status = RunStatus::Planning;
264        }
265        AgentEvent::StepStart { .. }
266        | AgentEvent::ToolStart { .. }
267        | AgentEvent::TurnStart { .. }
268            if !matches!(run.status, RunStatus::Planning) =>
269        {
270            run.status = RunStatus::Executing;
271        }
272        AgentEvent::End { text, .. } => {
273            if run.status == RunStatus::Cancelled {
274                return;
275            }
276            run.status = RunStatus::Completed;
277            run.result_text = Some(text.clone());
278            run.error = None;
279        }
280        AgentEvent::Error { message } => {
281            if run.status == RunStatus::Cancelled {
282                return;
283            }
284            run.status = RunStatus::Failed;
285            run.error = Some(message.clone());
286        }
287        _ => {}
288    }
289}
290
/// Milliseconds elapsed since the Unix epoch according to the system clock.
///
/// Returns `0` if the clock reads earlier than the epoch.
fn now_ms() -> u64 {
    match std::time::UNIX_EPOCH.elapsed() {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
297
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `AgentEvent::Start` for `prompt`.
    fn start_event(prompt: &str) -> AgentEvent {
        AgentEvent::Start {
            prompt: prompt.to_string(),
        }
    }

    /// Builds an `AgentEvent::End` carrying `text` and an empty verification summary.
    fn end_event(text: &str) -> AgentEvent {
        AgentEvent::End {
            text: text.to_string(),
            usage: Default::default(),
            verification_summary: Box::new(
                crate::verification::VerificationSummary::from_reports(&[]),
            ),
            meta: None,
        }
    }

    #[tokio::test]
    async fn run_store_tracks_status_and_events() {
        let store = InMemoryRunStore::new();
        let run = store.create_run("session-1", "fix tests").await;

        store.record_event(&run.id, start_event("fix tests")).await;
        store.record_event(&run.id, end_event("done")).await;

        let snapshot = store.snapshot(&run.id).await.unwrap();
        assert_eq!(snapshot.status, RunStatus::Completed);
        assert_eq!(snapshot.result_text.as_deref(), Some("done"));
        assert_eq!(snapshot.event_count, 2);
        assert_eq!(store.events(&run.id).await.len(), 2);
    }

    #[tokio::test]
    async fn run_store_replaces_persisted_records() {
        let source = InMemoryRunStore::new();
        let run = source.create_run("session-1", "persist").await;
        source.record_event(&run.id, start_event("persist")).await;

        let target = InMemoryRunStore::new();
        let exported = source.records().await;
        target.replace_records(exported).await;

        assert_eq!(target.list().await.len(), 1);
        assert_eq!(target.events(&run.id).await.len(), 1);
        let restored = target.snapshot(&run.id).await.unwrap();
        assert_eq!(restored.event_count, 1);
    }

    #[tokio::test]
    async fn run_handle_only_cancels_current_run() {
        let store = Arc::new(InMemoryRunStore::new());
        let run = store.create_run("session-1", "fix tests").await;
        let token_slot = Arc::new(Mutex::new(Some(CancellationToken::new())));
        let active_run = Arc::new(Mutex::new(Some(run.id.clone())));
        let handle = RunHandle::new(
            run.id.clone(),
            run.session_id.clone(),
            store.clone(),
            token_slot,
            active_run.clone(),
            None,
        );

        assert!(handle.cancel().await);
        assert_eq!(handle.status().await, Some(RunStatus::Cancelled));

        // Once another run becomes current, this handle must refuse to cancel.
        *active_run.lock().await = Some("other-run".to_string());
        assert!(!handle.cancel().await);
    }
}