Skip to main content

magic_bird/store/
events.rs

1//! Event storage operations - parsing and querying log events.
2
3use std::fs;
4
5use chrono::NaiveDate;
6use duckdb::params;
7use serde::Deserialize;
8use uuid::Uuid;
9
10use super::atomic;
11use super::Store;
12use crate::config::StorageMode;
13use crate::schema::EventRecord;
14use crate::{Error, Result};
15
16/// A format detection rule from event-formats.toml.
17#[derive(Debug, Clone, Deserialize)]
18pub struct FormatRule {
19    /// Glob pattern to match against command string.
20    pub pattern: String,
21    /// Format to use if pattern matches.
22    pub format: String,
23}
24
25/// Default format configuration.
26#[derive(Debug, Clone, Deserialize)]
27pub struct DefaultFormat {
28    /// Default format when no rules match.
29    pub format: String,
30}
31
32/// Event format configuration.
33#[derive(Debug, Clone, Deserialize)]
34pub struct FormatConfig {
35    /// List of format detection rules.
36    #[serde(default)]
37    pub rules: Vec<FormatRule>,
38    /// Default format configuration.
39    #[serde(default)]
40    pub default: Option<DefaultFormat>,
41}
42
43impl Default for FormatConfig {
44    fn default() -> Self {
45        Self {
46            rules: Vec::new(),
47            default: Some(DefaultFormat {
48                format: "auto".to_string(),
49            }),
50        }
51    }
52}
53
54impl FormatConfig {
55    /// Load format config from a TOML file.
56    pub fn load(path: &std::path::Path) -> Result<Self> {
57        if path.exists() {
58            let contents = fs::read_to_string(path)?;
59            toml::from_str(&contents)
60                .map_err(|e| Error::Config(format!("Failed to parse event-formats.toml: {}", e)))
61        } else {
62            Ok(Self::default())
63        }
64    }
65
66    /// Detect format for a command string.
67    /// Patterns use simple glob-like matching:
68    /// - `*` matches any characters (including none)
69    /// - Patterns are case-sensitive
70    pub fn detect_format(&self, cmd: &str) -> String {
71        // Check rules in order
72        for rule in &self.rules {
73            if pattern_matches(&rule.pattern, cmd) {
74                return rule.format.clone();
75            }
76        }
77
78        // Fall back to default
79        self.default
80            .as_ref()
81            .map(|d| d.format.clone())
82            .unwrap_or_else(|| "auto".to_string())
83    }
84}
85
/// Translate a glob pattern into a SQL LIKE pattern.
///
/// `*` maps to `%` and `?` maps to `_`. Characters that are special inside a
/// LIKE pattern (`%`, `_`) and the backslash itself are prefixed with a
/// backslash; every other character is copied unchanged.
fn glob_to_like(pattern: &str) -> String {
    let mut like = String::with_capacity(pattern.len() + 10);

    for ch in pattern.chars() {
        let replacement = match ch {
            '*' => "%",
            '?' => "_",
            '%' => "\\%",
            '_' => "\\_",
            '\\' => "\\\\",
            other => {
                // Ordinary character: copy it through as-is.
                like.push(other);
                continue;
            }
        };
        like.push_str(replacement);
    }

    like
}
102
/// In-memory glob matcher backing `FormatConfig::detect_format()`.
///
/// Only `*` is treated as a wildcard (matching any run of characters,
/// including none). A pattern without `*` must equal `text` exactly.
/// Matching is case-sensitive.
fn pattern_matches(pattern: &str, text: &str) -> bool {
    // Split off the literal prefix that precedes the first `*`.
    let (prefix, after_first_star) = match pattern.split_once('*') {
        Some(pieces) => pieces,
        None => return pattern == text,
    };

    // The prefix anchors at the start; an empty prefix strips nothing.
    let mut remaining = match text.strip_prefix(prefix) {
        Some(rest) => rest,
        None => return false,
    };

    // Whatever follows the last `*` is the anchored suffix; anything between
    // the first and last `*` is a list of floating segments.
    let (floating, suffix) = match after_first_star.rsplit_once('*') {
        Some((middle, tail)) => (Some(middle), tail),
        None => (None, after_first_star),
    };

    if let Some(middle) = floating {
        for segment in middle.split('*').filter(|s| !s.is_empty()) {
            // Each floating segment must occur, in order, at the earliest
            // position after the previously consumed text.
            match remaining.find(segment) {
                Some(at) => remaining = &remaining[at + segment.len()..],
                None => return false,
            }
        }
    }

    // The suffix must close out what is left of the text.
    suffix.is_empty() || remaining.ends_with(suffix)
}
138
/// Summary of an event (for listing).
///
/// Holds the subset of event columns returned by event queries. `id` and
/// `invocation_id` are carried as strings; every other field is optional
/// because the underlying column may be NULL.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventSummary {
    /// Event identifier, rendered as text.
    pub id: String,
    /// Identifier of the invocation the event belongs to, rendered as text.
    pub invocation_id: String,
    /// Severity of the event, if any.
    pub severity: Option<String>,
    /// Message text, if any.
    pub message: Option<String>,
    /// File the event refers to, if any.
    pub ref_file: Option<String>,
    /// Line within `ref_file`, if any.
    pub ref_line: Option<i32>,
    /// Error code, if any.
    pub error_code: Option<String>,
    /// Test name, if any.
    pub test_name: Option<String>,
    /// Status, if any.
    pub status: Option<String>,
}
152
153/// Filters for querying events.
154#[derive(Debug, Default)]
155pub struct EventFilters {
156    /// Filter by severity (error, warning, info, note).
157    pub severity: Option<String>,
158    /// Filter by invocation ID.
159    pub invocation_id: Option<String>,
160    /// Filter by multiple invocation IDs (for last_n queries).
161    pub invocation_ids: Option<Vec<String>>,
162    /// Filter by command pattern (glob).
163    pub cmd_pattern: Option<String>,
164    /// Filter by client ID.
165    pub client_id: Option<String>,
166    /// Filter by hostname.
167    pub hostname: Option<String>,
168    /// Filter by date range start.
169    pub date_from: Option<NaiveDate>,
170    /// Filter by date range end.
171    pub date_to: Option<NaiveDate>,
172    /// Maximum number of events to return.
173    pub limit: Option<usize>,
174}
175
176impl Store {
177    /// Load format config from BIRD_ROOT/event-formats.toml.
178    pub fn load_format_config(&self) -> Result<FormatConfig> {
179        FormatConfig::load(&self.config.event_formats_path())
180    }
181
182    /// Detect format for a command using DuckDB SQL matching.
183    ///
184    /// Uses SQL LIKE patterns for matching, which prepares for future
185    /// integration with duck_hunt_match_command_patterns().
186    pub fn detect_format(&self, cmd: &str) -> Result<String> {
187        let config = self.load_format_config()?;
188
189        // If no rules, fall back to default
190        if config.rules.is_empty() {
191            return Ok(config
192                .default
193                .as_ref()
194                .map(|d| d.format.clone())
195                .unwrap_or_else(|| "auto".to_string()));
196        }
197
198        let conn = self.connection()?;
199
200        // Create temp table with rules (convert glob to LIKE patterns)
201        conn.execute_batch("CREATE OR REPLACE TEMP TABLE format_rules (priority INT, pattern VARCHAR, format VARCHAR)")?;
202
203        {
204            let mut stmt = conn.prepare("INSERT INTO format_rules VALUES (?, ?, ?)")?;
205            for (i, rule) in config.rules.iter().enumerate() {
206                let like_pattern = glob_to_like(&rule.pattern);
207                stmt.execute(params![i as i32, like_pattern, rule.format.clone()])?;
208            }
209        }
210
211        // Query for matching format using SQL LIKE
212        // This can be easily swapped with duck_hunt_match_command_patterns() later
213        let result: std::result::Result<String, _> = conn.query_row(
214            "SELECT format FROM format_rules WHERE ? LIKE pattern ORDER BY priority LIMIT 1",
215            params![cmd],
216            |row| row.get(0),
217        );
218
219        match result {
220            Ok(format) => Ok(format),
221            Err(_) => {
222                // No match - fall back to default
223                Ok(config
224                    .default
225                    .as_ref()
226                    .map(|d| d.format.clone())
227                    .unwrap_or_else(|| "auto".to_string()))
228            }
229        }
230    }
231
232    /// Extract events from an invocation's output using duck_hunt.
233    ///
234    /// Parses the stdout/stderr of an invocation and stores the extracted events.
235    /// Uses read_duck_hunt_log() directly on storage refs for efficiency - no content
236    /// is loaded into Rust memory.
237    ///
238    /// Returns the number of events extracted.
239    pub fn extract_events(
240        &self,
241        invocation_id: &str,
242        format_override: Option<&str>,
243    ) -> Result<usize> {
244        let conn = self.connection()?;
245
246        // Get invocation info for format detection and metadata
247        let (cmd, client_id, hostname, date): (String, String, Option<String>, String) = conn
248            .query_row(
249                "SELECT cmd, client_id, hostname, date::VARCHAR FROM invocations WHERE id = ?",
250                params![invocation_id],
251                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
252            )
253            .map_err(|e| Error::NotFound(format!("Invocation {}: {}", invocation_id, e)))?;
254
255        // Determine format to use
256        let format = match format_override {
257            Some(f) => f.to_string(),
258            None => self.detect_format(&cmd)?,
259        };
260
261        // Parse the date
262        let date = date
263            .parse::<NaiveDate>()
264            .map_err(|e| Error::Storage(format!("Invalid date: {}", e)))?;
265
266        // Ensure the events partition directory exists
267        let partition_dir = self.config.events_dir(&date);
268        fs::create_dir_all(&partition_dir)?;
269
270        // Get output storage refs for this invocation
271        // Note: DuckDB table functions don't support lateral joins with column refs,
272        // so we iterate in Rust and call read_duck_hunt_log with literal paths.
273        let data_dir = self.config.data_dir();
274        let mut stmt = conn.prepare(
275            "SELECT storage_ref FROM outputs
276             WHERE invocation_id = ? AND stream IN ('stdout', 'stderr', 'combined')",
277        )?;
278        let storage_refs: Vec<String> = stmt
279            .query_map(params![invocation_id], |row| row.get(0))?
280            .filter_map(|r| r.ok())
281            .collect();
282
283        if storage_refs.is_empty() {
284            return Ok(0);
285        }
286
287        // Create temp table for events
288        conn.execute_batch(
289            r#"
290            CREATE OR REPLACE TEMP TABLE temp_events (
291                id UUID,
292                invocation_id UUID,
293                client_id VARCHAR,
294                hostname VARCHAR,
295                event_type VARCHAR,
296                severity VARCHAR,
297                ref_file VARCHAR,
298                ref_line INTEGER,
299                ref_column INTEGER,
300                message VARCHAR,
301                error_code VARCHAR,
302                test_name VARCHAR,
303                status VARCHAR,
304                format_used VARCHAR,
305                date DATE
306            );
307            "#,
308        )?;
309
310        // Escape values for SQL
311        let escaped_format = format.replace("'", "''");
312        let escaped_client_id = client_id.replace("'", "''");
313        let hostname_sql = hostname
314            .as_ref()
315            .map(|h| format!("'{}'", h.replace("'", "''")))
316            .unwrap_or_else(|| "NULL".to_string());
317
318        // Process each output stream separately (DuckDB table functions need literal args)
319        for storage_ref in &storage_refs {
320            // Resolve file:// refs to absolute paths, pass data: refs through
321            let resolved_ref = if let Some(suffix) = storage_ref.strip_prefix("file://") {
322                data_dir.join(suffix).display().to_string()
323            } else {
324                storage_ref.clone()
325            };
326
327            let escaped_ref = resolved_ref.replace("'", "''");
328
329            let sql = format!(
330                r#"
331                INSERT INTO temp_events
332                SELECT
333                    uuid() as id,
334                    '{invocation_id}'::UUID as invocation_id,
335                    '{client_id}' as client_id,
336                    {hostname} as hostname,
337                    dh.event_type,
338                    dh.severity,
339                    dh.ref_file,
340                    dh.ref_line::INTEGER,
341                    dh.ref_column::INTEGER,
342                    dh.message,
343                    dh.error_code,
344                    dh.test_name,
345                    dh.status,
346                    '{format}' as format_used,
347                    '{date}'::DATE as date
348                FROM read_duck_hunt_log('{ref}', '{format}') dh
349                WHERE dh.event_type IS NOT NULL OR dh.message IS NOT NULL;
350                "#,
351                invocation_id = invocation_id,
352                client_id = escaped_client_id,
353                hostname = hostname_sql,
354                format = escaped_format,
355                date = date,
356                ref = escaped_ref,
357            );
358
359            if let Err(e) = conn.execute_batch(&sql) {
360                // duck_hunt might fail on some formats - log and continue to next stream
361                eprintln!("Warning: duck_hunt parsing failed for {}: {}", storage_ref, e);
362            }
363        }
364
365        // Count how many events were extracted
366        let count: i64 = conn.query_row("SELECT COUNT(*) FROM temp_events", [], |row| row.get(0))?;
367
368        if count == 0 {
369            conn.execute("DROP TABLE temp_events", [])?;
370            return Ok(0);
371        }
372
373        // Write to storage based on mode
374        match self.config.storage_mode {
375            StorageMode::Parquet => {
376                // Write to parquet file
377                let filename = format!("{}--{}.parquet", invocation_id, Uuid::now_v7());
378                let file_path = partition_dir.join(&filename);
379
380                let temp_path = atomic::temp_path(&file_path);
381                conn.execute(
382                    &format!(
383                        "COPY temp_events TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
384                        temp_path.display()
385                    ),
386                    [],
387                )?;
388                conn.execute("DROP TABLE temp_events", [])?;
389
390                // Rename temp to final (atomic on POSIX)
391                atomic::rename_into_place(&temp_path, &file_path)?;
392            }
393            StorageMode::DuckDB => {
394                // Insert directly into local.events
395                conn.execute_batch("INSERT INTO local.events SELECT * FROM temp_events")?;
396                conn.execute("DROP TABLE temp_events", [])?;
397            }
398        }
399
400        Ok(count as usize)
401    }
402
403    /// Write event records to the store.
404    ///
405    /// Behavior depends on storage mode:
406    /// - Parquet: Creates Parquet files partitioned by date
407    /// - DuckDB: Inserts directly into the local.events
408    pub fn write_events(&self, records: &[EventRecord]) -> Result<()> {
409        if records.is_empty() {
410            return Ok(());
411        }
412
413        match self.config.storage_mode {
414            StorageMode::Parquet => self.write_events_parquet(records),
415            StorageMode::DuckDB => self.write_events_duckdb(records),
416        }
417    }
418
419    /// Write events to Parquet files (multi-writer safe).
420    fn write_events_parquet(&self, records: &[EventRecord]) -> Result<()> {
421        let conn = self.connection()?;
422
423        // Group by date for partitioning
424        let mut by_date: std::collections::HashMap<NaiveDate, Vec<&EventRecord>> =
425            std::collections::HashMap::new();
426        for record in records {
427            by_date.entry(record.date).or_default().push(record);
428        }
429
430        for (date, date_records) in by_date {
431            let partition_dir = self.config.events_dir(&date);
432            fs::create_dir_all(&partition_dir)?;
433
434            // Create temp table
435            conn.execute_batch(
436                r#"
437                CREATE OR REPLACE TEMP TABLE temp_events (
438                    id UUID,
439                    invocation_id UUID,
440                    client_id VARCHAR,
441                    hostname VARCHAR,
442                    event_type VARCHAR,
443                    severity VARCHAR,
444                    ref_file VARCHAR,
445                    ref_line INTEGER,
446                    ref_column INTEGER,
447                    message VARCHAR,
448                    error_code VARCHAR,
449                    test_name VARCHAR,
450                    status VARCHAR,
451                    format_used VARCHAR,
452                    date DATE
453                );
454                "#,
455            )?;
456
457            // Insert records
458            for record in &date_records {
459                conn.execute(
460                    r#"
461                    INSERT INTO temp_events VALUES (
462                        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
463                    )
464                    "#,
465                    params![
466                        record.id.to_string(),
467                        record.invocation_id.to_string(),
468                        record.client_id,
469                        record.hostname,
470                        record.event_type,
471                        record.severity,
472                        record.ref_file,
473                        record.ref_line,
474                        record.ref_column,
475                        record.message,
476                        record.error_code,
477                        record.test_name,
478                        record.status,
479                        record.format_used,
480                        date.to_string(),
481                    ],
482                )?;
483            }
484
485            // Write to parquet
486            let filename = format!(
487                "{}--{}.parquet",
488                date_records[0].invocation_id,
489                Uuid::now_v7()
490            );
491            let file_path = partition_dir.join(&filename);
492
493            let temp_path = atomic::temp_path(&file_path);
494            conn.execute(
495                &format!(
496                    "COPY temp_events TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
497                    temp_path.display()
498                ),
499                [],
500            )?;
501            conn.execute("DROP TABLE temp_events", [])?;
502
503            atomic::rename_into_place(&temp_path, &file_path)?;
504        }
505
506        Ok(())
507    }
508
509    /// Write events directly to DuckDB table.
510    fn write_events_duckdb(&self, records: &[EventRecord]) -> Result<()> {
511        let conn = self.connection()?;
512
513        for record in records {
514            conn.execute(
515                r#"
516                INSERT INTO local.events VALUES (
517                    ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
518                )
519                "#,
520                params![
521                    record.id.to_string(),
522                    record.invocation_id.to_string(),
523                    record.client_id,
524                    record.hostname,
525                    record.event_type,
526                    record.severity,
527                    record.ref_file,
528                    record.ref_line,
529                    record.ref_column,
530                    record.message,
531                    record.error_code,
532                    record.test_name,
533                    record.status,
534                    record.format_used,
535                    record.date.to_string(),
536                ],
537            )?;
538        }
539
540        Ok(())
541    }
542
543    /// Query events with optional filters.
544    pub fn query_events(&self, filters: &EventFilters) -> Result<Vec<EventSummary>> {
545        let conn = self.connection()?;
546
547        // Build WHERE clause
548        let mut conditions = Vec::new();
549
550        if let Some(ref sev) = filters.severity {
551            conditions.push(format!("e.severity = '{}'", sev.replace("'", "''")));
552        }
553
554        if let Some(ref inv_id) = filters.invocation_id {
555            conditions.push(format!(
556                "e.invocation_id = '{}'",
557                inv_id.replace("'", "''")
558            ));
559        }
560
561        if let Some(ref inv_ids) = filters.invocation_ids {
562            if !inv_ids.is_empty() {
563                let ids_list: Vec<String> = inv_ids
564                    .iter()
565                    .map(|id| format!("'{}'", id.replace("'", "''")))
566                    .collect();
567                conditions.push(format!("e.invocation_id IN ({})", ids_list.join(", ")));
568            }
569        }
570
571        if let Some(ref client) = filters.client_id {
572            conditions.push(format!("e.client_id = '{}'", client.replace("'", "''")));
573        }
574
575        if let Some(ref host) = filters.hostname {
576            conditions.push(format!("e.hostname = '{}'", host.replace("'", "''")));
577        }
578
579        if let Some(ref date_from) = filters.date_from {
580            conditions.push(format!("e.date >= '{}'", date_from));
581        }
582
583        if let Some(ref date_to) = filters.date_to {
584            conditions.push(format!("e.date <= '{}'", date_to));
585        }
586
587        let where_clause = if conditions.is_empty() {
588            String::new()
589        } else {
590            format!("WHERE {}", conditions.join(" AND "))
591        };
592
593        let limit_clause = filters
594            .limit
595            .map(|l| format!("LIMIT {}", l))
596            .unwrap_or_default();
597
598        // If filtering by command pattern, we need to join with invocations
599        let sql = if filters.cmd_pattern.is_some() {
600            let cmd_pattern = filters.cmd_pattern.as_ref().unwrap().replace("'", "''");
601            let where_prefix = if conditions.is_empty() {
602                "WHERE".to_string()
603            } else {
604                format!("{} AND", where_clause)
605            };
606            format!(
607                r#"
608                SELECT
609                    e.id::VARCHAR,
610                    e.invocation_id::VARCHAR,
611                    e.severity,
612                    e.message,
613                    e.ref_file,
614                    e.ref_line,
615                    e.error_code,
616                    e.test_name,
617                    e.status
618                FROM events e
619                JOIN invocations i ON e.invocation_id = i.id
620                {}
621                i.cmd LIKE '%{}%'
622                ORDER BY i.timestamp DESC
623                {}
624                "#,
625                where_prefix,
626                cmd_pattern,
627                limit_clause
628            )
629        } else {
630            format!(
631                r#"
632                SELECT
633                    e.id::VARCHAR,
634                    e.invocation_id::VARCHAR,
635                    e.severity,
636                    e.message,
637                    e.ref_file,
638                    e.ref_line,
639                    e.error_code,
640                    e.test_name,
641                    e.status
642                FROM events e
643                {}
644                ORDER BY e.date DESC
645                {}
646                "#,
647                where_clause, limit_clause
648            )
649        };
650
651        let mut stmt = match conn.prepare(&sql) {
652            Ok(stmt) => stmt,
653            Err(e) => {
654                if e.to_string().contains("No files found") {
655                    return Ok(Vec::new());
656                }
657                return Err(e.into());
658            }
659        };
660
661        let rows = stmt.query_map([], |row| {
662            Ok(EventSummary {
663                id: row.get(0)?,
664                invocation_id: row.get(1)?,
665                severity: row.get(2)?,
666                message: row.get(3)?,
667                ref_file: row.get(4)?,
668                ref_line: row.get(5)?,
669                error_code: row.get(6)?,
670                test_name: row.get(7)?,
671                status: row.get(8)?,
672            })
673        });
674
675        match rows {
676            Ok(rows) => {
677                let mut results = Vec::new();
678                for row in rows {
679                    results.push(row?);
680                }
681                Ok(results)
682            }
683            Err(e) => {
684                if e.to_string().contains("No files found") {
685                    Ok(Vec::new())
686                } else {
687                    Err(e.into())
688                }
689            }
690        }
691    }
692
693    /// Count events matching the given filters.
694    pub fn event_count(&self, filters: &EventFilters) -> Result<i64> {
695        let conn = self.connection()?;
696
697        let mut conditions = Vec::new();
698
699        if let Some(ref sev) = filters.severity {
700            conditions.push(format!("severity = '{}'", sev.replace("'", "''")));
701        }
702
703        if let Some(ref inv_id) = filters.invocation_id {
704            conditions.push(format!(
705                "invocation_id = '{}'",
706                inv_id.replace("'", "''")
707            ));
708        }
709
710        if let Some(ref inv_ids) = filters.invocation_ids {
711            if !inv_ids.is_empty() {
712                let ids_list: Vec<String> = inv_ids
713                    .iter()
714                    .map(|id| format!("'{}'", id.replace("'", "''")))
715                    .collect();
716                conditions.push(format!("invocation_id IN ({})", ids_list.join(", ")));
717            }
718        }
719
720        if let Some(ref client) = filters.client_id {
721            conditions.push(format!("client_id = '{}'", client.replace("'", "''")));
722        }
723
724        let where_clause = if conditions.is_empty() {
725            String::new()
726        } else {
727            format!("WHERE {}", conditions.join(" AND "))
728        };
729
730        let sql = format!("SELECT COUNT(*) FROM events {}", where_clause);
731
732        let result: std::result::Result<i64, _> = conn.query_row(&sql, [], |row| row.get(0));
733
734        match result {
735            Ok(count) => Ok(count),
736            Err(e) => {
737                if e.to_string().contains("No files found") {
738                    Ok(0)
739                } else {
740                    Err(e.into())
741                }
742            }
743        }
744    }
745
746    /// Delete events for an invocation (for re-extraction).
747    ///
748    /// Behavior depends on storage mode:
749    /// - Parquet: Deletes parquet files containing the events
750    /// - DuckDB: Deletes rows from local.events
751    pub fn delete_events_for_invocation(&self, invocation_id: &str) -> Result<usize> {
752        match self.config.storage_mode {
753            StorageMode::Parquet => self.delete_events_parquet(invocation_id),
754            StorageMode::DuckDB => self.delete_events_duckdb(invocation_id),
755        }
756    }
757
758    /// Delete events from parquet files.
759    fn delete_events_parquet(&self, invocation_id: &str) -> Result<usize> {
760        // Since we're using parquet files, we need to find and delete the files
761        // This is a simplified approach - in production you might want to rewrite
762        // the parquet files without these records
763        let conn = self.connection()?;
764
765        // Get the date(s) for this invocation's events
766        let dates: Vec<String> = {
767            let sql = format!(
768                "SELECT DISTINCT date::VARCHAR FROM events WHERE invocation_id = '{}'",
769                invocation_id.replace("'", "''")
770            );
771            let mut stmt = match conn.prepare(&sql) {
772                Ok(stmt) => stmt,
773                Err(e) => {
774                    if e.to_string().contains("No files found") {
775                        return Ok(0);
776                    }
777                    return Err(e.into());
778                }
779            };
780            let rows = stmt.query_map([], |row| row.get(0))?;
781            rows.filter_map(|r| r.ok()).collect()
782        };
783
784        let mut deleted = 0;
785
786        for date_str in dates {
787            let date = date_str
788                .parse::<NaiveDate>()
789                .map_err(|e| Error::Storage(format!("Invalid date: {}", e)))?;
790
791            let partition_dir = self.config.events_dir(&date);
792
793            // Find and delete parquet files that start with this invocation_id
794            if partition_dir.exists() {
795                for entry in fs::read_dir(&partition_dir)? {
796                    let entry = entry?;
797                    let name = entry.file_name();
798                    let name_str = name.to_string_lossy();
799                    if name_str.starts_with(invocation_id) && name_str.ends_with(".parquet") {
800                        fs::remove_file(entry.path())?;
801                        deleted += 1;
802                    }
803                }
804            }
805        }
806
807        Ok(deleted)
808    }
809
810    /// Delete events from DuckDB table.
811    fn delete_events_duckdb(&self, invocation_id: &str) -> Result<usize> {
812        let conn = self.connection()?;
813
814        // Count events before deletion
815        let count: i64 = conn
816            .query_row(
817                "SELECT COUNT(*) FROM local.events WHERE invocation_id = ?",
818                params![invocation_id],
819                |row| row.get(0),
820            )
821            .unwrap_or(0);
822
823        if count > 0 {
824            conn.execute(
825                "DELETE FROM local.events WHERE invocation_id = ?",
826                params![invocation_id],
827            )?;
828        }
829
830        Ok(count as usize)
831    }
832
833    /// Get invocations that have outputs but no events extracted yet.
834    ///
835    /// Useful for backfilling events from existing invocations.
836    pub fn invocations_without_events(
837        &self,
838        since: Option<NaiveDate>,
839        limit: Option<usize>,
840    ) -> Result<Vec<super::InvocationSummary>> {
841        let conn = self.connection()?;
842
843        // Default to last 30 days if not specified
844        let since_date = since.unwrap_or_else(|| {
845            chrono::Utc::now().date_naive() - chrono::Duration::days(30)
846        });
847
848        let limit_clause = limit
849            .map(|l| format!("LIMIT {}", l))
850            .unwrap_or_else(|| "LIMIT 1000".to_string());
851
852        let sql = format!(
853            r#"
854            SELECT i.id::VARCHAR, i.cmd, i.exit_code, i.timestamp::VARCHAR, i.duration_ms
855            FROM invocations i
856            WHERE EXISTS (SELECT 1 FROM outputs o WHERE o.invocation_id = i.id)
857              AND NOT EXISTS (SELECT 1 FROM events e WHERE e.invocation_id = i.id)
858              AND i.date >= '{}'
859            ORDER BY i.timestamp DESC
860            {}
861            "#,
862            since_date, limit_clause
863        );
864
865        let mut stmt = match conn.prepare(&sql) {
866            Ok(stmt) => stmt,
867            Err(e) => {
868                // Handle case where tables don't exist yet
869                if e.to_string().contains("No files found") {
870                    return Ok(Vec::new());
871                }
872                return Err(e.into());
873            }
874        };
875
876        let rows = stmt.query_map([], |row| {
877            Ok(super::InvocationSummary {
878                id: row.get(0)?,
879                cmd: row.get(1)?,
880                exit_code: row.get(2)?,
881                timestamp: row.get(3)?,
882                duration_ms: row.get(4)?,
883            })
884        })?;
885
886        let results: Vec<_> = rows.filter_map(|r| r.ok()).collect();
887        Ok(results)
888    }
889}
890
891#[cfg(test)]
892mod tests {
893    use super::*;
894    use crate::init::initialize;
895    use crate::Config;
896    use tempfile::TempDir;
897
898    fn setup_store() -> (TempDir, Store) {
899        let tmp = TempDir::new().unwrap();
900        let config = Config::with_root(tmp.path());
901        initialize(&config).unwrap();
902        let store = Store::open(config).unwrap();
903        (tmp, store)
904    }
905
906    #[test]
907    fn test_format_config_detect() {
908        let config = FormatConfig {
909            rules: vec![
910                FormatRule {
911                    pattern: "*gcc*".to_string(),
912                    format: "gcc".to_string(),
913                },
914                FormatRule {
915                    pattern: "*cargo test*".to_string(),
916                    format: "cargo_test_json".to_string(),
917                },
918            ],
919            default: Some(DefaultFormat {
920                format: "auto".to_string(),
921            }),
922        };
923
924        assert_eq!(config.detect_format("gcc -o foo foo.c"), "gcc");
925        assert_eq!(config.detect_format("/usr/bin/gcc main.c"), "gcc");
926        assert_eq!(config.detect_format("cargo test --release"), "cargo_test_json");
927        assert_eq!(config.detect_format("make test"), "auto");
928    }
929
930    #[test]
931    fn test_glob_to_like() {
932        use super::glob_to_like;
933
934        // Basic wildcards
935        assert_eq!(glob_to_like("*"), "%");
936        assert_eq!(glob_to_like("?"), "_");
937        assert_eq!(glob_to_like("*gcc*"), "%gcc%");
938        assert_eq!(glob_to_like("cargo test*"), "cargo test%");
939        assert_eq!(glob_to_like("*cargo test*"), "%cargo test%");
940
941        // Escape special LIKE chars
942        assert_eq!(glob_to_like("100%"), "100\\%");
943        assert_eq!(glob_to_like("file_name"), "file\\_name");
944
945        // Mixed
946        assert_eq!(glob_to_like("*test?file*"), "%test_file%");
947    }
948
949    #[test]
950    fn test_store_detect_format_sql() {
951        let (tmp, store) = setup_store();
952
953        // Write a config file with rules
954        let config_path = tmp.path().join("event-formats.toml");
955        std::fs::write(
956            &config_path,
957            r#"
958[[rules]]
959pattern = "*gcc*"
960format = "gcc"
961
962[[rules]]
963pattern = "*cargo test*"
964format = "cargo_test_json"
965
966[[rules]]
967pattern = "pytest*"
968format = "pytest_json"
969
970[default]
971format = "auto"
972"#,
973        )
974        .unwrap();
975
976        // Test SQL-based detection
977        assert_eq!(store.detect_format("gcc -o foo foo.c").unwrap(), "gcc");
978        assert_eq!(store.detect_format("/usr/bin/gcc main.c").unwrap(), "gcc");
979        assert_eq!(
980            store.detect_format("cargo test --release").unwrap(),
981            "cargo_test_json"
982        );
983        assert_eq!(store.detect_format("pytest tests/").unwrap(), "pytest_json");
984        assert_eq!(store.detect_format("make test").unwrap(), "auto");
985    }
986
987    #[test]
988    fn test_store_has_events_dir() {
989        let (tmp, store) = setup_store();
990        let date = chrono::Utc::now().date_naive();
991        let events_dir = store.config().events_dir(&date);
992        assert!(events_dir.starts_with(tmp.path()));
993        assert!(events_dir.to_string_lossy().contains("events"));
994    }
995
996    #[test]
997    fn test_query_events_empty() {
998        let (_tmp, store) = setup_store();
999
1000        let events = store.query_events(&EventFilters::default()).unwrap();
1001        assert!(events.is_empty());
1002    }
1003
1004    #[test]
1005    fn test_event_count_empty() {
1006        let (_tmp, store) = setup_store();
1007
1008        let count = store.event_count(&EventFilters::default()).unwrap();
1009        assert_eq!(count, 0);
1010    }
1011}