1use duckdb::params;
8
9use super::Store;
10use crate::query::{CompareOp, Query, QueryComponent};
11use crate::schema::InvocationRecord;
12use crate::Result;
13
14#[derive(Debug)]
16pub struct InvocationSummary {
17 pub id: String,
18 pub cmd: String,
19 pub exit_code: i32,
20 pub timestamp: String,
21 pub duration_ms: Option<i64>,
22}
23
24impl Store {
25 pub fn write_invocation(&self, record: &InvocationRecord) -> Result<()> {
31 let attempt = record.to_attempt();
33 let outcome = record.to_outcome();
34
35 self.write_attempt(&attempt)?;
37
38 if let Some(outcome) = outcome {
40 self.write_outcome(&outcome)?;
41 }
42
43 Ok(())
44 }
45
46 pub fn recent_invocations(&self, limit: usize) -> Result<Vec<InvocationSummary>> {
48 let conn = self.connection()?;
49
50 let sql = format!(
51 r#"
52 SELECT id::VARCHAR, cmd, exit_code, timestamp::VARCHAR, duration_ms
53 FROM recent_invocations
54 LIMIT {}
55 "#,
56 limit
57 );
58
59 let mut stmt = match conn.prepare(&sql) {
60 Ok(stmt) => stmt,
61 Err(e) => {
62 if e.to_string().contains("No files found") {
63 return Ok(Vec::new());
64 }
65 return Err(e.into());
66 }
67 };
68
69 let rows = stmt.query_map([], |row| {
70 Ok(InvocationSummary {
71 id: row.get(0)?,
72 cmd: row.get(1)?,
73 exit_code: row.get(2)?,
74 timestamp: row.get(3)?,
75 duration_ms: row.get(4)?,
76 })
77 });
78
79 match rows {
80 Ok(rows) => {
81 let mut results = Vec::new();
82 for row in rows {
83 results.push(row?);
84 }
85 Ok(results)
86 }
87 Err(e) => {
88 if e.to_string().contains("No files found") {
89 Ok(Vec::new())
90 } else {
91 Err(e.into())
92 }
93 }
94 }
95 }
96
97 pub fn last_invocation(&self) -> Result<Option<InvocationSummary>> {
99 let invocations = self.recent_invocations(1)?;
100 Ok(invocations.into_iter().next())
101 }
102
103 pub fn query_invocations_with_limit(
114 &self,
115 query: &Query,
116 default_limit: usize,
117 ) -> Result<Vec<InvocationSummary>> {
118 let conn = self.connection()?;
119
120 let mut where_clauses: Vec<String> = Vec::new();
122
123 for component in &query.filters {
124 match component {
125 QueryComponent::CommandRegex(pattern) => {
126 let escaped = pattern.replace('\'', "''");
128 where_clauses.push(format!("regexp_matches(cmd, '{}')", escaped));
129 }
130 QueryComponent::FieldFilter(filter) => {
131 let column = match filter.field.as_str() {
133 "exit" | "exit_code" => "exit_code",
134 "duration" | "duration_ms" => "duration_ms",
135 "cmd" | "command" => "cmd",
136 "cwd" => "cwd",
137 other => other, };
139
140 let escaped_value = filter.value.replace('\'', "''");
141
142 let clause = match filter.op {
143 CompareOp::Eq => format!("{} = '{}'", column, escaped_value),
144 CompareOp::NotEq => format!("{} <> '{}'", column, escaped_value),
145 CompareOp::Gt => format!("{} > '{}'", column, escaped_value),
146 CompareOp::Lt => format!("{} < '{}'", column, escaped_value),
147 CompareOp::Gte => format!("{} >= '{}'", column, escaped_value),
148 CompareOp::Lte => format!("{} <= '{}'", column, escaped_value),
149 CompareOp::Regex => {
150 format!("regexp_matches({}::VARCHAR, '{}')", column, escaped_value)
151 }
152 };
153 where_clauses.push(clause);
154 }
155 QueryComponent::Tag(_) => {
156 }
158 }
159 }
160
161 let where_sql = if where_clauses.is_empty() {
163 String::new()
164 } else {
165 format!("WHERE {}", where_clauses.join(" AND "))
166 };
167
168 let (limit, offset) = if let Some(range) = query.range {
173 if range.is_single() {
174 (1, range.start.saturating_sub(1))
176 } else if range.is_last_n() {
177 (range.start, 0)
179 } else {
180 let end_pos = range.end.unwrap_or(1);
184 let count = range.start.saturating_sub(end_pos) + 1;
185 (count, end_pos.saturating_sub(1))
186 }
187 } else {
188 (default_limit, 0)
189 };
190
191 let sql = format!(
192 r#"
193 SELECT id::VARCHAR, cmd, exit_code, timestamp::VARCHAR, duration_ms
194 FROM recent_invocations
195 {}
196 LIMIT {}
197 OFFSET {}
198 "#,
199 where_sql, limit, offset
200 );
201
202 let mut stmt = match conn.prepare(&sql) {
203 Ok(stmt) => stmt,
204 Err(e) => {
205 if e.to_string().contains("No files found") {
206 return Ok(Vec::new());
207 }
208 return Err(e.into());
209 }
210 };
211
212 let rows = stmt.query_map([], |row| {
213 Ok(InvocationSummary {
214 id: row.get(0)?,
215 cmd: row.get(1)?,
216 exit_code: row.get(2)?,
217 timestamp: row.get(3)?,
218 duration_ms: row.get(4)?,
219 })
220 });
221
222 match rows {
223 Ok(rows) => {
224 let mut results = Vec::new();
225 for row in rows {
226 results.push(row?);
227 }
228 Ok(results)
229 }
230 Err(e) => {
231 if e.to_string().contains("No files found") {
232 Ok(Vec::new())
233 } else {
234 Err(e.into())
235 }
236 }
237 }
238 }
239
240 pub fn query_invocations(&self, query: &Query) -> Result<Vec<InvocationSummary>> {
242 self.query_invocations_with_limit(query, 20)
243 }
244
245 pub fn invocation_count(&self) -> Result<i64> {
247 let conn = self.connection()?;
248
249 let result: std::result::Result<i64, _> =
250 conn.query_row("SELECT COUNT(*) FROM invocations", [], |row| row.get(0));
251
252 match result {
253 Ok(count) => Ok(count),
254 Err(e) => {
255 if e.to_string().contains("No files found") {
256 Ok(0)
257 } else {
258 Err(e.into())
259 }
260 }
261 }
262 }
263
264 pub fn find_by_tag(&self, tag: &str) -> Result<Option<String>> {
267 let conn = self.connection()?;
268
269 let tag = tag.trim_start_matches(':');
271
272 let result: std::result::Result<String, _> = conn.query_row(
273 "SELECT id::VARCHAR FROM invocations WHERE tag = ?",
274 params![tag],
275 |row| row.get(0),
276 );
277
278 match result {
279 Ok(id) => Ok(Some(id)),
280 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
281 Err(e) => Err(e.into()),
282 }
283 }
284
285 pub fn set_tag(&self, invocation_id: &str, tag: Option<&str>) -> Result<()> {
289 let conn = self.connection()?;
290
291 conn.execute(
292 "UPDATE local.attempts SET tag = ? WHERE id = ?",
293 params![tag, invocation_id],
294 )?;
295
296 Ok(())
297 }
298
299 pub fn recover_orphaned_invocations(
306 &self,
307 max_age_hours: u32,
308 dry_run: bool,
309 ) -> Result<super::pending::RecoveryStats> {
310 use super::pending::{is_runner_alive, RecoveryStats};
311
312 let pending_attempts = self.get_pending_attempts()?;
314 let mut stats = RecoveryStats::default();
315
316 let now = chrono::Utc::now();
317 let max_age = chrono::Duration::hours(max_age_hours as i64);
318
319 for attempt in pending_attempts {
320 stats.pending_checked += 1;
321
322 let age = now.signed_duration_since(attempt.timestamp);
324 let is_stale = age > max_age;
325
326 let runner_alive = if let Some(ref runner_id) = attempt.machine_id {
328 !is_stale && is_runner_alive(runner_id)
329 } else {
330 !is_stale
332 };
333
334 if runner_alive {
335 stats.still_running += 1;
336 continue;
337 }
338
339 if dry_run {
340 stats.orphaned += 1;
341 continue;
342 }
343
344 match self.orphan_invocation(attempt.id, attempt.date) {
346 Ok(()) => {
347 stats.orphaned += 1;
348 }
349 Err(_) => {
350 stats.errors += 1;
351 }
352 }
353 }
354
355 Ok(stats)
356 }
357}
358
359#[cfg(test)]
360mod tests {
361 use super::*;
362 use crate::init::initialize;
363 use crate::Config;
364 use tempfile::TempDir;
365
366 fn setup_store() -> (TempDir, Store) {
367 let tmp = TempDir::new().unwrap();
368 let config = Config::with_root(tmp.path());
369 initialize(&config).unwrap();
370 let store = Store::open(config).unwrap();
371 (tmp, store)
372 }
373
374 #[test]
375 fn test_write_and_count_invocation() {
376 let (_tmp, store) = setup_store();
377
378 let record = InvocationRecord::new(
379 "test-session",
380 "make test",
381 "/home/user/project",
382 0,
383 "test@client",
384 );
385
386 store.write_invocation(&record).unwrap();
387
388 let count = store.invocation_count().unwrap();
389 assert_eq!(count, 1);
390 }
391
392 #[test]
393 fn test_write_and_query_invocation() {
394 let (_tmp, store) = setup_store();
395
396 let record = InvocationRecord::new(
397 "test-session",
398 "cargo build",
399 "/home/user/project",
400 0,
401 "test@client",
402 )
403 .with_duration(1500);
404
405 store.write_invocation(&record).unwrap();
406
407 let result = store
409 .query("SELECT cmd, exit_code, duration_ms FROM invocations")
410 .unwrap();
411
412 assert_eq!(result.columns, vec!["cmd", "exit_code", "duration_ms"]);
413 assert_eq!(result.rows.len(), 1);
414 assert_eq!(result.rows[0][0], "cargo build");
415 assert_eq!(result.rows[0][1], "0");
416 assert_eq!(result.rows[0][2], "1500");
417 }
418
419 #[test]
420 fn test_recent_invocations_empty() {
421 let (_tmp, store) = setup_store();
422
423 let recent = store.recent_invocations(10).unwrap();
424 assert!(recent.is_empty());
425 }
426
427 #[test]
428 fn test_recent_invocations() {
429 let (_tmp, store) = setup_store();
430
431 for i in 0..3 {
433 let record = InvocationRecord::new(
434 "test-session",
435 format!("command-{}", i),
436 "/home/user",
437 i,
438 "test@client",
439 );
440 store.write_invocation(&record).unwrap();
441 }
442
443 let recent = store.recent_invocations(10).unwrap();
444 assert_eq!(recent.len(), 3);
445 }
446
447 #[test]
448 fn test_atomic_parquet_no_temp_files() {
449 let (_tmp, store) = setup_store();
450
451 let record = InvocationRecord::new(
452 "test-session",
453 "test",
454 "/home/user",
455 0,
456 "test@client",
457 );
458 store.write_invocation(&record).unwrap();
459
460 let date = record.date();
462 let attempts_dir = store.config().attempts_dir(&date);
463 let temps: Vec<_> = std::fs::read_dir(&attempts_dir)
464 .unwrap()
465 .filter_map(|e| e.ok())
466 .filter(|e| e.file_name().to_str().unwrap_or("").starts_with(".tmp."))
467 .collect();
468 assert!(
469 temps.is_empty(),
470 "No temp files should remain in {:?}",
471 attempts_dir
472 );
473 }
474
475 fn setup_store_duckdb() -> (TempDir, Store) {
478 let tmp = TempDir::new().unwrap();
479 let config = Config::with_duckdb_mode(tmp.path());
480 initialize(&config).unwrap();
481 let store = Store::open(config).unwrap();
482 (tmp, store)
483 }
484
485 #[test]
486 fn test_duckdb_mode_write_and_count_invocation() {
487 let (_tmp, store) = setup_store_duckdb();
488
489 let record = InvocationRecord::new(
490 "test-session",
491 "make test",
492 "/home/user/project",
493 0,
494 "test@client",
495 );
496
497 store.write_invocation(&record).unwrap();
498
499 let count = store.invocation_count().unwrap();
500 assert_eq!(count, 1);
501 }
502
503 #[test]
504 fn test_duckdb_mode_write_and_query_invocation() {
505 let (_tmp, store) = setup_store_duckdb();
506
507 let record = InvocationRecord::new(
508 "test-session",
509 "cargo build",
510 "/home/user/project",
511 0,
512 "test@client",
513 )
514 .with_duration(1500);
515
516 store.write_invocation(&record).unwrap();
517
518 let result = store
520 .query("SELECT cmd, exit_code, duration_ms FROM invocations")
521 .unwrap();
522
523 assert_eq!(result.columns, vec!["cmd", "exit_code", "duration_ms"]);
524 assert_eq!(result.rows.len(), 1);
525 assert_eq!(result.rows[0][0], "cargo build");
526 assert_eq!(result.rows[0][1], "0");
527 assert_eq!(result.rows[0][2], "1500");
528 }
529
530 #[test]
531 fn test_duckdb_mode_recent_invocations() {
532 let (_tmp, store) = setup_store_duckdb();
533
534 for i in 0..3 {
536 let record = InvocationRecord::new(
537 "test-session",
538 format!("command-{}", i),
539 "/home/user",
540 i,
541 "test@client",
542 );
543 store.write_invocation(&record).unwrap();
544 }
545
546 let recent = store.recent_invocations(10).unwrap();
547 assert_eq!(recent.len(), 3);
548 }
549
550 #[test]
551 fn test_duckdb_mode_no_parquet_files() {
552 let (tmp, store) = setup_store_duckdb();
553
554 let record = InvocationRecord::new(
555 "test-session",
556 "test",
557 "/home/user",
558 0,
559 "test@client",
560 );
561 store.write_invocation(&record).unwrap();
562
563 let invocations_dir = tmp.path().join("db/data/recent/invocations");
565 if invocations_dir.exists() {
566 let parquet_files: Vec<_> = std::fs::read_dir(&invocations_dir)
567 .unwrap()
568 .filter_map(|e| e.ok())
569 .filter(|e| e.file_name().to_str().unwrap_or("").ends_with(".parquet"))
570 .collect();
571 assert!(
572 parquet_files.is_empty(),
573 "DuckDB mode should not create parquet files"
574 );
575 }
576 }
577
578 #[test]
579 fn test_pending_invocation_lifecycle() {
580 let (_tmp, store) = setup_store();
581
582 use crate::schema::AttemptRecord;
584
585 let attempt = AttemptRecord::new(
587 "test-session",
588 "long-running-command",
589 "/home/user",
590 "test@client",
591 );
592
593 store.start_invocation(&attempt).unwrap();
595
596 let count = store.attempt_count().unwrap();
598 assert_eq!(count, 1, "Attempt should be written");
599
600 let pending = store.get_pending_attempts().unwrap();
602 assert_eq!(pending.len(), 1, "Should have one pending attempt");
603 assert_eq!(pending[0].id, attempt.id);
604
605 store.complete_invocation(attempt.id, 0, Some(100), attempt.date).unwrap();
607
608 let outcome_count = store.outcome_count().unwrap();
610 assert_eq!(outcome_count, 1, "Outcome should be written");
611
612 let pending_after = store.get_pending_attempts().unwrap();
614 assert!(pending_after.is_empty(), "No pending attempts after completion");
615
616 let invocations = store.recent_invocations(10).unwrap();
618 assert_eq!(invocations.len(), 1, "Should have one invocation");
619 assert_eq!(invocations[0].exit_code, 0);
620 }
621
622 #[test]
623 fn test_recover_orphaned_invocations() {
624 let (_tmp, store) = setup_store();
625
626 use crate::schema::AttemptRecord;
629
630 let mut attempt = AttemptRecord::new(
631 "test-session",
632 "crashed-command",
633 "/home/user",
634 "test@client",
635 );
636 attempt.machine_id = Some("pid:999999999".to_string());
638
639 store.write_attempt(&attempt).unwrap();
641
642 let pending = store.get_pending_attempts().unwrap();
644 assert_eq!(pending.len(), 1, "Should have one pending attempt");
645
646 let stats = store.recover_orphaned_invocations(24, false).unwrap();
648
649 assert_eq!(stats.pending_checked, 1);
650 assert_eq!(stats.orphaned, 1);
651 assert_eq!(stats.still_running, 0);
652
653 let outcome_count = store.outcome_count().unwrap();
655 assert_eq!(outcome_count, 1, "Orphaned outcome should be written");
656
657 let pending_after = store.get_pending_attempts().unwrap();
659 assert!(pending_after.is_empty(), "No pending attempts after recovery");
660 }
661
662 #[test]
663 fn test_recover_skips_running_processes() {
664 let (_tmp, store) = setup_store();
665
666 use crate::schema::AttemptRecord;
668
669 let mut attempt = AttemptRecord::new(
670 "test-session",
671 "running-command",
672 "/home/user",
673 "test@client",
674 );
675 attempt.machine_id = Some(format!("pid:{}", std::process::id()));
677
678 store.write_attempt(&attempt).unwrap();
680
681 let attempt_count = store.attempt_count().unwrap();
683 assert_eq!(attempt_count, 1, "Attempt should be written");
684
685 let pending_before = store.get_pending_attempts().unwrap();
687 assert_eq!(pending_before.len(), 1, "Should have one pending attempt before recovery");
688
689 let stats = store.recover_orphaned_invocations(24, false).unwrap();
691
692 assert_eq!(stats.pending_checked, 1, "Should check one pending attempt");
693 assert_eq!(stats.still_running, 1, "Should detect process is still running");
694 assert_eq!(stats.orphaned, 0, "Should not orphan running process");
695
696 let pending_after = store.get_pending_attempts().unwrap();
698 assert_eq!(pending_after.len(), 1, "Attempt should still be pending");
699 }
700}