Skip to main content

scud/db/
events.rs

1//! Event database operations - insert and query swarm events.
2
3use anyhow::Result;
4use chrono::{DateTime, Utc};
5use rusqlite::{params, Connection};
6
7use crate::commands::swarm::events::{AgentEvent, EventKind};
8
9pub fn insert_event(conn: &Connection, event: &AgentEvent) -> Result<i64> {
10    let (kind, success, duration_ms, tool_name, file_path, dependency_id, reason, data) =
11        match &event.event {
12            EventKind::Spawned => ("spawned", None, None, None, None, None, None, None),
13            EventKind::Started => ("started", None, None, None, None, None, None, None),
14            EventKind::Completed {
15                success,
16                duration_ms,
17            } => (
18                "completed",
19                Some(*success as i32),
20                Some(*duration_ms as i64),
21                None,
22                None,
23                None,
24                None,
25                None,
26            ),
27            EventKind::Failed { reason } => (
28                "failed",
29                Some(0),
30                None,
31                None,
32                None,
33                None,
34                Some(reason.as_str()),
35                None,
36            ),
37            EventKind::ToolCall {
38                tool,
39                input_summary,
40            } => (
41                "tool_call",
42                None,
43                None,
44                Some(tool.as_str()),
45                None,
46                None,
47                None,
48                input_summary
49                    .as_ref()
50                    .map(|s| serde_json::json!({"input_summary": s}).to_string()),
51            ),
52            EventKind::ToolResult {
53                tool,
54                success,
55                duration_ms,
56            } => (
57                "tool_result",
58                Some(*success as i32),
59                duration_ms.map(|d| d as i64),
60                Some(tool.as_str()),
61                None,
62                None,
63                None,
64                None,
65            ),
66            EventKind::FileRead { path } => (
67                "file_read",
68                None,
69                None,
70                None,
71                Some(path.as_str()),
72                None,
73                None,
74                None,
75            ),
76            EventKind::FileWrite {
77                path,
78                lines_changed,
79            } => (
80                "file_write",
81                None,
82                None,
83                None,
84                Some(path.as_str()),
85                None,
86                None,
87                lines_changed.map(|l| serde_json::json!({"lines_changed": l}).to_string()),
88            ),
89            EventKind::DependencyMet { dependency_id } => (
90                "dependency_met",
91                None,
92                None,
93                None,
94                None,
95                Some(dependency_id.as_str()),
96                None,
97                None,
98            ),
99            EventKind::Unblocked { by_task_id } => (
100                "unblocked",
101                None,
102                None,
103                None,
104                None,
105                Some(by_task_id.as_str()),
106                None,
107                None,
108            ),
109            EventKind::Output { line } => (
110                "output",
111                None,
112                None,
113                None,
114                None,
115                None,
116                None,
117                Some(serde_json::json!({"line": line}).to_string()),
118            ),
119            EventKind::WaveStarted {
120                wave_number,
121                task_count,
122            } => (
123                "wave_started",
124                None,
125                None,
126                None,
127                None,
128                None,
129                None,
130                Some(
131                    serde_json::json!({"wave_number": wave_number, "task_count": task_count})
132                        .to_string(),
133                ),
134            ),
135            EventKind::WaveCompleted {
136                wave_number,
137                duration_ms,
138            } => (
139                "wave_completed",
140                None,
141                Some(*duration_ms as i64),
142                None,
143                None,
144                None,
145                None,
146                Some(serde_json::json!({"wave_number": wave_number}).to_string()),
147            ),
148            EventKind::ValidationPassed => (
149                "validation_passed",
150                Some(1),
151                None,
152                None,
153                None,
154                None,
155                None,
156                None,
157            ),
158            EventKind::ValidationFailed { failures } => (
159                "validation_failed",
160                Some(0),
161                None,
162                None,
163                None,
164                None,
165                None,
166                Some(serde_json::json!({"failures": failures}).to_string()),
167            ),
168            EventKind::RepairStarted { attempt, task_ids } => (
169                "repair_started",
170                None,
171                None,
172                None,
173                None,
174                None,
175                None,
176                Some(serde_json::json!({"attempt": attempt, "task_ids": task_ids}).to_string()),
177            ),
178            EventKind::RepairCompleted { attempt, success } => (
179                "repair_completed",
180                Some(*success as i32),
181                None,
182                None,
183                None,
184                None,
185                None,
186                Some(serde_json::json!({"attempt": attempt}).to_string()),
187            ),
188            EventKind::Heartbeat => ("heartbeat", None, None, None, None, None, None, None),
189            EventKind::Custom { name, data } => (
190                "custom",
191                None,
192                None,
193                None,
194                None,
195                None,
196                Some(name.as_str()),
197                data.as_ref().map(|d| d.to_string()),
198            ),
199        };
200
201    conn.execute(
202        "INSERT INTO events (timestamp, session_id, task_id, kind, success, duration_ms,
203         tool_name, file_path, dependency_id, reason, data)
204         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
205        params![
206            event.timestamp.to_rfc3339(),
207            event.session_id,
208            event.task_id,
209            kind,
210            success,
211            duration_ms,
212            tool_name,
213            file_path,
214            dependency_id,
215            reason,
216            data,
217        ],
218    )?;
219    Ok(conn.last_insert_rowid())
220}
221
222pub fn get_events_for_session(conn: &Connection, session_id: &str) -> Result<Vec<AgentEvent>> {
223    let mut stmt = conn.prepare(
224        "SELECT timestamp, session_id, task_id, kind, success, duration_ms,
225                tool_name, file_path, dependency_id, reason, data
226         FROM events WHERE session_id = ? ORDER BY timestamp ASC",
227    )?;
228
229    let events = stmt.query_map(params![session_id], |row| {
230        let timestamp: String = row.get(0)?;
231        let session_id: String = row.get(1)?;
232        let task_id: String = row.get(2)?;
233        let kind: String = row.get(3)?;
234        let success: Option<i32> = row.get(4)?;
235        let duration_ms: Option<i64> = row.get(5)?;
236        let tool_name: Option<String> = row.get(6)?;
237        let file_path: Option<String> = row.get(7)?;
238        let dependency_id: Option<String> = row.get(8)?;
239        let reason: Option<String> = row.get(9)?;
240        let data: Option<String> = row.get(10)?;
241
242        let event = match kind.as_str() {
243            "spawned" => EventKind::Spawned,
244            "started" => EventKind::Started,
245            "completed" => EventKind::Completed {
246                success: success.unwrap_or(0) != 0,
247                duration_ms: duration_ms.unwrap_or(0) as u64,
248            },
249            "failed" => EventKind::Failed {
250                reason: reason.unwrap_or_default(),
251            },
252            "tool_call" => EventKind::ToolCall {
253                tool: tool_name.clone().unwrap_or_default(),
254                input_summary: data.as_ref().and_then(|d| {
255                    serde_json::from_str::<serde_json::Value>(d)
256                        .ok()
257                        .and_then(|v| {
258                            v.get("input_summary")
259                                .and_then(|s| s.as_str())
260                                .map(String::from)
261                        })
262                }),
263            },
264            "tool_result" => EventKind::ToolResult {
265                tool: tool_name.clone().unwrap_or_default(),
266                success: success.unwrap_or(0) != 0,
267                duration_ms: duration_ms.map(|d| d as u64),
268            },
269            "file_read" => EventKind::FileRead {
270                path: file_path.clone().unwrap_or_default(),
271            },
272            "file_write" => EventKind::FileWrite {
273                path: file_path.clone().unwrap_or_default(),
274                lines_changed: data.as_ref().and_then(|d| {
275                    serde_json::from_str::<serde_json::Value>(d)
276                        .ok()
277                        .and_then(|v| v.get("lines_changed").and_then(|n| n.as_u64()))
278                        .map(|n| n as u32)
279                }),
280            },
281            "dependency_met" => EventKind::DependencyMet {
282                dependency_id: dependency_id.clone().unwrap_or_default(),
283            },
284            "unblocked" => EventKind::Unblocked {
285                by_task_id: dependency_id.clone().unwrap_or_default(),
286            },
287            "output" => EventKind::Output {
288                line: data
289                    .as_ref()
290                    .and_then(|d| {
291                        serde_json::from_str::<serde_json::Value>(d)
292                            .ok()
293                            .and_then(|v| v.get("line").and_then(|s| s.as_str()).map(String::from))
294                    })
295                    .unwrap_or_default(),
296            },
297            "wave_started" => {
298                let parsed = data
299                    .as_ref()
300                    .and_then(|d| serde_json::from_str::<serde_json::Value>(d).ok());
301                EventKind::WaveStarted {
302                    wave_number: parsed
303                        .as_ref()
304                        .and_then(|v| v.get("wave_number").and_then(|n| n.as_u64()))
305                        .unwrap_or(0) as usize,
306                    task_count: parsed
307                        .as_ref()
308                        .and_then(|v| v.get("task_count").and_then(|n| n.as_u64()))
309                        .unwrap_or(0) as usize,
310                }
311            }
312            "wave_completed" => {
313                let parsed = data
314                    .as_ref()
315                    .and_then(|d| serde_json::from_str::<serde_json::Value>(d).ok());
316                EventKind::WaveCompleted {
317                    wave_number: parsed
318                        .as_ref()
319                        .and_then(|v| v.get("wave_number").and_then(|n| n.as_u64()))
320                        .unwrap_or(0) as usize,
321                    duration_ms: duration_ms.unwrap_or(0) as u64,
322                }
323            }
324            "validation_passed" => EventKind::ValidationPassed,
325            "validation_failed" => {
326                let parsed = data
327                    .as_ref()
328                    .and_then(|d| serde_json::from_str::<serde_json::Value>(d).ok());
329                EventKind::ValidationFailed {
330                    failures: parsed
331                        .as_ref()
332                        .and_then(|v| v.get("failures"))
333                        .and_then(|v| v.as_array())
334                        .map(|arr| {
335                            arr.iter()
336                                .filter_map(|v| v.as_str().map(String::from))
337                                .collect()
338                        })
339                        .unwrap_or_default(),
340                }
341            }
342            "repair_started" => {
343                let parsed = data
344                    .as_ref()
345                    .and_then(|d| serde_json::from_str::<serde_json::Value>(d).ok());
346                EventKind::RepairStarted {
347                    attempt: parsed
348                        .as_ref()
349                        .and_then(|v| v.get("attempt").and_then(|n| n.as_u64()))
350                        .unwrap_or(0) as usize,
351                    task_ids: parsed
352                        .as_ref()
353                        .and_then(|v| v.get("task_ids"))
354                        .and_then(|v| v.as_array())
355                        .map(|arr| {
356                            arr.iter()
357                                .filter_map(|v| v.as_str().map(String::from))
358                                .collect()
359                        })
360                        .unwrap_or_default(),
361                }
362            }
363            "repair_completed" => {
364                let parsed = data
365                    .as_ref()
366                    .and_then(|d| serde_json::from_str::<serde_json::Value>(d).ok());
367                EventKind::RepairCompleted {
368                    attempt: parsed
369                        .as_ref()
370                        .and_then(|v| v.get("attempt").and_then(|n| n.as_u64()))
371                        .unwrap_or(0) as usize,
372                    success: success.unwrap_or(0) != 0,
373                }
374            }
375            _ => EventKind::Custom {
376                name: kind.clone(),
377                data: data.and_then(|d| serde_json::from_str(&d).ok()),
378            },
379        };
380
381        Ok(AgentEvent {
382            timestamp: DateTime::parse_from_rfc3339(&timestamp)
383                .map(|dt| dt.with_timezone(&Utc))
384                .unwrap_or_else(|_| Utc::now()),
385            session_id,
386            task_id,
387            event,
388        })
389    })?;
390
391    events.collect::<Result<Vec<_>, _>>().map_err(Into::into)
392}
393
394pub fn get_events_for_session_limited(
395    conn: &Connection,
396    session_id: &str,
397    limit: Option<usize>,
398    since: Option<DateTime<Utc>>,
399) -> Result<Vec<AgentEvent>> {
400    let mut query = "SELECT timestamp, session_id, task_id, kind, success, duration_ms,
401                    tool_name, file_path, dependency_id, reason, data
402             FROM events WHERE session_id = ?"
403        .to_string();
404    let mut params: Vec<String> = vec![session_id.to_string()];
405
406    if let Some(since) = since {
407        query.push_str(" AND timestamp >= ?");
408        params.push(since.to_rfc3339());
409    }
410
411    query.push_str(" ORDER BY timestamp DESC");
412
413    if let Some(limit) = limit {
414        query.push_str(&format!(" LIMIT {}", limit));
415    }
416
417    let mut stmt = conn.prepare(&query)?;
418
419    let events = stmt.query_map(rusqlite::params_from_iter(params), |row| {
420        let timestamp: String = row.get(0)?;
421        let session_id: String = row.get(1)?;
422        let task_id: String = row.get(2)?;
423        let kind: String = row.get(3)?;
424        let success: Option<i32> = row.get(4)?;
425        let duration_ms: Option<i64> = row.get(5)?;
426        let tool_name: Option<String> = row.get(6)?;
427        let file_path: Option<String> = row.get(7)?;
428        let dependency_id: Option<String> = row.get(8)?;
429        let reason: Option<String> = row.get(9)?;
430        let data: Option<String> = row.get(10)?;
431
432        let event = match kind.as_str() {
433            "spawned" => EventKind::Spawned,
434            "started" => EventKind::Started,
435            "completed" => EventKind::Completed {
436                success: success.unwrap_or(0) != 0,
437                duration_ms: duration_ms.unwrap_or(0) as u64,
438            },
439            "failed" => EventKind::Failed {
440                reason: reason.unwrap_or_default(),
441            },
442            "tool_call" => EventKind::ToolCall {
443                tool: tool_name.clone().unwrap_or_default(),
444                input_summary: data.as_ref().and_then(|d| {
445                    serde_json::from_str::<serde_json::Value>(d)
446                        .ok()
447                        .and_then(|v| {
448                            v.get("input_summary")
449                                .and_then(|s| s.as_str())
450                                .map(String::from)
451                        })
452                }),
453            },
454            "tool_result" => EventKind::ToolResult {
455                tool: tool_name.clone().unwrap_or_default(),
456                success: success.unwrap_or(0) != 0,
457                duration_ms: duration_ms.map(|d| d as u64),
458            },
459            "file_read" => EventKind::FileRead {
460                path: file_path.clone().unwrap_or_default(),
461            },
462            "file_write" => EventKind::FileWrite {
463                path: file_path.clone().unwrap_or_default(),
464                lines_changed: data.as_ref().and_then(|d| {
465                    serde_json::from_str::<serde_json::Value>(d)
466                        .ok()
467                        .and_then(|v| v.get("lines_changed").and_then(|n| n.as_u64()))
468                        .map(|n| n as u32)
469                }),
470            },
471            "dependency_met" => EventKind::DependencyMet {
472                dependency_id: dependency_id.clone().unwrap_or_default(),
473            },
474            "unblocked" => EventKind::Unblocked {
475                by_task_id: dependency_id.clone().unwrap_or_default(),
476            },
477            "output" => EventKind::Output {
478                line: data
479                    .as_ref()
480                    .and_then(|d| {
481                        serde_json::from_str::<serde_json::Value>(d)
482                            .ok()
483                            .and_then(|v| v.get("line").and_then(|s| s.as_str()).map(String::from))
484                    })
485                    .unwrap_or_default(),
486            },
487            "wave_started" => {
488                let parsed = data
489                    .as_ref()
490                    .and_then(|d| serde_json::from_str::<serde_json::Value>(d).ok());
491                EventKind::WaveStarted {
492                    wave_number: parsed
493                        .as_ref()
494                        .and_then(|v| v.get("wave_number").and_then(|n| n.as_u64()))
495                        .unwrap_or(0) as usize,
496                    task_count: parsed
497                        .as_ref()
498                        .and_then(|v| v.get("task_count").and_then(|n| n.as_u64()))
499                        .unwrap_or(0) as usize,
500                }
501            }
502            "wave_completed" => {
503                let parsed = data
504                    .as_ref()
505                    .and_then(|d| serde_json::from_str::<serde_json::Value>(d).ok());
506                EventKind::WaveCompleted {
507                    wave_number: parsed
508                        .as_ref()
509                        .and_then(|v| v.get("wave_number").and_then(|n| n.as_u64()))
510                        .unwrap_or(0) as usize,
511                    duration_ms: duration_ms.unwrap_or(0) as u64,
512                }
513            }
514            "validation_passed" => EventKind::ValidationPassed,
515            "validation_failed" => {
516                let parsed = data
517                    .as_ref()
518                    .and_then(|d| serde_json::from_str::<serde_json::Value>(d).ok());
519                EventKind::ValidationFailed {
520                    failures: parsed
521                        .as_ref()
522                        .and_then(|v| v.get("failures"))
523                        .and_then(|v| v.as_array())
524                        .map(|arr| {
525                            arr.iter()
526                                .filter_map(|v| v.as_str().map(String::from))
527                                .collect()
528                        })
529                        .unwrap_or_default(),
530                }
531            }
532            "repair_started" => {
533                let parsed = data
534                    .as_ref()
535                    .and_then(|d| serde_json::from_str::<serde_json::Value>(d).ok());
536                EventKind::RepairStarted {
537                    attempt: parsed
538                        .as_ref()
539                        .and_then(|v| v.get("attempt").and_then(|n| n.as_u64()))
540                        .unwrap_or(0) as usize,
541                    task_ids: parsed
542                        .as_ref()
543                        .and_then(|v| v.get("task_ids"))
544                        .and_then(|v| v.as_array())
545                        .map(|arr| {
546                            arr.iter()
547                                .filter_map(|v| v.as_str().map(String::from))
548                                .collect()
549                        })
550                        .unwrap_or_default(),
551                }
552            }
553            "repair_completed" => {
554                let parsed = data
555                    .as_ref()
556                    .and_then(|d| serde_json::from_str::<serde_json::Value>(d).ok());
557                EventKind::RepairCompleted {
558                    attempt: parsed
559                        .as_ref()
560                        .and_then(|v| v.get("attempt").and_then(|n| n.as_u64()))
561                        .unwrap_or(0) as usize,
562                    success: success.unwrap_or(0) != 0,
563                }
564            }
565            _ => EventKind::Custom {
566                name: kind.clone(),
567                data: data.and_then(|d| serde_json::from_str(&d).ok()),
568            },
569        };
570
571        Ok(AgentEvent {
572            timestamp: DateTime::parse_from_rfc3339(&timestamp)
573                .map(|dt| dt.with_timezone(&Utc))
574                .unwrap_or_else(|_| Utc::now()),
575            session_id,
576            task_id,
577            event,
578        })
579    })?;
580
581    let mut events: Vec<AgentEvent> = events.collect::<Result<Vec<_>, _>>()?;
582    events.reverse(); // Reverse to chronological order since we queried DESC
583    Ok(events)
584}
585
586pub fn list_sessions(conn: &Connection) -> Result<Vec<String>> {
587    let mut stmt = conn.prepare(
588        "SELECT session_id, MIN(timestamp) as first_ts
589         FROM events GROUP BY session_id ORDER BY first_ts ASC",
590    )?;
591    let sessions = stmt.query_map([], |row| row.get::<_, String>(0))?;
592    sessions.collect::<Result<Vec<_>, _>>().map_err(Into::into)
593}