// magic_bird/store/mod.rs
1//! Store - handles writing and reading records.
2//!
3//! Uses DuckDB to write Parquet files and query across them.
4
5mod atomic;
6mod compact;
7mod events;
8mod invocations;
9mod outputs;
10mod remote;
11mod sessions;
12
13use std::fs;
14use std::thread;
15use std::time::Duration;
16
17use chrono::{DateTime, NaiveDate, NaiveTime, TimeDelta, Utc};
18use duckdb::{
19    params,
20    types::{TimeUnit, Value, ValueRef},
21    Connection,
22};
23
24use crate::config::StorageMode;
25use crate::schema::{EventRecord, InvocationRecord, SessionRecord};
26use crate::{Config, Error, Result};
27
28/// Format a DuckDB Value to a human-readable string.
29/// Handles complex types like List, Array, Map, and Struct recursively.
30fn format_value(value: &Value) -> String {
31    match value {
32        Value::Null => "NULL".to_string(),
33        Value::Boolean(b) => b.to_string(),
34        Value::TinyInt(n) => n.to_string(),
35        Value::SmallInt(n) => n.to_string(),
36        Value::Int(n) => n.to_string(),
37        Value::BigInt(n) => n.to_string(),
38        Value::HugeInt(n) => n.to_string(),
39        Value::UTinyInt(n) => n.to_string(),
40        Value::USmallInt(n) => n.to_string(),
41        Value::UInt(n) => n.to_string(),
42        Value::UBigInt(n) => n.to_string(),
43        Value::Float(f) => f.to_string(),
44        Value::Double(f) => f.to_string(),
45        Value::Decimal(d) => d.to_string(),
46        Value::Timestamp(_, micros) => {
47            DateTime::<Utc>::from_timestamp_micros(*micros)
48                .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
49                .unwrap_or_else(|| format!("<timestamp {}>", micros))
50        }
51        Value::Text(s) => s.clone(),
52        Value::Blob(b) => format!("<blob {} bytes>", b.len()),
53        Value::Date32(days) => {
54            NaiveDate::from_ymd_opt(1970, 1, 1)
55                .and_then(|epoch| epoch.checked_add_signed(TimeDelta::days(*days as i64)))
56                .map(|d| d.format("%Y-%m-%d").to_string())
57                .unwrap_or_else(|| format!("<date {}>", days))
58        }
59        Value::Time64(_, micros) => {
60            let secs = (*micros / 1_000_000) as u32;
61            let micro_part = (*micros % 1_000_000) as u32;
62            NaiveTime::from_num_seconds_from_midnight_opt(secs, micro_part * 1000)
63                .map(|t| t.format("%H:%M:%S").to_string())
64                .unwrap_or_else(|| format!("<time {}>", micros))
65        }
66        Value::Interval { months, days, nanos } => {
67            format!("{} months {} days {} ns", months, days, nanos)
68        }
69        // Complex types
70        Value::List(items) => {
71            let formatted: Vec<String> = items.iter().map(format_value).collect();
72            format!("[{}]", formatted.join(", "))
73        }
74        Value::Array(items) => {
75            let formatted: Vec<String> = items.iter().map(format_value).collect();
76            format!("[{}]", formatted.join(", "))
77        }
78        Value::Map(map) => {
79            let formatted: Vec<String> = map
80                .iter()
81                .map(|(k, v)| format!("{}: {}", format_value(k), format_value(v)))
82                .collect();
83            format!("{{{}}}", formatted.join(", "))
84        }
85        Value::Struct(fields) => {
86            let formatted: Vec<String> = fields
87                .iter()
88                .map(|(k, v)| format!("{}: {}", k, format_value(v)))
89                .collect();
90            format!("{{{}}}", formatted.join(", "))
91        }
92        Value::Enum(s) => s.clone(),
93        _ => "<unknown>".to_string(),
94    }
95}
96
97// Re-export types from submodules
98pub use compact::{ArchiveStats, AutoCompactOptions, CompactOptions, CompactStats};
99pub use events::{EventFilters, EventSummary, FormatConfig, FormatRule};
100pub use invocations::InvocationSummary;
101pub use outputs::OutputInfo;
102pub use remote::{parse_since, PullOptions, PullStats, PushOptions, PushStats};
103
104// Re-export format detection types (defined below)
105// BuiltinFormat, FormatMatch, FormatSource are defined at the bottom of this file
106
/// A batch of related records to write atomically.
///
/// Use this when you want to write an invocation along with its outputs,
/// session, and/or events in a single transaction.
#[derive(Debug, Default)]
pub struct InvocationBatch {
    /// The invocation record (required).
    // NOTE(review): typed Option despite "(required)" — presumably so the
    // struct can derive Default; confirm writers reject a None invocation.
    pub invocation: Option<InvocationRecord>,

    /// Output streams with their content: (stream_name, content).
    /// Common streams: "stdout", "stderr", "combined".
    pub outputs: Vec<(String, Vec<u8>)>,

    /// Session record (optional, created if not already registered).
    pub session: Option<SessionRecord>,

    /// Pre-extracted events (optional).
    pub events: Option<Vec<EventRecord>>,
}
126
127impl InvocationBatch {
128    /// Create a new batch with an invocation.
129    pub fn new(invocation: InvocationRecord) -> Self {
130        Self {
131            invocation: Some(invocation),
132            outputs: Vec::new(),
133            session: None,
134            events: None,
135        }
136    }
137
138    /// Add an output stream.
139    pub fn with_output(mut self, stream: impl Into<String>, content: Vec<u8>) -> Self {
140        self.outputs.push((stream.into(), content));
141        self
142    }
143
144    /// Add a session record.
145    pub fn with_session(mut self, session: SessionRecord) -> Self {
146        self.session = Some(session);
147        self
148    }
149
150    /// Add pre-extracted events.
151    pub fn with_events(mut self, events: Vec<EventRecord>) -> Self {
152        self.events = Some(events);
153        self
154    }
155}
156
/// Options for creating a database connection.
///
/// Controls what gets loaded and attached when opening a connection.
/// Use this for explicit control over connection behavior.
///
/// Note: `#[derive(Default)]` means `ConnectionOptions::default()` sets every
/// flag to `false` — identical to [`ConnectionOptions::minimal`]. The
/// "enabled by `full()`" notes below describe the [`ConnectionOptions::full`]
/// preset, which `Store::connection()` uses.
///
/// # Connection Stages
///
/// 1. **Extensions** (always): parquet, icu, scalarfs, duck_hunt
/// 2. **Blob resolution** (always): S3 credentials, blob_roots macro
/// 3. **Migration** (optional): Upgrade existing installations to new schema
/// 4. **Remotes** (optional): Attach remote databases, rebuild remotes.* views
/// 5. **Project** (optional): Attach project-local database if in a project
/// 6. **CWD views** (optional): Rebuild cwd.* views for current directory
#[derive(Debug, Clone, Default)]
pub struct ConnectionOptions {
    /// Attach configured remotes (enabled by `full()`).
    /// When true, remote databases are attached and remotes.* views are rebuilt
    /// to include the attached data.
    pub attach_remotes: bool,

    /// Attach project database if in a project directory (enabled by `full()`).
    pub attach_project: bool,

    /// Rebuild cwd.* views for current working directory (enabled by `full()`).
    /// These views filter main.* data to entries matching the current directory.
    pub create_ephemeral_views: bool,

