Skip to main content

magic_bird/store/
mod.rs

1//! Store - handles writing and reading records.
2//!
3//! Uses DuckDB to write Parquet files and query across them.
4//!
5//! # V5 Schema
6//!
7//! The v5 schema splits invocations into attempts + outcomes:
8//! - `attempts`: Written at invocation start (cmd, cwd, timestamp, etc.)
9//! - `outcomes`: Written at invocation end (exit_code, duration, etc.)
10//! - `invocations`: VIEW joining attempts LEFT JOIN outcomes
11//!
12//! Use `start_invocation()` / `complete_invocation()` for the v5 API.
13//! The legacy `write_invocation()` still works for v4 compatibility.
14
15mod atomic;
16mod attempts;
17mod compact;
18mod events;
19mod invocations;
20mod outcomes;
21mod outputs;
22mod pending;
23mod remote;
24mod sessions;
25
26use std::fs;
27use std::thread;
28use std::time::Duration;
29
30use chrono::{DateTime, NaiveDate, NaiveTime, TimeDelta, Utc};
31use duckdb::{
32    params,
33    types::{TimeUnit, Value, ValueRef},
34    Connection,
35};
36
37use crate::config::StorageMode;
38use crate::schema::{EventRecord, InvocationRecord, SessionRecord};
39use crate::{Config, Error, Result};
40
41/// Format a DuckDB Value to a human-readable string.
42/// Handles complex types like List, Array, Map, and Struct recursively.
43fn format_value(value: &Value) -> String {
44    match value {
45        Value::Null => "NULL".to_string(),
46        Value::Boolean(b) => b.to_string(),
47        Value::TinyInt(n) => n.to_string(),
48        Value::SmallInt(n) => n.to_string(),
49        Value::Int(n) => n.to_string(),
50        Value::BigInt(n) => n.to_string(),
51        Value::HugeInt(n) => n.to_string(),
52        Value::UTinyInt(n) => n.to_string(),
53        Value::USmallInt(n) => n.to_string(),
54        Value::UInt(n) => n.to_string(),
55        Value::UBigInt(n) => n.to_string(),
56        Value::Float(f) => f.to_string(),
57        Value::Double(f) => f.to_string(),
58        Value::Decimal(d) => d.to_string(),
59        Value::Timestamp(_, micros) => {
60            DateTime::<Utc>::from_timestamp_micros(*micros)
61                .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
62                .unwrap_or_else(|| format!("<timestamp {}>", micros))
63        }
64        Value::Text(s) => s.clone(),
65        Value::Blob(b) => format!("<blob {} bytes>", b.len()),
66        Value::Date32(days) => {
67            NaiveDate::from_ymd_opt(1970, 1, 1)
68                .and_then(|epoch| epoch.checked_add_signed(TimeDelta::days(*days as i64)))
69                .map(|d| d.format("%Y-%m-%d").to_string())
70                .unwrap_or_else(|| format!("<date {}>", days))
71        }
72        Value::Time64(_, micros) => {
73            let secs = (*micros / 1_000_000) as u32;
74            let micro_part = (*micros % 1_000_000) as u32;
75            NaiveTime::from_num_seconds_from_midnight_opt(secs, micro_part * 1000)
76                .map(|t| t.format("%H:%M:%S").to_string())
77                .unwrap_or_else(|| format!("<time {}>", micros))
78        }
79        Value::Interval { months, days, nanos } => {
80            format!("{} months {} days {} ns", months, days, nanos)
81        }
82        // Complex types
83        Value::List(items) => {
84            let formatted: Vec<String> = items.iter().map(format_value).collect();
85            format!("[{}]", formatted.join(", "))
86        }
87        Value::Array(items) => {
88            let formatted: Vec<String> = items.iter().map(format_value).collect();
89            format!("[{}]", formatted.join(", "))
90        }
91        Value::Map(map) => {
92            let formatted: Vec<String> = map
93                .iter()
94                .map(|(k, v)| format!("{}: {}", format_value(k), format_value(v)))
95                .collect();
96            format!("{{{}}}", formatted.join(", "))
97        }
98        Value::Struct(fields) => {
99            let formatted: Vec<String> = fields
100                .iter()
101                .map(|(k, v)| format!("{}: {}", k, format_value(v)))
102                .collect();
103            format!("{{{}}}", formatted.join(", "))
104        }
105        Value::Enum(s) => s.clone(),
106        _ => "<unknown>".to_string(),
107    }
108}
109
110// Re-export types from submodules
111pub use compact::{
112    ArchiveStats, AutoCompactOptions, CleanOptions, CleanStats, CompactOptions, CompactStats,
113    PruneStats,
114};
115pub use events::{EventFilters, EventSummary, FormatConfig, FormatRule};
116pub use invocations::InvocationSummary;
117pub use outputs::OutputInfo;
118pub use pending::{is_runner_alive, RecoveryStats};
119pub use remote::{parse_since, PullOptions, PullStats, PushOptions, PushStats};
120
121// Re-export format detection types (defined below)
122// BuiltinFormat, FormatMatch, FormatSource are defined at the bottom of this file
123
/// A batch of related records to write atomically.
///
/// Use this when you want to write an invocation along with its outputs,
/// session, and/or events in a single transaction.
///
/// Construct with [`InvocationBatch::new`] and chain the `with_*` builder
/// methods to attach optional pieces.
#[derive(Debug, Default)]
pub struct InvocationBatch {
    /// The invocation record (required).
    /// NOTE(review): the field is `Option` only so `Default` can be derived;
    /// presumably writers reject a batch without it — confirm at the write
    /// path.
    pub invocation: Option<InvocationRecord>,

    /// Output streams with their content: (stream_name, content).
    /// Common streams: "stdout", "stderr", "combined".
    pub outputs: Vec<(String, Vec<u8>)>,

    /// Session record (optional, created if not already registered).
    pub session: Option<SessionRecord>,

    /// Pre-extracted events (optional).
    pub events: Option<Vec<EventRecord>>,
}
143
144impl InvocationBatch {
145    /// Create a new batch with an invocation.
146    pub fn new(invocation: InvocationRecord) -> Self {
147        Self {
148            invocation: Some(invocation),
149            outputs: Vec::new(),
150            session: None,
151            events: None,
152        }
153    }
154
155    /// Add an output stream.
156    pub fn with_output(mut self, stream: impl Into<String>, content: Vec<u8>) -> Self {
157        self.outputs.push((stream.into(), content));
158        self
159    }
160
161    /// Add a session record.
162    pub fn with_session(mut self, session: SessionRecord) -> Self {
163        self.session = Some(session);
164        self
165    }
166
167    /// Add pre-extracted events.
168    pub fn with_events(mut self, events: Vec<EventRecord>) -> Self {
169        self.events = Some(events);
170        self
171    }
172}
173
/// Options for creating a database connection.
///
/// Controls what gets loaded and attached when opening a connection.
/// Use this for explicit control over connection behavior.
///
/// NOTE(review): `#[derive(Default)]` leaves every flag `false`, so
/// `ConnectionOptions::default()` is equivalent to `minimal()`. The
/// per-field "(full(): true)" notes below describe the values set by
/// `full()` — the options used by `Store::connection()` — not the derived
/// `Default`. Confirm this asymmetry is intended.
///
/// # Connection Stages
///
/// 1. **Extensions** (always): parquet, icu, scalarfs, duck_hunt
/// 2. **Blob resolution** (always): S3 credentials, blob_roots macro
/// 3. **Migration** (optional): Upgrade existing installations to new schema
/// 4. **Remotes** (optional): Attach remote databases, rebuild remotes.* views
/// 5. **Project** (optional): Attach project-local database if in a project
/// 6. **CWD views** (optional): Rebuild cwd.* views for current directory
#[derive(Debug, Clone, Default)]
pub struct ConnectionOptions {
    /// Attach configured remotes (full(): true).
    /// When true, remote databases are attached and remotes.* views are rebuilt
    /// to include the attached data.
    pub attach_remotes: bool,

    /// Attach project database if in a project directory (full(): true).
    pub attach_project: bool,

    /// Rebuild cwd.* views for current working directory (full(): true).
    /// These views filter main.* data to entries matching the current directory.
    pub create_ephemeral_views: bool,

