Skip to main content

a3s_code_core/
run.rs

1//! Durable run primitives for agent executions.
2//!
3//! This module is intentionally small: it records runtime events and maintains a
4//! stable run status snapshot that can be persisted by session stores.
5
6use crate::agent::AgentEvent;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{Mutex, RwLock};
11use tokio_util::sync::CancellationToken;
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
14#[serde(rename_all = "snake_case")]
15pub enum RunStatus {
16    Created,
17    Planning,
18    Executing,
19    Verifying,
20    Completed,
21    Failed,
22    Cancelled,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct RunEventRecord {
27    pub sequence: usize,
28    pub timestamp_ms: u64,
29    pub event: AgentEvent,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct RunSnapshot {
34    pub id: String,
35    pub session_id: String,
36    pub status: RunStatus,
37    pub prompt: String,
38    pub created_at_ms: u64,
39    pub updated_at_ms: u64,
40    #[serde(skip_serializing_if = "Option::is_none")]
41    pub result_text: Option<String>,
42    #[serde(skip_serializing_if = "Option::is_none")]
43    pub error: Option<String>,
44    pub event_count: usize,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct RunRecord {
49    pub snapshot: RunSnapshot,
50    pub events: Vec<RunEventRecord>,
51}
52
53impl RunSnapshot {
54    fn new(id: String, session_id: String, prompt: String) -> Self {
55        let now = now_ms();
56        Self {
57            id,
58            session_id,
59            status: RunStatus::Created,
60            prompt,
61            created_at_ms: now,
62            updated_at_ms: now,
63            result_text: None,
64            error: None,
65            event_count: 0,
66        }
67    }
68}
69
70#[derive(Debug, Default)]
71pub struct InMemoryRunStore {
72    runs: RwLock<HashMap<String, RunSnapshot>>,
73    events: RwLock<HashMap<String, Vec<RunEventRecord>>>,
74}
75
76impl InMemoryRunStore {
77    pub fn new() -> Self {
78        Self::default()
79    }
80
81    pub async fn create_run(&self, session_id: &str, prompt: &str) -> RunSnapshot {
82        let id = format!("run-{}", uuid::Uuid::new_v4());
83        let snapshot = RunSnapshot::new(id.clone(), session_id.to_string(), prompt.to_string());
84        self.runs.write().await.insert(id.clone(), snapshot.clone());
85        self.events.write().await.insert(id, Vec::new());
86        snapshot
87    }
88
89    pub async fn record_event(&self, run_id: &str, event: AgentEvent) -> Option<RunSnapshot> {
90        let mut events = self.events.write().await;
91        let run_events = events.get_mut(run_id)?;
92        let sequence = run_events.len();
93        run_events.push(RunEventRecord {
94            sequence,
95            timestamp_ms: now_ms(),
96            event: event.clone(),
97        });
98        drop(events);
99
100        let mut runs = self.runs.write().await;
101        let run = runs.get_mut(run_id)?;
102        apply_event_to_snapshot(run, &event);
103        run.event_count += 1;
104        run.updated_at_ms = now_ms();
105        Some(run.clone())
106    }
107
108    pub async fn mark_failed(&self, run_id: &str, error: impl Into<String>) -> Option<RunSnapshot> {
109        let mut runs = self.runs.write().await;
110        let run = runs.get_mut(run_id)?;
111        if run.status == RunStatus::Cancelled {
112            return Some(run.clone());
113        }
114        run.status = RunStatus::Failed;
115        run.error = Some(error.into());
116        run.updated_at_ms = now_ms();
117        Some(run.clone())
118    }
119
120    pub async fn mark_cancelled(&self, run_id: &str) -> Option<RunSnapshot> {
121        let mut runs = self.runs.write().await;
122        let run = runs.get_mut(run_id)?;
123        run.status = RunStatus::Cancelled;
124        run.updated_at_ms = now_ms();
125        Some(run.clone())
126    }
127
128    pub async fn snapshot(&self, run_id: &str) -> Option<RunSnapshot> {
129        self.runs.read().await.get(run_id).cloned()
130    }
131
132    pub async fn events(&self, run_id: &str) -> Vec<RunEventRecord> {
133        self.events
134            .read()
135            .await
136            .get(run_id)
137            .cloned()
138            .unwrap_or_default()
139    }
140
141    pub async fn list(&self) -> Vec<RunSnapshot> {
142        let mut runs = self.runs.read().await.values().cloned().collect::<Vec<_>>();
143        runs.sort_by_key(|run| run.created_at_ms);
144        runs
145    }
146
147    pub async fn records(&self) -> Vec<RunRecord> {
148        let snapshots = self.runs.read().await.values().cloned().collect::<Vec<_>>();
149        let events = self.events.read().await;
150        let mut records = snapshots
151            .into_iter()
152            .map(|snapshot| RunRecord {
153                events: events.get(&snapshot.id).cloned().unwrap_or_default(),
154                snapshot,
155            })
156            .collect::<Vec<_>>();
157        records.sort_by_key(|record| record.snapshot.created_at_ms);
158        records
159    }
160
161    pub async fn replace_records(&self, records: Vec<RunRecord>) {
162        let mut run_map = HashMap::new();
163        let mut event_map = HashMap::new();
164
165        for mut record in records {
166            record.snapshot.event_count = record.events.len();
167            event_map.insert(record.snapshot.id.clone(), record.events);
168            run_map.insert(record.snapshot.id.clone(), record.snapshot);
169        }
170
171        *self.runs.write().await = run_map;
172        *self.events.write().await = event_map;
173    }
174}
175
176#[derive(Clone)]
177pub struct RunHandle {
178    id: String,
179    session_id: String,
180    store: Arc<InMemoryRunStore>,
181    cancel_token: Arc<Mutex<Option<CancellationToken>>>,
182    current_run_id: Arc<Mutex<Option<String>>>,
183    hook_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
184}
185
186impl RunHandle {
187    pub(crate) fn new(
188        id: String,
189        session_id: String,
190        store: Arc<InMemoryRunStore>,
191        cancel_token: Arc<Mutex<Option<CancellationToken>>>,
192        current_run_id: Arc<Mutex<Option<String>>>,
193        hook_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
194    ) -> Self {
195        Self {
196            id,
197            session_id,
198            store,
199            cancel_token,
200            current_run_id,
201            hook_executor,
202        }
203    }
204
205    pub fn id(&self) -> &str {
206        &self.id
207    }
208
209    pub fn session_id(&self) -> &str {
210        &self.session_id
211    }
212
213    pub async fn snapshot(&self) -> Option<RunSnapshot> {
214        self.store.snapshot(&self.id).await
215    }
216
217    pub async fn events(&self) -> Vec<RunEventRecord> {
218        self.store.events(&self.id).await
219    }
220
221    pub async fn status(&self) -> Option<RunStatus> {
222        self.snapshot().await.map(|snapshot| snapshot.status)
223    }
224
225    pub async fn cancel(&self) -> bool {
226        let current_run_id = self.current_run_id.lock().await.clone();
227        if current_run_id.as_deref() != Some(self.id.as_str()) {
228            return false;
229        }
230
231        let token = self.cancel_token.lock().await.clone();
232        if let Some(token) = token {
233            token.cancel();
234            let _ = self.store.mark_cancelled(&self.id).await;
235            if let Some(executor) = &self.hook_executor {
236                executor
237                    .record_run_cancelled(&self.id, &self.session_id, Some("cancelled by host"))
238                    .await;
239            }
240            true
241        } else {
242            false
243        }
244    }
245}
246
247fn apply_event_to_snapshot(run: &mut RunSnapshot, event: &AgentEvent) {
248    match event {
249        AgentEvent::Start { prompt } => {
250            run.status = RunStatus::Executing;
251            if run.prompt.is_empty() {
252                run.prompt = prompt.clone();
253            }
254        }
255        AgentEvent::PlanningStart { .. } => {
256            run.status = RunStatus::Planning;
257        }
258        AgentEvent::StepStart { .. }
259        | AgentEvent::ToolStart { .. }
260        | AgentEvent::TurnStart { .. }
261            if !matches!(run.status, RunStatus::Planning) =>
262        {
263            run.status = RunStatus::Executing;
264        }
265        AgentEvent::End { text, .. } => {
266            if run.status == RunStatus::Cancelled {
267                return;
268            }
269            run.status = RunStatus::Completed;
270            run.result_text = Some(text.clone());
271            run.error = None;
272        }
273        AgentEvent::Error { message } => {
274            if run.status == RunStatus::Cancelled {
275                return;
276            }
277            run.status = RunStatus::Failed;
278            run.error = Some(message.clone());
279        }
280        _ => {}
281    }
282}
283
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch. The `u128` millisecond
/// count is clamped to `u64::MAX` rather than silently truncated by the narrowing cast.
fn now_ms() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        // Clamped first, so the cast below is lossless.
        .map(|elapsed| elapsed.as_millis().min(u128::from(u64::MAX)) as u64)
        .unwrap_or(0)
}
290
#[cfg(test)]
mod tests {
    use super::*;

    /// A `Start` event carrying `prompt`.
    fn start(prompt: &str) -> AgentEvent {
        AgentEvent::Start {
            prompt: prompt.to_string(),
        }
    }

    /// A successful `End` event with `text`, default usage and an empty verification summary.
    fn end(text: &str) -> AgentEvent {
        AgentEvent::End {
            text: text.to_string(),
            usage: Default::default(),
            verification_summary: Box::new(
                crate::verification::VerificationSummary::from_reports(&[]),
            ),
            meta: None,
        }
    }

    #[tokio::test]
    async fn run_store_tracks_status_and_events() {
        let store = InMemoryRunStore::new();
        let created = store.create_run("session-1", "fix tests").await;
        let run_id = created.id.as_str();

        store.record_event(run_id, start("fix tests")).await;
        store.record_event(run_id, end("done")).await;

        let snapshot = store.snapshot(run_id).await.unwrap();
        assert_eq!(snapshot.status, RunStatus::Completed);
        assert_eq!(snapshot.result_text.as_deref(), Some("done"));
        assert_eq!(snapshot.event_count, 2);
        assert_eq!(store.events(run_id).await.len(), 2);
    }

    #[tokio::test]
    async fn run_store_replaces_persisted_records() {
        let source = InMemoryRunStore::new();
        let created = source.create_run("session-1", "persist").await;
        source.record_event(&created.id, start("persist")).await;

        let exported = source.records().await;
        let target = InMemoryRunStore::new();
        target.replace_records(exported).await;

        assert_eq!(target.list().await.len(), 1);
        assert_eq!(target.events(&created.id).await.len(), 1);
        let restored = target.snapshot(&created.id).await.unwrap();
        assert_eq!(restored.event_count, 1);
    }

    #[tokio::test]
    async fn run_handle_only_cancels_current_run() {
        let store = Arc::new(InMemoryRunStore::new());
        let created = store.create_run("session-1", "fix tests").await;
        let active_run = Arc::new(Mutex::new(Some(created.id.clone())));
        let handle = RunHandle::new(
            created.id.clone(),
            created.session_id.clone(),
            Arc::clone(&store),
            Arc::new(Mutex::new(Some(CancellationToken::new()))),
            Arc::clone(&active_run),
            None,
        );

        assert!(handle.cancel().await);
        assert_eq!(handle.status().await, Some(RunStatus::Cancelled));

        *active_run.lock().await = Some("other-run".to_string());
        assert!(!handle.cancel().await);
    }
}