1use anyhow::Result;
4use chrono::{DateTime, Utc};
5use rusqlite::{params, Connection};
6
7use crate::commands::swarm::events::{AgentEvent, EventKind};
8
9pub fn insert_event(conn: &Connection, event: &AgentEvent) -> Result<i64> {
10 let (kind, success, duration_ms, tool_name, file_path, dependency_id, reason, data) =
11 match &event.event {
12 EventKind::Spawned => ("spawned", None, None, None, None, None, None, None),
13 EventKind::Started => ("started", None, None, None, None, None, None, None),
14 EventKind::Completed {
15 success,
16 duration_ms,
17 } => (
18 "completed",
19 Some(*success as i32),
20 Some(*duration_ms as i64),
21 None,
22 None,
23 None,
24 None,
25 None,
26 ),
27 EventKind::Failed { reason } => (
28 "failed",
29 Some(0),
30 None,
31 None,
32 None,
33 None,
34 Some(reason.as_str()),
35 None,
36 ),
37 EventKind::ToolCall {
38 tool,
39 input_summary,
40 } => (
41 "tool_call",
42 None,
43 None,
44 Some(tool.as_str()),
45 None,
46 None,
47 None,
48 input_summary
49 .as_ref()
50 .map(|s| serde_json::json!({"input_summary": s}).to_string()),
51 ),
52 EventKind::ToolResult {
53 tool,
54 success,
55 duration_ms,
56 } => (
57 "tool_result",
58 Some(*success as i32),
59 duration_ms.map(|d| d as i64),
60 Some(tool.as_str()),
61 None,
62 None,
63 None,
64 None,
65 ),
66 EventKind::FileRead { path } => (
67 "file_read",
68 None,
69 None,
70 None,
71 Some(path.as_str()),
72 None,
73 None,
74 None,
75 ),
76 EventKind::FileWrite {
77 path,
78 lines_changed,
79 } => (
80 "file_write",
81 None,
82 None,
83 None,
84 Some(path.as_str()),
85 None,
86 None,
87 lines_changed.map(|l| serde_json::json!({"lines_changed": l}).to_string()),
88 ),
89 EventKind::DependencyMet { dependency_id } => (
90 "dependency_met",
91 None,
92 None,
93 None,
94 None,
95 Some(dependency_id.as_str()),
96 None,
97 None,
98 ),
99 EventKind::Unblocked { by_task_id } => (
100 "unblocked",
101 None,
102 None,
103 None,
104 None,
105 Some(by_task_id.as_str()),
106 None,
107 None,
108 ),
109 EventKind::Output { line } => (
110 "output",
111 None,
112 None,
113 None,
114 None,
115 None,
116 None,
117 Some(serde_json::json!({"line": line}).to_string()),
118 ),
119 EventKind::WaveStarted {
120 wave_number,
121 task_count,
122 } => (
123 "wave_started",
124 None,
125 None,
126 None,
127 None,
128 None,
129 None,
130 Some(
131 serde_json::json!({"wave_number": wave_number, "task_count": task_count})
132 .to_string(),
133 ),
134 ),
135 EventKind::WaveCompleted {
136 wave_number,
137 duration_ms,
138 } => (
139 "wave_completed",
140 None,
141 Some(*duration_ms as i64),
142 None,
143 None,
144 None,
145 None,
146 Some(serde_json::json!({"wave_number": wave_number}).to_string()),
147 ),
148 EventKind::ValidationPassed => (
149 "validation_passed",
150 Some(1),
151 None,
152 None,
153 None,
154 None,
155 None,
156 None,
157 ),
158 EventKind::ValidationFailed { failures } => (
159 "validation_failed",
160 Some(0),
161 None,
162 None,
163 None,
164 None,
165 None,
166 Some(serde_json::json!({"failures": failures}).to_string()),
167 ),
168 EventKind::RepairStarted { attempt, task_ids } => (
169 "repair_started",
170 None,
171 None,
172 None,
173 None,
174 None,
175 None,
176 Some(serde_json::json!({"attempt": attempt, "task_ids": task_ids}).to_string()),
177 ),
178 EventKind::RepairCompleted { attempt, success } => (
179 "repair_completed",
180 Some(*success as i32),
181 None,
182 None,
183 None,
184 None,
185 None,
186 Some(serde_json::json!({"attempt": attempt}).to_string()),
187 ),
188 EventKind::Heartbeat => ("heartbeat", None, None, None, None, None, None, None),
189 EventKind::Custom { name, data } => (
190 "custom",
191 None,
192 None,
193 None,
194 None,
195 None,
196 Some(name.as_str()),
197 data.as_ref().map(|d| d.to_string()),
198 ),
199 };
200
201 conn.execute(
202 "INSERT INTO events (timestamp, session_id, task_id, kind, success, duration_ms,
203 tool_name, file_path, dependency_id, reason, data)
204 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
205 params![
206 event.timestamp.to_rfc3339(),
207 event.session_id,
208 event.task_id,
209 kind,
210 success,
211 duration_ms,
212 tool_name,
213 file_path,
214 dependency_id,
215 reason,
216 data,
217 ],
218 )?;
219 Ok(conn.last_insert_rowid())
220}
221
222pub fn get_events_for_session(conn: &Connection, session_id: &str) -> Result<Vec<AgentEvent>> {
223 let mut stmt = conn.prepare(
224 "SELECT timestamp, session_id, task_id, kind, success, duration_ms,
225 tool_name, file_path, dependency_id, reason, data
226 FROM events WHERE session_id = ? ORDER BY timestamp ASC",
227 )?;
228
229 let events = stmt.query_map(params![session_id], |row| {
230 let timestamp: String = row.get(0)?;
231 let session_id: String = row.get(1)?;
232 let task_id: String = row.get(2)?;
233 let kind: String = row.get(3)?;
234 let success: Option<i32> = row.get(4)?;
235 let duration_ms: Option<i64> = row.get(5)?;
236 let tool_name: Option<String> = row.get(6)?;
237 let file_path: Option<String> = row.get(7)?;
238 let dependency_id: Option<String> = row.get(8)?;
239 let reason: Option<String> = row.get(9)?;
240 let data: Option<String> = row.get(10)?;
241
242 let event = match kind.as_str() {
243 "spawned" => EventKind::Spawned,
244 "started" => EventKind::Started,
245 "completed" => EventKind::Completed {
246 success: success.unwrap_or(0) != 0,
247 duration_ms: duration_ms.unwrap_or(0) as u64,
248 },
249 "failed" => EventKind::Failed {
250 reason: reason.unwrap_or_default(),
251 },
252 "tool_call" => EventKind::ToolCall {
253 tool: tool_name.clone().unwrap_or_default(),
254 input_summary: data.as_ref().and_then(|d| {
255 serde_json::from_str::<serde_json::Value>(d)
256 .ok()
257 .and_then(|v| {
258 v.get("input_summary")
259 .and_then(|s| s.as_str())
260 .map(String::from)
261 })
262 }),
263 },
264 "tool_result" => EventKind::ToolResult {
265 tool: tool_name.clone().unwrap_or_default(),
266 success: success.unwrap_or(0) != 0,
267 duration_ms: duration_ms.map(|d| d as u64),
268 },
269 "file_read" => EventKind::FileRead {
270 path: file_path.clone().unwrap_or_default(),
271 },
272 "file_write" => EventKind::FileWrite {
273 path: file_path.clone().unwrap_or_default(),
274 lines_changed: data.as_ref().and_then(|d| {
275 serde_json::from_str::<serde_json::Value>(d)
276 .ok()
277 .and_then(|v| v.get("lines_changed").and_then(|n| n.as_u64()))
278 .map(|n| n as u32)
279 }),
280 },
281 "dependency_met" => EventKind::DependencyMet {
282 dependency_id: dependency_id.clone().unwrap_or_default(),
283 },
284 "unblocked" => EventKind::Unblocked {
285 by_task_id: dependency_id.clone().unwrap_or_default(),
286 },
287 "output" => EventKind::Output {
288 line: data
289 .as_ref()
290 .and_then(|d| {
291 serde_json::from_str::<serde_json::Value>(d)
292 .ok()
293 .and_then(|v| v.get("line").and_then(|s| s.as_str()).map(String::from))
294 })
295 .unwrap_or_default(),
296 },
297 "wave_started" => {
298 let parsed = data
299 .as_ref()
300 .and_then(|d| serde_json::from_str::<serde_json::Value>(d).ok());
301 EventKind::WaveStarted {
302 wave_number: parsed
303 .as_ref()
304 .and_then(|v| v.get("wave_number").and_then(|n| n.as_u64()))
305 .unwrap_or(0) as usize,
306 task_count: parsed
307 .as_ref()
308 .and_then(|v| v.get("task_count").and_then(|n| n.as_u64()))
309 .unwrap_or(0) as usize,
310 }
311 }
312 "wave_completed" => {
313 let parsed = data
314 .as_ref()
315 .and_then(|d| serde_json::from_str::<serde_json::Value>(d).ok());
316 EventKind::WaveCompleted {
317 wave_number: parsed
318 .as_ref()
319 .and_then(|v| v.get("wave_number").and_then(|n| n.as_u64()))
320 .unwrap_or(0) as usize,
321 duration_ms: duration_ms.unwrap_or(0) as u64,
322 }
323 }
324 "validation_passed" => EventKind::ValidationPassed,
325 "validation_failed" => {
326 let parsed = data
327 .as_ref()
328 .and_then(|d| serde_json::from_str::<serde_json::Value>(d).ok());
329 EventKind::ValidationFailed {
330 failures: parsed
331 .as_ref()
332 .and_then(|v| v.get("failures"))
333 .and_then(|v| v.as_array())
334 .map(|arr| {
335 arr.iter()
336 .filter_map(|v| v.as_str().map(String::from))
337 .collect()
338 })
339 .unwrap_or_default(),
340 }
341 }
342 "repair_started" => {
343 let parsed = data
344 .as_ref()
345 .and_then(|d| serde_json::from_str::<serde_json::Value>(d).ok());
346 EventKind::RepairStarted {
347 attempt: parsed
348 .as_ref()
349 .and_then(|v| v.get("attempt").and_then(|n| n.as_u64()))
350 .unwrap_or(0) as usize,
351 task_ids: parsed
352 .as_ref()
353 .and_then(|v| v.get("task_ids"))
354 .and_then(|v| v.as_array())
355 .map(|arr| {
356 arr.iter()
357 .filter_map(|v| v.as_str().map(String::from))
358 .collect()
359 })
360 .unwrap_or_default(),
361 }
362 }
363 "repair_completed" => {
364 let parsed = data
365 .as_ref()
366 .and_then(|d| serde_json::from_str::<serde_json::Value>(d).ok());
367 EventKind::RepairCompleted {
368 attempt: parsed
369 .as_ref()
370 .and_then(|v| v.get("attempt").and_then(|n| n.as_u64()))
371 .unwrap_or(0) as usize,
372 success: success.unwrap_or(0) != 0,
373 }
374 }
375 _ => EventKind::Custom {
376 name: kind.clone(),
377 data: data.and_then(|d| serde_json::from_str(&d).ok()),
378 },
379 };
380
381 Ok(AgentEvent {
382 timestamp: DateTime::parse_from_rfc3339(×tamp)
383 .map(|dt| dt.with_timezone(&Utc))
384 .unwrap_or_else(|_| Utc::now()),
385 session_id,
386 task_id,
387 event,
388 })
389 })?;
390
391 events.collect::<Result<Vec<_>, _>>().map_err(Into::into)
392}
393
394pub fn get_events_for_session_limited(
395 conn: &Connection,
396 session_id: &str,
397 limit: Option<usize>,
398 since: Option<DateTime<Utc>>,
399) -> Result<Vec<AgentEvent>> {
400 let mut query = "SELECT timestamp, session_id, task_id, kind, success, duration_ms,
401 tool_name, file_path, dependency_id, reason, data
402 FROM events WHERE session_id = ?"
403 .to_string();
404 let mut params: Vec<String> = vec![session_id.to_string()];
405
406 if let Some(since) = since {
407 query.push_str(" AND timestamp >= ?");
408 params.push(since.to_rfc3339());
409 }
410
411 query.push_str(" ORDER BY timestamp DESC");
412
413 if let Some(limit) = limit {
414 query.push_str(&format!(" LIMIT {}", limit));
415 }
416
417 let mut stmt = conn.prepare(&query)?;
418
419 let events = stmt.query_map(rusqlite::params_from_iter(params), |row| {
420 let timestamp: String = row.get(0)?;
421 let session_id: String = row.get(1)?;
422 let task_id: String = row.get(2)?;
423 let kind: String = row.get(3)?;
424 let success: Option<i32> = row.get(4)?;
425 let duration_ms: Option<i64> = row.get(5)?;
426 let tool_name: Option<String> = row.get(6)?;
427 let file_path: Option<String> = row.get(7)?;
428 let dependency_id: Option<String> = row.get(8)?;
429 let reason: Option<String> = row.get(9)?;
430 let data: Option<String> = row.get(10)?;
431
432 let event = match kind.as_str() {
433 "spawned" => EventKind::Spawned,
434 "started" => EventKind::Started,
435 "completed" => EventKind::Completed {
436 success: success.unwrap_or(0) != 0,
437 duration_ms: duration_ms.unwrap_or(0) as u64,
438 },
439 "failed" => EventKind::Failed {
440 reason: reason.unwrap_or_default(),
441 },
442 "tool_call" => EventKind::ToolCall {
443 tool: tool_name.clone().unwrap_or_default(),
444 input_summary: data.as_ref().and_then(|d| {
445 serde_json::from_str::<serde_json::Value>(d)
446 .ok()
447 .and_then(|v| {
448 v.get("input_summary")
449 .and_then(|s| s.as_str())
450 .map(String::from)
451 })
452 }),
453 },
454 "tool_result" => EventKind::ToolResult {
455 tool: tool_name.clone().unwrap_or_default(),
456 success: success.unwrap_or(0) != 0,
457 duration_ms: duration_ms.map(|d| d as u64),
458 },
459 "file_read" => EventKind::FileRead {
460 path: file_path.clone().unwrap_or_default(),
461 },
462 "file_write" => EventKind::FileWrite {
463 path: file_path.clone().unwrap_or_default(),
464 lines_changed: data.as_ref().and_then(|d| {
465 serde_json::from_str::<serde_json::Value>(d)
466 .ok()
467 .and_then(|v| v.get("lines_changed").and_then(|n| n.as_u64()))
468 .map(|n| n as u32)
469 }),
470 },
471 "dependency_met" => EventKind::DependencyMet {
472 dependency_id: dependency_id.clone().unwrap_or_default(),
473 },
474 "unblocked" => EventKind::Unblocked {
475 by_task_id: dependency_id.clone().unwrap_or_default(),
476 },
477 "output" => EventKind::Output {
478 line: data
479 .as_ref()
480 .and_then(|d| {
481 serde_json::from_str::<serde_json::Value>(d)
482 .ok()
483 .and_then(|v| v.get("line").and_then(|s| s.as_str()).map(String::from))
484 })
485 .unwrap_or_default(),
486 },
487 "wave_started" => {
488 let parsed = data
489 .as_ref()
490 .and_then(|d| serde_json::from_str::<serde_json::Value>(d).ok());
491 EventKind::WaveStarted {
492 wave_number: parsed
493 .as_ref()
494 .and_then(|v| v.get("wave_number").and_then(|n| n.as_u64()))
495 .unwrap_or(0) as usize,
496 task_count: parsed
497 .as_ref()
498 .and_then(|v| v.get("task_count").and_then(|n| n.as_u64()))
499 .unwrap_or(0) as usize,
500 }
501 }
502 "wave_completed" => {
503 let parsed = data
504 .as_ref()
505 .and_then(|d| serde_json::from_str::<serde_json::Value>(d).ok());
506 EventKind::WaveCompleted {
507 wave_number: parsed
508 .as_ref()
509 .and_then(|v| v.get("wave_number").and_then(|n| n.as_u64()))
510 .unwrap_or(0) as usize,
511 duration_ms: duration_ms.unwrap_or(0) as u64,
512 }
513 }
514 "validation_passed" => EventKind::ValidationPassed,
515 "validation_failed" => {
516 let parsed = data
517 .as_ref()
518 .and_then(|d| serde_json::from_str::<serde_json::Value>(d).ok());
519 EventKind::ValidationFailed {
520 failures: parsed
521 .as_ref()
522 .and_then(|v| v.get("failures"))
523 .and_then(|v| v.as_array())
524 .map(|arr| {
525 arr.iter()
526 .filter_map(|v| v.as_str().map(String::from))
527 .collect()
528 })
529 .unwrap_or_default(),
530 }
531 }
532 "repair_started" => {
533 let parsed = data
534 .as_ref()
535 .and_then(|d| serde_json::from_str::<serde_json::Value>(d).ok());
536 EventKind::RepairStarted {
537 attempt: parsed
538 .as_ref()
539 .and_then(|v| v.get("attempt").and_then(|n| n.as_u64()))
540 .unwrap_or(0) as usize,
541 task_ids: parsed
542 .as_ref()
543 .and_then(|v| v.get("task_ids"))
544 .and_then(|v| v.as_array())
545 .map(|arr| {
546 arr.iter()
547 .filter_map(|v| v.as_str().map(String::from))
548 .collect()
549 })
550 .unwrap_or_default(),
551 }
552 }
553 "repair_completed" => {
554 let parsed = data
555 .as_ref()
556 .and_then(|d| serde_json::from_str::<serde_json::Value>(d).ok());
557 EventKind::RepairCompleted {
558 attempt: parsed
559 .as_ref()
560 .and_then(|v| v.get("attempt").and_then(|n| n.as_u64()))
561 .unwrap_or(0) as usize,
562 success: success.unwrap_or(0) != 0,
563 }
564 }
565 _ => EventKind::Custom {
566 name: kind.clone(),
567 data: data.and_then(|d| serde_json::from_str(&d).ok()),
568 },
569 };
570
571 Ok(AgentEvent {
572 timestamp: DateTime::parse_from_rfc3339(×tamp)
573 .map(|dt| dt.with_timezone(&Utc))
574 .unwrap_or_else(|_| Utc::now()),
575 session_id,
576 task_id,
577 event,
578 })
579 })?;
580
581 let mut events: Vec<AgentEvent> = events.collect::<Result<Vec<_>, _>>()?;
582 events.reverse(); Ok(events)
584}
585
586pub fn list_sessions(conn: &Connection) -> Result<Vec<String>> {
587 let mut stmt = conn.prepare(
588 "SELECT session_id, MIN(timestamp) as first_ts
589 FROM events GROUP BY session_id ORDER BY first_ts ASC",
590 )?;
591 let sessions = stmt.query_map([], |row| row.get::<_, String>(0))?;
592 sessions.collect::<Result<Vec<_>, _>>().map_err(Into::into)
593}