Skip to main content

magic_bird/store/
invocations.rs

//! Invocation storage operations.
2
3use std::fs;
4
5use duckdb::params;
6
7use super::atomic;
8use super::{sanitize_filename, Store};
9use crate::config::StorageMode;
10use crate::query::{CompareOp, Query, QueryComponent};
11use crate::schema::InvocationRecord;
12use crate::Result;
13
/// Lightweight view of a stored invocation, as returned by the listing and
/// query helpers on `Store`.
///
/// Derives `Clone`, `PartialEq` and `Eq` so callers can copy summaries and
/// compare them directly (e.g. in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InvocationSummary {
    /// Invocation identifier rendered as text (selected as `id::VARCHAR`).
    pub id: String,
    /// The recorded command string.
    pub cmd: String,
    /// Exit code of the command.
    pub exit_code: i32,
    /// Timestamp rendered as text (selected as `timestamp::VARCHAR`).
    pub timestamp: String,
    /// Duration in milliseconds, when one was recorded.
    pub duration_ms: Option<i64>,
}
23
24impl Store {
25    /// Write an invocation record to the store.
26    ///
27    /// Behavior depends on storage mode:
28    /// - Parquet: Creates a new Parquet file in the appropriate date partition
29    /// - DuckDB: Inserts directly into the local.invocations
30    pub fn write_invocation(&self, record: &InvocationRecord) -> Result<()> {
31        match self.config.storage_mode {
32            StorageMode::Parquet => self.write_invocation_parquet(record),
33            StorageMode::DuckDB => self.write_invocation_duckdb(record),
34        }
35    }
36
37    /// Write invocation to a Parquet file (multi-writer safe).
38    fn write_invocation_parquet(&self, record: &InvocationRecord) -> Result<()> {
39        let conn = self.connection_with_options(false)?;
40        let date = record.date();
41
42        // Ensure the partition directory exists (status-partitioned)
43        let partition_dir = self.config.invocations_dir_with_status(&record.status, &date);
44        fs::create_dir_all(&partition_dir)?;
45
46        // Generate filename: {session}--{executable}--{id}.parquet
47        let executable = record.executable.as_deref().unwrap_or("unknown");
48        let filename = format!(
49            "{}--{}--{}.parquet",
50            sanitize_filename(&record.session_id),
51            sanitize_filename(executable),
52            record.id
53        );
54        let file_path = partition_dir.join(&filename);
55
56        // Write via DuckDB using COPY
57        conn.execute_batch(
58            r#"
59            CREATE OR REPLACE TEMP TABLE temp_invocation (
60                id UUID,
61                session_id VARCHAR,
62                timestamp TIMESTAMP,
63                duration_ms BIGINT,
64                cwd VARCHAR,
65                cmd VARCHAR,
66                executable VARCHAR,
67                runner_id VARCHAR,
68                exit_code INTEGER,
69                status VARCHAR,
70                format_hint VARCHAR,
71                client_id VARCHAR,
72                hostname VARCHAR,
73                username VARCHAR,
74                tag VARCHAR,
75                date DATE
76            );
77            "#,
78        )?;
79
80        conn.execute(
81            r#"
82            INSERT INTO temp_invocation VALUES (
83                ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
84            )
85            "#,
86            params![
87                record.id.to_string(),
88                record.session_id,
89                record.timestamp.to_rfc3339(),
90                record.duration_ms,
91                record.cwd,
92                record.cmd,
93                record.executable,
94                record.runner_id,
95                record.exit_code,
96                record.status,
97                record.format_hint,
98                record.client_id,
99                record.hostname,
100                record.username,
101                record.tag,
102                date.to_string(),
103            ],
104        )?;
105
106        // Atomic write: COPY to temp file, then rename
107        let temp_path = atomic::temp_path(&file_path);
108        conn.execute(
109            &format!(
110                "COPY temp_invocation TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
111                temp_path.display()
112            ),
113            [],
114        )?;
115        conn.execute("DROP TABLE temp_invocation", [])?;
116
117        // Rename temp to final (atomic on POSIX)
118        atomic::rename_into_place(&temp_path, &file_path)?;
119
120        Ok(())
121    }
122
123    /// Write invocation directly to DuckDB table.
124    fn write_invocation_duckdb(&self, record: &InvocationRecord) -> Result<()> {
125        let conn = self.connection()?;
126        let date = record.date();
127
128        conn.execute(
129            r#"
130            INSERT INTO local.invocations VALUES (
131                ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
132            )
133            "#,
134            params![
135                record.id.to_string(),
136                record.session_id,
137                record.timestamp.to_rfc3339(),
138                record.duration_ms,
139                record.cwd,
140                record.cmd,
141                record.executable,
142                record.runner_id,
143                record.exit_code,
144                record.status,
145                record.format_hint,
146                record.client_id,
147                record.hostname,
148                record.username,
149                record.tag,
150                date.to_string(),
151            ],
152        )?;
153
154        Ok(())
155    }
156
157    /// Get recent invocations (last 7 days).
158    pub fn recent_invocations(&self, limit: usize) -> Result<Vec<InvocationSummary>> {
159        let conn = self.connection()?;
160
161        let sql = format!(
162            r#"
163            SELECT id::VARCHAR, cmd, exit_code, timestamp::VARCHAR, duration_ms
164            FROM recent_invocations
165            LIMIT {}
166            "#,
167            limit
168        );
169
170        let mut stmt = match conn.prepare(&sql) {
171            Ok(stmt) => stmt,
172            Err(e) => {
173                if e.to_string().contains("No files found") {
174                    return Ok(Vec::new());
175                }
176                return Err(e.into());
177            }
178        };
179
180        let rows = stmt.query_map([], |row| {
181            Ok(InvocationSummary {
182                id: row.get(0)?,
183                cmd: row.get(1)?,
184                exit_code: row.get(2)?,
185                timestamp: row.get(3)?,
186                duration_ms: row.get(4)?,
187            })
188        });
189
190        match rows {
191            Ok(rows) => {
192                let mut results = Vec::new();
193                for row in rows {
194                    results.push(row?);
195                }
196                Ok(results)
197            }
198            Err(e) => {
199                if e.to_string().contains("No files found") {
200                    Ok(Vec::new())
201                } else {
202                    Err(e.into())
203                }
204            }
205        }
206    }
207
208    /// Get the last invocation (most recent).
209    pub fn last_invocation(&self) -> Result<Option<InvocationSummary>> {
210        let invocations = self.recent_invocations(1)?;
211        Ok(invocations.into_iter().next())
212    }
213
214    /// Query invocations with filters from the query micro-language.
215    ///
216    /// Supports:
217    /// - `~N` range selector (limit to N results)
218    /// - `%exit<>0` field filters (exit code, duration, etc.)
219    /// - `%/pattern/` command regex
220    ///
221    /// Use `default_limit` to specify the limit when no range is provided:
222    /// - 20 for listing commands (shq i)
223    /// - 1 for single-item commands (shq o, shq I, shq R)
224    pub fn query_invocations_with_limit(
225        &self,
226        query: &Query,
227        default_limit: usize,
228    ) -> Result<Vec<InvocationSummary>> {
229        let conn = self.connection()?;
230
231        // Build WHERE clauses from query filters
232        let mut where_clauses: Vec<String> = Vec::new();
233
234        for component in &query.filters {
235            match component {
236                QueryComponent::CommandRegex(pattern) => {
237                    // Use regexp_matches for regex filtering
238                    let escaped = pattern.replace('\'', "''");
239                    where_clauses.push(format!("regexp_matches(cmd, '{}')", escaped));
240                }
241                QueryComponent::FieldFilter(filter) => {
242                    // Map field names to SQL column names
243                    let column = match filter.field.as_str() {
244                        "exit" | "exit_code" => "exit_code",
245                        "duration" | "duration_ms" => "duration_ms",
246                        "cmd" | "command" => "cmd",
247                        "cwd" => "cwd",
248                        other => other, // Pass through unknown fields
249                    };
250
251                    let escaped_value = filter.value.replace('\'', "''");
252
253                    let clause = match filter.op {
254                        CompareOp::Eq => format!("{} = '{}'", column, escaped_value),
255                        CompareOp::NotEq => format!("{} <> '{}'", column, escaped_value),
256                        CompareOp::Gt => format!("{} > '{}'", column, escaped_value),
257                        CompareOp::Lt => format!("{} < '{}'", column, escaped_value),
258                        CompareOp::Gte => format!("{} >= '{}'", column, escaped_value),
259                        CompareOp::Lte => format!("{} <= '{}'", column, escaped_value),
260                        CompareOp::Regex => {
261                            format!("regexp_matches({}::VARCHAR, '{}')", column, escaped_value)
262                        }
263                    };
264                    where_clauses.push(clause);
265                }
266                QueryComponent::Tag(_) => {
267                    // Tags not implemented in MVP
268                }
269            }
270        }
271
272        // Build the SQL query
273        let where_sql = if where_clauses.is_empty() {
274            String::new()
275        } else {
276            format!("WHERE {}", where_clauses.join(" AND "))
277        };
278
279        let limit = query.range.map(|r| r.start).unwrap_or(default_limit);
280
281        let sql = format!(
282            r#"
283            SELECT id::VARCHAR, cmd, exit_code, timestamp::VARCHAR, duration_ms
284            FROM recent_invocations
285            {}
286            LIMIT {}
287            "#,
288            where_sql, limit
289        );
290
291        let mut stmt = match conn.prepare(&sql) {
292            Ok(stmt) => stmt,
293            Err(e) => {
294                if e.to_string().contains("No files found") {
295                    return Ok(Vec::new());
296                }
297                return Err(e.into());
298            }
299        };
300
301        let rows = stmt.query_map([], |row| {
302            Ok(InvocationSummary {
303                id: row.get(0)?,
304                cmd: row.get(1)?,
305                exit_code: row.get(2)?,
306                timestamp: row.get(3)?,
307                duration_ms: row.get(4)?,
308            })
309        });
310
311        match rows {
312            Ok(rows) => {
313                let mut results = Vec::new();
314                for row in rows {
315                    results.push(row?);
316                }
317                Ok(results)
318            }
319            Err(e) => {
320                if e.to_string().contains("No files found") {
321                    Ok(Vec::new())
322                } else {
323                    Err(e.into())
324                }
325            }
326        }
327    }
328
329    /// Query invocations with default limit of 20 (for listing).
330    pub fn query_invocations(&self, query: &Query) -> Result<Vec<InvocationSummary>> {
331        self.query_invocations_with_limit(query, 20)
332    }
333
334    /// Count total invocations in the store.
335    pub fn invocation_count(&self) -> Result<i64> {
336        let conn = self.connection()?;
337
338        let result: std::result::Result<i64, _> =
339            conn.query_row("SELECT COUNT(*) FROM invocations", [], |row| row.get(0));
340
341        match result {
342            Ok(count) => Ok(count),
343            Err(e) => {
344                if e.to_string().contains("No files found") {
345                    Ok(0)
346                } else {
347                    Err(e.into())
348                }
349            }
350        }
351    }
352
353    /// Find an invocation by its tag.
354    /// Returns the full invocation ID if found.
355    pub fn find_by_tag(&self, tag: &str) -> Result<Option<String>> {
356        let conn = self.connection()?;
357
358        // Normalize tag (remove leading : if present)
359        let tag = tag.trim_start_matches(':');
360
361        let result: std::result::Result<String, _> = conn.query_row(
362            "SELECT id::VARCHAR FROM invocations WHERE tag = ?",
363            params![tag],
364            |row| row.get(0),
365        );
366
367        match result {
368            Ok(id) => Ok(Some(id)),
369            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
370            Err(e) => Err(e.into()),
371        }
372    }
373
374    /// Set or update the tag on an invocation.
375    pub fn set_tag(&self, invocation_id: &str, tag: Option<&str>) -> Result<()> {
376        let conn = self.connection()?;
377
378        conn.execute(
379            "UPDATE local.invocations SET tag = ? WHERE id = ?",
380            params![tag, invocation_id],
381        )?;
382
383        Ok(())
384    }
385
386    /// Start a pending invocation.
387    ///
388    /// This:
389    /// 1. Creates a JSON pending file for crash recovery
390    /// 2. Writes the invocation to the status=pending partition
391    ///
392    /// Returns the pending invocation for later completion.
393    pub fn start_pending_invocation(
394        &self,
395        record: &InvocationRecord,
396    ) -> Result<super::pending::PendingInvocation> {
397        use super::pending::{write_pending_file, PendingInvocation};
398
399        // Create pending invocation marker
400        let pending = PendingInvocation::from_record(record)
401            .ok_or_else(|| crate::error::Error::Storage("Missing runner_id".to_string()))?;
402
403        // Write pending file first (crash-safe marker)
404        let pending_dir = self.config.pending_dir();
405        write_pending_file(&pending_dir, &pending)?;
406
407        // Write to status=pending partition
408        self.write_invocation(record)?;
409
410        Ok(pending)
411    }
412
413    /// Complete a pending invocation.
414    ///
415    /// This:
416    /// 1. Writes the completed record to status=completed partition
417    /// 2. Deletes the pending parquet file from status=pending partition
418    /// 3. Deletes the JSON pending file
419    pub fn complete_pending_invocation(
420        &self,
421        record: &InvocationRecord,
422        pending: &super::pending::PendingInvocation,
423    ) -> Result<()> {
424        use super::pending::delete_pending_file;
425
426        // Write completed record
427        self.write_invocation(record)?;
428
429        // Delete pending parquet file
430        let pending_date = pending.timestamp.date_naive();
431        let pending_partition = self.config.invocations_dir_with_status("pending", &pending_date);
432        let executable = record.executable.as_deref().unwrap_or("unknown");
433        let pending_filename = format!(
434            "{}--{}--{}.parquet",
435            sanitize_filename(&pending.session_id),
436            sanitize_filename(executable),
437            pending.id
438        );
439        let pending_parquet = pending_partition.join(&pending_filename);
440        if pending_parquet.exists() {
441            let _ = fs::remove_file(&pending_parquet);
442        }
443
444        // Delete JSON pending file
445        let pending_dir = self.config.pending_dir();
446        delete_pending_file(&pending_dir, pending.id, &pending.session_id)?;
447
448        Ok(())
449    }
450
451    /// Recover orphaned invocations from pending files.
452    ///
453    /// This scans pending files and marks invocations as orphaned if:
454    /// - The runner is no longer alive
455    /// - The pending file is older than max_age_hours
456    pub fn recover_orphaned_invocations(
457        &self,
458        max_age_hours: u32,
459        dry_run: bool,
460    ) -> Result<super::pending::RecoveryStats> {
461        use super::pending::{
462            delete_pending_file, is_runner_alive, list_pending_files, RecoveryStats,
463        };
464
465        let pending_dir = self.config.pending_dir();
466        let pending_files = list_pending_files(&pending_dir)?;
467        let mut stats = RecoveryStats::default();
468
469        let now = chrono::Utc::now();
470        let max_age = chrono::Duration::hours(max_age_hours as i64);
471
472        for pending in pending_files {
473            stats.pending_checked += 1;
474
475            // Check if too old (runner ID might have been recycled)
476            let age = now.signed_duration_since(pending.timestamp);
477            let is_stale = age > max_age;
478
479            // Check if runner is still alive
480            let runner_alive = !is_stale && is_runner_alive(&pending.runner_id);
481
482            if runner_alive {
483                stats.still_running += 1;
484                continue;
485            }
486
487            if dry_run {
488                stats.orphaned += 1;
489                continue;
490            }
491
492            // Create orphaned record
493            let orphaned_record = InvocationRecord {
494                id: pending.id,
495                session_id: pending.session_id.clone(),
496                timestamp: pending.timestamp,
497                duration_ms: None, // Unknown
498                cwd: pending.cwd.clone(),
499                cmd: pending.cmd.clone(),
500                executable: extract_executable(&pending.cmd),
501                runner_id: Some(pending.runner_id.clone()),
502                exit_code: None, // Unknown/crashed
503                status: "orphaned".to_string(),
504                format_hint: None,
505                client_id: pending.client_id.clone(),
506                hostname: None, // Not available from pending file
507                username: None, // Not available from pending file
508                tag: None,
509            };
510
511            // Write to status=orphaned partition
512            match self.write_invocation(&orphaned_record) {
513                Ok(()) => {
514                    // Delete pending parquet file
515                    let pending_date = pending.timestamp.date_naive();
516                    let pending_partition =
517                        self.config.invocations_dir_with_status("pending", &pending_date);
518                    let executable = orphaned_record.executable.as_deref().unwrap_or("unknown");
519                    let pending_filename = format!(
520                        "{}--{}--{}.parquet",
521                        sanitize_filename(&pending.session_id),
522                        sanitize_filename(executable),
523                        pending.id
524                    );
525                    let pending_parquet = pending_partition.join(&pending_filename);
526                    let _ = fs::remove_file(&pending_parquet);
527
528                    // Delete JSON pending file
529                    let _ = delete_pending_file(&pending_dir, pending.id, &pending.session_id);
530
531                    stats.orphaned += 1;
532                }
533                Err(_) => {
534                    stats.errors += 1;
535                }
536            }
537        }
538
539        Ok(stats)
540    }
541}
542
/// Derive an executable name from a command string.
///
/// Takes the first whitespace-separated word of `cmd` and keeps only what
/// follows its last `/`. Returns `None` when `cmd` has no words at all.
/// A first word ending in `/` yields an empty name.
fn extract_executable(cmd: &str) -> Option<String> {
    let first_word = cmd.split_whitespace().next()?;

    let name = match first_word.rsplit_once('/') {
        Some((_dirs, tail)) => tail,
        None => first_word,
    };

    Some(name.to_string())
}
549
#[cfg(test)]
mod tests {
    use super::*;
    use crate::init::initialize;
    use crate::Config;
    use tempfile::TempDir;

