1use std::fs;
4
5use duckdb::params;
6
7use super::atomic;
8use super::{sanitize_filename, Store};
9use crate::config::StorageMode;
10use crate::query::{CompareOp, Query, QueryComponent};
11use crate::schema::InvocationRecord;
12use crate::Result;
13
14#[derive(Debug)]
16pub struct InvocationSummary {
17 pub id: String,
18 pub cmd: String,
19 pub exit_code: i32,
20 pub timestamp: String,
21 pub duration_ms: Option<i64>,
22}
23
24impl Store {
25 pub fn write_invocation(&self, record: &InvocationRecord) -> Result<()> {
31 match self.config.storage_mode {
32 StorageMode::Parquet => self.write_invocation_parquet(record),
33 StorageMode::DuckDB => self.write_invocation_duckdb(record),
34 }
35 }
36
37 fn write_invocation_parquet(&self, record: &InvocationRecord) -> Result<()> {
39 let conn = self.connection_with_options(false)?;
40 let date = record.date();
41
42 let partition_dir = self.config.invocations_dir_with_status(&record.status, &date);
44 fs::create_dir_all(&partition_dir)?;
45
46 let executable = record.executable.as_deref().unwrap_or("unknown");
48 let filename = format!(
49 "{}--{}--{}.parquet",
50 sanitize_filename(&record.session_id),
51 sanitize_filename(executable),
52 record.id
53 );
54 let file_path = partition_dir.join(&filename);
55
56 conn.execute_batch(
58 r#"
59 CREATE OR REPLACE TEMP TABLE temp_invocation (
60 id UUID,
61 session_id VARCHAR,
62 timestamp TIMESTAMP,
63 duration_ms BIGINT,
64 cwd VARCHAR,
65 cmd VARCHAR,
66 executable VARCHAR,
67 runner_id VARCHAR,
68 exit_code INTEGER,
69 status VARCHAR,
70 format_hint VARCHAR,
71 client_id VARCHAR,
72 hostname VARCHAR,
73 username VARCHAR,
74 tag VARCHAR,
75 date DATE
76 );
77 "#,
78 )?;
79
80 conn.execute(
81 r#"
82 INSERT INTO temp_invocation VALUES (
83 ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
84 )
85 "#,
86 params![
87 record.id.to_string(),
88 record.session_id,
89 record.timestamp.to_rfc3339(),
90 record.duration_ms,
91 record.cwd,
92 record.cmd,
93 record.executable,
94 record.runner_id,
95 record.exit_code,
96 record.status,
97 record.format_hint,
98 record.client_id,
99 record.hostname,
100 record.username,
101 record.tag,
102 date.to_string(),
103 ],
104 )?;
105
106 let temp_path = atomic::temp_path(&file_path);
108 conn.execute(
109 &format!(
110 "COPY temp_invocation TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
111 temp_path.display()
112 ),
113 [],
114 )?;
115 conn.execute("DROP TABLE temp_invocation", [])?;
116
117 atomic::rename_into_place(&temp_path, &file_path)?;
119
120 Ok(())
121 }
122
123 fn write_invocation_duckdb(&self, record: &InvocationRecord) -> Result<()> {
125 let conn = self.connection()?;
126 let date = record.date();
127
128 conn.execute(
129 r#"
130 INSERT INTO local.invocations VALUES (
131 ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
132 )
133 "#,
134 params![
135 record.id.to_string(),
136 record.session_id,
137 record.timestamp.to_rfc3339(),
138 record.duration_ms,
139 record.cwd,
140 record.cmd,
141 record.executable,
142 record.runner_id,
143 record.exit_code,
144 record.status,
145 record.format_hint,
146 record.client_id,
147 record.hostname,
148 record.username,
149 record.tag,
150 date.to_string(),
151 ],
152 )?;
153
154 Ok(())
155 }
156
157 pub fn recent_invocations(&self, limit: usize) -> Result<Vec<InvocationSummary>> {
159 let conn = self.connection()?;
160
161 let sql = format!(
162 r#"
163 SELECT id::VARCHAR, cmd, exit_code, timestamp::VARCHAR, duration_ms
164 FROM recent_invocations
165 LIMIT {}
166 "#,
167 limit
168 );
169
170 let mut stmt = match conn.prepare(&sql) {
171 Ok(stmt) => stmt,
172 Err(e) => {
173 if e.to_string().contains("No files found") {
174 return Ok(Vec::new());
175 }
176 return Err(e.into());
177 }
178 };
179
180 let rows = stmt.query_map([], |row| {
181 Ok(InvocationSummary {
182 id: row.get(0)?,
183 cmd: row.get(1)?,
184 exit_code: row.get(2)?,
185 timestamp: row.get(3)?,
186 duration_ms: row.get(4)?,
187 })
188 });
189
190 match rows {
191 Ok(rows) => {
192 let mut results = Vec::new();
193 for row in rows {
194 results.push(row?);
195 }
196 Ok(results)
197 }
198 Err(e) => {
199 if e.to_string().contains("No files found") {
200 Ok(Vec::new())
201 } else {
202 Err(e.into())
203 }
204 }
205 }
206 }
207
208 pub fn last_invocation(&self) -> Result<Option<InvocationSummary>> {
210 let invocations = self.recent_invocations(1)?;
211 Ok(invocations.into_iter().next())
212 }
213
214 pub fn query_invocations_with_limit(
225 &self,
226 query: &Query,
227 default_limit: usize,
228 ) -> Result<Vec<InvocationSummary>> {
229 let conn = self.connection()?;
230
231 let mut where_clauses: Vec<String> = Vec::new();
233
234 for component in &query.filters {
235 match component {
236 QueryComponent::CommandRegex(pattern) => {
237 let escaped = pattern.replace('\'', "''");
239 where_clauses.push(format!("regexp_matches(cmd, '{}')", escaped));
240 }
241 QueryComponent::FieldFilter(filter) => {
242 let column = match filter.field.as_str() {
244 "exit" | "exit_code" => "exit_code",
245 "duration" | "duration_ms" => "duration_ms",
246 "cmd" | "command" => "cmd",
247 "cwd" => "cwd",
248 other => other, };
250
251 let escaped_value = filter.value.replace('\'', "''");
252
253 let clause = match filter.op {
254 CompareOp::Eq => format!("{} = '{}'", column, escaped_value),
255 CompareOp::NotEq => format!("{} <> '{}'", column, escaped_value),
256 CompareOp::Gt => format!("{} > '{}'", column, escaped_value),
257 CompareOp::Lt => format!("{} < '{}'", column, escaped_value),
258 CompareOp::Gte => format!("{} >= '{}'", column, escaped_value),
259 CompareOp::Lte => format!("{} <= '{}'", column, escaped_value),
260 CompareOp::Regex => {
261 format!("regexp_matches({}::VARCHAR, '{}')", column, escaped_value)
262 }
263 };
264 where_clauses.push(clause);
265 }
266 QueryComponent::Tag(_) => {
267 }
269 }
270 }
271
272 let where_sql = if where_clauses.is_empty() {
274 String::new()
275 } else {
276 format!("WHERE {}", where_clauses.join(" AND "))
277 };
278
279 let limit = query.range.map(|r| r.start).unwrap_or(default_limit);
280
281 let sql = format!(
282 r#"
283 SELECT id::VARCHAR, cmd, exit_code, timestamp::VARCHAR, duration_ms
284 FROM recent_invocations
285 {}
286 LIMIT {}
287 "#,
288 where_sql, limit
289 );
290
291 let mut stmt = match conn.prepare(&sql) {
292 Ok(stmt) => stmt,
293 Err(e) => {
294 if e.to_string().contains("No files found") {
295 return Ok(Vec::new());
296 }
297 return Err(e.into());
298 }
299 };
300
301 let rows = stmt.query_map([], |row| {
302 Ok(InvocationSummary {
303 id: row.get(0)?,
304 cmd: row.get(1)?,
305 exit_code: row.get(2)?,
306 timestamp: row.get(3)?,
307 duration_ms: row.get(4)?,
308 })
309 });
310
311 match rows {
312 Ok(rows) => {
313 let mut results = Vec::new();
314 for row in rows {
315 results.push(row?);
316 }
317 Ok(results)
318 }
319 Err(e) => {
320 if e.to_string().contains("No files found") {
321 Ok(Vec::new())
322 } else {
323 Err(e.into())
324 }
325 }
326 }
327 }
328
329 pub fn query_invocations(&self, query: &Query) -> Result<Vec<InvocationSummary>> {
331 self.query_invocations_with_limit(query, 20)
332 }
333
334 pub fn invocation_count(&self) -> Result<i64> {
336 let conn = self.connection()?;
337
338 let result: std::result::Result<i64, _> =
339 conn.query_row("SELECT COUNT(*) FROM invocations", [], |row| row.get(0));
340
341 match result {
342 Ok(count) => Ok(count),
343 Err(e) => {
344 if e.to_string().contains("No files found") {
345 Ok(0)
346 } else {
347 Err(e.into())
348 }
349 }
350 }
351 }
352
353 pub fn find_by_tag(&self, tag: &str) -> Result<Option<String>> {
356 let conn = self.connection()?;
357
358 let tag = tag.trim_start_matches(':');
360
361 let result: std::result::Result<String, _> = conn.query_row(
362 "SELECT id::VARCHAR FROM invocations WHERE tag = ?",
363 params![tag],
364 |row| row.get(0),
365 );
366
367 match result {
368 Ok(id) => Ok(Some(id)),
369 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
370 Err(e) => Err(e.into()),
371 }
372 }
373
374 pub fn set_tag(&self, invocation_id: &str, tag: Option<&str>) -> Result<()> {
376 let conn = self.connection()?;
377
378 conn.execute(
379 "UPDATE local.invocations SET tag = ? WHERE id = ?",
380 params![tag, invocation_id],
381 )?;
382
383 Ok(())
384 }
385
386 pub fn start_pending_invocation(
394 &self,
395 record: &InvocationRecord,
396 ) -> Result<super::pending::PendingInvocation> {
397 use super::pending::{write_pending_file, PendingInvocation};
398
399 let pending = PendingInvocation::from_record(record)
401 .ok_or_else(|| crate::error::Error::Storage("Missing runner_id".to_string()))?;
402
403 let pending_dir = self.config.pending_dir();
405 write_pending_file(&pending_dir, &pending)?;
406
407 self.write_invocation(record)?;
409
410 Ok(pending)
411 }
412
413 pub fn complete_pending_invocation(
420 &self,
421 record: &InvocationRecord,
422 pending: &super::pending::PendingInvocation,
423 ) -> Result<()> {
424 use super::pending::delete_pending_file;
425
426 self.write_invocation(record)?;
428
429 let pending_date = pending.timestamp.date_naive();
431 let pending_partition = self.config.invocations_dir_with_status("pending", &pending_date);
432 let executable = record.executable.as_deref().unwrap_or("unknown");
433 let pending_filename = format!(
434 "{}--{}--{}.parquet",
435 sanitize_filename(&pending.session_id),
436 sanitize_filename(executable),
437 pending.id
438 );
439 let pending_parquet = pending_partition.join(&pending_filename);
440 if pending_parquet.exists() {
441 let _ = fs::remove_file(&pending_parquet);
442 }
443
444 let pending_dir = self.config.pending_dir();
446 delete_pending_file(&pending_dir, pending.id, &pending.session_id)?;
447
448 Ok(())
449 }
450
451 pub fn recover_orphaned_invocations(
457 &self,
458 max_age_hours: u32,
459 dry_run: bool,
460 ) -> Result<super::pending::RecoveryStats> {
461 use super::pending::{
462 delete_pending_file, is_runner_alive, list_pending_files, RecoveryStats,
463 };
464
465 let pending_dir = self.config.pending_dir();
466 let pending_files = list_pending_files(&pending_dir)?;
467 let mut stats = RecoveryStats::default();
468
469 let now = chrono::Utc::now();
470 let max_age = chrono::Duration::hours(max_age_hours as i64);
471
472 for pending in pending_files {
473 stats.pending_checked += 1;
474
475 let age = now.signed_duration_since(pending.timestamp);
477 let is_stale = age > max_age;
478
479 let runner_alive = !is_stale && is_runner_alive(&pending.runner_id);
481
482 if runner_alive {
483 stats.still_running += 1;
484 continue;
485 }
486
487 if dry_run {
488 stats.orphaned += 1;
489 continue;
490 }
491
492 let orphaned_record = InvocationRecord {
494 id: pending.id,
495 session_id: pending.session_id.clone(),
496 timestamp: pending.timestamp,
497 duration_ms: None, cwd: pending.cwd.clone(),
499 cmd: pending.cmd.clone(),
500 executable: extract_executable(&pending.cmd),
501 runner_id: Some(pending.runner_id.clone()),
502 exit_code: None, status: "orphaned".to_string(),
504 format_hint: None,
505 client_id: pending.client_id.clone(),
506 hostname: None, username: None, tag: None,
509 };
510
511 match self.write_invocation(&orphaned_record) {
513 Ok(()) => {
514 let pending_date = pending.timestamp.date_naive();
516 let pending_partition =
517 self.config.invocations_dir_with_status("pending", &pending_date);
518 let executable = orphaned_record.executable.as_deref().unwrap_or("unknown");
519 let pending_filename = format!(
520 "{}--{}--{}.parquet",
521 sanitize_filename(&pending.session_id),
522 sanitize_filename(executable),
523 pending.id
524 );
525 let pending_parquet = pending_partition.join(&pending_filename);
526 let _ = fs::remove_file(&pending_parquet);
527
528 let _ = delete_pending_file(&pending_dir, pending.id, &pending.session_id);
530
531 stats.orphaned += 1;
532 }
533 Err(_) => {
534 stats.errors += 1;
535 }
536 }
537 }
538
539 Ok(stats)
540 }
541}
542
543fn extract_executable(cmd: &str) -> Option<String> {
545 cmd.split_whitespace()
546 .next()
547 .map(|s| s.rsplit('/').next().unwrap_or(s).to_string())
548}
549
550#[cfg(test)]
551mod tests {
552 use super::*;
553 use crate::init::initialize;
554 use crate::Config;
555 use tempfile::TempDir;
556
557 fn setup_store() -> (TempDir, Store) {
558 let tmp = TempDir::new().unwrap();
559 let config = Config::with_root(tmp.path());
560 initialize(&config).unwrap();
561 let store = Store::open(config).unwrap();
562 (tmp, store)
563 }
564
565 #[test]
566 fn test_write_and_count_invocation() {
567 let (_tmp, store) = setup_store();
568
569 let record = InvocationRecord::new(
570 "test-session",
571 "make test",
572 "/home/user/project",
573 0,
574 "test@client",
575 );
576
577 store.write_invocation(&record).unwrap();
578
579 let count = store.invocation_count().unwrap();
580 assert_eq!(count, 1);
581 }
582
583 #[test]
584 fn test_write_and_query_invocation() {
585 let (_tmp, store) = setup_store();
586
587 let record = InvocationRecord::new(
588 "test-session",
589 "cargo build",
590 "/home/user/project",
591 0,
592 "test@client",
593 )
594 .with_duration(1500);
595
596 store.write_invocation(&record).unwrap();
597
598 let result = store
600 .query("SELECT cmd, exit_code, duration_ms FROM invocations")
601 .unwrap();
602
603 assert_eq!(result.columns, vec!["cmd", "exit_code", "duration_ms"]);
604 assert_eq!(result.rows.len(), 1);
605 assert_eq!(result.rows[0][0], "cargo build");
606 assert_eq!(result.rows[0][1], "0");
607 assert_eq!(result.rows[0][2], "1500");
608 }
609
610 #[test]
611 fn test_recent_invocations_empty() {
612 let (_tmp, store) = setup_store();
613
614 let recent = store.recent_invocations(10).unwrap();
615 assert!(recent.is_empty());
616 }
617
618 #[test]
619 fn test_recent_invocations() {
620 let (_tmp, store) = setup_store();
621
622 for i in 0..3 {
624 let record = InvocationRecord::new(
625 "test-session",
626 format!("command-{}", i),
627 "/home/user",
628 i,
629 "test@client",
630 );
631 store.write_invocation(&record).unwrap();
632 }
633
634 let recent = store.recent_invocations(10).unwrap();
635 assert_eq!(recent.len(), 3);
636 }
637
638 #[test]
639 fn test_atomic_parquet_no_temp_files() {
640 let (_tmp, store) = setup_store();
641
642 let record = InvocationRecord::new(
643 "test-session",
644 "test",
645 "/home/user",
646 0,
647 "test@client",
648 );
649 store.write_invocation(&record).unwrap();
650
651 let date = record.date();
653 let inv_dir = store.config().invocations_dir(&date);
654 let temps: Vec<_> = std::fs::read_dir(&inv_dir)
655 .unwrap()
656 .filter_map(|e| e.ok())
657 .filter(|e| e.file_name().to_str().unwrap_or("").starts_with(".tmp."))
658 .collect();
659 assert!(
660 temps.is_empty(),
661 "No temp files should remain in {:?}",
662 inv_dir
663 );
664 }
665
666 fn setup_store_duckdb() -> (TempDir, Store) {
669 let tmp = TempDir::new().unwrap();
670 let config = Config::with_duckdb_mode(tmp.path());
671 initialize(&config).unwrap();
672 let store = Store::open(config).unwrap();
673 (tmp, store)
674 }
675
676 #[test]
677 fn test_duckdb_mode_write_and_count_invocation() {
678 let (_tmp, store) = setup_store_duckdb();
679
680 let record = InvocationRecord::new(
681 "test-session",
682 "make test",
683 "/home/user/project",
684 0,
685 "test@client",
686 );
687
688 store.write_invocation(&record).unwrap();
689
690 let count = store.invocation_count().unwrap();
691 assert_eq!(count, 1);
692 }
693
694 #[test]
695 fn test_duckdb_mode_write_and_query_invocation() {
696 let (_tmp, store) = setup_store_duckdb();
697
698 let record = InvocationRecord::new(
699 "test-session",
700 "cargo build",
701 "/home/user/project",
702 0,
703 "test@client",
704 )
705 .with_duration(1500);
706
707 store.write_invocation(&record).unwrap();
708
709 let result = store
711 .query("SELECT cmd, exit_code, duration_ms FROM invocations")
712 .unwrap();
713
714 assert_eq!(result.columns, vec!["cmd", "exit_code", "duration_ms"]);
715 assert_eq!(result.rows.len(), 1);
716 assert_eq!(result.rows[0][0], "cargo build");
717 assert_eq!(result.rows[0][1], "0");
718 assert_eq!(result.rows[0][2], "1500");
719 }
720
721 #[test]
722 fn test_duckdb_mode_recent_invocations() {
723 let (_tmp, store) = setup_store_duckdb();
724
725 for i in 0..3 {
727 let record = InvocationRecord::new(
728 "test-session",
729 format!("command-{}", i),
730 "/home/user",
731 i,
732 "test@client",
733 );
734 store.write_invocation(&record).unwrap();
735 }
736
737 let recent = store.recent_invocations(10).unwrap();
738 assert_eq!(recent.len(), 3);
739 }
740
741 #[test]
742 fn test_duckdb_mode_no_parquet_files() {
743 let (tmp, store) = setup_store_duckdb();
744
745 let record = InvocationRecord::new(
746 "test-session",
747 "test",
748 "/home/user",
749 0,
750 "test@client",
751 );
752 store.write_invocation(&record).unwrap();
753
754 let invocations_dir = tmp.path().join("db/data/recent/invocations");
756 if invocations_dir.exists() {
757 let parquet_files: Vec<_> = std::fs::read_dir(&invocations_dir)
758 .unwrap()
759 .filter_map(|e| e.ok())
760 .filter(|e| e.file_name().to_str().unwrap_or("").ends_with(".parquet"))
761 .collect();
762 assert!(
763 parquet_files.is_empty(),
764 "DuckDB mode should not create parquet files"
765 );
766 }
767 }
768
769 #[test]
770 fn test_pending_invocation_lifecycle() {
771 let (tmp, store) = setup_store();
772
773 let record = InvocationRecord::new_pending_local(
775 "test-session",
776 "long-running-command",
777 "/home/user",
778 std::process::id() as i32,
779 "test@client",
780 );
781
782 let pending = store.start_pending_invocation(&record).unwrap();
784
785 let pending_dir = tmp.path().join("db/pending");
787 let pending_path = pending.path(&pending_dir);
788 assert!(pending_path.exists(), "Pending file should exist");
789
790 let date = record.date();
792 let pending_partition = tmp
793 .path()
794 .join("db/data/recent/invocations")
795 .join("status=pending")
796 .join(format!("date={}", date));
797 assert!(pending_partition.exists(), "Pending partition should exist");
798
799 let completed_record = record.complete(0, Some(100));
801 store
802 .complete_pending_invocation(&completed_record, &pending)
803 .unwrap();
804
805 assert!(!pending_path.exists(), "Pending file should be deleted");
807
808 let completed_partition = tmp
810 .path()
811 .join("db/data/recent/invocations")
812 .join("status=completed")
813 .join(format!("date={}", date));
814 assert!(
815 completed_partition.exists(),
816 "Completed partition should exist"
817 );
818 }
819
820 #[test]
821 fn test_recover_orphaned_invocations() {
822 let (tmp, store) = setup_store();
823
824 let record = InvocationRecord::new_pending_local(
826 "test-session",
827 "crashed-command",
828 "/home/user",
829 999999999, "test@client",
831 );
832
833 let pending =
835 crate::store::pending::PendingInvocation::from_record(&record).unwrap();
836 let pending_dir = tmp.path().join("db/pending");
837 crate::store::pending::write_pending_file(&pending_dir, &pending).unwrap();
838
839 store.write_invocation(&record).unwrap();
841
842 let pending_path = pending.path(&pending_dir);
844 assert!(pending_path.exists(), "Pending file should exist before recovery");
845
846 let stats = store.recover_orphaned_invocations(24, false).unwrap();
848
849 assert_eq!(stats.pending_checked, 1);
850 assert_eq!(stats.orphaned, 1);
851 assert_eq!(stats.still_running, 0);
852
853 assert!(!pending_path.exists(), "Pending file should be deleted after recovery");
855
856 let date = record.date();
858 let orphaned_partition = tmp
859 .path()
860 .join("db/data/recent/invocations")
861 .join("status=orphaned")
862 .join(format!("date={}", date));
863 assert!(
864 orphaned_partition.exists(),
865 "Orphaned partition should exist"
866 );
867 }
868
869 #[test]
870 fn test_recover_skips_running_processes() {
871 let (tmp, store) = setup_store();
872
873 let record = InvocationRecord::new_pending_local(
875 "test-session",
876 "running-command",
877 "/home/user",
878 std::process::id() as i32,
879 "test@client",
880 );
881
882 let pending =
884 crate::store::pending::PendingInvocation::from_record(&record).unwrap();
885 let pending_dir = tmp.path().join("db/pending");
886 crate::store::pending::write_pending_file(&pending_dir, &pending).unwrap();
887
888 let stats = store.recover_orphaned_invocations(24, false).unwrap();
890
891 assert_eq!(stats.pending_checked, 1);
892 assert_eq!(stats.still_running, 1);
893 assert_eq!(stats.orphaned, 0);
894
895 let pending_path = pending.path(&pending_dir);
897 assert!(pending_path.exists(), "Pending file should still exist for running process");
898 }
899}