Skip to main content

magic_bird/store/
events.rs

1//! Event storage operations - parsing and querying log events.
2
3use std::fs;
4
5use chrono::NaiveDate;
6use duckdb::params;
7use serde::Deserialize;
8use uuid::Uuid;
9
10use super::atomic;
11use super::Store;
12use crate::config::StorageMode;
13use crate::schema::EventRecord;
14use crate::{Error, Result};
15
/// A single format detection rule, deserialized from `event-formats.toml`.
///
/// `FormatConfig::detect_format` checks rules in order and returns the
/// `format` of the first rule whose `pattern` matches the command string.
#[derive(Debug, Clone, Deserialize)]
pub struct FormatRule {
    /// Glob pattern to match against the command string.
    pub pattern: String,
    /// Format name to use when `pattern` matches.
    pub format: String,
}
24
/// Default format configuration.
///
/// Holds the format name that is used when none of the detection rules match
/// a command.
#[derive(Debug, Clone, Deserialize)]
pub struct DefaultFormat {
    /// Format name returned when no rule matches.
    pub format: String,
}
31
/// Event format configuration.
///
/// Both fields are marked `#[serde(default)]`, so either may be omitted from
/// the TOML input: a missing `rules` deserializes to an empty list and a
/// missing `default` deserializes to `None`.
#[derive(Debug, Clone, Deserialize)]
pub struct FormatConfig {
    /// Ordered list of format detection rules; earlier rules win.
    #[serde(default)]
    pub rules: Vec<FormatRule>,
    /// Fallback format used when no rule matches. When `None`,
    /// `detect_format` falls back to `"auto"`.
    #[serde(default)]
    pub default: Option<DefaultFormat>,
}
42
43impl Default for FormatConfig {
44    fn default() -> Self {
45        Self {
46            rules: Vec::new(),
47            default: Some(DefaultFormat {
48                format: "auto".to_string(),
49            }),
50        }
51    }
52}
53
54impl FormatConfig {
55    /// Load format config from a TOML file.
56    pub fn load(path: &std::path::Path) -> Result<Self> {
57        if path.exists() {
58            let contents = fs::read_to_string(path)?;
59            toml::from_str(&contents)
60                .map_err(|e| Error::Config(format!("Failed to parse event-formats.toml: {}", e)))
61        } else {
62            Ok(Self::default())
63        }
64    }
65
66    /// Detect format for a command string.
67    /// Patterns use simple glob-like matching:
68    /// - `*` matches any characters (including none)
69    /// - Patterns are case-sensitive
70    pub fn detect_format(&self, cmd: &str) -> String {
71        // Check rules in order
72        for rule in &self.rules {
73            if pattern_matches(&rule.pattern, cmd) {
74                return rule.format.clone();
75            }
76        }
77
78        // Fall back to default
79        self.default
80            .as_ref()
81            .map(|d| d.format.clone())
82            .unwrap_or_else(|| "auto".to_string())
83    }
84}
85
/// Translate a glob pattern into a SQL `LIKE` pattern.
///
/// - `*` becomes `%` and `?` becomes `_`.
/// - Characters that are special in `LIKE` (`%`, `_`) and the backslash
///   itself are prefixed with a backslash.
/// - Everything else is copied through unchanged.
fn glob_to_like(pattern: &str) -> String {
    let buffer = String::with_capacity(pattern.len() + 10);

    pattern.chars().fold(buffer, |mut like, ch| {
        match ch {
            '*' => like.push('%'),
            '?' => like.push('_'),
            '%' | '_' | '\\' => {
                like.push('\\');
                like.push(ch);
            }
            other => like.push(other),
        }
        like
    })
}
102
/// Simple glob pattern matching for in-memory `FormatConfig`.
///
/// Only `*` is treated as a wildcard (matching any run of characters,
/// including none); every other character must match literally and
/// case-sensitively. A pattern without `*` must equal `text` exactly.
fn pattern_matches(pattern: &str, text: &str) -> bool {
    // No wildcard at all: plain equality.
    let (head, after_head) = match pattern.split_once('*') {
        Some(split) => split,
        None => return pattern == text,
    };

    // Everything before the first `*` is anchored at the start of the text.
    let mut remaining = match text.strip_prefix(head) {
        Some(rest) => rest,
        None => return false,
    };

    // Everything after the last `*` is anchored at the end; whatever lies
    // between the first and last `*` is a sequence of floating segments.
    let (floating, tail) = match after_head.rsplit_once('*') {
        Some((middle, last)) => (Some(middle), last),
        None => (None, after_head),
    };

    if let Some(middle) = floating {
        // Consume each floating segment at its leftmost occurrence, in order.
        for segment in middle.split('*').filter(|s| !s.is_empty()) {
            match remaining.find(segment) {
                Some(at) => remaining = &remaining[at + segment.len()..],
                None => return false,
            }
        }
    }

    // The tail may only use text that was not consumed by earlier segments.
    tail.is_empty() || remaining.ends_with(tail)
}
138
/// Summary of an event (for listing).
///
/// `Store::query_events` builds one value per result row; the fields follow
/// the order of the columns that query selects.
#[derive(Debug)]
pub struct EventSummary {
    /// Event ID, selected as `e.id::VARCHAR`.
    pub id: String,
    /// ID of the invocation the event belongs to, selected as
    /// `e.invocation_id::VARCHAR`.
    pub invocation_id: String,
    /// Severity of the event, if recorded.
    pub severity: Option<String>,
    /// Event message text, if recorded.
    pub message: Option<String>,
    /// File the event refers to, if recorded.
    pub ref_file: Option<String>,
    /// Line within `ref_file`, if recorded.
    pub ref_line: Option<i32>,
    /// Error code attached to the event, if recorded.
    pub error_code: Option<String>,
    /// Test name attached to the event, if recorded.
    pub test_name: Option<String>,
    /// Status attached to the event, if recorded.
    pub status: Option<String>,
}
152
/// Filters for querying events.
///
/// Every field is optional; `EventFilters::default()` leaves all of them
/// unset, which selects all events.
#[derive(Debug, Default)]
pub struct EventFilters {
    /// Filter by severity (error, warning, info, note).
    pub severity: Option<String>,
    /// Filter by invocation ID.
    pub invocation_id: Option<String>,
    /// Filter by multiple invocation IDs (for last_n queries).
    /// An empty list is treated the same as `None`.
    pub invocation_ids: Option<Vec<String>>,
    /// Filter by command pattern (glob).
    ///
    /// NOTE(review): `Store::query_events` embeds this value as
    /// `i.cmd LIKE '%<pattern>%'`, i.e. a substring match in which `%` and
    /// `_` are wildcards and `*` is literal — confirm whether glob semantics
    /// are actually intended.
    pub cmd_pattern: Option<String>,
    /// Filter by client ID.
    pub client_id: Option<String>,
    /// Filter by hostname.
    pub hostname: Option<String>,
    /// Filter by date range start (inclusive; compared with `>=`).
    pub date_from: Option<NaiveDate>,
    /// Filter by date range end (inclusive; compared with `<=`).
    pub date_to: Option<NaiveDate>,
    /// Maximum number of events to return.
    pub limit: Option<usize>,
}
175
176impl Store {
177    /// Load format config from BIRD_ROOT/event-formats.toml.
178    pub fn load_format_config(&self) -> Result<FormatConfig> {
179        FormatConfig::load(&self.config.event_formats_path())
180    }
181
182    /// Detect format for a command using DuckDB SQL matching.
183    ///
184    /// Uses SQL LIKE patterns for matching, which prepares for future
185    /// integration with duck_hunt_match_command_patterns().
186    pub fn detect_format(&self, cmd: &str) -> Result<String> {
187        let config = self.load_format_config()?;
188
189        // If no rules, fall back to default
190        if config.rules.is_empty() {
191            return Ok(config
192                .default
193                .as_ref()
194                .map(|d| d.format.clone())
195                .unwrap_or_else(|| "auto".to_string()));
196        }
197
198        let conn = self.connection()?;
199
200        // Create temp table with rules (convert glob to LIKE patterns)
201        conn.execute_batch("CREATE OR REPLACE TEMP TABLE format_rules (priority INT, pattern VARCHAR, format VARCHAR)")?;
202
203        {
204            let mut stmt = conn.prepare("INSERT INTO format_rules VALUES (?, ?, ?)")?;
205            for (i, rule) in config.rules.iter().enumerate() {
206                let like_pattern = glob_to_like(&rule.pattern);
207                stmt.execute(params![i as i32, like_pattern, rule.format.clone()])?;
208            }
209        }
210
211        // Query for matching format using SQL LIKE
212        // This can be easily swapped with duck_hunt_match_command_patterns() later
213        let result: std::result::Result<String, _> = conn.query_row(
214            "SELECT format FROM format_rules WHERE ? LIKE pattern ORDER BY priority LIMIT 1",
215            params![cmd],
216            |row| row.get(0),
217        );
218
219        match result {
220            Ok(format) => Ok(format),
221            Err(_) => {
222                // No match - fall back to default
223                Ok(config
224                    .default
225                    .as_ref()
226                    .map(|d| d.format.clone())
227                    .unwrap_or_else(|| "auto".to_string()))
228            }
229        }
230    }
231
232    /// Extract events from an invocation's output using duck_hunt.
233    ///
234    /// Parses the stdout/stderr of an invocation and stores the extracted events.
235    /// Returns the number of events extracted.
236    pub fn extract_events(
237        &self,
238        invocation_id: &str,
239        format_override: Option<&str>,
240    ) -> Result<usize> {
241        let conn = self.connection()?;
242
243        // Get invocation info for format detection and metadata
244        let (cmd, client_id, hostname, date): (String, String, Option<String>, String) = conn
245            .query_row(
246                "SELECT cmd, client_id, hostname, date::VARCHAR FROM invocations WHERE id = ?",
247                params![invocation_id],
248                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
249            )
250            .map_err(|e| Error::NotFound(format!("Invocation {}: {}", invocation_id, e)))?;
251
252        // Determine format to use
253        let format = match format_override {
254            Some(f) => f.to_string(),
255            None => self.detect_format(&cmd)?,
256        };
257
258        // Get output info for this invocation (both stdout and stderr)
259        let outputs: Vec<(String, String)> = {
260            let mut stmt = conn.prepare(
261                "SELECT storage_type, storage_ref FROM outputs WHERE invocation_id = ? AND stream IN ('stdout', 'stderr')",
262            )?;
263            let rows = stmt.query_map(params![invocation_id], |row| Ok((row.get(0)?, row.get(1)?)))?;
264            rows.filter_map(|r| r.ok()).collect()
265        };
266
267        if outputs.is_empty() {
268            return Ok(0);
269        }
270
271        // Collect content from all outputs using DuckDB's read_blob (handles both data: and file:// URLs)
272        let mut all_content = String::new();
273        for (_storage_type, storage_ref) in &outputs {
274            // Resolve relative file:// URLs to absolute paths
275            let resolved_ref = if storage_ref.starts_with("file://") {
276                let rel_path = storage_ref.strip_prefix("file://").unwrap();
277                let abs_path = self.config.data_dir().join(rel_path);
278                format!("file://{}", abs_path.display())
279            } else {
280                storage_ref.clone()
281            };
282
283            // Use scalarfs read_blob for unified content access
284            let content: std::result::Result<Vec<u8>, _> = conn.query_row(
285                "SELECT content FROM read_blob(?)",
286                params![&resolved_ref],
287                |row| row.get(0),
288            );
289
290            if let Ok(bytes) = content {
291                if let Ok(text) = String::from_utf8(bytes) {
292                    all_content.push_str(&text);
293                }
294            }
295        }
296
297        if all_content.is_empty() {
298            return Ok(0);
299        }
300
301        // Parse the date
302        let date = date
303            .parse::<NaiveDate>()
304            .map_err(|e| Error::Storage(format!("Invalid date: {}", e)))?;
305
306        // Ensure the events partition directory exists
307        let partition_dir = self.config.events_dir(&date);
308        fs::create_dir_all(&partition_dir)?;
309
310        // Create temp table for events
311        conn.execute_batch(
312            r#"
313            CREATE OR REPLACE TEMP TABLE temp_events (
314                id UUID,
315                invocation_id UUID,
316                client_id VARCHAR,
317                hostname VARCHAR,
318                event_type VARCHAR,
319                severity VARCHAR,
320                ref_file VARCHAR,
321                ref_line INTEGER,
322                ref_column INTEGER,
323                message VARCHAR,
324                error_code VARCHAR,
325                test_name VARCHAR,
326                status VARCHAR,
327                format_used VARCHAR,
328                date DATE
329            );
330            "#,
331        )?;
332
333        // Escape content for SQL (replace single quotes)
334        let escaped_content = all_content.replace("'", "''");
335
336        // Parse with duck_hunt using parse_duck_hunt_log (takes content directly)
337        let sql = format!(
338            r#"
339            INSERT INTO temp_events
340            SELECT
341                uuid() as id,
342                '{invocation_id}'::UUID as invocation_id,
343                '{client_id}' as client_id,
344                {hostname} as hostname,
345                event_type,
346                severity,
347                ref_file,
348                ref_line::INTEGER,
349                ref_column::INTEGER,
350                message,
351                error_code,
352                test_name,
353                status,
354                '{format}' as format_used,
355                '{date}'::DATE as date
356            FROM parse_duck_hunt_log('{content}', '{format}')
357            WHERE event_type IS NOT NULL OR message IS NOT NULL;
358            "#,
359            invocation_id = invocation_id,
360            client_id = client_id.replace("'", "''"),
361            hostname = hostname
362                .as_ref()
363                .map(|h| format!("'{}'", h.replace("'", "''")))
364                .unwrap_or_else(|| "NULL".to_string()),
365            content = escaped_content,
366            format = format.replace("'", "''"),
367            date = date,
368        );
369
370        if let Err(e) = conn.execute_batch(&sql) {
371            // duck_hunt might fail on some formats - log and continue
372            eprintln!("Warning: duck_hunt parsing failed: {}", e);
373            conn.execute("DROP TABLE IF EXISTS temp_events", [])?;
374            return Ok(0);
375        }
376
377        // Count how many events were extracted
378        let count: i64 = conn.query_row("SELECT COUNT(*) FROM temp_events", [], |row| row.get(0))?;
379
380        if count == 0 {
381            conn.execute("DROP TABLE temp_events", [])?;
382            return Ok(0);
383        }
384
385        // Write to storage based on mode
386        match self.config.storage_mode {
387            StorageMode::Parquet => {
388                // Write to parquet file
389                let filename = format!("{}--{}.parquet", invocation_id, Uuid::now_v7());
390                let file_path = partition_dir.join(&filename);
391
392                let temp_path = atomic::temp_path(&file_path);
393                conn.execute(
394                    &format!(
395                        "COPY temp_events TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
396                        temp_path.display()
397                    ),
398                    [],
399                )?;
400                conn.execute("DROP TABLE temp_events", [])?;
401
402                // Rename temp to final (atomic on POSIX)
403                atomic::rename_into_place(&temp_path, &file_path)?;
404            }
405            StorageMode::DuckDB => {
406                // Insert directly into local.events
407                conn.execute_batch("INSERT INTO local.events SELECT * FROM temp_events")?;
408                conn.execute("DROP TABLE temp_events", [])?;
409            }
410        }
411
412        Ok(count as usize)
413    }
414
415    /// Write event records to the store.
416    ///
417    /// Behavior depends on storage mode:
418    /// - Parquet: Creates Parquet files partitioned by date
419    /// - DuckDB: Inserts directly into the local.events
420    pub fn write_events(&self, records: &[EventRecord]) -> Result<()> {
421        if records.is_empty() {
422            return Ok(());
423        }
424
425        match self.config.storage_mode {
426            StorageMode::Parquet => self.write_events_parquet(records),
427            StorageMode::DuckDB => self.write_events_duckdb(records),
428        }
429    }
430
431    /// Write events to Parquet files (multi-writer safe).
432    fn write_events_parquet(&self, records: &[EventRecord]) -> Result<()> {
433        let conn = self.connection()?;
434
435        // Group by date for partitioning
436        let mut by_date: std::collections::HashMap<NaiveDate, Vec<&EventRecord>> =
437            std::collections::HashMap::new();
438        for record in records {
439            by_date.entry(record.date).or_default().push(record);
440        }
441
442        for (date, date_records) in by_date {
443            let partition_dir = self.config.events_dir(&date);
444            fs::create_dir_all(&partition_dir)?;
445
446            // Create temp table
447            conn.execute_batch(
448                r#"
449                CREATE OR REPLACE TEMP TABLE temp_events (
450                    id UUID,
451                    invocation_id UUID,
452                    client_id VARCHAR,
453                    hostname VARCHAR,
454                    event_type VARCHAR,
455                    severity VARCHAR,
456                    ref_file VARCHAR,
457                    ref_line INTEGER,
458                    ref_column INTEGER,
459                    message VARCHAR,
460                    error_code VARCHAR,
461                    test_name VARCHAR,
462                    status VARCHAR,
463                    format_used VARCHAR,
464                    date DATE
465                );
466                "#,
467            )?;
468
469            // Insert records
470            for record in &date_records {
471                conn.execute(
472                    r#"
473                    INSERT INTO temp_events VALUES (
474                        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
475                    )
476                    "#,
477                    params![
478                        record.id.to_string(),
479                        record.invocation_id.to_string(),
480                        record.client_id,
481                        record.hostname,
482                        record.event_type,
483                        record.severity,
484                        record.ref_file,
485                        record.ref_line,
486                        record.ref_column,
487                        record.message,
488                        record.error_code,
489                        record.test_name,
490                        record.status,
491                        record.format_used,
492                        date.to_string(),
493                    ],
494                )?;
495            }
496
497            // Write to parquet
498            let filename = format!(
499                "{}--{}.parquet",
500                date_records[0].invocation_id,
501                Uuid::now_v7()
502            );
503            let file_path = partition_dir.join(&filename);
504
505            let temp_path = atomic::temp_path(&file_path);
506            conn.execute(
507                &format!(
508                    "COPY temp_events TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
509                    temp_path.display()
510                ),
511                [],
512            )?;
513            conn.execute("DROP TABLE temp_events", [])?;
514
515            atomic::rename_into_place(&temp_path, &file_path)?;
516        }
517
518        Ok(())
519    }
520
521    /// Write events directly to DuckDB table.
522    fn write_events_duckdb(&self, records: &[EventRecord]) -> Result<()> {
523        let conn = self.connection()?;
524
525        for record in records {
526            conn.execute(
527                r#"
528                INSERT INTO local.events VALUES (
529                    ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
530                )
531                "#,
532                params![
533                    record.id.to_string(),
534                    record.invocation_id.to_string(),
535                    record.client_id,
536                    record.hostname,
537                    record.event_type,
538                    record.severity,
539                    record.ref_file,
540                    record.ref_line,
541                    record.ref_column,
542                    record.message,
543                    record.error_code,
544                    record.test_name,
545                    record.status,
546                    record.format_used,
547                    record.date.to_string(),
548                ],
549            )?;
550        }
551
552        Ok(())
553    }
554
555    /// Query events with optional filters.
556    pub fn query_events(&self, filters: &EventFilters) -> Result<Vec<EventSummary>> {
557        let conn = self.connection()?;
558
559        // Build WHERE clause
560        let mut conditions = Vec::new();
561
562        if let Some(ref sev) = filters.severity {
563            conditions.push(format!("e.severity = '{}'", sev.replace("'", "''")));
564        }
565
566        if let Some(ref inv_id) = filters.invocation_id {
567            conditions.push(format!(
568                "e.invocation_id = '{}'",
569                inv_id.replace("'", "''")
570            ));
571        }
572
573        if let Some(ref inv_ids) = filters.invocation_ids {
574            if !inv_ids.is_empty() {
575                let ids_list: Vec<String> = inv_ids
576                    .iter()
577                    .map(|id| format!("'{}'", id.replace("'", "''")))
578                    .collect();
579                conditions.push(format!("e.invocation_id IN ({})", ids_list.join(", ")));
580            }
581        }
582
583        if let Some(ref client) = filters.client_id {
584            conditions.push(format!("e.client_id = '{}'", client.replace("'", "''")));
585        }
586
587        if let Some(ref host) = filters.hostname {
588            conditions.push(format!("e.hostname = '{}'", host.replace("'", "''")));
589        }
590
591        if let Some(ref date_from) = filters.date_from {
592            conditions.push(format!("e.date >= '{}'", date_from));
593        }
594
595        if let Some(ref date_to) = filters.date_to {
596            conditions.push(format!("e.date <= '{}'", date_to));
597        }
598
599        let where_clause = if conditions.is_empty() {
600            String::new()
601        } else {
602            format!("WHERE {}", conditions.join(" AND "))
603        };
604
605        let limit_clause = filters
606            .limit
607            .map(|l| format!("LIMIT {}", l))
608            .unwrap_or_default();
609
610        // If filtering by command pattern, we need to join with invocations
611        let sql = if filters.cmd_pattern.is_some() {
612            let cmd_pattern = filters.cmd_pattern.as_ref().unwrap().replace("'", "''");
613            let where_prefix = if conditions.is_empty() {
614                "WHERE".to_string()
615            } else {
616                format!("{} AND", where_clause)
617            };
618            format!(
619                r#"
620                SELECT
621                    e.id::VARCHAR,
622                    e.invocation_id::VARCHAR,
623                    e.severity,
624                    e.message,
625                    e.ref_file,
626                    e.ref_line,
627                    e.error_code,
628                    e.test_name,
629                    e.status
630                FROM events e
631                JOIN invocations i ON e.invocation_id = i.id
632                {}
633                i.cmd LIKE '%{}%'
634                ORDER BY i.timestamp DESC
635                {}
636                "#,
637                where_prefix,
638                cmd_pattern,
639                limit_clause
640            )
641        } else {
642            format!(
643                r#"
644                SELECT
645                    e.id::VARCHAR,
646                    e.invocation_id::VARCHAR,
647                    e.severity,
648                    e.message,
649                    e.ref_file,
650                    e.ref_line,
651                    e.error_code,
652                    e.test_name,
653                    e.status
654                FROM events e
655                {}
656                ORDER BY e.date DESC
657                {}
658                "#,
659                where_clause, limit_clause
660            )
661        };
662
663        let mut stmt = match conn.prepare(&sql) {
664            Ok(stmt) => stmt,
665            Err(e) => {
666                if e.to_string().contains("No files found") {
667                    return Ok(Vec::new());
668                }
669                return Err(e.into());
670            }
671        };
672
673        let rows = stmt.query_map([], |row| {
674            Ok(EventSummary {
675                id: row.get(0)?,
676                invocation_id: row.get(1)?,
677                severity: row.get(2)?,
678                message: row.get(3)?,
679                ref_file: row.get(4)?,
680                ref_line: row.get(5)?,
681                error_code: row.get(6)?,
682                test_name: row.get(7)?,
683                status: row.get(8)?,
684            })
685        });
686
687        match rows {
688            Ok(rows) => {
689                let mut results = Vec::new();
690                for row in rows {
691                    results.push(row?);
692                }
693                Ok(results)
694            }
695            Err(e) => {
696                if e.to_string().contains("No files found") {
697                    Ok(Vec::new())
698                } else {
699                    Err(e.into())
700                }
701            }
702        }
703    }
704
705    /// Count events matching the given filters.
706    pub fn event_count(&self, filters: &EventFilters) -> Result<i64> {
707        let conn = self.connection()?;
708
709        let mut conditions = Vec::new();
710
711        if let Some(ref sev) = filters.severity {
712            conditions.push(format!("severity = '{}'", sev.replace("'", "''")));
713        }
714
715        if let Some(ref inv_id) = filters.invocation_id {
716            conditions.push(format!(
717                "invocation_id = '{}'",
718                inv_id.replace("'", "''")
719            ));
720        }
721
722        if let Some(ref inv_ids) = filters.invocation_ids {
723            if !inv_ids.is_empty() {
724                let ids_list: Vec<String> = inv_ids
725                    .iter()
726                    .map(|id| format!("'{}'", id.replace("'", "''")))
727                    .collect();
728                conditions.push(format!("invocation_id IN ({})", ids_list.join(", ")));
729            }
730        }
731
732        if let Some(ref client) = filters.client_id {
733            conditions.push(format!("client_id = '{}'", client.replace("'", "''")));
734        }
735
736        let where_clause = if conditions.is_empty() {
737            String::new()
738        } else {
739            format!("WHERE {}", conditions.join(" AND "))
740        };
741
742        let sql = format!("SELECT COUNT(*) FROM events {}", where_clause);
743
744        let result: std::result::Result<i64, _> = conn.query_row(&sql, [], |row| row.get(0));
745
746        match result {
747            Ok(count) => Ok(count),
748            Err(e) => {
749                if e.to_string().contains("No files found") {
750                    Ok(0)
751                } else {
752                    Err(e.into())
753                }
754            }
755        }
756    }
757
758    /// Delete events for an invocation (for re-extraction).
759    ///
760    /// Behavior depends on storage mode:
761    /// - Parquet: Deletes parquet files containing the events
762    /// - DuckDB: Deletes rows from local.events
763    pub fn delete_events_for_invocation(&self, invocation_id: &str) -> Result<usize> {
764        match self.config.storage_mode {
765            StorageMode::Parquet => self.delete_events_parquet(invocation_id),
766            StorageMode::DuckDB => self.delete_events_duckdb(invocation_id),
767        }
768    }
769
770    /// Delete events from parquet files.
771    fn delete_events_parquet(&self, invocation_id: &str) -> Result<usize> {
772        // Since we're using parquet files, we need to find and delete the files
773        // This is a simplified approach - in production you might want to rewrite
774        // the parquet files without these records
775        let conn = self.connection()?;
776
777        // Get the date(s) for this invocation's events
778        let dates: Vec<String> = {
779            let sql = format!(
780                "SELECT DISTINCT date::VARCHAR FROM events WHERE invocation_id = '{}'",
781                invocation_id.replace("'", "''")
782            );
783            let mut stmt = match conn.prepare(&sql) {
784                Ok(stmt) => stmt,
785                Err(e) => {
786                    if e.to_string().contains("No files found") {
787                        return Ok(0);
788                    }
789                    return Err(e.into());
790                }
791            };
792            let rows = stmt.query_map([], |row| row.get(0))?;
793            rows.filter_map(|r| r.ok()).collect()
794        };
795
796        let mut deleted = 0;
797
798        for date_str in dates {
799            let date = date_str
800                .parse::<NaiveDate>()
801                .map_err(|e| Error::Storage(format!("Invalid date: {}", e)))?;
802
803            let partition_dir = self.config.events_dir(&date);
804
805            // Find and delete parquet files that start with this invocation_id
806            if partition_dir.exists() {
807                for entry in fs::read_dir(&partition_dir)? {
808                    let entry = entry?;
809                    let name = entry.file_name();
810                    let name_str = name.to_string_lossy();
811                    if name_str.starts_with(invocation_id) && name_str.ends_with(".parquet") {
812                        fs::remove_file(entry.path())?;
813                        deleted += 1;
814                    }
815                }
816            }
817        }
818
819        Ok(deleted)
820    }
821
822    /// Delete events from DuckDB table.
823    fn delete_events_duckdb(&self, invocation_id: &str) -> Result<usize> {
824        let conn = self.connection()?;
825
826        // Count events before deletion
827        let count: i64 = conn
828            .query_row(
829                "SELECT COUNT(*) FROM local.events WHERE invocation_id = ?",
830                params![invocation_id],
831                |row| row.get(0),
832            )
833            .unwrap_or(0);
834
835        if count > 0 {
836            conn.execute(
837                "DELETE FROM local.events WHERE invocation_id = ?",
838                params![invocation_id],
839            )?;
840        }
841
842        Ok(count as usize)
843    }
844
845    /// Get invocations that have outputs but no events extracted yet.
846    ///
847    /// Useful for backfilling events from existing invocations.
848    pub fn invocations_without_events(
849        &self,
850        since: Option<NaiveDate>,
851        limit: Option<usize>,
852    ) -> Result<Vec<super::InvocationSummary>> {
853        let conn = self.connection()?;
854
855        // Default to last 30 days if not specified
856        let since_date = since.unwrap_or_else(|| {
857            chrono::Utc::now().date_naive() - chrono::Duration::days(30)
858        });
859
860        let limit_clause = limit
861            .map(|l| format!("LIMIT {}", l))
862            .unwrap_or_else(|| "LIMIT 1000".to_string());
863
864        let sql = format!(
865            r#"
866            SELECT i.id::VARCHAR, i.cmd, i.exit_code, i.timestamp::VARCHAR, i.duration_ms
867            FROM invocations i
868            WHERE EXISTS (SELECT 1 FROM outputs o WHERE o.invocation_id = i.id)
869              AND NOT EXISTS (SELECT 1 FROM events e WHERE e.invocation_id = i.id)
870              AND i.date >= '{}'
871            ORDER BY i.timestamp DESC
872            {}
873            "#,
874            since_date, limit_clause
875        );
876
877        let mut stmt = match conn.prepare(&sql) {
878            Ok(stmt) => stmt,
879            Err(e) => {
880                // Handle case where tables don't exist yet
881                if e.to_string().contains("No files found") {
882                    return Ok(Vec::new());
883                }
884                return Err(e.into());
885            }
886        };
887
888        let rows = stmt.query_map([], |row| {
889            Ok(super::InvocationSummary {
890                id: row.get(0)?,
891                cmd: row.get(1)?,
892                exit_code: row.get(2)?,
893                timestamp: row.get(3)?,
894                duration_ms: row.get(4)?,
895            })
896        })?;
897
898        let results: Vec<_> = rows.filter_map(|r| r.ok()).collect();
899        Ok(results)
900    }
901}
902
#[cfg(test)]
mod tests {
    use super::*;
    use crate::init::initialize;
    use crate::Config;
    use tempfile::TempDir;

