Skip to main content

magic_bird/store/
mod.rs

1//! Store - handles writing and reading records.
2//!
3//! Uses DuckDB to write Parquet files and query across them.
4
5mod atomic;
6mod compact;
7mod events;
8mod invocations;
9mod outputs;
10mod remote;
11mod sessions;
12
13use std::fs;
14use std::thread;
15use std::time::Duration;
16
17use chrono::{DateTime, NaiveDate, NaiveTime, TimeDelta, Utc};
18use duckdb::{
19    params,
20    types::{TimeUnit, Value, ValueRef},
21    Connection,
22};
23
24use crate::config::StorageMode;
25use crate::schema::{EventRecord, InvocationRecord, SessionRecord};
26use crate::{Config, Error, Result};
27
28/// Format a DuckDB Value to a human-readable string.
29/// Handles complex types like List, Array, Map, and Struct recursively.
30fn format_value(value: &Value) -> String {
31    match value {
32        Value::Null => "NULL".to_string(),
33        Value::Boolean(b) => b.to_string(),
34        Value::TinyInt(n) => n.to_string(),
35        Value::SmallInt(n) => n.to_string(),
36        Value::Int(n) => n.to_string(),
37        Value::BigInt(n) => n.to_string(),
38        Value::HugeInt(n) => n.to_string(),
39        Value::UTinyInt(n) => n.to_string(),
40        Value::USmallInt(n) => n.to_string(),
41        Value::UInt(n) => n.to_string(),
42        Value::UBigInt(n) => n.to_string(),
43        Value::Float(f) => f.to_string(),
44        Value::Double(f) => f.to_string(),
45        Value::Decimal(d) => d.to_string(),
46        Value::Timestamp(_, micros) => {
47            DateTime::<Utc>::from_timestamp_micros(*micros)
48                .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
49                .unwrap_or_else(|| format!("<timestamp {}>", micros))
50        }
51        Value::Text(s) => s.clone(),
52        Value::Blob(b) => format!("<blob {} bytes>", b.len()),
53        Value::Date32(days) => {
54            NaiveDate::from_ymd_opt(1970, 1, 1)
55                .and_then(|epoch| epoch.checked_add_signed(TimeDelta::days(*days as i64)))
56                .map(|d| d.format("%Y-%m-%d").to_string())
57                .unwrap_or_else(|| format!("<date {}>", days))
58        }
59        Value::Time64(_, micros) => {
60            let secs = (*micros / 1_000_000) as u32;
61            let micro_part = (*micros % 1_000_000) as u32;
62            NaiveTime::from_num_seconds_from_midnight_opt(secs, micro_part * 1000)
63                .map(|t| t.format("%H:%M:%S").to_string())
64                .unwrap_or_else(|| format!("<time {}>", micros))
65        }
66        Value::Interval { months, days, nanos } => {
67            format!("{} months {} days {} ns", months, days, nanos)
68        }
69        // Complex types
70        Value::List(items) => {
71            let formatted: Vec<String> = items.iter().map(format_value).collect();
72            format!("[{}]", formatted.join(", "))
73        }
74        Value::Array(items) => {
75            let formatted: Vec<String> = items.iter().map(format_value).collect();
76            format!("[{}]", formatted.join(", "))
77        }
78        Value::Map(map) => {
79            let formatted: Vec<String> = map
80                .iter()
81                .map(|(k, v)| format!("{}: {}", format_value(k), format_value(v)))
82                .collect();
83            format!("{{{}}}", formatted.join(", "))
84        }
85        Value::Struct(fields) => {
86            let formatted: Vec<String> = fields
87                .iter()
88                .map(|(k, v)| format!("{}: {}", k, format_value(v)))
89                .collect();
90            format!("{{{}}}", formatted.join(", "))
91        }
92        Value::Enum(s) => s.clone(),
93        _ => "<unknown>".to_string(),
94    }
95}
96
97// Re-export types from submodules
98pub use compact::{ArchiveStats, AutoCompactOptions, CompactOptions, CompactStats};
99pub use events::{EventFilters, EventSummary, FormatConfig, FormatRule};
100pub use invocations::InvocationSummary;
101pub use outputs::OutputInfo;
102pub use remote::{parse_since, PullOptions, PullStats, PushOptions, PushStats};
103
104// Re-export format detection types (defined below)
105// BuiltinFormat, FormatMatch, FormatSource are defined at the bottom of this file
106
/// A batch of related records to write atomically.
///
/// Use this when you want to write an invocation along with its outputs,
/// session, and/or events in a single transaction. Build one with
/// `InvocationBatch::new` and the `with_*` builder methods.
#[derive(Debug, Default)]
pub struct InvocationBatch {
    /// The invocation record (logically required).
    ///
    /// `InvocationBatch::new` always sets it, but because the field is an
    /// `Option`, `InvocationBatch::default()` leaves it `None`.
    /// NOTE(review): presumably the writer rejects a batch without an
    /// invocation — confirm.
    pub invocation: Option<InvocationRecord>,

    /// Output streams with their content: (stream_name, content).
    /// Common streams: "stdout", "stderr", "combined".
    pub outputs: Vec<(String, Vec<u8>)>,

    /// Session record (optional, created if not already registered).
    pub session: Option<SessionRecord>,

    /// Pre-extracted events (optional).
    pub events: Option<Vec<EventRecord>>,
}
126
127impl InvocationBatch {
128    /// Create a new batch with an invocation.
129    pub fn new(invocation: InvocationRecord) -> Self {
130        Self {
131            invocation: Some(invocation),
132            outputs: Vec::new(),
133            session: None,
134            events: None,
135        }
136    }
137
138    /// Add an output stream.
139    pub fn with_output(mut self, stream: impl Into<String>, content: Vec<u8>) -> Self {
140        self.outputs.push((stream.into(), content));
141        self
142    }
143
144    /// Add a session record.
145    pub fn with_session(mut self, session: SessionRecord) -> Self {
146        self.session = Some(session);
147        self
148    }
149
150    /// Add pre-extracted events.
151    pub fn with_events(mut self, events: Vec<EventRecord>) -> Self {
152        self.events = Some(events);
153        self
154    }
155}
156
/// Options for creating a database connection.
///
/// Controls what gets loaded and attached when opening a connection with
/// `Store::connect`. Use one of the presets ([`ConnectionOptions::full`],
/// [`ConnectionOptions::minimal`], [`ConnectionOptions::for_migration`]) or
/// set the fields explicitly.
///
/// Note: the derived `Default` sets every flag to `false`, which is the same
/// as `minimal()`. The "`true` in `full()`" notes on the fields describe the
/// `full()` preset, which `Store::connection` uses.
///
/// # Connection Stages
///
/// In the order `Store::connect` performs them:
///
/// 1. **Extensions** (always): parquet, icu (required); scalarfs, duck_hunt
///    (optional, a warning is printed if missing)
/// 2. **Migration** (optional): Upgrade existing installations to new schema
/// 3. **Blob resolution** (always): S3 credentials, blob_roots variable and macros
/// 4. **Remotes** (optional): Attach remote databases and create temporary
///    remote access macros
/// 5. **Project** (optional): Attach project-local database if in a project
/// 6. **CWD macros** (optional): Create temporary cwd macros for the current directory
#[derive(Debug, Clone, Default)]
pub struct ConnectionOptions {
    /// Attach configured remotes (`true` in `full()`).
    /// When set and remotes are configured, `Store::connect` attaches the
    /// auto-attach remotes and creates temporary macros for querying them.
    pub attach_remotes: bool,

    /// Attach project database if in a project directory (`true` in `full()`).
    pub attach_project: bool,

