1mod atomic;
6mod compact;
7mod events;
8mod invocations;
9mod outputs;
10mod pending;
11mod remote;
12mod sessions;
13
14use std::fs;
15use std::thread;
16use std::time::Duration;
17
18use chrono::{DateTime, NaiveDate, NaiveTime, TimeDelta, Utc};
19use duckdb::{
20 params,
21 types::{TimeUnit, Value, ValueRef},
22 Connection,
23};
24
25use crate::config::StorageMode;
26use crate::schema::{EventRecord, InvocationRecord, SessionRecord};
27use crate::{Config, Error, Result};
28
29fn format_value(value: &Value) -> String {
32 match value {
33 Value::Null => "NULL".to_string(),
34 Value::Boolean(b) => b.to_string(),
35 Value::TinyInt(n) => n.to_string(),
36 Value::SmallInt(n) => n.to_string(),
37 Value::Int(n) => n.to_string(),
38 Value::BigInt(n) => n.to_string(),
39 Value::HugeInt(n) => n.to_string(),
40 Value::UTinyInt(n) => n.to_string(),
41 Value::USmallInt(n) => n.to_string(),
42 Value::UInt(n) => n.to_string(),
43 Value::UBigInt(n) => n.to_string(),
44 Value::Float(f) => f.to_string(),
45 Value::Double(f) => f.to_string(),
46 Value::Decimal(d) => d.to_string(),
47 Value::Timestamp(_, micros) => {
48 DateTime::<Utc>::from_timestamp_micros(*micros)
49 .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
50 .unwrap_or_else(|| format!("<timestamp {}>", micros))
51 }
52 Value::Text(s) => s.clone(),
53 Value::Blob(b) => format!("<blob {} bytes>", b.len()),
54 Value::Date32(days) => {
55 NaiveDate::from_ymd_opt(1970, 1, 1)
56 .and_then(|epoch| epoch.checked_add_signed(TimeDelta::days(*days as i64)))
57 .map(|d| d.format("%Y-%m-%d").to_string())
58 .unwrap_or_else(|| format!("<date {}>", days))
59 }
60 Value::Time64(_, micros) => {
61 let secs = (*micros / 1_000_000) as u32;
62 let micro_part = (*micros % 1_000_000) as u32;
63 NaiveTime::from_num_seconds_from_midnight_opt(secs, micro_part * 1000)
64 .map(|t| t.format("%H:%M:%S").to_string())
65 .unwrap_or_else(|| format!("<time {}>", micros))
66 }
67 Value::Interval { months, days, nanos } => {
68 format!("{} months {} days {} ns", months, days, nanos)
69 }
70 Value::List(items) => {
72 let formatted: Vec<String> = items.iter().map(format_value).collect();
73 format!("[{}]", formatted.join(", "))
74 }
75 Value::Array(items) => {
76 let formatted: Vec<String> = items.iter().map(format_value).collect();
77 format!("[{}]", formatted.join(", "))
78 }
79 Value::Map(map) => {
80 let formatted: Vec<String> = map
81 .iter()
82 .map(|(k, v)| format!("{}: {}", format_value(k), format_value(v)))
83 .collect();
84 format!("{{{}}}", formatted.join(", "))
85 }
86 Value::Struct(fields) => {
87 let formatted: Vec<String> = fields
88 .iter()
89 .map(|(k, v)| format!("{}: {}", k, format_value(v)))
90 .collect();
91 format!("{{{}}}", formatted.join(", "))
92 }
93 Value::Enum(s) => s.clone(),
94 _ => "<unknown>".to_string(),
95 }
96}
97
98pub use compact::{
100 ArchiveStats, AutoCompactOptions, CleanOptions, CleanStats, CompactOptions, CompactStats,
101 PruneStats,
102};
103pub use events::{EventFilters, EventSummary, FormatConfig, FormatRule};
104pub use invocations::InvocationSummary;
105pub use outputs::OutputInfo;
106pub use pending::{
107 delete_pending_file, is_runner_alive, list_pending_files, write_pending_file,
108 PendingInvocation, RecoveryStats,
109};
110pub use remote::{parse_since, PullOptions, PullStats, PushOptions, PushStats};
111
/// A group of records persisted together for one command invocation.
#[derive(Debug, Default)]
pub struct InvocationBatch {
    // The invocation record itself; `Store::write_batch` errors if absent.
    pub invocation: Option<InvocationRecord>,

    // Captured output streams as (stream_name, raw_bytes) pairs.
    pub outputs: Vec<(String, Vec<u8>)>,

    // Session record to register alongside the invocation, if any.
    pub session: Option<SessionRecord>,

    // Events extracted from the outputs; `None` means none were parsed.
    pub events: Option<Vec<EventRecord>>,
}
134
135impl InvocationBatch {
136 pub fn new(invocation: InvocationRecord) -> Self {
138 Self {
139 invocation: Some(invocation),
140 outputs: Vec::new(),
141 session: None,
142 events: None,
143 }
144 }
145
146 pub fn with_output(mut self, stream: impl Into<String>, content: Vec<u8>) -> Self {
148 self.outputs.push((stream.into(), content));
149 self
150 }
151
152 pub fn with_session(mut self, session: SessionRecord) -> Self {
154 self.session = Some(session);
155 self
156 }
157
158 pub fn with_events(mut self, events: Vec<EventRecord>) -> Self {
160 self.events = Some(events);
161 self
162 }
163}
164
/// Flags selecting how much optional setup `Store::connect` performs on a
/// freshly opened connection. `Default` yields all-off (same as `minimal`).
#[derive(Debug, Clone, Default)]
pub struct ConnectionOptions {
    // Attach configured remote databases and create the remote macros.
    pub attach_remotes: bool,

    // Attach the current project's database read-only, if one exists.
    pub attach_project: bool,

    // Create temporary cwd_* macros filtered to the current directory.
    pub create_ephemeral_views: bool,