    /// Build an initialized store rooted in a fresh temporary directory.
    ///
    /// The `TempDir` handle is returned with the store so callers keep it alive.
    fn setup_store() -> (TempDir, Store) {
        let dir = TempDir::new().unwrap();
        let cfg = Config::with_root(dir.path());
        initialize(&cfg).unwrap();
        let opened = Store::open(cfg).unwrap();
        (dir, opened)
    }

    /// In-memory rules pick the matching format and fall back to the default.
    #[test]
    fn test_format_config_detect() {
        let rule_table = [("*gcc*", "gcc"), ("*cargo test*", "cargo_test_json")];
        let config = FormatConfig {
            rules: rule_table
                .iter()
                .map(|&(pattern, format)| FormatRule {
                    pattern: pattern.to_string(),
                    format: format.to_string(),
                })
                .collect(),
            default: Some(DefaultFormat {
                format: "auto".to_string(),
            }),
        };

        let cases = [
            ("gcc -o foo foo.c", "gcc"),
            ("/usr/bin/gcc main.c", "gcc"),
            ("cargo test --release", "cargo_test_json"),
            ("make test", "auto"),
        ];
        for &(cmd, expected) in &cases {
            assert_eq!(config.detect_format(cmd), expected);
        }
    }

    /// Glob patterns translate to LIKE patterns, escaping LIKE metacharacters.
    #[test]
    fn test_glob_to_like() {
        use super::glob_to_like;

        let cases = [
            // Basic wildcards
            ("*", "%"),
            ("?", "_"),
            ("*gcc*", "%gcc%"),
            ("cargo test*", "cargo test%"),
            ("*cargo test*", "%cargo test%"),
            // Escape special LIKE chars
            ("100%", "100\\%"),
            ("file_name", "file\\_name"),
            // Mixed
            ("*test?file*", "%test_file%"),
        ];
        for &(glob, like) in &cases {
            assert_eq!(glob_to_like(glob), like);
        }
    }

