1use crate::agent::AgentEvent;
7use serde::{Deserialize, Serialize};
8use std::collections::{HashMap, VecDeque};
9use std::sync::Arc;
10use tokio::sync::{Mutex, RwLock};
11use tokio_util::sync::CancellationToken;
12
/// Lifecycle state of a run tracked by [`InMemoryRunStore`].
///
/// Serialized in `snake_case` (e.g. `"executing"`, `"cancelled"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RunStatus {
    /// Initial state set by `RunSnapshot::new` before any event is recorded.
    Created,
    /// Entered on an `AgentEvent::PlanningStart` event.
    Planning,
    /// Entered on `Start`, and on `StepStart`/`ToolStart`/`TurnStart` when not planning.
    Executing,
    /// Not assigned anywhere in this file — NOTE(review): presumably set elsewhere; confirm.
    Verifying,
    /// Entered on an `AgentEvent::End` event.
    Completed,
    /// Entered on an `AgentEvent::Error` event or via `InMemoryRunStore::mark_failed`.
    Failed,
    /// Entered via `InMemoryRunStore::mark_cancelled` (e.g. from `RunHandle::cancel`).
    Cancelled,
}
24
/// One agent event as stored in a run's event buffer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunEventRecord {
    /// Per-run sequence number assigned by `InMemoryRunStore::record_event`.
    pub sequence: usize,
    /// Wall-clock time the event was recorded, in milliseconds since the Unix epoch.
    pub timestamp_ms: u64,
    /// The recorded agent event.
    pub event: AgentEvent,
}
31
/// Describes a tool invocation by id, name and start time.
///
/// NOTE(review): not referenced in this section; presumably populated elsewhere to report
/// tools that are currently running — confirm against its users.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ActiveToolSnapshot {
    /// Identifier of the tool invocation.
    pub id: String,
    /// Name of the tool.
    pub name: String,
    /// Start time in milliseconds since the Unix epoch (assumed; matches `now_ms` units).
    pub started_at_ms: u64,
}
38
/// Point-in-time view of a run's state, as held and returned by [`InMemoryRunStore`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunSnapshot {
    /// Run identifier (`run-<uuid>` when created via `create_run`).
    pub id: String,
    /// Session the run belongs to.
    pub session_id: String,
    /// Current lifecycle state.
    pub status: RunStatus,
    /// Prompt the run was created with; backfilled from `AgentEvent::Start` if empty.
    pub prompt: String,
    /// Creation time in milliseconds since the Unix epoch.
    pub created_at_ms: u64,
    /// Time of the last mutation in milliseconds since the Unix epoch.
    pub updated_at_ms: u64,
    /// Final text from `AgentEvent::End`; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result_text: Option<String>,
    /// Failure message from `AgentEvent::Error` or `mark_failed`; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Number of events recorded for this run. Incremented once per `record_event` call and
    /// not reduced when the event buffer is trimmed to `max_events_per_run`.
    pub event_count: usize,
}
53
/// A run's snapshot bundled with its retained events.
///
/// Produced by `InMemoryRunStore::records` and consumed by `InMemoryRunStore::replace_records`
/// (presumably to persist and restore the store — confirm with callers).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunRecord {
    /// The run's snapshot, including its cumulative `event_count`.
    pub snapshot: RunSnapshot,
    /// The run's retained events (possibly trimmed to the store's per-run cap).
    pub events: Vec<RunEventRecord>,
}
59
60impl RunSnapshot {
61 fn new(id: String, session_id: String, prompt: String) -> Self {
62 let now = now_ms();
63 Self {
64 id,
65 session_id,
66 status: RunStatus::Created,
67 prompt,
68 created_at_ms: now,
69 updated_at_ms: now,
70 result_text: None,
71 error: None,
72 event_count: 0,
73 }
74 }
75}
76
/// In-memory store of run snapshots and their event buffers, with optional retention caps.
///
/// `Default` yields a store with no caps (both limits `None`).
#[derive(Debug, Default)]
pub struct InMemoryRunStore {
    /// Run id → latest snapshot.
    runs: RwLock<HashMap<String, RunSnapshot>>,
    /// Run id → retained events for that run.
    events: RwLock<HashMap<String, Vec<RunEventRecord>>>,
    /// Run ids in insertion order; the front is evicted first when `max_runs` is exceeded.
    /// `create_run_with_id` locks `insertion_order`, then `events`, then `runs`.
    insertion_order: RwLock<VecDeque<String>>,
    /// Maximum number of runs kept; `None` means unlimited.
    max_runs: Option<usize>,
    /// Maximum number of events kept per run (newest retained); `None` means unlimited.
    max_events_per_run: Option<usize>,
}
93
94impl InMemoryRunStore {
95 pub fn new() -> Self {
96 Self::default()
97 }
98
99 pub fn with_retention(max_runs: Option<usize>, max_events_per_run: Option<usize>) -> Self {
102 Self {
103 runs: RwLock::new(HashMap::new()),
104 events: RwLock::new(HashMap::new()),
105 insertion_order: RwLock::new(VecDeque::new()),
106 max_runs,
107 max_events_per_run,
108 }
109 }
110
111 pub async fn create_run(&self, session_id: &str, prompt: &str) -> RunSnapshot {
112 let id = format!("run-{}", uuid::Uuid::new_v4());
116 self.create_run_with_id(id, session_id, prompt).await
117 }
118
119 pub async fn create_run_with_id(
123 &self,
124 id: String,
125 session_id: &str,
126 prompt: &str,
127 ) -> RunSnapshot {
128 let snapshot = RunSnapshot::new(id.clone(), session_id.to_string(), prompt.to_string());
129 {
138 let mut order = self.insertion_order.write().await;
139 let mut events = self.events.write().await;
140 let mut runs = self.runs.write().await;
141 runs.insert(id.clone(), snapshot.clone());
142 events.insert(id.clone(), Vec::new());
143 order.push_back(id);
144 if let Some(cap) = self.max_runs {
145 while order.len() > cap {
146 if let Some(victim) = order.pop_front() {
147 runs.remove(&victim);
148 events.remove(&victim);
149 }
150 }
151 }
152 }
153 snapshot
154 }
155
156 pub async fn record_event(&self, run_id: &str, event: AgentEvent) -> Option<RunSnapshot> {
157 let mut events = self.events.write().await;
158 let run_events = events.get_mut(run_id)?;
159 let sequence = run_events.len();
160 run_events.push(RunEventRecord {
161 sequence,
162 timestamp_ms: now_ms(),
163 event: event.clone(),
164 });
165 if let Some(cap) = self.max_events_per_run {
167 if run_events.len() > cap {
168 let excess = run_events.len() - cap;
169 run_events.drain(..excess);
170 }
171 }
172 drop(events);
173
174 let mut runs = self.runs.write().await;
175 let run = runs.get_mut(run_id)?;
176 apply_event_to_snapshot(run, &event);
177 run.event_count += 1;
178 run.updated_at_ms = now_ms();
179 Some(run.clone())
180 }
181
182 pub async fn mark_failed(&self, run_id: &str, error: impl Into<String>) -> Option<RunSnapshot> {
183 let mut runs = self.runs.write().await;
184 let run = runs.get_mut(run_id)?;
185 if run.status == RunStatus::Cancelled {
186 return Some(run.clone());
187 }
188 run.status = RunStatus::Failed;
189 run.error = Some(error.into());
190 run.updated_at_ms = now_ms();
191 Some(run.clone())
192 }
193
194 pub async fn mark_cancelled(&self, run_id: &str) -> Option<RunSnapshot> {
195 let mut runs = self.runs.write().await;
196 let run = runs.get_mut(run_id)?;
197 run.status = RunStatus::Cancelled;
198 run.updated_at_ms = now_ms();
199 Some(run.clone())
200 }
201
202 pub async fn snapshot(&self, run_id: &str) -> Option<RunSnapshot> {
203 self.runs.read().await.get(run_id).cloned()
204 }
205
206 pub async fn events(&self, run_id: &str) -> Vec<RunEventRecord> {
207 self.events
208 .read()
209 .await
210 .get(run_id)
211 .cloned()
212 .unwrap_or_default()
213 }
214
215 pub async fn list(&self) -> Vec<RunSnapshot> {
216 let mut runs = self.runs.read().await.values().cloned().collect::<Vec<_>>();
217 runs.sort_by_key(|run| run.created_at_ms);
218 runs
219 }
220
221 pub async fn records(&self) -> Vec<RunRecord> {
222 let snapshots = self.runs.read().await.values().cloned().collect::<Vec<_>>();
223 let events = self.events.read().await;
224 let mut records = snapshots
225 .into_iter()
226 .map(|snapshot| RunRecord {
227 events: events.get(&snapshot.id).cloned().unwrap_or_default(),
228 snapshot,
229 })
230 .collect::<Vec<_>>();
231 records.sort_by_key(|record| record.snapshot.created_at_ms);
232 records
233 }
234
235 pub async fn replace_records(&self, records: Vec<RunRecord>) {
236 let mut sorted = records;
240 sorted.sort_by_key(|r| r.snapshot.created_at_ms);
241 let mut run_map = HashMap::new();
242 let mut event_map = HashMap::new();
243 let mut order = VecDeque::with_capacity(sorted.len());
244 for record in sorted {
245 let id = record.snapshot.id.clone();
246 event_map.insert(id.clone(), record.events);
254 run_map.insert(id.clone(), record.snapshot);
255 order.push_back(id);
256 }
257 *self.runs.write().await = run_map;
258 *self.events.write().await = event_map;
259 *self.insertion_order.write().await = order;
260 }
261}
262
#[cfg(test)]
mod retention_tests {
    use super::*;