    /// Run migration for existing installations (enabled by `for_migration()`).
    /// Only enable this for explicit upgrade operations.
    pub run_migration: bool,
}
188
189impl ConnectionOptions {
190    /// Create options for a full connection (default behavior).
191    pub fn full() -> Self {
192        Self {
193            attach_remotes: true,
194            attach_project: true,
195            create_ephemeral_views: true,
196            run_migration: false,
197        }
198    }
199
200    /// Create options for a minimal connection (no attachments).
201    /// Useful for write operations that don't need remote data.
202    pub fn minimal() -> Self {
203        Self {
204            attach_remotes: false,
205            attach_project: false,
206            create_ephemeral_views: false,
207            run_migration: false,
208        }
209    }
210
211    /// Create options for a migration/upgrade connection.
212    pub fn for_migration() -> Self {
213        Self {
214            attach_remotes: false,
215            attach_project: false,
216            create_ephemeral_views: false,
217            run_migration: true,
218        }
219    }
220}
221
222/// Ensure a DuckDB extension is loaded, installing if necessary.
223///
224/// Attempts in order:
225/// 1. LOAD (extension might already be available)
226/// 2. INSTALL from default repository, then LOAD
227/// 3. INSTALL FROM community, then LOAD
228///
229/// Returns Ok(true) if loaded successfully, Ok(false) if extension unavailable.
230fn ensure_extension(conn: &Connection, name: &str) -> Result<bool> {
231    // Try loading directly first (already installed/cached)
232    if conn.execute(&format!("LOAD {}", name), []).is_ok() {
233        return Ok(true);
234    }
235
236    // Try installing from default repository
237    if conn.execute(&format!("INSTALL {}", name), []).is_ok() {
238        if conn.execute(&format!("LOAD {}", name), []).is_ok() {
239            return Ok(true);
240        }
241    }
242
243    // Try installing from community repository
244    if conn.execute(&format!("INSTALL {} FROM community", name), []).is_ok() {
245        if conn.execute(&format!("LOAD {}", name), []).is_ok() {
246            return Ok(true);
247        }
248    }
249
250    Ok(false)
251}
252
/// A BIRD store for reading and writing records.
pub struct Store {
    // Resolved configuration: database path, storage mode, remotes, blob roots.
    config: Config,
}
257
258impl Store {
259    /// Open an existing BIRD store.
260    pub fn open(config: Config) -> Result<Self> {
261        if !config.db_path().exists() {
262            return Err(Error::NotInitialized(config.bird_root.clone()));
263        }
264        Ok(Self { config })
265    }
266
267    /// Open a DuckDB connection with retry and exponential backoff.
268    ///
269    /// DuckDB uses file locking for concurrent access. When multiple processes
270    /// (e.g., background shell hook saves) try to access the database simultaneously,
271    /// this method retries with exponential backoff to avoid lock conflicts.
272    fn open_connection_with_retry(&self) -> Result<Connection> {
273        const MAX_RETRIES: u32 = 10;
274        const INITIAL_DELAY_MS: u64 = 10;
275        const MAX_DELAY_MS: u64 = 1000;
276
277        let db_path = self.config.db_path();
278        let mut delay_ms = INITIAL_DELAY_MS;
279        let mut last_error = None;
280
281        for attempt in 0..MAX_RETRIES {
282            match Connection::open(&db_path) {
283                Ok(conn) => return Ok(conn),
284                Err(e) => {
285                    let err_msg = e.to_string();
286                    // Check if this is a lock conflict error
287                    if err_msg.contains("Could not set lock")
288                        || err_msg.contains("Conflicting lock")
289                        || err_msg.contains("database is locked")
290                    {
291                        last_error = Some(e);
292                        if attempt < MAX_RETRIES - 1 {
293                            // Add jitter to avoid thundering herd
294                            let jitter = (attempt as u64 * 7) % 10;
295                            thread::sleep(Duration::from_millis(delay_ms + jitter));
296                            delay_ms = (delay_ms * 2).min(MAX_DELAY_MS);
297                            continue;
298                        }
299                    } else {
300                        // Non-lock error, fail immediately
301                        return Err(e.into());
302                    }
303                }
304            }
305        }
306
307        // All retries exhausted
308        Err(last_error
309            .map(|e| e.into())
310            .unwrap_or_else(|| Error::Storage("Failed to open database after retries".to_string())))
311    }
312
    /// Get a DuckDB connection with full features (attachments, ephemeral views).
    ///
    /// Shorthand for `connect(ConnectionOptions::full())`.
    pub fn connection(&self) -> Result<Connection> {
        self.connect(ConnectionOptions::full())
    }
317
318    /// Get a DuckDB connection with optional remote attachment (legacy API).
319    pub fn connection_with_options(&self, attach_remotes: bool) -> Result<Connection> {
320        let opts = if attach_remotes {
321            ConnectionOptions::full()
322        } else {
323            ConnectionOptions::minimal()
324        };
325        self.connect(opts)
326    }
327
    /// Get a DuckDB connection with explicit options.
    ///
    /// This is the main connection method. Use `ConnectionOptions` to control:
    /// - Whether remotes are attached
    /// - Whether project database is attached
    /// - Whether ephemeral views are created
    /// - Whether migration should run
    ///
    /// Uses retry with exponential backoff to handle concurrent access.
    ///
    /// The setup stages below are order-sensitive: extensions must load before
    /// any SQL that uses them, and S3 credentials must exist before blob
    /// resolution or remote attachment touches `s3://` paths.
    ///
    /// # Errors
    /// Fails if the database cannot be opened, a required extension (parquet,
    /// icu) cannot be loaded, or any setup statement fails. Missing optional
    /// extensions only produce a stderr warning.
    pub fn connect(&self, opts: ConnectionOptions) -> Result<Connection> {
        let conn = self.open_connection_with_retry()?;

        // ===== Load required extensions =====
        // Uses default extension directory (typically ~/.duckdb/extensions)
        // Falls back to community repository if not in default
        conn.execute("SET allow_community_extensions = true", [])?;

        for ext in ["parquet", "icu"] {
            if !ensure_extension(&conn, ext)? {
                return Err(Error::Extension(format!(
                    "Required extension '{}' could not be loaded",
                    ext
                )));
            }
        }

        // Optional community extensions - warn if missing
        // (warning goes to stderr; the connection still succeeds)
        for (ext, desc) in [
            ("scalarfs", "data: URL support for inline blobs"),
            ("duck_hunt", "log/output parsing for event extraction"),
        ] {
            if !ensure_extension(&conn, ext)? {
                eprintln!("Warning: {} extension not available ({})", ext, desc);
            }
        }

        // Set file search path so views resolve relative paths correctly
        // NOTE(review): the path is interpolated without quote-escaping — a
        // data dir containing a single quote would break this statement.
        conn.execute(
            &format!(
                "SET file_search_path = '{}'",
                self.config.data_dir().display()
            ),
            [],
        )?;

        // ===== Optional: Run migration for existing installations =====
        if opts.run_migration {
            self.migrate_to_new_schema(&conn)?;
        }

        // ===== Always set up blob resolution =====
        // S3 credentials needed before blob_roots is used
        self.setup_s3_credentials(&conn)?;
        self.setup_blob_resolution(&conn)?;

        // ===== Optional: Attach remotes and create access macros =====
        if opts.attach_remotes && !self.config.remotes.is_empty() {
            self.attach_remotes(&conn)?;
            self.create_remote_macros(&conn)?;
        }

        // ===== Optional: Attach project database =====
        if opts.attach_project {
            self.attach_project_db(&conn)?;
        }

        // ===== Optional: Create cwd macros =====
        // These TEMPORARY macros filter by current working directory
        if opts.create_ephemeral_views {
            self.create_cwd_macros(&conn)?;
        }

        Ok(conn)
    }
402
403    /// Attach project-level `.bird/` database if we're in a project directory.
404    ///
405    /// The project database is attached as read-only under schema "project".
406    /// This allows queries like `SELECT * FROM project.invocations`.
407    fn attach_project_db(&self, conn: &Connection) -> Result<()> {
408        use crate::project::find_current_project;
409
410        let Some(project) = find_current_project() else {
411            return Ok(()); // Not in a project
412        };
413
414        if !project.is_initialized() {
415            return Ok(()); // Project not initialized
416        }
417
418        // Don't attach if project DB is the same as user DB
419        if project.db_path == self.config.db_path() {
420            return Ok(());
421        }
422
423        // Attach as read-only
424        let attach_sql = format!(
425            "ATTACH '{}' AS project (READ_ONLY)",
426            project.db_path.display()
427        );
428
429        if let Err(e) = conn.execute(&attach_sql, []) {
430            // Log but don't fail - project DB might be locked or inaccessible
431            eprintln!("Note: Could not attach project database: {}", e);
432        }
433
434        Ok(())
435    }
436
    /// Migrate existing installations to the new schema architecture.
    ///
    /// Checks if the `local` schema exists; if not, creates the new schema
    /// structure and migrates data from old `*_table` tables.
    ///
    /// Idempotent: returns early when the `local` schema already exists.
    fn migrate_to_new_schema(&self, conn: &Connection) -> Result<()> {
        // Check if already migrated (local schema exists)
        // NOTE(review): query errors are swallowed by unwrap_or(false), i.e.
        // treated as "not migrated" — confirm that is intended.
        let local_exists: bool = conn
            .query_row(
                "SELECT COUNT(*) > 0 FROM information_schema.schemata WHERE schema_name = 'local'",
                [],
                |row| row.get(0),
            )
            .unwrap_or(false);

        if local_exists {
            return Ok(());
        }

        // This is an old installation - need to migrate
        // For now, just create the new schemas. Data migration would require
        // moving data from old tables/views to new structure.
        // TODO: Implement full data migration if needed

        eprintln!("Note: Migrating to new schema architecture...");

        // Create core schemas
        conn.execute_batch(
            r#"
            CREATE SCHEMA IF NOT EXISTS local;
            CREATE SCHEMA IF NOT EXISTS cached_placeholder;
            CREATE SCHEMA IF NOT EXISTS remote_placeholder;
            CREATE SCHEMA IF NOT EXISTS caches;
            CREATE SCHEMA IF NOT EXISTS remotes;
            CREATE SCHEMA IF NOT EXISTS unified;
            CREATE SCHEMA IF NOT EXISTS cwd;
            "#,
        )?;

        // For DuckDB mode, create local tables
        if self.config.storage_mode == crate::StorageMode::DuckDB {
            conn.execute_batch(
                r#"
                CREATE TABLE IF NOT EXISTS local.sessions (
                    session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
                    invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE
                );
                CREATE TABLE IF NOT EXISTS local.invocations (
                    id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
                    cwd VARCHAR, cmd VARCHAR, executable VARCHAR, exit_code INTEGER,
                    format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR,
                    tag VARCHAR, date DATE
                );
                CREATE TABLE IF NOT EXISTS local.outputs (
                    id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
                    byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
                    content_type VARCHAR, date DATE
                );
                CREATE TABLE IF NOT EXISTS local.events (
                    id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
                    event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
                    ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
                    status VARCHAR, format_used VARCHAR, date DATE
                );
                "#,
            )?;

            // Copy data from old tables if they exist
            // NOTE(review): only sessions_table's presence is probed; this
            // assumes the other three *_table tables exist alongside it.
            let old_tables_exist: bool = conn
                .query_row(
                    "SELECT COUNT(*) > 0 FROM duckdb_tables() WHERE table_name = 'sessions_table'",
                    [],
                    |row| row.get(0),
                )
                .unwrap_or(false);

            if old_tables_exist {
                conn.execute_batch(
                    r#"
                    INSERT INTO local.sessions SELECT * FROM sessions_table;
                    INSERT INTO local.invocations SELECT * FROM invocations_table;
                    INSERT INTO local.outputs SELECT * FROM outputs_table;
                    INSERT INTO local.events SELECT * FROM events_table;
                    "#,
                )?;
            }
        } else {
            // Parquet mode - create views over parquet files
            // (relative globs; resolved via the file_search_path set at
            // connection time)
            conn.execute_batch(
                r#"
                CREATE OR REPLACE VIEW local.sessions AS
                SELECT * EXCLUDE (filename) FROM read_parquet(
                    'recent/sessions/**/*.parquet',
                    union_by_name = true, hive_partitioning = true, filename = true
                );
                CREATE OR REPLACE VIEW local.invocations AS
                SELECT * EXCLUDE (filename) FROM read_parquet(
                    'recent/invocations/**/*.parquet',
                    union_by_name = true, hive_partitioning = true, filename = true
                );
                CREATE OR REPLACE VIEW local.outputs AS
                SELECT * EXCLUDE (filename) FROM read_parquet(
                    'recent/outputs/**/*.parquet',
                    union_by_name = true, hive_partitioning = true, filename = true
                );
                CREATE OR REPLACE VIEW local.events AS
                SELECT * EXCLUDE (filename) FROM read_parquet(
                    'recent/events/**/*.parquet',
                    union_by_name = true, hive_partitioning = true, filename = true
                );
                "#,
            )?;
        }

        // Create placeholder schemas
        // (empty tables defining the column shape, including _source, that the
        // caches.*/remotes.* union views below select from)
        conn.execute_batch(
            r#"
            CREATE TABLE IF NOT EXISTS cached_placeholder.sessions (
                session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
                invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS cached_placeholder.invocations (
                id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
                cwd VARCHAR, cmd VARCHAR, executable VARCHAR, exit_code INTEGER,
                format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS cached_placeholder.outputs (
                id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
                byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
                content_type VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS cached_placeholder.events (
                id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
                event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
                ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
                status VARCHAR, format_used VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.sessions (
                session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
                invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.invocations (
                id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
                cwd VARCHAR, cmd VARCHAR, executable VARCHAR, exit_code INTEGER,
                format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.outputs (
                id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
                byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
                content_type VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.events (
                id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
                event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
                ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
                status VARCHAR, format_used VARCHAR, date DATE, _source VARCHAR
            );
            "#,
        )?;

        // Create union schemas
        // (main.* = local + caches; unified.* = main + remotes)
        conn.execute_batch(
            r#"
            CREATE OR REPLACE VIEW caches.sessions AS SELECT * FROM cached_placeholder.sessions;
            CREATE OR REPLACE VIEW caches.invocations AS SELECT * FROM cached_placeholder.invocations;
            CREATE OR REPLACE VIEW caches.outputs AS SELECT * FROM cached_placeholder.outputs;
            CREATE OR REPLACE VIEW caches.events AS SELECT * FROM cached_placeholder.events;

            CREATE OR REPLACE VIEW remotes.sessions AS SELECT * FROM remote_placeholder.sessions;
            CREATE OR REPLACE VIEW remotes.invocations AS SELECT * FROM remote_placeholder.invocations;
            CREATE OR REPLACE VIEW remotes.outputs AS SELECT * FROM remote_placeholder.outputs;
            CREATE OR REPLACE VIEW remotes.events AS SELECT * FROM remote_placeholder.events;

            CREATE OR REPLACE VIEW main.sessions AS
                SELECT *, 'local' as _source FROM local.sessions
                UNION ALL BY NAME SELECT * FROM caches.sessions;
            CREATE OR REPLACE VIEW main.invocations AS
                SELECT *, 'local' as _source FROM local.invocations
                UNION ALL BY NAME SELECT * FROM caches.invocations;
            CREATE OR REPLACE VIEW main.outputs AS
                SELECT *, 'local' as _source FROM local.outputs
                UNION ALL BY NAME SELECT * FROM caches.outputs;
            CREATE OR REPLACE VIEW main.events AS
                SELECT *, 'local' as _source FROM local.events
                UNION ALL BY NAME SELECT * FROM caches.events;

            CREATE OR REPLACE VIEW unified.sessions AS
                SELECT * FROM main.sessions UNION ALL BY NAME SELECT * FROM remotes.sessions;
            CREATE OR REPLACE VIEW unified.invocations AS
                SELECT * FROM main.invocations UNION ALL BY NAME SELECT * FROM remotes.invocations;
            CREATE OR REPLACE VIEW unified.outputs AS
                SELECT * FROM main.outputs UNION ALL BY NAME SELECT * FROM remotes.outputs;
            CREATE OR REPLACE VIEW unified.events AS
                SELECT * FROM main.events UNION ALL BY NAME SELECT * FROM remotes.events;

            -- Qualified views: deduplicated with source list
            CREATE OR REPLACE VIEW unified.qualified_sessions AS
                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
                FROM unified.sessions GROUP BY ALL;
            CREATE OR REPLACE VIEW unified.qualified_invocations AS
                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
                FROM unified.invocations GROUP BY ALL;
            CREATE OR REPLACE VIEW unified.qualified_outputs AS
                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
                FROM unified.outputs GROUP BY ALL;
            CREATE OR REPLACE VIEW unified.qualified_events AS
                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
                FROM unified.events GROUP BY ALL;
            "#,
        )?;

        Ok(())
    }
649
650    /// Set up S3 credentials for all remotes that use S3.
651    /// This is called early so that blob resolution can access S3 paths.
652    fn setup_s3_credentials(&self, conn: &Connection) -> Result<()> {
653        // Check if any remote uses S3
654        let has_s3 = self.config.remotes.iter().any(|r| {
655            r.remote_type == crate::config::RemoteType::S3
656        });
657
658        if !has_s3 {
659            return Ok(());
660        }
661
662        // Load httpfs for S3 support
663        conn.execute("LOAD httpfs", [])?;
664
665        // Set up credentials for each S3 remote
666        for remote in &self.config.remotes {
667            if remote.remote_type == crate::config::RemoteType::S3 {
668                if let Some(provider) = &remote.credential_provider {
669                    let secret_sql = format!(
670                        "CREATE SECRET IF NOT EXISTS \"bird_{}\" (TYPE s3, PROVIDER {})",
671                        remote.name, provider
672                    );
673                    if let Err(e) = conn.execute(&secret_sql, []) {
674                        eprintln!("Warning: Failed to create S3 secret for {}: {}", remote.name, e);
675                    }
676                }
677            }
678        }
679
680        Ok(())
681    }
682
683    /// Set up blob_roots variable and blob resolution macros.
684    ///
685    /// Storage refs use URI schemes to indicate type:
686    /// - `data:`, `data+varchar:`, `data+blob:` - inline content (scalarfs)
687    /// - `file:path` - relative path, resolved against blob_roots
688    /// - Absolute paths (`s3://`, `/path/`) - used directly
689    fn setup_blob_resolution(&self, conn: &Connection) -> Result<()> {
690        let blob_roots = self.config.blob_roots();
691
692        // Format as SQL array literal
693        let roots_sql: String = blob_roots
694            .iter()
695            .map(|r| format!("'{}'", r.replace('\'', "''")))
696            .collect::<Vec<_>>()
697            .join(", ");
698
699        // Set blob_roots variable
700        conn.execute(&format!("SET VARIABLE blob_roots = [{}]", roots_sql), [])?;
701
702        // Helper: check if ref is inline data (scalarfs data: protocol)
703        conn.execute(
704            r#"CREATE OR REPLACE MACRO is_inline_data(ref) AS (
705                ref[:5] = 'data:' OR ref[:5] = 'data+'
706            )"#,
707            [],
708        )?;
709
710        // Helper: check if ref is a relative file: path
711        conn.execute(
712            r#"CREATE OR REPLACE MACRO is_file_ref(ref) AS (
713                ref[:5] = 'file:'
714            )"#,
715            [],
716        )?;
717
718        // Resolve storage ref to list of paths for pathvariable:
719        // - Inline data: pass through as single-element list
720        // - file: refs: expand to glob patterns across all blob_roots
721        // - Other (absolute paths): pass through
722        conn.execute(
723            r#"CREATE OR REPLACE MACRO resolve_storage_ref(ref) AS (
724                CASE
725                    WHEN is_inline_data(ref) THEN [ref]
726                    WHEN is_file_ref(ref) THEN
727                        [format('{}/{}*', root, ref[6:]) FOR root IN getvariable('blob_roots')]
728                    ELSE [ref]
729                END
730            )"#,
731            [],
732        )?;
733
734        Ok(())
735    }
736
737    /// Attach configured remotes to the connection.
738    /// Note: S3 credentials are already set up by setup_s3_credentials().
739    fn attach_remotes(&self, conn: &Connection) -> Result<()> {
740        for remote in self.config.auto_attach_remotes() {
741            // Attach the remote database
742            let attach_sql = remote.attach_sql();
743            if let Err(e) = conn.execute(&attach_sql, []) {
744                eprintln!("Warning: Failed to attach remote {}: {}", remote.name, e);
745            }
746        }
747
748        Ok(())
749    }
750
    /// Create TEMPORARY macros for accessing remote data.
    ///
    /// We use TEMPORARY macros to avoid persisting references to attached databases
    /// in the catalog. Persisted references cause database corruption when the
    /// attachment is not present.
    ///
    /// Usage: `SELECT * FROM remotes_invocations()` or `SELECT * FROM remote_<name>_invocations()`
    fn create_remote_macros(&self, conn: &Connection) -> Result<()> {
        let remotes = self.config.auto_attach_remotes();
        if remotes.is_empty() {
            return Ok(());
        }

        // Create per-remote TEMPORARY macros for each table type
        for remote in &remotes {
            let schema = remote.quoted_schema_name();
            let name = &remote.name;
            // Sanitize name for use in macro identifier. The identifier is also
            // double-quoted below, so this mainly keeps generated names tidy.
            let safe_name = name.replace('-', "_").replace('.', "_");

            for table in &["sessions", "invocations", "outputs", "events"] {
                let macro_name = format!("\"remote_{safe_name}_{table}\"");
                // Each macro tags its rows with a `_source` column carrying the
                // remote's name so unioned results stay attributable.
                // NOTE(review): `name` is interpolated into a single-quoted SQL
                // literal without escaping — a remote name containing `'` would
                // break this statement; confirm names are validated upstream.
                let sql = format!(
                    r#"CREATE OR REPLACE TEMPORARY MACRO {macro_name}() AS TABLE (
                        SELECT *, '{name}' as _source FROM {schema}.{table}
                    )"#,
                    macro_name = macro_name,
                    name = name,
                    schema = schema,
                    table = table
                );
                // Best-effort: a missing attachment should not break setup.
                if let Err(e) = conn.execute(&sql, []) {
                    eprintln!("Warning: Failed to create macro {}: {}", macro_name, e);
                }
            }
        }

        // Create combined remotes_* TEMPORARY macros that union all remotes
        for table in &["sessions", "invocations", "outputs", "events"] {
            let mut union_parts: Vec<String> = remotes
                .iter()
                .map(|r| {
                    let safe_name = r.name.replace('-', "_").replace('.', "_");
                    format!("SELECT * FROM \"remote_{safe_name}_{table}\"()", safe_name = safe_name, table = table)
                })
                .collect();

            // Include placeholder for empty case
            // NOTE(review): assumes a `remote_placeholder` schema with matching
            // (empty) tables exists so the UNION stays valid even when every
            // per-remote macro failed — confirm where that schema is created.
            union_parts.push(format!("SELECT * FROM remote_placeholder.{}", table));

            let sql = format!(
                r#"CREATE OR REPLACE TEMPORARY MACRO remotes_{table}() AS TABLE (
                    {union}
                )"#,
                table = table,
                union = union_parts.join(" UNION ALL BY NAME ")
            );
            if let Err(e) = conn.execute(&sql, []) {
                eprintln!("Warning: Failed to create remotes_{} macro: {}", table, e);
            }
        }

        Ok(())
    }
815
816    /// Create TEMPORARY macros for cwd-filtered data.
817    ///
818    /// These filter main.* data to entries matching the current working directory.
819    /// Uses TEMPORARY macros to avoid persisting anything that changes per-connection.
820    ///
821    /// Usage: `SELECT * FROM cwd_invocations()`
822    fn create_cwd_macros(&self, conn: &Connection) -> Result<()> {
823        let cwd = std::env::current_dir()
824            .map(|p| p.to_string_lossy().to_string())
825            .unwrap_or_default();
826        let cwd_escaped = cwd.replace('\'', "''");
827
828        // Create TEMPORARY macros for cwd-filtered data
829        let macros = format!(
830            r#"
831            CREATE OR REPLACE TEMPORARY MACRO cwd_sessions() AS TABLE (
832                SELECT * FROM main.sessions WHERE cwd LIKE '{}%'
833            );
834            CREATE OR REPLACE TEMPORARY MACRO cwd_invocations() AS TABLE (
835                SELECT * FROM main.invocations WHERE cwd LIKE '{}%'
836            );
837            CREATE OR REPLACE TEMPORARY MACRO cwd_outputs() AS TABLE (
838                SELECT o.* FROM main.outputs o
839                JOIN main.invocations i ON o.invocation_id = i.id
840                WHERE i.cwd LIKE '{}%'
841            );
842            CREATE OR REPLACE TEMPORARY MACRO cwd_events() AS TABLE (
843                SELECT e.* FROM main.events e
844                JOIN main.invocations i ON e.invocation_id = i.id
845                WHERE i.cwd LIKE '{}%'
846            );
847            "#,
848            cwd_escaped, cwd_escaped, cwd_escaped, cwd_escaped
849        );
850
851        conn.execute_batch(&macros)?;
852        Ok(())
853    }
854
855    /// Manually attach a specific remote.
856    pub fn attach_remote(&self, conn: &Connection, remote: &crate::RemoteConfig) -> Result<()> {
857        // Load httpfs if needed
858        conn.execute("LOAD httpfs", [])?;
859
860        // Set up credentials
861        if let Some(provider) = &remote.credential_provider {
862            if remote.remote_type == crate::config::RemoteType::S3 {
863                let secret_sql = format!(
864                    "CREATE SECRET IF NOT EXISTS \"bird_{}\" (TYPE s3, PROVIDER {})",
865                    remote.name, provider
866                );
867                conn.execute(&secret_sql, [])?;
868            }
869        }
870
871        // Attach
872        conn.execute(&remote.attach_sql(), [])?;
873        Ok(())
874    }
875
876    /// Detach a remote.
877    pub fn detach_remote(&self, conn: &Connection, name: &str) -> Result<()> {
878        conn.execute(&format!("DETACH \"remote_{}\"", name), [])?;
879        Ok(())
880    }
881
882    /// Test connection to a remote. Returns Ok if successful.
883    pub fn test_remote(&self, remote: &crate::RemoteConfig) -> Result<()> {
884        let conn = self.connection_with_options(false)?;
885        self.attach_remote(&conn, remote)?;
886
887        // Try a simple query
888        let test_sql = format!(
889            "SELECT 1 FROM {}.invocations LIMIT 1",
890            remote.quoted_schema_name()
891        );
892        conn.execute(&test_sql, [])?;
893
894        Ok(())
895    }
896
    /// Get config reference.
    ///
    /// Borrows the store's configuration; callers clone if they need ownership.
    pub fn config(&self) -> &Config {
        &self.config
    }
901
    /// Query the store using SQL.
    ///
    /// Returns results as a Vec of rows, where each row is a Vec of string values.
    /// Every cell is rendered to text: temporal types are formatted, blobs become
    /// `<blob N bytes>` placeholders, and complex types (List, Array, Map, Struct)
    /// are rendered recursively via `format_value`.
    pub fn query(&self, sql: &str) -> Result<QueryResult> {
        let conn = self.connection()?;
        let mut stmt = conn.prepare(sql)?;

        // Execute the query first to get column info
        let mut rows_iter = stmt.query([])?;

        // Get column info from the rows iterator; a statement with no result
        // set yields zero columns and an empty name list.
        let column_count = rows_iter.as_ref().map(|r| r.column_count()).unwrap_or(0);
        let column_names: Vec<String> = if let Some(row_ref) = rows_iter.as_ref() {
            (0..column_count)
                .map(|i| {
                    row_ref
                        .column_name(i)
                        .map(|s| s.to_string())
                        // Fall back to a positional name when the driver
                        // cannot supply one.
                        .unwrap_or_else(|_| format!("col{}", i))
                })
                .collect()
        } else {
            Vec::new()
        };

        // Collect all rows, stringifying each cell as we go
        let mut result_rows = Vec::new();
        while let Some(row) = rows_iter.next()? {
            let mut values = Vec::with_capacity(column_count);
            for i in 0..column_count {
                // Get value as generic ValueRef and convert to string
                let value = match row.get_ref(i)? {
                    ValueRef::Null => "NULL".to_string(),
                    ValueRef::Boolean(b) => b.to_string(),
                    ValueRef::TinyInt(n) => n.to_string(),
                    ValueRef::SmallInt(n) => n.to_string(),
                    ValueRef::Int(n) => n.to_string(),
                    ValueRef::BigInt(n) => n.to_string(),
                    ValueRef::HugeInt(n) => n.to_string(),
                    ValueRef::UTinyInt(n) => n.to_string(),
                    ValueRef::USmallInt(n) => n.to_string(),
                    ValueRef::UInt(n) => n.to_string(),
                    ValueRef::UBigInt(n) => n.to_string(),
                    ValueRef::Float(f) => f.to_string(),
                    ValueRef::Double(f) => f.to_string(),
                    ValueRef::Decimal(d) => d.to_string(),
                    ValueRef::Timestamp(unit, val) => {
                        // Convert to microseconds then to DateTime
                        // (nanosecond input drops sub-microsecond precision).
                        let micros = match unit {
                            TimeUnit::Second => val * 1_000_000,
                            TimeUnit::Millisecond => val * 1_000,
                            TimeUnit::Microsecond => val,
                            TimeUnit::Nanosecond => val / 1_000,
                        };
                        DateTime::<Utc>::from_timestamp_micros(micros)
                            .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
                            .unwrap_or_else(|| format!("<invalid timestamp {}>", val))
                    }
                    ValueRef::Date32(days) => {
                        // Days since 1970-01-01
                        NaiveDate::from_ymd_opt(1970, 1, 1)
                            .and_then(|epoch| epoch.checked_add_signed(TimeDelta::days(days as i64)))
                            .map(|d| d.format("%Y-%m-%d").to_string())
                            .unwrap_or_else(|| format!("<invalid date {}>", days))
                    }
                    ValueRef::Time64(unit, val) => {
                        // Convert to microseconds then to NaiveTime
                        let micros = match unit {
                            TimeUnit::Second => val * 1_000_000,
                            TimeUnit::Millisecond => val * 1_000,
                            TimeUnit::Microsecond => val,
                            TimeUnit::Nanosecond => val / 1_000,
                        };
                        let secs = (micros / 1_000_000) as u32;
                        let micro_part = (micros % 1_000_000) as u32;
                        NaiveTime::from_num_seconds_from_midnight_opt(secs, micro_part * 1000)
                            .map(|t| t.format("%H:%M:%S").to_string())
                            .unwrap_or_else(|| format!("<invalid time {}>", val))
                    }
                    ValueRef::Interval { months, days, nanos } => {
                        format!("{} months {} days {} ns", months, days, nanos)
                    }
                    // Lossy conversion: invalid UTF-8 becomes U+FFFD.
                    ValueRef::Text(s) => String::from_utf8_lossy(s).to_string(),
                    ValueRef::Blob(b) => format!("<blob {} bytes>", b.len()),
                    other => {
                        // Convert to owned Value for complex types (List, Array, Map, Struct)
                        let owned: Value = other.into();
                        format_value(&owned)
                    }
                };
                values.push(value);
            }
            result_rows.push(values);
        }

        Ok(QueryResult {
            columns: column_names,
            rows: result_rows,
        })
    }
1002
1003    /// Get the last invocation with its output (if any).
1004    pub fn last_invocation_with_output(
1005        &self,
1006    ) -> Result<Option<(InvocationSummary, Option<OutputInfo>)>> {
1007        if let Some(inv) = self.last_invocation()? {
1008            let output = self.get_output(&inv.id)?;
1009            Ok(Some((inv, output)))
1010        } else {
1011            Ok(None)
1012        }
1013    }
1014
1015    /// Write a batch of related records atomically.
1016    ///
1017    /// This is the preferred way to write an invocation with its outputs,
1018    /// session, and events together. In DuckDB mode, all writes are wrapped
1019    /// in a transaction. In Parquet mode, files are written atomically.
1020    pub fn write_batch(&self, batch: &InvocationBatch) -> Result<()> {
1021        let invocation = batch
1022            .invocation
1023            .as_ref()
1024            .ok_or_else(|| Error::Storage("Batch must contain an invocation".to_string()))?;
1025
1026        match self.config.storage_mode {
1027            StorageMode::Parquet => self.write_batch_parquet(batch, invocation),
1028            StorageMode::DuckDB => self.write_batch_duckdb(batch, invocation),
1029        }
1030    }
1031
1032    /// Write batch using Parquet files (multi-writer safe).
1033    fn write_batch_parquet(
1034        &self,
1035        batch: &InvocationBatch,
1036        invocation: &InvocationRecord,
1037    ) -> Result<()> {
1038        // For Parquet mode, we write each record type separately.
1039        // Atomicity is per-file (temp + rename), but not across files.
1040        // This is acceptable because Parquet mode prioritizes concurrent writes.
1041
1042        // Write session first (if provided and not already registered)
1043        if let Some(ref session) = batch.session {
1044            self.ensure_session(session)?;
1045        }
1046
1047        // Write invocation
1048        self.write_invocation(invocation)?;
1049
1050        let date = invocation.date();
1051        let inv_id = invocation.id;
1052
1053        // Write outputs
1054        for (stream, content) in &batch.outputs {
1055            self.store_output(
1056                inv_id,
1057                stream,
1058                content,
1059                date,
1060                invocation.executable.as_deref(),
1061            )?;
1062        }
1063
1064        // Write events (if provided)
1065        if let Some(ref events) = batch.events {
1066            if !events.is_empty() {
1067                self.write_events(events)?;
1068            }
1069        }
1070
1071        Ok(())
1072    }
1073
1074    /// Write batch using DuckDB tables with transaction.
1075    fn write_batch_duckdb(
1076        &self,
1077        batch: &InvocationBatch,
1078        invocation: &InvocationRecord,
1079    ) -> Result<()> {
1080        let conn = self.connection()?;
1081
1082        // Begin transaction
1083        conn.execute("BEGIN TRANSACTION", [])?;
1084
1085        let result = self.write_batch_duckdb_inner(&conn, batch, invocation);
1086
1087        match result {
1088            Ok(()) => {
1089                conn.execute("COMMIT", [])?;
1090                Ok(())
1091            }
1092            Err(e) => {
1093                // Rollback on error
1094                let _ = conn.execute("ROLLBACK", []);
1095                Err(e)
1096            }
1097        }
1098    }
1099
    /// Inner implementation for DuckDB batch write (within transaction).
    ///
    /// Inserts, in order: the session (only if not already present), the
    /// invocation, each output stream (inlined or blob-backed depending on
    /// size), and any events. The caller wraps this in BEGIN/COMMIT and
    /// rolls back when this returns Err.
    fn write_batch_duckdb_inner(
        &self,
        conn: &Connection,
        batch: &InvocationBatch,
        invocation: &InvocationRecord,
    ) -> Result<()> {
        use base64::Engine;

        let date = invocation.date();
        let inv_id = invocation.id;

        // Write session (if provided)
        if let Some(ref session) = batch.session {
            // Check if session exists; a failed COUNT is treated as "absent"
            // so the insert below still runs.
            let exists: i64 = conn
                .query_row(
                    "SELECT COUNT(*) FROM local.sessions WHERE session_id = ?",
                    params![&session.session_id],
                    |row| row.get(0),
                )
                .unwrap_or(0);

            if exists == 0 {
                conn.execute(
                    r#"INSERT INTO local.sessions VALUES (?, ?, ?, ?, ?, ?, ?, ?)"#,
                    params![
                        session.session_id,
                        session.client_id,
                        session.invoker,
                        session.invoker_pid,
                        session.invoker_type,
                        session.registered_at.to_rfc3339(),
                        session.cwd,
                        session.date.to_string(),
                    ],
                )?;
            }
        }

        // Write invocation (positional VALUES — order must match the table's
        // column order in the schema).
        conn.execute(
            r#"INSERT INTO local.invocations VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
            params![
                invocation.id.to_string(),
                invocation.session_id,
                invocation.timestamp.to_rfc3339(),
                invocation.duration_ms,
                invocation.cwd,
                invocation.cmd,
                invocation.executable,
                invocation.exit_code,
                invocation.format_hint,
                invocation.client_id,
                invocation.hostname,
                invocation.username,
                invocation.tag,
                date.to_string(),
            ],
        )?;

        // Write outputs: small payloads are embedded as data: URLs, larger
        // ones become content-addressed blob files tracked in blob_registry.
        for (stream, content) in &batch.outputs {
            // Content-address the payload so identical outputs dedupe.
            let hash = blake3::hash(content);
            let hash_hex = hash.to_hex().to_string();

            // Route by size
            let (storage_type, storage_ref) = if content.len() < self.config.inline_threshold {
                // Inline: use data: URL
                let b64 = base64::engine::general_purpose::STANDARD.encode(content);
                let data_url = format!("data:application/octet-stream;base64,{}", b64);
                ("inline".to_string(), data_url)
            } else {
                // Blob: write file and register
                let cmd_hint = invocation.executable.as_deref().unwrap_or("output");
                let blob_path = self.config.blob_path(&hash_hex, cmd_hint);

                if let Some(parent) = blob_path.parent() {
                    fs::create_dir_all(parent)?;
                }

                // Store the path relative to the data dir when possible so the
                // ref stays valid if the data dir moves.
                let rel_path = blob_path
                    .strip_prefix(&self.config.data_dir())
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_else(|_| blob_path.to_string_lossy().to_string());

                // Write blob atomically; `false` means an identical blob
                // already existed on disk.
                let wrote_new = atomic::write_file(&blob_path, content)?;

                if wrote_new {
                    // NOTE(review): assumes blob_registry.ref_count defaults to
                    // 1 on insert — confirm against the schema definition.
                    conn.execute(
                        "INSERT INTO blob_registry (content_hash, byte_length, storage_path) VALUES (?, ?, ?)",
                        params![&hash_hex, content.len() as i64, &rel_path],
                    )?;
                } else {
                    // Existing blob: bump the refcount and touch last_accessed.
                    conn.execute(
                        "UPDATE blob_registry SET ref_count = ref_count + 1, last_accessed = CURRENT_TIMESTAMP WHERE content_hash = ?",
                        params![&hash_hex],
                    )?;
                }

                // NOTE(review): resolve_storage_ref strips only the 5-char
                // `file:` prefix, so this `file://` form resolves with a
                // leading `//` — confirm readers expect that.
                ("blob".to_string(), format!("file://{}", rel_path))
            };

            // Write output record
            let output_id = uuid::Uuid::now_v7();
            conn.execute(
                r#"INSERT INTO local.outputs VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
                params![
                    output_id.to_string(),
                    inv_id.to_string(),
                    stream,
                    hash_hex,
                    content.len() as i64,
                    storage_type,
                    storage_ref,
                    Option::<String>::None, // content_type
                    date.to_string(),
                ],
            )?;
        }

        // Write events (if provided)
        if let Some(ref events) = batch.events {
            for event in events {
                conn.execute(
                    r#"INSERT INTO local.events VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
                    params![
                        event.id.to_string(),
                        event.invocation_id.to_string(),
                        event.client_id,
                        event.hostname,
                        event.event_type,
                        event.severity,
                        event.ref_file,
                        event.ref_line,
                        event.ref_column,
                        event.message,
                        event.error_code,
                        event.test_name,
                        event.status,
                        event.format_used,
                        event.date.to_string(),
                    ],
                )?;
            }
        }

        Ok(())
    }
1251
1252    /// Load format hints from the config file.
1253    pub fn load_format_hints(&self) -> Result<crate::FormatHints> {
1254        let path = self.config.format_hints_path();
1255
1256        // Try new format-hints.toml first
1257        if path.exists() {
1258            return crate::FormatHints::load(&path);
1259        }
1260
1261        // Fall back to legacy event-formats.toml
1262        let legacy_path = self.config.event_formats_path();
1263        if legacy_path.exists() {
1264            return crate::FormatHints::load(&legacy_path);
1265        }
1266
1267        Ok(crate::FormatHints::new())
1268    }
1269
1270    /// Save format hints to the config file.
1271    pub fn save_format_hints(&self, hints: &crate::FormatHints) -> Result<()> {
1272        hints.save(&self.config.format_hints_path())
1273    }
1274
1275    /// Detect format for a command using format hints.
1276    ///
1277    /// Priority:
1278    /// 1. User-defined format hints (by priority)
1279    /// 2. Default format from config (or "auto")
1280    ///
1281    /// Note: duck_hunt detects formats from content analysis, not command names.
1282    /// Use format hints to map commands to formats, then duck_hunt parses the output.
1283    pub fn detect_format_for_command(&self, cmd: &str) -> Result<String> {
1284        let hints = self.load_format_hints()?;
1285        Ok(hints.detect(cmd).to_string())
1286    }
1287
1288    /// Get list of duck_hunt built-in formats.
1289    ///
1290    /// Note: duck_hunt detects formats from content analysis, not command patterns.
1291    /// This lists available format names that can be used with duck_hunt parsing.
1292    pub fn list_builtin_formats(&self) -> Result<Vec<BuiltinFormat>> {
1293        let conn = self.connection()?;
1294
1295        let mut stmt = conn.prepare(
1296            "SELECT format, description, priority FROM duck_hunt_formats() ORDER BY priority DESC, format"
1297        )?;
1298
1299        let rows = stmt.query_map([], |row| {
1300            Ok(BuiltinFormat {
1301                format: row.get(0)?,
1302                pattern: row.get::<_, String>(1)?, // description as "pattern" for display
1303                priority: row.get(2)?,
1304            })
1305        })?;
1306
1307        let results: Vec<_> = rows.filter_map(|r| r.ok()).collect();
1308        Ok(results)
1309    }
1310
1311    /// Check which format would be detected for a command.
1312    /// Returns the format name and source (user-defined or default).
1313    ///
1314    /// Note: duck_hunt detects formats from content, not command names.
1315    /// This only checks user-defined format hints.
1316    pub fn check_format(&self, cmd: &str) -> Result<FormatMatch> {
1317        let hints = self.load_format_hints()?;
1318
1319        // Check user-defined hints
1320        for hint in hints.hints() {
1321            if crate::format_hints::pattern_matches(&hint.pattern, cmd) {
1322                return Ok(FormatMatch {
1323                    format: hint.format.clone(),
1324                    source: FormatSource::UserDefined {
1325                        pattern: hint.pattern.clone(),
1326                        priority: hint.priority,
1327                    },
1328                });
1329            }
1330        }
1331
1332        // No match - use default
1333        Ok(FormatMatch {
1334            format: hints.default_format().to_string(),
1335            source: FormatSource::Default,
1336        })
1337    }
1338}
1339
/// A built-in format from duck_hunt.
#[derive(Debug, Clone)]
pub struct BuiltinFormat {
    /// Format name usable with duck_hunt parsing.
    pub format: String,
    /// Human-readable description (shown in place of a match pattern).
    pub pattern: String,
    /// Detection priority; higher sorts first in listings.
    pub priority: i32,
}
1347
/// Result of format detection.
#[derive(Debug, Clone)]
pub struct FormatMatch {
    /// The detected format name.
    pub format: String,
    /// Where the match came from (user hint, builtin, or default).
    pub source: FormatSource,
}
1354
/// Source of a format match.
#[derive(Debug, Clone)]
pub enum FormatSource {
    /// Matched a user-defined hint pattern.
    UserDefined { pattern: String, priority: i32 },
    /// Matched a built-in pattern.
    Builtin { pattern: String, priority: i32 },
    /// No pattern matched; the configured default format was used.
    Default,
}
1362
/// Result of a SQL query.
#[derive(Debug)]
pub struct QueryResult {
    /// Column names, in SELECT order.
    pub columns: Vec<String>,
    /// Row-major cell values, already rendered to strings.
    pub rows: Vec<Vec<String>>,
}
1369
/// Sanitize a string for use in filenames.
///
/// Spaces become `-`; alphanumerics plus `-`, `_`, `.` pass through; every
/// other character (including path separators and shell metacharacters)
/// becomes `_`. The result is capped at 64 characters.
fn sanitize_filename(s: &str) -> String {
    let mut out = String::new();
    for c in s.chars().take(64) {
        if c == ' ' {
            out.push('-');
        } else if c.is_alphanumeric() || matches!(c, '-' | '_' | '.') {
            out.push(c);
        } else {
            out.push('_');
        }
    }
    out
}
1382
1383#[cfg(test)]
1384mod tests {
1385    use super::*;
1386    use crate::init::initialize;
1387    use crate::schema::SessionRecord;
1388    use tempfile::TempDir;
1389
    // Build a Parquet-mode Store in a fresh temp dir. The TempDir guard must
    // be kept alive by the caller or the backing files are deleted.
    fn setup_store() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let config = Config::with_root(tmp.path());
        initialize(&config).unwrap();
        let store = Store::open(config).unwrap();
        (tmp, store)
    }
1397
    // Same as setup_store, but configured for DuckDB storage mode.
    fn setup_store_duckdb() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let config = Config::with_duckdb_mode(tmp.path());
        initialize(&config).unwrap();
        let store = Store::open(config).unwrap();
        (tmp, store)
    }
1405
    #[test]
    fn test_store_open_uninitialized_fails() {
        // Opening a store whose root was never initialized must surface
        // Error::NotInitialized rather than silently creating state.
        let tmp = TempDir::new().unwrap();
        let config = Config::with_root(tmp.path());

        let result = Store::open(config);
        assert!(matches!(result, Err(Error::NotInitialized(_))));
    }
1414
    #[test]
    fn test_sanitize_filename() {
        // Spaces become dashes; separators and metacharacters become underscores.
        assert_eq!(sanitize_filename("make test"), "make-test");
        assert_eq!(sanitize_filename("/usr/bin/gcc"), "_usr_bin_gcc");
        assert_eq!(sanitize_filename("a:b*c?d"), "a_b_c_d");
    }
1421
1422    // Batch write tests - Parquet mode
1423
    #[test]
    fn test_batch_write_parquet_invocation_only() {
        // A batch containing only an invocation writes exactly one record.
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");

        let batch = InvocationBatch::new(inv);
        store.write_batch(&batch).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
    }
1435
    #[test]
    fn test_batch_write_parquet_with_output() {
        // An attached stdout payload is retrievable by invocation id afterwards.
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let inv_id = inv.id;

        let batch = InvocationBatch::new(inv)
            .with_output("stdout", b"hello world\n".to_vec());

        store.write_batch(&batch).unwrap();

        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].stream, "stdout");
    }