    /// Store-level detection honours rules written to `event-formats.toml`.
    #[test]
    fn test_store_detect_format_sql() {
        let (tmp, store) = setup_store();

        // Write a config file with rules
        let formats_toml = r#"
[[rules]]
pattern = "*gcc*"
format = "gcc"

[[rules]]
pattern = "*cargo test*"
format = "cargo_test_json"

[[rules]]
pattern = "pytest*"
format = "pytest_json"

[default]
format = "auto"
"#;
        std::fs::write(tmp.path().join("event-formats.toml"), formats_toml).unwrap();

        // Test SQL-based detection
        let cases = [
            ("gcc -o foo foo.c", "gcc"),
            ("/usr/bin/gcc main.c", "gcc"),
            ("cargo test --release", "cargo_test_json"),
            ("pytest tests/", "pytest_json"),
            ("make test", "auto"),
        ];
        for &(cmd, expected) in &cases {
            assert_eq!(store.detect_format(cmd).unwrap(), expected);
        }
    }

    /// The events directory for today lives under the store root.
    #[test]
    fn test_store_has_events_dir() {
        let (tmp, store) = setup_store();

        let today = chrono::Utc::now().date_naive();
        let dir = store.config().events_dir(&today);

        assert!(dir.starts_with(tmp.path()));
        assert!(dir.to_string_lossy().contains("events"));
    }

    /// Querying a freshly initialized store yields no events.
    #[test]
    fn test_query_events_empty() {
        let (_tmp, store) = setup_store();

        let filters = EventFilters::default();
        assert!(store.query_events(&filters).unwrap().is_empty());
    }

    /// Counting events in a freshly initialized store yields zero.
    #[test]
    fn test_event_count_empty() {
        let (_tmp, store) = setup_store();

        let filters = EventFilters::default();
        assert_eq!(store.event_count(&filters).unwrap(), 0);
    }
}