    // ---------------------------------------------------------------------
    // Fixtures
    // ---------------------------------------------------------------------

    /// Initialise the layout described by `config` and open a store on it.
    /// The `TempDir` is returned so the directory outlives the store.
    fn open_in(tmp: TempDir, config: Config) -> (TempDir, Store) {
        initialize(&config).unwrap();
        let store = Store::open(config).unwrap();
        (tmp, store)
    }

    /// Store created from `Config::with_root`.
    fn setup_store() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let config = Config::with_root(tmp.path());
        open_in(tmp, config)
    }

    /// Store created from `Config::with_duckdb_mode`.
    fn setup_store_duckdb() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let config = Config::with_duckdb_mode(tmp.path());
        open_in(tmp, config)
    }

    /// Path of a `status=…/date=…` partition under the store root.
    fn status_partition(
        root: &std::path::Path,
        status: &str,
        date: impl std::fmt::Display,
    ) -> std::path::PathBuf {
        root.join("db/data/recent/invocations")
            .join(format!("status={}", status))
            .join(format!("date={}", date))
    }

    /// Directory entries of `dir` whose file name satisfies `keep`.
    fn entries_matching(
        dir: impl AsRef<std::path::Path>,
        keep: impl Fn(&str) -> bool,
    ) -> Vec<std::fs::DirEntry> {
        std::fs::read_dir(dir.as_ref())
            .unwrap()
            .filter_map(|entry| entry.ok())
            .filter(|entry| keep(entry.file_name().to_str().unwrap_or("")))
            .collect()
    }

    // ---------------------------------------------------------------------
    // Checks shared by both storage modes
    // ---------------------------------------------------------------------

    /// One write must be visible as a count of one.
    fn check_write_then_count(store: &Store) {
        let record = InvocationRecord::new(
            "test-session",
            "make test",
            "/home/user/project",
            0,
            "test@client",
        );

        store.write_invocation(&record).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
    }

    /// A written record must come back through a raw SQL query.
    fn check_write_then_query(store: &Store) {
        let record = InvocationRecord::new(
            "test-session",
            "cargo build",
            "/home/user/project",
            0,
            "test@client",
        )
        .with_duration(1500);

        store.write_invocation(&record).unwrap();

        // Query using SQL
        let result = store
            .query("SELECT cmd, exit_code, duration_ms FROM invocations")
            .unwrap();

        assert_eq!(result.columns, vec!["cmd", "exit_code", "duration_ms"]);
        assert_eq!(result.rows.len(), 1);

        let row = &result.rows[0];
        assert_eq!(row[0], "cargo build");
        assert_eq!(row[1], "0");
        assert_eq!(row[2], "1500");
    }

    /// Three writes must all show up in the recent listing.
    fn check_three_recent(store: &Store) {
        for exit_code in 0..3 {
            let record = InvocationRecord::new(
                "test-session",
                format!("command-{}", exit_code),
                "/home/user",
                exit_code,
                "test@client",
            );
            store.write_invocation(&record).unwrap();
        }

        let recent = store.recent_invocations(10).unwrap();
        assert_eq!(recent.len(), 3);
    }

    // ---------------------------------------------------------------------
    // Default store tests
    // ---------------------------------------------------------------------

    #[test]
    fn test_write_and_count_invocation() {
        let (_tmp, store) = setup_store();
        check_write_then_count(&store);
    }

    #[test]
    fn test_write_and_query_invocation() {
        let (_tmp, store) = setup_store();
        check_write_then_query(&store);
    }

    #[test]
    fn test_recent_invocations_empty() {
        let (_tmp, store) = setup_store();

        assert!(store.recent_invocations(10).unwrap().is_empty());
    }

    #[test]
    fn test_recent_invocations() {
        let (_tmp, store) = setup_store();
        check_three_recent(&store);
    }

    #[test]
    fn test_atomic_parquet_no_temp_files() {
        let (_tmp, store) = setup_store();

        let record = InvocationRecord::new(
            "test-session",
            "test",
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&record).unwrap();

        // Check no .tmp files in invocations directory
        let date = record.date();
        let inv_dir = store.config().invocations_dir(&date);
        let leftovers = entries_matching(&inv_dir, |name| name.starts_with(".tmp."));
        assert!(
            leftovers.is_empty(),
            "No temp files should remain in {:?}",
            inv_dir
        );
    }

    // ---------------------------------------------------------------------
    // DuckDB mode tests
    // ---------------------------------------------------------------------

    #[test]
    fn test_duckdb_mode_write_and_count_invocation() {
        let (_tmp, store) = setup_store_duckdb();
        check_write_then_count(&store);
    }

    #[test]
    fn test_duckdb_mode_write_and_query_invocation() {
        let (_tmp, store) = setup_store_duckdb();
        check_write_then_query(&store);
    }

    #[test]
    fn test_duckdb_mode_recent_invocations() {
        let (_tmp, store) = setup_store_duckdb();
        check_three_recent(&store);
    }

    #[test]
    fn test_duckdb_mode_no_parquet_files() {
        let (tmp, store) = setup_store_duckdb();

        let record = InvocationRecord::new(
            "test-session",
            "test",
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&record).unwrap();

        // Check that no parquet files were created in recent/invocations
        let invocations_dir = tmp.path().join("db/data/recent/invocations");
        if !invocations_dir.exists() {
            return;
        }
        let parquet_files = entries_matching(&invocations_dir, |name| name.ends_with(".parquet"));
        assert!(
            parquet_files.is_empty(),
            "DuckDB mode should not create parquet files"
        );
    }

    // ---------------------------------------------------------------------
    // Pending / recovery tests
    // ---------------------------------------------------------------------

    #[test]
    fn test_pending_invocation_lifecycle() {
        let (tmp, store) = setup_store();

        // Create a pending invocation (using current process PID)
        let record = InvocationRecord::new_pending_local(
            "test-session",
            "long-running-command",
            "/home/user",
            std::process::id() as i32,
            "test@client",
        );

        // Start the pending invocation
        let pending = store.start_pending_invocation(&record).unwrap();

        // Verify pending file was created
        let marker_path = pending.path(&tmp.path().join("db/pending"));
        assert!(marker_path.exists(), "Pending file should exist");

        // Verify invocation was written to status=pending partition
        let date = record.date();
        assert!(
            status_partition(tmp.path(), "pending", &date).exists(),
            "Pending partition should exist"
        );

        // Complete the invocation
        let completed_record = record.complete(0, Some(100));
        store
            .complete_pending_invocation(&completed_record, &pending)
            .unwrap();

        // Verify pending file was deleted
        assert!(!marker_path.exists(), "Pending file should be deleted");

        // Verify completed record was written
        assert!(
            status_partition(tmp.path(), "completed", &date).exists(),
            "Completed partition should exist"
        );
    }

    #[test]
    fn test_recover_orphaned_invocations() {
        use crate::store::pending::{write_pending_file, PendingInvocation};

        let (tmp, store) = setup_store();

        // Create a pending invocation with a dead PID
        let record = InvocationRecord::new_pending_local(
            "test-session",
            "crashed-command",
            "/home/user",
            999999999, // PID that doesn't exist
            "test@client",
        );

        // Write pending file manually (simulating a crash scenario)
        let pending = PendingInvocation::from_record(&record).unwrap();
        let pending_dir = tmp.path().join("db/pending");
        write_pending_file(&pending_dir, &pending).unwrap();

        // Write to status=pending partition
        store.write_invocation(&record).unwrap();

        // Verify pending file exists
        let marker_path = pending.path(&pending_dir);
        assert!(marker_path.exists(), "Pending file should exist before recovery");

        // Run recovery
        let stats = store.recover_orphaned_invocations(24, false).unwrap();

        assert_eq!(stats.pending_checked, 1);
        assert_eq!(stats.orphaned, 1);
        assert_eq!(stats.still_running, 0);

        // Verify pending file was deleted
        assert!(!marker_path.exists(), "Pending file should be deleted after recovery");

        // Verify orphaned record was written
        assert!(
            status_partition(tmp.path(), "orphaned", record.date()).exists(),
            "Orphaned partition should exist"
        );
    }

    #[test]
    fn test_recover_skips_running_processes() {
        use crate::store::pending::{write_pending_file, PendingInvocation};

        let (tmp, store) = setup_store();

        // Create a pending invocation with the current process PID (still alive)
        let record = InvocationRecord::new_pending_local(
            "test-session",
            "running-command",
            "/home/user",
            std::process::id() as i32,
            "test@client",
        );

        // Write pending file
        let pending = PendingInvocation::from_record(&record).unwrap();
        let pending_dir = tmp.path().join("db/pending");
        write_pending_file(&pending_dir, &pending).unwrap();

        // Run recovery
        let stats = store.recover_orphaned_invocations(24, false).unwrap();

        assert_eq!(stats.pending_checked, 1);
        assert_eq!(stats.still_running, 1);
        assert_eq!(stats.orphaned, 0);

        // Verify pending file was NOT deleted
        let marker_path = pending.path(&pending_dir);
        assert!(marker_path.exists(), "Pending file should still exist for running process");
    }
}