1use std::fs;
30
31use crate::config::StorageMode;
32use crate::{Config, Error, Result};
33
34pub fn initialize(config: &Config) -> Result<()> {
39 let bird_root = &config.bird_root;
40
41 if config.db_path().exists() {
43 return Err(Error::AlreadyInitialized(bird_root.clone()));
44 }
45
46 create_directories(config)?;
48
49 init_database(config)?;
51
52 config.save()?;
54
55 create_event_formats_config(config)?;
57
58 Ok(())
59}
60
61fn create_directories(config: &Config) -> Result<()> {
63 let mut dirs = vec![
65 config.bird_root.join("db"),
66 config.blobs_dir(), config.archive_dir().join("blobs/content"),
68 config.extensions_dir(),
69 config.sql_dir(),
70 ];
71
72 if config.storage_mode == StorageMode::Parquet {
74 dirs.extend([
75 config.recent_dir().join("attempts"),
77 config.recent_dir().join("outcomes"),
78 config.recent_dir().join("outputs"),
80 config.recent_dir().join("sessions"),
81 config.recent_dir().join("events"),
82 ]);
83 }
84
85 for dir in &dirs {
86 fs::create_dir_all(dir)?;
87 }
88
89 Ok(())
90}
91
92fn init_database(config: &Config) -> Result<()> {
94 let conn = duckdb::Connection::open(config.db_path())?;
95
96 conn.execute("SET allow_community_extensions = true", [])?;
98
99 install_extensions(&conn)?;
102
103 let data_dir = config.data_dir();
105 conn.execute(
106 &format!("SET file_search_path = '{}'", data_dir.display()),
107 [],
108 )?;
109
110 create_core_schemas(&conn)?;
112
113 create_bird_meta(&conn)?;
115
116 create_blob_registry(&conn)?;
118
119 match config.storage_mode {
121 StorageMode::Parquet => {
122 create_seed_files(&conn, config)?;
124 create_local_parquet_views(&conn)?;
126 }
127 StorageMode::DuckDB => {
128 create_local_tables(&conn)?;
130 }
131 }
132
133 create_placeholder_schemas(&conn)?;
135
136 create_union_schemas(&conn)?;
138
139 create_helper_views(&conn)?;
141
142 create_cwd_views(&conn)?;
144
145 Ok(())
146}
147
148fn create_core_schemas(conn: &duckdb::Connection) -> Result<()> {
150 conn.execute_batch(
151 r#"
152 -- Data schemas
153 CREATE SCHEMA IF NOT EXISTS local;
154 CREATE SCHEMA IF NOT EXISTS cached_placeholder;
155 CREATE SCHEMA IF NOT EXISTS remote_placeholder;
156
157 -- Union schemas
158 CREATE SCHEMA IF NOT EXISTS caches;
159 CREATE SCHEMA IF NOT EXISTS remotes;
160 -- main already exists as default schema
161 CREATE SCHEMA IF NOT EXISTS unified;
162 CREATE SCHEMA IF NOT EXISTS cwd;
163 "#,
164 )?;
165 Ok(())
166}
167
168fn create_placeholder_schemas(conn: &duckdb::Connection) -> Result<()> {
173 conn.execute_batch(
175 r#"
176 CREATE TABLE cached_placeholder.sessions (
177 session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
178 invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE,
179 _source VARCHAR
180 );
181 -- V5: Attempts table (invocation start)
182 CREATE TABLE cached_placeholder.attempts (
183 id UUID, timestamp TIMESTAMP, cmd VARCHAR, cwd VARCHAR, session_id VARCHAR,
184 tag VARCHAR, source_client VARCHAR, machine_id VARCHAR, hostname VARCHAR,
185 executable VARCHAR, format_hint VARCHAR, metadata JSON, date DATE,
186 _source VARCHAR
187 );
188 -- V5: Outcomes table (invocation end)
189 CREATE TABLE cached_placeholder.outcomes (
190 attempt_id UUID, completed_at TIMESTAMP, exit_code INTEGER, duration_ms BIGINT,
191 signal INTEGER, timeout BOOLEAN, metadata JSON, date DATE,
192 _source VARCHAR
193 );
194 -- V5: Invocations VIEW (attempts LEFT JOIN outcomes with derived status)
195 CREATE VIEW cached_placeholder.invocations AS
196 SELECT
197 a.id,
198 a.session_id,
199 a.timestamp,
200 o.duration_ms,
201 a.cwd,
202 a.cmd,
203 a.executable,
204 o.exit_code,
205 CASE
206 WHEN o.attempt_id IS NULL THEN 'pending'
207 WHEN o.exit_code IS NULL THEN 'orphaned'
208 ELSE 'completed'
209 END AS status,
210 a.format_hint,
211 a.source_client AS client_id,
212 a.hostname,
213 a.tag,
214 o.signal,
215 o.timeout,
216 o.completed_at,
217 CASE
218 WHEN a.metadata IS NULL AND o.metadata IS NULL THEN NULL
219 WHEN a.metadata IS NULL THEN o.metadata
220 WHEN o.metadata IS NULL THEN a.metadata
221 ELSE map_concat(a.metadata::MAP(VARCHAR, JSON), o.metadata::MAP(VARCHAR, JSON))
222 END AS metadata,
223 a.date,
224 a._source
225 FROM cached_placeholder.attempts a
226 LEFT JOIN cached_placeholder.outcomes o ON a.id = o.attempt_id;
227
228 CREATE TABLE cached_placeholder.outputs (
229 id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
230 byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
231 content_type VARCHAR, date DATE, _source VARCHAR
232 );
233 CREATE TABLE cached_placeholder.events (
234 id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
235 event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
236 ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
237 status VARCHAR, format_used VARCHAR, date DATE, _source VARCHAR
238 );
239 "#,
240 )?;
241
242 conn.execute_batch(
244 r#"
245 CREATE TABLE remote_placeholder.sessions (
246 session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
247 invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE,
248 _source VARCHAR
249 );
250 -- V5: Attempts table (invocation start)
251 CREATE TABLE remote_placeholder.attempts (
252 id UUID, timestamp TIMESTAMP, cmd VARCHAR, cwd VARCHAR, session_id VARCHAR,
253 tag VARCHAR, source_client VARCHAR, machine_id VARCHAR, hostname VARCHAR,
254 executable VARCHAR, format_hint VARCHAR, metadata JSON, date DATE,
255 _source VARCHAR
256 );
257 -- V5: Outcomes table (invocation end)
258 CREATE TABLE remote_placeholder.outcomes (
259 attempt_id UUID, completed_at TIMESTAMP, exit_code INTEGER, duration_ms BIGINT,
260 signal INTEGER, timeout BOOLEAN, metadata JSON, date DATE,
261 _source VARCHAR
262 );
263 -- V5: Invocations VIEW (attempts LEFT JOIN outcomes with derived status)
264 CREATE VIEW remote_placeholder.invocations AS
265 SELECT
266 a.id,
267 a.session_id,
268 a.timestamp,
269 o.duration_ms,
270 a.cwd,
271 a.cmd,
272 a.executable,
273 o.exit_code,
274 CASE
275 WHEN o.attempt_id IS NULL THEN 'pending'
276 WHEN o.exit_code IS NULL THEN 'orphaned'
277 ELSE 'completed'
278 END AS status,
279 a.format_hint,
280 a.source_client AS client_id,
281 a.hostname,
282 a.tag,
283 o.signal,
284 o.timeout,
285 o.completed_at,
286 CASE
287 WHEN a.metadata IS NULL AND o.metadata IS NULL THEN NULL
288 WHEN a.metadata IS NULL THEN o.metadata
289 WHEN o.metadata IS NULL THEN a.metadata
290 ELSE map_concat(a.metadata::MAP(VARCHAR, JSON), o.metadata::MAP(VARCHAR, JSON))
291 END AS metadata,
292 a.date,
293 a._source
294 FROM remote_placeholder.attempts a
295 LEFT JOIN remote_placeholder.outcomes o ON a.id = o.attempt_id;
296
297 CREATE TABLE remote_placeholder.outputs (
298 id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
299 byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
300 content_type VARCHAR, date DATE, _source VARCHAR
301 );
302 CREATE TABLE remote_placeholder.events (
303 id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
304 event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
305 ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
306 status VARCHAR, format_used VARCHAR, date DATE, _source VARCHAR
307 );
308 "#,
309 )?;
310
311 Ok(())
312}
313
314fn create_union_schemas(conn: &duckdb::Connection) -> Result<()> {
319 conn.execute_batch(
321 r#"
322 CREATE OR REPLACE VIEW caches.sessions AS SELECT * FROM cached_placeholder.sessions;
323 CREATE OR REPLACE VIEW caches.attempts AS SELECT * FROM cached_placeholder.attempts;
324 CREATE OR REPLACE VIEW caches.outcomes AS SELECT * FROM cached_placeholder.outcomes;
325 CREATE OR REPLACE VIEW caches.invocations AS SELECT * FROM cached_placeholder.invocations;
326 CREATE OR REPLACE VIEW caches.outputs AS SELECT * FROM cached_placeholder.outputs;
327 CREATE OR REPLACE VIEW caches.events AS SELECT * FROM cached_placeholder.events;
328 "#,
329 )?;
330
331 conn.execute_batch(
333 r#"
334 CREATE OR REPLACE VIEW remotes.sessions AS SELECT * FROM remote_placeholder.sessions;
335 CREATE OR REPLACE VIEW remotes.attempts AS SELECT * FROM remote_placeholder.attempts;
336 CREATE OR REPLACE VIEW remotes.outcomes AS SELECT * FROM remote_placeholder.outcomes;
337 CREATE OR REPLACE VIEW remotes.invocations AS SELECT * FROM remote_placeholder.invocations;
338 CREATE OR REPLACE VIEW remotes.outputs AS SELECT * FROM remote_placeholder.outputs;
339 CREATE OR REPLACE VIEW remotes.events AS SELECT * FROM remote_placeholder.events;
340 "#,
341 )?;
342
343 conn.execute_batch(
346 r#"
347 CREATE OR REPLACE VIEW main.sessions AS
348 SELECT *, 'local' as _source FROM local.sessions
349 UNION ALL BY NAME SELECT * FROM caches.sessions;
350 CREATE OR REPLACE VIEW main.attempts AS
351 SELECT *, 'local' as _source FROM local.attempts
352 UNION ALL BY NAME SELECT * FROM caches.attempts;
353 CREATE OR REPLACE VIEW main.outcomes AS
354 SELECT *, 'local' as _source FROM local.outcomes
355 UNION ALL BY NAME SELECT * FROM caches.outcomes;
356 -- V5: Invocations VIEW (attempts LEFT JOIN outcomes with derived status)
357 CREATE OR REPLACE VIEW main.invocations AS
358 SELECT
359 a.id,
360 a.session_id,
361 a.timestamp,
362 o.duration_ms,
363 a.cwd,
364 a.cmd,
365 a.executable,
366 o.exit_code,
367 CASE
368 WHEN o.attempt_id IS NULL THEN 'pending'
369 WHEN o.exit_code IS NULL THEN 'orphaned'
370 ELSE 'completed'
371 END AS status,
372 a.format_hint,
373 a.source_client AS client_id,
374 a.hostname,
375 a.tag,
376 o.signal,
377 o.timeout,
378 o.completed_at,
379 CASE
380 WHEN a.metadata IS NULL AND o.metadata IS NULL THEN NULL
381 WHEN a.metadata IS NULL THEN o.metadata
382 WHEN o.metadata IS NULL THEN a.metadata
383 ELSE map_concat(a.metadata::MAP(VARCHAR, JSON), o.metadata::MAP(VARCHAR, JSON))
384 END AS metadata,
385 a.date,
386 a._source
387 FROM main.attempts a
388 LEFT JOIN main.outcomes o ON a.id = o.attempt_id;
389
390 CREATE OR REPLACE VIEW main.outputs AS
391 SELECT *, 'local' as _source FROM local.outputs
392 UNION ALL BY NAME SELECT * FROM caches.outputs;
393 CREATE OR REPLACE VIEW main.events AS
394 SELECT *, 'local' as _source FROM local.events
395 UNION ALL BY NAME SELECT * FROM caches.events;
396 "#,
397 )?;
398
399 conn.execute_batch(
402 r#"
403 CREATE OR REPLACE VIEW unified.sessions AS
404 SELECT * FROM main.sessions
405 UNION ALL BY NAME SELECT * FROM remotes.sessions;
406 CREATE OR REPLACE VIEW unified.attempts AS
407 SELECT * FROM main.attempts
408 UNION ALL BY NAME SELECT * FROM remotes.attempts;
409 CREATE OR REPLACE VIEW unified.outcomes AS
410 SELECT * FROM main.outcomes
411 UNION ALL BY NAME SELECT * FROM remotes.outcomes;
412 -- V5: Invocations VIEW (attempts LEFT JOIN outcomes with derived status)
413 CREATE OR REPLACE VIEW unified.invocations AS
414 SELECT
415 a.id,
416 a.session_id,
417 a.timestamp,
418 o.duration_ms,
419 a.cwd,
420 a.cmd,
421 a.executable,
422 o.exit_code,
423 CASE
424 WHEN o.attempt_id IS NULL THEN 'pending'
425 WHEN o.exit_code IS NULL THEN 'orphaned'
426 ELSE 'completed'
427 END AS status,
428 a.format_hint,
429 a.source_client AS client_id,
430 a.hostname,
431 a.tag,
432 o.signal,
433 o.timeout,
434 o.completed_at,
435 CASE
436 WHEN a.metadata IS NULL AND o.metadata IS NULL THEN NULL
437 WHEN a.metadata IS NULL THEN o.metadata
438 WHEN o.metadata IS NULL THEN a.metadata
439 ELSE map_concat(a.metadata::MAP(VARCHAR, JSON), o.metadata::MAP(VARCHAR, JSON))
440 END AS metadata,
441 a.date,
442 a._source
443 FROM unified.attempts a
444 LEFT JOIN unified.outcomes o ON a.id = o.attempt_id;
445
446 CREATE OR REPLACE VIEW unified.outputs AS
447 SELECT * FROM main.outputs
448 UNION ALL BY NAME SELECT * FROM remotes.outputs;
449 CREATE OR REPLACE VIEW unified.events AS
450 SELECT * FROM main.events
451 UNION ALL BY NAME SELECT * FROM remotes.events;
452 "#,
453 )?;
454
455 conn.execute_batch(
457 r#"
458 CREATE OR REPLACE VIEW unified.qualified_sessions AS
459 SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
460 FROM unified.sessions
461 GROUP BY ALL;
462 CREATE OR REPLACE VIEW unified.qualified_attempts AS
463 SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
464 FROM unified.attempts
465 GROUP BY ALL;
466 CREATE OR REPLACE VIEW unified.qualified_outcomes AS
467 SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
468 FROM unified.outcomes
469 GROUP BY ALL;
470 CREATE OR REPLACE VIEW unified.qualified_invocations AS
471 SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
472 FROM unified.invocations
473 GROUP BY ALL;
474 CREATE OR REPLACE VIEW unified.qualified_outputs AS
475 SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
476 FROM unified.outputs
477 GROUP BY ALL;
478 CREATE OR REPLACE VIEW unified.qualified_events AS
479 SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
480 FROM unified.events
481 GROUP BY ALL;
482 "#,
483 )?;
484
485 Ok(())
486}
487
488fn create_local_parquet_views(conn: &duckdb::Connection) -> Result<()> {
496 conn.execute_batch(
500 r#"
501 -- Sessions view: read from parquet files
502 CREATE OR REPLACE VIEW local.sessions AS
503 SELECT * EXCLUDE (filename, file_row_number)
504 FROM read_parquet(
505 'recent/sessions/**/*.parquet',
506 union_by_name = true,
507 hive_partitioning = true,
508 filename = true,
509 file_row_number = true
510 );
511
512 -- V5: Attempts view: read from parquet files
513 CREATE OR REPLACE VIEW local.attempts AS
514 SELECT * EXCLUDE (filename, file_row_number)
515 FROM read_parquet(
516 'recent/attempts/**/*.parquet',
517 union_by_name = true,
518 hive_partitioning = true,
519 filename = true,
520 file_row_number = true
521 );
522
523 -- V5: Outcomes view: read from parquet files
524 CREATE OR REPLACE VIEW local.outcomes AS
525 SELECT * EXCLUDE (filename, file_row_number)
526 FROM read_parquet(
527 'recent/outcomes/**/*.parquet',
528 union_by_name = true,
529 hive_partitioning = true,
530 filename = true,
531 file_row_number = true
532 );
533
534 -- V5: Invocations VIEW (attempts LEFT JOIN outcomes with derived status)
535 CREATE OR REPLACE VIEW local.invocations AS
536 SELECT
537 a.id,
538 a.session_id,
539 a.timestamp,
540 o.duration_ms,
541 a.cwd,
542 a.cmd,
543 a.executable,
544 o.exit_code,
545 CASE
546 WHEN o.attempt_id IS NULL THEN 'pending'
547 WHEN o.exit_code IS NULL THEN 'orphaned'
548 ELSE 'completed'
549 END AS status,
550 a.format_hint,
551 a.source_client AS client_id,
552 a.hostname,
553 a.tag,
554 o.signal,
555 o.timeout,
556 o.completed_at,
557 CASE
558 WHEN a.metadata IS NULL AND o.metadata IS NULL THEN NULL
559 WHEN a.metadata IS NULL THEN o.metadata
560 WHEN o.metadata IS NULL THEN a.metadata
561 ELSE map_concat(a.metadata::MAP(VARCHAR, JSON), o.metadata::MAP(VARCHAR, JSON))
562 END AS metadata,
563 a.date
564 FROM local.attempts a
565 LEFT JOIN local.outcomes o ON a.id = o.attempt_id;
566
567 -- Outputs view: read from parquet files
568 CREATE OR REPLACE VIEW local.outputs AS
569 SELECT * EXCLUDE (filename, file_row_number)
570 FROM read_parquet(
571 'recent/outputs/**/*.parquet',
572 union_by_name = true,
573 hive_partitioning = true,
574 filename = true,
575 file_row_number = true
576 );
577
578 -- Events view: read from parquet files
579 CREATE OR REPLACE VIEW local.events AS
580 SELECT * EXCLUDE (filename, file_row_number)
581 FROM read_parquet(
582 'recent/events/**/*.parquet',
583 union_by_name = true,
584 hive_partitioning = true,
585 filename = true,
586 file_row_number = true
587 );
588 "#,
589 )?;
590 Ok(())
591}
592
593fn create_local_tables(conn: &duckdb::Connection) -> Result<()> {
597 conn.execute_batch(
598 r#"
599 -- Sessions table
600 CREATE TABLE IF NOT EXISTS local.sessions (
601 session_id VARCHAR,
602 client_id VARCHAR,
603 invoker VARCHAR,
604 invoker_pid INTEGER,
605 invoker_type VARCHAR,
606 registered_at TIMESTAMP,
607 cwd VARCHAR,
608 date DATE
609 );
610
611 -- V5: Attempts table (invocation start)
612 CREATE TABLE IF NOT EXISTS local.attempts (
613 id UUID PRIMARY KEY,
614 timestamp TIMESTAMP NOT NULL,
615 cmd VARCHAR NOT NULL,
616 cwd VARCHAR NOT NULL,
617 session_id VARCHAR NOT NULL,
618 tag VARCHAR,
619 source_client VARCHAR NOT NULL,
620 machine_id VARCHAR,
621 hostname VARCHAR,
622 executable VARCHAR,
623 format_hint VARCHAR,
624 metadata MAP(VARCHAR, JSON),
625 date DATE NOT NULL
626 );
627
628 -- V5: Outcomes table (invocation end)
629 CREATE TABLE IF NOT EXISTS local.outcomes (
630 attempt_id UUID PRIMARY KEY,
631 completed_at TIMESTAMP NOT NULL,
632 exit_code INTEGER,
633 duration_ms BIGINT,
634 signal INTEGER,
635 timeout BOOLEAN DEFAULT FALSE,
636 metadata MAP(VARCHAR, JSON),
637 date DATE NOT NULL
638 );
639
640 -- V5: Invocations VIEW (attempts LEFT JOIN outcomes with derived status)
641 CREATE OR REPLACE VIEW local.invocations AS
642 SELECT
643 a.id,
644 a.session_id,
645 a.timestamp,
646 o.duration_ms,
647 a.cwd,
648 a.cmd,
649 a.executable,
650 o.exit_code,
651 CASE
652 WHEN o.attempt_id IS NULL THEN 'pending'
653 WHEN o.exit_code IS NULL THEN 'orphaned'
654 ELSE 'completed'
655 END AS status,
656 a.format_hint,
657 a.source_client AS client_id,
658 a.hostname,
659 a.tag,
660 o.signal,
661 o.timeout,
662 o.completed_at,
663 CASE
664 WHEN a.metadata IS NULL AND o.metadata IS NULL THEN NULL
665 WHEN a.metadata IS NULL THEN o.metadata
666 WHEN o.metadata IS NULL THEN a.metadata
667 ELSE map_concat(a.metadata::MAP(VARCHAR, JSON), o.metadata::MAP(VARCHAR, JSON))
668 END AS metadata,
669 a.date
670 FROM local.attempts a
671 LEFT JOIN local.outcomes o ON a.id = o.attempt_id;
672
673 -- Outputs table
674 CREATE TABLE IF NOT EXISTS local.outputs (
675 id UUID,
676 invocation_id UUID,
677 stream VARCHAR,
678 content_hash VARCHAR,
679 byte_length BIGINT,
680 storage_type VARCHAR,
681 storage_ref VARCHAR,
682 content_type VARCHAR,
683 date DATE
684 );
685
686 -- Events table
687 CREATE TABLE IF NOT EXISTS local.events (
688 id UUID,
689 invocation_id UUID,
690 client_id VARCHAR,
691 hostname VARCHAR,
692 event_type VARCHAR,
693 severity VARCHAR,
694 ref_file VARCHAR,
695 ref_line INTEGER,
696 ref_column INTEGER,
697 message VARCHAR,
698 error_code VARCHAR,
699 test_name VARCHAR,
700 status VARCHAR,
701 format_used VARCHAR,
702 date DATE
703 );
704 "#,
705 )?;
706 Ok(())
707}
708
709fn create_helper_views(conn: &duckdb::Connection) -> Result<()> {
711 conn.execute_batch(
712 r#"
713 -- Recent invocations helper view
714 CREATE OR REPLACE VIEW main.recent_invocations AS
715 SELECT *
716 FROM main.invocations
717 WHERE date >= CURRENT_DATE - INTERVAL '7 days'
718 ORDER BY timestamp DESC;
719
720 -- Invocations today helper view
721 CREATE OR REPLACE VIEW main.invocations_today AS
722 SELECT *
723 FROM main.invocations
724 WHERE date = CURRENT_DATE
725 ORDER BY timestamp DESC;
726
727 -- Failed invocations helper view
728 CREATE OR REPLACE VIEW main.failed_invocations AS
729 SELECT *
730 FROM main.invocations
731 WHERE exit_code != 0
732 ORDER BY timestamp DESC;
733
734 -- Invocations with outputs (joined view)
735 CREATE OR REPLACE VIEW main.invocations_with_outputs AS
736 SELECT
737 i.*,
738 o.id as output_id,
739 o.stream,
740 o.byte_length,
741 o.storage_type,
742 o.storage_ref
743 FROM main.invocations i
744 LEFT JOIN main.outputs o ON i.id = o.invocation_id;
745
746 -- Clients view (derived from sessions)
747 CREATE OR REPLACE VIEW main.clients AS
748 SELECT
749 client_id,
750 MIN(registered_at) as first_seen,
751 MAX(registered_at) as last_seen,
752 COUNT(DISTINCT session_id) as session_count
753 FROM main.sessions
754 GROUP BY client_id;
755
756 -- Events with invocation context (joined view)
757 CREATE OR REPLACE VIEW main.events_with_context AS
758 SELECT
759 e.*,
760 i.cmd,
761 i.timestamp,
762 i.cwd,
763 i.exit_code
764 FROM main.events e
765 JOIN main.invocations i ON e.invocation_id = i.id;
766 "#,
767 )?;
768 Ok(())
769}
770
771fn create_bird_meta(conn: &duckdb::Connection) -> Result<()> {
776 conn.execute_batch(
777 r#"
778 CREATE TABLE IF NOT EXISTS bird_meta (
779 key VARCHAR PRIMARY KEY,
780 value VARCHAR NOT NULL,
781 updated_at TIMESTAMP DEFAULT (now())
782 );
783
784 -- Insert schema version
785 INSERT INTO bird_meta (key, value, updated_at) VALUES ('schema_version', '5', now())
786 ON CONFLICT (key) DO UPDATE SET value = '5', updated_at = now();
787 "#,
788 )?;
789 Ok(())
790}
791
792fn create_cwd_views(conn: &duckdb::Connection) -> Result<()> {
793 conn.execute_batch(
796 r#"
797 -- Placeholder views - these get rebuilt with actual cwd at connection time
798 CREATE OR REPLACE VIEW cwd.sessions AS
799 SELECT * FROM main.sessions WHERE false;
800 CREATE OR REPLACE VIEW cwd.invocations AS
801 SELECT * FROM main.invocations WHERE false;
802 CREATE OR REPLACE VIEW cwd.outputs AS
803 SELECT * FROM main.outputs WHERE false;
804 CREATE OR REPLACE VIEW cwd.events AS
805 SELECT * FROM main.events WHERE false;
806 "#,
807 )?;
808 Ok(())
809}
810
811fn ensure_extension(conn: &duckdb::Connection, name: &str) -> Result<bool> {
821 for attempt in 0..3 {
823 if conn.execute(&format!("LOAD {}", name), []).is_ok() {
825 return Ok(true);
826 }
827
828 if conn.execute(&format!("INSTALL {}", name), []).is_ok()
830 && conn.execute(&format!("LOAD {}", name), []).is_ok()
831 {
832 return Ok(true);
833 }
834
835 if conn.execute(&format!("INSTALL {} FROM community", name), []).is_ok()
837 && conn.execute(&format!("LOAD {}", name), []).is_ok()
838 {
839 return Ok(true);
840 }
841
842 if attempt < 2 {
844 std::thread::sleep(std::time::Duration::from_millis(100 * (attempt as u64 + 1)));
845 }
846 }
847
848 Ok(false)
849}
850
851fn install_extensions(conn: &duckdb::Connection) -> Result<()> {
854 for name in ["parquet", "icu", "httpfs", "json"] {
856 if !ensure_extension(conn, name)? {
857 return Err(Error::Config(format!(
858 "Required extension '{}' could not be installed",
859 name
860 )));
861 }
862 }
863
864 for (name, desc) in [
866 ("scalarfs", "data: URL support for inline blobs"),
867 ("duck_hunt", "log/output parsing for event extraction"),
868 ] {
869 if !ensure_extension(conn, name)? {
870 eprintln!("Warning: {} extension not available ({})", name, desc);
871 }
872 }
873
874 Ok(())
875}
876
877
878fn create_blob_registry(conn: &duckdb::Connection) -> Result<()> {
880 conn.execute_batch(
881 r#"
882 CREATE TABLE IF NOT EXISTS blob_registry (
883 content_hash VARCHAR PRIMARY KEY, -- BLAKE3 hash
884 byte_length BIGINT NOT NULL, -- Original uncompressed size
885 ref_count INTEGER DEFAULT 1, -- Number of outputs referencing this blob
886 first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
887 last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
888 storage_path VARCHAR NOT NULL -- Relative path to blob file
889 );
890 "#,
891 )?;
892 Ok(())
893}
894
895fn create_seed_files(conn: &duckdb::Connection, config: &Config) -> Result<()> {
900 let attempts_seed_dir = config.recent_dir().join("attempts").join("date=1970-01-01");
902 fs::create_dir_all(&attempts_seed_dir)?;
903
904 let attempts_seed_path = attempts_seed_dir.join("_seed.parquet");
905 conn.execute_batch(&format!(
906 r#"
907 COPY (
908 SELECT
909 NULL::UUID as id,
910 NULL::TIMESTAMP as timestamp,
911 NULL::VARCHAR as cmd,
912 NULL::VARCHAR as cwd,
913 NULL::VARCHAR as session_id,
914 NULL::VARCHAR as tag,
915 NULL::VARCHAR as source_client,
916 NULL::VARCHAR as machine_id,
917 NULL::VARCHAR as hostname,
918 NULL::VARCHAR as executable,
919 NULL::VARCHAR as format_hint,
920 NULL::MAP(VARCHAR, JSON) as metadata,
921 NULL::DATE as date
922 WHERE false
923 ) TO '{}' (FORMAT PARQUET);
924 "#,
925 attempts_seed_path.display()
926 ))?;
927
928 let outcomes_seed_dir = config.recent_dir().join("outcomes").join("date=1970-01-01");
930 fs::create_dir_all(&outcomes_seed_dir)?;
931
932 let outcomes_seed_path = outcomes_seed_dir.join("_seed.parquet");
933 conn.execute_batch(&format!(
934 r#"
935 COPY (
936 SELECT
937 NULL::UUID as attempt_id,
938 NULL::TIMESTAMP as completed_at,
939 NULL::INTEGER as exit_code,
940 NULL::BIGINT as duration_ms,
941 NULL::INTEGER as signal,
942 NULL::BOOLEAN as timeout,
943 NULL::MAP(VARCHAR, JSON) as metadata,
944 NULL::DATE as date
945 WHERE false
946 ) TO '{}' (FORMAT PARQUET);
947 "#,
948 outcomes_seed_path.display()
949 ))?;
950
951 let outputs_seed_dir = config.recent_dir().join("outputs").join("date=1970-01-01");
953 fs::create_dir_all(&outputs_seed_dir)?;
954
955 let outputs_seed_path = outputs_seed_dir.join("_seed.parquet");
956 conn.execute_batch(&format!(
957 r#"
958 COPY (
959 SELECT
960 NULL::UUID as id,
961 NULL::UUID as invocation_id,
962 NULL::VARCHAR as stream,
963 NULL::VARCHAR as content_hash,
964 NULL::BIGINT as byte_length,
965 NULL::VARCHAR as storage_type,
966 NULL::VARCHAR as storage_ref,
967 NULL::VARCHAR as content_type,
968 NULL::DATE as date
969 WHERE false
970 ) TO '{}' (FORMAT PARQUET);
971 "#,
972 outputs_seed_path.display()
973 ))?;
974
975 let sessions_seed_dir = config.recent_dir().join("sessions").join("date=1970-01-01");
977 fs::create_dir_all(&sessions_seed_dir)?;
978
979 let sessions_seed_path = sessions_seed_dir.join("_seed.parquet");
980 conn.execute_batch(&format!(
981 r#"
982 COPY (
983 SELECT
984 NULL::VARCHAR as session_id,
985 NULL::VARCHAR as client_id,
986 NULL::VARCHAR as invoker,
987 NULL::INTEGER as invoker_pid,
988 NULL::VARCHAR as invoker_type,
989 NULL::TIMESTAMP as registered_at,
990 NULL::VARCHAR as cwd,
991 NULL::DATE as date
992 WHERE false
993 ) TO '{}' (FORMAT PARQUET);
994 "#,
995 sessions_seed_path.display()
996 ))?;
997
998 let events_seed_dir = config.recent_dir().join("events").join("date=1970-01-01");
1000 fs::create_dir_all(&events_seed_dir)?;
1001
1002 let events_seed_path = events_seed_dir.join("_seed.parquet");
1003 conn.execute_batch(&format!(
1004 r#"
1005 COPY (
1006 SELECT
1007 NULL::UUID as id,
1008 NULL::UUID as invocation_id,
1009 NULL::VARCHAR as client_id,
1010 NULL::VARCHAR as hostname,
1011 NULL::VARCHAR as event_type,
1012 NULL::VARCHAR as severity,
1013 NULL::VARCHAR as ref_file,
1014 NULL::INTEGER as ref_line,
1015 NULL::INTEGER as ref_column,
1016 NULL::VARCHAR as message,
1017 NULL::VARCHAR as error_code,
1018 NULL::VARCHAR as test_name,
1019 NULL::VARCHAR as status,
1020 NULL::VARCHAR as format_used,
1021 NULL::DATE as date
1022 WHERE false
1023 ) TO '{}' (FORMAT PARQUET);
1024 "#,
1025 events_seed_path.display()
1026 ))?;
1027
1028 Ok(())
1029}
1030
1031fn create_event_formats_config(config: &Config) -> Result<()> {
1033 let path = config.event_formats_path();
1034 if !path.exists() {
1035 fs::write(&path, DEFAULT_EVENT_FORMATS_CONFIG)?;
1036 }
1037 Ok(())
1038}
1039
/// Default contents of the event-formats configuration file (TOML).
///
/// Each `[[rules]]` entry maps a glob `pattern`, matched against the command
/// string, to a duck_hunt `format`. As the file itself states, the first
/// matching rule wins, so rule order is significant: the `*cmake*` rule is
/// listed before `*make*` because `*make*` also matches every cmake command
/// and would otherwise make the cmake rule unreachable. The `[default]` table
/// falls back to duck_hunt's `auto` detection.
pub const DEFAULT_EVENT_FORMATS_CONFIG: &str = r#"# Event format detection rules for duck_hunt
# Patterns are glob-matched against the command string
# First matching rule wins; use 'auto' for duck_hunt's built-in detection