    /// Many tasks creating runs and recording events against a capped store must all finish.
    /// That means no lock-order deadlock. Afterwards exactly `max_runs` runs must remain.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn concurrent_create_and_record_under_cap_does_not_deadlock() {
        let store = std::sync::Arc::new(InMemoryRunStore::with_retention(Some(10), None));
        let mut handles = Vec::new();
        for i in 0..100 {
            let s = std::sync::Arc::clone(&store);
            handles.push(tokio::spawn(async move {
                let r = s.create_run("sess", &format!("p{i}")).await;
                for _ in 0..5 {
                    // May return None when another task has already evicted this run.
                    s.record_event(
                        &r.id,
                        AgentEvent::TextDelta {
                            text: "x".to_string(),
                        },
                    )
                    .await;
                }
            }));
        }
        for h in handles {
            h.await.unwrap();
        }
        // 100 distinct runs under a cap of 10: every create beyond the 10th evicts one run.
        assert_eq!(store.list().await.len(), 10);
        assert_eq!(store.records().await.len(), 10);
    }

    /// Exporting and re-importing must keep the cumulative event count, not reset it to the
    /// length of the trimmed buffer.
    #[tokio::test]
    async fn replace_records_preserves_cumulative_event_count_after_trim() {
        let src = InMemoryRunStore::with_retention(None, Some(3));
        let run = src.create_run("s", "p").await;
        for _ in 0..10 {
            src.record_event(
                &run.id,
                AgentEvent::TextDelta {
                    text: "x".to_string(),
                },
            )
            .await;
        }
        let records = src.records().await;
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].events.len(), 3, "buffer trimmed to cap");
        assert_eq!(records[0].snapshot.event_count, 10, "cumulative preserved");

        let dst = InMemoryRunStore::new();
        dst.replace_records(records).await;
        let restored = dst.snapshot(&run.id).await.unwrap();
        assert_eq!(
            restored.event_count, 10,
            "replace_records must NOT reset event_count to the trimmed buffer length"
        );
        assert_eq!(dst.events(&run.id).await.len(), 3);
    }

    /// With `max_runs = 2`, creating a third run evicts the first one.
    #[tokio::test]
    async fn max_runs_evicts_oldest() {
        let store = InMemoryRunStore::with_retention(Some(2), None);
        let r1 = store.create_run("session-1", "prompt-1").await;
        let r2 = store.create_run("session-1", "prompt-2").await;
        let r3 = store.create_run("session-1", "prompt-3").await;

        assert_eq!(store.list().await.len(), 2);
        assert!(
            store.snapshot(&r1.id).await.is_none(),
            "oldest run must be evicted"
        );
        assert!(store.events(&r1.id).await.is_empty());

        let ids: Vec<String> = store.list().await.into_iter().map(|r| r.id).collect();
        assert!(ids.contains(&r2.id));
        assert!(ids.contains(&r3.id));
        // No events were recorded, so surviving buffers are empty.
        assert!(store.events(&r2.id).await.is_empty());
        assert!(store.events(&r3.id).await.is_empty());
    }

    /// The buffer keeps only the newest `max_events_per_run` events; the count keeps growing.
    #[tokio::test]
    async fn max_events_per_run_caps_event_buffer() {
        let store = InMemoryRunStore::with_retention(None, Some(3));
        let run = store.create_run("session-1", "prompt").await;
        for _ in 0..10 {
            store
                .record_event(
                    &run.id,
                    AgentEvent::TextDelta {
                        text: "x".to_string(),
                    },
                )
                .await;
        }
        let events = store.events(&run.id).await;
        assert_eq!(
            events.len(),
            3,
            "buffer must be capped at max_events_per_run"
        );
        let snap = store.snapshot(&run.id).await.unwrap();
        assert_eq!(snap.event_count, 10);
    }

    /// `InMemoryRunStore::new()` keeps every run and every event.
    #[tokio::test]
    async fn unlimited_retention_is_the_default() {
        let store = InMemoryRunStore::new();
        for i in 0..50 {
            let r = store.create_run("s", &format!("p{i}")).await;
            for _ in 0..20 {
                store
                    .record_event(
                        &r.id,
                        AgentEvent::TextDelta {
                            text: "y".to_string(),
                        },
                    )
                    .await;
            }
        }
        assert_eq!(store.list().await.len(), 50);
        assert!(
            store
                .records()
                .await
                .iter()
                .all(|record| record.events.len() == 20),
            "no event buffer may be trimmed without a cap"
        );
    }
}
393
/// Handle to one run in an [`InMemoryRunStore`], used to query it and to cancel it.
///
/// Clones share the same store, cancellation-token slot and current-run slot (all `Arc`).
#[derive(Clone)]
pub struct RunHandle {
    /// Id of the run this handle refers to.
    id: String,
    /// Session the run belongs to.
    session_id: String,
    /// Store holding the run's snapshot and events.
    store: Arc<InMemoryRunStore>,
    /// Shared slot holding the token that `cancel` triggers; `None` means nothing to cancel.
    cancel_token: Arc<Mutex<Option<CancellationToken>>>,
    /// Shared slot naming the run that is currently active; `cancel` only acts when it
    /// equals `id`.
    current_run_id: Arc<Mutex<Option<String>>>,
    /// Optional hook executor notified via `record_run_cancelled` when `cancel` succeeds.
    hook_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
}
403
404impl RunHandle {
405 pub(crate) fn new(
406 id: String,
407 session_id: String,
408 store: Arc<InMemoryRunStore>,
409 cancel_token: Arc<Mutex<Option<CancellationToken>>>,
410 current_run_id: Arc<Mutex<Option<String>>>,
411 hook_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
412 ) -> Self {
413 Self {
414 id,
415 session_id,
416 store,
417 cancel_token,
418 current_run_id,
419 hook_executor,
420 }
421 }
422
423 pub fn id(&self) -> &str {
424 &self.id
425 }
426
427 pub fn session_id(&self) -> &str {
428 &self.session_id
429 }
430
431 pub async fn snapshot(&self) -> Option<RunSnapshot> {
432 self.store.snapshot(&self.id).await
433 }
434
435 pub async fn events(&self) -> Vec<RunEventRecord> {
436 self.store.events(&self.id).await
437 }
438
439 pub async fn status(&self) -> Option<RunStatus> {
440 self.snapshot().await.map(|snapshot| snapshot.status)
441 }
442
443 pub async fn cancel(&self) -> bool {
444 let current_run_id = self.current_run_id.lock().await.clone();
445 if current_run_id.as_deref() != Some(self.id.as_str()) {
446 return false;
447 }
448
449 let token = self.cancel_token.lock().await.clone();
450 if let Some(token) = token {
451 token.cancel();
452 let _ = self.store.mark_cancelled(&self.id).await;
453 if let Some(executor) = &self.hook_executor {
454 executor
455 .record_run_cancelled(&self.id, &self.session_id, Some("cancelled by host"))
456 .await;
457 }
458 true
459 } else {
460 false
461 }
462 }
463}
464
465fn apply_event_to_snapshot(run: &mut RunSnapshot, event: &AgentEvent) {
466 match event {
467 AgentEvent::Start { prompt } => {
468 run.status = RunStatus::Executing;
469 if run.prompt.is_empty() {
470 run.prompt = prompt.clone();
471 }
472 }
473 AgentEvent::PlanningStart { .. } => {
474 run.status = RunStatus::Planning;
475 }
476 AgentEvent::StepStart { .. }
477 | AgentEvent::ToolStart { .. }
478 | AgentEvent::TurnStart { .. }
479 if !matches!(run.status, RunStatus::Planning) =>
480 {
481 run.status = RunStatus::Executing;
482 }
483 AgentEvent::End { text, .. } => {
484 if run.status == RunStatus::Cancelled {
485 return;
486 }
487 run.status = RunStatus::Completed;
488 run.result_text = Some(text.clone());
489 run.error = None;
490 }
491 AgentEvent::Error { message } => {
492 if run.status == RunStatus::Cancelled {
493 return;
494 }
495 run.status = RunStatus::Failed;
496 run.error = Some(message.clone());
497 }
498 _ => {}
499 }
500}
501
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch. The `as u64` narrowing
/// from `u128` only truncates hundreds of millions of years from now.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
508
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `AgentEvent::Start` carrying `prompt`.
    fn start_event(prompt: &str) -> AgentEvent {
        AgentEvent::Start {
            prompt: prompt.to_string(),
        }
    }

    /// Builds an `AgentEvent::End` with the given result text and an empty verification summary.
    fn end_event(text: &str) -> AgentEvent {
        AgentEvent::End {
            text: text.to_string(),
            usage: Default::default(),
            verification_summary: Box::new(
                crate::verification::VerificationSummary::from_reports(&[]),
            ),
            meta: None,
        }
    }

    /// `Start` then `End` should leave the run `Completed` with the result and both events.
    #[tokio::test]
    async fn run_store_tracks_status_and_events() {
        let store = InMemoryRunStore::new();
        let created = store.create_run("session-1", "fix tests").await;

        for event in [start_event("fix tests"), end_event("done")] {
            store.record_event(&created.id, event).await;
        }

        let snap = store.snapshot(&created.id).await.unwrap();
        assert_eq!(snap.status, RunStatus::Completed);
        assert_eq!(snap.result_text.as_deref(), Some("done"));
        assert_eq!(snap.event_count, 2);
        assert_eq!(store.events(&created.id).await.len(), 2);
    }

    /// Records exported from one store and imported into another keep snapshot and events.
    #[tokio::test]
    async fn run_store_replaces_persisted_records() {
        let source = InMemoryRunStore::new();
        let created = source.create_run("session-1", "persist").await;
        source
            .record_event(&created.id, start_event("persist"))
            .await;

        let exported = source.records().await;
        let target = InMemoryRunStore::new();
        target.replace_records(exported).await;

        assert_eq!(target.list().await.len(), 1);
        assert_eq!(target.events(&created.id).await.len(), 1);
        let restored = target.snapshot(&created.id).await.unwrap();
        assert_eq!(restored.event_count, 1);
    }

    /// A handle can cancel its run only while that run is the current one.
    #[tokio::test]
    async fn run_handle_only_cancels_current_run() {
        let store = Arc::new(InMemoryRunStore::new());
        let created = store.create_run("session-1", "fix tests").await;
        let active_run = Arc::new(Mutex::new(Some(created.id.clone())));
        let handle = RunHandle::new(
            created.id.clone(),
            created.session_id.clone(),
            Arc::clone(&store),
            Arc::new(Mutex::new(Some(CancellationToken::new()))),
            Arc::clone(&active_run),
            None,
        );

        assert!(handle.cancel().await);
        assert_eq!(handle.status().await, Some(RunStatus::Cancelled));

        *active_run.lock().await = Some("other-run".to_string());
        assert!(!handle.cancel().await);
    }
}