// magic_bird/store/mod.rs
1//! Store - handles writing and reading records.
2//!
3//! Uses DuckDB to write Parquet files and query across them.
4
5mod atomic;
6mod compact;
7mod events;
8mod invocations;
9mod outputs;
10mod pending;
11mod remote;
12mod sessions;
13
14use std::fs;
15use std::thread;
16use std::time::Duration;
17
18use chrono::{DateTime, NaiveDate, NaiveTime, TimeDelta, Utc};
19use duckdb::{
20    params,
21    types::{TimeUnit, Value, ValueRef},
22    Connection,
23};
24
25use crate::config::StorageMode;
26use crate::schema::{EventRecord, InvocationRecord, SessionRecord};
27use crate::{Config, Error, Result};
28
29/// Format a DuckDB Value to a human-readable string.
30/// Handles complex types like List, Array, Map, and Struct recursively.
31fn format_value(value: &Value) -> String {
32    match value {
33        Value::Null => "NULL".to_string(),
34        Value::Boolean(b) => b.to_string(),
35        Value::TinyInt(n) => n.to_string(),
36        Value::SmallInt(n) => n.to_string(),
37        Value::Int(n) => n.to_string(),
38        Value::BigInt(n) => n.to_string(),
39        Value::HugeInt(n) => n.to_string(),
40        Value::UTinyInt(n) => n.to_string(),
41        Value::USmallInt(n) => n.to_string(),
42        Value::UInt(n) => n.to_string(),
43        Value::UBigInt(n) => n.to_string(),
44        Value::Float(f) => f.to_string(),
45        Value::Double(f) => f.to_string(),
46        Value::Decimal(d) => d.to_string(),
47        Value::Timestamp(_, micros) => {
48            DateTime::<Utc>::from_timestamp_micros(*micros)
49                .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
50                .unwrap_or_else(|| format!("<timestamp {}>", micros))
51        }
52        Value::Text(s) => s.clone(),
53        Value::Blob(b) => format!("<blob {} bytes>", b.len()),
54        Value::Date32(days) => {
55            NaiveDate::from_ymd_opt(1970, 1, 1)
56                .and_then(|epoch| epoch.checked_add_signed(TimeDelta::days(*days as i64)))
57                .map(|d| d.format("%Y-%m-%d").to_string())
58                .unwrap_or_else(|| format!("<date {}>", days))
59        }
60        Value::Time64(_, micros) => {
61            let secs = (*micros / 1_000_000) as u32;
62            let micro_part = (*micros % 1_000_000) as u32;
63            NaiveTime::from_num_seconds_from_midnight_opt(secs, micro_part * 1000)
64                .map(|t| t.format("%H:%M:%S").to_string())
65                .unwrap_or_else(|| format!("<time {}>", micros))
66        }
67        Value::Interval { months, days, nanos } => {
68            format!("{} months {} days {} ns", months, days, nanos)
69        }
70        // Complex types
71        Value::List(items) => {
72            let formatted: Vec<String> = items.iter().map(format_value).collect();
73            format!("[{}]", formatted.join(", "))
74        }
75        Value::Array(items) => {
76            let formatted: Vec<String> = items.iter().map(format_value).collect();
77            format!("[{}]", formatted.join(", "))
78        }
79        Value::Map(map) => {
80            let formatted: Vec<String> = map
81                .iter()
82                .map(|(k, v)| format!("{}: {}", format_value(k), format_value(v)))
83                .collect();
84            format!("{{{}}}", formatted.join(", "))
85        }
86        Value::Struct(fields) => {
87            let formatted: Vec<String> = fields
88                .iter()
89                .map(|(k, v)| format!("{}: {}", k, format_value(v)))
90                .collect();
91            format!("{{{}}}", formatted.join(", "))
92        }
93        Value::Enum(s) => s.clone(),
94        _ => "<unknown>".to_string(),
95    }
96}
97
98// Re-export types from submodules
99pub use compact::{
100    ArchiveStats, AutoCompactOptions, CleanOptions, CleanStats, CompactOptions, CompactStats,
101    PruneStats,
102};
103pub use events::{EventFilters, EventSummary, FormatConfig, FormatRule};
104pub use invocations::InvocationSummary;
105pub use outputs::OutputInfo;
106pub use pending::{
107    delete_pending_file, is_runner_alive, list_pending_files, write_pending_file,
108    PendingInvocation, RecoveryStats,
109};
110pub use remote::{parse_since, PullOptions, PullStats, PushOptions, PushStats};
111
112// Re-export format detection types (defined below)
113// BuiltinFormat, FormatMatch, FormatSource are defined at the bottom of this file
114
/// A batch of related records to write atomically.
///
/// Use this when you want to write an invocation along with its outputs,
/// session, and/or events in a single transaction.
///
/// Construct with [`InvocationBatch::new`] and chain the `with_*` builder
/// methods; `Default` yields an entirely empty batch (no invocation).
#[derive(Debug, Default)]
pub struct InvocationBatch {
    /// The invocation record (required for a meaningful write, but held as
    /// `Option` so `Default` can produce an empty batch).
    pub invocation: Option<InvocationRecord>,

    /// Output streams with their content: (stream_name, content).
    /// Common streams: "stdout", "stderr", "combined".
    pub outputs: Vec<(String, Vec<u8>)>,

    /// Session record (optional, created if not already registered).
    pub session: Option<SessionRecord>,

    /// Pre-extracted events (optional).
    pub events: Option<Vec<EventRecord>>,
}
134
135impl InvocationBatch {
136    /// Create a new batch with an invocation.
137    pub fn new(invocation: InvocationRecord) -> Self {
138        Self {
139            invocation: Some(invocation),
140            outputs: Vec::new(),
141            session: None,
142            events: None,
143        }
144    }
145
146    /// Add an output stream.
147    pub fn with_output(mut self, stream: impl Into<String>, content: Vec<u8>) -> Self {
148        self.outputs.push((stream.into(), content));
149        self
150    }
151
152    /// Add a session record.
153    pub fn with_session(mut self, session: SessionRecord) -> Self {
154        self.session = Some(session);
155        self
156    }
157
158    /// Add pre-extracted events.
159    pub fn with_events(mut self, events: Vec<EventRecord>) -> Self {
160        self.events = Some(events);
161        self
162    }
163}
164
/// Options for creating a database connection.
///
/// Controls what gets loaded and attached when opening a connection.
/// Use this for explicit control over connection behavior.
///
/// # Connection Stages
///
/// 1. **Extensions** (always): parquet, icu, scalarfs, duck_hunt
/// 2. **Blob resolution** (always): S3 credentials, blob_roots macro
/// 3. **Migration** (optional): Upgrade existing installations to new schema
/// 4. **Remotes** (optional): Attach remote databases, rebuild remotes.* views
/// 5. **Project** (optional): Attach project-local database if in a project
/// 6. **CWD views** (optional): Rebuild cwd.* views for current directory
#[derive(Debug, Clone)]
pub struct ConnectionOptions {
    /// Attach configured remotes (default: true).
    /// When true, remote databases are attached and remotes.* views are rebuilt
    /// to include the attached data.
    pub attach_remotes: bool,

    /// Attach project database if in a project directory (default: true).
    pub attach_project: bool,

    /// Rebuild cwd.* views for current working directory (default: true).
    /// These views filter main.* data to entries matching the current directory.
    pub create_ephemeral_views: bool,

    /// Run migration for existing installations (default: false).
    /// Only enable this for explicit upgrade operations.
    pub run_migration: bool,
}

