Skip to main content

magic_bird/
init.rs

1//! BIRD initialization - creates directory structure and database.
2//!
3//! # Schema Architecture
4//!
5//! BIRD uses a multi-schema architecture for flexible data organization:
6//!
7//! ## Data Schemas (contain actual tables)
8//! - `local` - Locally generated data (tables in DuckDB mode, parquet views in parquet mode)
9//! - `cached_<name>` - One per remote, contains data pulled/synced from that remote
10//! - `cached_placeholder` - Empty tables (ensures `caches` views work with no cached data)
11//!
12//! ## Attached Schemas (live remote connections)
13//! - `remote_<name>` - Attached remote databases (read-only)
14//! - `remote_placeholder` - Empty tables (ensures `remotes` views work with no remotes)
15//!
16//! ## Union Schemas (dynamic views)
17//! - `caches` - Union of all `cached_*` schemas
18//! - `remotes` - Union of all `remote_*` schemas
19//! - `main` - Union of `local` + `caches` (all data we own locally)
20//! - `unified` - Union of `main` + `remotes` (everything)
21//! - `cwd` - Views filtered to current working directory
22//!
23//! ## Reserved Schema Names
24//! - `local`, `main`, `unified`, `cwd`, `caches`, `remotes` - Core schemas
25//! - `cached_*` - Reserved prefix for cached remote data
26//! - `remote_*` - Reserved prefix for attached remotes
27//! - `project` - Reserved for attached project-level database
28
29use std::fs;
30
31use crate::config::StorageMode;
32use crate::{Config, Error, Result};
33
34/// Initialize a new BIRD installation.
35///
36/// Creates the directory structure and initializes the DuckDB database
37/// with the schema architecture.
38pub fn initialize(config: &Config) -> Result<()> {
39    let bird_root = &config.bird_root;
40
41    // Check if already initialized
42    if config.db_path().exists() {
43        return Err(Error::AlreadyInitialized(bird_root.clone()));
44    }
45
46    // Create directory structure
47    create_directories(config)?;
48
49    // Initialize DuckDB with schemas
50    init_database(config)?;
51
52    // Save config
53    config.save()?;
54
55    // Create default event-formats.toml
56    create_event_formats_config(config)?;
57
58    Ok(())
59}
60
61/// Create the BIRD directory structure.
62fn create_directories(config: &Config) -> Result<()> {
63    // Common directories for both modes
64    let mut dirs = vec![
65        config.bird_root.join("db"),
66        config.blobs_dir(), // blobs/content
67        config.archive_dir().join("blobs/content"),
68        config.extensions_dir(),
69        config.sql_dir(),
70    ];
71
72    // Parquet mode needs partition directories
73    if config.storage_mode == StorageMode::Parquet {
74        dirs.extend([
75            // V5 schema directories
76            config.recent_dir().join("attempts"),
77            config.recent_dir().join("outcomes"),
78            // Shared data directories
79            config.recent_dir().join("outputs"),
80            config.recent_dir().join("sessions"),
81            config.recent_dir().join("events"),
82        ]);
83    }
84
85    for dir in &dirs {
86        fs::create_dir_all(dir)?;
87    }
88
89    Ok(())
90}
91
92/// Initialize the DuckDB database with schema architecture.
93fn init_database(config: &Config) -> Result<()> {
94    let conn = duckdb::Connection::open(config.db_path())?;
95
96    // Enable community extensions
97    conn.execute("SET allow_community_extensions = true", [])?;
98
99    // Install and load required extensions
100    // This pre-installs to the default location so connect() is fast
101    install_extensions(&conn)?;
102
103    // Set file search path so views use relative paths
104    let data_dir = config.data_dir();
105    conn.execute(
106        &format!("SET file_search_path = '{}'", data_dir.display()),
107        [],
108    )?;
109
110    // Create core schemas
111    create_core_schemas(&conn)?;
112
113    // Create bird_meta table for schema versioning (v5)
114    create_bird_meta(&conn)?;
115
116    // Create blob_registry table in main schema (used by both modes)
117    create_blob_registry(&conn)?;
118
119    // Mode-specific initialization for local schema
120    match config.storage_mode {
121        StorageMode::Parquet => {
122            // Create seed parquet files with correct schema but no rows (v5: attempts, outcomes)
123            create_seed_files(&conn, config)?;
124            // Create local schema with views over parquet files (v5: attempts, outcomes tables)
125            create_local_parquet_views(&conn)?;
126        }
127        StorageMode::DuckDB => {
128            // Create local schema with tables for direct storage (v5: attempts, outcomes tables)
129            create_local_tables(&conn)?;
130        }
131    }
132
133    // Create placeholder schemas (for empty unions)
134    create_placeholder_schemas(&conn)?;
135
136    // Create union schemas (caches, remotes, main, bird)
137    create_union_schemas(&conn)?;
138
139    // Create helper views in main schema
140    create_helper_views(&conn)?;
141
142    // Create cwd schema views (placeholders, rebuilt at connection time)
143    create_cwd_views(&conn)?;
144
145    Ok(())
146}
147
148/// Create core schemas used by BIRD.
149fn create_core_schemas(conn: &duckdb::Connection) -> Result<()> {
150    conn.execute_batch(
151        r#"
152        -- Data schemas
153        CREATE SCHEMA IF NOT EXISTS local;
154        CREATE SCHEMA IF NOT EXISTS cached_placeholder;
155        CREATE SCHEMA IF NOT EXISTS remote_placeholder;
156
157        -- Union schemas
158        CREATE SCHEMA IF NOT EXISTS caches;
159        CREATE SCHEMA IF NOT EXISTS remotes;
160        -- main already exists as default schema
161        CREATE SCHEMA IF NOT EXISTS unified;
162        CREATE SCHEMA IF NOT EXISTS cwd;
163        "#,
164    )?;
165    Ok(())
166}
167
168/// Create placeholder schemas with empty tables.
169/// These ensure union views work even when no cached/remote schemas exist.
170///
171/// V5 schema: Includes attempts and outcomes tables, plus invocations VIEW.
172fn create_placeholder_schemas(conn: &duckdb::Connection) -> Result<()> {
173    // Cached placeholder - empty tables with correct schema
174    conn.execute_batch(
175        r#"
176        CREATE TABLE cached_placeholder.sessions (
177            session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
178            invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE,
179            _source VARCHAR
180        );
181        -- V5: Attempts table (invocation start)
182        CREATE TABLE cached_placeholder.attempts (
183            id UUID, timestamp TIMESTAMP, cmd VARCHAR, cwd VARCHAR, session_id VARCHAR,
184            tag VARCHAR, source_client VARCHAR, machine_id VARCHAR, hostname VARCHAR,
185            executable VARCHAR, format_hint VARCHAR, metadata JSON, date DATE,
186            _source VARCHAR
187        );
188        -- V5: Outcomes table (invocation end)
189        CREATE TABLE cached_placeholder.outcomes (
190            attempt_id UUID, completed_at TIMESTAMP, exit_code INTEGER, duration_ms BIGINT,
191            signal INTEGER, timeout BOOLEAN, metadata JSON, date DATE,
192            _source VARCHAR
193        );
194        -- V5: Invocations VIEW (attempts LEFT JOIN outcomes with derived status)
195        CREATE VIEW cached_placeholder.invocations AS
196        SELECT
197            a.id,
198            a.session_id,
199            a.timestamp,
200            o.duration_ms,
201            a.cwd,
202            a.cmd,
203            a.executable,
204            o.exit_code,
205            CASE
206                WHEN o.attempt_id IS NULL THEN 'pending'
207                WHEN o.exit_code IS NULL THEN 'orphaned'
208                ELSE 'completed'
209            END AS status,
210            a.format_hint,
211            a.source_client AS client_id,
212            a.hostname,
213            a.tag,
214            o.signal,
215            o.timeout,
216            o.completed_at,
217            CASE
218                WHEN a.metadata IS NULL AND o.metadata IS NULL THEN NULL
219                WHEN a.metadata IS NULL THEN o.metadata
220                WHEN o.metadata IS NULL THEN a.metadata
221                ELSE map_concat(a.metadata::MAP(VARCHAR, JSON), o.metadata::MAP(VARCHAR, JSON))
222            END AS metadata,
223            a.date,
224            a._source
225        FROM cached_placeholder.attempts a
226        LEFT JOIN cached_placeholder.outcomes o ON a.id = o.attempt_id;
227
228        CREATE TABLE cached_placeholder.outputs (
229            id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
230            byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
231            content_type VARCHAR, date DATE, _source VARCHAR
232        );
233        CREATE TABLE cached_placeholder.events (
234            id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
235            event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
236            ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
237            status VARCHAR, format_used VARCHAR, date DATE, _source VARCHAR
238        );
239        "#,
240    )?;
241
242    // Remote placeholder - same structure
243    conn.execute_batch(
244        r#"
245        CREATE TABLE remote_placeholder.sessions (
246            session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
247            invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE,
248            _source VARCHAR
249        );
250        -- V5: Attempts table (invocation start)
251        CREATE TABLE remote_placeholder.attempts (
252            id UUID, timestamp TIMESTAMP, cmd VARCHAR, cwd VARCHAR, session_id VARCHAR,
253            tag VARCHAR, source_client VARCHAR, machine_id VARCHAR, hostname VARCHAR,
254            executable VARCHAR, format_hint VARCHAR, metadata JSON, date DATE,
255            _source VARCHAR
256        );
257        -- V5: Outcomes table (invocation end)
258        CREATE TABLE remote_placeholder.outcomes (
259            attempt_id UUID, completed_at TIMESTAMP, exit_code INTEGER, duration_ms BIGINT,
260            signal INTEGER, timeout BOOLEAN, metadata JSON, date DATE,
261            _source VARCHAR
262        );
263        -- V5: Invocations VIEW (attempts LEFT JOIN outcomes with derived status)
264        CREATE VIEW remote_placeholder.invocations AS
265        SELECT
266            a.id,
267            a.session_id,
268            a.timestamp,
269            o.duration_ms,
270            a.cwd,
271            a.cmd,
272            a.executable,
273            o.exit_code,
274            CASE
275                WHEN o.attempt_id IS NULL THEN 'pending'
276                WHEN o.exit_code IS NULL THEN 'orphaned'
277                ELSE 'completed'
278            END AS status,
279            a.format_hint,
280            a.source_client AS client_id,
281            a.hostname,
282            a.tag,
283            o.signal,
284            o.timeout,
285            o.completed_at,
286            CASE
287                WHEN a.metadata IS NULL AND o.metadata IS NULL THEN NULL
288                WHEN a.metadata IS NULL THEN o.metadata
289                WHEN o.metadata IS NULL THEN a.metadata
290                ELSE map_concat(a.metadata::MAP(VARCHAR, JSON), o.metadata::MAP(VARCHAR, JSON))
291            END AS metadata,
292            a.date,
293            a._source
294        FROM remote_placeholder.attempts a
295        LEFT JOIN remote_placeholder.outcomes o ON a.id = o.attempt_id;
296
297        CREATE TABLE remote_placeholder.outputs (
298            id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
299            byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
300            content_type VARCHAR, date DATE, _source VARCHAR
301        );
302        CREATE TABLE remote_placeholder.events (
303            id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
304            event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
305            ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
306            status VARCHAR, format_used VARCHAR, date DATE, _source VARCHAR
307        );
308        "#,
309    )?;
310
311    Ok(())
312}
313
314/// Create union schemas that combine data from multiple sources.
315/// Initially these just reference placeholders; they get rebuilt when remotes are added.
316///
317/// V5 schema: Includes attempts and outcomes union views.
318fn create_union_schemas(conn: &duckdb::Connection) -> Result<()> {
319    // caches = union of all cached_* schemas (initially just placeholder)
320    conn.execute_batch(
321        r#"
322        CREATE OR REPLACE VIEW caches.sessions AS SELECT * FROM cached_placeholder.sessions;
323        CREATE OR REPLACE VIEW caches.attempts AS SELECT * FROM cached_placeholder.attempts;
324        CREATE OR REPLACE VIEW caches.outcomes AS SELECT * FROM cached_placeholder.outcomes;
325        CREATE OR REPLACE VIEW caches.invocations AS SELECT * FROM cached_placeholder.invocations;
326        CREATE OR REPLACE VIEW caches.outputs AS SELECT * FROM cached_placeholder.outputs;
327        CREATE OR REPLACE VIEW caches.events AS SELECT * FROM cached_placeholder.events;
328        "#,
329    )?;
330
331    // remotes = union of all remote_* schemas (initially just placeholder)
332    conn.execute_batch(
333        r#"
334        CREATE OR REPLACE VIEW remotes.sessions AS SELECT * FROM remote_placeholder.sessions;
335        CREATE OR REPLACE VIEW remotes.attempts AS SELECT * FROM remote_placeholder.attempts;
336        CREATE OR REPLACE VIEW remotes.outcomes AS SELECT * FROM remote_placeholder.outcomes;
337        CREATE OR REPLACE VIEW remotes.invocations AS SELECT * FROM remote_placeholder.invocations;
338        CREATE OR REPLACE VIEW remotes.outputs AS SELECT * FROM remote_placeholder.outputs;
339        CREATE OR REPLACE VIEW remotes.events AS SELECT * FROM remote_placeholder.events;
340        "#,
341    )?;
342
343    // main = local + caches (all data we own)
344    // V5: attempts and outcomes are base tables, invocations is derived VIEW
345    conn.execute_batch(
346        r#"
347        CREATE OR REPLACE VIEW main.sessions AS
348            SELECT *, 'local' as _source FROM local.sessions
349            UNION ALL BY NAME SELECT * FROM caches.sessions;
350        CREATE OR REPLACE VIEW main.attempts AS
351            SELECT *, 'local' as _source FROM local.attempts
352            UNION ALL BY NAME SELECT * FROM caches.attempts;
353        CREATE OR REPLACE VIEW main.outcomes AS
354            SELECT *, 'local' as _source FROM local.outcomes
355            UNION ALL BY NAME SELECT * FROM caches.outcomes;
356        -- V5: Invocations VIEW (attempts LEFT JOIN outcomes with derived status)
357        CREATE OR REPLACE VIEW main.invocations AS
358        SELECT
359            a.id,
360            a.session_id,
361            a.timestamp,
362            o.duration_ms,
363            a.cwd,
364            a.cmd,
365            a.executable,
366            o.exit_code,
367            CASE
368                WHEN o.attempt_id IS NULL THEN 'pending'
369                WHEN o.exit_code IS NULL THEN 'orphaned'
370                ELSE 'completed'
371            END AS status,
372            a.format_hint,
373            a.source_client AS client_id,
374            a.hostname,
375            a.tag,
376            o.signal,
377            o.timeout,
378            o.completed_at,
379            CASE
380                WHEN a.metadata IS NULL AND o.metadata IS NULL THEN NULL
381                WHEN a.metadata IS NULL THEN o.metadata
382                WHEN o.metadata IS NULL THEN a.metadata
383                ELSE map_concat(a.metadata::MAP(VARCHAR, JSON), o.metadata::MAP(VARCHAR, JSON))
384            END AS metadata,
385            a.date,
386            a._source
387        FROM main.attempts a
388        LEFT JOIN main.outcomes o ON a.id = o.attempt_id;
389
390        CREATE OR REPLACE VIEW main.outputs AS
391            SELECT *, 'local' as _source FROM local.outputs
392            UNION ALL BY NAME SELECT * FROM caches.outputs;
393        CREATE OR REPLACE VIEW main.events AS
394            SELECT *, 'local' as _source FROM local.events
395            UNION ALL BY NAME SELECT * FROM caches.events;
396        "#,
397    )?;
398
399    // unified = main + remotes (everything)
400    // V5: attempts and outcomes are base tables, invocations is derived VIEW
401    conn.execute_batch(
402        r#"
403        CREATE OR REPLACE VIEW unified.sessions AS
404            SELECT * FROM main.sessions
405            UNION ALL BY NAME SELECT * FROM remotes.sessions;
406        CREATE OR REPLACE VIEW unified.attempts AS
407            SELECT * FROM main.attempts
408            UNION ALL BY NAME SELECT * FROM remotes.attempts;
409        CREATE OR REPLACE VIEW unified.outcomes AS
410            SELECT * FROM main.outcomes
411            UNION ALL BY NAME SELECT * FROM remotes.outcomes;
412        -- V5: Invocations VIEW (attempts LEFT JOIN outcomes with derived status)
413        CREATE OR REPLACE VIEW unified.invocations AS
414        SELECT
415            a.id,
416            a.session_id,
417            a.timestamp,
418            o.duration_ms,
419            a.cwd,
420            a.cmd,
421            a.executable,
422            o.exit_code,
423            CASE
424                WHEN o.attempt_id IS NULL THEN 'pending'
425                WHEN o.exit_code IS NULL THEN 'orphaned'
426                ELSE 'completed'
427            END AS status,
428            a.format_hint,
429            a.source_client AS client_id,
430            a.hostname,
431            a.tag,
432            o.signal,
433            o.timeout,
434            o.completed_at,
435            CASE
436                WHEN a.metadata IS NULL AND o.metadata IS NULL THEN NULL
437                WHEN a.metadata IS NULL THEN o.metadata
438                WHEN o.metadata IS NULL THEN a.metadata
439                ELSE map_concat(a.metadata::MAP(VARCHAR, JSON), o.metadata::MAP(VARCHAR, JSON))
440            END AS metadata,
441            a.date,
442            a._source
443        FROM unified.attempts a
444        LEFT JOIN unified.outcomes o ON a.id = o.attempt_id;
445
446        CREATE OR REPLACE VIEW unified.outputs AS
447            SELECT * FROM main.outputs
448            UNION ALL BY NAME SELECT * FROM remotes.outputs;
449        CREATE OR REPLACE VIEW unified.events AS
450            SELECT * FROM main.events
451            UNION ALL BY NAME SELECT * FROM remotes.events;
452        "#,
453    )?;
454
455    // unified.qualified_* views - deduplicated with source list
456    conn.execute_batch(
457        r#"
458        CREATE OR REPLACE VIEW unified.qualified_sessions AS
459            SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
460            FROM unified.sessions
461            GROUP BY ALL;
462        CREATE OR REPLACE VIEW unified.qualified_attempts AS
463            SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
464            FROM unified.attempts
465            GROUP BY ALL;
466        CREATE OR REPLACE VIEW unified.qualified_outcomes AS
467            SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
468            FROM unified.outcomes
469            GROUP BY ALL;
470        CREATE OR REPLACE VIEW unified.qualified_invocations AS
471            SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
472            FROM unified.invocations
473            GROUP BY ALL;
474        CREATE OR REPLACE VIEW unified.qualified_outputs AS
475            SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
476            FROM unified.outputs
477            GROUP BY ALL;
478        CREATE OR REPLACE VIEW unified.qualified_events AS
479            SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
480            FROM unified.events
481            GROUP BY ALL;
482        "#,
483    )?;
484
485    Ok(())
486}
487
488/// Create local schema with views over Parquet files (for Parquet mode).
489///
490/// In parquet mode, local data is stored in parquet files.
491/// Views in the local schema read from these files.
492/// Uses `file_row_number = true` to handle empty directories gracefully.
493///
494/// V5 schema: Creates attempts/outcomes views and invocations as a derived VIEW.
495fn create_local_parquet_views(conn: &duckdb::Connection) -> Result<()> {
496    // Note: We use UNION ALL with seed files to ensure views work even when
497    // main directories are empty. The seed files are in date=1970-01-01 and
498    // contain no data rows, just schema.
499    conn.execute_batch(
500        r#"
501        -- Sessions view: read from parquet files
502        CREATE OR REPLACE VIEW local.sessions AS
503        SELECT * EXCLUDE (filename, file_row_number)
504        FROM read_parquet(
505            'recent/sessions/**/*.parquet',
506            union_by_name = true,
507            hive_partitioning = true,
508            filename = true,
509            file_row_number = true
510        );
511
512        -- V5: Attempts view: read from parquet files
513        CREATE OR REPLACE VIEW local.attempts AS
514        SELECT * EXCLUDE (filename, file_row_number)
515        FROM read_parquet(
516            'recent/attempts/**/*.parquet',
517            union_by_name = true,
518            hive_partitioning = true,
519            filename = true,
520            file_row_number = true
521        );
522
523        -- V5: Outcomes view: read from parquet files
524        CREATE OR REPLACE VIEW local.outcomes AS
525        SELECT * EXCLUDE (filename, file_row_number)
526        FROM read_parquet(
527            'recent/outcomes/**/*.parquet',
528            union_by_name = true,
529            hive_partitioning = true,
530            filename = true,
531            file_row_number = true
532        );
533
534        -- V5: Invocations VIEW (attempts LEFT JOIN outcomes with derived status)
535        CREATE OR REPLACE VIEW local.invocations AS
536        SELECT
537            a.id,
538            a.session_id,
539            a.timestamp,
540            o.duration_ms,
541            a.cwd,
542            a.cmd,
543            a.executable,
544            o.exit_code,
545            CASE
546                WHEN o.attempt_id IS NULL THEN 'pending'
547                WHEN o.exit_code IS NULL THEN 'orphaned'
548                ELSE 'completed'
549            END AS status,
550            a.format_hint,
551            a.source_client AS client_id,
552            a.hostname,
553            a.tag,
554            o.signal,
555            o.timeout,
556            o.completed_at,
557            CASE
558                WHEN a.metadata IS NULL AND o.metadata IS NULL THEN NULL
559                WHEN a.metadata IS NULL THEN o.metadata
560                WHEN o.metadata IS NULL THEN a.metadata
561                ELSE map_concat(a.metadata::MAP(VARCHAR, JSON), o.metadata::MAP(VARCHAR, JSON))
562            END AS metadata,
563            a.date
564        FROM local.attempts a
565        LEFT JOIN local.outcomes o ON a.id = o.attempt_id;
566
567        -- Outputs view: read from parquet files
568        CREATE OR REPLACE VIEW local.outputs AS
569        SELECT * EXCLUDE (filename, file_row_number)
570        FROM read_parquet(
571            'recent/outputs/**/*.parquet',
572            union_by_name = true,
573            hive_partitioning = true,
574            filename = true,
575            file_row_number = true
576        );
577
578        -- Events view: read from parquet files
579        CREATE OR REPLACE VIEW local.events AS
580        SELECT * EXCLUDE (filename, file_row_number)
581        FROM read_parquet(
582            'recent/events/**/*.parquet',
583            union_by_name = true,
584            hive_partitioning = true,
585            filename = true,
586            file_row_number = true
587        );
588        "#,
589    )?;
590    Ok(())
591}
592
593/// Create local schema with tables for direct storage (for DuckDB mode).
594///
595/// Creates v5 schema with attempts/outcomes tables and invocations VIEW.
596fn create_local_tables(conn: &duckdb::Connection) -> Result<()> {
597    conn.execute_batch(
598        r#"
599        -- Sessions table
600        CREATE TABLE IF NOT EXISTS local.sessions (
601            session_id VARCHAR,
602            client_id VARCHAR,
603            invoker VARCHAR,
604            invoker_pid INTEGER,
605            invoker_type VARCHAR,
606            registered_at TIMESTAMP,
607            cwd VARCHAR,
608            date DATE
609        );
610
611        -- V5: Attempts table (invocation start)
612        CREATE TABLE IF NOT EXISTS local.attempts (
613            id UUID PRIMARY KEY,
614            timestamp TIMESTAMP NOT NULL,
615            cmd VARCHAR NOT NULL,
616            cwd VARCHAR NOT NULL,
617            session_id VARCHAR NOT NULL,
618            tag VARCHAR,
619            source_client VARCHAR NOT NULL,
620            machine_id VARCHAR,
621            hostname VARCHAR,
622            executable VARCHAR,
623            format_hint VARCHAR,
624            metadata MAP(VARCHAR, JSON),
625            date DATE NOT NULL
626        );
627
628        -- V5: Outcomes table (invocation end)
629        CREATE TABLE IF NOT EXISTS local.outcomes (
630            attempt_id UUID PRIMARY KEY,
631            completed_at TIMESTAMP NOT NULL,
632            exit_code INTEGER,
633            duration_ms BIGINT,
634            signal INTEGER,
635            timeout BOOLEAN DEFAULT FALSE,
636            metadata MAP(VARCHAR, JSON),
637            date DATE NOT NULL
638        );
639
640        -- V5: Invocations VIEW (attempts LEFT JOIN outcomes with derived status)
641        CREATE OR REPLACE VIEW local.invocations AS
642        SELECT
643            a.id,
644            a.session_id,
645            a.timestamp,
646            o.duration_ms,
647            a.cwd,
648            a.cmd,
649            a.executable,
650            o.exit_code,
651            CASE
652                WHEN o.attempt_id IS NULL THEN 'pending'
653                WHEN o.exit_code IS NULL THEN 'orphaned'
654                ELSE 'completed'
655            END AS status,
656            a.format_hint,
657            a.source_client AS client_id,
658            a.hostname,
659            a.tag,
660            o.signal,
661            o.timeout,
662            o.completed_at,
663            CASE
664                WHEN a.metadata IS NULL AND o.metadata IS NULL THEN NULL
665                WHEN a.metadata IS NULL THEN o.metadata
666                WHEN o.metadata IS NULL THEN a.metadata
667                ELSE map_concat(a.metadata::MAP(VARCHAR, JSON), o.metadata::MAP(VARCHAR, JSON))
668            END AS metadata,
669            a.date
670        FROM local.attempts a
671        LEFT JOIN local.outcomes o ON a.id = o.attempt_id;
672
673        -- Outputs table
674        CREATE TABLE IF NOT EXISTS local.outputs (
675            id UUID,
676            invocation_id UUID,
677            stream VARCHAR,
678            content_hash VARCHAR,
679            byte_length BIGINT,
680            storage_type VARCHAR,
681            storage_ref VARCHAR,
682            content_type VARCHAR,
683            date DATE
684        );
685
686        -- Events table
687        CREATE TABLE IF NOT EXISTS local.events (
688            id UUID,
689            invocation_id UUID,
690            client_id VARCHAR,
691            hostname VARCHAR,
692            event_type VARCHAR,
693            severity VARCHAR,
694            ref_file VARCHAR,
695            ref_line INTEGER,
696            ref_column INTEGER,
697            message VARCHAR,
698            error_code VARCHAR,
699            test_name VARCHAR,
700            status VARCHAR,
701            format_used VARCHAR,
702            date DATE
703        );
704        "#,
705    )?;
706    Ok(())
707}
708
709/// Create helper views in main schema.
710fn create_helper_views(conn: &duckdb::Connection) -> Result<()> {
711    conn.execute_batch(
712        r#"
713        -- Recent invocations helper view
714        CREATE OR REPLACE VIEW main.recent_invocations AS
715        SELECT *
716        FROM main.invocations
717        WHERE date >= CURRENT_DATE - INTERVAL '7 days'
718        ORDER BY timestamp DESC;
719
720        -- Invocations today helper view
721        CREATE OR REPLACE VIEW main.invocations_today AS
722        SELECT *
723        FROM main.invocations
724        WHERE date = CURRENT_DATE
725        ORDER BY timestamp DESC;
726
727        -- Failed invocations helper view
728        CREATE OR REPLACE VIEW main.failed_invocations AS
729        SELECT *
730        FROM main.invocations
731        WHERE exit_code != 0
732        ORDER BY timestamp DESC;
733
734        -- Invocations with outputs (joined view)
735        CREATE OR REPLACE VIEW main.invocations_with_outputs AS
736        SELECT
737            i.*,
738            o.id as output_id,
739            o.stream,
740            o.byte_length,
741            o.storage_type,
742            o.storage_ref
743        FROM main.invocations i
744        LEFT JOIN main.outputs o ON i.id = o.invocation_id;
745
746        -- Clients view (derived from sessions)
747        CREATE OR REPLACE VIEW main.clients AS
748        SELECT
749            client_id,
750            MIN(registered_at) as first_seen,
751            MAX(registered_at) as last_seen,
752            COUNT(DISTINCT session_id) as session_count
753        FROM main.sessions
754        GROUP BY client_id;
755
756        -- Events with invocation context (joined view)
757        CREATE OR REPLACE VIEW main.events_with_context AS
758        SELECT
759            e.*,
760            i.cmd,
761            i.timestamp,
762            i.cwd,
763            i.exit_code
764        FROM main.events e
765        JOIN main.invocations i ON e.invocation_id = i.id;
766        "#,
767    )?;
768    Ok(())
769}
770
771/// Create cwd schema views filtered to current working directory.
772/// These views are dynamically regenerated when the connection opens.
773/// Note: Initial creation uses a placeholder; actual filtering happens at connection time.
774/// Create the bird_meta table for schema versioning (v5).
775fn create_bird_meta(conn: &duckdb::Connection) -> Result<()> {
776    conn.execute_batch(
777        r#"
778        CREATE TABLE IF NOT EXISTS bird_meta (
779            key               VARCHAR PRIMARY KEY,
780            value             VARCHAR NOT NULL,
781            updated_at        TIMESTAMP DEFAULT (now())
782        );
783
784        -- Insert schema version
785        INSERT INTO bird_meta (key, value, updated_at) VALUES ('schema_version', '5', now())
786        ON CONFLICT (key) DO UPDATE SET value = '5', updated_at = now();
787        "#,
788    )?;
789    Ok(())
790}
791
792fn create_cwd_views(conn: &duckdb::Connection) -> Result<()> {
793    // cwd views filter main data to entries where cwd starts with current directory
794    // The actual current directory is set via a variable at connection time
795    conn.execute_batch(
796        r#"
797        -- Placeholder views - these get rebuilt with actual cwd at connection time
798        CREATE OR REPLACE VIEW cwd.sessions AS
799        SELECT * FROM main.sessions WHERE false;
800        CREATE OR REPLACE VIEW cwd.invocations AS
801        SELECT * FROM main.invocations WHERE false;
802        CREATE OR REPLACE VIEW cwd.outputs AS
803        SELECT * FROM main.outputs WHERE false;
804        CREATE OR REPLACE VIEW cwd.events AS
805        SELECT * FROM main.events WHERE false;
806        "#,
807    )?;
808    Ok(())
809}
810
811/// Ensure a DuckDB extension is loaded, installing if necessary.
812///
813/// Attempts in order:
814/// 1. LOAD (extension might already be available)
815/// 2. INSTALL from default repository, then LOAD
816/// 3. INSTALL FROM community, then LOAD
817///
818/// Includes retry logic to handle race conditions when multiple processes
819/// try to install extensions concurrently.
820fn ensure_extension(conn: &duckdb::Connection, name: &str) -> Result<bool> {
821    // Retry up to 3 times to handle concurrent installation races
822    for attempt in 0..3 {
823        // Try loading directly first (already installed/cached)
824        if conn.execute(&format!("LOAD {}", name), []).is_ok() {
825            return Ok(true);
826        }
827
828        // Try installing from default repository
829        if conn.execute(&format!("INSTALL {}", name), []).is_ok()
830            && conn.execute(&format!("LOAD {}", name), []).is_ok()
831        {
832            return Ok(true);
833        }
834
835        // Try installing from community repository
836        if conn.execute(&format!("INSTALL {} FROM community", name), []).is_ok()
837            && conn.execute(&format!("LOAD {}", name), []).is_ok()
838        {
839            return Ok(true);
840        }
841
842        // If not the last attempt, wait a bit before retrying
843        if attempt < 2 {
844            std::thread::sleep(std::time::Duration::from_millis(100 * (attempt as u64 + 1)));
845        }
846    }
847
848    Ok(false)
849}
850
851/// Install and load all required extensions during initialization.
852/// This pre-populates the extension cache so connect() is fast.
853fn install_extensions(conn: &duckdb::Connection) -> Result<()> {
854    // Required extensions - fail if not available
855    for name in ["parquet", "icu", "httpfs", "json"] {
856        if !ensure_extension(conn, name)? {
857            return Err(Error::Config(format!(
858                "Required extension '{}' could not be installed",
859                name
860            )));
861        }
862    }
863
864    // Optional community extensions - warn if not available
865    for (name, desc) in [
866        ("scalarfs", "data: URL support for inline blobs"),
867        ("duck_hunt", "log/output parsing for event extraction"),
868    ] {
869        if !ensure_extension(conn, name)? {
870            eprintln!("Warning: {} extension not available ({})", name, desc);
871        }
872    }
873
874    Ok(())
875}
876
877
878/// Create the blob_registry table for tracking deduplicated blobs.
879fn create_blob_registry(conn: &duckdb::Connection) -> Result<()> {
880    conn.execute_batch(
881        r#"
882        CREATE TABLE IF NOT EXISTS blob_registry (
883            content_hash  VARCHAR PRIMARY KEY,  -- BLAKE3 hash
884            byte_length   BIGINT NOT NULL,      -- Original uncompressed size
885            ref_count     INTEGER DEFAULT 1,    -- Number of outputs referencing this blob
886            first_seen    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
887            last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
888            storage_path  VARCHAR NOT NULL      -- Relative path to blob file
889        );
890        "#,
891    )?;
892    Ok(())
893}
894
895/// Create seed parquet files with correct schema but no rows.
896///
897/// V5 schema: Creates seed files for attempts and outcomes (no invocations seed needed
898/// since invocations is now a VIEW).
899fn create_seed_files(conn: &duckdb::Connection, config: &Config) -> Result<()> {
900    // V5: Create attempts seed
901    let attempts_seed_dir = config.recent_dir().join("attempts").join("date=1970-01-01");
902    fs::create_dir_all(&attempts_seed_dir)?;
903
904    let attempts_seed_path = attempts_seed_dir.join("_seed.parquet");
905    conn.execute_batch(&format!(
906        r#"
907        COPY (
908            SELECT
909                NULL::UUID as id,
910                NULL::TIMESTAMP as timestamp,
911                NULL::VARCHAR as cmd,
912                NULL::VARCHAR as cwd,
913                NULL::VARCHAR as session_id,
914                NULL::VARCHAR as tag,
915                NULL::VARCHAR as source_client,
916                NULL::VARCHAR as machine_id,
917                NULL::VARCHAR as hostname,
918                NULL::VARCHAR as executable,
919                NULL::VARCHAR as format_hint,
920                NULL::MAP(VARCHAR, JSON) as metadata,
921                NULL::DATE as date
922            WHERE false
923        ) TO '{}' (FORMAT PARQUET);
924        "#,
925        attempts_seed_path.display()
926    ))?;
927
928    // V5: Create outcomes seed
929    let outcomes_seed_dir = config.recent_dir().join("outcomes").join("date=1970-01-01");
930    fs::create_dir_all(&outcomes_seed_dir)?;
931
932    let outcomes_seed_path = outcomes_seed_dir.join("_seed.parquet");
933    conn.execute_batch(&format!(
934        r#"
935        COPY (
936            SELECT
937                NULL::UUID as attempt_id,
938                NULL::TIMESTAMP as completed_at,
939                NULL::INTEGER as exit_code,
940                NULL::BIGINT as duration_ms,
941                NULL::INTEGER as signal,
942                NULL::BOOLEAN as timeout,
943                NULL::MAP(VARCHAR, JSON) as metadata,
944                NULL::DATE as date
945            WHERE false
946        ) TO '{}' (FORMAT PARQUET);
947        "#,
948        outcomes_seed_path.display()
949    ))?;
950
951    // Create outputs seed
952    let outputs_seed_dir = config.recent_dir().join("outputs").join("date=1970-01-01");
953    fs::create_dir_all(&outputs_seed_dir)?;
954
955    let outputs_seed_path = outputs_seed_dir.join("_seed.parquet");
956    conn.execute_batch(&format!(
957        r#"
958        COPY (
959            SELECT
960                NULL::UUID as id,
961                NULL::UUID as invocation_id,
962                NULL::VARCHAR as stream,
963                NULL::VARCHAR as content_hash,
964                NULL::BIGINT as byte_length,
965                NULL::VARCHAR as storage_type,
966                NULL::VARCHAR as storage_ref,
967                NULL::VARCHAR as content_type,
968                NULL::DATE as date
969            WHERE false
970        ) TO '{}' (FORMAT PARQUET);
971        "#,
972        outputs_seed_path.display()
973    ))?;
974
975    // Create sessions seed
976    let sessions_seed_dir = config.recent_dir().join("sessions").join("date=1970-01-01");
977    fs::create_dir_all(&sessions_seed_dir)?;
978
979    let sessions_seed_path = sessions_seed_dir.join("_seed.parquet");
980    conn.execute_batch(&format!(
981        r#"
982        COPY (
983            SELECT
984                NULL::VARCHAR as session_id,
985                NULL::VARCHAR as client_id,
986                NULL::VARCHAR as invoker,
987                NULL::INTEGER as invoker_pid,
988                NULL::VARCHAR as invoker_type,
989                NULL::TIMESTAMP as registered_at,
990                NULL::VARCHAR as cwd,
991                NULL::DATE as date
992            WHERE false
993        ) TO '{}' (FORMAT PARQUET);
994        "#,
995        sessions_seed_path.display()
996    ))?;
997
998    // Create events seed
999    let events_seed_dir = config.recent_dir().join("events").join("date=1970-01-01");
1000    fs::create_dir_all(&events_seed_dir)?;
1001
1002    let events_seed_path = events_seed_dir.join("_seed.parquet");
1003    conn.execute_batch(&format!(
1004        r#"
1005        COPY (
1006            SELECT
1007                NULL::UUID as id,
1008                NULL::UUID as invocation_id,
1009                NULL::VARCHAR as client_id,
1010                NULL::VARCHAR as hostname,
1011                NULL::VARCHAR as event_type,
1012                NULL::VARCHAR as severity,
1013                NULL::VARCHAR as ref_file,
1014                NULL::INTEGER as ref_line,
1015                NULL::INTEGER as ref_column,
1016                NULL::VARCHAR as message,
1017                NULL::VARCHAR as error_code,
1018                NULL::VARCHAR as test_name,
1019                NULL::VARCHAR as status,
1020                NULL::VARCHAR as format_used,
1021                NULL::DATE as date
1022            WHERE false
1023        ) TO '{}' (FORMAT PARQUET);
1024        "#,
1025        events_seed_path.display()
1026    ))?;
1027
1028    Ok(())
1029}
1030
1031/// Create the default event-formats.toml configuration file.
1032fn create_event_formats_config(config: &Config) -> Result<()> {
1033    let path = config.event_formats_path();
1034    if !path.exists() {
1035        fs::write(&path, DEFAULT_EVENT_FORMATS_CONFIG)?;
1036    }
1037    Ok(())
1038}
1039
/// Default content for `event-formats.toml`.
///
/// The TOML header states that rules are glob-matched against the command
/// string and that the first matching rule wins. Rules are therefore ordered
/// so that a more specific pattern always precedes any broader pattern that
/// would also match it. For example, `*cmake*` must come before `*make*`;
/// otherwise cmake commands would be classified as `make_error`.
pub const DEFAULT_EVENT_FORMATS_CONFIG: &str = r#"# Event format detection rules for duck_hunt
# Patterns are glob-matched against the command string
# First matching rule wins; use 'auto' for duck_hunt's built-in detection
# Keep specific patterns above broader ones they overlap with (e.g. *cmake* before *make*)

