1mod atomic;
16mod attempts;
17mod compact;
18mod events;
19mod invocations;
20mod outcomes;
21mod outputs;
22mod pending;
23mod remote;
24mod sessions;
25
26use std::fs;
27use std::thread;
28use std::time::Duration;
29
30use chrono::{DateTime, NaiveDate, NaiveTime, TimeDelta, Utc};
31use duckdb::{
32 params,
33 types::{TimeUnit, Value, ValueRef},
34 Connection,
35};
36
37use crate::config::StorageMode;
38use crate::schema::{EventRecord, InvocationRecord, SessionRecord};
39use crate::{Config, Error, Result};
40
41fn format_value(value: &Value) -> String {
44 match value {
45 Value::Null => "NULL".to_string(),
46 Value::Boolean(b) => b.to_string(),
47 Value::TinyInt(n) => n.to_string(),
48 Value::SmallInt(n) => n.to_string(),
49 Value::Int(n) => n.to_string(),
50 Value::BigInt(n) => n.to_string(),
51 Value::HugeInt(n) => n.to_string(),
52 Value::UTinyInt(n) => n.to_string(),
53 Value::USmallInt(n) => n.to_string(),
54 Value::UInt(n) => n.to_string(),
55 Value::UBigInt(n) => n.to_string(),
56 Value::Float(f) => f.to_string(),
57 Value::Double(f) => f.to_string(),
58 Value::Decimal(d) => d.to_string(),
59 Value::Timestamp(_, micros) => {
60 DateTime::<Utc>::from_timestamp_micros(*micros)
61 .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
62 .unwrap_or_else(|| format!("<timestamp {}>", micros))
63 }
64 Value::Text(s) => s.clone(),
65 Value::Blob(b) => format!("<blob {} bytes>", b.len()),
66 Value::Date32(days) => {
67 NaiveDate::from_ymd_opt(1970, 1, 1)
68 .and_then(|epoch| epoch.checked_add_signed(TimeDelta::days(*days as i64)))
69 .map(|d| d.format("%Y-%m-%d").to_string())
70 .unwrap_or_else(|| format!("<date {}>", days))
71 }
72 Value::Time64(_, micros) => {
73 let secs = (*micros / 1_000_000) as u32;
74 let micro_part = (*micros % 1_000_000) as u32;
75 NaiveTime::from_num_seconds_from_midnight_opt(secs, micro_part * 1000)
76 .map(|t| t.format("%H:%M:%S").to_string())
77 .unwrap_or_else(|| format!("<time {}>", micros))
78 }
79 Value::Interval { months, days, nanos } => {
80 format!("{} months {} days {} ns", months, days, nanos)
81 }
82 Value::List(items) => {
84 let formatted: Vec<String> = items.iter().map(format_value).collect();
85 format!("[{}]", formatted.join(", "))
86 }
87 Value::Array(items) => {
88 let formatted: Vec<String> = items.iter().map(format_value).collect();
89 format!("[{}]", formatted.join(", "))
90 }
91 Value::Map(map) => {
92 let formatted: Vec<String> = map
93 .iter()
94 .map(|(k, v)| format!("{}: {}", format_value(k), format_value(v)))
95 .collect();
96 format!("{{{}}}", formatted.join(", "))
97 }
98 Value::Struct(fields) => {
99 let formatted: Vec<String> = fields
100 .iter()
101 .map(|(k, v)| format!("{}: {}", k, format_value(v)))
102 .collect();
103 format!("{{{}}}", formatted.join(", "))
104 }
105 Value::Enum(s) => s.clone(),
106 _ => "<unknown>".to_string(),
107 }
108}
109
110pub use compact::{
112 ArchiveStats, AutoCompactOptions, CleanOptions, CleanStats, CompactOptions, CompactStats,
113 PruneStats,
114};
115pub use events::{EventFilters, EventSummary, FormatConfig, FormatRule};
116pub use invocations::InvocationSummary;
117pub use outputs::OutputInfo;
118pub use pending::{is_runner_alive, RecoveryStats};
119pub use remote::{parse_since, PullOptions, PullStats, PushOptions, PushStats};
120
/// Everything produced by a single command invocation, persisted as one unit
/// via `Store::write_batch`.
#[derive(Debug, Default)]
pub struct InvocationBatch {
    /// The invocation record itself; `write_batch` errors when this is `None`.
    pub invocation: Option<InvocationRecord>,

    /// Captured output streams as `(stream name, raw bytes)` pairs.
    pub outputs: Vec<(String, Vec<u8>)>,

    /// Session record to register alongside the invocation, if any.
    pub session: Option<SessionRecord>,

    /// Events extracted from the invocation's output, if any were parsed.
    pub events: Option<Vec<EventRecord>>,
}
143
144impl InvocationBatch {
145 pub fn new(invocation: InvocationRecord) -> Self {
147 Self {
148 invocation: Some(invocation),
149 outputs: Vec::new(),
150 session: None,
151 events: None,
152 }
153 }
154
155 pub fn with_output(mut self, stream: impl Into<String>, content: Vec<u8>) -> Self {
157 self.outputs.push((stream.into(), content));
158 self
159 }
160
161 pub fn with_session(mut self, session: SessionRecord) -> Self {
163 self.session = Some(session);
164 self
165 }
166
167 pub fn with_events(mut self, events: Vec<EventRecord>) -> Self {
169 self.events = Some(events);
170 self
171 }
172}
173
/// Flags controlling how much setup `Store::connect` performs on a freshly
/// opened DuckDB connection. `Default` yields all-false (a bare connection).
#[derive(Debug, Clone, Default)]
pub struct ConnectionOptions {
    /// Attach configured remote databases and build the remote macros/views.
    pub attach_remotes: bool,

    /// Attach the current project's database read-only, when one exists.
    pub attach_project: bool,

    /// Create cwd-scoped TEMPORARY macros for the current working directory.
    pub create_ephemeral_views: bool,