# C/C++ compilers
[[rules]]
pattern = "*gcc*"
format = "gcc"

[[rules]]
pattern = "*g++*"
format = "gcc"

[[rules]]
pattern = "*clang*"
format = "gcc"

[[rules]]
pattern = "*clang++*"
format = "gcc"

# Rust
[[rules]]
pattern = "*cargo build*"
format = "cargo_build"

[[rules]]
pattern = "*cargo test*"
format = "cargo_test_json"

[[rules]]
pattern = "*cargo check*"
format = "cargo_build"

[[rules]]
pattern = "*rustc*"
format = "rustc"

# Python
[[rules]]
pattern = "*pytest*"
format = "pytest_text"

[[rules]]
pattern = "*python*-m*pytest*"
format = "pytest_text"

[[rules]]
pattern = "*mypy*"
format = "mypy"

[[rules]]
pattern = "*flake8*"
format = "flake8"

[[rules]]
pattern = "*pylint*"
format = "pylint"

# JavaScript/TypeScript
[[rules]]
pattern = "*eslint*"
format = "eslint"

[[rules]]
pattern = "*tsc*"
format = "typescript"

[[rules]]
pattern = "*jest*"
format = "jest"

# Build systems
# cmake is listed before make: "*make*" also matches cmake commands
[[rules]]
pattern = "*cmake*"
format = "cmake"

