Skip to main content

a3s_code_core/
run.rs

//! Durable run primitives for agent executions.
//!
//! This module is intentionally small: it records runtime events and maintains a
//! stable run status snapshot that can be persisted by session stores.
5
6use crate::agent::AgentEvent;
7use serde::{Deserialize, Serialize};
8use std::collections::{HashMap, VecDeque};
9use std::sync::Arc;
10use tokio::sync::{Mutex, RwLock};
11use tokio_util::sync::CancellationToken;
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
14#[serde(rename_all = "snake_case")]
15pub enum RunStatus {
16    Created,
17    Planning,
18    Executing,
19    Verifying,
20    Completed,
21    Failed,
22    Cancelled,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct RunEventRecord {
27    pub sequence: usize,
28    pub timestamp_ms: u64,
29    pub event: AgentEvent,
30}
31
32#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
33pub struct ActiveToolSnapshot {
34    pub id: String,
35    pub name: String,
36    pub started_at_ms: u64,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct RunSnapshot {
41    pub id: String,
42    pub session_id: String,
43    pub status: RunStatus,
44    pub prompt: String,
45    pub created_at_ms: u64,
46    pub updated_at_ms: u64,
47    #[serde(skip_serializing_if = "Option::is_none")]
48    pub result_text: Option<String>,
49    #[serde(skip_serializing_if = "Option::is_none")]
50    pub error: Option<String>,
51    pub event_count: usize,
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct RunRecord {
56    pub snapshot: RunSnapshot,
57    pub events: Vec<RunEventRecord>,
58}
59
60impl RunSnapshot {
61    fn new(id: String, session_id: String, prompt: String) -> Self {
62        let now = now_ms();
63        Self {
64            id,
65            session_id,
66            status: RunStatus::Created,
67            prompt,
68            created_at_ms: now,
69            updated_at_ms: now,
70            result_text: None,
71            error: None,
72            event_count: 0,
73        }
74    }
75}
76
77#[derive(Debug, Default)]
78pub struct InMemoryRunStore {
79    runs: RwLock<HashMap<String, RunSnapshot>>,
80    events: RwLock<HashMap<String, Vec<RunEventRecord>>>,
81    /// Insertion order of run ids — used to FIFO-evict the oldest run
82    /// when `max_runs` is set and exceeded.
83    insertion_order: RwLock<VecDeque<String>>,
84    /// Maximum number of runs retained. When exceeded, oldest run is
85    /// dropped along with its events. `None` = unlimited (default).
86    max_runs: Option<usize>,
87    /// Maximum number of events retained per run. When exceeded, the
88    /// oldest events are FIFO-dropped from that run's buffer. The
89    /// run's `event_count` field is **not** decremented — it stays as
90    /// the cumulative total ever recorded. `None` = unlimited.
91    max_events_per_run: Option<usize>,
92}
93
94impl InMemoryRunStore {
95    pub fn new() -> Self {
96        Self::default()
97    }
98
99    /// Construct a store with optional FIFO retention caps. `None`
100    /// fields keep the unbounded default.
101    pub fn with_retention(max_runs: Option<usize>, max_events_per_run: Option<usize>) -> Self {
102        Self {
103            runs: RwLock::new(HashMap::new()),
104            events: RwLock::new(HashMap::new()),
105            insertion_order: RwLock::new(VecDeque::new()),
106            max_runs,
107            max_events_per_run,
108        }
109    }
110
111    pub async fn create_run(&self, session_id: &str, prompt: &str) -> RunSnapshot {
112        // Default ID generation when the caller has no host_env handy.
113        // Production callers reach `create_run_with_id` via
114        // `RunControlState::start_run` so the host's IdGenerator is honored.
115        let id = format!("run-{}", uuid::Uuid::new_v4());
116        self.create_run_with_id(id, session_id, prompt).await
117    }
118
119    /// Create a run with a caller-supplied id. Used by the session
120    /// orchestration layer so the parent session's host-provided
121    /// [`IdGenerator`](crate::host_env::IdGenerator) governs run ids.
122    pub async fn create_run_with_id(
123        &self,
124        id: String,
125        session_id: &str,
126        prompt: &str,
127    ) -> RunSnapshot {
128        let snapshot = RunSnapshot::new(id.clone(), session_id.to_string(), prompt.to_string());
129        // Hold all three structures together for the insert + FIFO-evict so
130        // `runs`, `events`, and `insertion_order` never diverge under
131        // concurrent access (previously the maps were locked separately,
132        // leaving a window where a run existed in one map but not the
133        // other). Canonical acquisition order: order -> events -> runs.
134        // Other methods (record_event, records, mark_*) only ever hold ONE
135        // of {events, runs} at a time — they never nest — so holding both
136        // here cannot ABBA-deadlock against them.
137        {
138            let mut order = self.insertion_order.write().await;
139            let mut events = self.events.write().await;
140            let mut runs = self.runs.write().await;
141            runs.insert(id.clone(), snapshot.clone());
142            events.insert(id.clone(), Vec::new());
143            order.push_back(id);
144            if let Some(cap) = self.max_runs {
145                while order.len() > cap {
146                    if let Some(victim) = order.pop_front() {
147                        runs.remove(&victim);
148                        events.remove(&victim);
149                    }
150                }
151            }
152        }
153        snapshot
154    }
155
156    pub async fn record_event(&self, run_id: &str, event: AgentEvent) -> Option<RunSnapshot> {
157        let mut events = self.events.write().await;
158        let run_events = events.get_mut(run_id)?;
159        let sequence = run_events.len();
160        run_events.push(RunEventRecord {
161            sequence,
162            timestamp_ms: now_ms(),
163            event: event.clone(),
164        });
165        // FIFO-trim event buffer past per-run cap.
166        if let Some(cap) = self.max_events_per_run {
167            if run_events.len() > cap {
168                let excess = run_events.len() - cap;
169                run_events.drain(..excess);
170            }
171        }
172        drop(events);
173
174        let mut runs = self.runs.write().await;
175        let run = runs.get_mut(run_id)?;
176        apply_event_to_snapshot(run, &event);
177        run.event_count += 1;
178        run.updated_at_ms = now_ms();
179        Some(run.clone())
180    }
181
182    pub async fn mark_failed(&self, run_id: &str, error: impl Into<String>) -> Option<RunSnapshot> {
183        let mut runs = self.runs.write().await;
184        let run = runs.get_mut(run_id)?;
185        if run.status == RunStatus::Cancelled {
186            return Some(run.clone());
187        }
188        run.status = RunStatus::Failed;
189        run.error = Some(error.into());
190        run.updated_at_ms = now_ms();
191        Some(run.clone())
192    }
193
194    pub async fn mark_cancelled(&self, run_id: &str) -> Option<RunSnapshot> {
195        let mut runs = self.runs.write().await;
196        let run = runs.get_mut(run_id)?;
197        run.status = RunStatus::Cancelled;
198        run.updated_at_ms = now_ms();
199        Some(run.clone())
200    }
201
202    pub async fn snapshot(&self, run_id: &str) -> Option<RunSnapshot> {
203        self.runs.read().await.get(run_id).cloned()
204    }
205
206    pub async fn events(&self, run_id: &str) -> Vec<RunEventRecord> {
207        self.events
208            .read()
209            .await
210            .get(run_id)
211            .cloned()
212            .unwrap_or_default()
213    }
214
215    pub async fn list(&self) -> Vec<RunSnapshot> {
216        let mut runs = self.runs.read().await.values().cloned().collect::<Vec<_>>();
217        runs.sort_by_key(|run| run.created_at_ms);
218        runs
219    }
220
221    pub async fn records(&self) -> Vec<RunRecord> {
222        let snapshots = self.runs.read().await.values().cloned().collect::<Vec<_>>();
223        let events = self.events.read().await;
224        let mut records = snapshots
225            .into_iter()
226            .map(|snapshot| RunRecord {
227                events: events.get(&snapshot.id).cloned().unwrap_or_default(),
228                snapshot,
229            })
230            .collect::<Vec<_>>();
231        records.sort_by_key(|record| record.snapshot.created_at_ms);
232        records
233    }
234
235    pub async fn replace_records(&self, records: Vec<RunRecord>) {
236        // Preserve creation-order in the FIFO eviction queue so a
237        // restored session honours its `max_runs` cap consistently
238        // with newly-created runs.
239        let mut sorted = records;
240        sorted.sort_by_key(|r| r.snapshot.created_at_ms);
241        let mut run_map = HashMap::new();
242        let mut event_map = HashMap::new();
243        let mut order = VecDeque::with_capacity(sorted.len());
244        for record in sorted {
245            let id = record.snapshot.id.clone();
246            // Trust the persisted `event_count` — it is the CUMULATIVE total
247            // ever recorded and is deliberately not decremented when the
248            // per-run event buffer is FIFO-trimmed by `max_events_per_run`.
249            // Overwriting it with `record.events.len()` here would corrupt
250            // the cumulative count for any restored run whose buffer was
251            // trimmed (restoring a 100-event run with a 50-cap buffer as
252            // event_count=50).
253            event_map.insert(id.clone(), record.events);
254            run_map.insert(id.clone(), record.snapshot);
255            order.push_back(id);
256        }
257        *self.runs.write().await = run_map;
258        *self.events.write().await = event_map;
259        *self.insertion_order.write().await = order;
260    }
261}
262
#[cfg(test)]
mod retention_tests {
    use super::*;

    /// Filler event used throughout these retention tests.
    fn delta(text: &str) -> AgentEvent {
        AgentEvent::TextDelta {
            text: text.to_string(),
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn concurrent_create_and_record_under_cap_does_not_deadlock() {
        // Guards the canonical lock ordering in create_run_with_id
        // (order -> events -> runs held together). A bad ordering would
        // ABBA-deadlock against concurrent record_event and hang this test.
        let store = std::sync::Arc::new(InMemoryRunStore::with_retention(Some(10), None));
        let mut handles = Vec::new();
        for i in 0..100 {
            let s = std::sync::Arc::clone(&store);
            handles.push(tokio::spawn(async move {
                let r = s.create_run("sess", &format!("p{i}")).await;
                for _ in 0..5 {
                    s.record_event(&r.id, delta("x")).await;
                }
            }));
        }
        for h in handles {
            h.await.unwrap();
        }
        // Cap honored under concurrent load, and the store is still usable
        // (no deadlock, no poisoned locks).
        assert!(store.list().await.len() <= 10);
    }

    #[tokio::test]
    async fn replace_records_preserves_cumulative_event_count_after_trim() {
        // Source store with a small per-run event cap.
        let src = InMemoryRunStore::with_retention(None, Some(3));
        let run = src.create_run("s", "p").await;
        for _ in 0..10 {
            src.record_event(&run.id, delta("x")).await;
        }
        let records = src.records().await;
        // Buffer trimmed to cap, but cumulative event_count is the total.
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].events.len(), 3, "buffer trimmed to cap");
        assert_eq!(records[0].snapshot.event_count, 10, "cumulative preserved");

        // Round-trip into a fresh store via replace_records.
        let dst = InMemoryRunStore::new();
        dst.replace_records(records).await;
        let restored = dst.snapshot(&run.id).await.unwrap();
        assert_eq!(
            restored.event_count, 10,
            "replace_records must NOT reset event_count to the trimmed buffer length"
        );
        // The (trimmed) event buffer still round-trips at cap size.
        assert_eq!(dst.events(&run.id).await.len(), 3);
    }

    #[tokio::test]
    async fn max_runs_evicts_oldest() {
        let store = InMemoryRunStore::with_retention(Some(2), None);
        let r1 = store.create_run("session-1", "prompt-1").await;
        // Give the soon-to-be-evicted run an event, so that "its events are
        // gone" is a real check rather than a vacuous one.
        store.record_event(&r1.id, delta("x")).await;
        assert_eq!(store.events(&r1.id).await.len(), 1);

        let r2 = store.create_run("session-1", "prompt-2").await;
        let r3 = store.create_run("session-1", "prompt-3").await;

        // Only the two newest runs survive.
        let ids: Vec<String> = store.list().await.into_iter().map(|r| r.id).collect();
        assert_eq!(ids.len(), 2);
        assert!(ids.contains(&r2.id));
        assert!(ids.contains(&r3.id));

        // The oldest run is gone from both the snapshot and the event maps.
        assert!(store.snapshot(&r1.id).await.is_none());
        assert!(store.events(&r1.id).await.is_empty());
        // Recording against the evicted run is rejected.
        assert!(store.record_event(&r1.id, delta("x")).await.is_none());

        // Survivors start with empty buffers.
        assert!(store.events(&r2.id).await.is_empty());
        assert!(store.events(&r3.id).await.is_empty());
    }

    #[tokio::test]
    async fn max_events_per_run_caps_event_buffer() {
        let store = InMemoryRunStore::with_retention(None, Some(3));
        let run = store.create_run("session-1", "prompt").await;
        for _ in 0..10 {
            store.record_event(&run.id, delta("x")).await;
        }
        let events = store.events(&run.id).await;
        assert_eq!(
            events.len(),
            3,
            "buffer must be capped at max_events_per_run"
        );
        // Snapshot `event_count` reflects the cumulative total, not the
        // surviving buffer length.
        let snap = store.snapshot(&run.id).await.unwrap();
        assert_eq!(snap.event_count, 10);
    }

    #[tokio::test]
    async fn unlimited_retention_is_the_default() {
        let store = InMemoryRunStore::new();
        for i in 0..50 {
            let r = store.create_run("s", &format!("p{i}")).await;
            for _ in 0..20 {
                store.record_event(&r.id, delta("y")).await;
            }
        }
        assert_eq!(store.list().await.len(), 50);
    }
}
393
394#[derive(Clone)]
395pub struct RunHandle {
396    id: String,
397    session_id: String,
398    store: Arc<InMemoryRunStore>,
399    cancel_token: Arc<Mutex<Option<CancellationToken>>>,
400    current_run_id: Arc<Mutex<Option<String>>>,
401    hook_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
402}
403
404impl RunHandle {
405    pub(crate) fn new(
406        id: String,
407        session_id: String,
408        store: Arc<InMemoryRunStore>,
409        cancel_token: Arc<Mutex<Option<CancellationToken>>>,
410        current_run_id: Arc<Mutex<Option<String>>>,
411        hook_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
412    ) -> Self {
413        Self {
414            id,
415            session_id,
416            store,
417            cancel_token,
418            current_run_id,
419            hook_executor,
420        }
421    }
422
423    pub fn id(&self) -> &str {
424        &self.id
425    }
426
427    pub fn session_id(&self) -> &str {
428        &self.session_id
429    }
430
431    pub async fn snapshot(&self) -> Option<RunSnapshot> {
432        self.store.snapshot(&self.id).await
433    }
434
435    pub async fn events(&self) -> Vec<RunEventRecord> {
436        self.store.events(&self.id).await
437    }
438
439    pub async fn status(&self) -> Option<RunStatus> {
440        self.snapshot().await.map(|snapshot| snapshot.status)
441    }
442
443    pub async fn cancel(&self) -> bool {
444        let current_run_id = self.current_run_id.lock().await.clone();
445        if current_run_id.as_deref() != Some(self.id.as_str()) {
446            return false;
447        }
448
449        let token = self.cancel_token.lock().await.clone();
450        if let Some(token) = token {
451            token.cancel();
452            let _ = self.store.mark_cancelled(&self.id).await;
453            if let Some(executor) = &self.hook_executor {
454                executor
455                    .record_run_cancelled(&self.id, &self.session_id, Some("cancelled by host"))
456                    .await;
457            }
458            true
459        } else {
460            false
461        }
462    }
463}
464
465fn apply_event_to_snapshot(run: &mut RunSnapshot, event: &AgentEvent) {
466    match event {
467        AgentEvent::Start { prompt } => {
468            run.status = RunStatus::Executing;
469            if run.prompt.is_empty() {
470                run.prompt = prompt.clone();
471            }
472        }
473        AgentEvent::PlanningStart { .. } => {
474            run.status = RunStatus::Planning;
475        }
476        AgentEvent::StepStart { .. }
477        | AgentEvent::ToolStart { .. }
478        | AgentEvent::TurnStart { .. }
479            if !matches!(run.status, RunStatus::Planning) =>
480        {
481            run.status = RunStatus::Executing;
482        }
483        AgentEvent::End { text, .. } => {
484            if run.status == RunStatus::Cancelled {
485                return;
486            }
487            run.status = RunStatus::Completed;
488            run.result_text = Some(text.clone());
489            run.error = None;
490        }
491        AgentEvent::Error { message } => {
492            if run.status == RunStatus::Cancelled {
493                return;
494            }
495            run.status = RunStatus::Failed;
496            run.error = Some(message.clone());
497        }
498        _ => {}
499    }
500}
501
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Falls back to `0` if the system clock reads earlier than the epoch. The
/// `u128 -> u64` narrowing only truncates hundreds of millions of years out.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
508
#[cfg(test)]
mod tests {
    use super::*;

    /// `Start` event for `prompt`.
    fn start_event(prompt: &str) -> AgentEvent {
        AgentEvent::Start {
            prompt: prompt.to_string(),
        }
    }

    /// Terminal `End` event with result `text` and an empty verification
    /// summary.
    fn end_event(text: &str) -> AgentEvent {
        AgentEvent::End {
            text: text.to_string(),
            usage: Default::default(),
            verification_summary: Box::new(
                crate::verification::VerificationSummary::from_reports(&[]),
            ),
            meta: None,
        }
    }

    #[tokio::test]
    async fn run_store_tracks_status_and_events() {
        let store = InMemoryRunStore::new();
        let id = store.create_run("session-1", "fix tests").await.id;

        for event in [start_event("fix tests"), end_event("done")] {
            store.record_event(&id, event).await;
        }

        let snap = store.snapshot(&id).await.unwrap();
        assert_eq!(snap.status, RunStatus::Completed);
        assert_eq!(snap.result_text.as_deref(), Some("done"));
        assert_eq!(snap.event_count, 2);
        assert_eq!(store.events(&id).await.len(), 2);
    }

    #[tokio::test]
    async fn run_store_replaces_persisted_records() {
        let origin = InMemoryRunStore::new();
        let id = origin.create_run("session-1", "persist").await.id;
        origin.record_event(&id, start_event("persist")).await;

        let exported = origin.records().await;
        let restored = InMemoryRunStore::new();
        restored.replace_records(exported).await;

        assert_eq!(restored.list().await.len(), 1);
        assert_eq!(restored.events(&id).await.len(), 1);
        assert_eq!(restored.snapshot(&id).await.unwrap().event_count, 1);
    }

    #[tokio::test]
    async fn run_handle_only_cancels_current_run() {
        let store = Arc::new(InMemoryRunStore::new());
        let run = store.create_run("session-1", "fix tests").await;
        let token_slot = Arc::new(Mutex::new(Some(CancellationToken::new())));
        let current_slot = Arc::new(Mutex::new(Some(run.id.clone())));
        let handle = RunHandle::new(
            run.id.clone(),
            run.session_id.clone(),
            Arc::clone(&store),
            token_slot,
            Arc::clone(&current_slot),
            None,
        );

        // While this run is current, cancellation succeeds and is recorded.
        assert!(handle.cancel().await);
        assert_eq!(handle.status().await, Some(RunStatus::Cancelled));

        // Once another run is current, this handle can no longer cancel.
        *current_slot.lock().await = Some("other-run".to_string());
        assert!(!handle.cancel().await);
    }
}