1452
    #[test]
    fn test_batch_write_parquet_with_session() {
        // A session bundled in the batch is registered as a side effect.
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(inv).with_session(session);
        store.write_batch(&batch).unwrap();

        assert!(store.session_exists("test-session").unwrap());
    }
1465
    #[test]
    fn test_batch_write_parquet_full() {
        // Full batch: session + invocation + both output streams land together.
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new("test-session", "make test", "/home/user", 1, "test@client");
        let inv_id = inv.id;
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(inv)
            .with_session(session)
            .with_output("stdout", b"Building...\n".to_vec())
            .with_output("stderr", b"error: failed\n".to_vec());

        store.write_batch(&batch).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
        assert!(store.session_exists("test-session").unwrap());

        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 2);
    }
1487
1488    // Batch write tests - DuckDB mode
1489
    #[test]
    fn test_batch_write_duckdb_invocation_only() {
        // DuckDB-mode counterpart: invocation-only batch writes one record.
        let (_tmp, store) = setup_store_duckdb();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");

        let batch = InvocationBatch::new(inv);
        store.write_batch(&batch).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
    }
1501
    #[test]
    fn test_batch_write_duckdb_with_output() {
        // DuckDB-mode counterpart: an attached stdout stream round-trips.
        let (_tmp, store) = setup_store_duckdb();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let inv_id = inv.id;

        let batch = InvocationBatch::new(inv)
            .with_output("stdout", b"hello world\n".to_vec());

        store.write_batch(&batch).unwrap();

        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].stream, "stdout");
    }