[[rules]]
pattern = "*make*"
format = "make_error"

[[rules]]
pattern = "*ninja*"
format = "ninja"

# Go
[[rules]]
pattern = "*go build*"
format = "go_build"

[[rules]]
pattern = "*go test*"
format = "go_test"

# Default: use duck_hunt's auto-detection
[default]
format = "auto"
"#;
1139
1140pub fn is_initialized(config: &Config) -> bool {
1142 config.db_path().exists()
1143}
1144
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Returns a config rooted in a fresh temporary directory, together with
    /// the directory guard that must stay alive for the duration of the test.
    fn scratch_config() -> (TempDir, Config) {
        let tmp = TempDir::new().unwrap();
        let config = Config::with_root(tmp.path());
        (tmp, config)
    }

    /// A successful initialization leaves the database, the data directories
    /// and the saved configuration file on disk.
    #[test]
    fn test_initialize_creates_structure() {
        let (_tmp, config) = scratch_config();

        initialize(&config).unwrap();

        assert!(config.db_path().exists());

        for table in ["attempts", "outcomes", "outputs", "sessions"] {
            assert!(config.recent_dir().join(table).exists());
        }

        for dir in [
            config.blobs_dir(),
            config.extensions_dir(),
            config.sql_dir(),
        ] {
            assert!(dir.exists());
        }

        assert!(config.bird_root.join("config.toml").exists());
    }

    /// Initializing an already-initialized root is rejected.
    #[test]
    fn test_initialize_twice_fails() {
        let (_tmp, config) = scratch_config();

        initialize(&config).unwrap();

        let second_attempt = initialize(&config);
        assert!(matches!(
            second_attempt,
            Err(Error::AlreadyInitialized(_))
        ));
    }

    /// `is_initialized` flips from false to true across `initialize`.
    #[test]
    fn test_is_initialized() {
        let (_tmp, config) = scratch_config();

        assert!(!is_initialized(&config));
        initialize(&config).unwrap();
        assert!(is_initialized(&config));
    }
}