1use crate::agent::AgentEvent;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{Mutex, RwLock};
11use tokio_util::sync::CancellationToken;
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
14#[serde(rename_all = "snake_case")]
15pub enum RunStatus {
16 Created,
17 Planning,
18 Executing,
19 Verifying,
20 Completed,
21 Failed,
22 Cancelled,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct RunEventRecord {
27 pub sequence: usize,
28 pub timestamp_ms: u64,
29 pub event: AgentEvent,
30}
31
32#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
33pub struct ActiveToolSnapshot {
34 pub id: String,
35 pub name: String,
36 pub started_at_ms: u64,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct RunSnapshot {
41 pub id: String,
42 pub session_id: String,
43 pub status: RunStatus,
44 pub prompt: String,
45 pub created_at_ms: u64,
46 pub updated_at_ms: u64,
47 #[serde(skip_serializing_if = "Option::is_none")]
48 pub result_text: Option<String>,
49 #[serde(skip_serializing_if = "Option::is_none")]
50 pub error: Option<String>,
51 pub event_count: usize,
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct RunRecord {
56 pub snapshot: RunSnapshot,
57 pub events: Vec<RunEventRecord>,
58}
59
60impl RunSnapshot {
61 fn new(id: String, session_id: String, prompt: String) -> Self {
62 let now = now_ms();
63 Self {
64 id,
65 session_id,
66 status: RunStatus::Created,
67 prompt,
68 created_at_ms: now,
69 updated_at_ms: now,
70 result_text: None,
71 error: None,
72 event_count: 0,
73 }
74 }
75}
76
77#[derive(Debug, Default)]
78pub struct InMemoryRunStore {
79 runs: RwLock<HashMap<String, RunSnapshot>>,
80 events: RwLock<HashMap<String, Vec<RunEventRecord>>>,
81}
82
83impl InMemoryRunStore {
84 pub fn new() -> Self {
85 Self::default()
86 }
87
88 pub async fn create_run(&self, session_id: &str, prompt: &str) -> RunSnapshot {
89 let id = format!("run-{}", uuid::Uuid::new_v4());
90 let snapshot = RunSnapshot::new(id.clone(), session_id.to_string(), prompt.to_string());
91 self.runs.write().await.insert(id.clone(), snapshot.clone());
92 self.events.write().await.insert(id, Vec::new());
93 snapshot
94 }
95
96 pub async fn record_event(&self, run_id: &str, event: AgentEvent) -> Option<RunSnapshot> {
97 let mut events = self.events.write().await;
98 let run_events = events.get_mut(run_id)?;
99 let sequence = run_events.len();
100 run_events.push(RunEventRecord {
101 sequence,
102 timestamp_ms: now_ms(),
103 event: event.clone(),
104 });
105 drop(events);
106
107 let mut runs = self.runs.write().await;
108 let run = runs.get_mut(run_id)?;
109 apply_event_to_snapshot(run, &event);
110 run.event_count += 1;
111 run.updated_at_ms = now_ms();
112 Some(run.clone())
113 }
114
115 pub async fn mark_failed(&self, run_id: &str, error: impl Into<String>) -> Option<RunSnapshot> {
116 let mut runs = self.runs.write().await;
117 let run = runs.get_mut(run_id)?;
118 if run.status == RunStatus::Cancelled {
119 return Some(run.clone());
120 }
121 run.status = RunStatus::Failed;
122 run.error = Some(error.into());
123 run.updated_at_ms = now_ms();
124 Some(run.clone())
125 }
126
127 pub async fn mark_cancelled(&self, run_id: &str) -> Option<RunSnapshot> {
128 let mut runs = self.runs.write().await;
129 let run = runs.get_mut(run_id)?;
130 run.status = RunStatus::Cancelled;
131 run.updated_at_ms = now_ms();
132 Some(run.clone())
133 }
134
135 pub async fn snapshot(&self, run_id: &str) -> Option<RunSnapshot> {
136 self.runs.read().await.get(run_id).cloned()
137 }
138
139 pub async fn events(&self, run_id: &str) -> Vec<RunEventRecord> {
140 self.events
141 .read()
142 .await
143 .get(run_id)
144 .cloned()
145 .unwrap_or_default()
146 }
147
148 pub async fn list(&self) -> Vec<RunSnapshot> {
149 let mut runs = self.runs.read().await.values().cloned().collect::<Vec<_>>();
150 runs.sort_by_key(|run| run.created_at_ms);
151 runs
152 }
153
154 pub async fn records(&self) -> Vec<RunRecord> {
155 let snapshots = self.runs.read().await.values().cloned().collect::<Vec<_>>();
156 let events = self.events.read().await;
157 let mut records = snapshots
158 .into_iter()
159 .map(|snapshot| RunRecord {
160 events: events.get(&snapshot.id).cloned().unwrap_or_default(),
161 snapshot,
162 })
163 .collect::<Vec<_>>();
164 records.sort_by_key(|record| record.snapshot.created_at_ms);
165 records
166 }
167
168 pub async fn replace_records(&self, records: Vec<RunRecord>) {
169 let mut run_map = HashMap::new();
170 let mut event_map = HashMap::new();
171
172 for mut record in records {
173 record.snapshot.event_count = record.events.len();
174 event_map.insert(record.snapshot.id.clone(), record.events);
175 run_map.insert(record.snapshot.id.clone(), record.snapshot);
176 }
177
178 *self.runs.write().await = run_map;
179 *self.events.write().await = event_map;
180 }
181}
182
183#[derive(Clone)]
184pub struct RunHandle {
185 id: String,
186 session_id: String,
187 store: Arc<InMemoryRunStore>,
188 cancel_token: Arc<Mutex<Option<CancellationToken>>>,
189 current_run_id: Arc<Mutex<Option<String>>>,
190 hook_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
191}
192
193impl RunHandle {
194 pub(crate) fn new(
195 id: String,
196 session_id: String,
197 store: Arc<InMemoryRunStore>,
198 cancel_token: Arc<Mutex<Option<CancellationToken>>>,
199 current_run_id: Arc<Mutex<Option<String>>>,
200 hook_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
201 ) -> Self {
202 Self {
203 id,
204 session_id,
205 store,
206 cancel_token,
207 current_run_id,
208 hook_executor,
209 }
210 }
211
212 pub fn id(&self) -> &str {
213 &self.id
214 }
215
216 pub fn session_id(&self) -> &str {
217 &self.session_id
218 }
219
220 pub async fn snapshot(&self) -> Option<RunSnapshot> {
221 self.store.snapshot(&self.id).await
222 }
223
224 pub async fn events(&self) -> Vec<RunEventRecord> {
225 self.store.events(&self.id).await
226 }
227
228 pub async fn status(&self) -> Option<RunStatus> {
229 self.snapshot().await.map(|snapshot| snapshot.status)
230 }
231
232 pub async fn cancel(&self) -> bool {
233 let current_run_id = self.current_run_id.lock().await.clone();
234 if current_run_id.as_deref() != Some(self.id.as_str()) {
235 return false;
236 }
237
238 let token = self.cancel_token.lock().await.clone();
239 if let Some(token) = token {
240 token.cancel();
241 let _ = self.store.mark_cancelled(&self.id).await;
242 if let Some(executor) = &self.hook_executor {
243 executor
244 .record_run_cancelled(&self.id, &self.session_id, Some("cancelled by host"))
245 .await;
246 }
247 true
248 } else {
249 false
250 }
251 }
252}
253
254fn apply_event_to_snapshot(run: &mut RunSnapshot, event: &AgentEvent) {
255 match event {
256 AgentEvent::Start { prompt } => {
257 run.status = RunStatus::Executing;
258 if run.prompt.is_empty() {
259 run.prompt = prompt.clone();
260 }
261 }
262 AgentEvent::PlanningStart { .. } => {
263 run.status = RunStatus::Planning;
264 }
265 AgentEvent::StepStart { .. }
266 | AgentEvent::ToolStart { .. }
267 | AgentEvent::TurnStart { .. }
268 if !matches!(run.status, RunStatus::Planning) =>
269 {
270 run.status = RunStatus::Executing;
271 }
272 AgentEvent::End { text, .. } => {
273 if run.status == RunStatus::Cancelled {
274 return;
275 }
276 run.status = RunStatus::Completed;
277 run.result_text = Some(text.clone());
278 run.error = None;
279 }
280 AgentEvent::Error { message } => {
281 if run.status == RunStatus::Cancelled {
282 return;
283 }
284 run.status = RunStatus::Failed;
285 run.error = Some(message.clone());
286 }
287 _ => {}
288 }
289}
290
291fn now_ms() -> u64 {
292 std::time::SystemTime::now()
293 .duration_since(std::time::UNIX_EPOCH)
294 .map(|duration| duration.as_millis() as u64)
295 .unwrap_or(0)
296}
297
298#[cfg(test)]
299mod tests {
300 use super::*;
301
302 #[tokio::test]
303 async fn run_store_tracks_status_and_events() {
304 let store = InMemoryRunStore::new();
305 let run = store.create_run("session-1", "fix tests").await;
306
307 store
308 .record_event(
309 &run.id,
310 AgentEvent::Start {
311 prompt: "fix tests".to_string(),
312 },
313 )
314 .await;
315 store
316 .record_event(
317 &run.id,
318 AgentEvent::End {
319 text: "done".to_string(),
320 usage: Default::default(),
321 verification_summary: Box::new(
322 crate::verification::VerificationSummary::from_reports(&[]),
323 ),
324 meta: None,
325 },
326 )
327 .await;
328
329 let snapshot = store.snapshot(&run.id).await.unwrap();
330 assert_eq!(snapshot.status, RunStatus::Completed);
331 assert_eq!(snapshot.result_text.as_deref(), Some("done"));
332 assert_eq!(snapshot.event_count, 2);
333 assert_eq!(store.events(&run.id).await.len(), 2);
334 }
335
336 #[tokio::test]
337 async fn run_store_replaces_persisted_records() {
338 let source = InMemoryRunStore::new();
339 let run = source.create_run("session-1", "persist").await;
340 source
341 .record_event(
342 &run.id,
343 AgentEvent::Start {
344 prompt: "persist".to_string(),
345 },
346 )
347 .await;
348
349 let target = InMemoryRunStore::new();
350 target.replace_records(source.records().await).await;
351
352 assert_eq!(target.list().await.len(), 1);
353 assert_eq!(target.events(&run.id).await.len(), 1);
354 assert_eq!(target.snapshot(&run.id).await.unwrap().event_count, 1);
355 }
356
357 #[tokio::test]
358 async fn run_handle_only_cancels_current_run() {
359 let store = Arc::new(InMemoryRunStore::new());
360 let run = store.create_run("session-1", "fix tests").await;
361 let cancel_token = Arc::new(Mutex::new(Some(CancellationToken::new())));
362 let current_run_id = Arc::new(Mutex::new(Some(run.id.clone())));
363 let handle = RunHandle::new(
364 run.id.clone(),
365 run.session_id.clone(),
366 store.clone(),
367 cancel_token,
368 current_run_id.clone(),
369 None,
370 );
371
372 assert!(handle.cancel().await);
373 assert_eq!(handle.status().await, Some(RunStatus::Cancelled));
374
375 *current_run_id.lock().await = Some("other-run".to_string());
376 assert!(!handle.cancel().await);
377 }
378}