1use crate::agent::AgentEvent;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{Mutex, RwLock};
11use tokio_util::sync::CancellationToken;
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
14#[serde(rename_all = "snake_case")]
15pub enum RunStatus {
16 Created,
17 Planning,
18 Executing,
19 Verifying,
20 Completed,
21 Failed,
22 Cancelled,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct RunEventRecord {
27 pub sequence: usize,
28 pub timestamp_ms: u64,
29 pub event: AgentEvent,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct RunSnapshot {
34 pub id: String,
35 pub session_id: String,
36 pub status: RunStatus,
37 pub prompt: String,
38 pub created_at_ms: u64,
39 pub updated_at_ms: u64,
40 #[serde(skip_serializing_if = "Option::is_none")]
41 pub result_text: Option<String>,
42 #[serde(skip_serializing_if = "Option::is_none")]
43 pub error: Option<String>,
44 pub event_count: usize,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct RunRecord {
49 pub snapshot: RunSnapshot,
50 pub events: Vec<RunEventRecord>,
51}
52
53impl RunSnapshot {
54 fn new(id: String, session_id: String, prompt: String) -> Self {
55 let now = now_ms();
56 Self {
57 id,
58 session_id,
59 status: RunStatus::Created,
60 prompt,
61 created_at_ms: now,
62 updated_at_ms: now,
63 result_text: None,
64 error: None,
65 event_count: 0,
66 }
67 }
68}
69
70#[derive(Debug, Default)]
71pub struct InMemoryRunStore {
72 runs: RwLock<HashMap<String, RunSnapshot>>,
73 events: RwLock<HashMap<String, Vec<RunEventRecord>>>,
74}
75
76impl InMemoryRunStore {
77 pub fn new() -> Self {
78 Self::default()
79 }
80
81 pub async fn create_run(&self, session_id: &str, prompt: &str) -> RunSnapshot {
82 let id = format!("run-{}", uuid::Uuid::new_v4());
83 let snapshot = RunSnapshot::new(id.clone(), session_id.to_string(), prompt.to_string());
84 self.runs.write().await.insert(id.clone(), snapshot.clone());
85 self.events.write().await.insert(id, Vec::new());
86 snapshot
87 }
88
89 pub async fn record_event(&self, run_id: &str, event: AgentEvent) -> Option<RunSnapshot> {
90 let mut events = self.events.write().await;
91 let run_events = events.get_mut(run_id)?;
92 let sequence = run_events.len();
93 run_events.push(RunEventRecord {
94 sequence,
95 timestamp_ms: now_ms(),
96 event: event.clone(),
97 });
98 drop(events);
99
100 let mut runs = self.runs.write().await;
101 let run = runs.get_mut(run_id)?;
102 apply_event_to_snapshot(run, &event);
103 run.event_count += 1;
104 run.updated_at_ms = now_ms();
105 Some(run.clone())
106 }
107
108 pub async fn mark_failed(&self, run_id: &str, error: impl Into<String>) -> Option<RunSnapshot> {
109 let mut runs = self.runs.write().await;
110 let run = runs.get_mut(run_id)?;
111 if run.status == RunStatus::Cancelled {
112 return Some(run.clone());
113 }
114 run.status = RunStatus::Failed;
115 run.error = Some(error.into());
116 run.updated_at_ms = now_ms();
117 Some(run.clone())
118 }
119
120 pub async fn mark_cancelled(&self, run_id: &str) -> Option<RunSnapshot> {
121 let mut runs = self.runs.write().await;
122 let run = runs.get_mut(run_id)?;
123 run.status = RunStatus::Cancelled;
124 run.updated_at_ms = now_ms();
125 Some(run.clone())
126 }
127
128 pub async fn snapshot(&self, run_id: &str) -> Option<RunSnapshot> {
129 self.runs.read().await.get(run_id).cloned()
130 }
131
132 pub async fn events(&self, run_id: &str) -> Vec<RunEventRecord> {
133 self.events
134 .read()
135 .await
136 .get(run_id)
137 .cloned()
138 .unwrap_or_default()
139 }
140
141 pub async fn list(&self) -> Vec<RunSnapshot> {
142 let mut runs = self.runs.read().await.values().cloned().collect::<Vec<_>>();
143 runs.sort_by_key(|run| run.created_at_ms);
144 runs
145 }
146
147 pub async fn records(&self) -> Vec<RunRecord> {
148 let snapshots = self.runs.read().await.values().cloned().collect::<Vec<_>>();
149 let events = self.events.read().await;
150 let mut records = snapshots
151 .into_iter()
152 .map(|snapshot| RunRecord {
153 events: events.get(&snapshot.id).cloned().unwrap_or_default(),
154 snapshot,
155 })
156 .collect::<Vec<_>>();
157 records.sort_by_key(|record| record.snapshot.created_at_ms);
158 records
159 }
160
161 pub async fn replace_records(&self, records: Vec<RunRecord>) {
162 let mut run_map = HashMap::new();
163 let mut event_map = HashMap::new();
164
165 for mut record in records {
166 record.snapshot.event_count = record.events.len();
167 event_map.insert(record.snapshot.id.clone(), record.events);
168 run_map.insert(record.snapshot.id.clone(), record.snapshot);
169 }
170
171 *self.runs.write().await = run_map;
172 *self.events.write().await = event_map;
173 }
174}
175
176#[derive(Clone)]
177pub struct RunHandle {
178 id: String,
179 session_id: String,
180 store: Arc<InMemoryRunStore>,
181 cancel_token: Arc<Mutex<Option<CancellationToken>>>,
182 current_run_id: Arc<Mutex<Option<String>>>,
183 hook_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
184}
185
186impl RunHandle {
187 pub(crate) fn new(
188 id: String,
189 session_id: String,
190 store: Arc<InMemoryRunStore>,
191 cancel_token: Arc<Mutex<Option<CancellationToken>>>,
192 current_run_id: Arc<Mutex<Option<String>>>,
193 hook_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
194 ) -> Self {
195 Self {
196 id,
197 session_id,
198 store,
199 cancel_token,
200 current_run_id,
201 hook_executor,
202 }
203 }
204
205 pub fn id(&self) -> &str {
206 &self.id
207 }
208
209 pub fn session_id(&self) -> &str {
210 &self.session_id
211 }
212
213 pub async fn snapshot(&self) -> Option<RunSnapshot> {
214 self.store.snapshot(&self.id).await
215 }
216
217 pub async fn events(&self) -> Vec<RunEventRecord> {
218 self.store.events(&self.id).await
219 }
220
221 pub async fn status(&self) -> Option<RunStatus> {
222 self.snapshot().await.map(|snapshot| snapshot.status)
223 }
224
225 pub async fn cancel(&self) -> bool {
226 let current_run_id = self.current_run_id.lock().await.clone();
227 if current_run_id.as_deref() != Some(self.id.as_str()) {
228 return false;
229 }
230
231 let token = self.cancel_token.lock().await.clone();
232 if let Some(token) = token {
233 token.cancel();
234 let _ = self.store.mark_cancelled(&self.id).await;
235 if let Some(executor) = &self.hook_executor {
236 executor
237 .record_run_cancelled(&self.id, &self.session_id, Some("cancelled by host"))
238 .await;
239 }
240 true
241 } else {
242 false
243 }
244 }
245}
246
247fn apply_event_to_snapshot(run: &mut RunSnapshot, event: &AgentEvent) {
248 match event {
249 AgentEvent::Start { prompt } => {
250 run.status = RunStatus::Executing;
251 if run.prompt.is_empty() {
252 run.prompt = prompt.clone();
253 }
254 }
255 AgentEvent::PlanningStart { .. } => {
256 run.status = RunStatus::Planning;
257 }
258 AgentEvent::StepStart { .. }
259 | AgentEvent::ToolStart { .. }
260 | AgentEvent::TurnStart { .. }
261 if !matches!(run.status, RunStatus::Planning) =>
262 {
263 run.status = RunStatus::Executing;
264 }
265 AgentEvent::End { text, .. } => {
266 if run.status == RunStatus::Cancelled {
267 return;
268 }
269 run.status = RunStatus::Completed;
270 run.result_text = Some(text.clone());
271 run.error = None;
272 }
273 AgentEvent::Error { message } => {
274 if run.status == RunStatus::Cancelled {
275 return;
276 }
277 run.status = RunStatus::Failed;
278 run.error = Some(message.clone());
279 }
280 _ => {}
281 }
282}
283
284fn now_ms() -> u64 {
285 std::time::SystemTime::now()
286 .duration_since(std::time::UNIX_EPOCH)
287 .map(|duration| duration.as_millis() as u64)
288 .unwrap_or(0)
289}
290
291#[cfg(test)]
292mod tests {
293 use super::*;
294
295 #[tokio::test]
296 async fn run_store_tracks_status_and_events() {
297 let store = InMemoryRunStore::new();
298 let run = store.create_run("session-1", "fix tests").await;
299
300 store
301 .record_event(
302 &run.id,
303 AgentEvent::Start {
304 prompt: "fix tests".to_string(),
305 },
306 )
307 .await;
308 store
309 .record_event(
310 &run.id,
311 AgentEvent::End {
312 text: "done".to_string(),
313 usage: Default::default(),
314 verification_summary: Box::new(
315 crate::verification::VerificationSummary::from_reports(&[]),
316 ),
317 meta: None,
318 },
319 )
320 .await;
321
322 let snapshot = store.snapshot(&run.id).await.unwrap();
323 assert_eq!(snapshot.status, RunStatus::Completed);
324 assert_eq!(snapshot.result_text.as_deref(), Some("done"));
325 assert_eq!(snapshot.event_count, 2);
326 assert_eq!(store.events(&run.id).await.len(), 2);
327 }
328
329 #[tokio::test]
330 async fn run_store_replaces_persisted_records() {
331 let source = InMemoryRunStore::new();
332 let run = source.create_run("session-1", "persist").await;
333 source
334 .record_event(
335 &run.id,
336 AgentEvent::Start {
337 prompt: "persist".to_string(),
338 },
339 )
340 .await;
341
342 let target = InMemoryRunStore::new();
343 target.replace_records(source.records().await).await;
344
345 assert_eq!(target.list().await.len(), 1);
346 assert_eq!(target.events(&run.id).await.len(), 1);
347 assert_eq!(target.snapshot(&run.id).await.unwrap().event_count, 1);
348 }
349
350 #[tokio::test]
351 async fn run_handle_only_cancels_current_run() {
352 let store = Arc::new(InMemoryRunStore::new());
353 let run = store.create_run("session-1", "fix tests").await;
354 let cancel_token = Arc::new(Mutex::new(Some(CancellationToken::new())));
355 let current_run_id = Arc::new(Mutex::new(Some(run.id.clone())));
356 let handle = RunHandle::new(
357 run.id.clone(),
358 run.session_id.clone(),
359 store.clone(),
360 cancel_token,
361 current_run_id.clone(),
362 None,
363 );
364
365 assert!(handle.cancel().await);
366 assert_eq!(handle.status().await, Some(RunStatus::Cancelled));
367
368 *current_run_id.lock().await = Some("other-run".to_string());
369 assert!(!handle.cancel().await);
370 }
371}