    /// Run migration for existing installations (default: false).
    /// Only enable this for explicit upgrade operations.
    pub run_migration: bool,
}
205
206impl ConnectionOptions {
207    /// Create options for a full connection (default behavior).
208    pub fn full() -> Self {
209        Self {
210            attach_remotes: true,
211            attach_project: true,
212            create_ephemeral_views: true,
213            run_migration: false,
214        }
215    }
216
217    /// Create options for a minimal connection (no attachments).
218    /// Useful for write operations that don't need remote data.
219    pub fn minimal() -> Self {
220        Self {
221            attach_remotes: false,
222            attach_project: false,
223            create_ephemeral_views: false,
224            run_migration: false,
225        }
226    }
227
228    /// Create options for a migration/upgrade connection.
229    pub fn for_migration() -> Self {
230        Self {
231            attach_remotes: false,
232            attach_project: false,
233            create_ephemeral_views: false,
234            run_migration: true,
235        }
236    }
237}
238
239/// Ensure a DuckDB extension is loaded, installing if necessary.
240///
241/// Attempts in order:
242/// 1. LOAD (extension might already be available)
243/// 2. INSTALL from default repository, then LOAD
244/// 3. INSTALL FROM community, then LOAD
245///
246/// Returns Ok(true) if loaded successfully, Ok(false) if extension unavailable.
247fn ensure_extension(conn: &Connection, name: &str) -> Result<bool> {
248    // Try loading directly first (already installed/cached)
249    if conn.execute(&format!("LOAD {}", name), []).is_ok() {
250        return Ok(true);
251    }
252
253    // Try installing from default repository
254    if conn.execute(&format!("INSTALL {}", name), []).is_ok()
255        && conn.execute(&format!("LOAD {}", name), []).is_ok()
256    {
257        return Ok(true);
258    }
259
260    // Try installing from community repository
261    if conn.execute(&format!("INSTALL {} FROM community", name), []).is_ok()
262        && conn.execute(&format!("LOAD {}", name), []).is_ok()
263    {
264        return Ok(true);
265    }
266
267    Ok(false)
268}
269
/// A BIRD store for reading and writing records.
///
/// Holds the [`Config`] (paths, remotes, storage mode) and opens DuckDB
/// connections on demand via `connect()` / `connection()`.
pub struct Store {
    // Configuration driving every connection and query; set once at open().
    config: Config,
}
274
275impl Store {
276    /// Open an existing BIRD store.
277    pub fn open(config: Config) -> Result<Self> {
278        if !config.db_path().exists() {
279            return Err(Error::NotInitialized(config.bird_root.clone()));
280        }
281        Ok(Self { config })
282    }
283
    /// Open a DuckDB connection with retry and exponential backoff.
    ///
    /// DuckDB uses file locking for concurrent access. When multiple processes
    /// (e.g., background shell hook saves) try to access the database simultaneously,
    /// this method retries with exponential backoff to avoid lock conflicts.
    ///
    /// Only lock-conflict errors (detected by substring match on the error
    /// message) are retried; any other error fails immediately.
    fn open_connection_with_retry(&self) -> Result<Connection> {
        // Up to 10 attempts, starting at 10ms and doubling, capped at 1s.
        const MAX_RETRIES: u32 = 10;
        const INITIAL_DELAY_MS: u64 = 10;
        const MAX_DELAY_MS: u64 = 1000;

        let db_path = self.config.db_path();
        let mut delay_ms = INITIAL_DELAY_MS;
        let mut last_error = None;

        for attempt in 0..MAX_RETRIES {
            match Connection::open(&db_path) {
                Ok(conn) => return Ok(conn),
                Err(e) => {
                    let err_msg = e.to_string();
                    // Check if this is a lock conflict error.
                    // NOTE(review): substring matching is brittle across DuckDB
                    // versions — confirm these messages against the pinned version.
                    if err_msg.contains("Could not set lock")
                        || err_msg.contains("Conflicting lock")
                        || err_msg.contains("database is locked")
                    {
                        last_error = Some(e);
                        if attempt < MAX_RETRIES - 1 {
                            // Add jitter to avoid thundering herd.
                            // Deterministic pseudo-jitter (0..10ms) derived from
                            // the attempt number — no RNG dependency needed.
                            let jitter = (attempt as u64 * 7) % 10;
                            thread::sleep(Duration::from_millis(delay_ms + jitter));
                            delay_ms = (delay_ms * 2).min(MAX_DELAY_MS);
                            continue;
                        }
                    } else {
                        // Non-lock error, fail immediately
                        return Err(e.into());
                    }
                }
            }
        }

        // All retries exhausted: surface the last lock error if we saw one.
        Err(last_error
            .map(|e| e.into())
            .unwrap_or_else(|| Error::Storage("Failed to open database after retries".to_string())))
    }
329
    /// Get a DuckDB connection with full features (attachments, ephemeral views).
    ///
    /// Convenience wrapper for `self.connect(ConnectionOptions::full())`.
    pub fn connection(&self) -> Result<Connection> {
        self.connect(ConnectionOptions::full())
    }
334
335    /// Get a DuckDB connection with optional remote attachment (legacy API).
336    pub fn connection_with_options(&self, attach_remotes: bool) -> Result<Connection> {
337        let opts = if attach_remotes {
338            ConnectionOptions::full()
339        } else {
340            ConnectionOptions::minimal()
341        };
342        self.connect(opts)
343    }
344
345    /// Get a DuckDB connection with explicit options.
346    ///
347    /// This is the main connection method. Use `ConnectionOptions` to control:
348    /// - Whether remotes are attached
349    /// - Whether project database is attached
350    /// - Whether ephemeral views are created
351    /// - Whether migration should run
352    ///
353    /// Uses retry with exponential backoff to handle concurrent access.
354    pub fn connect(&self, opts: ConnectionOptions) -> Result<Connection> {
355        let conn = self.open_connection_with_retry()?;
356
357        // ===== Load required extensions =====
358        // Uses default extension directory (typically ~/.duckdb/extensions)
359        // Falls back to community repository if not in default
360        conn.execute("SET allow_community_extensions = true", [])?;
361
362        for ext in ["parquet", "icu"] {
363            if !ensure_extension(&conn, ext)? {
364                return Err(Error::Extension(format!(
365                    "Required extension '{}' could not be loaded",
366                    ext
367                )));
368            }
369        }
370
371        // Optional community extensions - warn if missing
372        for (ext, desc) in [
373            ("scalarfs", "data: URL support for inline blobs"),
374            ("duck_hunt", "log/output parsing for event extraction"),
375        ] {
376            if !ensure_extension(&conn, ext)? {
377                eprintln!("Warning: {} extension not available ({})", ext, desc);
378            }
379        }
380
381        // Set file search path so views resolve relative paths correctly
382        conn.execute(
383            &format!(
384                "SET file_search_path = '{}'",
385                self.config.data_dir().display()
386            ),
387            [],
388        )?;
389
390        // ===== Optional: Run migration for existing installations =====
391        if opts.run_migration {
392            self.migrate_to_new_schema(&conn)?;
393        }
394
395        // ===== Always set up blob resolution =====
396        // S3 credentials needed before blob_roots is used
397        self.setup_s3_credentials(&conn)?;
398        self.setup_blob_resolution(&conn)?;
399
400        // ===== Optional: Attach remotes and create access macros =====
401        if opts.attach_remotes && !self.config.remotes.is_empty() {
402            self.attach_remotes(&conn)?;
403            self.create_remote_macros(&conn)?;
404        }
405
406        // ===== Optional: Attach project database =====
407        if opts.attach_project {
408            self.attach_project_db(&conn)?;
409        }
410
411        // ===== Optional: Create cwd macros =====
412        // These TEMPORARY macros filter by current working directory
413        if opts.create_ephemeral_views {
414            self.create_cwd_macros(&conn)?;
415        }
416
417        Ok(conn)
418    }
419
420    /// Attach project-level `.bird/` database if we're in a project directory.
421    ///
422    /// The project database is attached as read-only under schema "project".
423    /// This allows queries like `SELECT * FROM project.invocations`.
424    fn attach_project_db(&self, conn: &Connection) -> Result<()> {
425        use crate::project::find_current_project;
426
427        let Some(project) = find_current_project() else {
428            return Ok(()); // Not in a project
429        };
430
431        if !project.is_initialized() {
432            return Ok(()); // Project not initialized
433        }
434
435        // Don't attach if project DB is the same as user DB
436        if project.db_path == self.config.db_path() {
437            return Ok(());
438        }
439
440        // Attach as read-only
441        let attach_sql = format!(
442            "ATTACH '{}' AS project (READ_ONLY)",
443            project.db_path.display()
444        );
445
446        if let Err(e) = conn.execute(&attach_sql, []) {
447            // Log but don't fail - project DB might be locked or inaccessible
448            eprintln!("Note: Could not attach project database: {}", e);
449        }
450
451        Ok(())
452    }
453
    /// Migrate existing installations to the new schema architecture.
    ///
    /// Checks if the `local` schema exists; if not, creates the new schema
    /// structure and migrates data from old `*_table` tables.
    ///
    /// Schemas created: `local` (canonical data), `cached_placeholder` /
    /// `remote_placeholder` (empty tables giving the union views a stable
    /// shape before any cache/remote is attached), plus `caches`, `remotes`,
    /// `unified`, and `cwd`.
    ///
    /// In DuckDB storage mode, `local.*` are real tables (legacy `*_table`
    /// rows are copied in if present); otherwise `local.*` are views over
    /// parquet files under `recent/` (resolved via `file_search_path`).
    fn migrate_to_new_schema(&self, conn: &Connection) -> Result<()> {
        // Check if already migrated (local schema exists) — idempotency guard.
        let local_exists: bool = conn
            .query_row(
                "SELECT COUNT(*) > 0 FROM information_schema.schemata WHERE schema_name = 'local'",
                [],
                |row| row.get(0),
            )
            .unwrap_or(false);

        if local_exists {
            return Ok(());
        }

        // This is an old installation - need to migrate
        // For now, just create the new schemas. Data migration would require
        // moving data from old tables/views to new structure.
        // TODO: Implement full data migration if needed

        eprintln!("Note: Migrating to new schema architecture...");

        // Create core schemas
        conn.execute_batch(
            r#"
            CREATE SCHEMA IF NOT EXISTS local;
            CREATE SCHEMA IF NOT EXISTS cached_placeholder;
            CREATE SCHEMA IF NOT EXISTS remote_placeholder;
            CREATE SCHEMA IF NOT EXISTS caches;
            CREATE SCHEMA IF NOT EXISTS remotes;
            CREATE SCHEMA IF NOT EXISTS unified;
            CREATE SCHEMA IF NOT EXISTS cwd;
            "#,
        )?;

        // For DuckDB mode, create local tables
        if self.config.storage_mode == crate::StorageMode::DuckDB {
            conn.execute_batch(
                r#"
                CREATE TABLE IF NOT EXISTS local.sessions (
                    session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
                    invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE
                );
                CREATE TABLE IF NOT EXISTS local.invocations (
                    id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
                    cwd VARCHAR, cmd VARCHAR, executable VARCHAR, exit_code INTEGER,
                    format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR,
                    tag VARCHAR, date DATE
                );
                CREATE TABLE IF NOT EXISTS local.outputs (
                    id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
                    byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
                    content_type VARCHAR, date DATE
                );
                CREATE TABLE IF NOT EXISTS local.events (
                    id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
                    event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
                    ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
                    status VARCHAR, format_used VARCHAR, date DATE
                );
                "#,
            )?;

            // Copy data from old tables if they exist
            // NOTE(review): only 'sessions_table' is probed; presumably the
            // four legacy tables only ever exist together — confirm.
            let old_tables_exist: bool = conn
                .query_row(
                    "SELECT COUNT(*) > 0 FROM duckdb_tables() WHERE table_name = 'sessions_table'",
                    [],
                    |row| row.get(0),
                )
                .unwrap_or(false);

            if old_tables_exist {
                conn.execute_batch(
                    r#"
                    INSERT INTO local.sessions SELECT * FROM sessions_table;
                    INSERT INTO local.invocations SELECT * FROM invocations_table;
                    INSERT INTO local.outputs SELECT * FROM outputs_table;
                    INSERT INTO local.events SELECT * FROM events_table;
                    "#,
                )?;
            }
        } else {
            // Parquet mode - create views over parquet files
            // (relative globs resolve via file_search_path set in connect()).
            conn.execute_batch(
                r#"
                CREATE OR REPLACE VIEW local.sessions AS
                SELECT * EXCLUDE (filename) FROM read_parquet(
                    'recent/sessions/**/*.parquet',
                    union_by_name = true, hive_partitioning = true, filename = true
                );
                CREATE OR REPLACE VIEW local.invocations AS
                SELECT * EXCLUDE (filename) FROM read_parquet(
                    'recent/invocations/**/*.parquet',
                    union_by_name = true, hive_partitioning = true, filename = true
                );
                CREATE OR REPLACE VIEW local.outputs AS
                SELECT * EXCLUDE (filename) FROM read_parquet(
                    'recent/outputs/**/*.parquet',
                    union_by_name = true, hive_partitioning = true, filename = true
                );
                CREATE OR REPLACE VIEW local.events AS
                SELECT * EXCLUDE (filename) FROM read_parquet(
                    'recent/events/**/*.parquet',
                    union_by_name = true, hive_partitioning = true, filename = true
                );
                "#,
            )?;
        }

        // Create placeholder schemas
        // (empty tables with a trailing _source column so the union views
        // below always have a consistent shape to select from).
        conn.execute_batch(
            r#"
            CREATE TABLE IF NOT EXISTS cached_placeholder.sessions (
                session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
                invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS cached_placeholder.invocations (
                id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
                cwd VARCHAR, cmd VARCHAR, executable VARCHAR, runner_id VARCHAR, exit_code INTEGER,
                status VARCHAR, format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR, tag VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS cached_placeholder.outputs (
                id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
                byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
                content_type VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS cached_placeholder.events (
                id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
                event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
                ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
                status VARCHAR, format_used VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.sessions (
                session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
                invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.invocations (
                id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
                cwd VARCHAR, cmd VARCHAR, executable VARCHAR, runner_id VARCHAR, exit_code INTEGER,
                status VARCHAR, format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR, tag VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.outputs (
                id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
                byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
                content_type VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.events (
                id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
                event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
                ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
                status VARCHAR, format_used VARCHAR, date DATE, _source VARCHAR
            );
            "#,
        )?;

        // Create union schemas
        // caches.*/remotes.* start as views over the placeholders; main.*
        // unions local+caches, unified.* unions main+remotes.
        conn.execute_batch(
            r#"
            CREATE OR REPLACE VIEW caches.sessions AS SELECT * FROM cached_placeholder.sessions;
            CREATE OR REPLACE VIEW caches.invocations AS SELECT * FROM cached_placeholder.invocations;
            CREATE OR REPLACE VIEW caches.outputs AS SELECT * FROM cached_placeholder.outputs;
            CREATE OR REPLACE VIEW caches.events AS SELECT * FROM cached_placeholder.events;

            CREATE OR REPLACE VIEW remotes.sessions AS SELECT * FROM remote_placeholder.sessions;
            CREATE OR REPLACE VIEW remotes.invocations AS SELECT * FROM remote_placeholder.invocations;
            CREATE OR REPLACE VIEW remotes.outputs AS SELECT * FROM remote_placeholder.outputs;
            CREATE OR REPLACE VIEW remotes.events AS SELECT * FROM remote_placeholder.events;

            CREATE OR REPLACE VIEW main.sessions AS
                SELECT *, 'local' as _source FROM local.sessions
                UNION ALL BY NAME SELECT * FROM caches.sessions;
            CREATE OR REPLACE VIEW main.invocations AS
                SELECT *, 'local' as _source FROM local.invocations
                UNION ALL BY NAME SELECT * FROM caches.invocations;
            CREATE OR REPLACE VIEW main.outputs AS
                SELECT *, 'local' as _source FROM local.outputs
                UNION ALL BY NAME SELECT * FROM caches.outputs;
            CREATE OR REPLACE VIEW main.events AS
                SELECT *, 'local' as _source FROM local.events
                UNION ALL BY NAME SELECT * FROM caches.events;

            CREATE OR REPLACE VIEW unified.sessions AS
                SELECT * FROM main.sessions UNION ALL BY NAME SELECT * FROM remotes.sessions;
            CREATE OR REPLACE VIEW unified.invocations AS
                SELECT * FROM main.invocations UNION ALL BY NAME SELECT * FROM remotes.invocations;
            CREATE OR REPLACE VIEW unified.outputs AS
                SELECT * FROM main.outputs UNION ALL BY NAME SELECT * FROM remotes.outputs;
            CREATE OR REPLACE VIEW unified.events AS
                SELECT * FROM main.events UNION ALL BY NAME SELECT * FROM remotes.events;

            -- Qualified views: deduplicated with source list
            CREATE OR REPLACE VIEW unified.qualified_sessions AS
                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
                FROM unified.sessions GROUP BY ALL;
            CREATE OR REPLACE VIEW unified.qualified_invocations AS
                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
                FROM unified.invocations GROUP BY ALL;
            CREATE OR REPLACE VIEW unified.qualified_outputs AS
                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
                FROM unified.outputs GROUP BY ALL;
            CREATE OR REPLACE VIEW unified.qualified_events AS
                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
                FROM unified.events GROUP BY ALL;
            "#,
        )?;

        Ok(())
    }
666
667    /// Set up S3 credentials for all remotes that use S3.
668    /// This is called early so that blob resolution can access S3 paths.
669    fn setup_s3_credentials(&self, conn: &Connection) -> Result<()> {
670        // Check if any remote uses S3
671        let has_s3 = self.config.remotes.iter().any(|r| {
672            r.remote_type == crate::config::RemoteType::S3
673        });
674
675        if !has_s3 {
676            return Ok(());
677        }
678
679        // Set up credentials for each S3 remote
680        for remote in &self.config.remotes {
681            if remote.remote_type == crate::config::RemoteType::S3 {
682                if let Some(provider) = &remote.credential_provider {
683                    let secret_sql = format!(
684                        "CREATE SECRET IF NOT EXISTS \"bird_{}\" (TYPE s3, PROVIDER {})",
685                        remote.name, provider
686                    );
687                    if let Err(e) = conn.execute(&secret_sql, []) {
688                        eprintln!("Warning: Failed to create S3 secret for {}: {}", remote.name, e);
689                    }
690                }
691            }
692        }
693
694        Ok(())
695    }
696
    /// Set up blob_roots variable and blob resolution macros.
    ///
    /// Storage refs use URI schemes to indicate type:
    /// - `data:`, `data+varchar:`, `data+blob:` - inline content (scalarfs)
    /// - `file:path` - relative path, resolved against blob_roots
    /// - Absolute paths (`s3://`, `/path/`) - used directly
    ///
    /// Defines the session variable `blob_roots` plus three SQL macros:
    /// `is_inline_data`, `is_file_ref`, and `resolve_storage_ref`.
    fn setup_blob_resolution(&self, conn: &Connection) -> Result<()> {
        let blob_roots = self.config.blob_roots();

        // Format as SQL array literal; single quotes are doubled so a quote
        // in a root path cannot break out of the string literal.
        let roots_sql: String = blob_roots
            .iter()
            .map(|r| format!("'{}'", r.replace('\'', "''")))
            .collect::<Vec<_>>()
            .join(", ");

        // Set blob_roots variable
        conn.execute(&format!("SET VARIABLE blob_roots = [{}]", roots_sql), [])?;

        // Helper: check if ref is inline data (scalarfs data: protocol)
        // DuckDB string slices are 1-based: ref[:5] is the first 5 chars.
        conn.execute(
            r#"CREATE OR REPLACE MACRO is_inline_data(ref) AS (
                ref[:5] = 'data:' OR ref[:5] = 'data+'
            )"#,
            [],
        )?;

        // Helper: check if ref is a relative file: path
        conn.execute(
            r#"CREATE OR REPLACE MACRO is_file_ref(ref) AS (
                ref[:5] = 'file:'
            )"#,
            [],
        )?;

        // Resolve storage ref to list of paths for pathvariable:
        // - Inline data: pass through as single-element list
        // - file: refs: expand to glob patterns across all blob_roots
        //   (ref[6:] strips the 5-char 'file:' prefix, 1-based slicing)
        // - Other (absolute paths): pass through
        conn.execute(
            r#"CREATE OR REPLACE MACRO resolve_storage_ref(ref) AS (
                CASE
                    WHEN is_inline_data(ref) THEN [ref]
                    WHEN is_file_ref(ref) THEN
                        [format('{}/{}*', root, ref[6:]) FOR root IN getvariable('blob_roots')]
                    ELSE [ref]
                END
            )"#,
            [],
        )?;

        Ok(())
    }
750
751    /// Attach configured remotes to the connection.
752    /// Note: S3 credentials are already set up by setup_s3_credentials().
753    fn attach_remotes(&self, conn: &Connection) -> Result<()> {
754        let remotes = self.config.auto_attach_remotes();
755
756        // Collect all file remote data directories to add to file_search_path
757        let remote_data_dirs: Vec<String> = remotes
758            .iter()
759            .filter_map(|r| r.data_dir())
760            .map(|p| p.display().to_string())
761            .collect();
762
763        // Add remote data directories to file_search_path (for parquet-mode remotes)
764        if !remote_data_dirs.is_empty() {
765            let current_path: String = conn
766                .query_row("SELECT current_setting('file_search_path')", [], |r| r.get(0))
767                .unwrap_or_default();
768
769            let mut paths: Vec<&str> = if current_path.is_empty() {
770                vec![]
771            } else {
772                current_path.split(',').collect()
773            };
774
775            for dir in &remote_data_dirs {
776                if !paths.contains(&dir.as_str()) {
777                    paths.push(dir);
778                }
779            }
780
781            let new_path = paths.join(",");
782            if let Err(e) = conn.execute(&format!("SET file_search_path = '{}'", new_path), []) {
783                eprintln!("Warning: Failed to set file_search_path: {}", e);
784            }
785        }
786
787        // Attach each remote
788        for remote in &remotes {
789            let attach_sql = remote.attach_sql();
790            if let Err(e) = conn.execute(&attach_sql, []) {
791                eprintln!("Warning: Failed to attach remote {}: {}", remote.name, e);
792            }
793        }
794
795        Ok(())
796    }
797
798    /// Detect the table path for an attached remote.
799    ///
800    /// BIRD databases can have different structures:
801    /// - DuckDB mode: tables are in `local` schema (e.g., `remote_name.local.invocations`)
802    /// - Parquet mode: tables are at top level (e.g., `remote_name.invocations`)
803    /// - Standalone: tables may be at top level
804    ///
805    /// Returns the schema prefix to use (e.g., "local." or "").
806    pub(crate) fn detect_remote_table_path(&self, conn: &Connection, remote_schema: &str) -> String {
807        // Check if this remote has a `local` schema with tables (BIRD DuckDB mode)
808        let check_sql = format!(
809            "SELECT 1 FROM information_schema.tables \
810             WHERE table_catalog = '{}' AND table_schema = 'local' AND table_name = 'invocations' \
811             LIMIT 1",
812            remote_schema.trim_matches('"')
813        );
814        if conn.execute(&check_sql, []).is_ok() {
815            if let Ok(mut stmt) = conn.prepare(&check_sql) {
816                if stmt.query([]).is_ok_and(|mut rows| rows.next().is_ok_and(|r| r.is_some())) {
817                    return "local.".to_string();
818                }
819            }
820        }
821
822        // Default: tables at top level
823        String::new()
824    }
825
826    /// Create TEMPORARY macros for accessing remote data.
827    ///
828    /// We use TEMPORARY macros to avoid persisting references to attached databases
829    /// in the catalog. Persisted references cause database corruption when the
830    /// attachment is not present.
831    ///
832    /// Handles both BIRD databases (with `local` schema) and standalone databases.
833    ///
834    /// Usage: `SELECT * FROM remotes_invocations()` or `SELECT * FROM remote_<name>_invocations()`
835    fn create_remote_macros(&self, conn: &Connection) -> Result<()> {
836        let remotes = self.config.auto_attach_remotes();
837        if remotes.is_empty() {
838            return Ok(());
839        }
840
841        // Create per-remote TEMPORARY macros for each table type
842        for remote in &remotes {
843            let schema = remote.quoted_schema_name();
844            let name = &remote.name;
845            // Sanitize name for use in macro identifier
846            let safe_name = name.replace(['-', '.'], "_");
847
848            // Detect if this is a BIRD database with tables in `local` schema
849            let table_prefix = self.detect_remote_table_path(conn, &schema);
850
851            for table in &["sessions", "invocations", "outputs", "events"] {
852                let macro_name = format!("\"remote_{safe_name}_{table}\"");
853                let sql = format!(
854                    r#"CREATE OR REPLACE TEMPORARY MACRO {macro_name}() AS TABLE (
855                        SELECT *, '{name}' as _source FROM {schema}.{prefix}{table}
856                    )"#,
857                    macro_name = macro_name,
858                    name = name,
859                    schema = schema,
860                    prefix = table_prefix,
861                    table = table
862                );
863                if let Err(e) = conn.execute(&sql, []) {
864                    eprintln!("Warning: Failed to create macro {}: {}", macro_name, e);
865                }
866            }
867        }
868
869        // Create combined remotes_* TEMPORARY macros that union all remotes
870        for table in &["sessions", "invocations", "outputs", "events"] {
871            let mut union_parts: Vec<String> = remotes
872                .iter()
873                .map(|r| {
874                    let safe_name = r.name.replace(['-', '.'], "_");
875                    format!("SELECT * FROM \"remote_{safe_name}_{table}\"()", safe_name = safe_name, table = table)
876                })
877                .collect();
878
879            // Include placeholder for empty case
880            union_parts.push(format!("SELECT * FROM remote_placeholder.{}", table));
881
882            let sql = format!(
883                r#"CREATE OR REPLACE TEMPORARY MACRO remotes_{table}() AS TABLE (
884                    {union}
885                )"#,
886                table = table,
887                union = union_parts.join(" UNION ALL BY NAME ")
888            );
889            if let Err(e) = conn.execute(&sql, []) {
890                eprintln!("Warning: Failed to create remotes_{} macro: {}", table, e);
891            }
892        }
893
894        // Rebuild remotes.* and unified.* views to include attached remote data
895        // These are regular views (not TEMPORARY) that reference attached databases.
896        // They're rebuilt on every connection open, so stale references are updated.
897        for table in &["sessions", "invocations", "outputs", "events"] {
898            // Build union of all remote data for this table
899            let mut union_parts: Vec<String> = remotes
900                .iter()
901                .map(|r| {
902                    let safe_name = r.name.replace(['-', '.'], "_");
903                    format!(
904                        "SELECT * FROM \"remote_{safe_name}_{table}\"()",
905                        safe_name = safe_name,
906                        table = table
907                    )
908                })
909                .collect();
910
911            // Always include placeholder for empty case
912            union_parts.push(format!("SELECT * FROM remote_placeholder.{}", table));
913
914            let remotes_sql = format!(
915                "CREATE OR REPLACE VIEW remotes.{table} AS {union}",
916                table = table,
917                union = union_parts.join(" UNION ALL BY NAME ")
918            );
919            if let Err(e) = conn.execute(&remotes_sql, []) {
920                eprintln!("Warning: Failed to rebuild remotes.{} view: {}", table, e);
921            }
922        }
923
924        // Rebuild unified.* views (they reference main.* and remotes.*)
925        let unified_views = r#"
926            CREATE OR REPLACE VIEW unified.sessions AS
927                SELECT * FROM main.sessions UNION ALL BY NAME SELECT * FROM remotes.sessions;
928            CREATE OR REPLACE VIEW unified.invocations AS
929                SELECT * FROM main.invocations UNION ALL BY NAME SELECT * FROM remotes.invocations;
930            CREATE OR REPLACE VIEW unified.outputs AS
931                SELECT * FROM main.outputs UNION ALL BY NAME SELECT * FROM remotes.outputs;
932            CREATE OR REPLACE VIEW unified.events AS
933                SELECT * FROM main.events UNION ALL BY NAME SELECT * FROM remotes.events;
934        "#;
935        if let Err(e) = conn.execute_batch(unified_views) {
936            eprintln!("Warning: Failed to rebuild unified views: {}", e);
937        }
938
939        Ok(())
940    }
941
942    /// Create TEMPORARY macros for cwd-filtered data.
943    ///
944    /// These filter main.* data to entries matching the current working directory.
945    /// Uses TEMPORARY macros to avoid persisting anything that changes per-connection.
946    ///
947    /// Usage: `SELECT * FROM cwd_invocations()`
948    fn create_cwd_macros(&self, conn: &Connection) -> Result<()> {
949        let cwd = std::env::current_dir()
950            .map(|p| p.to_string_lossy().to_string())
951            .unwrap_or_default();
952        let cwd_escaped = cwd.replace('\'', "''");
953
954        // Create TEMPORARY macros for cwd-filtered data
955        let macros = format!(
956            r#"
957            CREATE OR REPLACE TEMPORARY MACRO cwd_sessions() AS TABLE (
958                SELECT * FROM main.sessions WHERE cwd LIKE '{}%'
959            );
960            CREATE OR REPLACE TEMPORARY MACRO cwd_invocations() AS TABLE (
961                SELECT * FROM main.invocations WHERE cwd LIKE '{}%'
962            );
963            CREATE OR REPLACE TEMPORARY MACRO cwd_outputs() AS TABLE (
964                SELECT o.* FROM main.outputs o
965                JOIN main.invocations i ON o.invocation_id = i.id
966                WHERE i.cwd LIKE '{}%'
967            );
968            CREATE OR REPLACE TEMPORARY MACRO cwd_events() AS TABLE (
969                SELECT e.* FROM main.events e
970                JOIN main.invocations i ON e.invocation_id = i.id
971                WHERE i.cwd LIKE '{}%'
972            );
973            "#,
974            cwd_escaped, cwd_escaped, cwd_escaped, cwd_escaped
975        );
976
977        conn.execute_batch(&macros)?;
978        Ok(())
979    }
980
981    /// Manually attach a specific remote.
982    pub fn attach_remote(&self, conn: &Connection, remote: &crate::RemoteConfig) -> Result<()> {
983        // Set up credentials
984        if let Some(provider) = &remote.credential_provider {
985            if remote.remote_type == crate::config::RemoteType::S3 {
986                let secret_sql = format!(
987                    "CREATE SECRET IF NOT EXISTS \"bird_{}\" (TYPE s3, PROVIDER {})",
988                    remote.name, provider
989                );
990                conn.execute(&secret_sql, [])?;
991            }
992        }
993
994        // For file remotes, add the remote's data directory to file_search_path
995        // This allows parquet-mode remotes to resolve their relative file paths
996        if let Some(remote_data_dir) = remote.data_dir() {
997            // Get current file_search_path and append the remote's data directory
998            let current_path: String = conn
999                .query_row("SELECT current_setting('file_search_path')", [], |r| r.get(0))
1000                .unwrap_or_default();
1001
1002            let remote_path = remote_data_dir.display().to_string();
1003            let new_path = if current_path.is_empty() {
1004                remote_path
1005            } else if current_path.contains(&remote_path) {
1006                // Already in the path
1007                current_path
1008            } else {
1009                format!("{},{}", current_path, remote_path)
1010            };
1011
1012            conn.execute(&format!("SET file_search_path = '{}'", new_path), [])?;
1013        }
1014
1015        // Attach
1016        conn.execute(&remote.attach_sql(), [])?;
1017        Ok(())
1018    }
1019
1020    /// Detach a remote.
1021    pub fn detach_remote(&self, conn: &Connection, name: &str) -> Result<()> {
1022        conn.execute(&format!("DETACH \"remote_{}\"", name), [])?;
1023        Ok(())
1024    }
1025
1026    /// Test connection to a remote. Returns Ok if successful.
1027    pub fn test_remote(&self, remote: &crate::RemoteConfig) -> Result<()> {
1028        let conn = self.connection_with_options(false)?;
1029        self.attach_remote(&conn, remote)?;
1030
1031        // Try a simple query
1032        let test_sql = format!(
1033            "SELECT 1 FROM {}.invocations LIMIT 1",
1034            remote.quoted_schema_name()
1035        );
1036        conn.execute(&test_sql, [])?;
1037
1038        Ok(())
1039    }
1040
    /// Get a shared reference to the [`Config`] this store was built with.
    pub fn config(&self) -> &Config {
        &self.config
    }
1045
    /// Query the store using SQL.
    ///
    /// Returns results as a Vec of rows, where each row is a Vec of string values.
    ///
    /// Every value is rendered for display: NULL becomes `"NULL"`,
    /// timestamps/dates/times are formatted, blobs are summarized by byte
    /// length, and nested types (List/Array/Map/Struct) are formatted
    /// recursively via `format_value`.
    pub fn query(&self, sql: &str) -> Result<QueryResult> {
        let conn = self.connection()?;
        let mut stmt = conn.prepare(sql)?;

        // Execute the query first to get column info
        let mut rows_iter = stmt.query([])?;

        // Get column info from the rows iterator. `as_ref` exposes the
        // underlying statement; when it is unavailable we report zero
        // columns and no names.
        let column_count = rows_iter.as_ref().map(|r| r.column_count()).unwrap_or(0);
        let column_names: Vec<String> = if let Some(row_ref) = rows_iter.as_ref() {
            (0..column_count)
                .map(|i| {
                    row_ref
                        .column_name(i)
                        .map(|s| s.to_string())
                        // Fall back to a positional name ("col0", "col1", …)
                        // if the driver cannot produce one.
                        .unwrap_or_else(|_| format!("col{}", i))
                })
                .collect()
        } else {
            Vec::new()
        };

        // Collect all rows, converting every cell to a display string.
        let mut result_rows = Vec::new();
        while let Some(row) = rows_iter.next()? {
            let mut values = Vec::with_capacity(column_count);
            for i in 0..column_count {
                // Get value as generic ValueRef and convert to string
                let value = match row.get_ref(i)? {
                    ValueRef::Null => "NULL".to_string(),
                    ValueRef::Boolean(b) => b.to_string(),
                    ValueRef::TinyInt(n) => n.to_string(),
                    ValueRef::SmallInt(n) => n.to_string(),
                    ValueRef::Int(n) => n.to_string(),
                    ValueRef::BigInt(n) => n.to_string(),
                    ValueRef::HugeInt(n) => n.to_string(),
                    ValueRef::UTinyInt(n) => n.to_string(),
                    ValueRef::USmallInt(n) => n.to_string(),
                    ValueRef::UInt(n) => n.to_string(),
                    ValueRef::UBigInt(n) => n.to_string(),
                    ValueRef::Float(f) => f.to_string(),
                    ValueRef::Double(f) => f.to_string(),
                    ValueRef::Decimal(d) => d.to_string(),
                    ValueRef::Timestamp(unit, val) => {
                        // Convert to microseconds then to DateTime.
                        // Nanosecond values truncate sub-microsecond
                        // precision (integer division).
                        let micros = match unit {
                            TimeUnit::Second => val * 1_000_000,
                            TimeUnit::Millisecond => val * 1_000,
                            TimeUnit::Microsecond => val,
                            TimeUnit::Nanosecond => val / 1_000,
                        };
                        DateTime::<Utc>::from_timestamp_micros(micros)
                            .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
                            .unwrap_or_else(|| format!("<invalid timestamp {}>", val))
                    }
                    ValueRef::Date32(days) => {
                        // Days since 1970-01-01
                        NaiveDate::from_ymd_opt(1970, 1, 1)
                            .and_then(|epoch| epoch.checked_add_signed(TimeDelta::days(days as i64)))
                            .map(|d| d.format("%Y-%m-%d").to_string())
                            .unwrap_or_else(|| format!("<invalid date {}>", days))
                    }
                    ValueRef::Time64(unit, val) => {
                        // Convert to microseconds then to NaiveTime
                        let micros = match unit {
                            TimeUnit::Second => val * 1_000_000,
                            TimeUnit::Millisecond => val * 1_000,
                            TimeUnit::Microsecond => val,
                            TimeUnit::Nanosecond => val / 1_000,
                        };
                        let secs = (micros / 1_000_000) as u32;
                        let micro_part = (micros % 1_000_000) as u32;
                        // Fractional seconds are carried into the NaiveTime
                        // (as nanoseconds) but the %H:%M:%S format drops them.
                        NaiveTime::from_num_seconds_from_midnight_opt(secs, micro_part * 1000)
                            .map(|t| t.format("%H:%M:%S").to_string())
                            .unwrap_or_else(|| format!("<invalid time {}>", val))
                    }
                    ValueRef::Interval { months, days, nanos } => {
                        format!("{} months {} days {} ns", months, days, nanos)
                    }
                    // Lossy conversion: invalid UTF-8 is replaced, not rejected.
                    ValueRef::Text(s) => String::from_utf8_lossy(s).to_string(),
                    ValueRef::Blob(b) => format!("<blob {} bytes>", b.len()),
                    other => {
                        // Convert to owned Value for complex types (List, Array, Map, Struct)
                        let owned: Value = other.into();
                        format_value(&owned)
                    }
                };
                values.push(value);
            }
            result_rows.push(values);
        }

        Ok(QueryResult {
            columns: column_names,
            rows: result_rows,
        })
    }
1146
1147    /// Get the last invocation with its output (if any).
1148    pub fn last_invocation_with_output(
1149        &self,
1150    ) -> Result<Option<(InvocationSummary, Option<OutputInfo>)>> {
1151        if let Some(inv) = self.last_invocation()? {
1152            let output = self.get_output(&inv.id)?;
1153            Ok(Some((inv, output)))
1154        } else {
1155            Ok(None)
1156        }
1157    }
1158
1159    /// Write a batch of related records atomically.
1160    ///
1161    /// This is the preferred way to write an invocation with its outputs,
1162    /// session, and events together. In DuckDB mode, all writes are wrapped
1163    /// in a transaction. In Parquet mode, files are written atomically.
1164    pub fn write_batch(&self, batch: &InvocationBatch) -> Result<()> {
1165        let invocation = batch
1166            .invocation
1167            .as_ref()
1168            .ok_or_else(|| Error::Storage("Batch must contain an invocation".to_string()))?;
1169
1170        match self.config.storage_mode {
1171            StorageMode::Parquet => self.write_batch_parquet(batch, invocation),
1172            StorageMode::DuckDB => self.write_batch_duckdb(batch, invocation),
1173        }
1174    }
1175
1176    /// Write batch using Parquet files (multi-writer safe).
1177    fn write_batch_parquet(
1178        &self,
1179        batch: &InvocationBatch,
1180        invocation: &InvocationRecord,
1181    ) -> Result<()> {
1182        // For Parquet mode, we write each record type separately.
1183        // Atomicity is per-file (temp + rename), but not across files.
1184        // This is acceptable because Parquet mode prioritizes concurrent writes.
1185
1186        // Write session first (if provided and not already registered)
1187        if let Some(ref session) = batch.session {
1188            self.ensure_session(session)?;
1189        }
1190
1191        // Write invocation
1192        self.write_invocation(invocation)?;
1193
1194        let date = invocation.date();
1195        let inv_id = invocation.id;
1196
1197        // Write outputs
1198        for (stream, content) in &batch.outputs {
1199            self.store_output(
1200                inv_id,
1201                stream,
1202                content,
1203                date,
1204                invocation.executable.as_deref(),
1205            )?;
1206        }
1207
1208        // Write events (if provided)
1209        if let Some(ref events) = batch.events {
1210            if !events.is_empty() {
1211                self.write_events(events)?;
1212            }
1213        }
1214
1215        Ok(())
1216    }
1217
1218    /// Write batch using DuckDB tables with transaction.
1219    fn write_batch_duckdb(
1220        &self,
1221        batch: &InvocationBatch,
1222        invocation: &InvocationRecord,
1223    ) -> Result<()> {
1224        let conn = self.connection()?;
1225
1226        // Begin transaction
1227        conn.execute("BEGIN TRANSACTION", [])?;
1228
1229        let result = self.write_batch_duckdb_inner(&conn, batch, invocation);
1230
1231        match result {
1232            Ok(()) => {
1233                conn.execute("COMMIT", [])?;
1234                Ok(())
1235            }
1236            Err(e) => {
1237                // Rollback on error
1238                let _ = conn.execute("ROLLBACK", []);
1239                Err(e)
1240            }
1241        }
1242    }
1243
    /// Inner implementation for DuckDB batch write (within transaction).
    ///
    /// The caller (`write_batch_duckdb`) owns BEGIN/COMMIT/ROLLBACK; every
    /// statement here executes inside that transaction. Writes happen in
    /// this order: session (only if not already present), attempt, outcome
    /// (only if the invocation completed), outputs (inline or blob-backed),
    /// and finally events.
    fn write_batch_duckdb_inner(
        &self,
        conn: &Connection,
        batch: &InvocationBatch,
        invocation: &InvocationRecord,
    ) -> Result<()> {
        use base64::Engine;

        let date = invocation.date();
        let inv_id = invocation.id;

        // Write session (if provided)
        if let Some(ref session) = batch.session {
            // Check if session exists. A failed lookup counts as "not
            // found" (unwrap_or(0)), so the INSERT below still runs and
            // surfaces any real error.
            let exists: i64 = conn
                .query_row(
                    "SELECT COUNT(*) FROM local.sessions WHERE session_id = ?",
                    params![&session.session_id],
                    |row| row.get(0),
                )
                .unwrap_or(0);

            if exists == 0 {
                // Positional VALUES list — column order must match the
                // local.sessions table definition.
                conn.execute(
                    r#"INSERT INTO local.sessions VALUES (?, ?, ?, ?, ?, ?, ?, ?)"#,
                    params![
                        session.session_id,
                        session.client_id,
                        session.invoker,
                        session.invoker_pid,
                        session.invoker_type,
                        session.registered_at.to_rfc3339(),
                        session.cwd,
                        session.date.to_string(),
                    ],
                )?;
            }
        }

        // V5: Write attempt and outcome instead of invocation
        let attempt = invocation.to_attempt();
        let outcome = invocation.to_outcome();

        // Convert metadata HashMap to DuckDB MAP format.
        // The MAP literal is interpolated into the SQL text (not bound as a
        // parameter), so keys and JSON values are single-quote-escaped.
        // An empty map uses an explicitly typed empty MAP literal.
        let attempt_metadata_map = if attempt.metadata.is_empty() {
            "map([],[]::JSON[])".to_string()
        } else {
            let entries: Vec<String> = attempt.metadata.iter()
                .map(|(k, v)| {
                    let key = k.replace('\'', "''");
                    let value = v.to_string().replace('\'', "''");
                    format!("struct_pack(k := '{}', v := '{}'::JSON)", key, value)
                })
                .collect();
            format!("map_from_entries([{}])", entries.join(", "))
        };

        // Write attempt. The `{}` slot carries the metadata MAP expression;
        // the remaining columns bind positionally.
        conn.execute(
            &format!(
                r#"INSERT INTO local.attempts VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, {}, ?)"#,
                attempt_metadata_map
            ),
            params![
                attempt.id.to_string(),
                attempt.timestamp.to_rfc3339(),
                attempt.cmd,
                attempt.cwd,
                attempt.session_id,
                attempt.tag,
                attempt.source_client,
                attempt.machine_id,
                attempt.hostname,
                attempt.executable,
                attempt.format_hint,
                date.to_string(),
            ],
        )?;

        // Write outcome if completed (to_outcome() returns None for
        // invocations that have not finished).
        if let Some(outcome) = outcome {
            // Same MAP-literal construction as for the attempt metadata.
            let outcome_metadata_map = if outcome.metadata.is_empty() {
                "map([],[]::JSON[])".to_string()
            } else {
                let entries: Vec<String> = outcome.metadata.iter()
                    .map(|(k, v)| {
                        let key = k.replace('\'', "''");
                        let value = v.to_string().replace('\'', "''");
                        format!("struct_pack(k := '{}', v := '{}'::JSON)", key, value)
                    })
                    .collect();
                format!("map_from_entries([{}])", entries.join(", "))
            };

            conn.execute(
                &format!(
                    r#"INSERT INTO local.outcomes VALUES (?, ?, ?, ?, ?, ?, {}, ?)"#,
                    outcome_metadata_map
                ),
                params![
                    outcome.attempt_id.to_string(),
                    outcome.completed_at.to_rfc3339(),
                    outcome.exit_code,
                    outcome.duration_ms,
                    outcome.signal,
                    outcome.timeout,
                    outcome.date.to_string(),
                ],
            )?;
        }

        // Write outputs
        for (stream, content) in &batch.outputs {
            // Compute hash (blake3 over the raw bytes; used for blob
            // dedup and as the content identifier).
            let hash = blake3::hash(content);
            let hash_hex = hash.to_hex().to_string();

            // Route by size: small payloads inline as a data: URL, larger
            // ones go to a content-addressed blob file.
            let (storage_type, storage_ref) = if content.len() < self.config.inline_threshold {
                // Inline: use data: URL
                let b64 = base64::engine::general_purpose::STANDARD.encode(content);
                let data_url = format!("data:application/octet-stream;base64,{}", b64);
                ("inline".to_string(), data_url)
            } else {
                // Blob: write file and register
                let cmd_hint = invocation.executable.as_deref().unwrap_or("output");
                let blob_path = self.config.blob_path(&hash_hex, cmd_hint);

                if let Some(parent) = blob_path.parent() {
                    fs::create_dir_all(parent)?;
                }

                // Store the path relative to data_dir when possible so the
                // ref stays valid if the data dir moves.
                let rel_path = blob_path
                    .strip_prefix(self.config.data_dir())
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_else(|_| blob_path.to_string_lossy().to_string());

                // Write blob atomically; `wrote_new` is false when a file
                // with this content hash already existed.
                let wrote_new = atomic::write_file(&blob_path, content)?;

                if wrote_new {
                    conn.execute(
                        "INSERT INTO blob_registry (content_hash, byte_length, storage_path) VALUES (?, ?, ?)",
                        params![&hash_hex, content.len() as i64, &rel_path],
                    )?;
                } else {
                    // Existing blob: bump the refcount and touch the
                    // access timestamp instead of re-registering.
                    conn.execute(
                        "UPDATE blob_registry SET ref_count = ref_count + 1, last_accessed = CURRENT_TIMESTAMP WHERE content_hash = ?",
                        params![&hash_hex],
                    )?;
                }

                ("blob".to_string(), format!("file://{}", rel_path))
            };

            // Write output record (id is a time-ordered UUIDv7).
            let output_id = uuid::Uuid::now_v7();
            conn.execute(
                r#"INSERT INTO local.outputs VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
                params![
                    output_id.to_string(),
                    inv_id.to_string(),
                    stream,
                    hash_hex,
                    content.len() as i64,
                    storage_type,
                    storage_ref,
                    Option::<String>::None, // content_type
                    date.to_string(),
                ],
            )?;
        }

        // Write events (if provided)
        if let Some(ref events) = batch.events {
            for event in events {
                // Positional VALUES list — order must match local.events.
                conn.execute(
                    r#"INSERT INTO local.events VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
                    params![
                        event.id.to_string(),
                        event.invocation_id.to_string(),
                        event.client_id,
                        event.hostname,
                        event.event_type,
                        event.severity,
                        event.ref_file,
                        event.ref_line,
                        event.ref_column,
                        event.message,
                        event.error_code,
                        event.test_name,
                        event.status,
                        event.format_used,
                        event.date.to_string(),
                    ],
                )?;
            }
        }

        Ok(())
    }
1446
1447    /// Load format hints from the config file.
1448    pub fn load_format_hints(&self) -> Result<crate::FormatHints> {
1449        let path = self.config.format_hints_path();
1450
1451        // Try new format-hints.toml first
1452        if path.exists() {
1453            return crate::FormatHints::load(&path);
1454        }
1455
1456        // Fall back to legacy event-formats.toml
1457        let legacy_path = self.config.event_formats_path();
1458        if legacy_path.exists() {
1459            return crate::FormatHints::load(&legacy_path);
1460        }
1461
1462        Ok(crate::FormatHints::new())
1463    }
1464
1465    /// Save format hints to the config file.
1466    pub fn save_format_hints(&self, hints: &crate::FormatHints) -> Result<()> {
1467        hints.save(&self.config.format_hints_path())
1468    }
1469
1470    /// Detect format for a command using format hints.
1471    ///
1472    /// Priority:
1473    /// 1. User-defined format hints (by priority)
1474    /// 2. Default format from config (or "auto")
1475    ///
1476    /// Note: duck_hunt detects formats from content analysis, not command names.
1477    /// Use format hints to map commands to formats, then duck_hunt parses the output.
1478    pub fn detect_format_for_command(&self, cmd: &str) -> Result<String> {
1479        let hints = self.load_format_hints()?;
1480        Ok(hints.detect(cmd).to_string())
1481    }
1482
1483    /// Get list of duck_hunt built-in formats.
1484    ///
1485    /// Note: duck_hunt detects formats from content analysis, not command patterns.
1486    /// This lists available format names that can be used with duck_hunt parsing.
1487    pub fn list_builtin_formats(&self) -> Result<Vec<BuiltinFormat>> {
1488        let conn = self.connection()?;
1489
1490        let mut stmt = conn.prepare(
1491            "SELECT format, description, priority FROM duck_hunt_formats() ORDER BY priority DESC, format"
1492        )?;
1493
1494        let rows = stmt.query_map([], |row| {
1495            Ok(BuiltinFormat {
1496                format: row.get(0)?,
1497                pattern: row.get::<_, String>(1)?, // description as "pattern" for display
1498                priority: row.get(2)?,
1499            })
1500        })?;
1501
1502        let results: Vec<_> = rows.filter_map(|r| r.ok()).collect();
1503        Ok(results)
1504    }
1505
1506    /// Check which format would be detected for a command.
1507    /// Returns the format name and source (user-defined or default).
1508    ///
1509    /// Note: duck_hunt detects formats from content, not command names.
1510    /// This only checks user-defined format hints.
1511    pub fn check_format(&self, cmd: &str) -> Result<FormatMatch> {
1512        let hints = self.load_format_hints()?;
1513
1514        // Check user-defined hints
1515        for hint in hints.hints() {
1516            if crate::format_hints::pattern_matches(&hint.pattern, cmd) {
1517                return Ok(FormatMatch {
1518                    format: hint.format.clone(),
1519                    source: FormatSource::UserDefined {
1520                        pattern: hint.pattern.clone(),
1521                        priority: hint.priority,
1522                    },
1523                });
1524            }
1525        }
1526
1527        // No match - use default
1528        Ok(FormatMatch {
1529            format: hints.default_format().to_string(),
1530            source: FormatSource::Default,
1531        })
1532    }
1533}
1534
/// A built-in format from duck_hunt.
#[derive(Debug, Clone)]
pub struct BuiltinFormat {
    /// Format name as reported by `duck_hunt_formats()`.
    pub format: String,
    /// The format's description (surfaced under the "pattern" name for display).
    pub pattern: String,
    /// Detection priority; results are listed highest-priority first.
    pub priority: i32,
}
1542
/// Result of format detection.
#[derive(Debug, Clone)]
pub struct FormatMatch {
    /// The detected format name.
    pub format: String,
    /// Where the match came from (user hint, built-in, or default).
    pub source: FormatSource,
}
1549
/// Source of a format match.
#[derive(Debug, Clone)]
pub enum FormatSource {
    /// Matched a user-defined format hint (pattern + priority recorded).
    UserDefined { pattern: String, priority: i32 },
    /// Matched a duck_hunt built-in format.
    Builtin { pattern: String, priority: i32 },
    /// No hint matched; the configured default format applies.
    Default,
}
1557
/// Result of a SQL query.
#[derive(Debug)]
pub struct QueryResult {
    /// Column names, in result order.
    pub columns: Vec<String>,
    /// Result rows; each cell rendered as a string.
    pub rows: Vec<Vec<String>>,
}
1564
/// Sanitize a string for use in filenames.
///
/// Spaces become `-`; alphanumerics plus `-`, `_`, and `.` pass through;
/// anything else (including path separators and shell-special characters)
/// becomes `_`. The result is capped at 64 characters.
fn sanitize_filename(s: &str) -> String {
    s.chars()
        .take(64)
        .map(|c| {
            if c == ' ' {
                '-'
            } else if c.is_alphanumeric() || matches!(c, '-' | '_' | '.') {
                c
            } else {
                '_'
            }
        })
        .collect()
}
1577
#[cfg(test)]
mod tests {
    use super::*;
    use crate::init::initialize;
    use crate::schema::SessionRecord;
    use tempfile::TempDir;

    /// Create an initialized store in a fresh temp directory.
    ///
    /// The `TempDir` is returned alongside the store so the directory
    /// outlives the test body.
    fn open_test_store(duckdb_mode: bool) -> (TempDir, Store) {
        let dir = TempDir::new().unwrap();
        let config = if duckdb_mode {
            Config::with_duckdb_mode(dir.path())
        } else {
            Config::with_root(dir.path())
        };
        initialize(&config).unwrap();
        let store = Store::open(config).unwrap();
        (dir, store)
    }

    #[test]
    fn test_store_open_uninitialized_fails() {
        let dir = TempDir::new().unwrap();
        let config = Config::with_root(dir.path());

        assert!(matches!(Store::open(config), Err(Error::NotInitialized(_))));
    }

    #[test]
    fn test_sanitize_filename() {
        assert_eq!(sanitize_filename("make test"), "make-test");
        assert_eq!(sanitize_filename("/usr/bin/gcc"), "_usr_bin_gcc");
        assert_eq!(sanitize_filename("a:b*c?d"), "a_b_c_d");
    }

    // Batch write assertions shared by the Parquet and DuckDB modes.

    /// A batch containing only an invocation is written and counted.
    fn assert_writes_invocation(store: Store) {
        let record = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");

        store.write_batch(&InvocationBatch::new(record)).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
    }

    /// Output attached to a batch is retrievable by invocation id.
    fn assert_captures_output(store: Store) {
        let record = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let record_id = record.id;

        let batch = InvocationBatch::new(record)
            .with_output("stdout", b"hello world\n".to_vec());
        store.write_batch(&batch).unwrap();

        let outputs = store.get_outputs(&record_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].stream, "stdout");
    }

    /// A session bundled with the batch is persisted.
    fn assert_records_session(store: Store) {
        let record = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(record).with_session(session);
        store.write_batch(&batch).unwrap();

        assert!(store.session_exists("test-session").unwrap());
    }

    /// A full batch (invocation + session + two output streams) round-trips.
    fn assert_full_batch(store: Store) {
        let record = InvocationRecord::new("test-session", "make test", "/home/user", 1, "test@client");
        let record_id = record.id;
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(record)
            .with_session(session)
            .with_output("stdout", b"Building...\n".to_vec())
            .with_output("stderr", b"error: failed\n".to_vec());
        store.write_batch(&batch).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
        assert!(store.session_exists("test-session").unwrap());

        let outputs = store.get_outputs(&record_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 2);
    }

    // Batch write tests - Parquet mode

    #[test]
    fn test_batch_write_parquet_invocation_only() {
        let (_dir, store) = open_test_store(false);
        assert_writes_invocation(store);
    }

    #[test]
    fn test_batch_write_parquet_with_output() {
        let (_dir, store) = open_test_store(false);
        assert_captures_output(store);
    }

    #[test]
    fn test_batch_write_parquet_with_session() {
        let (_dir, store) = open_test_store(false);
        assert_records_session(store);
    }

    #[test]
    fn test_batch_write_parquet_full() {
        let (_dir, store) = open_test_store(false);
        assert_full_batch(store);
    }

    // Batch write tests - DuckDB mode

    #[test]
    fn test_batch_write_duckdb_invocation_only() {
        let (_dir, store) = open_test_store(true);
        assert_writes_invocation(store);
    }

    #[test]
    fn test_batch_write_duckdb_with_output() {
        let (_dir, store) = open_test_store(true);
        assert_captures_output(store);
    }

    #[test]
    fn test_batch_write_duckdb_with_session() {
        let (_dir, store) = open_test_store(true);
        assert_records_session(store);
    }

    #[test]
    fn test_batch_write_duckdb_full() {
        let (_dir, store) = open_test_store(true);
        assert_full_batch(store);
    }

    #[test]
    fn test_batch_requires_invocation() {
        let (_dir, store) = open_test_store(false);

        // An empty batch carries no invocation and must be rejected.
        assert!(store.write_batch(&InvocationBatch::default()).is_err());
    }

    // Extension loading tests

    #[test]
    fn test_ensure_extension_parquet() {
        // Parquet is an official extension, should always be available
        let conn = duckdb::Connection::open_in_memory().unwrap();
        assert!(
            ensure_extension(&conn, "parquet").unwrap(),
            "parquet extension should be loadable"
        );
    }

    #[test]
    fn test_ensure_extension_icu() {
        // ICU is an official extension, should always be available
        let conn = duckdb::Connection::open_in_memory().unwrap();
        assert!(
            ensure_extension(&conn, "icu").unwrap(),
            "icu extension should be loadable"
        );
    }

    #[test]
    fn test_ensure_extension_community() {
        // Community extensions require allow_community_extensions
        let conn = duckdb::Connection::open_in_memory().unwrap();
        conn.execute("SET allow_community_extensions = true", []).unwrap();

        // scalarfs and duck_hunt are community extensions
        assert!(
            ensure_extension(&conn, "scalarfs").unwrap(),
            "scalarfs extension should be loadable from community"
        );
        assert!(
            ensure_extension(&conn, "duck_hunt").unwrap(),
            "duck_hunt extension should be loadable from community"
        );
    }

    #[test]
    fn test_ensure_extension_nonexistent() {
        let conn = duckdb::Connection::open_in_memory().unwrap();
        conn.execute("SET allow_community_extensions = true", []).unwrap();

        // A made-up extension should return false (not error)
        assert!(
            !ensure_extension(&conn, "nonexistent_fake_extension_xyz").unwrap(),
            "nonexistent extension should return false"
        );
    }

    #[test]
    fn test_extension_loading_is_cached() {
        // Once installed, extensions should load quickly from cache
        let conn = duckdb::Connection::open_in_memory().unwrap();

        // First load might install
        ensure_extension(&conn, "parquet").unwrap();

        // Second load should be fast (from cache)
        let start = std::time::Instant::now();
        ensure_extension(&conn, "parquet").unwrap();
        let elapsed = start.elapsed();

        // Should be very fast if cached (< 100ms)
        assert!(elapsed.as_millis() < 100, "cached extension load took {:?}", elapsed);
    }
}