/// Defaults matching the field documentation: a full connection with
/// migration disabled.
///
/// A derived `Default` would set every flag to `false`, contradicting the
/// "(default: true)" contract documented on the fields above, so the impl
/// is written out by hand.
impl Default for ConnectionOptions {
    fn default() -> Self {
        Self {
            attach_remotes: true,
            attach_project: true,
            create_ephemeral_views: true,
            run_migration: false,
        }
    }
}
196
197impl ConnectionOptions {
198    /// Create options for a full connection (default behavior).
199    pub fn full() -> Self {
200        Self {
201            attach_remotes: true,
202            attach_project: true,
203            create_ephemeral_views: true,
204            run_migration: false,
205        }
206    }
207
208    /// Create options for a minimal connection (no attachments).
209    /// Useful for write operations that don't need remote data.
210    pub fn minimal() -> Self {
211        Self {
212            attach_remotes: false,
213            attach_project: false,
214            create_ephemeral_views: false,
215            run_migration: false,
216        }
217    }
218
219    /// Create options for a migration/upgrade connection.
220    pub fn for_migration() -> Self {
221        Self {
222            attach_remotes: false,
223            attach_project: false,
224            create_ephemeral_views: false,
225            run_migration: true,
226        }
227    }
228}
229
230/// Ensure a DuckDB extension is loaded, installing if necessary.
231///
232/// Attempts in order:
233/// 1. LOAD (extension might already be available)
234/// 2. INSTALL from default repository, then LOAD
235/// 3. INSTALL FROM community, then LOAD
236///
237/// Returns Ok(true) if loaded successfully, Ok(false) if extension unavailable.
238fn ensure_extension(conn: &Connection, name: &str) -> Result<bool> {
239    // Try loading directly first (already installed/cached)
240    if conn.execute(&format!("LOAD {}", name), []).is_ok() {
241        return Ok(true);
242    }
243
244    // Try installing from default repository
245    if conn.execute(&format!("INSTALL {}", name), []).is_ok()
246        && conn.execute(&format!("LOAD {}", name), []).is_ok()
247    {
248        return Ok(true);
249    }
250
251    // Try installing from community repository
252    if conn.execute(&format!("INSTALL {} FROM community", name), []).is_ok()
253        && conn.execute(&format!("LOAD {}", name), []).is_ok()
254    {
255        return Ok(true);
256    }
257
258    Ok(false)
259}
260
/// A BIRD store for reading and writing records.
///
/// Thin handle around a [`Config`]; connections are opened on demand via
/// [`Store::connect`] / [`Store::connection`] rather than held open.
pub struct Store {
    // Configuration (paths, remotes, storage mode) used for every connection.
    config: Config,
}
265
266impl Store {
267    /// Open an existing BIRD store.
268    pub fn open(config: Config) -> Result<Self> {
269        if !config.db_path().exists() {
270            return Err(Error::NotInitialized(config.bird_root.clone()));
271        }
272        Ok(Self { config })
273    }
274
    /// Open a DuckDB connection with retry and exponential backoff.
    ///
    /// DuckDB uses file locking for concurrent access. When multiple processes
    /// (e.g., background shell hook saves) try to access the database simultaneously,
    /// this method retries with exponential backoff to avoid lock conflicts.
    ///
    /// Policy: up to `MAX_RETRIES` attempts, delay starting at
    /// `INITIAL_DELAY_MS` and doubling per attempt up to `MAX_DELAY_MS`,
    /// plus a small deterministic jitter. Only lock-conflict errors are
    /// retried; any other error returns immediately.
    fn open_connection_with_retry(&self) -> Result<Connection> {
        const MAX_RETRIES: u32 = 10;
        const INITIAL_DELAY_MS: u64 = 10;
        const MAX_DELAY_MS: u64 = 1000;

        let db_path = self.config.db_path();
        let mut delay_ms = INITIAL_DELAY_MS;
        let mut last_error = None;

        for attempt in 0..MAX_RETRIES {
            match Connection::open(&db_path) {
                Ok(conn) => return Ok(conn),
                Err(e) => {
                    let err_msg = e.to_string();
                    // Check if this is a lock conflict error.
                    // NOTE(review): detection by message substring is brittle —
                    // confirm these strings against the DuckDB version in use.
                    if err_msg.contains("Could not set lock")
                        || err_msg.contains("Conflicting lock")
                        || err_msg.contains("database is locked")
                    {
                        last_error = Some(e);
                        if attempt < MAX_RETRIES - 1 {
                            // Add jitter to avoid thundering herd
                            let jitter = (attempt as u64 * 7) % 10;
                            thread::sleep(Duration::from_millis(delay_ms + jitter));
                            delay_ms = (delay_ms * 2).min(MAX_DELAY_MS);
                            continue;
                        }
                        // Last attempt: fall through and report below.
                    } else {
                        // Non-lock error, fail immediately
                        return Err(e.into());
                    }
                }
            }
        }

        // All retries exhausted — surface the last lock error if we saw one.
        Err(last_error
            .map(|e| e.into())
            .unwrap_or_else(|| Error::Storage("Failed to open database after retries".to_string())))
    }
320
    /// Get a DuckDB connection with full features (attachments, ephemeral views).
    ///
    /// Shorthand for `self.connect(ConnectionOptions::full())`.
    pub fn connection(&self) -> Result<Connection> {
        self.connect(ConnectionOptions::full())
    }
325
326    /// Get a DuckDB connection with optional remote attachment (legacy API).
327    pub fn connection_with_options(&self, attach_remotes: bool) -> Result<Connection> {
328        let opts = if attach_remotes {
329            ConnectionOptions::full()
330        } else {
331            ConnectionOptions::minimal()
332        };
333        self.connect(opts)
334    }
335
    /// Get a DuckDB connection with explicit options.
    ///
    /// This is the main connection method. Use `ConnectionOptions` to control:
    /// - Whether remotes are attached
    /// - Whether project database is attached
    /// - Whether ephemeral views are created
    /// - Whether migration should run
    ///
    /// Uses retry with exponential backoff to handle concurrent access.
    ///
    /// # Errors
    ///
    /// Fails if the database cannot be opened after retries, if a required
    /// extension (parquet, icu) cannot be loaded, or if setup SQL fails.
    /// Missing *optional* extensions only print a warning.
    pub fn connect(&self, opts: ConnectionOptions) -> Result<Connection> {
        let conn = self.open_connection_with_retry()?;

        // ===== Load required extensions =====
        // Uses default extension directory (typically ~/.duckdb/extensions)
        // Falls back to community repository if not in default
        conn.execute("SET allow_community_extensions = true", [])?;

        for ext in ["parquet", "icu"] {
            if !ensure_extension(&conn, ext)? {
                return Err(Error::Extension(format!(
                    "Required extension '{}' could not be loaded",
                    ext
                )));
            }
        }

        // Optional community extensions - warn if missing
        for (ext, desc) in [
            ("scalarfs", "data: URL support for inline blobs"),
            ("duck_hunt", "log/output parsing for event extraction"),
        ] {
            if !ensure_extension(&conn, ext)? {
                eprintln!("Warning: {} extension not available ({})", ext, desc);
            }
        }

        // Set file search path so views resolve relative paths correctly
        conn.execute(
            &format!(
                "SET file_search_path = '{}'",
                self.config.data_dir().display()
            ),
            [],
        )?;

        // ===== Optional: Run migration for existing installations =====
        // Must run before blob/remote setup so later stages see the new schema.
        if opts.run_migration {
            self.migrate_to_new_schema(&conn)?;
        }

        // ===== Always set up blob resolution =====
        // S3 credentials needed before blob_roots is used
        self.setup_s3_credentials(&conn)?;
        self.setup_blob_resolution(&conn)?;

        // ===== Optional: Attach remotes and create access macros =====
        if opts.attach_remotes && !self.config.remotes.is_empty() {
            self.attach_remotes(&conn)?;
            self.create_remote_macros(&conn)?;
        }

        // ===== Optional: Attach project database =====
        if opts.attach_project {
            self.attach_project_db(&conn)?;
        }

        // ===== Optional: Create cwd macros =====
        // These TEMPORARY macros filter by current working directory
        if opts.create_ephemeral_views {
            self.create_cwd_macros(&conn)?;
        }

        Ok(conn)
    }
410
411    /// Attach project-level `.bird/` database if we're in a project directory.
412    ///
413    /// The project database is attached as read-only under schema "project".
414    /// This allows queries like `SELECT * FROM project.invocations`.
415    fn attach_project_db(&self, conn: &Connection) -> Result<()> {
416        use crate::project::find_current_project;
417
418        let Some(project) = find_current_project() else {
419            return Ok(()); // Not in a project
420        };
421
422        if !project.is_initialized() {
423            return Ok(()); // Project not initialized
424        }
425
426        // Don't attach if project DB is the same as user DB
427        if project.db_path == self.config.db_path() {
428            return Ok(());
429        }
430
431        // Attach as read-only
432        let attach_sql = format!(
433            "ATTACH '{}' AS project (READ_ONLY)",
434            project.db_path.display()
435        );
436
437        if let Err(e) = conn.execute(&attach_sql, []) {
438            // Log but don't fail - project DB might be locked or inaccessible
439            eprintln!("Note: Could not attach project database: {}", e);
440        }
441
442        Ok(())
443    }
444
    /// Migrate existing installations to the new schema architecture.
    ///
    /// Checks if the `local` schema exists; if not, creates the new schema
    /// structure and migrates data from old `*_table` tables.
    ///
    /// Idempotent: returns early when the `local` schema already exists.
    /// For DuckDB storage mode, `local.*` are real tables (with data copied
    /// from legacy `*_table` tables when present); for Parquet mode they are
    /// views over the `recent/` parquet directories.
    fn migrate_to_new_schema(&self, conn: &Connection) -> Result<()> {
        // Check if already migrated (local schema exists)
        let local_exists: bool = conn
            .query_row(
                "SELECT COUNT(*) > 0 FROM information_schema.schemata WHERE schema_name = 'local'",
                [],
                |row| row.get(0),
            )
            .unwrap_or(false);

        if local_exists {
            return Ok(());
        }

        // This is an old installation - need to migrate
        // For now, just create the new schemas. Data migration would require
        // moving data from old tables/views to new structure.
        // TODO: Implement full data migration if needed

        eprintln!("Note: Migrating to new schema architecture...");

        // Create core schemas
        conn.execute_batch(
            r#"
            CREATE SCHEMA IF NOT EXISTS local;
            CREATE SCHEMA IF NOT EXISTS cached_placeholder;
            CREATE SCHEMA IF NOT EXISTS remote_placeholder;
            CREATE SCHEMA IF NOT EXISTS caches;
            CREATE SCHEMA IF NOT EXISTS remotes;
            CREATE SCHEMA IF NOT EXISTS unified;
            CREATE SCHEMA IF NOT EXISTS cwd;
            "#,
        )?;

        // For DuckDB mode, create local tables
        if self.config.storage_mode == crate::StorageMode::DuckDB {
            conn.execute_batch(
                r#"
                CREATE TABLE IF NOT EXISTS local.sessions (
                    session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
                    invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE
                );
                CREATE TABLE IF NOT EXISTS local.invocations (
                    id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
                    cwd VARCHAR, cmd VARCHAR, executable VARCHAR, exit_code INTEGER,
                    format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR,
                    tag VARCHAR, date DATE
                );
                CREATE TABLE IF NOT EXISTS local.outputs (
                    id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
                    byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
                    content_type VARCHAR, date DATE
                );
                CREATE TABLE IF NOT EXISTS local.events (
                    id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
                    event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
                    ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
                    status VARCHAR, format_used VARCHAR, date DATE
                );
                "#,
            )?;

            // Copy data from old tables if they exist
            // (presence of sessions_table is used as a proxy for all four
            // legacy tables existing together)
            let old_tables_exist: bool = conn
                .query_row(
                    "SELECT COUNT(*) > 0 FROM duckdb_tables() WHERE table_name = 'sessions_table'",
                    [],
                    |row| row.get(0),
                )
                .unwrap_or(false);

            if old_tables_exist {
                conn.execute_batch(
                    r#"
                    INSERT INTO local.sessions SELECT * FROM sessions_table;
                    INSERT INTO local.invocations SELECT * FROM invocations_table;
                    INSERT INTO local.outputs SELECT * FROM outputs_table;
                    INSERT INTO local.events SELECT * FROM events_table;
                    "#,
                )?;
            }
        } else {
            // Parquet mode - create views over parquet files
            // (paths resolve relative to file_search_path set in connect())
            conn.execute_batch(
                r#"
                CREATE OR REPLACE VIEW local.sessions AS
                SELECT * EXCLUDE (filename) FROM read_parquet(
                    'recent/sessions/**/*.parquet',
                    union_by_name = true, hive_partitioning = true, filename = true
                );
                CREATE OR REPLACE VIEW local.invocations AS
                SELECT * EXCLUDE (filename) FROM read_parquet(
                    'recent/invocations/**/*.parquet',
                    union_by_name = true, hive_partitioning = true, filename = true
                );
                CREATE OR REPLACE VIEW local.outputs AS
                SELECT * EXCLUDE (filename) FROM read_parquet(
                    'recent/outputs/**/*.parquet',
                    union_by_name = true, hive_partitioning = true, filename = true
                );
                CREATE OR REPLACE VIEW local.events AS
                SELECT * EXCLUDE (filename) FROM read_parquet(
                    'recent/events/**/*.parquet',
                    union_by_name = true, hive_partitioning = true, filename = true
                );
                "#,
            )?;
        }

        // Create placeholder schemas
        // (empty tables that give caches.*/remotes.* views a stable shape
        // until real attachments rebuild them)
        conn.execute_batch(
            r#"
            CREATE TABLE IF NOT EXISTS cached_placeholder.sessions (
                session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
                invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS cached_placeholder.invocations (
                id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
                cwd VARCHAR, cmd VARCHAR, executable VARCHAR, runner_id VARCHAR, exit_code INTEGER,
                status VARCHAR, format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR, tag VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS cached_placeholder.outputs (
                id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
                byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
                content_type VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS cached_placeholder.events (
                id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
                event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
                ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
                status VARCHAR, format_used VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.sessions (
                session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
                invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.invocations (
                id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
                cwd VARCHAR, cmd VARCHAR, executable VARCHAR, runner_id VARCHAR, exit_code INTEGER,
                status VARCHAR, format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR, tag VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.outputs (
                id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
                byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
                content_type VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.events (
                id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
                event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
                ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
                status VARCHAR, format_used VARCHAR, date DATE, _source VARCHAR
            );
            "#,
        )?;

        // Create union schemas
        // main.* = local + caches; unified.* = main + remotes; qualified_*
        // dedupe rows and aggregate their origin into a _sources list.
        conn.execute_batch(
            r#"
            CREATE OR REPLACE VIEW caches.sessions AS SELECT * FROM cached_placeholder.sessions;
            CREATE OR REPLACE VIEW caches.invocations AS SELECT * FROM cached_placeholder.invocations;
            CREATE OR REPLACE VIEW caches.outputs AS SELECT * FROM cached_placeholder.outputs;
            CREATE OR REPLACE VIEW caches.events AS SELECT * FROM cached_placeholder.events;

            CREATE OR REPLACE VIEW remotes.sessions AS SELECT * FROM remote_placeholder.sessions;
            CREATE OR REPLACE VIEW remotes.invocations AS SELECT * FROM remote_placeholder.invocations;
            CREATE OR REPLACE VIEW remotes.outputs AS SELECT * FROM remote_placeholder.outputs;
            CREATE OR REPLACE VIEW remotes.events AS SELECT * FROM remote_placeholder.events;

            CREATE OR REPLACE VIEW main.sessions AS
                SELECT *, 'local' as _source FROM local.sessions
                UNION ALL BY NAME SELECT * FROM caches.sessions;
            CREATE OR REPLACE VIEW main.invocations AS
                SELECT *, 'local' as _source FROM local.invocations
                UNION ALL BY NAME SELECT * FROM caches.invocations;
            CREATE OR REPLACE VIEW main.outputs AS
                SELECT *, 'local' as _source FROM local.outputs
                UNION ALL BY NAME SELECT * FROM caches.outputs;
            CREATE OR REPLACE VIEW main.events AS
                SELECT *, 'local' as _source FROM local.events
                UNION ALL BY NAME SELECT * FROM caches.events;

            CREATE OR REPLACE VIEW unified.sessions AS
                SELECT * FROM main.sessions UNION ALL BY NAME SELECT * FROM remotes.sessions;
            CREATE OR REPLACE VIEW unified.invocations AS
                SELECT * FROM main.invocations UNION ALL BY NAME SELECT * FROM remotes.invocations;
            CREATE OR REPLACE VIEW unified.outputs AS
                SELECT * FROM main.outputs UNION ALL BY NAME SELECT * FROM remotes.outputs;
            CREATE OR REPLACE VIEW unified.events AS
                SELECT * FROM main.events UNION ALL BY NAME SELECT * FROM remotes.events;

            -- Qualified views: deduplicated with source list
            CREATE OR REPLACE VIEW unified.qualified_sessions AS
                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
                FROM unified.sessions GROUP BY ALL;
            CREATE OR REPLACE VIEW unified.qualified_invocations AS
                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
                FROM unified.invocations GROUP BY ALL;
            CREATE OR REPLACE VIEW unified.qualified_outputs AS
                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
                FROM unified.outputs GROUP BY ALL;
            CREATE OR REPLACE VIEW unified.qualified_events AS
                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
                FROM unified.events GROUP BY ALL;
            "#,
        )?;

        Ok(())
    }
657
658    /// Set up S3 credentials for all remotes that use S3.
659    /// This is called early so that blob resolution can access S3 paths.
660    fn setup_s3_credentials(&self, conn: &Connection) -> Result<()> {
661        // Check if any remote uses S3
662        let has_s3 = self.config.remotes.iter().any(|r| {
663            r.remote_type == crate::config::RemoteType::S3
664        });
665
666        if !has_s3 {
667            return Ok(());
668        }
669
670        // Set up credentials for each S3 remote
671        for remote in &self.config.remotes {
672            if remote.remote_type == crate::config::RemoteType::S3 {
673                if let Some(provider) = &remote.credential_provider {
674                    let secret_sql = format!(
675                        "CREATE SECRET IF NOT EXISTS \"bird_{}\" (TYPE s3, PROVIDER {})",
676                        remote.name, provider
677                    );
678                    if let Err(e) = conn.execute(&secret_sql, []) {
679                        eprintln!("Warning: Failed to create S3 secret for {}: {}", remote.name, e);
680                    }
681                }
682            }
683        }
684
685        Ok(())
686    }
687
    /// Set up blob_roots variable and blob resolution macros.
    ///
    /// Storage refs use URI schemes to indicate type:
    /// - `data:`, `data+varchar:`, `data+blob:` - inline content (scalarfs)
    /// - `file:path` - relative path, resolved against blob_roots
    /// - Absolute paths (`s3://`, `/path/`) - used directly
    ///
    /// Defines SQL macros `is_inline_data`, `is_file_ref`, and
    /// `resolve_storage_ref` on the connection; they persist for the
    /// connection's lifetime.
    fn setup_blob_resolution(&self, conn: &Connection) -> Result<()> {
        let blob_roots = self.config.blob_roots();

        // Format as SQL array literal (single quotes doubled to escape them)
        let roots_sql: String = blob_roots
            .iter()
            .map(|r| format!("'{}'", r.replace('\'', "''")))
            .collect::<Vec<_>>()
            .join(", ");

        // Set blob_roots variable
        conn.execute(&format!("SET VARIABLE blob_roots = [{}]", roots_sql), [])?;

        // Helper: check if ref is inline data (scalarfs data: protocol)
        // ref[:5] is a 5-char prefix slice in DuckDB (1-based, inclusive)
        conn.execute(
            r#"CREATE OR REPLACE MACRO is_inline_data(ref) AS (
                ref[:5] = 'data:' OR ref[:5] = 'data+'
            )"#,
            [],
        )?;

        // Helper: check if ref is a relative file: path
        conn.execute(
            r#"CREATE OR REPLACE MACRO is_file_ref(ref) AS (
                ref[:5] = 'file:'
            )"#,
            [],
        )?;

        // Resolve storage ref to list of paths for pathvariable:
        // - Inline data: pass through as single-element list
        // - file: refs: expand to glob patterns across all blob_roots
        //   (ref[6:] strips the 'file:' prefix; '*' allows extension suffixes)
        // - Other (absolute paths): pass through
        conn.execute(
            r#"CREATE OR REPLACE MACRO resolve_storage_ref(ref) AS (
                CASE
                    WHEN is_inline_data(ref) THEN [ref]
                    WHEN is_file_ref(ref) THEN
                        [format('{}/{}*', root, ref[6:]) FOR root IN getvariable('blob_roots')]
                    ELSE [ref]
                END
            )"#,
            [],
        )?;

        Ok(())
    }
741
742    /// Attach configured remotes to the connection.
743    /// Note: S3 credentials are already set up by setup_s3_credentials().
744    fn attach_remotes(&self, conn: &Connection) -> Result<()> {
745        let remotes = self.config.auto_attach_remotes();
746
747        // Collect all file remote data directories to add to file_search_path
748        let remote_data_dirs: Vec<String> = remotes
749            .iter()
750            .filter_map(|r| r.data_dir())
751            .map(|p| p.display().to_string())
752            .collect();
753
754        // Add remote data directories to file_search_path (for parquet-mode remotes)
755        if !remote_data_dirs.is_empty() {
756            let current_path: String = conn
757                .query_row("SELECT current_setting('file_search_path')", [], |r| r.get(0))
758                .unwrap_or_default();
759
760            let mut paths: Vec<&str> = if current_path.is_empty() {
761                vec![]
762            } else {
763                current_path.split(',').collect()
764            };
765
766            for dir in &remote_data_dirs {
767                if !paths.contains(&dir.as_str()) {
768                    paths.push(dir);
769                }
770            }
771
772            let new_path = paths.join(",");
773            if let Err(e) = conn.execute(&format!("SET file_search_path = '{}'", new_path), []) {
774                eprintln!("Warning: Failed to set file_search_path: {}", e);
775            }
776        }
777
778        // Attach each remote
779        for remote in &remotes {
780            let attach_sql = remote.attach_sql();
781            if let Err(e) = conn.execute(&attach_sql, []) {
782                eprintln!("Warning: Failed to attach remote {}: {}", remote.name, e);
783            }
784        }
785
786        Ok(())
787    }
788
789    /// Detect the table path for an attached remote.
790    ///
791    /// BIRD databases can have different structures:
792    /// - DuckDB mode: tables are in `local` schema (e.g., `remote_name.local.invocations`)
793    /// - Parquet mode: tables are at top level (e.g., `remote_name.invocations`)
794    /// - Standalone: tables may be at top level
795    ///
796    /// Returns the schema prefix to use (e.g., "local." or "").
797    pub(crate) fn detect_remote_table_path(&self, conn: &Connection, remote_schema: &str) -> String {
798        // Check if this remote has a `local` schema with tables (BIRD DuckDB mode)
799        let check_sql = format!(
800            "SELECT 1 FROM information_schema.tables \
801             WHERE table_catalog = '{}' AND table_schema = 'local' AND table_name = 'invocations' \
802             LIMIT 1",
803            remote_schema.trim_matches('"')
804        );
805        if conn.execute(&check_sql, []).is_ok() {
806            if let Ok(mut stmt) = conn.prepare(&check_sql) {
807                if stmt.query([]).is_ok_and(|mut rows| rows.next().is_ok_and(|r| r.is_some())) {
808                    return "local.".to_string();
809                }
810            }
811        }
812
813        // Default: tables at top level
814        String::new()
815    }
816
817    /// Create TEMPORARY macros for accessing remote data.
818    ///
819    /// We use TEMPORARY macros to avoid persisting references to attached databases
820    /// in the catalog. Persisted references cause database corruption when the
821    /// attachment is not present.
822    ///
823    /// Handles both BIRD databases (with `local` schema) and standalone databases.
824    ///
825    /// Usage: `SELECT * FROM remotes_invocations()` or `SELECT * FROM remote_<name>_invocations()`
826    fn create_remote_macros(&self, conn: &Connection) -> Result<()> {
827        let remotes = self.config.auto_attach_remotes();
828        if remotes.is_empty() {
829            return Ok(());
830        }
831
832        // Create per-remote TEMPORARY macros for each table type
833        for remote in &remotes {
834            let schema = remote.quoted_schema_name();
835            let name = &remote.name;
836            // Sanitize name for use in macro identifier
837            let safe_name = name.replace(['-', '.'], "_");
838
839            // Detect if this is a BIRD database with tables in `local` schema
840            let table_prefix = self.detect_remote_table_path(conn, &schema);
841
842            for table in &["sessions", "invocations", "outputs", "events"] {
843                let macro_name = format!("\"remote_{safe_name}_{table}\"");
844                let sql = format!(
845                    r#"CREATE OR REPLACE TEMPORARY MACRO {macro_name}() AS TABLE (
846                        SELECT *, '{name}' as _source FROM {schema}.{prefix}{table}
847                    )"#,
848                    macro_name = macro_name,
849                    name = name,
850                    schema = schema,
851                    prefix = table_prefix,
852                    table = table
853                );
854                if let Err(e) = conn.execute(&sql, []) {
855                    eprintln!("Warning: Failed to create macro {}: {}", macro_name, e);
856                }
857            }
858        }
859
860        // Create combined remotes_* TEMPORARY macros that union all remotes
861        for table in &["sessions", "invocations", "outputs", "events"] {
862            let mut union_parts: Vec<String> = remotes
863                .iter()
864                .map(|r| {
865                    let safe_name = r.name.replace(['-', '.'], "_");
866                    format!("SELECT * FROM \"remote_{safe_name}_{table}\"()", safe_name = safe_name, table = table)
867                })
868                .collect();
869
870            // Include placeholder for empty case
871            union_parts.push(format!("SELECT * FROM remote_placeholder.{}", table));
872
873            let sql = format!(
874                r#"CREATE OR REPLACE TEMPORARY MACRO remotes_{table}() AS TABLE (
875                    {union}
876                )"#,
877                table = table,
878                union = union_parts.join(" UNION ALL BY NAME ")
879            );
880            if let Err(e) = conn.execute(&sql, []) {
881                eprintln!("Warning: Failed to create remotes_{} macro: {}", table, e);
882            }
883        }
884
885        // Rebuild remotes.* and unified.* views to include attached remote data
886        // These are regular views (not TEMPORARY) that reference attached databases.
887        // They're rebuilt on every connection open, so stale references are updated.
888        for table in &["sessions", "invocations", "outputs", "events"] {
889            // Build union of all remote data for this table
890            let mut union_parts: Vec<String> = remotes
891                .iter()
892                .map(|r| {
893                    let safe_name = r.name.replace(['-', '.'], "_");
894                    format!(
895                        "SELECT * FROM \"remote_{safe_name}_{table}\"()",
896                        safe_name = safe_name,
897                        table = table
898                    )
899                })
900                .collect();
901
902            // Always include placeholder for empty case
903            union_parts.push(format!("SELECT * FROM remote_placeholder.{}", table));
904
905            let remotes_sql = format!(
906                "CREATE OR REPLACE VIEW remotes.{table} AS {union}",
907                table = table,
908                union = union_parts.join(" UNION ALL BY NAME ")
909            );
910            if let Err(e) = conn.execute(&remotes_sql, []) {
911                eprintln!("Warning: Failed to rebuild remotes.{} view: {}", table, e);
912            }
913        }
914
915        // Rebuild unified.* views (they reference main.* and remotes.*)
916        let unified_views = r#"
917            CREATE OR REPLACE VIEW unified.sessions AS
918                SELECT * FROM main.sessions UNION ALL BY NAME SELECT * FROM remotes.sessions;
919            CREATE OR REPLACE VIEW unified.invocations AS
920                SELECT * FROM main.invocations UNION ALL BY NAME SELECT * FROM remotes.invocations;
921            CREATE OR REPLACE VIEW unified.outputs AS
922                SELECT * FROM main.outputs UNION ALL BY NAME SELECT * FROM remotes.outputs;
923            CREATE OR REPLACE VIEW unified.events AS
924                SELECT * FROM main.events UNION ALL BY NAME SELECT * FROM remotes.events;
925        "#;
926        if let Err(e) = conn.execute_batch(unified_views) {
927            eprintln!("Warning: Failed to rebuild unified views: {}", e);
928        }
929
930        Ok(())
931    }
932
933    /// Create TEMPORARY macros for cwd-filtered data.
934    ///
935    /// These filter main.* data to entries matching the current working directory.
936    /// Uses TEMPORARY macros to avoid persisting anything that changes per-connection.
937    ///
938    /// Usage: `SELECT * FROM cwd_invocations()`
939    fn create_cwd_macros(&self, conn: &Connection) -> Result<()> {
940        let cwd = std::env::current_dir()
941            .map(|p| p.to_string_lossy().to_string())
942            .unwrap_or_default();
943        let cwd_escaped = cwd.replace('\'', "''");
944
945        // Create TEMPORARY macros for cwd-filtered data
946        let macros = format!(
947            r#"
948            CREATE OR REPLACE TEMPORARY MACRO cwd_sessions() AS TABLE (
949                SELECT * FROM main.sessions WHERE cwd LIKE '{}%'
950            );
951            CREATE OR REPLACE TEMPORARY MACRO cwd_invocations() AS TABLE (
952                SELECT * FROM main.invocations WHERE cwd LIKE '{}%'
953            );
954            CREATE OR REPLACE TEMPORARY MACRO cwd_outputs() AS TABLE (
955                SELECT o.* FROM main.outputs o
956                JOIN main.invocations i ON o.invocation_id = i.id
957                WHERE i.cwd LIKE '{}%'
958            );
959            CREATE OR REPLACE TEMPORARY MACRO cwd_events() AS TABLE (
960                SELECT e.* FROM main.events e
961                JOIN main.invocations i ON e.invocation_id = i.id
962                WHERE i.cwd LIKE '{}%'
963            );
964            "#,
965            cwd_escaped, cwd_escaped, cwd_escaped, cwd_escaped
966        );
967
968        conn.execute_batch(&macros)?;
969        Ok(())
970    }
971
972    /// Manually attach a specific remote.
973    pub fn attach_remote(&self, conn: &Connection, remote: &crate::RemoteConfig) -> Result<()> {
974        // Set up credentials
975        if let Some(provider) = &remote.credential_provider {
976            if remote.remote_type == crate::config::RemoteType::S3 {
977                let secret_sql = format!(
978                    "CREATE SECRET IF NOT EXISTS \"bird_{}\" (TYPE s3, PROVIDER {})",
979                    remote.name, provider
980                );
981                conn.execute(&secret_sql, [])?;
982            }
983        }
984
985        // For file remotes, add the remote's data directory to file_search_path
986        // This allows parquet-mode remotes to resolve their relative file paths
987        if let Some(remote_data_dir) = remote.data_dir() {
988            // Get current file_search_path and append the remote's data directory
989            let current_path: String = conn
990                .query_row("SELECT current_setting('file_search_path')", [], |r| r.get(0))
991                .unwrap_or_default();
992
993            let remote_path = remote_data_dir.display().to_string();
994            let new_path = if current_path.is_empty() {
995                remote_path
996            } else if current_path.contains(&remote_path) {
997                // Already in the path
998                current_path
999            } else {
1000                format!("{},{}", current_path, remote_path)
1001            };
1002
1003            conn.execute(&format!("SET file_search_path = '{}'", new_path), [])?;
1004        }
1005
1006        // Attach
1007        conn.execute(&remote.attach_sql(), [])?;
1008        Ok(())
1009    }
1010
1011    /// Detach a remote.
1012    pub fn detach_remote(&self, conn: &Connection, name: &str) -> Result<()> {
1013        conn.execute(&format!("DETACH \"remote_{}\"", name), [])?;
1014        Ok(())
1015    }
1016
1017    /// Test connection to a remote. Returns Ok if successful.
1018    pub fn test_remote(&self, remote: &crate::RemoteConfig) -> Result<()> {
1019        let conn = self.connection_with_options(false)?;
1020        self.attach_remote(&conn, remote)?;
1021
1022        // Try a simple query
1023        let test_sql = format!(
1024            "SELECT 1 FROM {}.invocations LIMIT 1",
1025            remote.quoted_schema_name()
1026        );
1027        conn.execute(&test_sql, [])?;
1028
1029        Ok(())
1030    }
1031
    /// Get config reference.
    ///
    /// Borrowed accessor for the store's configuration; callers that need
    /// ownership clone on their side.
    pub fn config(&self) -> &Config {
        &self.config
    }
1036
1037    /// Query the store using SQL.
1038    ///
1039    /// Returns results as a Vec of rows, where each row is a Vec of string values.
1040    pub fn query(&self, sql: &str) -> Result<QueryResult> {
1041        let conn = self.connection()?;
1042        let mut stmt = conn.prepare(sql)?;
1043
1044        // Execute the query first to get column info
1045        let mut rows_iter = stmt.query([])?;
1046
1047        // Get column info from the rows iterator
1048        let column_count = rows_iter.as_ref().map(|r| r.column_count()).unwrap_or(0);
1049        let column_names: Vec<String> = if let Some(row_ref) = rows_iter.as_ref() {
1050            (0..column_count)
1051                .map(|i| {
1052                    row_ref
1053                        .column_name(i)
1054                        .map(|s| s.to_string())
1055                        .unwrap_or_else(|_| format!("col{}", i))
1056                })
1057                .collect()
1058        } else {
1059            Vec::new()
1060        };
1061
1062        // Collect all rows
1063        let mut result_rows = Vec::new();
1064        while let Some(row) = rows_iter.next()? {
1065            let mut values = Vec::with_capacity(column_count);
1066            for i in 0..column_count {
1067                // Get value as generic ValueRef and convert to string
1068                let value = match row.get_ref(i)? {
1069                    ValueRef::Null => "NULL".to_string(),
1070                    ValueRef::Boolean(b) => b.to_string(),
1071                    ValueRef::TinyInt(n) => n.to_string(),
1072                    ValueRef::SmallInt(n) => n.to_string(),
1073                    ValueRef::Int(n) => n.to_string(),
1074                    ValueRef::BigInt(n) => n.to_string(),
1075                    ValueRef::HugeInt(n) => n.to_string(),
1076                    ValueRef::UTinyInt(n) => n.to_string(),
1077                    ValueRef::USmallInt(n) => n.to_string(),
1078                    ValueRef::UInt(n) => n.to_string(),
1079                    ValueRef::UBigInt(n) => n.to_string(),
1080                    ValueRef::Float(f) => f.to_string(),
1081                    ValueRef::Double(f) => f.to_string(),
1082                    ValueRef::Decimal(d) => d.to_string(),
1083                    ValueRef::Timestamp(unit, val) => {
1084                        // Convert to microseconds then to DateTime
1085                        let micros = match unit {
1086                            TimeUnit::Second => val * 1_000_000,
1087                            TimeUnit::Millisecond => val * 1_000,
1088                            TimeUnit::Microsecond => val,
1089                            TimeUnit::Nanosecond => val / 1_000,
1090                        };
1091                        DateTime::<Utc>::from_timestamp_micros(micros)
1092                            .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
1093                            .unwrap_or_else(|| format!("<invalid timestamp {}>", val))
1094                    }
1095                    ValueRef::Date32(days) => {
1096                        // Days since 1970-01-01
1097                        NaiveDate::from_ymd_opt(1970, 1, 1)
1098                            .and_then(|epoch| epoch.checked_add_signed(TimeDelta::days(days as i64)))
1099                            .map(|d| d.format("%Y-%m-%d").to_string())
1100                            .unwrap_or_else(|| format!("<invalid date {}>", days))
1101                    }
1102                    ValueRef::Time64(unit, val) => {
1103                        // Convert to microseconds then to NaiveTime
1104                        let micros = match unit {
1105                            TimeUnit::Second => val * 1_000_000,
1106                            TimeUnit::Millisecond => val * 1_000,
1107                            TimeUnit::Microsecond => val,
1108                            TimeUnit::Nanosecond => val / 1_000,
1109                        };
1110                        let secs = (micros / 1_000_000) as u32;
1111                        let micro_part = (micros % 1_000_000) as u32;
1112                        NaiveTime::from_num_seconds_from_midnight_opt(secs, micro_part * 1000)
1113                            .map(|t| t.format("%H:%M:%S").to_string())
1114                            .unwrap_or_else(|| format!("<invalid time {}>", val))
1115                    }
1116                    ValueRef::Interval { months, days, nanos } => {
1117                        format!("{} months {} days {} ns", months, days, nanos)
1118                    }
1119                    ValueRef::Text(s) => String::from_utf8_lossy(s).to_string(),
1120                    ValueRef::Blob(b) => format!("<blob {} bytes>", b.len()),
1121                    other => {
1122                        // Convert to owned Value for complex types (List, Array, Map, Struct)
1123                        let owned: Value = other.into();
1124                        format_value(&owned)
1125                    }
1126                };
1127                values.push(value);
1128            }
1129            result_rows.push(values);
1130        }
1131
1132        Ok(QueryResult {
1133            columns: column_names,
1134            rows: result_rows,
1135        })
1136    }
1137
1138    /// Get the last invocation with its output (if any).
1139    pub fn last_invocation_with_output(
1140        &self,
1141    ) -> Result<Option<(InvocationSummary, Option<OutputInfo>)>> {
1142        if let Some(inv) = self.last_invocation()? {
1143            let output = self.get_output(&inv.id)?;
1144            Ok(Some((inv, output)))
1145        } else {
1146            Ok(None)
1147        }
1148    }
1149
1150    /// Write a batch of related records atomically.
1151    ///
1152    /// This is the preferred way to write an invocation with its outputs,
1153    /// session, and events together. In DuckDB mode, all writes are wrapped
1154    /// in a transaction. In Parquet mode, files are written atomically.
1155    pub fn write_batch(&self, batch: &InvocationBatch) -> Result<()> {
1156        let invocation = batch
1157            .invocation
1158            .as_ref()
1159            .ok_or_else(|| Error::Storage("Batch must contain an invocation".to_string()))?;
1160
1161        match self.config.storage_mode {
1162            StorageMode::Parquet => self.write_batch_parquet(batch, invocation),
1163            StorageMode::DuckDB => self.write_batch_duckdb(batch, invocation),
1164        }
1165    }
1166
1167    /// Write batch using Parquet files (multi-writer safe).
1168    fn write_batch_parquet(
1169        &self,
1170        batch: &InvocationBatch,
1171        invocation: &InvocationRecord,
1172    ) -> Result<()> {
1173        // For Parquet mode, we write each record type separately.
1174        // Atomicity is per-file (temp + rename), but not across files.
1175        // This is acceptable because Parquet mode prioritizes concurrent writes.
1176
1177        // Write session first (if provided and not already registered)
1178        if let Some(ref session) = batch.session {
1179            self.ensure_session(session)?;
1180        }
1181
1182        // Write invocation
1183        self.write_invocation(invocation)?;
1184
1185        let date = invocation.date();
1186        let inv_id = invocation.id;
1187
1188        // Write outputs
1189        for (stream, content) in &batch.outputs {
1190            self.store_output(
1191                inv_id,
1192                stream,
1193                content,
1194                date,
1195                invocation.executable.as_deref(),
1196            )?;
1197        }
1198
1199        // Write events (if provided)
1200        if let Some(ref events) = batch.events {
1201            if !events.is_empty() {
1202                self.write_events(events)?;
1203            }
1204        }
1205
1206        Ok(())
1207    }
1208
1209    /// Write batch using DuckDB tables with transaction.
1210    fn write_batch_duckdb(
1211        &self,
1212        batch: &InvocationBatch,
1213        invocation: &InvocationRecord,
1214    ) -> Result<()> {
1215        let conn = self.connection()?;
1216
1217        // Begin transaction
1218        conn.execute("BEGIN TRANSACTION", [])?;
1219
1220        let result = self.write_batch_duckdb_inner(&conn, batch, invocation);
1221
1222        match result {
1223            Ok(()) => {
1224                conn.execute("COMMIT", [])?;
1225                Ok(())
1226            }
1227            Err(e) => {
1228                // Rollback on error
1229                let _ = conn.execute("ROLLBACK", []);
1230                Err(e)
1231            }
1232        }
1233    }
1234
    /// Inner implementation for DuckDB batch write (within transaction).
    ///
    /// The caller (`write_batch_duckdb`) opens the transaction and commits or
    /// rolls back based on this function's result, so every statement here is
    /// all-or-nothing. Write order: session (deduplicated by session_id),
    /// invocation, outputs (routed inline vs. blob by size), then events.
    fn write_batch_duckdb_inner(
        &self,
        conn: &Connection,
        batch: &InvocationBatch,
        invocation: &InvocationRecord,
    ) -> Result<()> {
        use base64::Engine;

        let date = invocation.date();
        let inv_id = invocation.id;

        // Write session (if provided)
        if let Some(ref session) = batch.session {
            // Check if session exists; a query failure counts as "not found"
            // and falls through to the insert.
            let exists: i64 = conn
                .query_row(
                    "SELECT COUNT(*) FROM local.sessions WHERE session_id = ?",
                    params![&session.session_id],
                    |row| row.get(0),
                )
                .unwrap_or(0);

            if exists == 0 {
                // Positional insert: 8 columns, order must match the
                // local.sessions schema.
                conn.execute(
                    r#"INSERT INTO local.sessions VALUES (?, ?, ?, ?, ?, ?, ?, ?)"#,
                    params![
                        session.session_id,
                        session.client_id,
                        session.invoker,
                        session.invoker_pid,
                        session.invoker_type,
                        session.registered_at.to_rfc3339(),
                        session.cwd,
                        session.date.to_string(),
                    ],
                )?;
            }
        }

        // Write invocation (16 positional columns; order must match the
        // local.invocations schema).
        conn.execute(
            r#"INSERT INTO local.invocations VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
            params![
                invocation.id.to_string(),
                invocation.session_id,
                invocation.timestamp.to_rfc3339(),
                invocation.duration_ms,
                invocation.cwd,
                invocation.cmd,
                invocation.executable,
                invocation.runner_id,
                invocation.exit_code,
                invocation.status,
                invocation.format_hint,
                invocation.client_id,
                invocation.hostname,
                invocation.username,
                invocation.tag,
                date.to_string(),
            ],
        )?;

        // Write outputs
        for (stream, content) in &batch.outputs {
            // Compute hash (blake3 content hash, hex-encoded; identifies the
            // payload for blob naming and registry dedup)
            let hash = blake3::hash(content);
            let hash_hex = hash.to_hex().to_string();

            // Route by size: payloads under the configured inline threshold
            // are embedded directly; larger ones go to blob files.
            let (storage_type, storage_ref) = if content.len() < self.config.inline_threshold {
                // Inline: use data: URL
                let b64 = base64::engine::general_purpose::STANDARD.encode(content);
                let data_url = format!("data:application/octet-stream;base64,{}", b64);
                ("inline".to_string(), data_url)
            } else {
                // Blob: write file and register
                let cmd_hint = invocation.executable.as_deref().unwrap_or("output");
                let blob_path = self.config.blob_path(&hash_hex, cmd_hint);

                if let Some(parent) = blob_path.parent() {
                    fs::create_dir_all(parent)?;
                }

                // Path stored relative to the data dir when possible, so the
                // file: ref stays portable; absolute as a fallback.
                let rel_path = blob_path
                    .strip_prefix(self.config.data_dir())
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_else(|_| blob_path.to_string_lossy().to_string());

                // Write blob atomically. `wrote_new` appears to be false when
                // the blob file already existed — confirm in atomic::write_file.
                let wrote_new = atomic::write_file(&blob_path, content)?;

                if wrote_new {
                    // First sighting of this content: register the blob.
                    conn.execute(
                        "INSERT INTO blob_registry (content_hash, byte_length, storage_path) VALUES (?, ?, ?)",
                        params![&hash_hex, content.len() as i64, &rel_path],
                    )?;
                } else {
                    // Existing blob: bump its reference count and touch it.
                    conn.execute(
                        "UPDATE blob_registry SET ref_count = ref_count + 1, last_accessed = CURRENT_TIMESTAMP WHERE content_hash = ?",
                        params![&hash_hex],
                    )?;
                }

                ("blob".to_string(), format!("file://{}", rel_path))
            };

            // Write output record (9 positional columns; order must match the
            // local.outputs schema).
            let output_id = uuid::Uuid::now_v7();
            conn.execute(
                r#"INSERT INTO local.outputs VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
                params![
                    output_id.to_string(),
                    inv_id.to_string(),
                    stream,
                    hash_hex,
                    content.len() as i64,
                    storage_type,
                    storage_ref,
                    Option::<String>::None, // content_type
                    date.to_string(),
                ],
            )?;
        }

        // Write events (if provided); 15 positional columns per row, order
        // must match the local.events schema.
        if let Some(ref events) = batch.events {
            for event in events {
                conn.execute(
                    r#"INSERT INTO local.events VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
                    params![
                        event.id.to_string(),
                        event.invocation_id.to_string(),
                        event.client_id,
                        event.hostname,
                        event.event_type,
                        event.severity,
                        event.ref_file,
                        event.ref_line,
                        event.ref_column,
                        event.message,
                        event.error_code,
                        event.test_name,
                        event.status,
                        event.format_used,
                        event.date.to_string(),
                    ],
                )?;
            }
        }

        Ok(())
    }
1388
1389    /// Load format hints from the config file.
1390    pub fn load_format_hints(&self) -> Result<crate::FormatHints> {
1391        let path = self.config.format_hints_path();
1392
1393        // Try new format-hints.toml first
1394        if path.exists() {
1395            return crate::FormatHints::load(&path);
1396        }
1397
1398        // Fall back to legacy event-formats.toml
1399        let legacy_path = self.config.event_formats_path();
1400        if legacy_path.exists() {
1401            return crate::FormatHints::load(&legacy_path);
1402        }
1403
1404        Ok(crate::FormatHints::new())
1405    }
1406
1407    /// Save format hints to the config file.
1408    pub fn save_format_hints(&self, hints: &crate::FormatHints) -> Result<()> {
1409        hints.save(&self.config.format_hints_path())
1410    }
1411
1412    /// Detect format for a command using format hints.
1413    ///
1414    /// Priority:
1415    /// 1. User-defined format hints (by priority)
1416    /// 2. Default format from config (or "auto")
1417    ///
1418    /// Note: duck_hunt detects formats from content analysis, not command names.
1419    /// Use format hints to map commands to formats, then duck_hunt parses the output.
1420    pub fn detect_format_for_command(&self, cmd: &str) -> Result<String> {
1421        let hints = self.load_format_hints()?;
1422        Ok(hints.detect(cmd).to_string())
1423    }
1424
1425    /// Get list of duck_hunt built-in formats.
1426    ///
1427    /// Note: duck_hunt detects formats from content analysis, not command patterns.
1428    /// This lists available format names that can be used with duck_hunt parsing.
1429    pub fn list_builtin_formats(&self) -> Result<Vec<BuiltinFormat>> {
1430        let conn = self.connection()?;
1431
1432        let mut stmt = conn.prepare(
1433            "SELECT format, description, priority FROM duck_hunt_formats() ORDER BY priority DESC, format"
1434        )?;
1435
1436        let rows = stmt.query_map([], |row| {
1437            Ok(BuiltinFormat {
1438                format: row.get(0)?,
1439                pattern: row.get::<_, String>(1)?, // description as "pattern" for display
1440                priority: row.get(2)?,
1441            })
1442        })?;
1443
1444        let results: Vec<_> = rows.filter_map(|r| r.ok()).collect();
1445        Ok(results)
1446    }
1447
1448    /// Check which format would be detected for a command.
1449    /// Returns the format name and source (user-defined or default).
1450    ///
1451    /// Note: duck_hunt detects formats from content, not command names.
1452    /// This only checks user-defined format hints.
1453    pub fn check_format(&self, cmd: &str) -> Result<FormatMatch> {
1454        let hints = self.load_format_hints()?;
1455
1456        // Check user-defined hints
1457        for hint in hints.hints() {
1458            if crate::format_hints::pattern_matches(&hint.pattern, cmd) {
1459                return Ok(FormatMatch {
1460                    format: hint.format.clone(),
1461                    source: FormatSource::UserDefined {
1462                        pattern: hint.pattern.clone(),
1463                        priority: hint.priority,
1464                    },
1465                });
1466            }
1467        }
1468
1469        // No match - use default
1470        Ok(FormatMatch {
1471            format: hints.default_format().to_string(),
1472            source: FormatSource::Default,
1473        })
1474    }
1475}
1476
/// A built-in format from duck_hunt.
#[derive(Debug, Clone)]
pub struct BuiltinFormat {
    /// Format name as reported by `duck_hunt_formats()`.
    pub format: String,
    /// duck_hunt's description of the format (surfaced as "pattern" for display).
    pub pattern: String,
    /// Listing priority; formats are returned in descending priority order.
    pub priority: i32,
}
1484
/// Result of format detection.
#[derive(Debug, Clone)]
pub struct FormatMatch {
    /// The detected format name.
    pub format: String,
    /// Where the match came from (user-defined hint, built-in, or default).
    pub source: FormatSource,
}
1491
/// Source of a format match.
#[derive(Debug, Clone)]
pub enum FormatSource {
    /// Matched a user-defined format hint (`pattern` is the hint's pattern).
    UserDefined { pattern: String, priority: i32 },
    /// Matched a duck_hunt built-in format.
    // NOTE(review): not produced by `check_format`, which only inspects
    // user-defined hints — presumably constructed elsewhere; verify callers.
    Builtin { pattern: String, priority: i32 },
    /// No hint matched; the configured default format was used.
    Default,
}
1499
/// Result of a SQL query.
#[derive(Debug)]
pub struct QueryResult {
    /// Column names in result-set order.
    pub columns: Vec<String>,
    /// Result rows; each cell is the value rendered to a display string.
    pub rows: Vec<Vec<String>>,
}
1506
/// Sanitize a string for use in filenames.
///
/// Spaces become `-`; every character that is not alphanumeric (or one of
/// `-`, `_`, `.`) becomes `_`. The result is capped at 64 characters.
fn sanitize_filename(s: &str) -> String {
    let mut out = String::new();
    for c in s.chars().take(64) {
        if c == ' ' {
            out.push('-');
        } else if c.is_alphanumeric() || matches!(c, '-' | '_' | '.') {
            out.push(c);
        } else {
            // Covers `/ \ : * ? " < > |` and any other filesystem-unsafe char.
            out.push('_');
        }
    }
    out
}
1519
#[cfg(test)]
mod tests {
    use super::*;
    use crate::init::initialize;
    use crate::schema::SessionRecord;
    use tempfile::TempDir;

    /// Fresh, initialized store using the default (Parquet) storage mode.
    fn new_parquet_store() -> (TempDir, Store) {
        let dir = TempDir::new().unwrap();
        let config = Config::with_root(dir.path());
        initialize(&config).unwrap();
        (dir, Store::open(config).unwrap())
    }

    /// Fresh, initialized store using DuckDB storage mode.
    fn new_duckdb_store() -> (TempDir, Store) {
        let dir = TempDir::new().unwrap();
        let config = Config::with_duckdb_mode(dir.path());
        initialize(&config).unwrap();
        (dir, Store::open(config).unwrap())
    }

    #[test]
    fn test_store_open_uninitialized_fails() {
        // No `initialize()` call: opening must fail with NotInitialized.
        let dir = TempDir::new().unwrap();
        let config = Config::with_root(dir.path());
        assert!(matches!(Store::open(config), Err(Error::NotInitialized(_))));
    }

    #[test]
    fn test_sanitize_filename() {
        assert_eq!(sanitize_filename("make test"), "make-test");
        assert_eq!(sanitize_filename("/usr/bin/gcc"), "_usr_bin_gcc");
        assert_eq!(sanitize_filename("a:b*c?d"), "a_b_c_d");
    }

    // ---- Batch write tests: Parquet mode ----

    #[test]
    fn test_batch_write_parquet_invocation_only() {
        let (_dir, store) = new_parquet_store();

        let record =
            InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        store.write_batch(&InvocationBatch::new(record)).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
    }

    #[test]
    fn test_batch_write_parquet_with_output() {
        let (_dir, store) = new_parquet_store();

        let record =
            InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let id = record.id.to_string();

        let batch = InvocationBatch::new(record).with_output("stdout", b"hello world\n".to_vec());
        store.write_batch(&batch).unwrap();

        let outputs = store.get_outputs(&id, None).unwrap();
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].stream, "stdout");
    }

    #[test]
    fn test_batch_write_parquet_with_session() {
        let (_dir, store) = new_parquet_store();

        let record =
            InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        store
            .write_batch(&InvocationBatch::new(record).with_session(session))
            .unwrap();

        assert!(store.session_exists("test-session").unwrap());
    }

    #[test]
    fn test_batch_write_parquet_full() {
        let (_dir, store) = new_parquet_store();

        let record =
            InvocationRecord::new("test-session", "make test", "/home/user", 1, "test@client");
        let id = record.id.to_string();
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(record)
            .with_session(session)
            .with_output("stdout", b"Building...\n".to_vec())
            .with_output("stderr", b"error: failed\n".to_vec());
        store.write_batch(&batch).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
        assert!(store.session_exists("test-session").unwrap());
        assert_eq!(store.get_outputs(&id, None).unwrap().len(), 2);
    }

    // ---- Batch write tests: DuckDB mode ----

    #[test]
    fn test_batch_write_duckdb_invocation_only() {
        let (_dir, store) = new_duckdb_store();

        let record =
            InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        store.write_batch(&InvocationBatch::new(record)).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
    }

    #[test]
    fn test_batch_write_duckdb_with_output() {
        let (_dir, store) = new_duckdb_store();

        let record =
            InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let id = record.id.to_string();

        let batch = InvocationBatch::new(record).with_output("stdout", b"hello world\n".to_vec());
        store.write_batch(&batch).unwrap();

        let outputs = store.get_outputs(&id, None).unwrap();
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].stream, "stdout");
    }

    #[test]
    fn test_batch_write_duckdb_with_session() {
        let (_dir, store) = new_duckdb_store();

        let record =
            InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        store
            .write_batch(&InvocationBatch::new(record).with_session(session))
            .unwrap();

        assert!(store.session_exists("test-session").unwrap());
    }

    #[test]
    fn test_batch_write_duckdb_full() {
        let (_dir, store) = new_duckdb_store();

        let record =
            InvocationRecord::new("test-session", "make test", "/home/user", 1, "test@client");
        let id = record.id.to_string();
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(record)
            .with_session(session)
            .with_output("stdout", b"Building...\n".to_vec())
            .with_output("stderr", b"error: failed\n".to_vec());
        store.write_batch(&batch).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
        assert!(store.session_exists("test-session").unwrap());
        assert_eq!(store.get_outputs(&id, None).unwrap().len(), 2);
    }

    #[test]
    fn test_batch_requires_invocation() {
        // An empty batch (no invocation record) must be rejected.
        let (_dir, store) = new_parquet_store();
        assert!(store.write_batch(&InvocationBatch::default()).is_err());
    }

    // ---- Extension loading tests ----

    #[test]
    fn test_ensure_extension_parquet() {
        // Parquet is an official DuckDB extension and should always install.
        let conn = duckdb::Connection::open_in_memory().unwrap();
        assert!(
            ensure_extension(&conn, "parquet").unwrap(),
            "parquet extension should be loadable"
        );
    }

    #[test]
    fn test_ensure_extension_icu() {
        // ICU is an official DuckDB extension and should always install.
        let conn = duckdb::Connection::open_in_memory().unwrap();
        assert!(
            ensure_extension(&conn, "icu").unwrap(),
            "icu extension should be loadable"
        );
    }

    #[test]
    fn test_ensure_extension_community() {
        // scalarfs and duck_hunt live in the community repository, which
        // requires allow_community_extensions to be enabled first.
        let conn = duckdb::Connection::open_in_memory().unwrap();
        conn.execute("SET allow_community_extensions = true", [])
            .unwrap();

        assert!(
            ensure_extension(&conn, "scalarfs").unwrap(),
            "scalarfs extension should be loadable from community"
        );
        assert!(
            ensure_extension(&conn, "duck_hunt").unwrap(),
            "duck_hunt extension should be loadable from community"
        );
    }

    #[test]
    fn test_ensure_extension_nonexistent() {
        let conn = duckdb::Connection::open_in_memory().unwrap();
        conn.execute("SET allow_community_extensions = true", [])
            .unwrap();

        // An unknown extension must report false rather than error out.
        let loaded = ensure_extension(&conn, "nonexistent_fake_extension_xyz").unwrap();
        assert!(!loaded, "nonexistent extension should return false");
    }

    #[test]
    fn test_extension_loading_is_cached() {
        let conn = duckdb::Connection::open_in_memory().unwrap();

        // Warm-up: the first call may download and install the extension.
        ensure_extension(&conn, "parquet").unwrap();

        // The second call should be served from the local cache.
        let start = std::time::Instant::now();
        ensure_extension(&conn, "parquet").unwrap();
        let elapsed = start.elapsed();

        // NOTE(review): wall-clock threshold — could flake on a heavily
        // loaded CI machine; consider relaxing if it proves noisy.
        assert!(elapsed.as_millis() < 100, "cached extension load took {:?}", elapsed);
    }
}