Skip to main content

magic_bird/store/
invocations.rs

1//! Invocation storage operations.
2
3use std::fs;
4
5use duckdb::params;
6
7use super::atomic;
8use super::{sanitize_filename, Store};
9use crate::config::StorageMode;
10use crate::query::{CompareOp, Query, QueryComponent};
11use crate::schema::InvocationRecord;
12use crate::Result;
13
/// Lightweight summary of a single invocation, used when listing results.
///
/// The fields mirror the columns selected by the listing queries in this
/// module: `id::VARCHAR, cmd, exit_code, timestamp::VARCHAR, duration_ms`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InvocationSummary {
    /// Invocation ID rendered as text (`id::VARCHAR`).
    pub id: String,
    /// Command line as stored in the `cmd` column.
    pub cmd: String,
    /// Exit code stored for the command.
    pub exit_code: i32,
    /// Timestamp rendered as text by the database (`timestamp::VARCHAR`).
    pub timestamp: String,
    /// Duration in milliseconds; `None` when the stored value is NULL.
    pub duration_ms: Option<i64>,
}
23
24impl Store {
25    /// Write an invocation record to the store.
26    ///
27    /// Behavior depends on storage mode:
28    /// - Parquet: Creates a new Parquet file in the appropriate date partition
29    /// - DuckDB: Inserts directly into the local.invocations
30    pub fn write_invocation(&self, record: &InvocationRecord) -> Result<()> {
31        match self.config.storage_mode {
32            StorageMode::Parquet => self.write_invocation_parquet(record),
33            StorageMode::DuckDB => self.write_invocation_duckdb(record),
34        }
35    }
36
37    /// Write invocation to a Parquet file (multi-writer safe).
38    fn write_invocation_parquet(&self, record: &InvocationRecord) -> Result<()> {
39        let conn = self.connection()?;
40        let date = record.date();
41
42        // Ensure the partition directory exists
43        let partition_dir = self.config.invocations_dir(&date);
44        fs::create_dir_all(&partition_dir)?;
45
46        // Generate filename: {session}--{executable}--{id}.parquet
47        let executable = record.executable.as_deref().unwrap_or("unknown");
48        let filename = format!(
49            "{}--{}--{}.parquet",
50            sanitize_filename(&record.session_id),
51            sanitize_filename(executable),
52            record.id
53        );
54        let file_path = partition_dir.join(&filename);
55
56        // Write via DuckDB using COPY
57        conn.execute_batch(
58            r#"
59            CREATE OR REPLACE TEMP TABLE temp_invocation (
60                id UUID,
61                session_id VARCHAR,
62                timestamp TIMESTAMP,
63                duration_ms BIGINT,
64                cwd VARCHAR,
65                cmd VARCHAR,
66                executable VARCHAR,
67                exit_code INTEGER,
68                format_hint VARCHAR,
69                client_id VARCHAR,
70                hostname VARCHAR,
71                username VARCHAR,
72                tag VARCHAR,
73                date DATE
74            );
75            "#,
76        )?;
77
78        conn.execute(
79            r#"
80            INSERT INTO temp_invocation VALUES (
81                ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
82            )
83            "#,
84            params![
85                record.id.to_string(),
86                record.session_id,
87                record.timestamp.to_rfc3339(),
88                record.duration_ms,
89                record.cwd,
90                record.cmd,
91                record.executable,
92                record.exit_code,
93                record.format_hint,
94                record.client_id,
95                record.hostname,
96                record.username,
97                record.tag,
98                date.to_string(),
99            ],
100        )?;
101
102        // Atomic write: COPY to temp file, then rename
103        let temp_path = atomic::temp_path(&file_path);
104        conn.execute(
105            &format!(
106                "COPY temp_invocation TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
107                temp_path.display()
108            ),
109            [],
110        )?;
111        conn.execute("DROP TABLE temp_invocation", [])?;
112
113        // Rename temp to final (atomic on POSIX)
114        atomic::rename_into_place(&temp_path, &file_path)?;
115
116        Ok(())
117    }
118
119    /// Write invocation directly to DuckDB table.
120    fn write_invocation_duckdb(&self, record: &InvocationRecord) -> Result<()> {
121        let conn = self.connection()?;
122        let date = record.date();
123
124        conn.execute(
125            r#"
126            INSERT INTO local.invocations VALUES (
127                ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
128            )
129            "#,
130            params![
131                record.id.to_string(),
132                record.session_id,
133                record.timestamp.to_rfc3339(),
134                record.duration_ms,
135                record.cwd,
136                record.cmd,
137                record.executable,
138                record.exit_code,
139                record.format_hint,
140                record.client_id,
141                record.hostname,
142                record.username,
143                record.tag,
144                date.to_string(),
145            ],
146        )?;
147
148        Ok(())
149    }
150
151    /// Get recent invocations (last 7 days).
152    pub fn recent_invocations(&self, limit: usize) -> Result<Vec<InvocationSummary>> {
153        let conn = self.connection()?;
154
155        let sql = format!(
156            r#"
157            SELECT id::VARCHAR, cmd, exit_code, timestamp::VARCHAR, duration_ms
158            FROM recent_invocations
159            LIMIT {}
160            "#,
161            limit
162        );
163
164        let mut stmt = match conn.prepare(&sql) {
165            Ok(stmt) => stmt,
166            Err(e) => {
167                if e.to_string().contains("No files found") {
168                    return Ok(Vec::new());
169                }
170                return Err(e.into());
171            }
172        };
173
174        let rows = stmt.query_map([], |row| {
175            Ok(InvocationSummary {
176                id: row.get(0)?,
177                cmd: row.get(1)?,
178                exit_code: row.get(2)?,
179                timestamp: row.get(3)?,
180                duration_ms: row.get(4)?,
181            })
182        });
183
184        match rows {
185            Ok(rows) => {
186                let mut results = Vec::new();
187                for row in rows {
188                    results.push(row?);
189                }
190                Ok(results)
191            }
192            Err(e) => {
193                if e.to_string().contains("No files found") {
194                    Ok(Vec::new())
195                } else {
196                    Err(e.into())
197                }
198            }
199        }
200    }
201
202    /// Get the last invocation (most recent).
203    pub fn last_invocation(&self) -> Result<Option<InvocationSummary>> {
204        let invocations = self.recent_invocations(1)?;
205        Ok(invocations.into_iter().next())
206    }
207
208    /// Query invocations with filters from the query micro-language.
209    ///
210    /// Supports:
211    /// - `~N` range selector (limit to N results)
212    /// - `%exit<>0` field filters (exit code, duration, etc.)
213    /// - `%/pattern/` command regex
214    ///
215    /// Use `default_limit` to specify the limit when no range is provided:
216    /// - 20 for listing commands (shq i)
217    /// - 1 for single-item commands (shq o, shq I, shq R)
218    pub fn query_invocations_with_limit(
219        &self,
220        query: &Query,
221        default_limit: usize,
222    ) -> Result<Vec<InvocationSummary>> {
223        let conn = self.connection()?;
224
225        // Build WHERE clauses from query filters
226        let mut where_clauses: Vec<String> = Vec::new();
227
228        for component in &query.filters {
229            match component {
230                QueryComponent::CommandRegex(pattern) => {
231                    // Use regexp_matches for regex filtering
232                    let escaped = pattern.replace('\'', "''");
233                    where_clauses.push(format!("regexp_matches(cmd, '{}')", escaped));
234                }
235                QueryComponent::FieldFilter(filter) => {
236                    // Map field names to SQL column names
237                    let column = match filter.field.as_str() {
238                        "exit" | "exit_code" => "exit_code",
239                        "duration" | "duration_ms" => "duration_ms",
240                        "cmd" | "command" => "cmd",
241                        "cwd" => "cwd",
242                        other => other, // Pass through unknown fields
243                    };
244
245                    let escaped_value = filter.value.replace('\'', "''");
246
247                    let clause = match filter.op {
248                        CompareOp::Eq => format!("{} = '{}'", column, escaped_value),
249                        CompareOp::NotEq => format!("{} <> '{}'", column, escaped_value),
250                        CompareOp::Gt => format!("{} > '{}'", column, escaped_value),
251                        CompareOp::Lt => format!("{} < '{}'", column, escaped_value),
252                        CompareOp::Gte => format!("{} >= '{}'", column, escaped_value),
253                        CompareOp::Lte => format!("{} <= '{}'", column, escaped_value),
254                        CompareOp::Regex => {
255                            format!("regexp_matches({}::VARCHAR, '{}')", column, escaped_value)
256                        }
257                    };
258                    where_clauses.push(clause);
259                }
260                QueryComponent::Tag(_) => {
261                    // Tags not implemented in MVP
262                }
263            }
264        }
265
266        // Build the SQL query
267        let where_sql = if where_clauses.is_empty() {
268            String::new()
269        } else {
270            format!("WHERE {}", where_clauses.join(" AND "))
271        };
272
273        let limit = query.range.map(|r| r.start).unwrap_or(default_limit);
274
275        let sql = format!(
276            r#"
277            SELECT id::VARCHAR, cmd, exit_code, timestamp::VARCHAR, duration_ms
278            FROM recent_invocations
279            {}
280            LIMIT {}
281            "#,
282            where_sql, limit
283        );
284
285        let mut stmt = match conn.prepare(&sql) {
286            Ok(stmt) => stmt,
287            Err(e) => {
288                if e.to_string().contains("No files found") {
289                    return Ok(Vec::new());
290                }
291                return Err(e.into());
292            }
293        };
294
295        let rows = stmt.query_map([], |row| {
296            Ok(InvocationSummary {
297                id: row.get(0)?,
298                cmd: row.get(1)?,
299                exit_code: row.get(2)?,
300                timestamp: row.get(3)?,
301                duration_ms: row.get(4)?,
302            })
303        });
304
305        match rows {
306            Ok(rows) => {
307                let mut results = Vec::new();
308                for row in rows {
309                    results.push(row?);
310                }
311                Ok(results)
312            }
313            Err(e) => {
314                if e.to_string().contains("No files found") {
315                    Ok(Vec::new())
316                } else {
317                    Err(e.into())
318                }
319            }
320        }
321    }
322
323    /// Query invocations with default limit of 20 (for listing).
324    pub fn query_invocations(&self, query: &Query) -> Result<Vec<InvocationSummary>> {
325        self.query_invocations_with_limit(query, 20)
326    }
327
328    /// Count total invocations in the store.
329    pub fn invocation_count(&self) -> Result<i64> {
330        let conn = self.connection()?;
331
332        let result: std::result::Result<i64, _> =
333            conn.query_row("SELECT COUNT(*) FROM invocations", [], |row| row.get(0));
334
335        match result {
336            Ok(count) => Ok(count),
337            Err(e) => {
338                if e.to_string().contains("No files found") {
339                    Ok(0)
340                } else {
341                    Err(e.into())
342                }
343            }
344        }
345    }
346
347    /// Find an invocation by its tag.
348    /// Returns the full invocation ID if found.
349    pub fn find_by_tag(&self, tag: &str) -> Result<Option<String>> {
350        let conn = self.connection()?;
351
352        // Normalize tag (remove leading : if present)
353        let tag = tag.trim_start_matches(':');
354
355        let result: std::result::Result<String, _> = conn.query_row(
356            "SELECT id::VARCHAR FROM invocations WHERE tag = ?",
357            params![tag],
358            |row| row.get(0),
359        );
360
361        match result {
362            Ok(id) => Ok(Some(id)),
363            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
364            Err(e) => Err(e.into()),
365        }
366    }
367
368    /// Set or update the tag on an invocation.
369    pub fn set_tag(&self, invocation_id: &str, tag: Option<&str>) -> Result<()> {
370        let conn = self.connection()?;
371
372        conn.execute(
373            "UPDATE local.invocations SET tag = ? WHERE id = ?",
374            params![tag, invocation_id],
375        )?;
376
377        Ok(())
378    }
379}
380
381#[cfg(test)]
382mod tests {
383    use super::*;
384    use crate::init::initialize;
385    use crate::Config;
386    use tempfile::TempDir;
387
388    fn setup_store() -> (TempDir, Store) {
389        let tmp = TempDir::new().unwrap();
390        let config = Config::with_root(tmp.path());
391        initialize(&config).unwrap();
392        let store = Store::open(config).unwrap();
393        (tmp, store)
394    }
395
396    #[test]
397    fn test_write_and_count_invocation() {
398        let (_tmp, store) = setup_store();
399
400        let record = InvocationRecord::new(
401            "test-session",
402            "make test",
403            "/home/user/project",
404            0,
405            "test@client",
406        );
407
408        store.write_invocation(&record).unwrap();
409
410        let count = store.invocation_count().unwrap();
411        assert_eq!(count, 1);
412    }
413
414    #[test]
415    fn test_write_and_query_invocation() {
416        let (_tmp, store) = setup_store();
417
418        let record = InvocationRecord::new(
419            "test-session",
420            "cargo build",
421            "/home/user/project",
422            0,
423            "test@client",
424        )
425        .with_duration(1500);
426
427        store.write_invocation(&record).unwrap();
428
429        // Query using SQL
430        let result = store
431            .query("SELECT cmd, exit_code, duration_ms FROM invocations")
432            .unwrap();
433
434        assert_eq!(result.columns, vec!["cmd", "exit_code", "duration_ms"]);
435        assert_eq!(result.rows.len(), 1);
436        assert_eq!(result.rows[0][0], "cargo build");
437        assert_eq!(result.rows[0][1], "0");
438        assert_eq!(result.rows[0][2], "1500");
439    }
440
441    #[test]
442    fn test_recent_invocations_empty() {
443        let (_tmp, store) = setup_store();
444
445        let recent = store.recent_invocations(10).unwrap();
446        assert!(recent.is_empty());
447    }
448
449    #[test]
450    fn test_recent_invocations() {
451        let (_tmp, store) = setup_store();
452
453        // Write a few invocations
454        for i in 0..3 {
455            let record = InvocationRecord::new(
456                "test-session",
457                format!("command-{}", i),
458                "/home/user",
459                i,
460                "test@client",
461            );
462            store.write_invocation(&record).unwrap();
463        }
464
465        let recent = store.recent_invocations(10).unwrap();
466        assert_eq!(recent.len(), 3);
467    }
468
469    #[test]
470    fn test_atomic_parquet_no_temp_files() {
471        let (_tmp, store) = setup_store();
472
473        let record = InvocationRecord::new(
474            "test-session",
475            "test",
476            "/home/user",
477            0,
478            "test@client",
479        );
480        store.write_invocation(&record).unwrap();
481
482        // Check no .tmp files in invocations directory
483        let date = record.date();
484        let inv_dir = store.config().invocations_dir(&date);
485        let temps: Vec<_> = std::fs::read_dir(&inv_dir)
486            .unwrap()
487            .filter_map(|e| e.ok())
488            .filter(|e| e.file_name().to_str().unwrap_or("").starts_with(".tmp."))
489            .collect();
490        assert!(
491            temps.is_empty(),
492            "No temp files should remain in {:?}",
493            inv_dir
494        );
495    }
496
497    // DuckDB mode tests
498
499    fn setup_store_duckdb() -> (TempDir, Store) {
500        let tmp = TempDir::new().unwrap();
501        let config = Config::with_duckdb_mode(tmp.path());
502        initialize(&config).unwrap();
503        let store = Store::open(config).unwrap();
504        (tmp, store)
505    }
506
507    #[test]
508    fn test_duckdb_mode_write_and_count_invocation() {
509        let (_tmp, store) = setup_store_duckdb();
510
511        let record = InvocationRecord::new(
512            "test-session",
513            "make test",
514            "/home/user/project",
515            0,
516            "test@client",
517        );
518
519        store.write_invocation(&record).unwrap();
520
521        let count = store.invocation_count().unwrap();
522        assert_eq!(count, 1);
523    }
524
525    #[test]
526    fn test_duckdb_mode_write_and_query_invocation() {
527        let (_tmp, store) = setup_store_duckdb();
528
529        let record = InvocationRecord::new(
530            "test-session",
531            "cargo build",
532            "/home/user/project",
533            0,
534            "test@client",
535        )
536        .with_duration(1500);
537
538        store.write_invocation(&record).unwrap();
539
540        // Query using SQL
541        let result = store
542            .query("SELECT cmd, exit_code, duration_ms FROM invocations")
543            .unwrap();
544
545        assert_eq!(result.columns, vec!["cmd", "exit_code", "duration_ms"]);
546        assert_eq!(result.rows.len(), 1);
547        assert_eq!(result.rows[0][0], "cargo build");
548        assert_eq!(result.rows[0][1], "0");
549        assert_eq!(result.rows[0][2], "1500");
550    }
551
552    #[test]
553    fn test_duckdb_mode_recent_invocations() {
554        let (_tmp, store) = setup_store_duckdb();
555
556        // Write a few invocations
557        for i in 0..3 {
558            let record = InvocationRecord::new(
559                "test-session",
560                format!("command-{}", i),
561                "/home/user",
562                i,
563                "test@client",
564            );
565            store.write_invocation(&record).unwrap();
566        }
567
568        let recent = store.recent_invocations(10).unwrap();
569        assert_eq!(recent.len(), 3);
570    }
571
572    #[test]
573    fn test_duckdb_mode_no_parquet_files() {
574        let (tmp, store) = setup_store_duckdb();
575
576        let record = InvocationRecord::new(
577            "test-session",
578            "test",
579            "/home/user",
580            0,
581            "test@client",
582        );
583        store.write_invocation(&record).unwrap();
584
585        // Check that no parquet files were created in recent/invocations
586        let invocations_dir = tmp.path().join("db/data/recent/invocations");
587        if invocations_dir.exists() {
588            let parquet_files: Vec<_> = std::fs::read_dir(&invocations_dir)
589                .unwrap()
590                .filter_map(|e| e.ok())
591                .filter(|e| e.file_name().to_str().unwrap_or("").ends_with(".parquet"))
592                .collect();
593            assert!(
594                parquet_files.is_empty(),
595                "DuckDB mode should not create parquet files"
596            );
597        }
598    }
599}