    /// Run the one-time migration to the schema-per-source layout.
    pub run_migration: bool,
}
205
206impl ConnectionOptions {
207 pub fn full() -> Self {
209 Self {
210 attach_remotes: true,
211 attach_project: true,
212 create_ephemeral_views: true,
213 run_migration: false,
214 }
215 }
216
217 pub fn minimal() -> Self {
220 Self {
221 attach_remotes: false,
222 attach_project: false,
223 create_ephemeral_views: false,
224 run_migration: false,
225 }
226 }
227
228 pub fn for_migration() -> Self {
230 Self {
231 attach_remotes: false,
232 attach_project: false,
233 create_ephemeral_views: false,
234 run_migration: true,
235 }
236 }
237}
238
239fn ensure_extension(conn: &Connection, name: &str) -> Result<bool> {
248 if conn.execute(&format!("LOAD {}", name), []).is_ok() {
250 return Ok(true);
251 }
252
253 if conn.execute(&format!("INSTALL {}", name), []).is_ok()
255 && conn.execute(&format!("LOAD {}", name), []).is_ok()
256 {
257 return Ok(true);
258 }
259
260 if conn.execute(&format!("INSTALL {} FROM community", name), []).is_ok()
262 && conn.execute(&format!("LOAD {}", name), []).is_ok()
263 {
264 return Ok(true);
265 }
266
267 Ok(false)
268}
269
/// Handle over the on-disk database described by a `Config`; all reads and
/// writes go through connections opened from this store.
pub struct Store {
    // Owned configuration; paths, remotes and storage mode are read from it.
    config: Config,
}
274
275impl Store {
276 pub fn open(config: Config) -> Result<Self> {
278 if !config.db_path().exists() {
279 return Err(Error::NotInitialized(config.bird_root.clone()));
280 }
281 Ok(Self { config })
282 }
283
    /// Opens the database, retrying on lock contention with exponential
    /// backoff (10ms doubling up to 1s, plus a small deterministic jitter).
    ///
    /// Lock contention is detected by substring-matching the error message;
    /// any other error is returned immediately. After `MAX_RETRIES` lock
    /// failures the last lock error is surfaced.
    fn open_connection_with_retry(&self) -> Result<Connection> {
        const MAX_RETRIES: u32 = 10;
        const INITIAL_DELAY_MS: u64 = 10;
        const MAX_DELAY_MS: u64 = 1000;

        let db_path = self.config.db_path();
        let mut delay_ms = INITIAL_DELAY_MS;
        let mut last_error = None;

        for attempt in 0..MAX_RETRIES {
            match Connection::open(&db_path) {
                Ok(conn) => return Ok(conn),
                Err(e) => {
                    // DuckDB exposes no structured "locked" error kind here,
                    // so we match the known message fragments.
                    let err_msg = e.to_string();
                    if err_msg.contains("Could not set lock")
                        || err_msg.contains("Conflicting lock")
                        || err_msg.contains("database is locked")
                    {
                        last_error = Some(e);
                        if attempt < MAX_RETRIES - 1 {
                            // Deterministic 0-9ms jitter to de-synchronize
                            // competing processes retrying in lockstep.
                            let jitter = (attempt as u64 * 7) % 10;
                            thread::sleep(Duration::from_millis(delay_ms + jitter));
                            delay_ms = (delay_ms * 2).min(MAX_DELAY_MS);
                            continue;
                        }
                    } else {
                        // Not a lock problem — fail fast.
                        return Err(e.into());
                    }
                }
            }
        }

        // All retries exhausted; report the last lock error (the fallback
        // Storage error should be unreachable given the loop structure).
        Err(last_error
            .map(|e| e.into())
            .unwrap_or_else(|| Error::Storage("Failed to open database after retries".to_string())))
    }
329
330 pub fn connection(&self) -> Result<Connection> {
332 self.connect(ConnectionOptions::full())
333 }
334
335 pub fn connection_with_options(&self, attach_remotes: bool) -> Result<Connection> {
337 let opts = if attach_remotes {
338 ConnectionOptions::full()
339 } else {
340 ConnectionOptions::minimal()
341 };
342 self.connect(opts)
343 }
344
    /// Opens a connection and performs the setup selected by `opts`, in a
    /// fixed order: extensions, file_search_path, optional migration, S3
    /// secrets, blob-resolution macros, then optional remote/project/cwd
    /// attachments. Required extensions failing to load is a hard error;
    /// optional ones only warn.
    pub fn connect(&self, opts: ConnectionOptions) -> Result<Connection> {
        let conn = self.open_connection_with_retry()?;

        // Needed before any INSTALL ... FROM community can succeed.
        conn.execute("SET allow_community_extensions = true", [])?;

        // Hard requirements: parquet (storage) and icu (timestamps/collation).
        for ext in ["parquet", "icu"] {
            if !ensure_extension(&conn, ext)? {
                return Err(Error::Extension(format!(
                    "Required extension '{}' could not be loaded",
                    ext
                )));
            }
        }

        // Optional extensions: degrade gracefully with a warning.
        for (ext, desc) in [
            ("scalarfs", "data: URL support for inline blobs"),
            ("duck_hunt", "log/output parsing for event extraction"),
        ] {
            if !ensure_extension(&conn, ext)? {
                eprintln!("Warning: {} extension not available ({})", ext, desc);
            }
        }

        // Let relative parquet globs resolve against the local data dir.
        conn.execute(
            &format!(
                "SET file_search_path = '{}'",
                self.config.data_dir().display()
            ),
            [],
        )?;

        // Migration must run before anything queries the `local` schema.
        if opts.run_migration {
            self.migrate_to_new_schema(&conn)?;
        }

        self.setup_s3_credentials(&conn)?;
        self.setup_blob_resolution(&conn)?;

        if opts.attach_remotes && !self.config.remotes.is_empty() {
            self.attach_remotes(&conn)?;
            self.create_remote_macros(&conn)?;
        }

        if opts.attach_project {
            self.attach_project_db(&conn)?;
        }

        if opts.create_ephemeral_views {
            self.create_cwd_macros(&conn)?;
        }

        Ok(conn)
    }
419
420 fn attach_project_db(&self, conn: &Connection) -> Result<()> {
425 use crate::project::find_current_project;
426
427 let Some(project) = find_current_project() else {
428 return Ok(()); };
430
431 if !project.is_initialized() {
432 return Ok(()); }
434
435 if project.db_path == self.config.db_path() {
437 return Ok(());
438 }
439
440 let attach_sql = format!(
442 "ATTACH '{}' AS project (READ_ONLY)",
443 project.db_path.display()
444 );
445
446 if let Err(e) = conn.execute(&attach_sql, []) {
447 eprintln!("Note: Could not attach project database: {}", e);
449 }
450
451 Ok(())
452 }
453
    /// One-time migration to the schema-per-source layout: creates the
    /// `local` / `caches` / `remotes` / `unified` schemas (plus empty
    /// placeholder schemas that pin column shapes), moves legacy `*_table`
    /// data into `local.*` in DuckDB mode, and builds the layered views.
    /// Idempotent: returns early when the `local` schema already exists.
    fn migrate_to_new_schema(&self, conn: &Connection) -> Result<()> {
        // Presence of the `local` schema is the migration marker.
        let local_exists: bool = conn
            .query_row(
                "SELECT COUNT(*) > 0 FROM information_schema.schemata WHERE schema_name = 'local'",
                [],
                |row| row.get(0),
            )
            .unwrap_or(false);

        if local_exists {
            return Ok(());
        }

        eprintln!("Note: Migrating to new schema architecture...");

        // Schema skeleton. The *_placeholder schemas hold empty tables whose
        // columns define the canonical shape for UNION ALL BY NAME.
        conn.execute_batch(
            r#"
            CREATE SCHEMA IF NOT EXISTS local;
            CREATE SCHEMA IF NOT EXISTS cached_placeholder;
            CREATE SCHEMA IF NOT EXISTS remote_placeholder;
            CREATE SCHEMA IF NOT EXISTS caches;
            CREATE SCHEMA IF NOT EXISTS remotes;
            CREATE SCHEMA IF NOT EXISTS unified;
            CREATE SCHEMA IF NOT EXISTS cwd;
            "#,
        )?;

        if self.config.storage_mode == crate::StorageMode::DuckDB {
            // DuckDB mode: local.* are real tables.
            conn.execute_batch(
                r#"
                CREATE TABLE IF NOT EXISTS local.sessions (
                    session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
                    invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE
                );
                CREATE TABLE IF NOT EXISTS local.invocations (
                    id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
                    cwd VARCHAR, cmd VARCHAR, executable VARCHAR, exit_code INTEGER,
                    format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR,
                    tag VARCHAR, date DATE
                );
                CREATE TABLE IF NOT EXISTS local.outputs (
                    id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
                    byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
                    content_type VARCHAR, date DATE
                );
                CREATE TABLE IF NOT EXISTS local.events (
                    id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
                    event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
                    ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
                    status VARCHAR, format_used VARCHAR, date DATE
                );
                "#,
            )?;

            // Copy legacy main-schema tables into local.*, if they exist.
            let old_tables_exist: bool = conn
                .query_row(
                    "SELECT COUNT(*) > 0 FROM duckdb_tables() WHERE table_name = 'sessions_table'",
                    [],
                    |row| row.get(0),
                )
                .unwrap_or(false);

            if old_tables_exist {
                conn.execute_batch(
                    r#"
                    INSERT INTO local.sessions SELECT * FROM sessions_table;
                    INSERT INTO local.invocations SELECT * FROM invocations_table;
                    INSERT INTO local.outputs SELECT * FROM outputs_table;
                    INSERT INTO local.events SELECT * FROM events_table;
                    "#,
                )?;
            }
        } else {
            // Parquet mode: local.* are views over hive-partitioned files
            // resolved relative to file_search_path (set in `connect`).
            conn.execute_batch(
                r#"
                CREATE OR REPLACE VIEW local.sessions AS
                SELECT * EXCLUDE (filename) FROM read_parquet(
                    'recent/sessions/**/*.parquet',
                    union_by_name = true, hive_partitioning = true, filename = true
                );
                CREATE OR REPLACE VIEW local.invocations AS
                SELECT * EXCLUDE (filename) FROM read_parquet(
                    'recent/invocations/**/*.parquet',
                    union_by_name = true, hive_partitioning = true, filename = true
                );
                CREATE OR REPLACE VIEW local.outputs AS
                SELECT * EXCLUDE (filename) FROM read_parquet(
                    'recent/outputs/**/*.parquet',
                    union_by_name = true, hive_partitioning = true, filename = true
                );
                CREATE OR REPLACE VIEW local.events AS
                SELECT * EXCLUDE (filename) FROM read_parquet(
                    'recent/events/**/*.parquet',
                    union_by_name = true, hive_partitioning = true, filename = true
                );
                "#,
            )?;
        }

        // Empty placeholder tables carrying the canonical column shapes
        // (including the `_source` provenance column).
        conn.execute_batch(
            r#"
            CREATE TABLE IF NOT EXISTS cached_placeholder.sessions (
                session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
                invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS cached_placeholder.invocations (
                id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
                cwd VARCHAR, cmd VARCHAR, executable VARCHAR, runner_id VARCHAR, exit_code INTEGER,
                status VARCHAR, format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR, tag VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS cached_placeholder.outputs (
                id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
                byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
                content_type VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS cached_placeholder.events (
                id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
                event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
                ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
                status VARCHAR, format_used VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.sessions (
                session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
                invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.invocations (
                id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
                cwd VARCHAR, cmd VARCHAR, executable VARCHAR, runner_id VARCHAR, exit_code INTEGER,
                status VARCHAR, format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR, tag VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.outputs (
                id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
                byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
                content_type VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.events (
                id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
                event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
                ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
                status VARCHAR, format_used VARCHAR, date DATE, _source VARCHAR
            );
            "#,
        )?;

        // Layered views: caches/remotes start as views over the empty
        // placeholders (rebuilt later when real sources attach); main unions
        // local + caches; unified adds remotes; qualified_* deduplicate rows
        // and aggregate their `_source` provenance into a list.
        conn.execute_batch(
            r#"
            CREATE OR REPLACE VIEW caches.sessions AS SELECT * FROM cached_placeholder.sessions;
            CREATE OR REPLACE VIEW caches.invocations AS SELECT * FROM cached_placeholder.invocations;
            CREATE OR REPLACE VIEW caches.outputs AS SELECT * FROM cached_placeholder.outputs;
            CREATE OR REPLACE VIEW caches.events AS SELECT * FROM cached_placeholder.events;

            CREATE OR REPLACE VIEW remotes.sessions AS SELECT * FROM remote_placeholder.sessions;
            CREATE OR REPLACE VIEW remotes.invocations AS SELECT * FROM remote_placeholder.invocations;
            CREATE OR REPLACE VIEW remotes.outputs AS SELECT * FROM remote_placeholder.outputs;
            CREATE OR REPLACE VIEW remotes.events AS SELECT * FROM remote_placeholder.events;

            CREATE OR REPLACE VIEW main.sessions AS
            SELECT *, 'local' as _source FROM local.sessions
            UNION ALL BY NAME SELECT * FROM caches.sessions;
            CREATE OR REPLACE VIEW main.invocations AS
            SELECT *, 'local' as _source FROM local.invocations
            UNION ALL BY NAME SELECT * FROM caches.invocations;
            CREATE OR REPLACE VIEW main.outputs AS
            SELECT *, 'local' as _source FROM local.outputs
            UNION ALL BY NAME SELECT * FROM caches.outputs;
            CREATE OR REPLACE VIEW main.events AS
            SELECT *, 'local' as _source FROM local.events
            UNION ALL BY NAME SELECT * FROM caches.events;

            CREATE OR REPLACE VIEW unified.sessions AS
            SELECT * FROM main.sessions UNION ALL BY NAME SELECT * FROM remotes.sessions;
            CREATE OR REPLACE VIEW unified.invocations AS
            SELECT * FROM main.invocations UNION ALL BY NAME SELECT * FROM remotes.invocations;
            CREATE OR REPLACE VIEW unified.outputs AS
            SELECT * FROM main.outputs UNION ALL BY NAME SELECT * FROM remotes.outputs;
            CREATE OR REPLACE VIEW unified.events AS
            SELECT * FROM main.events UNION ALL BY NAME SELECT * FROM remotes.events;

            -- Qualified views: deduplicated with source list
            CREATE OR REPLACE VIEW unified.qualified_sessions AS
            SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
            FROM unified.sessions GROUP BY ALL;
            CREATE OR REPLACE VIEW unified.qualified_invocations AS
            SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
            FROM unified.invocations GROUP BY ALL;
            CREATE OR REPLACE VIEW unified.qualified_outputs AS
            SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
            FROM unified.outputs GROUP BY ALL;
            CREATE OR REPLACE VIEW unified.qualified_events AS
            SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
            FROM unified.events GROUP BY ALL;
            "#,
        )?;

        Ok(())
    }
666
667 fn setup_s3_credentials(&self, conn: &Connection) -> Result<()> {
670 let has_s3 = self.config.remotes.iter().any(|r| {
672 r.remote_type == crate::config::RemoteType::S3
673 });
674
675 if !has_s3 {
676 return Ok(());
677 }
678
679 for remote in &self.config.remotes {
681 if remote.remote_type == crate::config::RemoteType::S3 {
682 if let Some(provider) = &remote.credential_provider {
683 let secret_sql = format!(
684 "CREATE SECRET IF NOT EXISTS \"bird_{}\" (TYPE s3, PROVIDER {})",
685 remote.name, provider
686 );
687 if let Err(e) = conn.execute(&secret_sql, []) {
688 eprintln!("Warning: Failed to create S3 secret for {}: {}", remote.name, e);
689 }
690 }
691 }
692 }
693
694 Ok(())
695 }
696
    /// Installs the SQL variable and macros used to resolve `storage_ref`
    /// values from the outputs table: sets `blob_roots` to the configured
    /// blob directories and defines `is_inline_data`, `is_file_ref`, and
    /// `resolve_storage_ref` (which expands a `file:` ref to candidate glob
    /// paths under every blob root; other refs pass through unchanged).
    fn setup_blob_resolution(&self, conn: &Connection) -> Result<()> {
        let blob_roots = self.config.blob_roots();

        // Build a SQL list literal of single-quoted roots ('' escapes ').
        let roots_sql: String = blob_roots
            .iter()
            .map(|r| format!("'{}'", r.replace('\'', "''")))
            .collect::<Vec<_>>()
            .join(", ");

        conn.execute(&format!("SET VARIABLE blob_roots = [{}]", roots_sql), [])?;

        // `data:` / `data+` prefixes mark inline (in-ref) content.
        conn.execute(
            r#"CREATE OR REPLACE MACRO is_inline_data(ref) AS (
                ref[:5] = 'data:' OR ref[:5] = 'data+'
            )"#,
            [],
        )?;

        conn.execute(
            r#"CREATE OR REPLACE MACRO is_file_ref(ref) AS (
                ref[:5] = 'file:'
            )"#,
            [],
        )?;

        // `file:` refs expand to "<root>/<path>*" candidates, one per blob
        // root; inline and unrecognized refs are returned as-is (as a list).
        conn.execute(
            r#"CREATE OR REPLACE MACRO resolve_storage_ref(ref) AS (
                CASE
                    WHEN is_inline_data(ref) THEN [ref]
                    WHEN is_file_ref(ref) THEN
                        [format('{}/{}*', root, ref[6:]) FOR root IN getvariable('blob_roots')]
                    ELSE [ref]
                END
            )"#,
            [],
        )?;

        Ok(())
    }
750
    /// Attaches every auto-attach remote: first extends `file_search_path`
    /// with each remote's data directory (deduplicated), then runs each
    /// remote's ATTACH statement. Both steps are best-effort — failures are
    /// printed as warnings and do not abort the remaining remotes.
    fn attach_remotes(&self, conn: &Connection) -> Result<()> {
        let remotes = self.config.auto_attach_remotes();

        let remote_data_dirs: Vec<String> = remotes
            .iter()
            .filter_map(|r| r.data_dir())
            .map(|p| p.display().to_string())
            .collect();

        if !remote_data_dirs.is_empty() {
            let current_path: String = conn
                .query_row("SELECT current_setting('file_search_path')", [], |r| r.get(0))
                .unwrap_or_default();

            // NOTE(review): assumes the setting is a bare comma-separated
            // list with no whitespace around commas — TODO confirm, since a
            // padded entry would defeat the dedup check below.
            let mut paths: Vec<&str> = if current_path.is_empty() {
                vec![]
            } else {
                current_path.split(',').collect()
            };

            // Append only directories not already on the search path.
            for dir in &remote_data_dirs {
                if !paths.contains(&dir.as_str()) {
                    paths.push(dir);
                }
            }

            let new_path = paths.join(",");
            if let Err(e) = conn.execute(&format!("SET file_search_path = '{}'", new_path), []) {
                eprintln!("Warning: Failed to set file_search_path: {}", e);
            }
        }

        for remote in &remotes {
            let attach_sql = remote.attach_sql();
            if let Err(e) = conn.execute(&attach_sql, []) {
                eprintln!("Warning: Failed to attach remote {}: {}", remote.name, e);
            }
        }

        Ok(())
    }
797
798 pub(crate) fn detect_remote_table_path(&self, conn: &Connection, remote_schema: &str) -> String {
807 let check_sql = format!(
809 "SELECT 1 FROM information_schema.tables \
810 WHERE table_catalog = '{}' AND table_schema = 'local' AND table_name = 'invocations' \
811 LIMIT 1",
812 remote_schema.trim_matches('"')
813 );
814 if conn.execute(&check_sql, []).is_ok() {
815 if let Ok(mut stmt) = conn.prepare(&check_sql) {
816 if stmt.query([]).is_ok_and(|mut rows| rows.next().is_ok_and(|r| r.is_some())) {
817 return "local.".to_string();
818 }
819 }
820 }
821
822 String::new()
824 }
825
826 fn create_remote_macros(&self, conn: &Connection) -> Result<()> {
836 let remotes = self.config.auto_attach_remotes();
837 if remotes.is_empty() {
838 return Ok(());
839 }
840
841 for remote in &remotes {
843 let schema = remote.quoted_schema_name();
844 let name = &remote.name;
845 let safe_name = name.replace(['-', '.'], "_");
847
848 let table_prefix = self.detect_remote_table_path(conn, &schema);
850
851 for table in &["sessions", "invocations", "outputs", "events"] {
852 let macro_name = format!("\"remote_{safe_name}_{table}\"");
853 let sql = format!(
854 r#"CREATE OR REPLACE TEMPORARY MACRO {macro_name}() AS TABLE (
855 SELECT *, '{name}' as _source FROM {schema}.{prefix}{table}
856 )"#,
857 macro_name = macro_name,
858 name = name,
859 schema = schema,
860 prefix = table_prefix,
861 table = table
862 );
863 if let Err(e) = conn.execute(&sql, []) {
864 eprintln!("Warning: Failed to create macro {}: {}", macro_name, e);
865 }
866 }
867 }
868
869 for table in &["sessions", "invocations", "outputs", "events"] {
871 let mut union_parts: Vec<String> = remotes
872 .iter()
873 .map(|r| {
874 let safe_name = r.name.replace(['-', '.'], "_");
875 format!("SELECT * FROM \"remote_{safe_name}_{table}\"()", safe_name = safe_name, table = table)
876 })
877 .collect();
878
879 union_parts.push(format!("SELECT * FROM remote_placeholder.{}", table));
881
882 let sql = format!(
883 r#"CREATE OR REPLACE TEMPORARY MACRO remotes_{table}() AS TABLE (
884 {union}
885 )"#,
886 table = table,
887 union = union_parts.join(" UNION ALL BY NAME ")
888 );
889 if let Err(e) = conn.execute(&sql, []) {
890 eprintln!("Warning: Failed to create remotes_{} macro: {}", table, e);
891 }
892 }
893
894 for table in &["sessions", "invocations", "outputs", "events"] {
898 let mut union_parts: Vec<String> = remotes
900 .iter()
901 .map(|r| {
902 let safe_name = r.name.replace(['-', '.'], "_");
903 format!(
904 "SELECT * FROM \"remote_{safe_name}_{table}\"()",
905 safe_name = safe_name,
906 table = table
907 )
908 })
909 .collect();
910
911 union_parts.push(format!("SELECT * FROM remote_placeholder.{}", table));
913
914 let remotes_sql = format!(
915 "CREATE OR REPLACE VIEW remotes.{table} AS {union}",
916 table = table,
917 union = union_parts.join(" UNION ALL BY NAME ")
918 );
919 if let Err(e) = conn.execute(&remotes_sql, []) {
920 eprintln!("Warning: Failed to rebuild remotes.{} view: {}", table, e);
921 }
922 }
923
924 let unified_views = r#"
926 CREATE OR REPLACE VIEW unified.sessions AS
927 SELECT * FROM main.sessions UNION ALL BY NAME SELECT * FROM remotes.sessions;
928 CREATE OR REPLACE VIEW unified.invocations AS
929 SELECT * FROM main.invocations UNION ALL BY NAME SELECT * FROM remotes.invocations;
930 CREATE OR REPLACE VIEW unified.outputs AS
931 SELECT * FROM main.outputs UNION ALL BY NAME SELECT * FROM remotes.outputs;
932 CREATE OR REPLACE VIEW unified.events AS
933 SELECT * FROM main.events UNION ALL BY NAME SELECT * FROM remotes.events;
934 "#;
935 if let Err(e) = conn.execute_batch(unified_views) {
936 eprintln!("Warning: Failed to rebuild unified views: {}", e);
937 }
938
939 Ok(())
940 }
941
    /// Creates TEMPORARY `cwd_*()` macros scoped to the current working
    /// directory: sessions/invocations filtered by `cwd LIKE '<cwd>%'`, and
    /// outputs/events joined through their invocation's cwd.
    fn create_cwd_macros(&self, conn: &Connection) -> Result<()> {
        let cwd = std::env::current_dir()
            .map(|p| p.to_string_lossy().to_string())
            .unwrap_or_default();
        // Escape single quotes for the SQL literal.
        // NOTE(review): LIKE wildcards ('%', '_') in the cwd itself are NOT
        // escaped, so such paths would over-match — confirm whether that is
        // acceptable here.
        let cwd_escaped = cwd.replace('\'', "''");

        let macros = format!(
            r#"
            CREATE OR REPLACE TEMPORARY MACRO cwd_sessions() AS TABLE (
                SELECT * FROM main.sessions WHERE cwd LIKE '{}%'
            );
            CREATE OR REPLACE TEMPORARY MACRO cwd_invocations() AS TABLE (
                SELECT * FROM main.invocations WHERE cwd LIKE '{}%'
            );
            CREATE OR REPLACE TEMPORARY MACRO cwd_outputs() AS TABLE (
                SELECT o.* FROM main.outputs o
                JOIN main.invocations i ON o.invocation_id = i.id
                WHERE i.cwd LIKE '{}%'
            );
            CREATE OR REPLACE TEMPORARY MACRO cwd_events() AS TABLE (
                SELECT e.* FROM main.events e
                JOIN main.invocations i ON e.invocation_id = i.id
                WHERE i.cwd LIKE '{}%'
            );
            "#,
            cwd_escaped, cwd_escaped, cwd_escaped, cwd_escaped
        );

        conn.execute_batch(&macros)?;
        Ok(())
    }
980
981 pub fn attach_remote(&self, conn: &Connection, remote: &crate::RemoteConfig) -> Result<()> {
983 if let Some(provider) = &remote.credential_provider {
985 if remote.remote_type == crate::config::RemoteType::S3 {
986 let secret_sql = format!(
987 "CREATE SECRET IF NOT EXISTS \"bird_{}\" (TYPE s3, PROVIDER {})",
988 remote.name, provider
989 );
990 conn.execute(&secret_sql, [])?;
991 }
992 }
993
994 if let Some(remote_data_dir) = remote.data_dir() {
997 let current_path: String = conn
999 .query_row("SELECT current_setting('file_search_path')", [], |r| r.get(0))
1000 .unwrap_or_default();
1001
1002 let remote_path = remote_data_dir.display().to_string();
1003 let new_path = if current_path.is_empty() {
1004 remote_path
1005 } else if current_path.contains(&remote_path) {
1006 current_path
1008 } else {
1009 format!("{},{}", current_path, remote_path)
1010 };
1011
1012 conn.execute(&format!("SET file_search_path = '{}'", new_path), [])?;
1013 }
1014
1015 conn.execute(&remote.attach_sql(), [])?;
1017 Ok(())
1018 }
1019
1020 pub fn detach_remote(&self, conn: &Connection, name: &str) -> Result<()> {
1022 conn.execute(&format!("DETACH \"remote_{}\"", name), [])?;
1023 Ok(())
1024 }
1025
1026 pub fn test_remote(&self, remote: &crate::RemoteConfig) -> Result<()> {
1028 let conn = self.connection_with_options(false)?;
1029 self.attach_remote(&conn, remote)?;
1030
1031 let test_sql = format!(
1033 "SELECT 1 FROM {}.invocations LIMIT 1",
1034 remote.quoted_schema_name()
1035 );
1036 conn.execute(&test_sql, [])?;
1037
1038 Ok(())
1039 }
1040
    /// Borrows the store's configuration.
    pub fn config(&self) -> &Config {
        &self.config
    }
1045
    /// Runs an arbitrary SQL query on a fully configured connection and
    /// returns every value rendered as a string (column names included).
    /// Temporal values are normalized per their `TimeUnit`; blobs are shown
    /// as a byte count; nested/owned values fall back to `format_value`.
    pub fn query(&self, sql: &str) -> Result<QueryResult> {
        let conn = self.connection()?;
        let mut stmt = conn.prepare(sql)?;

        let mut rows_iter = stmt.query([])?;

        // Column metadata comes from the underlying statement handle.
        let column_count = rows_iter.as_ref().map(|r| r.column_count()).unwrap_or(0);
        let column_names: Vec<String> = if let Some(row_ref) = rows_iter.as_ref() {
            (0..column_count)
                .map(|i| {
                    row_ref
                        .column_name(i)
                        .map(|s| s.to_string())
                        // Fall back to a positional name if lookup fails.
                        .unwrap_or_else(|_| format!("col{}", i))
                })
                .collect()
        } else {
            Vec::new()
        };

        let mut result_rows = Vec::new();
        while let Some(row) = rows_iter.next()? {
            let mut values = Vec::with_capacity(column_count);
            for i in 0..column_count {
                let value = match row.get_ref(i)? {
                    ValueRef::Null => "NULL".to_string(),
                    ValueRef::Boolean(b) => b.to_string(),
                    ValueRef::TinyInt(n) => n.to_string(),
                    ValueRef::SmallInt(n) => n.to_string(),
                    ValueRef::Int(n) => n.to_string(),
                    ValueRef::BigInt(n) => n.to_string(),
                    ValueRef::HugeInt(n) => n.to_string(),
                    ValueRef::UTinyInt(n) => n.to_string(),
                    ValueRef::USmallInt(n) => n.to_string(),
                    ValueRef::UInt(n) => n.to_string(),
                    ValueRef::UBigInt(n) => n.to_string(),
                    ValueRef::Float(f) => f.to_string(),
                    ValueRef::Double(f) => f.to_string(),
                    ValueRef::Decimal(d) => d.to_string(),
                    ValueRef::Timestamp(unit, val) => {
                        // Normalize to microseconds before formatting.
                        // NOTE(review): Second -> micros multiplies by 1e6
                        // and could overflow i64 for extreme values — the
                        // chrono fallback only catches out-of-range, not
                        // wrapped arithmetic. Confirm acceptable.
                        let micros = match unit {
                            TimeUnit::Second => val * 1_000_000,
                            TimeUnit::Millisecond => val * 1_000,
                            TimeUnit::Microsecond => val,
                            TimeUnit::Nanosecond => val / 1_000,
                        };
                        DateTime::<Utc>::from_timestamp_micros(micros)
                            .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
                            .unwrap_or_else(|| format!("<invalid timestamp {}>", val))
                    }
                    ValueRef::Date32(days) => {
                        // Days since the Unix epoch.
                        NaiveDate::from_ymd_opt(1970, 1, 1)
                            .and_then(|epoch| epoch.checked_add_signed(TimeDelta::days(days as i64)))
                            .map(|d| d.format("%Y-%m-%d").to_string())
                            .unwrap_or_else(|| format!("<invalid date {}>", days))
                    }
                    ValueRef::Time64(unit, val) => {
                        // Normalize to microseconds since midnight.
                        let micros = match unit {
                            TimeUnit::Second => val * 1_000_000,
                            TimeUnit::Millisecond => val * 1_000,
                            TimeUnit::Microsecond => val,
                            TimeUnit::Nanosecond => val / 1_000,
                        };
                        let secs = (micros / 1_000_000) as u32;
                        let micro_part = (micros % 1_000_000) as u32;
                        NaiveTime::from_num_seconds_from_midnight_opt(secs, micro_part * 1000)
                            .map(|t| t.format("%H:%M:%S").to_string())
                            .unwrap_or_else(|| format!("<invalid time {}>", val))
                    }
                    ValueRef::Interval { months, days, nanos } => {
                        format!("{} months {} days {} ns", months, days, nanos)
                    }
                    ValueRef::Text(s) => String::from_utf8_lossy(s).to_string(),
                    ValueRef::Blob(b) => format!("<blob {} bytes>", b.len()),
                    // Nested/uncommon types: convert to an owned Value and
                    // reuse the shared recursive formatter.
                    other => {
                        let owned: Value = other.into();
                        format_value(&owned)
                    }
                };
                values.push(value);
            }
            result_rows.push(values);
        }

        Ok(QueryResult {
            columns: column_names,
            rows: result_rows,
        })
    }
1146
1147 pub fn last_invocation_with_output(
1149 &self,
1150 ) -> Result<Option<(InvocationSummary, Option<OutputInfo>)>> {
1151 if let Some(inv) = self.last_invocation()? {
1152 let output = self.get_output(&inv.id)?;
1153 Ok(Some((inv, output)))
1154 } else {
1155 Ok(None)
1156 }
1157 }
1158
1159 pub fn write_batch(&self, batch: &InvocationBatch) -> Result<()> {
1165 let invocation = batch
1166 .invocation
1167 .as_ref()
1168 .ok_or_else(|| Error::Storage("Batch must contain an invocation".to_string()))?;
1169
1170 match self.config.storage_mode {
1171 StorageMode::Parquet => self.write_batch_parquet(batch, invocation),
1172 StorageMode::DuckDB => self.write_batch_duckdb(batch, invocation),
1173 }
1174 }
1175
1176 fn write_batch_parquet(
1178 &self,
1179 batch: &InvocationBatch,
1180 invocation: &InvocationRecord,
1181 ) -> Result<()> {
1182 if let Some(ref session) = batch.session {
1188 self.ensure_session(session)?;
1189 }
1190
1191 self.write_invocation(invocation)?;
1193
1194 let date = invocation.date();
1195 let inv_id = invocation.id;
1196
1197 for (stream, content) in &batch.outputs {
1199 self.store_output(
1200 inv_id,
1201 stream,
1202 content,
1203 date,
1204 invocation.executable.as_deref(),
1205 )?;
1206 }
1207
1208 if let Some(ref events) = batch.events {
1210 if !events.is_empty() {
1211 self.write_events(events)?;
1212 }
1213 }
1214
1215 Ok(())
1216 }
1217
1218 fn write_batch_duckdb(
1220 &self,
1221 batch: &InvocationBatch,
1222 invocation: &InvocationRecord,
1223 ) -> Result<()> {
1224 let conn = self.connection()?;
1225
1226 conn.execute("BEGIN TRANSACTION", [])?;
1228
1229 let result = self.write_batch_duckdb_inner(&conn, batch, invocation);
1230
1231 match result {
1232 Ok(()) => {
1233 conn.execute("COMMIT", [])?;
1234 Ok(())
1235 }
1236 Err(e) => {
1237 let _ = conn.execute("ROLLBACK", []);
1239 Err(e)
1240 }
1241 }
1242 }
1243
    /// Transactional body of the DuckDB batch write. Runs inside the
    /// transaction opened by `write_batch_duckdb` and inserts, in order:
    /// the session row (first time only), the attempt row, the outcome row
    /// (when present), each output stream (inline or as a blob file), and
    /// any parsed events.
    fn write_batch_duckdb_inner(
        &self,
        conn: &Connection,
        batch: &InvocationBatch,
        invocation: &InvocationRecord,
    ) -> Result<()> {
        use base64::Engine;

        let date = invocation.date();
        let inv_id = invocation.id;

        // Register the session only once; existence check failures are
        // treated as "not present" (count defaults to 0) and retried here.
        if let Some(ref session) = batch.session {
            let exists: i64 = conn
                .query_row(
                    "SELECT COUNT(*) FROM local.sessions WHERE session_id = ?",
                    params![&session.session_id],
                    |row| row.get(0),
                )
                .unwrap_or(0);

            if exists == 0 {
                conn.execute(
                    r#"INSERT INTO local.sessions VALUES (?, ?, ?, ?, ?, ?, ?, ?)"#,
                    params![
                        session.session_id,
                        session.client_id,
                        session.invoker,
                        session.invoker_pid,
                        session.invoker_type,
                        session.registered_at.to_rfc3339(),
                        session.cwd,
                        session.date.to_string(),
                    ],
                )?;
            }
        }

        let attempt = invocation.to_attempt();
        let outcome = invocation.to_outcome();

        // The metadata column is a DuckDB MAP, which cannot be bound via
        // `params!`, so a MAP literal is spliced into the SQL text instead.
        // Keys and values are single-quote-escaped ('' doubling) to keep the
        // literal well-formed; values are cast to JSON.
        let attempt_metadata_map = if attempt.metadata.is_empty() {
            "map([],[]::JSON[])".to_string()
        } else {
            let entries: Vec<String> = attempt.metadata.iter()
                .map(|(k, v)| {
                    let key = k.replace('\'', "''");
                    let value = v.to_string().replace('\'', "''");
                    format!("struct_pack(k := '{}', v := '{}'::JSON)", key, value)
                })
                .collect();
            format!("map_from_entries([{}])", entries.join(", "))
        };

        // 11 bound placeholders, then the spliced MAP expression, then date.
        conn.execute(
            &format!(
                r#"INSERT INTO local.attempts VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, {}, ?)"#,
                attempt_metadata_map
            ),
            params![
                attempt.id.to_string(),
                attempt.timestamp.to_rfc3339(),
                attempt.cmd,
                attempt.cwd,
                attempt.session_id,
                attempt.tag,
                attempt.source_client,
                attempt.machine_id,
                attempt.hostname,
                attempt.executable,
                attempt.format_hint,
                date.to_string(),
            ],
        )?;

        // The outcome row is optional (an attempt may not have completed).
        if let Some(outcome) = outcome {
            // Same MAP-splicing scheme as for the attempt metadata above.
            let outcome_metadata_map = if outcome.metadata.is_empty() {
                "map([],[]::JSON[])".to_string()
            } else {
                let entries: Vec<String> = outcome.metadata.iter()
                    .map(|(k, v)| {
                        let key = k.replace('\'', "''");
                        let value = v.to_string().replace('\'', "''");
                        format!("struct_pack(k := '{}', v := '{}'::JSON)", key, value)
                    })
                    .collect();
                format!("map_from_entries([{}])", entries.join(", "))
            };

            conn.execute(
                &format!(
                    r#"INSERT INTO local.outcomes VALUES (?, ?, ?, ?, ?, ?, {}, ?)"#,
                    outcome_metadata_map
                ),
                params![
                    outcome.attempt_id.to_string(),
                    outcome.completed_at.to_rfc3339(),
                    outcome.exit_code,
                    outcome.duration_ms,
                    outcome.signal,
                    outcome.timeout,
                    outcome.date.to_string(),
                ],
            )?;
        }

        // Each output stream is content-addressed by its BLAKE3 hash. Small
        // payloads are inlined as a base64 data URL; large ones go to a blob
        // file on disk with a registry row for deduplication/refcounting.
        for (stream, content) in &batch.outputs {
            let hash = blake3::hash(content);
            let hash_hex = hash.to_hex().to_string();

            let (storage_type, storage_ref) = if content.len() < self.config.inline_threshold {
                let b64 = base64::engine::general_purpose::STANDARD.encode(content);
                let data_url = format!("data:application/octet-stream;base64,{}", b64);
                ("inline".to_string(), data_url)
            } else {
                // The executable name only seeds a human-readable path hint.
                let cmd_hint = invocation.executable.as_deref().unwrap_or("output");
                let blob_path = self.config.blob_path(&hash_hex, cmd_hint);

                if let Some(parent) = blob_path.parent() {
                    fs::create_dir_all(parent)?;
                }

                // Store paths relative to the data dir when possible so the
                // data dir can be relocated; fall back to the absolute path.
                let rel_path = blob_path
                    .strip_prefix(self.config.data_dir())
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_else(|_| blob_path.to_string_lossy().to_string());

                // `write_file` reports whether a new file was created; an
                // existing blob with the same hash bumps the refcount instead.
                let wrote_new = atomic::write_file(&blob_path, content)?;

                if wrote_new {
                    conn.execute(
                        "INSERT INTO blob_registry (content_hash, byte_length, storage_path) VALUES (?, ?, ?)",
                        params![&hash_hex, content.len() as i64, &rel_path],
                    )?;
                } else {
                    conn.execute(
                        "UPDATE blob_registry SET ref_count = ref_count + 1, last_accessed = CURRENT_TIMESTAMP WHERE content_hash = ?",
                        params![&hash_hex],
                    )?;
                }

                ("blob".to_string(), format!("file://{}", rel_path))
            };

            let output_id = uuid::Uuid::now_v7();
            // NOTE(review): the `Option::<String>::None` binds the 8th column
            // as NULL — column meaning not visible here; confirm against the
            // local.outputs schema.
            conn.execute(
                r#"INSERT INTO local.outputs VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
                params![
                    output_id.to_string(),
                    inv_id.to_string(),
                    stream,
                    hash_hex,
                    content.len() as i64,
                    storage_type,
                    storage_ref,
                    Option::<String>::None, date.to_string(),
                ],
            )?;
        }

        // Events are inserted one row per record within the same transaction.
        if let Some(ref events) = batch.events {
            for event in events {
                conn.execute(
                    r#"INSERT INTO local.events VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
                    params![
                        event.id.to_string(),
                        event.invocation_id.to_string(),
                        event.client_id,
                        event.hostname,
                        event.event_type,
                        event.severity,
                        event.ref_file,
                        event.ref_line,
                        event.ref_column,
                        event.message,
                        event.error_code,
                        event.test_name,
                        event.status,
                        event.format_used,
                        event.date.to_string(),
                    ],
                )?;
            }
        }

        Ok(())
    }
1446
1447 pub fn load_format_hints(&self) -> Result<crate::FormatHints> {
1449 let path = self.config.format_hints_path();
1450
1451 if path.exists() {
1453 return crate::FormatHints::load(&path);
1454 }
1455
1456 let legacy_path = self.config.event_formats_path();
1458 if legacy_path.exists() {
1459 return crate::FormatHints::load(&legacy_path);
1460 }
1461
1462 Ok(crate::FormatHints::new())
1463 }
1464
1465 pub fn save_format_hints(&self, hints: &crate::FormatHints) -> Result<()> {
1467 hints.save(&self.config.format_hints_path())
1468 }
1469
1470 pub fn detect_format_for_command(&self, cmd: &str) -> Result<String> {
1479 let hints = self.load_format_hints()?;
1480 Ok(hints.detect(cmd).to_string())
1481 }
1482
1483 pub fn list_builtin_formats(&self) -> Result<Vec<BuiltinFormat>> {
1488 let conn = self.connection()?;
1489
1490 let mut stmt = conn.prepare(
1491 "SELECT format, description, priority FROM duck_hunt_formats() ORDER BY priority DESC, format"
1492 )?;
1493
1494 let rows = stmt.query_map([], |row| {
1495 Ok(BuiltinFormat {
1496 format: row.get(0)?,
1497 pattern: row.get::<_, String>(1)?, priority: row.get(2)?,
1499 })
1500 })?;
1501
1502 let results: Vec<_> = rows.filter_map(|r| r.ok()).collect();
1503 Ok(results)
1504 }
1505
1506 pub fn check_format(&self, cmd: &str) -> Result<FormatMatch> {
1512 let hints = self.load_format_hints()?;
1513
1514 for hint in hints.hints() {
1516 if crate::format_hints::pattern_matches(&hint.pattern, cmd) {
1517 return Ok(FormatMatch {
1518 format: hint.format.clone(),
1519 source: FormatSource::UserDefined {
1520 pattern: hint.pattern.clone(),
1521 priority: hint.priority,
1522 },
1523 });
1524 }
1525 }
1526
1527 Ok(FormatMatch {
1529 format: hints.default_format().to_string(),
1530 source: FormatSource::Default,
1531 })
1532 }
1533}
1534
/// A format reported by the `duck_hunt_formats()` table function,
/// as returned by `Store::list_builtin_formats`.
#[derive(Debug, Clone)]
pub struct BuiltinFormat {
    /// Format name.
    pub format: String,
    /// Populated from the query's `description` column in
    /// `list_builtin_formats` — confirm the intended meaning of this field.
    pub pattern: String,
    /// Ordering weight; higher values sort first.
    pub priority: i32,
}
1542
/// The result of resolving a command string to an output format
/// (see `Store::check_format`).
#[derive(Debug, Clone)]
pub struct FormatMatch {
    /// The resolved format name.
    pub format: String,
    /// Where the resolution came from.
    pub source: FormatSource,
}
1549
/// Provenance of a resolved format.
#[derive(Debug, Clone)]
pub enum FormatSource {
    /// Matched a user-defined hint with the given pattern and priority.
    UserDefined { pattern: String, priority: i32 },
    /// Matched a builtin pattern with the given priority.
    Builtin { pattern: String, priority: i32 },
    /// No pattern matched; the default format was used.
    Default,
}
1557
/// A generic tabular query result: column names plus rows of values
/// rendered as strings.
#[derive(Debug)]
pub struct QueryResult {
    /// Column names, in select order.
    pub columns: Vec<String>,
    /// One inner `Vec<String>` per row, aligned with `columns`.
    pub rows: Vec<Vec<String>>,
}
1564
/// Make a string safe for use as a file name: spaces become `-`,
/// alphanumerics plus `-`, `_`, and `.` pass through, everything else
/// becomes `_`. The result is truncated to at most 64 characters.
fn sanitize_filename(s: &str) -> String {
    let mut out = String::with_capacity(s.len().min(64));
    for c in s.chars().take(64) {
        let mapped = if c == ' ' {
            '-'
        } else if c.is_alphanumeric() || matches!(c, '-' | '_' | '.') {
            c
        } else {
            // Path separators, shell metacharacters, and anything else
            // unusual all collapse to a single safe placeholder.
            '_'
        };
        out.push(mapped);
    }
    out
}
1577
#[cfg(test)]
mod tests {
    use super::*;
    use crate::init::initialize;
    use crate::schema::SessionRecord;
    use tempfile::TempDir;

    // Builds an initialized store backed by a fresh temp dir
    // (default storage mode).
    fn setup_store() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let config = Config::with_root(tmp.path());
        initialize(&config).unwrap();
        let store = Store::open(config).unwrap();
        (tmp, store)
    }

    // Same as `setup_store`, but configured for DuckDB storage mode.
    fn setup_store_duckdb() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let config = Config::with_duckdb_mode(tmp.path());
        initialize(&config).unwrap();
        let store = Store::open(config).unwrap();
        (tmp, store)
    }

    #[test]
    fn test_store_open_uninitialized_fails() {
        let tmp = TempDir::new().unwrap();
        let config = Config::with_root(tmp.path());

        let result = Store::open(config);
        assert!(matches!(result, Err(Error::NotInitialized(_))));
    }

    #[test]
    fn test_sanitize_filename() {
        assert_eq!(sanitize_filename("make test"), "make-test");
        assert_eq!(sanitize_filename("/usr/bin/gcc"), "_usr_bin_gcc");
        assert_eq!(sanitize_filename("a:b*c?d"), "a_b_c_d");
    }

    // --- write_batch: Parquet storage mode ---

    #[test]
    fn test_batch_write_parquet_invocation_only() {
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");

        let batch = InvocationBatch::new(inv);
        store.write_batch(&batch).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
    }

    #[test]
    fn test_batch_write_parquet_with_output() {
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let inv_id = inv.id;

        let batch = InvocationBatch::new(inv)
            .with_output("stdout", b"hello world\n".to_vec());

        store.write_batch(&batch).unwrap();

        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].stream, "stdout");
    }

    #[test]
    fn test_batch_write_parquet_with_session() {
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(inv).with_session(session);
        store.write_batch(&batch).unwrap();

        assert!(store.session_exists("test-session").unwrap());
    }

    #[test]
    fn test_batch_write_parquet_full() {
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new("test-session", "make test", "/home/user", 1, "test@client");
        let inv_id = inv.id;
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(inv)
            .with_session(session)
            .with_output("stdout", b"Building...\n".to_vec())
            .with_output("stderr", b"error: failed\n".to_vec());

        store.write_batch(&batch).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
        assert!(store.session_exists("test-session").unwrap());

        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 2);
    }

    // --- write_batch: DuckDB storage mode (mirrors the Parquet cases) ---

    #[test]
    fn test_batch_write_duckdb_invocation_only() {
        let (_tmp, store) = setup_store_duckdb();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");

        let batch = InvocationBatch::new(inv);
        store.write_batch(&batch).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
    }

    #[test]
    fn test_batch_write_duckdb_with_output() {
        let (_tmp, store) = setup_store_duckdb();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let inv_id = inv.id;

        let batch = InvocationBatch::new(inv)
            .with_output("stdout", b"hello world\n".to_vec());

        store.write_batch(&batch).unwrap();

        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].stream, "stdout");
    }

    #[test]
    fn test_batch_write_duckdb_with_session() {
        let (_tmp, store) = setup_store_duckdb();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(inv).with_session(session);
        store.write_batch(&batch).unwrap();

        assert!(store.session_exists("test-session").unwrap());
    }

    #[test]
    fn test_batch_write_duckdb_full() {
        let (_tmp, store) = setup_store_duckdb();

        let inv = InvocationRecord::new("test-session", "make test", "/home/user", 1, "test@client");
        let inv_id = inv.id;
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(inv)
            .with_session(session)
            .with_output("stdout", b"Building...\n".to_vec())
            .with_output("stderr", b"error: failed\n".to_vec());

        store.write_batch(&batch).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
        assert!(store.session_exists("test-session").unwrap());

        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 2);
    }

    #[test]
    fn test_batch_requires_invocation() {
        let (_tmp, store) = setup_store();

        let batch = InvocationBatch::default();
        let result = store.write_batch(&batch);

        assert!(result.is_err());
    }

    // --- ensure_extension ---

    #[test]
    fn test_ensure_extension_parquet() {
        let conn = duckdb::Connection::open_in_memory().unwrap();
        let result = ensure_extension(&conn, "parquet").unwrap();
        assert!(result, "parquet extension should be loadable");
    }

    #[test]
    fn test_ensure_extension_icu() {
        let conn = duckdb::Connection::open_in_memory().unwrap();
        let result = ensure_extension(&conn, "icu").unwrap();
        assert!(result, "icu extension should be loadable");
    }

    #[test]
    fn test_ensure_extension_community() {
        let conn = duckdb::Connection::open_in_memory().unwrap();
        conn.execute("SET allow_community_extensions = true", []).unwrap();

        let result = ensure_extension(&conn, "scalarfs").unwrap();
        assert!(result, "scalarfs extension should be loadable from community");

        let result = ensure_extension(&conn, "duck_hunt").unwrap();
        assert!(result, "duck_hunt extension should be loadable from community");
    }

    #[test]
    fn test_ensure_extension_nonexistent() {
        let conn = duckdb::Connection::open_in_memory().unwrap();
        conn.execute("SET allow_community_extensions = true", []).unwrap();

        let result = ensure_extension(&conn, "nonexistent_fake_extension_xyz").unwrap();
        assert!(!result, "nonexistent extension should return false");
    }

    #[test]
    fn test_extension_loading_is_cached() {
        let conn = duckdb::Connection::open_in_memory().unwrap();

        ensure_extension(&conn, "parquet").unwrap();

        // A second load of the same extension should be near-instant.
        let start = std::time::Instant::now();
        ensure_extension(&conn, "parquet").unwrap();
        let elapsed = start.elapsed();

        assert!(elapsed.as_millis() < 100, "cached extension load took {:?}", elapsed);
    }
}