# C/C++ compilers
[[rules]]
pattern = "*gcc*"
format = "gcc"

[[rules]]
pattern = "*g++*"
format = "gcc"

[[rules]]
pattern = "*clang++*"
format = "gcc"

[[rules]]
pattern = "*clang*"
format = "gcc"

# Rust
[[rules]]
pattern = "*cargo build*"
format = "cargo_build"

[[rules]]
pattern = "*cargo test*"
format = "cargo_test_json"

[[rules]]
pattern = "*cargo check*"
format = "cargo_build"

[[rules]]
pattern = "*rustc*"
format = "rustc"

# Python
[[rules]]
pattern = "*python*-m*pytest*"
format = "pytest_text"

[[rules]]
pattern = "*pytest*"
format = "pytest_text"

[[rules]]
pattern = "*mypy*"
format = "mypy"

[[rules]]
pattern = "*flake8*"
format = "flake8"

[[rules]]
pattern = "*pylint*"
format = "pylint"

# JavaScript/TypeScript
[[rules]]
pattern = "*eslint*"
format = "eslint"

[[rules]]
pattern = "*tsc*"
format = "typescript"

[[rules]]
pattern = "*jest*"
format = "jest"

# Build systems
[[rules]]
pattern = "*cmake*"
format = "cmake"

