Skip to main content

magic_bird/store/
invocations.rs

//! Invocation storage operations.
//!
//! V5 schema: an invocation is composed of an attempt record plus an outcome record.
//! - `write_invocation()` writes both the attempt and the outcome (for completed commands).
//! - For long-running commands, use `start_invocation()` followed by `complete_invocation()`.
6
7use duckdb::params;
8
9use super::Store;
10use crate::query::{CompareOp, Query, QueryComponent};
11use crate::schema::InvocationRecord;
12use crate::Result;
13
/// Summary of an invocation (for listing).
///
/// Each value is one row of the five-column projection selected by the listing
/// queries in this module (`id::VARCHAR, cmd, exit_code, timestamp::VARCHAR, duration_ms`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InvocationSummary {
    /// Invocation id, rendered as text by the query (`id::VARCHAR`).
    pub id: String,
    /// The `cmd` column.
    pub cmd: String,
    /// The `exit_code` column.
    pub exit_code: i32,
    /// Timestamp rendered as text by the query (`timestamp::VARCHAR`).
    pub timestamp: String,
    /// The `duration_ms` column; `None` when the column is NULL.
    pub duration_ms: Option<i64>,
}
23
24impl Store {
25    /// Write an invocation record to the store (v5 schema).
26    ///
27    /// This writes both an attempt and an outcome record, since the invocation
28    /// is already complete. For long-running commands, use start_invocation()
29    /// followed by complete_invocation().
30    pub fn write_invocation(&self, record: &InvocationRecord) -> Result<()> {
31        // Convert InvocationRecord to v5 attempt + outcome
32        let attempt = record.to_attempt();
33        let outcome = record.to_outcome();
34
35        // Write the attempt
36        self.write_attempt(&attempt)?;
37
38        // Write the outcome (if the invocation is completed)
39        if let Some(outcome) = outcome {
40            self.write_outcome(&outcome)?;
41        }
42
43        Ok(())
44    }
45
46    /// Get recent invocations (last 7 days).
47    pub fn recent_invocations(&self, limit: usize) -> Result<Vec<InvocationSummary>> {
48        let conn = self.connection()?;
49
50        let sql = format!(
51            r#"
52            SELECT id::VARCHAR, cmd, exit_code, timestamp::VARCHAR, duration_ms
53            FROM recent_invocations
54            LIMIT {}
55            "#,
56            limit
57        );
58
59        let mut stmt = match conn.prepare(&sql) {
60            Ok(stmt) => stmt,
61            Err(e) => {
62                if e.to_string().contains("No files found") {
63                    return Ok(Vec::new());
64                }
65                return Err(e.into());
66            }
67        };
68
69        let rows = stmt.query_map([], |row| {
70            Ok(InvocationSummary {
71                id: row.get(0)?,
72                cmd: row.get(1)?,
73                exit_code: row.get(2)?,
74                timestamp: row.get(3)?,
75                duration_ms: row.get(4)?,
76            })
77        });
78
79        match rows {
80            Ok(rows) => {
81                let mut results = Vec::new();
82                for row in rows {
83                    results.push(row?);
84                }
85                Ok(results)
86            }
87            Err(e) => {
88                if e.to_string().contains("No files found") {
89                    Ok(Vec::new())
90                } else {
91                    Err(e.into())
92                }
93            }
94        }
95    }
96
97    /// Get the last invocation (most recent).
98    pub fn last_invocation(&self) -> Result<Option<InvocationSummary>> {
99        let invocations = self.recent_invocations(1)?;
100        Ok(invocations.into_iter().next())
101    }
102
103    /// Query invocations with filters from the query micro-language.
104    ///
105    /// Supports:
106    /// - `~N` range selector (limit to N results)
107    /// - `%exit<>0` field filters (exit code, duration, etc.)
108    /// - `%/pattern/` command regex
109    ///
110    /// Use `default_limit` to specify the limit when no range is provided:
111    /// - 20 for listing commands (shq i)
112    /// - 1 for single-item commands (shq o, shq I, shq R)
113    pub fn query_invocations_with_limit(
114        &self,
115        query: &Query,
116        default_limit: usize,
117    ) -> Result<Vec<InvocationSummary>> {
118        let conn = self.connection()?;
119
120        // Build WHERE clauses from query filters
121        let mut where_clauses: Vec<String> = Vec::new();
122
123        for component in &query.filters {
124            match component {
125                QueryComponent::CommandRegex(pattern) => {
126                    // Use regexp_matches for regex filtering
127                    let escaped = pattern.replace('\'', "''");
128                    where_clauses.push(format!("regexp_matches(cmd, '{}')", escaped));
129                }
130                QueryComponent::FieldFilter(filter) => {
131                    // Map field names to SQL column names
132                    let column = match filter.field.as_str() {
133                        "exit" | "exit_code" => "exit_code",
134                        "duration" | "duration_ms" => "duration_ms",
135                        "cmd" | "command" => "cmd",
136                        "cwd" => "cwd",
137                        other => other, // Pass through unknown fields
138                    };
139
140                    let escaped_value = filter.value.replace('\'', "''");
141
142                    let clause = match filter.op {
143                        CompareOp::Eq => format!("{} = '{}'", column, escaped_value),
144                        CompareOp::NotEq => format!("{} <> '{}'", column, escaped_value),
145                        CompareOp::Gt => format!("{} > '{}'", column, escaped_value),
146                        CompareOp::Lt => format!("{} < '{}'", column, escaped_value),
147                        CompareOp::Gte => format!("{} >= '{}'", column, escaped_value),
148                        CompareOp::Lte => format!("{} <= '{}'", column, escaped_value),
149                        CompareOp::Regex => {
150                            format!("regexp_matches({}::VARCHAR, '{}')", column, escaped_value)
151                        }
152                    };
153                    where_clauses.push(clause);
154                }
155                QueryComponent::Tag(_) => {
156                    // Tags not implemented in MVP
157                }
158            }
159        }
160
161        // Build the SQL query
162        let where_sql = if where_clauses.is_empty() {
163            String::new()
164        } else {
165            format!("WHERE {}", where_clauses.join(" AND "))
166        };
167
168        // Determine limit and offset based on range selector semantics:
169        // - Single item (end: None): position N means OFFSET N-1, LIMIT 1
170        // - Last N (end: Some(0)): LIMIT N
171        // - Range (end: Some(M)): positions from N to M
172        let (limit, offset) = if let Some(range) = query.range {
173            if range.is_single() {
174                // ~N = single item at position N (1-indexed)
175                (1, range.start.saturating_sub(1))
176            } else if range.is_last_n() {
177                // ~N: = last N items
178                (range.start, 0)
179            } else {
180                // ~N:~M = range from position N to M
181                // Position N is offset N-1, position M is offset M-1
182                // So we need M items starting at offset (start - 1)
183                let end_pos = range.end.unwrap_or(1);
184                let count = range.start.saturating_sub(end_pos) + 1;
185                (count, end_pos.saturating_sub(1))
186            }
187        } else {
188            (default_limit, 0)
189        };
190
191        let sql = format!(
192            r#"
193            SELECT id::VARCHAR, cmd, exit_code, timestamp::VARCHAR, duration_ms
194            FROM recent_invocations
195            {}
196            LIMIT {}
197            OFFSET {}
198            "#,
199            where_sql, limit, offset
200        );
201
202        let mut stmt = match conn.prepare(&sql) {
203            Ok(stmt) => stmt,
204            Err(e) => {
205                if e.to_string().contains("No files found") {
206                    return Ok(Vec::new());
207                }
208                return Err(e.into());
209            }
210        };
211
212        let rows = stmt.query_map([], |row| {
213            Ok(InvocationSummary {
214                id: row.get(0)?,
215                cmd: row.get(1)?,
216                exit_code: row.get(2)?,
217                timestamp: row.get(3)?,
218                duration_ms: row.get(4)?,
219            })
220        });
221
222        match rows {
223            Ok(rows) => {
224                let mut results = Vec::new();
225                for row in rows {
226                    results.push(row?);
227                }
228                Ok(results)
229            }
230            Err(e) => {
231                if e.to_string().contains("No files found") {
232                    Ok(Vec::new())
233                } else {
234                    Err(e.into())
235                }
236            }
237        }
238    }
239
240    /// Query invocations with default limit of 20 (for listing).
241    pub fn query_invocations(&self, query: &Query) -> Result<Vec<InvocationSummary>> {
242        self.query_invocations_with_limit(query, 20)
243    }
244
245    /// Count total invocations in the store.
246    pub fn invocation_count(&self) -> Result<i64> {
247        let conn = self.connection()?;
248
249        let result: std::result::Result<i64, _> =
250            conn.query_row("SELECT COUNT(*) FROM invocations", [], |row| row.get(0));
251
252        match result {
253            Ok(count) => Ok(count),
254            Err(e) => {
255                if e.to_string().contains("No files found") {
256                    Ok(0)
257                } else {
258                    Err(e.into())
259                }
260            }
261        }
262    }
263
264    /// Find an invocation by its tag.
265    /// Returns the full invocation ID if found.
266    pub fn find_by_tag(&self, tag: &str) -> Result<Option<String>> {
267        let conn = self.connection()?;
268
269        // Normalize tag (remove leading : if present)
270        let tag = tag.trim_start_matches(':');
271
272        let result: std::result::Result<String, _> = conn.query_row(
273            "SELECT id::VARCHAR FROM invocations WHERE tag = ?",
274            params![tag],
275            |row| row.get(0),
276        );
277
278        match result {
279            Ok(id) => Ok(Some(id)),
280            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
281            Err(e) => Err(e.into()),
282        }
283    }
284
285    /// Set or update the tag on an invocation.
286    ///
287    /// V5 schema: Updates the tag on the attempts table.
288    pub fn set_tag(&self, invocation_id: &str, tag: Option<&str>) -> Result<()> {
289        let conn = self.connection()?;
290
291        conn.execute(
292            "UPDATE local.attempts SET tag = ? WHERE id = ?",
293            params![tag, invocation_id],
294        )?;
295
296        Ok(())
297    }
298
299    /// Recover orphaned invocations (v5 schema).
300    ///
301    /// V5: Scans attempts without outcomes and checks if the runner is still alive.
302    /// If not alive, writes an orphaned outcome record.
303    ///
304    /// Note: This now looks at machine_id field which stores the runner_id for local invocations.
305    pub fn recover_orphaned_invocations(
306        &self,
307        max_age_hours: u32,
308        dry_run: bool,
309    ) -> Result<super::pending::RecoveryStats> {
310        use super::pending::{is_runner_alive, RecoveryStats};
311
312        // V5: Get pending attempts (attempts without outcomes)
313        let pending_attempts = self.get_pending_attempts()?;
314        let mut stats = RecoveryStats::default();
315
316        let now = chrono::Utc::now();
317        let max_age = chrono::Duration::hours(max_age_hours as i64);
318
319        for attempt in pending_attempts {
320            stats.pending_checked += 1;
321
322            // Check if too old (runner ID might have been recycled)
323            let age = now.signed_duration_since(attempt.timestamp);
324            let is_stale = age > max_age;
325
326            // Check if runner is still alive (machine_id stores runner_id for local invocations)
327            let runner_alive = if let Some(ref runner_id) = attempt.machine_id {
328                !is_stale && is_runner_alive(runner_id)
329            } else {
330                // No runner_id means we can't check - mark as orphaned if stale
331                !is_stale
332            };
333
334            if runner_alive {
335                stats.still_running += 1;
336                continue;
337            }
338
339            if dry_run {
340                stats.orphaned += 1;
341                continue;
342            }
343
344            // V5: Write orphaned outcome record
345            match self.orphan_invocation(attempt.id, attempt.date) {
346                Ok(()) => {
347                    stats.orphaned += 1;
348                }
349                Err(_) => {
350                    stats.errors += 1;
351                }
352            }
353        }
354
355        Ok(stats)
356    }
357}
358
359#[cfg(test)]
360mod tests {
361    use super::*;
362    use crate::init::initialize;
363    use crate::Config;
364    use tempfile::TempDir;
365
366    fn setup_store() -> (TempDir, Store) {
367        let tmp = TempDir::new().unwrap();
368        let config = Config::with_root(tmp.path());
369        initialize(&config).unwrap();
370        let store = Store::open(config).unwrap();
371        (tmp, store)
372    }
373
374    #[test]
375    fn test_write_and_count_invocation() {
376        let (_tmp, store) = setup_store();
377
378        let record = InvocationRecord::new(
379            "test-session",
380            "make test",
381            "/home/user/project",
382            0,
383            "test@client",
384        );
385
386        store.write_invocation(&record).unwrap();
387
388        let count = store.invocation_count().unwrap();
389        assert_eq!(count, 1);
390    }
391
392    #[test]
393    fn test_write_and_query_invocation() {
394        let (_tmp, store) = setup_store();
395
396        let record = InvocationRecord::new(
397            "test-session",
398            "cargo build",
399            "/home/user/project",
400            0,
401            "test@client",
402        )
403        .with_duration(1500);
404
405        store.write_invocation(&record).unwrap();
406
407        // Query using SQL
408        let result = store
409            .query("SELECT cmd, exit_code, duration_ms FROM invocations")
410            .unwrap();
411
412        assert_eq!(result.columns, vec!["cmd", "exit_code", "duration_ms"]);
413        assert_eq!(result.rows.len(), 1);
414        assert_eq!(result.rows[0][0], "cargo build");
415        assert_eq!(result.rows[0][1], "0");
416        assert_eq!(result.rows[0][2], "1500");
417    }
418
419    #[test]
420    fn test_recent_invocations_empty() {
421        let (_tmp, store) = setup_store();
422
423        let recent = store.recent_invocations(10).unwrap();
424        assert!(recent.is_empty());
425    }
426
427    #[test]
428    fn test_recent_invocations() {
429        let (_tmp, store) = setup_store();
430
431        // Write a few invocations
432        for i in 0..3 {
433            let record = InvocationRecord::new(
434                "test-session",
435                format!("command-{}", i),
436                "/home/user",
437                i,
438                "test@client",
439            );
440            store.write_invocation(&record).unwrap();
441        }
442
443        let recent = store.recent_invocations(10).unwrap();
444        assert_eq!(recent.len(), 3);
445    }
446
447    #[test]
448    fn test_atomic_parquet_no_temp_files() {
449        let (_tmp, store) = setup_store();
450
451        let record = InvocationRecord::new(
452            "test-session",
453            "test",
454            "/home/user",
455            0,
456            "test@client",
457        );
458        store.write_invocation(&record).unwrap();
459
460        // V5: Check no .tmp files in attempts directory
461        let date = record.date();
462        let attempts_dir = store.config().attempts_dir(&date);
463        let temps: Vec<_> = std::fs::read_dir(&attempts_dir)
464            .unwrap()
465            .filter_map(|e| e.ok())
466            .filter(|e| e.file_name().to_str().unwrap_or("").starts_with(".tmp."))
467            .collect();
468        assert!(
469            temps.is_empty(),
470            "No temp files should remain in {:?}",
471            attempts_dir
472        );
473    }
474
475    // DuckDB mode tests
476
477    fn setup_store_duckdb() -> (TempDir, Store) {
478        let tmp = TempDir::new().unwrap();
479        let config = Config::with_duckdb_mode(tmp.path());
480        initialize(&config).unwrap();
481        let store = Store::open(config).unwrap();
482        (tmp, store)
483    }
484
485    #[test]
486    fn test_duckdb_mode_write_and_count_invocation() {
487        let (_tmp, store) = setup_store_duckdb();
488
489        let record = InvocationRecord::new(
490            "test-session",
491            "make test",
492            "/home/user/project",
493            0,
494            "test@client",
495        );
496
497        store.write_invocation(&record).unwrap();
498
499        let count = store.invocation_count().unwrap();
500        assert_eq!(count, 1);
501    }
502
503    #[test]
504    fn test_duckdb_mode_write_and_query_invocation() {
505        let (_tmp, store) = setup_store_duckdb();
506
507        let record = InvocationRecord::new(
508            "test-session",
509            "cargo build",
510            "/home/user/project",
511            0,
512            "test@client",
513        )
514        .with_duration(1500);
515
516        store.write_invocation(&record).unwrap();
517
518        // Query using SQL
519        let result = store
520            .query("SELECT cmd, exit_code, duration_ms FROM invocations")
521            .unwrap();
522
523        assert_eq!(result.columns, vec!["cmd", "exit_code", "duration_ms"]);
524        assert_eq!(result.rows.len(), 1);
525        assert_eq!(result.rows[0][0], "cargo build");
526        assert_eq!(result.rows[0][1], "0");
527        assert_eq!(result.rows[0][2], "1500");
528    }
529
530    #[test]
531    fn test_duckdb_mode_recent_invocations() {
532        let (_tmp, store) = setup_store_duckdb();
533
534        // Write a few invocations
535        for i in 0..3 {
536            let record = InvocationRecord::new(
537                "test-session",
538                format!("command-{}", i),
539                "/home/user",
540                i,
541                "test@client",
542            );
543            store.write_invocation(&record).unwrap();
544        }
545
546        let recent = store.recent_invocations(10).unwrap();
547        assert_eq!(recent.len(), 3);
548    }
549
550    #[test]
551    fn test_duckdb_mode_no_parquet_files() {
552        let (tmp, store) = setup_store_duckdb();
553
554        let record = InvocationRecord::new(
555            "test-session",
556            "test",
557            "/home/user",
558            0,
559            "test@client",
560        );
561        store.write_invocation(&record).unwrap();
562
563        // Check that no parquet files were created in recent/invocations
564        let invocations_dir = tmp.path().join("db/data/recent/invocations");
565        if invocations_dir.exists() {
566            let parquet_files: Vec<_> = std::fs::read_dir(&invocations_dir)
567                .unwrap()
568                .filter_map(|e| e.ok())
569                .filter(|e| e.file_name().to_str().unwrap_or("").ends_with(".parquet"))
570                .collect();
571            assert!(
572                parquet_files.is_empty(),
573                "DuckDB mode should not create parquet files"
574            );
575        }
576    }
577
578    #[test]
579    fn test_pending_invocation_lifecycle() {
580        let (_tmp, store) = setup_store();
581
582        // V5: Use AttemptRecord and OutcomeRecord for pending lifecycle
583        use crate::schema::AttemptRecord;
584
585        // Create an attempt (invocation start)
586        let attempt = AttemptRecord::new(
587            "test-session",
588            "long-running-command",
589            "/home/user",
590            "test@client",
591        );
592
593        // Start the invocation (writes attempt, no outcome)
594        store.start_invocation(&attempt).unwrap();
595
596        // Verify attempt was written
597        let count = store.attempt_count().unwrap();
598        assert_eq!(count, 1, "Attempt should be written");
599
600        // Verify invocation shows as pending (no outcome yet)
601        let pending = store.get_pending_attempts().unwrap();
602        assert_eq!(pending.len(), 1, "Should have one pending attempt");
603        assert_eq!(pending[0].id, attempt.id);
604
605        // Complete the invocation (write outcome)
606        store.complete_invocation(attempt.id, 0, Some(100), attempt.date).unwrap();
607
608        // Verify outcome was written
609        let outcome_count = store.outcome_count().unwrap();
610        assert_eq!(outcome_count, 1, "Outcome should be written");
611
612        // Verify no pending attempts remain
613        let pending_after = store.get_pending_attempts().unwrap();
614        assert!(pending_after.is_empty(), "No pending attempts after completion");
615
616        // Verify invocation shows as completed in the view
617        let invocations = store.recent_invocations(10).unwrap();
618        assert_eq!(invocations.len(), 1, "Should have one invocation");
619        assert_eq!(invocations[0].exit_code, 0);
620    }
621
622    #[test]
623    fn test_recover_orphaned_invocations() {
624        let (_tmp, store) = setup_store();
625
626        // V5: Create an attempt without an outcome (simulating a crash)
627        // The machine_id field stores the runner_id (pid:NNNN format) for local invocations
628        use crate::schema::AttemptRecord;
629
630        let mut attempt = AttemptRecord::new(
631            "test-session",
632            "crashed-command",
633            "/home/user",
634            "test@client",
635        );
636        // Set machine_id to a dead PID (non-existent process) with pid: prefix
637        attempt.machine_id = Some("pid:999999999".to_string());
638
639        // Write attempt only (no outcome = pending status)
640        store.write_attempt(&attempt).unwrap();
641
642        // Verify attempt exists as pending
643        let pending = store.get_pending_attempts().unwrap();
644        assert_eq!(pending.len(), 1, "Should have one pending attempt");
645
646        // Run recovery (should mark as orphaned since PID doesn't exist)
647        let stats = store.recover_orphaned_invocations(24, false).unwrap();
648
649        assert_eq!(stats.pending_checked, 1);
650        assert_eq!(stats.orphaned, 1);
651        assert_eq!(stats.still_running, 0);
652
653        // Verify orphaned outcome was written
654        let outcome_count = store.outcome_count().unwrap();
655        assert_eq!(outcome_count, 1, "Orphaned outcome should be written");
656
657        // Verify no pending attempts remain
658        let pending_after = store.get_pending_attempts().unwrap();
659        assert!(pending_after.is_empty(), "No pending attempts after recovery");
660    }
661
662    #[test]
663    fn test_recover_skips_running_processes() {
664        let (_tmp, store) = setup_store();
665
666        // V5: Create an attempt with the current process PID (still alive)
667        use crate::schema::AttemptRecord;
668
669        let mut attempt = AttemptRecord::new(
670            "test-session",
671            "running-command",
672            "/home/user",
673            "test@client",
674        );
675        // Set machine_id to current process PID (still alive) with pid: prefix
676        attempt.machine_id = Some(format!("pid:{}", std::process::id()));
677
678        // Write attempt only (no outcome = pending status)
679        store.write_attempt(&attempt).unwrap();
680
681        // Verify attempt was written
682        let attempt_count = store.attempt_count().unwrap();
683        assert_eq!(attempt_count, 1, "Attempt should be written");
684
685        // Verify we can get pending attempts
686        let pending_before = store.get_pending_attempts().unwrap();
687        assert_eq!(pending_before.len(), 1, "Should have one pending attempt before recovery");
688
689        // Run recovery (should skip since process is still alive)
690        let stats = store.recover_orphaned_invocations(24, false).unwrap();
691
692        assert_eq!(stats.pending_checked, 1, "Should check one pending attempt");
693        assert_eq!(stats.still_running, 1, "Should detect process is still running");
694        assert_eq!(stats.orphaned, 0, "Should not orphan running process");
695
696        // Verify attempt is still pending (no outcome written)
697        let pending_after = store.get_pending_attempts().unwrap();
698        assert_eq!(pending_after.len(), 1, "Attempt should still be pending");
699    }
700}