    /// Create the cwd macros for the current working directory (`true` in `full()`).
    /// Despite the field name, `Store::connect` creates TEMPORARY macros
    /// (via `create_cwd_macros`) that filter by the current directory.
    pub create_ephemeral_views: bool,

    /// Run migration for existing installations (`true` only in `for_migration()`).
    /// Only enable this for explicit upgrade operations.
    pub run_migration: bool,
}
188
189impl ConnectionOptions {
190    /// Create options for a full connection (default behavior).
191    pub fn full() -> Self {
192        Self {
193            attach_remotes: true,
194            attach_project: true,
195            create_ephemeral_views: true,
196            run_migration: false,
197        }
198    }
199
200    /// Create options for a minimal connection (no attachments).
201    /// Useful for write operations that don't need remote data.
202    pub fn minimal() -> Self {
203        Self {
204            attach_remotes: false,
205            attach_project: false,
206            create_ephemeral_views: false,
207            run_migration: false,
208        }
209    }
210
211    /// Create options for a migration/upgrade connection.
212    pub fn for_migration() -> Self {
213        Self {
214            attach_remotes: false,
215            attach_project: false,
216            create_ephemeral_views: false,
217            run_migration: true,
218        }
219    }
220}
221
222/// Ensure a DuckDB extension is loaded, installing if necessary.
223///
224/// Attempts in order:
225/// 1. LOAD (extension might already be available)
226/// 2. INSTALL from default repository, then LOAD
227/// 3. INSTALL FROM community, then LOAD
228///
229/// Returns Ok(true) if loaded successfully, Ok(false) if extension unavailable.
230fn ensure_extension(conn: &Connection, name: &str) -> Result<bool> {
231    // Try loading directly first (already installed/cached)
232    if conn.execute(&format!("LOAD {}", name), []).is_ok() {
233        return Ok(true);
234    }
235
236    // Try installing from default repository
237    if conn.execute(&format!("INSTALL {}", name), []).is_ok()
238        && conn.execute(&format!("LOAD {}", name), []).is_ok()
239    {
240        return Ok(true);
241    }
242
243    // Try installing from community repository
244    if conn.execute(&format!("INSTALL {} FROM community", name), []).is_ok()
245        && conn.execute(&format!("LOAD {}", name), []).is_ok()
246    {
247        return Ok(true);
248    }
249
250    Ok(false)
251}
252
/// A BIRD store for reading and writing records.
///
/// Holds only the store configuration; DuckDB connections are opened on
/// demand (see `Store::connect`) rather than kept in the struct.
pub struct Store {
    /// Store configuration (database path, data directory, remotes, storage mode, ...).
    config: Config,
}
257
258impl Store {
259    /// Open an existing BIRD store.
260    pub fn open(config: Config) -> Result<Self> {
261        if !config.db_path().exists() {
262            return Err(Error::NotInitialized(config.bird_root.clone()));
263        }
264        Ok(Self { config })
265    }
266
267    /// Open a DuckDB connection with retry and exponential backoff.
268    ///
269    /// DuckDB uses file locking for concurrent access. When multiple processes
270    /// (e.g., background shell hook saves) try to access the database simultaneously,
271    /// this method retries with exponential backoff to avoid lock conflicts.
272    fn open_connection_with_retry(&self) -> Result<Connection> {
273        const MAX_RETRIES: u32 = 10;
274        const INITIAL_DELAY_MS: u64 = 10;
275        const MAX_DELAY_MS: u64 = 1000;
276
277        let db_path = self.config.db_path();
278        let mut delay_ms = INITIAL_DELAY_MS;
279        let mut last_error = None;
280
281        for attempt in 0..MAX_RETRIES {
282            match Connection::open(&db_path) {
283                Ok(conn) => return Ok(conn),
284                Err(e) => {
285                    let err_msg = e.to_string();
286                    // Check if this is a lock conflict error
287                    if err_msg.contains("Could not set lock")
288                        || err_msg.contains("Conflicting lock")
289                        || err_msg.contains("database is locked")
290                    {
291                        last_error = Some(e);
292                        if attempt < MAX_RETRIES - 1 {
293                            // Add jitter to avoid thundering herd
294                            let jitter = (attempt as u64 * 7) % 10;
295                            thread::sleep(Duration::from_millis(delay_ms + jitter));
296                            delay_ms = (delay_ms * 2).min(MAX_DELAY_MS);
297                            continue;
298                        }
299                    } else {
300                        // Non-lock error, fail immediately
301                        return Err(e.into());
302                    }
303                }
304            }
305        }
306
307        // All retries exhausted
308        Err(last_error
309            .map(|e| e.into())
310            .unwrap_or_else(|| Error::Storage("Failed to open database after retries".to_string())))
311    }
312
313    /// Get a DuckDB connection with full features (attachments, ephemeral views).
314    pub fn connection(&self) -> Result<Connection> {
315        self.connect(ConnectionOptions::full())
316    }
317
318    /// Get a DuckDB connection with optional remote attachment (legacy API).
319    pub fn connection_with_options(&self, attach_remotes: bool) -> Result<Connection> {
320        let opts = if attach_remotes {
321            ConnectionOptions::full()
322        } else {
323            ConnectionOptions::minimal()
324        };
325        self.connect(opts)
326    }
327
328    /// Get a DuckDB connection with explicit options.
329    ///
330    /// This is the main connection method. Use `ConnectionOptions` to control:
331    /// - Whether remotes are attached
332    /// - Whether project database is attached
333    /// - Whether ephemeral views are created
334    /// - Whether migration should run
335    ///
336    /// Uses retry with exponential backoff to handle concurrent access.
337    pub fn connect(&self, opts: ConnectionOptions) -> Result<Connection> {
338        let conn = self.open_connection_with_retry()?;
339
340        // ===== Load required extensions =====
341        // Uses default extension directory (typically ~/.duckdb/extensions)
342        // Falls back to community repository if not in default
343        conn.execute("SET allow_community_extensions = true", [])?;
344
345        for ext in ["parquet", "icu"] {
346            if !ensure_extension(&conn, ext)? {
347                return Err(Error::Extension(format!(
348                    "Required extension '{}' could not be loaded",
349                    ext
350                )));
351            }
352        }
353
354        // Optional community extensions - warn if missing
355        for (ext, desc) in [
356            ("scalarfs", "data: URL support for inline blobs"),
357            ("duck_hunt", "log/output parsing for event extraction"),
358        ] {
359            if !ensure_extension(&conn, ext)? {
360                eprintln!("Warning: {} extension not available ({})", ext, desc);
361            }
362        }
363
364        // Set file search path so views resolve relative paths correctly
365        conn.execute(
366            &format!(
367                "SET file_search_path = '{}'",
368                self.config.data_dir().display()
369            ),
370            [],
371        )?;
372
373        // ===== Optional: Run migration for existing installations =====
374        if opts.run_migration {
375            self.migrate_to_new_schema(&conn)?;
376        }
377
378        // ===== Always set up blob resolution =====
379        // S3 credentials needed before blob_roots is used
380        self.setup_s3_credentials(&conn)?;
381        self.setup_blob_resolution(&conn)?;
382
383        // ===== Optional: Attach remotes and create access macros =====
384        if opts.attach_remotes && !self.config.remotes.is_empty() {
385            self.attach_remotes(&conn)?;
386            self.create_remote_macros(&conn)?;
387        }
388
389        // ===== Optional: Attach project database =====
390        if opts.attach_project {
391            self.attach_project_db(&conn)?;
392        }
393
394        // ===== Optional: Create cwd macros =====
395        // These TEMPORARY macros filter by current working directory
396        if opts.create_ephemeral_views {
397            self.create_cwd_macros(&conn)?;
398        }
399
400        Ok(conn)
401    }
402
403    /// Attach project-level `.bird/` database if we're in a project directory.
404    ///
405    /// The project database is attached as read-only under schema "project".
406    /// This allows queries like `SELECT * FROM project.invocations`.
407    fn attach_project_db(&self, conn: &Connection) -> Result<()> {
408        use crate::project::find_current_project;
409
410        let Some(project) = find_current_project() else {
411            return Ok(()); // Not in a project
412        };
413
414        if !project.is_initialized() {
415            return Ok(()); // Project not initialized
416        }
417
418        // Don't attach if project DB is the same as user DB
419        if project.db_path == self.config.db_path() {
420            return Ok(());
421        }
422
423        // Attach as read-only
424        let attach_sql = format!(
425            "ATTACH '{}' AS project (READ_ONLY)",
426            project.db_path.display()
427        );
428
429        if let Err(e) = conn.execute(&attach_sql, []) {
430            // Log but don't fail - project DB might be locked or inaccessible
431            eprintln!("Note: Could not attach project database: {}", e);
432        }
433
434        Ok(())
435    }
436
437    /// Migrate existing installations to the new schema architecture.
438    ///
439    /// Checks if the `local` schema exists; if not, creates the new schema
440    /// structure and migrates data from old `*_table` tables.
441    fn migrate_to_new_schema(&self, conn: &Connection) -> Result<()> {
442        // Check if already migrated (local schema exists)
443        let local_exists: bool = conn
444            .query_row(
445                "SELECT COUNT(*) > 0 FROM information_schema.schemata WHERE schema_name = 'local'",
446                [],
447                |row| row.get(0),
448            )
449            .unwrap_or(false);
450
451        if local_exists {
452            return Ok(());
453        }
454
455        // This is an old installation - need to migrate
456        // For now, just create the new schemas. Data migration would require
457        // moving data from old tables/views to new structure.
458        // TODO: Implement full data migration if needed
459
460        eprintln!("Note: Migrating to new schema architecture...");
461
462        // Create core schemas
463        conn.execute_batch(
464            r#"
465            CREATE SCHEMA IF NOT EXISTS local;
466            CREATE SCHEMA IF NOT EXISTS cached_placeholder;
467            CREATE SCHEMA IF NOT EXISTS remote_placeholder;
468            CREATE SCHEMA IF NOT EXISTS caches;
469            CREATE SCHEMA IF NOT EXISTS remotes;
470            CREATE SCHEMA IF NOT EXISTS unified;
471            CREATE SCHEMA IF NOT EXISTS cwd;
472            "#,
473        )?;
474
475        // For DuckDB mode, create local tables
476        if self.config.storage_mode == crate::StorageMode::DuckDB {
477            conn.execute_batch(
478                r#"
479                CREATE TABLE IF NOT EXISTS local.sessions (
480                    session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
481                    invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE
482                );
483                CREATE TABLE IF NOT EXISTS local.invocations (
484                    id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
485                    cwd VARCHAR, cmd VARCHAR, executable VARCHAR, exit_code INTEGER,
486                    format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR,
487                    tag VARCHAR, date DATE
488                );
489                CREATE TABLE IF NOT EXISTS local.outputs (
490                    id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
491                    byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
492                    content_type VARCHAR, date DATE
493                );
494                CREATE TABLE IF NOT EXISTS local.events (
495                    id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
496                    event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
497                    ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
498                    status VARCHAR, format_used VARCHAR, date DATE
499                );
500                "#,
501            )?;
502
503            // Copy data from old tables if they exist
504            let old_tables_exist: bool = conn
505                .query_row(
506                    "SELECT COUNT(*) > 0 FROM duckdb_tables() WHERE table_name = 'sessions_table'",
507                    [],
508                    |row| row.get(0),
509                )
510                .unwrap_or(false);
511
512            if old_tables_exist {
513                conn.execute_batch(
514                    r#"
515                    INSERT INTO local.sessions SELECT * FROM sessions_table;
516                    INSERT INTO local.invocations SELECT * FROM invocations_table;
517                    INSERT INTO local.outputs SELECT * FROM outputs_table;
518                    INSERT INTO local.events SELECT * FROM events_table;
519                    "#,
520                )?;
521            }
522        } else {
523            // Parquet mode - create views over parquet files
524            conn.execute_batch(
525                r#"
526                CREATE OR REPLACE VIEW local.sessions AS
527                SELECT * EXCLUDE (filename) FROM read_parquet(
528                    'recent/sessions/**/*.parquet',
529                    union_by_name = true, hive_partitioning = true, filename = true
530                );
531                CREATE OR REPLACE VIEW local.invocations AS
532                SELECT * EXCLUDE (filename) FROM read_parquet(
533                    'recent/invocations/**/*.parquet',
534                    union_by_name = true, hive_partitioning = true, filename = true
535                );
536                CREATE OR REPLACE VIEW local.outputs AS
537                SELECT * EXCLUDE (filename) FROM read_parquet(
538                    'recent/outputs/**/*.parquet',
539                    union_by_name = true, hive_partitioning = true, filename = true
540                );
541                CREATE OR REPLACE VIEW local.events AS
542                SELECT * EXCLUDE (filename) FROM read_parquet(
543                    'recent/events/**/*.parquet',
544                    union_by_name = true, hive_partitioning = true, filename = true
545                );
546                "#,
547            )?;
548        }
549
550        // Create placeholder schemas
551        conn.execute_batch(
552            r#"
553            CREATE TABLE IF NOT EXISTS cached_placeholder.sessions (
554                session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
555                invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE, _source VARCHAR
556            );
557            CREATE TABLE IF NOT EXISTS cached_placeholder.invocations (
558                id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
559                cwd VARCHAR, cmd VARCHAR, executable VARCHAR, exit_code INTEGER,
560                format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR, date DATE, _source VARCHAR
561            );
562            CREATE TABLE IF NOT EXISTS cached_placeholder.outputs (
563                id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
564                byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
565                content_type VARCHAR, date DATE, _source VARCHAR
566            );
567            CREATE TABLE IF NOT EXISTS cached_placeholder.events (
568                id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
569                event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
570                ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
571                status VARCHAR, format_used VARCHAR, date DATE, _source VARCHAR
572            );
573            CREATE TABLE IF NOT EXISTS remote_placeholder.sessions (
574                session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
575                invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE, _source VARCHAR
576            );
577            CREATE TABLE IF NOT EXISTS remote_placeholder.invocations (
578                id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
579                cwd VARCHAR, cmd VARCHAR, executable VARCHAR, exit_code INTEGER,
580                format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR, date DATE, _source VARCHAR
581            );
582            CREATE TABLE IF NOT EXISTS remote_placeholder.outputs (
583                id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
584                byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
585                content_type VARCHAR, date DATE, _source VARCHAR
586            );
587            CREATE TABLE IF NOT EXISTS remote_placeholder.events (
588                id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
589                event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
590                ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
591                status VARCHAR, format_used VARCHAR, date DATE, _source VARCHAR
592            );
593            "#,
594        )?;
595
596        // Create union schemas
597        conn.execute_batch(
598            r#"
599            CREATE OR REPLACE VIEW caches.sessions AS SELECT * FROM cached_placeholder.sessions;
600            CREATE OR REPLACE VIEW caches.invocations AS SELECT * FROM cached_placeholder.invocations;
601            CREATE OR REPLACE VIEW caches.outputs AS SELECT * FROM cached_placeholder.outputs;
602            CREATE OR REPLACE VIEW caches.events AS SELECT * FROM cached_placeholder.events;
603
604            CREATE OR REPLACE VIEW remotes.sessions AS SELECT * FROM remote_placeholder.sessions;
605            CREATE OR REPLACE VIEW remotes.invocations AS SELECT * FROM remote_placeholder.invocations;
606            CREATE OR REPLACE VIEW remotes.outputs AS SELECT * FROM remote_placeholder.outputs;
607            CREATE OR REPLACE VIEW remotes.events AS SELECT * FROM remote_placeholder.events;
608
609            CREATE OR REPLACE VIEW main.sessions AS
610                SELECT *, 'local' as _source FROM local.sessions
611                UNION ALL BY NAME SELECT * FROM caches.sessions;
612            CREATE OR REPLACE VIEW main.invocations AS
613                SELECT *, 'local' as _source FROM local.invocations
614                UNION ALL BY NAME SELECT * FROM caches.invocations;
615            CREATE OR REPLACE VIEW main.outputs AS
616                SELECT *, 'local' as _source FROM local.outputs
617                UNION ALL BY NAME SELECT * FROM caches.outputs;
618            CREATE OR REPLACE VIEW main.events AS
619                SELECT *, 'local' as _source FROM local.events
620                UNION ALL BY NAME SELECT * FROM caches.events;
621
622            CREATE OR REPLACE VIEW unified.sessions AS
623                SELECT * FROM main.sessions UNION ALL BY NAME SELECT * FROM remotes.sessions;
624            CREATE OR REPLACE VIEW unified.invocations AS
625                SELECT * FROM main.invocations UNION ALL BY NAME SELECT * FROM remotes.invocations;
626            CREATE OR REPLACE VIEW unified.outputs AS
627                SELECT * FROM main.outputs UNION ALL BY NAME SELECT * FROM remotes.outputs;
628            CREATE OR REPLACE VIEW unified.events AS
629                SELECT * FROM main.events UNION ALL BY NAME SELECT * FROM remotes.events;
630
631            -- Qualified views: deduplicated with source list
632            CREATE OR REPLACE VIEW unified.qualified_sessions AS
633                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
634                FROM unified.sessions GROUP BY ALL;
635            CREATE OR REPLACE VIEW unified.qualified_invocations AS
636                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
637                FROM unified.invocations GROUP BY ALL;
638            CREATE OR REPLACE VIEW unified.qualified_outputs AS
639                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
640                FROM unified.outputs GROUP BY ALL;
641            CREATE OR REPLACE VIEW unified.qualified_events AS
642                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
643                FROM unified.events GROUP BY ALL;
644            "#,
645        )?;
646
647        Ok(())
648    }
649
650    /// Set up S3 credentials for all remotes that use S3.
651    /// This is called early so that blob resolution can access S3 paths.
652    fn setup_s3_credentials(&self, conn: &Connection) -> Result<()> {
653        // Check if any remote uses S3
654        let has_s3 = self.config.remotes.iter().any(|r| {
655            r.remote_type == crate::config::RemoteType::S3
656        });
657
658        if !has_s3 {
659            return Ok(());
660        }
661
662        // Set up credentials for each S3 remote
663        for remote in &self.config.remotes {
664            if remote.remote_type == crate::config::RemoteType::S3 {
665                if let Some(provider) = &remote.credential_provider {
666                    let secret_sql = format!(
667                        "CREATE SECRET IF NOT EXISTS \"bird_{}\" (TYPE s3, PROVIDER {})",
668                        remote.name, provider
669                    );
670                    if let Err(e) = conn.execute(&secret_sql, []) {
671                        eprintln!("Warning: Failed to create S3 secret for {}: {}", remote.name, e);
672                    }
673                }
674            }
675        }
676
677        Ok(())
678    }
679
680    /// Set up blob_roots variable and blob resolution macros.
681    ///
682    /// Storage refs use URI schemes to indicate type:
683    /// - `data:`, `data+varchar:`, `data+blob:` - inline content (scalarfs)
684    /// - `file:path` - relative path, resolved against blob_roots
685    /// - Absolute paths (`s3://`, `/path/`) - used directly
686    fn setup_blob_resolution(&self, conn: &Connection) -> Result<()> {
687        let blob_roots = self.config.blob_roots();
688
689        // Format as SQL array literal
690        let roots_sql: String = blob_roots
691            .iter()
692            .map(|r| format!("'{}'", r.replace('\'', "''")))
693            .collect::<Vec<_>>()
694            .join(", ");
695
696        // Set blob_roots variable
697        conn.execute(&format!("SET VARIABLE blob_roots = [{}]", roots_sql), [])?;
698
699        // Helper: check if ref is inline data (scalarfs data: protocol)
700        conn.execute(
701            r#"CREATE OR REPLACE MACRO is_inline_data(ref) AS (
702                ref[:5] = 'data:' OR ref[:5] = 'data+'
703            )"#,
704            [],
705        )?;
706
707        // Helper: check if ref is a relative file: path
708        conn.execute(
709            r#"CREATE OR REPLACE MACRO is_file_ref(ref) AS (
710                ref[:5] = 'file:'
711            )"#,
712            [],
713        )?;
714
715        // Resolve storage ref to list of paths for pathvariable:
716        // - Inline data: pass through as single-element list
717        // - file: refs: expand to glob patterns across all blob_roots
718        // - Other (absolute paths): pass through
719        conn.execute(
720            r#"CREATE OR REPLACE MACRO resolve_storage_ref(ref) AS (
721                CASE
722                    WHEN is_inline_data(ref) THEN [ref]
723                    WHEN is_file_ref(ref) THEN
724                        [format('{}/{}*', root, ref[6:]) FOR root IN getvariable('blob_roots')]
725                    ELSE [ref]
726                END
727            )"#,
728            [],
729        )?;
730
731        Ok(())
732    }
733
734    /// Attach configured remotes to the connection.
735    /// Note: S3 credentials are already set up by setup_s3_credentials().
736    fn attach_remotes(&self, conn: &Connection) -> Result<()> {
737        for remote in self.config.auto_attach_remotes() {
738            // Attach the remote database
739            let attach_sql = remote.attach_sql();
740            if let Err(e) = conn.execute(&attach_sql, []) {
741                eprintln!("Warning: Failed to attach remote {}: {}", remote.name, e);
742            }
743        }
744
745        Ok(())
746    }
747
748    /// Create TEMPORARY macros for accessing remote data.
749    ///
750    /// We use TEMPORARY macros to avoid persisting references to attached databases
751    /// in the catalog. Persisted references cause database corruption when the
752    /// attachment is not present.
753    ///
754    /// Usage: `SELECT * FROM remotes_invocations()` or `SELECT * FROM remote_<name>_invocations()`
755    fn create_remote_macros(&self, conn: &Connection) -> Result<()> {
756        let remotes = self.config.auto_attach_remotes();
757        if remotes.is_empty() {
758            return Ok(());
759        }
760
761        // Create per-remote TEMPORARY macros for each table type
762        for remote in &remotes {
763            let schema = remote.quoted_schema_name();
764            let name = &remote.name;
765            // Sanitize name for use in macro identifier
766            let safe_name = name.replace(['-', '.'], "_");
767
768            for table in &["sessions", "invocations", "outputs", "events"] {
769                let macro_name = format!("\"remote_{safe_name}_{table}\"");
770                let sql = format!(
771                    r#"CREATE OR REPLACE TEMPORARY MACRO {macro_name}() AS TABLE (
772                        SELECT *, '{name}' as _source FROM {schema}.{table}
773                    )"#,
774                    macro_name = macro_name,
775                    name = name,
776                    schema = schema,
777                    table = table
778                );
779                if let Err(e) = conn.execute(&sql, []) {
780                    eprintln!("Warning: Failed to create macro {}: {}", macro_name, e);
781                }
782            }
783        }
784
785        // Create combined remotes_* TEMPORARY macros that union all remotes
786        for table in &["sessions", "invocations", "outputs", "events"] {
787            let mut union_parts: Vec<String> = remotes
788                .iter()
789                .map(|r| {
790                    let safe_name = r.name.replace(['-', '.'], "_");
791                    format!("SELECT * FROM \"remote_{safe_name}_{table}\"()", safe_name = safe_name, table = table)
792                })
793                .collect();
794
795            // Include placeholder for empty case
796            union_parts.push(format!("SELECT * FROM remote_placeholder.{}", table));
797
798            let sql = format!(
799                r#"CREATE OR REPLACE TEMPORARY MACRO remotes_{table}() AS TABLE (
800                    {union}
801                )"#,
802                table = table,
803                union = union_parts.join(" UNION ALL BY NAME ")
804            );
805            if let Err(e) = conn.execute(&sql, []) {
806                eprintln!("Warning: Failed to create remotes_{} macro: {}", table, e);
807            }
808        }
809
810        Ok(())
811    }
812
813    /// Create TEMPORARY macros for cwd-filtered data.
814    ///
815    /// These filter main.* data to entries matching the current working directory.
816    /// Uses TEMPORARY macros to avoid persisting anything that changes per-connection.
817    ///
818    /// Usage: `SELECT * FROM cwd_invocations()`
819    fn create_cwd_macros(&self, conn: &Connection) -> Result<()> {
820        let cwd = std::env::current_dir()
821            .map(|p| p.to_string_lossy().to_string())
822            .unwrap_or_default();
823        let cwd_escaped = cwd.replace('\'', "''");
824
825        // Create TEMPORARY macros for cwd-filtered data
826        let macros = format!(
827            r#"
828            CREATE OR REPLACE TEMPORARY MACRO cwd_sessions() AS TABLE (
829                SELECT * FROM main.sessions WHERE cwd LIKE '{}%'
830            );
831            CREATE OR REPLACE TEMPORARY MACRO cwd_invocations() AS TABLE (
832                SELECT * FROM main.invocations WHERE cwd LIKE '{}%'
833            );
834            CREATE OR REPLACE TEMPORARY MACRO cwd_outputs() AS TABLE (
835                SELECT o.* FROM main.outputs o
836                JOIN main.invocations i ON o.invocation_id = i.id
837                WHERE i.cwd LIKE '{}%'
838            );
839            CREATE OR REPLACE TEMPORARY MACRO cwd_events() AS TABLE (
840                SELECT e.* FROM main.events e
841                JOIN main.invocations i ON e.invocation_id = i.id
842                WHERE i.cwd LIKE '{}%'
843            );
844            "#,
845            cwd_escaped, cwd_escaped, cwd_escaped, cwd_escaped
846        );
847
848        conn.execute_batch(&macros)?;
849        Ok(())
850    }
851
852    /// Manually attach a specific remote.
853    pub fn attach_remote(&self, conn: &Connection, remote: &crate::RemoteConfig) -> Result<()> {
854        // Set up credentials
855        if let Some(provider) = &remote.credential_provider {
856            if remote.remote_type == crate::config::RemoteType::S3 {
857                let secret_sql = format!(
858                    "CREATE SECRET IF NOT EXISTS \"bird_{}\" (TYPE s3, PROVIDER {})",
859                    remote.name, provider
860                );
861                conn.execute(&secret_sql, [])?;
862            }
863        }
864
865        // Attach
866        conn.execute(&remote.attach_sql(), [])?;
867        Ok(())
868    }
869
870    /// Detach a remote.
871    pub fn detach_remote(&self, conn: &Connection, name: &str) -> Result<()> {
872        conn.execute(&format!("DETACH \"remote_{}\"", name), [])?;
873        Ok(())
874    }
875
876    /// Test connection to a remote. Returns Ok if successful.
877    pub fn test_remote(&self, remote: &crate::RemoteConfig) -> Result<()> {
878        let conn = self.connection_with_options(false)?;
879        self.attach_remote(&conn, remote)?;
880
881        // Try a simple query
882        let test_sql = format!(
883            "SELECT 1 FROM {}.invocations LIMIT 1",
884            remote.quoted_schema_name()
885        );
886        conn.execute(&test_sql, [])?;
887
888        Ok(())
889    }
890
891    /// Get config reference.
892    pub fn config(&self) -> &Config {
893        &self.config
894    }
895
896    /// Query the store using SQL.
897    ///
898    /// Returns results as a Vec of rows, where each row is a Vec of string values.
899    pub fn query(&self, sql: &str) -> Result<QueryResult> {
900        let conn = self.connection()?;
901        let mut stmt = conn.prepare(sql)?;
902
903        // Execute the query first to get column info
904        let mut rows_iter = stmt.query([])?;
905
906        // Get column info from the rows iterator
907        let column_count = rows_iter.as_ref().map(|r| r.column_count()).unwrap_or(0);
908        let column_names: Vec<String> = if let Some(row_ref) = rows_iter.as_ref() {
909            (0..column_count)
910                .map(|i| {
911                    row_ref
912                        .column_name(i)
913                        .map(|s| s.to_string())
914                        .unwrap_or_else(|_| format!("col{}", i))
915                })
916                .collect()
917        } else {
918            Vec::new()
919        };
920
921        // Collect all rows
922        let mut result_rows = Vec::new();
923        while let Some(row) = rows_iter.next()? {
924            let mut values = Vec::with_capacity(column_count);
925            for i in 0..column_count {
926                // Get value as generic ValueRef and convert to string
927                let value = match row.get_ref(i)? {
928                    ValueRef::Null => "NULL".to_string(),
929                    ValueRef::Boolean(b) => b.to_string(),
930                    ValueRef::TinyInt(n) => n.to_string(),
931                    ValueRef::SmallInt(n) => n.to_string(),
932                    ValueRef::Int(n) => n.to_string(),
933                    ValueRef::BigInt(n) => n.to_string(),
934                    ValueRef::HugeInt(n) => n.to_string(),
935                    ValueRef::UTinyInt(n) => n.to_string(),
936                    ValueRef::USmallInt(n) => n.to_string(),
937                    ValueRef::UInt(n) => n.to_string(),
938                    ValueRef::UBigInt(n) => n.to_string(),
939                    ValueRef::Float(f) => f.to_string(),
940                    ValueRef::Double(f) => f.to_string(),
941                    ValueRef::Decimal(d) => d.to_string(),
942                    ValueRef::Timestamp(unit, val) => {
943                        // Convert to microseconds then to DateTime
944                        let micros = match unit {
945                            TimeUnit::Second => val * 1_000_000,
946                            TimeUnit::Millisecond => val * 1_000,
947                            TimeUnit::Microsecond => val,
948                            TimeUnit::Nanosecond => val / 1_000,
949                        };
950                        DateTime::<Utc>::from_timestamp_micros(micros)
951                            .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
952                            .unwrap_or_else(|| format!("<invalid timestamp {}>", val))
953                    }
954                    ValueRef::Date32(days) => {
955                        // Days since 1970-01-01
956                        NaiveDate::from_ymd_opt(1970, 1, 1)
957                            .and_then(|epoch| epoch.checked_add_signed(TimeDelta::days(days as i64)))
958                            .map(|d| d.format("%Y-%m-%d").to_string())
959                            .unwrap_or_else(|| format!("<invalid date {}>", days))
960                    }
961                    ValueRef::Time64(unit, val) => {
962                        // Convert to microseconds then to NaiveTime
963                        let micros = match unit {
964                            TimeUnit::Second => val * 1_000_000,
965                            TimeUnit::Millisecond => val * 1_000,
966                            TimeUnit::Microsecond => val,
967                            TimeUnit::Nanosecond => val / 1_000,
968                        };
969                        let secs = (micros / 1_000_000) as u32;
970                        let micro_part = (micros % 1_000_000) as u32;
971                        NaiveTime::from_num_seconds_from_midnight_opt(secs, micro_part * 1000)
972                            .map(|t| t.format("%H:%M:%S").to_string())
973                            .unwrap_or_else(|| format!("<invalid time {}>", val))
974                    }
975                    ValueRef::Interval { months, days, nanos } => {
976                        format!("{} months {} days {} ns", months, days, nanos)
977                    }
978                    ValueRef::Text(s) => String::from_utf8_lossy(s).to_string(),
979                    ValueRef::Blob(b) => format!("<blob {} bytes>", b.len()),
980                    other => {
981                        // Convert to owned Value for complex types (List, Array, Map, Struct)
982                        let owned: Value = other.into();
983                        format_value(&owned)
984                    }
985                };
986                values.push(value);
987            }
988            result_rows.push(values);
989        }
990
991        Ok(QueryResult {
992            columns: column_names,
993            rows: result_rows,
994        })
995    }
996
997    /// Get the last invocation with its output (if any).
998    pub fn last_invocation_with_output(
999        &self,
1000    ) -> Result<Option<(InvocationSummary, Option<OutputInfo>)>> {
1001        if let Some(inv) = self.last_invocation()? {
1002            let output = self.get_output(&inv.id)?;
1003            Ok(Some((inv, output)))
1004        } else {
1005            Ok(None)
1006        }
1007    }
1008
1009    /// Write a batch of related records atomically.
1010    ///
1011    /// This is the preferred way to write an invocation with its outputs,
1012    /// session, and events together. In DuckDB mode, all writes are wrapped
1013    /// in a transaction. In Parquet mode, files are written atomically.
1014    pub fn write_batch(&self, batch: &InvocationBatch) -> Result<()> {
1015        let invocation = batch
1016            .invocation
1017            .as_ref()
1018            .ok_or_else(|| Error::Storage("Batch must contain an invocation".to_string()))?;
1019
1020        match self.config.storage_mode {
1021            StorageMode::Parquet => self.write_batch_parquet(batch, invocation),
1022            StorageMode::DuckDB => self.write_batch_duckdb(batch, invocation),
1023        }
1024    }
1025
1026    /// Write batch using Parquet files (multi-writer safe).
1027    fn write_batch_parquet(
1028        &self,
1029        batch: &InvocationBatch,
1030        invocation: &InvocationRecord,
1031    ) -> Result<()> {
1032        // For Parquet mode, we write each record type separately.
1033        // Atomicity is per-file (temp + rename), but not across files.
1034        // This is acceptable because Parquet mode prioritizes concurrent writes.
1035
1036        // Write session first (if provided and not already registered)
1037        if let Some(ref session) = batch.session {
1038            self.ensure_session(session)?;
1039        }
1040
1041        // Write invocation
1042        self.write_invocation(invocation)?;
1043
1044        let date = invocation.date();
1045        let inv_id = invocation.id;
1046
1047        // Write outputs
1048        for (stream, content) in &batch.outputs {
1049            self.store_output(
1050                inv_id,
1051                stream,
1052                content,
1053                date,
1054                invocation.executable.as_deref(),
1055            )?;
1056        }
1057
1058        // Write events (if provided)
1059        if let Some(ref events) = batch.events {
1060            if !events.is_empty() {
1061                self.write_events(events)?;
1062            }
1063        }
1064
1065        Ok(())
1066    }
1067
1068    /// Write batch using DuckDB tables with transaction.
1069    fn write_batch_duckdb(
1070        &self,
1071        batch: &InvocationBatch,
1072        invocation: &InvocationRecord,
1073    ) -> Result<()> {
1074        let conn = self.connection()?;
1075
1076        // Begin transaction
1077        conn.execute("BEGIN TRANSACTION", [])?;
1078
1079        let result = self.write_batch_duckdb_inner(&conn, batch, invocation);
1080
1081        match result {
1082            Ok(()) => {
1083                conn.execute("COMMIT", [])?;
1084                Ok(())
1085            }
1086            Err(e) => {
1087                // Rollback on error
1088                let _ = conn.execute("ROLLBACK", []);
1089                Err(e)
1090            }
1091        }
1092    }
1093
1094    /// Inner implementation for DuckDB batch write (within transaction).
1095    fn write_batch_duckdb_inner(
1096        &self,
1097        conn: &Connection,
1098        batch: &InvocationBatch,
1099        invocation: &InvocationRecord,
1100    ) -> Result<()> {
1101        use base64::Engine;
1102
1103        let date = invocation.date();
1104        let inv_id = invocation.id;
1105
1106        // Write session (if provided)
1107        if let Some(ref session) = batch.session {
1108            // Check if session exists
1109            let exists: i64 = conn
1110                .query_row(
1111                    "SELECT COUNT(*) FROM local.sessions WHERE session_id = ?",
1112                    params![&session.session_id],
1113                    |row| row.get(0),
1114                )
1115                .unwrap_or(0);
1116
1117            if exists == 0 {
1118                conn.execute(
1119                    r#"INSERT INTO local.sessions VALUES (?, ?, ?, ?, ?, ?, ?, ?)"#,
1120                    params![
1121                        session.session_id,
1122                        session.client_id,
1123                        session.invoker,
1124                        session.invoker_pid,
1125                        session.invoker_type,
1126                        session.registered_at.to_rfc3339(),
1127                        session.cwd,
1128                        session.date.to_string(),
1129                    ],
1130                )?;
1131            }
1132        }
1133
1134        // Write invocation
1135        conn.execute(
1136            r#"INSERT INTO local.invocations VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
1137            params![
1138                invocation.id.to_string(),
1139                invocation.session_id,
1140                invocation.timestamp.to_rfc3339(),
1141                invocation.duration_ms,
1142                invocation.cwd,
1143                invocation.cmd,
1144                invocation.executable,
1145                invocation.exit_code,
1146                invocation.format_hint,
1147                invocation.client_id,
1148                invocation.hostname,
1149                invocation.username,
1150                invocation.tag,
1151                date.to_string(),
1152            ],
1153        )?;
1154
1155        // Write outputs
1156        for (stream, content) in &batch.outputs {
1157            // Compute hash
1158            let hash = blake3::hash(content);
1159            let hash_hex = hash.to_hex().to_string();
1160
1161            // Route by size
1162            let (storage_type, storage_ref) = if content.len() < self.config.inline_threshold {
1163                // Inline: use data: URL
1164                let b64 = base64::engine::general_purpose::STANDARD.encode(content);
1165                let data_url = format!("data:application/octet-stream;base64,{}", b64);
1166                ("inline".to_string(), data_url)
1167            } else {
1168                // Blob: write file and register
1169                let cmd_hint = invocation.executable.as_deref().unwrap_or("output");
1170                let blob_path = self.config.blob_path(&hash_hex, cmd_hint);
1171
1172                if let Some(parent) = blob_path.parent() {
1173                    fs::create_dir_all(parent)?;
1174                }
1175
1176                let rel_path = blob_path
1177                    .strip_prefix(self.config.data_dir())
1178                    .map(|p| p.to_string_lossy().to_string())
1179                    .unwrap_or_else(|_| blob_path.to_string_lossy().to_string());
1180
1181                // Write blob atomically
1182                let wrote_new = atomic::write_file(&blob_path, content)?;
1183
1184                if wrote_new {
1185                    conn.execute(
1186                        "INSERT INTO blob_registry (content_hash, byte_length, storage_path) VALUES (?, ?, ?)",
1187                        params![&hash_hex, content.len() as i64, &rel_path],
1188                    )?;
1189                } else {
1190                    conn.execute(
1191                        "UPDATE blob_registry SET ref_count = ref_count + 1, last_accessed = CURRENT_TIMESTAMP WHERE content_hash = ?",
1192                        params![&hash_hex],
1193                    )?;
1194                }
1195
1196                ("blob".to_string(), format!("file://{}", rel_path))
1197            };
1198
1199            // Write output record
1200            let output_id = uuid::Uuid::now_v7();
1201            conn.execute(
1202                r#"INSERT INTO local.outputs VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
1203                params![
1204                    output_id.to_string(),
1205                    inv_id.to_string(),
1206                    stream,
1207                    hash_hex,
1208                    content.len() as i64,
1209                    storage_type,
1210                    storage_ref,
1211                    Option::<String>::None, // content_type
1212                    date.to_string(),
1213                ],
1214            )?;
1215        }
1216
1217        // Write events (if provided)
1218        if let Some(ref events) = batch.events {
1219            for event in events {
1220                conn.execute(
1221                    r#"INSERT INTO local.events VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
1222                    params![
1223                        event.id.to_string(),
1224                        event.invocation_id.to_string(),
1225                        event.client_id,
1226                        event.hostname,
1227                        event.event_type,
1228                        event.severity,
1229                        event.ref_file,
1230                        event.ref_line,
1231                        event.ref_column,
1232                        event.message,
1233                        event.error_code,
1234                        event.test_name,
1235                        event.status,
1236                        event.format_used,
1237                        event.date.to_string(),
1238                    ],
1239                )?;
1240            }
1241        }
1242
1243        Ok(())
1244    }
1245
1246    /// Load format hints from the config file.
1247    pub fn load_format_hints(&self) -> Result<crate::FormatHints> {
1248        let path = self.config.format_hints_path();
1249
1250        // Try new format-hints.toml first
1251        if path.exists() {
1252            return crate::FormatHints::load(&path);
1253        }
1254
1255        // Fall back to legacy event-formats.toml
1256        let legacy_path = self.config.event_formats_path();
1257        if legacy_path.exists() {
1258            return crate::FormatHints::load(&legacy_path);
1259        }
1260
1261        Ok(crate::FormatHints::new())
1262    }
1263
1264    /// Save format hints to the config file.
1265    pub fn save_format_hints(&self, hints: &crate::FormatHints) -> Result<()> {
1266        hints.save(&self.config.format_hints_path())
1267    }
1268
1269    /// Detect format for a command using format hints.
1270    ///
1271    /// Priority:
1272    /// 1. User-defined format hints (by priority)
1273    /// 2. Default format from config (or "auto")
1274    ///
1275    /// Note: duck_hunt detects formats from content analysis, not command names.
1276    /// Use format hints to map commands to formats, then duck_hunt parses the output.
1277    pub fn detect_format_for_command(&self, cmd: &str) -> Result<String> {
1278        let hints = self.load_format_hints()?;
1279        Ok(hints.detect(cmd).to_string())
1280    }
1281
1282    /// Get list of duck_hunt built-in formats.
1283    ///
1284    /// Note: duck_hunt detects formats from content analysis, not command patterns.
1285    /// This lists available format names that can be used with duck_hunt parsing.
1286    pub fn list_builtin_formats(&self) -> Result<Vec<BuiltinFormat>> {
1287        let conn = self.connection()?;
1288
1289        let mut stmt = conn.prepare(
1290            "SELECT format, description, priority FROM duck_hunt_formats() ORDER BY priority DESC, format"
1291        )?;
1292
1293        let rows = stmt.query_map([], |row| {
1294            Ok(BuiltinFormat {
1295                format: row.get(0)?,
1296                pattern: row.get::<_, String>(1)?, // description as "pattern" for display
1297                priority: row.get(2)?,
1298            })
1299        })?;
1300
1301        let results: Vec<_> = rows.filter_map(|r| r.ok()).collect();
1302        Ok(results)
1303    }
1304
1305    /// Check which format would be detected for a command.
1306    /// Returns the format name and source (user-defined or default).
1307    ///
1308    /// Note: duck_hunt detects formats from content, not command names.
1309    /// This only checks user-defined format hints.
1310    pub fn check_format(&self, cmd: &str) -> Result<FormatMatch> {
1311        let hints = self.load_format_hints()?;
1312
1313        // Check user-defined hints
1314        for hint in hints.hints() {
1315            if crate::format_hints::pattern_matches(&hint.pattern, cmd) {
1316                return Ok(FormatMatch {
1317                    format: hint.format.clone(),
1318                    source: FormatSource::UserDefined {
1319                        pattern: hint.pattern.clone(),
1320                        priority: hint.priority,
1321                    },
1322                });
1323            }
1324        }
1325
1326        // No match - use default
1327        Ok(FormatMatch {
1328            format: hints.default_format().to_string(),
1329            source: FormatSource::Default,
1330        })
1331    }
1332}
1333
/// A format that ships with duck_hunt.
#[derive(Clone, Debug)]
pub struct BuiltinFormat {
    /// Format name as understood by duck_hunt.
    pub format: String,
    /// Display text for the format; `list_builtin_formats` fills it with the
    /// format's description.
    pub pattern: String,
    /// duck_hunt priority for this format.
    pub priority: i32,
}
1341
1342/// Result of format detection.
1343#[derive(Debug, Clone)]
1344pub struct FormatMatch {
1345    pub format: String,
1346    pub source: FormatSource,
1347}
1348
/// Where a format decision came from.
#[derive(Clone, Debug)]
pub enum FormatSource {
    /// A user-defined hint matched; carries that hint's pattern and priority.
    UserDefined {
        pattern: String,
        priority: i32,
    },
    /// A built-in rule matched; carries its pattern and priority.
    Builtin {
        pattern: String,
        priority: i32,
    },
    /// No rule matched, so the default format was used.
    Default,
}
1356
/// Tabular result of an ad-hoc SQL query, with every cell rendered as text.
#[derive(Debug)]
pub struct QueryResult {
    /// Column names, in result order.
    pub columns: Vec<String>,
    /// One entry per row; each holds one rendered cell per column.
    pub rows: Vec<Vec<String>>,
}
1363
/// Sanitize a string for use in filenames.
///
/// Character rules:
/// - path separators and Windows-reserved characters (`/ \ : * ? " < > |`)
///   become `_`;
/// - spaces become `-`;
/// - alphanumerics (Unicode-aware), `-`, `_` and `.` are kept;
/// - anything else becomes `_`.
///
/// The result is limited to 64 characters. A result consisting only of dots
/// (`.`, `..`, ...) is replaced by the same number of underscores, so the
/// output can never name the current or parent directory.
fn sanitize_filename(s: &str) -> String {
    let sanitized: String = s
        .chars()
        .map(|c| match c {
            '/' | '\\' | ':' | '*' | '?' | '"' | '<' | '>' | '|' => '_',
            ' ' => '-',
            c if c.is_alphanumeric() || c == '-' || c == '_' || c == '.' => c,
            _ => '_',
        })
        .take(64)
        .collect();

    // `.` and `..` are special path components; never hand them back verbatim.
    if !sanitized.is_empty() && sanitized.chars().all(|c| c == '.') {
        return "_".repeat(sanitized.len());
    }
    sanitized
}
1376
1377#[cfg(test)]
1378mod tests {
1379    use super::*;
1380    use crate::init::initialize;
1381    use crate::schema::SessionRecord;
1382    use tempfile::TempDir;
1383
1384    fn setup_store() -> (TempDir, Store) {
1385        let tmp = TempDir::new().unwrap();
1386        let config = Config::with_root(tmp.path());
1387        initialize(&config).unwrap();
1388        let store = Store::open(config).unwrap();
1389        (tmp, store)
1390    }
1391
1392    fn setup_store_duckdb() -> (TempDir, Store) {
1393        let tmp = TempDir::new().unwrap();
1394        let config = Config::with_duckdb_mode(tmp.path());
1395        initialize(&config).unwrap();
1396        let store = Store::open(config).unwrap();
1397        (tmp, store)
1398    }
1399
1400    #[test]
1401    fn test_store_open_uninitialized_fails() {
1402        let tmp = TempDir::new().unwrap();
1403        let config = Config::with_root(tmp.path());
1404
1405        let result = Store::open(config);
1406        assert!(matches!(result, Err(Error::NotInitialized(_))));
1407    }
1408
1409    #[test]
1410    fn test_sanitize_filename() {
1411        assert_eq!(sanitize_filename("make test"), "make-test");
1412        assert_eq!(sanitize_filename("/usr/bin/gcc"), "_usr_bin_gcc");
1413        assert_eq!(sanitize_filename("a:b*c?d"), "a_b_c_d");
1414    }
1415
1416    // Batch write tests - Parquet mode
1417
1418    #[test]
1419    fn test_batch_write_parquet_invocation_only() {
1420        let (_tmp, store) = setup_store();
1421
1422        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
1423
1424        let batch = InvocationBatch::new(inv);
1425        store.write_batch(&batch).unwrap();
1426
1427        assert_eq!(store.invocation_count().unwrap(), 1);
1428    }
1429
1430    #[test]
1431    fn test_batch_write_parquet_with_output() {
1432        let (_tmp, store) = setup_store();
1433
1434        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
1435        let inv_id = inv.id;
1436
1437        let batch = InvocationBatch::new(inv)
1438            .with_output("stdout", b"hello world\n".to_vec());
1439
1440        store.write_batch(&batch).unwrap();
1441
1442        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
1443        assert_eq!(outputs.len(), 1);
1444        assert_eq!(outputs[0].stream, "stdout");
1445    }
1446
1447    #[test]
1448    fn test_batch_write_parquet_with_session() {
1449        let (_tmp, store) = setup_store();
1450
1451        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
1452        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");
1453
1454        let batch = InvocationBatch::new(inv).with_session(session);
1455        store.write_batch(&batch).unwrap();
1456
1457        assert!(store.session_exists("test-session").unwrap());
1458    }
1459
1460    #[test]
1461    fn test_batch_write_parquet_full() {
1462        let (_tmp, store) = setup_store();
1463
1464        let inv = InvocationRecord::new("test-session", "make test", "/home/user", 1, "test@client");
1465        let inv_id = inv.id;
1466        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");
1467
1468        let batch = InvocationBatch::new(inv)
1469            .with_session(session)
1470            .with_output("stdout", b"Building...\n".to_vec())
1471            .with_output("stderr", b"error: failed\n".to_vec());
1472
1473        store.write_batch(&batch).unwrap();
1474
1475        assert_eq!(store.invocation_count().unwrap(), 1);
1476        assert!(store.session_exists("test-session").unwrap());
1477
1478        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
1479        assert_eq!(outputs.len(), 2);
1480    }
1481
1482    // Batch write tests - DuckDB mode
1483
1484    #[test]
1485    fn test_batch_write_duckdb_invocation_only() {
1486        let (_tmp, store) = setup_store_duckdb();
1487
1488        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
1489
1490        let batch = InvocationBatch::new(inv);
1491        store.write_batch(&batch).unwrap();
1492
1493        assert_eq!(store.invocation_count().unwrap(), 1);
1494    }
1495
1496    #[test]
1497    fn test_batch_write_duckdb_with_output() {
1498        let (_tmp, store) = setup_store_duckdb();
1499
1500        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
1501        let inv_id = inv.id;
1502
1503        let batch = InvocationBatch::new(inv)
1504            .with_output("stdout", b"hello world\n".to_vec());
1505
1506        store.write_batch(&batch).unwrap();
1507
1508        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
1509        assert_eq!(outputs.len(), 1);
1510        assert_eq!(outputs[0].stream, "stdout");
1511    }
1512
1513    #[test]
1514    fn test_batch_write_duckdb_with_session() {
1515        let (_tmp, store) = setup_store_duckdb();
1516
1517        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
1518        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");
1519
1520        let batch = InvocationBatch::new(inv).with_session(session);
1521        store.write_batch(&batch).unwrap();
1522
1523        assert!(store.session_exists("test-session").unwrap());
1524    }
1525
1526    #[test]
1527    fn test_batch_write_duckdb_full() {
1528        let (_tmp, store) = setup_store_duckdb();
1529
1530        let inv = InvocationRecord::new("test-session", "make test", "/home/user", 1, "test@client");
1531        let inv_id = inv.id;
1532        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");
1533
1534        let batch = InvocationBatch::new(inv)
1535            .with_session(session)
1536            .with_output("stdout", b"Building...\n".to_vec())
1537            .with_output("stderr", b"error: failed\n".to_vec());
1538
1539        store.write_batch(&batch).unwrap();
1540
1541        assert_eq!(store.invocation_count().unwrap(), 1);
1542        assert!(store.session_exists("test-session").unwrap());
1543
1544        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
1545        assert_eq!(outputs.len(), 2);
1546    }
1547
1548    #[test]
1549    fn test_batch_requires_invocation() {
1550        let (_tmp, store) = setup_store();
1551
1552        let batch = InvocationBatch::default();
1553        let result = store.write_batch(&batch);
1554
1555        assert!(result.is_err());
1556    }
1557
1558    // Extension loading tests
1559
1560    #[test]
1561    fn test_ensure_extension_parquet() {
1562        // Parquet is an official extension, should always be available
1563        let conn = duckdb::Connection::open_in_memory().unwrap();
1564        let result = ensure_extension(&conn, "parquet").unwrap();
1565        assert!(result, "parquet extension should be loadable");
1566    }
1567
1568    #[test]
1569    fn test_ensure_extension_icu() {
1570        // ICU is an official extension, should always be available
1571        let conn = duckdb::Connection::open_in_memory().unwrap();
1572        let result = ensure_extension(&conn, "icu").unwrap();
1573        assert!(result, "icu extension should be loadable");
1574    }
1575
1576    #[test]
1577    fn test_ensure_extension_community() {
1578        // Community extensions require allow_community_extensions
1579        let conn = duckdb::Connection::open_in_memory().unwrap();
1580        conn.execute("SET allow_community_extensions = true", []).unwrap();
1581
1582        // scalarfs and duck_hunt are community extensions
1583        let result = ensure_extension(&conn, "scalarfs").unwrap();
1584        assert!(result, "scalarfs extension should be loadable from community");
1585
1586        let result = ensure_extension(&conn, "duck_hunt").unwrap();
1587        assert!(result, "duck_hunt extension should be loadable from community");
1588    }
1589
1590    #[test]
1591    fn test_ensure_extension_nonexistent() {
1592        let conn = duckdb::Connection::open_in_memory().unwrap();
1593        conn.execute("SET allow_community_extensions = true", []).unwrap();
1594
1595        // A made-up extension should return false (not error)
1596        let result = ensure_extension(&conn, "nonexistent_fake_extension_xyz").unwrap();
1597        assert!(!result, "nonexistent extension should return false");
1598    }
1599
1600    #[test]
1601    fn test_extension_loading_is_cached() {
1602        // Once installed, extensions should load quickly from cache
1603        let conn = duckdb::Connection::open_in_memory().unwrap();
1604
1605        // First load might install
1606        ensure_extension(&conn, "parquet").unwrap();
1607
1608        // Second load should be fast (from cache)
1609        let start = std::time::Instant::now();
1610        ensure_extension(&conn, "parquet").unwrap();
1611        let elapsed = start.elapsed();
1612
1613        // Should be very fast if cached (< 100ms)
1614        assert!(elapsed.as_millis() < 100, "cached extension load took {:?}", elapsed);
1615    }
1616}