    // Run the schema migration before other setup steps.
    pub run_migration: bool,
}
196
197impl ConnectionOptions {
198 pub fn full() -> Self {
200 Self {
201 attach_remotes: true,
202 attach_project: true,
203 create_ephemeral_views: true,
204 run_migration: false,
205 }
206 }
207
208 pub fn minimal() -> Self {
211 Self {
212 attach_remotes: false,
213 attach_project: false,
214 create_ephemeral_views: false,
215 run_migration: false,
216 }
217 }
218
219 pub fn for_migration() -> Self {
221 Self {
222 attach_remotes: false,
223 attach_project: false,
224 create_ephemeral_views: false,
225 run_migration: true,
226 }
227 }
228}
229
230fn ensure_extension(conn: &Connection, name: &str) -> Result<bool> {
239 if conn.execute(&format!("LOAD {}", name), []).is_ok() {
241 return Ok(true);
242 }
243
244 if conn.execute(&format!("INSTALL {}", name), []).is_ok()
246 && conn.execute(&format!("LOAD {}", name), []).is_ok()
247 {
248 return Ok(true);
249 }
250
251 if conn.execute(&format!("INSTALL {} FROM community", name), []).is_ok()
253 && conn.execute(&format!("LOAD {}", name), []).is_ok()
254 {
255 return Ok(true);
256 }
257
258 Ok(false)
259}
260
/// Handle to the data store. Holds only the configuration; DuckDB
/// connections are opened on demand rather than kept open.
pub struct Store {
    config: Config,
}
265
266impl Store {
267 pub fn open(config: Config) -> Result<Self> {
269 if !config.db_path().exists() {
270 return Err(Error::NotInitialized(config.bird_root.clone()));
271 }
272 Ok(Self { config })
273 }
274
275 fn open_connection_with_retry(&self) -> Result<Connection> {
281 const MAX_RETRIES: u32 = 10;
282 const INITIAL_DELAY_MS: u64 = 10;
283 const MAX_DELAY_MS: u64 = 1000;
284
285 let db_path = self.config.db_path();
286 let mut delay_ms = INITIAL_DELAY_MS;
287 let mut last_error = None;
288
289 for attempt in 0..MAX_RETRIES {
290 match Connection::open(&db_path) {
291 Ok(conn) => return Ok(conn),
292 Err(e) => {
293 let err_msg = e.to_string();
294 if err_msg.contains("Could not set lock")
296 || err_msg.contains("Conflicting lock")
297 || err_msg.contains("database is locked")
298 {
299 last_error = Some(e);
300 if attempt < MAX_RETRIES - 1 {
301 let jitter = (attempt as u64 * 7) % 10;
303 thread::sleep(Duration::from_millis(delay_ms + jitter));
304 delay_ms = (delay_ms * 2).min(MAX_DELAY_MS);
305 continue;
306 }
307 } else {
308 return Err(e.into());
310 }
311 }
312 }
313 }
314
315 Err(last_error
317 .map(|e| e.into())
318 .unwrap_or_else(|| Error::Storage("Failed to open database after retries".to_string())))
319 }
320
    /// Opens a fully-configured connection (remotes, project DB, cwd macros).
    pub fn connection(&self) -> Result<Connection> {
        self.connect(ConnectionOptions::full())
    }
325
326 pub fn connection_with_options(&self, attach_remotes: bool) -> Result<Connection> {
328 let opts = if attach_remotes {
329 ConnectionOptions::full()
330 } else {
331 ConnectionOptions::minimal()
332 };
333 self.connect(opts)
334 }
335
    /// Opens a connection and performs the setup selected by `opts`, in a
    /// fixed order: extension loading, file search path, optional schema
    /// migration, S3 secrets, blob-resolution macros, remote attachment and
    /// macros, project attachment, then cwd macros.
    pub fn connect(&self, opts: ConnectionOptions) -> Result<Connection> {
        let conn = self.open_connection_with_retry()?;

        // Allows INSTALL ... FROM community below (scalarfs, duck_hunt).
        conn.execute("SET allow_community_extensions = true", [])?;

        // Hard requirements: fail the connection if these cannot load.
        for ext in ["parquet", "icu"] {
            if !ensure_extension(&conn, ext)? {
                return Err(Error::Extension(format!(
                    "Required extension '{}' could not be loaded",
                    ext
                )));
            }
        }

        // Optional extensions: warn and degrade instead of failing.
        for (ext, desc) in [
            ("scalarfs", "data: URL support for inline blobs"),
            ("duck_hunt", "log/output parsing for event extraction"),
        ] {
            if !ensure_extension(&conn, ext)? {
                eprintln!("Warning: {} extension not available ({})", ext, desc);
            }
        }

        // Relative paths (e.g. parquet globs in local views) resolve
        // against the data directory.
        conn.execute(
            &format!(
                "SET file_search_path = '{}'",
                self.config.data_dir().display()
            ),
            [],
        )?;

        // Migration must run before anything references the new schemas.
        if opts.run_migration {
            self.migrate_to_new_schema(&conn)?;
        }

        self.setup_s3_credentials(&conn)?;
        self.setup_blob_resolution(&conn)?;

        if opts.attach_remotes && !self.config.remotes.is_empty() {
            self.attach_remotes(&conn)?;
            self.create_remote_macros(&conn)?;
        }

        if opts.attach_project {
            self.attach_project_db(&conn)?;
        }

        if opts.create_ephemeral_views {
            self.create_cwd_macros(&conn)?;
        }

        Ok(conn)
    }
410
411 fn attach_project_db(&self, conn: &Connection) -> Result<()> {
416 use crate::project::find_current_project;
417
418 let Some(project) = find_current_project() else {
419 return Ok(()); };
421
422 if !project.is_initialized() {
423 return Ok(()); }
425
426 if project.db_path == self.config.db_path() {
428 return Ok(());
429 }
430
431 let attach_sql = format!(
433 "ATTACH '{}' AS project (READ_ONLY)",
434 project.db_path.display()
435 );
436
437 if let Err(e) = conn.execute(&attach_sql, []) {
438 eprintln!("Note: Could not attach project database: {}", e);
440 }
441
442 Ok(())
443 }
444
    /// One-time migration from the old flat `main`-schema layout to the
    /// multi-schema architecture (`local`, `caches`, `remotes`, `unified`,
    /// `cwd`, plus placeholder schemas). Idempotent: the presence of a
    /// `local` schema marks the database as already migrated.
    fn migrate_to_new_schema(&self, conn: &Connection) -> Result<()> {
        // Detect a migrated database by the existence of the `local` schema.
        let local_exists: bool = conn
            .query_row(
                "SELECT COUNT(*) > 0 FROM information_schema.schemata WHERE schema_name = 'local'",
                [],
                |row| row.get(0),
            )
            .unwrap_or(false);

        if local_exists {
            return Ok(());
        }

        eprintln!("Note: Migrating to new schema architecture...");

        // Create the full schema skeleton. The *_placeholder schemas hold
        // empty, correctly-shaped tables so the caches/remotes views are
        // valid before any real source is attached.
        conn.execute_batch(
            r#"
            CREATE SCHEMA IF NOT EXISTS local;
            CREATE SCHEMA IF NOT EXISTS cached_placeholder;
            CREATE SCHEMA IF NOT EXISTS remote_placeholder;
            CREATE SCHEMA IF NOT EXISTS caches;
            CREATE SCHEMA IF NOT EXISTS remotes;
            CREATE SCHEMA IF NOT EXISTS unified;
            CREATE SCHEMA IF NOT EXISTS cwd;
            "#,
        )?;

        if self.config.storage_mode == crate::StorageMode::DuckDB {
            // DuckDB mode: `local` holds real tables.
            conn.execute_batch(
                r#"
                CREATE TABLE IF NOT EXISTS local.sessions (
                    session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
                    invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE
                );
                CREATE TABLE IF NOT EXISTS local.invocations (
                    id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
                    cwd VARCHAR, cmd VARCHAR, executable VARCHAR, exit_code INTEGER,
                    format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR,
                    tag VARCHAR, date DATE
                );
                CREATE TABLE IF NOT EXISTS local.outputs (
                    id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
                    byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
                    content_type VARCHAR, date DATE
                );
                CREATE TABLE IF NOT EXISTS local.events (
                    id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
                    event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
                    ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
                    status VARCHAR, format_used VARCHAR, date DATE
                );
                "#,
            )?;

            // Copy rows out of the pre-migration flat tables, if present.
            let old_tables_exist: bool = conn
                .query_row(
                    "SELECT COUNT(*) > 0 FROM duckdb_tables() WHERE table_name = 'sessions_table'",
                    [],
                    |row| row.get(0),
                )
                .unwrap_or(false);

            if old_tables_exist {
                conn.execute_batch(
                    r#"
                    INSERT INTO local.sessions SELECT * FROM sessions_table;
                    INSERT INTO local.invocations SELECT * FROM invocations_table;
                    INSERT INTO local.outputs SELECT * FROM outputs_table;
                    INSERT INTO local.events SELECT * FROM events_table;
                    "#,
                )?;
            }
        } else {
            // Parquet mode: `local` is a set of views over hive-partitioned
            // parquet files resolved via file_search_path (the data dir).
            conn.execute_batch(
                r#"
                CREATE OR REPLACE VIEW local.sessions AS
                    SELECT * EXCLUDE (filename) FROM read_parquet(
                        'recent/sessions/**/*.parquet',
                        union_by_name = true, hive_partitioning = true, filename = true
                    );
                CREATE OR REPLACE VIEW local.invocations AS
                    SELECT * EXCLUDE (filename) FROM read_parquet(
                        'recent/invocations/**/*.parquet',
                        union_by_name = true, hive_partitioning = true, filename = true
                    );
                CREATE OR REPLACE VIEW local.outputs AS
                    SELECT * EXCLUDE (filename) FROM read_parquet(
                        'recent/outputs/**/*.parquet',
                        union_by_name = true, hive_partitioning = true, filename = true
                    );
                CREATE OR REPLACE VIEW local.events AS
                    SELECT * EXCLUDE (filename) FROM read_parquet(
                        'recent/events/**/*.parquet',
                        union_by_name = true, hive_partitioning = true, filename = true
                    );
                "#,
            )?;
        }

        // Empty placeholder tables carrying the unioned column shape
        // (including `_source`) so dependent views always resolve.
        conn.execute_batch(
            r#"
            CREATE TABLE IF NOT EXISTS cached_placeholder.sessions (
                session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
                invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS cached_placeholder.invocations (
                id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
                cwd VARCHAR, cmd VARCHAR, executable VARCHAR, runner_id VARCHAR, exit_code INTEGER,
                status VARCHAR, format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR, tag VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS cached_placeholder.outputs (
                id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
                byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
                content_type VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS cached_placeholder.events (
                id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
                event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
                ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
                status VARCHAR, format_used VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.sessions (
                session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
                invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.invocations (
                id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
                cwd VARCHAR, cmd VARCHAR, executable VARCHAR, runner_id VARCHAR, exit_code INTEGER,
                status VARCHAR, format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR, tag VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.outputs (
                id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
                byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
                content_type VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.events (
                id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
                event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
                ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
                status VARCHAR, format_used VARCHAR, date DATE, _source VARCHAR
            );
            "#,
        )?;

        // View hierarchy: caches/remotes initially alias the placeholders
        // (rebuilt later when real sources attach); main = local + caches;
        // unified = main + remotes; qualified_* views deduplicate rows and
        // aggregate the distinct sources each row came from.
        conn.execute_batch(
            r#"
            CREATE OR REPLACE VIEW caches.sessions AS SELECT * FROM cached_placeholder.sessions;
            CREATE OR REPLACE VIEW caches.invocations AS SELECT * FROM cached_placeholder.invocations;
            CREATE OR REPLACE VIEW caches.outputs AS SELECT * FROM cached_placeholder.outputs;
            CREATE OR REPLACE VIEW caches.events AS SELECT * FROM cached_placeholder.events;

            CREATE OR REPLACE VIEW remotes.sessions AS SELECT * FROM remote_placeholder.sessions;
            CREATE OR REPLACE VIEW remotes.invocations AS SELECT * FROM remote_placeholder.invocations;
            CREATE OR REPLACE VIEW remotes.outputs AS SELECT * FROM remote_placeholder.outputs;
            CREATE OR REPLACE VIEW remotes.events AS SELECT * FROM remote_placeholder.events;

            CREATE OR REPLACE VIEW main.sessions AS
                SELECT *, 'local' as _source FROM local.sessions
                UNION ALL BY NAME SELECT * FROM caches.sessions;
            CREATE OR REPLACE VIEW main.invocations AS
                SELECT *, 'local' as _source FROM local.invocations
                UNION ALL BY NAME SELECT * FROM caches.invocations;
            CREATE OR REPLACE VIEW main.outputs AS
                SELECT *, 'local' as _source FROM local.outputs
                UNION ALL BY NAME SELECT * FROM caches.outputs;
            CREATE OR REPLACE VIEW main.events AS
                SELECT *, 'local' as _source FROM local.events
                UNION ALL BY NAME SELECT * FROM caches.events;

            CREATE OR REPLACE VIEW unified.sessions AS
                SELECT * FROM main.sessions UNION ALL BY NAME SELECT * FROM remotes.sessions;
            CREATE OR REPLACE VIEW unified.invocations AS
                SELECT * FROM main.invocations UNION ALL BY NAME SELECT * FROM remotes.invocations;
            CREATE OR REPLACE VIEW unified.outputs AS
                SELECT * FROM main.outputs UNION ALL BY NAME SELECT * FROM remotes.outputs;
            CREATE OR REPLACE VIEW unified.events AS
                SELECT * FROM main.events UNION ALL BY NAME SELECT * FROM remotes.events;

            -- Qualified views: deduplicated with source list
            CREATE OR REPLACE VIEW unified.qualified_sessions AS
                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
                FROM unified.sessions GROUP BY ALL;
            CREATE OR REPLACE VIEW unified.qualified_invocations AS
                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
                FROM unified.invocations GROUP BY ALL;
            CREATE OR REPLACE VIEW unified.qualified_outputs AS
                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
                FROM unified.outputs GROUP BY ALL;
            CREATE OR REPLACE VIEW unified.qualified_events AS
                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
                FROM unified.events GROUP BY ALL;
            "#,
        )?;

        Ok(())
    }
657
658 fn setup_s3_credentials(&self, conn: &Connection) -> Result<()> {
661 let has_s3 = self.config.remotes.iter().any(|r| {
663 r.remote_type == crate::config::RemoteType::S3
664 });
665
666 if !has_s3 {
667 return Ok(());
668 }
669
670 for remote in &self.config.remotes {
672 if remote.remote_type == crate::config::RemoteType::S3 {
673 if let Some(provider) = &remote.credential_provider {
674 let secret_sql = format!(
675 "CREATE SECRET IF NOT EXISTS \"bird_{}\" (TYPE s3, PROVIDER {})",
676 remote.name, provider
677 );
678 if let Err(e) = conn.execute(&secret_sql, []) {
679 eprintln!("Warning: Failed to create S3 secret for {}: {}", remote.name, e);
680 }
681 }
682 }
683 }
684
685 Ok(())
686 }
687
    /// Installs SQL macros for resolving `storage_ref` values from the
    /// outputs table: inline `data:`/`data+` URLs pass through, while
    /// `file:` references expand to glob patterns over every configured
    /// blob root directory.
    fn setup_blob_resolution(&self, conn: &Connection) -> Result<()> {
        let blob_roots = self.config.blob_roots();

        // Build a SQL list literal; single quotes doubled for SQL escaping.
        let roots_sql: String = blob_roots
            .iter()
            .map(|r| format!("'{}'", r.replace('\'', "''")))
            .collect::<Vec<_>>()
            .join(", ");

        conn.execute(&format!("SET VARIABLE blob_roots = [{}]", roots_sql), [])?;

        // True when the ref is inline data (scheme-prefix check on the
        // first five characters).
        conn.execute(
            r#"CREATE OR REPLACE MACRO is_inline_data(ref) AS (
                ref[:5] = 'data:' OR ref[:5] = 'data+'
            )"#,
            [],
        )?;

        // True when the ref points at a blob file on disk.
        conn.execute(
            r#"CREATE OR REPLACE MACRO is_file_ref(ref) AS (
                ref[:5] = 'file:'
            )"#,
            [],
        )?;

        // Returns a list of candidate locations: the ref itself for inline
        // data, or one glob per blob root for file refs (ref[6:] strips
        // the 'file:' prefix).
        conn.execute(
            r#"CREATE OR REPLACE MACRO resolve_storage_ref(ref) AS (
                CASE
                    WHEN is_inline_data(ref) THEN [ref]
                    WHEN is_file_ref(ref) THEN
                        [format('{}/{}*', root, ref[6:]) FOR root IN getvariable('blob_roots')]
                    ELSE [ref]
                END
            )"#,
            [],
        )?;

        Ok(())
    }
741
742 fn attach_remotes(&self, conn: &Connection) -> Result<()> {
745 let remotes = self.config.auto_attach_remotes();
746
747 let remote_data_dirs: Vec<String> = remotes
749 .iter()
750 .filter_map(|r| r.data_dir())
751 .map(|p| p.display().to_string())
752 .collect();
753
754 if !remote_data_dirs.is_empty() {
756 let current_path: String = conn
757 .query_row("SELECT current_setting('file_search_path')", [], |r| r.get(0))
758 .unwrap_or_default();
759
760 let mut paths: Vec<&str> = if current_path.is_empty() {
761 vec![]
762 } else {
763 current_path.split(',').collect()
764 };
765
766 for dir in &remote_data_dirs {
767 if !paths.contains(&dir.as_str()) {
768 paths.push(dir);
769 }
770 }
771
772 let new_path = paths.join(",");
773 if let Err(e) = conn.execute(&format!("SET file_search_path = '{}'", new_path), []) {
774 eprintln!("Warning: Failed to set file_search_path: {}", e);
775 }
776 }
777
778 for remote in &remotes {
780 let attach_sql = remote.attach_sql();
781 if let Err(e) = conn.execute(&attach_sql, []) {
782 eprintln!("Warning: Failed to attach remote {}: {}", remote.name, e);
783 }
784 }
785
786 Ok(())
787 }
788
789 pub(crate) fn detect_remote_table_path(&self, conn: &Connection, remote_schema: &str) -> String {
798 let check_sql = format!(
800 "SELECT 1 FROM information_schema.tables \
801 WHERE table_catalog = '{}' AND table_schema = 'local' AND table_name = 'invocations' \
802 LIMIT 1",
803 remote_schema.trim_matches('"')
804 );
805 if conn.execute(&check_sql, []).is_ok() {
806 if let Ok(mut stmt) = conn.prepare(&check_sql) {
807 if stmt.query([]).is_ok_and(|mut rows| rows.next().is_ok_and(|r| r.is_some())) {
808 return "local.".to_string();
809 }
810 }
811 }
812
813 String::new()
815 }
816
    /// Creates per-remote table macros (`remote_<name>_<table>()`), combined
    /// `remotes_<table>()` macros, and rebuilds the `remotes.*` and
    /// `unified.*` views to union every attached remote. All failures are
    /// warnings so one bad remote cannot break the connection.
    fn create_remote_macros(&self, conn: &Connection) -> Result<()> {
        let remotes = self.config.auto_attach_remotes();
        if remotes.is_empty() {
            return Ok(());
        }

        // One temporary macro per remote per table, tagging rows with the
        // remote's name as _source. The macro name is quoted; the schema
        // prefix depends on the remote's layout (see detect_remote_table_path).
        for remote in &remotes {
            let schema = remote.quoted_schema_name();
            let name = &remote.name;
            // Macro identifiers cannot contain '-' or '.'.
            let safe_name = name.replace(['-', '.'], "_");

            let table_prefix = self.detect_remote_table_path(conn, &schema);

            for table in &["sessions", "invocations", "outputs", "events"] {
                let macro_name = format!("\"remote_{safe_name}_{table}\"");
                let sql = format!(
                    r#"CREATE OR REPLACE TEMPORARY MACRO {macro_name}() AS TABLE (
                        SELECT *, '{name}' as _source FROM {schema}.{prefix}{table}
                    )"#,
                    macro_name = macro_name,
                    name = name,
                    schema = schema,
                    prefix = table_prefix,
                    table = table
                );
                if let Err(e) = conn.execute(&sql, []) {
                    eprintln!("Warning: Failed to create macro {}: {}", macro_name, e);
                }
            }
        }

        // Combined remotes_<table>() macros unioning every remote; the
        // empty remote_placeholder table anchors the column shape.
        for table in &["sessions", "invocations", "outputs", "events"] {
            let mut union_parts: Vec<String> = remotes
                .iter()
                .map(|r| {
                    let safe_name = r.name.replace(['-', '.'], "_");
                    format!("SELECT * FROM \"remote_{safe_name}_{table}\"()", safe_name = safe_name, table = table)
                })
                .collect();

            union_parts.push(format!("SELECT * FROM remote_placeholder.{}", table));

            let sql = format!(
                r#"CREATE OR REPLACE TEMPORARY MACRO remotes_{table}() AS TABLE (
                    {union}
                )"#,
                table = table,
                union = union_parts.join(" UNION ALL BY NAME ")
            );
            if let Err(e) = conn.execute(&sql, []) {
                eprintln!("Warning: Failed to create remotes_{} macro: {}", table, e);
            }
        }

        // Rebuild the remotes.* views (which initially alias the empty
        // placeholders) with the same union so schema-qualified queries see
        // remote data too.
        for table in &["sessions", "invocations", "outputs", "events"] {
            let mut union_parts: Vec<String> = remotes
                .iter()
                .map(|r| {
                    let safe_name = r.name.replace(['-', '.'], "_");
                    format!(
                        "SELECT * FROM \"remote_{safe_name}_{table}\"()",
                        safe_name = safe_name,
                        table = table
                    )
                })
                .collect();

            union_parts.push(format!("SELECT * FROM remote_placeholder.{}", table));

            let remotes_sql = format!(
                "CREATE OR REPLACE VIEW remotes.{table} AS {union}",
                table = table,
                union = union_parts.join(" UNION ALL BY NAME ")
            );
            if let Err(e) = conn.execute(&remotes_sql, []) {
                eprintln!("Warning: Failed to rebuild remotes.{} view: {}", table, e);
            }
        }

        // Refresh unified.* so it picks up the rebuilt remotes.* views.
        let unified_views = r#"
            CREATE OR REPLACE VIEW unified.sessions AS
                SELECT * FROM main.sessions UNION ALL BY NAME SELECT * FROM remotes.sessions;
            CREATE OR REPLACE VIEW unified.invocations AS
                SELECT * FROM main.invocations UNION ALL BY NAME SELECT * FROM remotes.invocations;
            CREATE OR REPLACE VIEW unified.outputs AS
                SELECT * FROM main.outputs UNION ALL BY NAME SELECT * FROM remotes.outputs;
            CREATE OR REPLACE VIEW unified.events AS
                SELECT * FROM main.events UNION ALL BY NAME SELECT * FROM remotes.events;
        "#;
        if let Err(e) = conn.execute_batch(unified_views) {
            eprintln!("Warning: Failed to rebuild unified views: {}", e);
        }

        Ok(())
    }
932
933 fn create_cwd_macros(&self, conn: &Connection) -> Result<()> {
940 let cwd = std::env::current_dir()
941 .map(|p| p.to_string_lossy().to_string())
942 .unwrap_or_default();
943 let cwd_escaped = cwd.replace('\'', "''");
944
945 let macros = format!(
947 r#"
948 CREATE OR REPLACE TEMPORARY MACRO cwd_sessions() AS TABLE (
949 SELECT * FROM main.sessions WHERE cwd LIKE '{}%'
950 );
951 CREATE OR REPLACE TEMPORARY MACRO cwd_invocations() AS TABLE (
952 SELECT * FROM main.invocations WHERE cwd LIKE '{}%'
953 );
954 CREATE OR REPLACE TEMPORARY MACRO cwd_outputs() AS TABLE (
955 SELECT o.* FROM main.outputs o
956 JOIN main.invocations i ON o.invocation_id = i.id
957 WHERE i.cwd LIKE '{}%'
958 );
959 CREATE OR REPLACE TEMPORARY MACRO cwd_events() AS TABLE (
960 SELECT e.* FROM main.events e
961 JOIN main.invocations i ON e.invocation_id = i.id
962 WHERE i.cwd LIKE '{}%'
963 );
964 "#,
965 cwd_escaped, cwd_escaped, cwd_escaped, cwd_escaped
966 );
967
968 conn.execute_batch(¯os)?;
969 Ok(())
970 }
971
972 pub fn attach_remote(&self, conn: &Connection, remote: &crate::RemoteConfig) -> Result<()> {
974 if let Some(provider) = &remote.credential_provider {
976 if remote.remote_type == crate::config::RemoteType::S3 {
977 let secret_sql = format!(
978 "CREATE SECRET IF NOT EXISTS \"bird_{}\" (TYPE s3, PROVIDER {})",
979 remote.name, provider
980 );
981 conn.execute(&secret_sql, [])?;
982 }
983 }
984
985 if let Some(remote_data_dir) = remote.data_dir() {
988 let current_path: String = conn
990 .query_row("SELECT current_setting('file_search_path')", [], |r| r.get(0))
991 .unwrap_or_default();
992
993 let remote_path = remote_data_dir.display().to_string();
994 let new_path = if current_path.is_empty() {
995 remote_path
996 } else if current_path.contains(&remote_path) {
997 current_path
999 } else {
1000 format!("{},{}", current_path, remote_path)
1001 };
1002
1003 conn.execute(&format!("SET file_search_path = '{}'", new_path), [])?;
1004 }
1005
1006 conn.execute(&remote.attach_sql(), [])?;
1008 Ok(())
1009 }
1010
    /// Detaches a remote by name. The alias `remote_<name>` presumably
    /// matches what `RemoteConfig::attach_sql` creates — verify against
    /// that implementation if aliases change.
    pub fn detach_remote(&self, conn: &Connection, name: &str) -> Result<()> {
        conn.execute(&format!("DETACH \"remote_{}\"", name), [])?;
        Ok(())
    }
1016
1017 pub fn test_remote(&self, remote: &crate::RemoteConfig) -> Result<()> {
1019 let conn = self.connection_with_options(false)?;
1020 self.attach_remote(&conn, remote)?;
1021
1022 let test_sql = format!(
1024 "SELECT 1 FROM {}.invocations LIMIT 1",
1025 remote.quoted_schema_name()
1026 );
1027 conn.execute(&test_sql, [])?;
1028
1029 Ok(())
1030 }
1031
    /// Returns the store's configuration.
    pub fn config(&self) -> &Config {
        &self.config
    }
1036
    /// Runs an arbitrary SQL query on a fully-configured connection and
    /// returns every row rendered as strings (see `format_value` for the
    /// rendering rules on nested types).
    pub fn query(&self, sql: &str) -> Result<QueryResult> {
        let conn = self.connection()?;
        let mut stmt = conn.prepare(sql)?;

        let mut rows_iter = stmt.query([])?;

        // Column metadata comes from the underlying statement; it is only
        // available while `rows_iter` is alive.
        let column_count = rows_iter.as_ref().map(|r| r.column_count()).unwrap_or(0);
        let column_names: Vec<String> = if let Some(row_ref) = rows_iter.as_ref() {
            (0..column_count)
                .map(|i| {
                    row_ref
                        .column_name(i)
                        .map(|s| s.to_string())
                        // Fall back to a positional name if lookup fails.
                        .unwrap_or_else(|_| format!("col{}", i))
                })
                .collect()
        } else {
            Vec::new()
        };

        let mut result_rows = Vec::new();
        while let Some(row) = rows_iter.next()? {
            let mut values = Vec::with_capacity(column_count);
            for i in 0..column_count {
                // Render borrowed values directly where possible; only the
                // rarer nested types fall through to an owned conversion.
                let value = match row.get_ref(i)? {
                    ValueRef::Null => "NULL".to_string(),
                    ValueRef::Boolean(b) => b.to_string(),
                    ValueRef::TinyInt(n) => n.to_string(),
                    ValueRef::SmallInt(n) => n.to_string(),
                    ValueRef::Int(n) => n.to_string(),
                    ValueRef::BigInt(n) => n.to_string(),
                    ValueRef::HugeInt(n) => n.to_string(),
                    ValueRef::UTinyInt(n) => n.to_string(),
                    ValueRef::USmallInt(n) => n.to_string(),
                    ValueRef::UInt(n) => n.to_string(),
                    ValueRef::UBigInt(n) => n.to_string(),
                    ValueRef::Float(f) => f.to_string(),
                    ValueRef::Double(f) => f.to_string(),
                    ValueRef::Decimal(d) => d.to_string(),
                    ValueRef::Timestamp(unit, val) => {
                        // Normalize the raw value to microseconds per unit.
                        let micros = match unit {
                            TimeUnit::Second => val * 1_000_000,
                            TimeUnit::Millisecond => val * 1_000,
                            TimeUnit::Microsecond => val,
                            TimeUnit::Nanosecond => val / 1_000,
                        };
                        DateTime::<Utc>::from_timestamp_micros(micros)
                            .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
                            .unwrap_or_else(|| format!("<invalid timestamp {}>", val))
                    }
                    ValueRef::Date32(days) => {
                        // Day offset from the Unix epoch.
                        NaiveDate::from_ymd_opt(1970, 1, 1)
                            .and_then(|epoch| epoch.checked_add_signed(TimeDelta::days(days as i64)))
                            .map(|d| d.format("%Y-%m-%d").to_string())
                            .unwrap_or_else(|| format!("<invalid date {}>", days))
                    }
                    ValueRef::Time64(unit, val) => {
                        // Normalize to microseconds, then split into
                        // whole seconds and the sub-second remainder.
                        let micros = match unit {
                            TimeUnit::Second => val * 1_000_000,
                            TimeUnit::Millisecond => val * 1_000,
                            TimeUnit::Microsecond => val,
                            TimeUnit::Nanosecond => val / 1_000,
                        };
                        let secs = (micros / 1_000_000) as u32;
                        let micro_part = (micros % 1_000_000) as u32;
                        NaiveTime::from_num_seconds_from_midnight_opt(secs, micro_part * 1000)
                            .map(|t| t.format("%H:%M:%S").to_string())
                            .unwrap_or_else(|| format!("<invalid time {}>", val))
                    }
                    ValueRef::Interval { months, days, nanos } => {
                        format!("{} months {} days {} ns", months, days, nanos)
                    }
                    ValueRef::Text(s) => String::from_utf8_lossy(s).to_string(),
                    ValueRef::Blob(b) => format!("<blob {} bytes>", b.len()),
                    other => {
                        // Nested types (lists, maps, structs, ...): convert
                        // to an owned Value and format recursively.
                        let owned: Value = other.into();
                        format_value(&owned)
                    }
                };
                values.push(value);
            }
            result_rows.push(values);
        }

        Ok(QueryResult {
            columns: column_names,
            rows: result_rows,
        })
    }
1137
1138 pub fn last_invocation_with_output(
1140 &self,
1141 ) -> Result<Option<(InvocationSummary, Option<OutputInfo>)>> {
1142 if let Some(inv) = self.last_invocation()? {
1143 let output = self.get_output(&inv.id)?;
1144 Ok(Some((inv, output)))
1145 } else {
1146 Ok(None)
1147 }
1148 }
1149
1150 pub fn write_batch(&self, batch: &InvocationBatch) -> Result<()> {
1156 let invocation = batch
1157 .invocation
1158 .as_ref()
1159 .ok_or_else(|| Error::Storage("Batch must contain an invocation".to_string()))?;
1160
1161 match self.config.storage_mode {
1162 StorageMode::Parquet => self.write_batch_parquet(batch, invocation),
1163 StorageMode::DuckDB => self.write_batch_duckdb(batch, invocation),
1164 }
1165 }
1166
1167 fn write_batch_parquet(
1169 &self,
1170 batch: &InvocationBatch,
1171 invocation: &InvocationRecord,
1172 ) -> Result<()> {
1173 if let Some(ref session) = batch.session {
1179 self.ensure_session(session)?;
1180 }
1181
1182 self.write_invocation(invocation)?;
1184
1185 let date = invocation.date();
1186 let inv_id = invocation.id;
1187
1188 for (stream, content) in &batch.outputs {
1190 self.store_output(
1191 inv_id,
1192 stream,
1193 content,
1194 date,
1195 invocation.executable.as_deref(),
1196 )?;
1197 }
1198
1199 if let Some(ref events) = batch.events {
1201 if !events.is_empty() {
1202 self.write_events(events)?;
1203 }
1204 }
1205
1206 Ok(())
1207 }
1208
1209 fn write_batch_duckdb(
1211 &self,
1212 batch: &InvocationBatch,
1213 invocation: &InvocationRecord,
1214 ) -> Result<()> {
1215 let conn = self.connection()?;
1216
1217 conn.execute("BEGIN TRANSACTION", [])?;
1219
1220 let result = self.write_batch_duckdb_inner(&conn, batch, invocation);
1221
1222 match result {
1223 Ok(()) => {
1224 conn.execute("COMMIT", [])?;
1225 Ok(())
1226 }
1227 Err(e) => {
1228 let _ = conn.execute("ROLLBACK", []);
1230 Err(e)
1231 }
1232 }
1233 }
1234
    /// Writes one batch into the DuckDB tables. Runs inside the transaction
    /// opened by `write_batch_duckdb`; any error returned here causes the
    /// caller to roll the whole batch back.
    fn write_batch_duckdb_inner(
        &self,
        conn: &Connection,
        batch: &InvocationBatch,
        invocation: &InvocationRecord,
    ) -> Result<()> {
        use base64::Engine;

        let date = invocation.date();
        let inv_id = invocation.id;

        // Sessions are shared across invocations: insert the row only when
        // it is not already registered.
        if let Some(ref session) = batch.session {
            let exists: i64 = conn
                .query_row(
                    "SELECT COUNT(*) FROM local.sessions WHERE session_id = ?",
                    params![&session.session_id],
                    |row| row.get(0),
                )
                // A failed lookup is treated as "not present"; the INSERT
                // below will then surface any real problem.
                .unwrap_or(0);

            if exists == 0 {
                conn.execute(
                    r#"INSERT INTO local.sessions VALUES (?, ?, ?, ?, ?, ?, ?, ?)"#,
                    params![
                        session.session_id,
                        session.client_id,
                        session.invoker,
                        session.invoker_pid,
                        session.invoker_type,
                        session.registered_at.to_rfc3339(),
                        session.cwd,
                        session.date.to_string(),
                    ],
                )?;
            }
        }

        conn.execute(
            r#"INSERT INTO local.invocations VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
            params![
                invocation.id.to_string(),
                invocation.session_id,
                invocation.timestamp.to_rfc3339(),
                invocation.duration_ms,
                invocation.cwd,
                invocation.cmd,
                invocation.executable,
                invocation.runner_id,
                invocation.exit_code,
                invocation.status,
                invocation.format_hint,
                invocation.client_id,
                invocation.hostname,
                invocation.username,
                invocation.tag,
                date.to_string(),
            ],
        )?;

        // Outputs are content-addressed by BLAKE3 hash: payloads below the
        // inline threshold are embedded in the row as base64 data: URLs,
        // larger ones are written to deduplicated blob files on disk.
        for (stream, content) in &batch.outputs {
            let hash = blake3::hash(content);
            let hash_hex = hash.to_hex().to_string();

            let (storage_type, storage_ref) = if content.len() < self.config.inline_threshold {
                let b64 = base64::engine::general_purpose::STANDARD.encode(content);
                let data_url = format!("data:application/octet-stream;base64,{}", b64);
                ("inline".to_string(), data_url)
            } else {
                // Blob filenames embed the executable name as a human hint.
                let cmd_hint = invocation.executable.as_deref().unwrap_or("output");
                let blob_path = self.config.blob_path(&hash_hex, cmd_hint);

                if let Some(parent) = blob_path.parent() {
                    fs::create_dir_all(parent)?;
                }

                // Prefer a path relative to the data dir so the registry
                // stays valid if the data directory is relocated.
                let rel_path = blob_path
                    .strip_prefix(self.config.data_dir())
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_else(|_| blob_path.to_string_lossy().to_string());

                let wrote_new = atomic::write_file(&blob_path, content)?;

                if wrote_new {
                    // First time we have seen this content hash: register it.
                    conn.execute(
                        "INSERT INTO blob_registry (content_hash, byte_length, storage_path) VALUES (?, ?, ?)",
                        params![&hash_hex, content.len() as i64, &rel_path],
                    )?;
                } else {
                    // Blob already on disk: bump its reference count instead
                    // of rewriting the file.
                    conn.execute(
                        "UPDATE blob_registry SET ref_count = ref_count + 1, last_accessed = CURRENT_TIMESTAMP WHERE content_hash = ?",
                        params![&hash_hex],
                    )?;
                }

                ("blob".to_string(), format!("file://{}", rel_path))
            };

            let output_id = uuid::Uuid::now_v7();
            conn.execute(
                r#"INSERT INTO local.outputs VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
                params![
                    output_id.to_string(),
                    inv_id.to_string(),
                    stream,
                    hash_hex,
                    content.len() as i64,
                    storage_type,
                    storage_ref,
                    // NULL placeholder column, then the partition date.
                    Option::<String>::None, date.to_string(),
                ],
            )?;
        }

        if let Some(ref events) = batch.events {
            for event in events {
                conn.execute(
                    r#"INSERT INTO local.events VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
                    params![
                        event.id.to_string(),
                        event.invocation_id.to_string(),
                        event.client_id,
                        event.hostname,
                        event.event_type,
                        event.severity,
                        event.ref_file,
                        event.ref_line,
                        event.ref_column,
                        event.message,
                        event.error_code,
                        event.test_name,
                        event.status,
                        event.format_used,
                        event.date.to_string(),
                    ],
                )?;
            }
        }

        Ok(())
    }
1388
1389 pub fn load_format_hints(&self) -> Result<crate::FormatHints> {
1391 let path = self.config.format_hints_path();
1392
1393 if path.exists() {
1395 return crate::FormatHints::load(&path);
1396 }
1397
1398 let legacy_path = self.config.event_formats_path();
1400 if legacy_path.exists() {
1401 return crate::FormatHints::load(&legacy_path);
1402 }
1403
1404 Ok(crate::FormatHints::new())
1405 }
1406
1407 pub fn save_format_hints(&self, hints: &crate::FormatHints) -> Result<()> {
1409 hints.save(&self.config.format_hints_path())
1410 }
1411
1412 pub fn detect_format_for_command(&self, cmd: &str) -> Result<String> {
1421 let hints = self.load_format_hints()?;
1422 Ok(hints.detect(cmd).to_string())
1423 }
1424
1425 pub fn list_builtin_formats(&self) -> Result<Vec<BuiltinFormat>> {
1430 let conn = self.connection()?;
1431
1432 let mut stmt = conn.prepare(
1433 "SELECT format, description, priority FROM duck_hunt_formats() ORDER BY priority DESC, format"
1434 )?;
1435
1436 let rows = stmt.query_map([], |row| {
1437 Ok(BuiltinFormat {
1438 format: row.get(0)?,
1439 pattern: row.get::<_, String>(1)?, priority: row.get(2)?,
1441 })
1442 })?;
1443
1444 let results: Vec<_> = rows.filter_map(|r| r.ok()).collect();
1445 Ok(results)
1446 }
1447
1448 pub fn check_format(&self, cmd: &str) -> Result<FormatMatch> {
1454 let hints = self.load_format_hints()?;
1455
1456 for hint in hints.hints() {
1458 if crate::format_hints::pattern_matches(&hint.pattern, cmd) {
1459 return Ok(FormatMatch {
1460 format: hint.format.clone(),
1461 source: FormatSource::UserDefined {
1462 pattern: hint.pattern.clone(),
1463 priority: hint.priority,
1464 },
1465 });
1466 }
1467 }
1468
1469 Ok(FormatMatch {
1471 format: hints.default_format().to_string(),
1472 source: FormatSource::Default,
1473 })
1474 }
1475}
1476
/// A format shipped with the duck_hunt extension, as reported by
/// `duck_hunt_formats()`.
#[derive(Debug, Clone)]
pub struct BuiltinFormat {
    // Format identifier (column `format` of duck_hunt_formats()).
    pub format: String,
    // Populated from the query's `description` column in
    // `list_builtin_formats` — TODO confirm field name is intentional.
    pub pattern: String,
    // Higher priority formats are listed first.
    pub priority: i32,
}
1484
/// Result of `check_format`: the resolved format plus where it came from.
#[derive(Debug, Clone)]
pub struct FormatMatch {
    // The resolved format identifier.
    pub format: String,
    // Which rule (user hint, builtin, or default) produced the match.
    pub source: FormatSource,
}
1491
/// Provenance of a format decision made by `check_format`.
#[derive(Debug, Clone)]
pub enum FormatSource {
    // Matched a user-configured hint (pattern and its priority).
    UserDefined { pattern: String, priority: i32 },
    // Matched a builtin format definition.
    Builtin { pattern: String, priority: i32 },
    // No hint matched; the configured default format was used.
    Default,
}
1499
/// A materialized query result: column names plus rows pre-rendered as
/// display strings (see `format_value`).
#[derive(Debug)]
pub struct QueryResult {
    pub columns: Vec<String>,
    pub rows: Vec<Vec<String>>,
}
1506
/// Map an arbitrary string to a filesystem-safe name: path separators and
/// shell/glob-special characters become `_`, spaces become `-`, other
/// non-alphanumeric characters (except `-`, `_`, `.`) become `_`, and the
/// result is truncated to 64 characters.
fn sanitize_filename(s: &str) -> String {
    fn map_char(c: char) -> char {
        if matches!(c, '/' | '\\' | ':' | '*' | '?' | '"' | '<' | '>' | '|') {
            '_'
        } else if c == ' ' {
            '-'
        } else if c.is_alphanumeric() || matches!(c, '-' | '_' | '.') {
            c
        } else {
            '_'
        }
    }

    s.chars().map(map_char).take(64).collect()
}
1519
// Unit tests for batch writes (both storage modes), filename sanitizing,
// and DuckDB extension loading.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::init::initialize;
    use crate::schema::SessionRecord;
    use tempfile::TempDir;

    /// Builds a Parquet-mode store rooted in a fresh temp directory.
    fn setup_store() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let config = Config::with_root(tmp.path());
        initialize(&config).unwrap();
        let store = Store::open(config).unwrap();
        (tmp, store)
    }

    /// Builds a DuckDB-mode store rooted in a fresh temp directory.
    fn setup_store_duckdb() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let config = Config::with_duckdb_mode(tmp.path());
        initialize(&config).unwrap();
        let store = Store::open(config).unwrap();
        (tmp, store)
    }

    // Opening a store whose root was never initialized must fail with
    // NotInitialized rather than silently creating state.
    #[test]
    fn test_store_open_uninitialized_fails() {
        let tmp = TempDir::new().unwrap();
        let config = Config::with_root(tmp.path());

        let result = Store::open(config);
        assert!(matches!(result, Err(Error::NotInitialized(_))));
    }

    // Spot-checks the character mapping rules of sanitize_filename.
    #[test]
    fn test_sanitize_filename() {
        assert_eq!(sanitize_filename("make test"), "make-test");
        assert_eq!(sanitize_filename("/usr/bin/gcc"), "_usr_bin_gcc");
        assert_eq!(sanitize_filename("a:b*c?d"), "a_b_c_d");
    }

    // --- Parquet-mode batch writes ---

    #[test]
    fn test_batch_write_parquet_invocation_only() {
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");

        let batch = InvocationBatch::new(inv);
        store.write_batch(&batch).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
    }

    #[test]
    fn test_batch_write_parquet_with_output() {
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        // Capture the id before the batch takes ownership of the record.
        let inv_id = inv.id;

        let batch = InvocationBatch::new(inv)
            .with_output("stdout", b"hello world\n".to_vec());

        store.write_batch(&batch).unwrap();

        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].stream, "stdout");
    }

    #[test]
    fn test_batch_write_parquet_with_session() {
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(inv).with_session(session);
        store.write_batch(&batch).unwrap();

        assert!(store.session_exists("test-session").unwrap());
    }

    // Full batch: session + invocation + two output streams.
    #[test]
    fn test_batch_write_parquet_full() {
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new("test-session", "make test", "/home/user", 1, "test@client");
        let inv_id = inv.id;
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(inv)
            .with_session(session)
            .with_output("stdout", b"Building...\n".to_vec())
            .with_output("stderr", b"error: failed\n".to_vec());

        store.write_batch(&batch).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
        assert!(store.session_exists("test-session").unwrap());

        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 2);
    }

    // --- DuckDB-mode batch writes (mirror the Parquet cases) ---

    #[test]
    fn test_batch_write_duckdb_invocation_only() {
        let (_tmp, store) = setup_store_duckdb();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");

        let batch = InvocationBatch::new(inv);
        store.write_batch(&batch).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
    }

    #[test]
    fn test_batch_write_duckdb_with_output() {
        let (_tmp, store) = setup_store_duckdb();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let inv_id = inv.id;

        let batch = InvocationBatch::new(inv)
            .with_output("stdout", b"hello world\n".to_vec());

        store.write_batch(&batch).unwrap();

        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].stream, "stdout");
    }

    #[test]
    fn test_batch_write_duckdb_with_session() {
        let (_tmp, store) = setup_store_duckdb();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(inv).with_session(session);
        store.write_batch(&batch).unwrap();

        assert!(store.session_exists("test-session").unwrap());
    }

    #[test]
    fn test_batch_write_duckdb_full() {
        let (_tmp, store) = setup_store_duckdb();

        let inv = InvocationRecord::new("test-session", "make test", "/home/user", 1, "test@client");
        let inv_id = inv.id;
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(inv)
            .with_session(session)
            .with_output("stdout", b"Building...\n".to_vec())
            .with_output("stderr", b"error: failed\n".to_vec());

        store.write_batch(&batch).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
        assert!(store.session_exists("test-session").unwrap());

        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 2);
    }

    // write_batch must reject a batch without an invocation record.
    #[test]
    fn test_batch_requires_invocation() {
        let (_tmp, store) = setup_store();

        let batch = InvocationBatch::default();
        let result = store.write_batch(&batch);

        assert!(result.is_err());
    }

    // --- DuckDB extension loading ---

    #[test]
    fn test_ensure_extension_parquet() {
        let conn = duckdb::Connection::open_in_memory().unwrap();
        let result = ensure_extension(&conn, "parquet").unwrap();
        assert!(result, "parquet extension should be loadable");
    }

    #[test]
    fn test_ensure_extension_icu() {
        let conn = duckdb::Connection::open_in_memory().unwrap();
        let result = ensure_extension(&conn, "icu").unwrap();
        assert!(result, "icu extension should be loadable");
    }

    // Community extensions require the opt-in setting. NOTE(review): this
    // test downloads extensions, so it needs network access.
    #[test]
    fn test_ensure_extension_community() {
        let conn = duckdb::Connection::open_in_memory().unwrap();
        conn.execute("SET allow_community_extensions = true", []).unwrap();

        let result = ensure_extension(&conn, "scalarfs").unwrap();
        assert!(result, "scalarfs extension should be loadable from community");

        let result = ensure_extension(&conn, "duck_hunt").unwrap();
        assert!(result, "duck_hunt extension should be loadable from community");
    }

    // An unknown extension must report false rather than error out.
    #[test]
    fn test_ensure_extension_nonexistent() {
        let conn = duckdb::Connection::open_in_memory().unwrap();
        conn.execute("SET allow_community_extensions = true", []).unwrap();

        let result = ensure_extension(&conn, "nonexistent_fake_extension_xyz").unwrap();
        assert!(!result, "nonexistent extension should return false");
    }

    // Second load of the same extension should hit the cache and be fast.
    // NOTE(review): wall-clock bound may be flaky on a heavily loaded CI box.
    #[test]
    fn test_extension_loading_is_cached() {
        let conn = duckdb::Connection::open_in_memory().unwrap();

        ensure_extension(&conn, "parquet").unwrap();

        let start = std::time::Instant::now();
        ensure_extension(&conn, "parquet").unwrap();
        let elapsed = start.elapsed();

        assert!(elapsed.as_millis() < 100, "cached extension load took {:?}", elapsed);
    }
}