1518
    #[test]
    fn test_batch_write_duckdb_with_session() {
        // DuckDB-mode counterpart: bundled session gets registered.
        let (_tmp, store) = setup_store_duckdb();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(inv).with_session(session);
        store.write_batch(&batch).unwrap();

        assert!(store.session_exists("test-session").unwrap());
    }
1531
    #[test]
    fn test_batch_write_duckdb_full() {
        // DuckDB-mode counterpart: session + invocation + two streams commit
        // together inside one transaction.
        let (_tmp, store) = setup_store_duckdb();

        let inv = InvocationRecord::new("test-session", "make test", "/home/user", 1, "test@client");
        let inv_id = inv.id;
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(inv)
            .with_session(session)
            .with_output("stdout", b"Building...\n".to_vec())
            .with_output("stderr", b"error: failed\n".to_vec());

        store.write_batch(&batch).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
        assert!(store.session_exists("test-session").unwrap());

        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 2);
    }
1553
1554    #[test]
1555    fn test_batch_requires_invocation() {
1556        let (_tmp, store) = setup_store();
1557
1558        let batch = InvocationBatch::default();
1559        let result = store.write_batch(&batch);
1560
1561        assert!(result.is_err());
1562    }
1563
1564    // Extension loading tests
1565
1566    #[test]
1567    fn test_ensure_extension_parquet() {
1568        // Parquet is an official extension, should always be available
1569        let conn = duckdb::Connection::open_in_memory().unwrap();
1570        let result = ensure_extension(&conn, "parquet").unwrap();
1571        assert!(result, "parquet extension should be loadable");
1572    }
1573
1574    #[test]
1575    fn test_ensure_extension_icu() {
1576        // ICU is an official extension, should always be available
1577        let conn = duckdb::Connection::open_in_memory().unwrap();
1578        let result = ensure_extension(&conn, "icu").unwrap();
1579        assert!(result, "icu extension should be loadable");
1580    }
1581
1582    #[test]
1583    fn test_ensure_extension_community() {
1584        // Community extensions require allow_community_extensions
1585        let conn = duckdb::Connection::open_in_memory().unwrap();
1586        conn.execute("SET allow_community_extensions = true", []).unwrap();
1587
1588        // scalarfs and duck_hunt are community extensions
1589        let result = ensure_extension(&conn, "scalarfs").unwrap();
1590        assert!(result, "scalarfs extension should be loadable from community");
1591
1592        let result = ensure_extension(&conn, "duck_hunt").unwrap();
1593        assert!(result, "duck_hunt extension should be loadable from community");
1594    }
1595
1596    #[test]
1597    fn test_ensure_extension_nonexistent() {
1598        let conn = duckdb::Connection::open_in_memory().unwrap();
1599        conn.execute("SET allow_community_extensions = true", []).unwrap();
1600
1601        // A made-up extension should return false (not error)
1602        let result = ensure_extension(&conn, "nonexistent_fake_extension_xyz").unwrap();
1603        assert!(!result, "nonexistent extension should return false");
1604    }
1605
1606    #[test]
1607    fn test_extension_loading_is_cached() {
1608        // Once installed, extensions should load quickly from cache
1609        let conn = duckdb::Connection::open_in_memory().unwrap();
1610
1611        // First load might install
1612        ensure_extension(&conn, "parquet").unwrap();
1613
1614        // Second load should be fast (from cache)
1615        let start = std::time::Instant::now();
1616        ensure_extension(&conn, "parquet").unwrap();
1617        let elapsed = start.elapsed();
1618
1619        // Should be very fast if cached (< 100ms)
1620        assert!(elapsed.as_millis() < 100, "cached extension load took {:?}", elapsed);
1621    }
1622}