[[rules]]
pattern = "*make*"
format = "make_error"

[[rules]]
pattern = "*ninja*"
format = "ninja"

# Go
[[rules]]
pattern = "*go build*"
format = "go_build"

[[rules]]
pattern = "*go test*"
format = "go_test"

# Default: use duck_hunt's auto-detection
[default]
format = "auto"
"#;
1139
1140/// Check if BIRD is initialized at the given location.
1141pub fn is_initialized(config: &Config) -> bool {
1142    config.db_path().exists()
1143}
1144
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build a `Config` rooted in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned too, so the directory lives as long as the caller holds it.
    fn temp_config() -> (TempDir, Config) {
        let tmp = TempDir::new().unwrap();
        let config = Config::with_root(tmp.path());
        (tmp, config)
    }

    #[test]
    fn test_initialize_creates_structure() {
        let (_tmp, config) = temp_config();
        initialize(&config).unwrap();

        assert!(config.db_path().exists());

        // V5 schema: attempts and outcomes directories instead of invocations
        for sub in ["attempts", "outcomes", "outputs", "sessions"] {
            assert!(
                config.recent_dir().join(sub).exists(),
                "missing recent/{} directory",
                sub
            );
        }

        assert!(config.blobs_dir().exists());
        assert!(config.extensions_dir().exists());
        assert!(config.sql_dir().exists());
        assert!(config.bird_root.join("config.toml").exists());
    }

    #[test]
    fn test_initialize_twice_fails() {
        let (_tmp, config) = temp_config();
        initialize(&config).unwrap();

        // A second initialization of the same root must be rejected.
        let second = initialize(&config);
        assert!(matches!(second, Err(Error::AlreadyInitialized(_))));
    }

    #[test]
    fn test_is_initialized() {
        let (_tmp, config) = temp_config();

        assert!(!is_initialized(&config));
        initialize(&config).unwrap();
        assert!(is_initialized(&config));
    }
}