1mod atomic;
6mod compact;
7mod events;
8mod invocations;
9mod outputs;
10mod remote;
11mod sessions;
12
13use std::fs;
14use std::thread;
15use std::time::Duration;
16
17use chrono::{DateTime, NaiveDate, NaiveTime, TimeDelta, Utc};
18use duckdb::{
19 params,
20 types::{TimeUnit, Value, ValueRef},
21 Connection,
22};
23
24use crate::config::StorageMode;
25use crate::schema::{EventRecord, InvocationRecord, SessionRecord};
26use crate::{Config, Error, Result};
27
28fn format_value(value: &Value) -> String {
31 match value {
32 Value::Null => "NULL".to_string(),
33 Value::Boolean(b) => b.to_string(),
34 Value::TinyInt(n) => n.to_string(),
35 Value::SmallInt(n) => n.to_string(),
36 Value::Int(n) => n.to_string(),
37 Value::BigInt(n) => n.to_string(),
38 Value::HugeInt(n) => n.to_string(),
39 Value::UTinyInt(n) => n.to_string(),
40 Value::USmallInt(n) => n.to_string(),
41 Value::UInt(n) => n.to_string(),
42 Value::UBigInt(n) => n.to_string(),
43 Value::Float(f) => f.to_string(),
44 Value::Double(f) => f.to_string(),
45 Value::Decimal(d) => d.to_string(),
46 Value::Timestamp(_, micros) => {
47 DateTime::<Utc>::from_timestamp_micros(*micros)
48 .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
49 .unwrap_or_else(|| format!("<timestamp {}>", micros))
50 }
51 Value::Text(s) => s.clone(),
52 Value::Blob(b) => format!("<blob {} bytes>", b.len()),
53 Value::Date32(days) => {
54 NaiveDate::from_ymd_opt(1970, 1, 1)
55 .and_then(|epoch| epoch.checked_add_signed(TimeDelta::days(*days as i64)))
56 .map(|d| d.format("%Y-%m-%d").to_string())
57 .unwrap_or_else(|| format!("<date {}>", days))
58 }
59 Value::Time64(_, micros) => {
60 let secs = (*micros / 1_000_000) as u32;
61 let micro_part = (*micros % 1_000_000) as u32;
62 NaiveTime::from_num_seconds_from_midnight_opt(secs, micro_part * 1000)
63 .map(|t| t.format("%H:%M:%S").to_string())
64 .unwrap_or_else(|| format!("<time {}>", micros))
65 }
66 Value::Interval { months, days, nanos } => {
67 format!("{} months {} days {} ns", months, days, nanos)
68 }
69 Value::List(items) => {
71 let formatted: Vec<String> = items.iter().map(format_value).collect();
72 format!("[{}]", formatted.join(", "))
73 }
74 Value::Array(items) => {
75 let formatted: Vec<String> = items.iter().map(format_value).collect();
76 format!("[{}]", formatted.join(", "))
77 }
78 Value::Map(map) => {
79 let formatted: Vec<String> = map
80 .iter()
81 .map(|(k, v)| format!("{}: {}", format_value(k), format_value(v)))
82 .collect();
83 format!("{{{}}}", formatted.join(", "))
84 }
85 Value::Struct(fields) => {
86 let formatted: Vec<String> = fields
87 .iter()
88 .map(|(k, v)| format!("{}: {}", k, format_value(v)))
89 .collect();
90 format!("{{{}}}", formatted.join(", "))
91 }
92 Value::Enum(s) => s.clone(),
93 _ => "<unknown>".to_string(),
94 }
95}
96
97pub use compact::{ArchiveStats, AutoCompactOptions, CompactOptions, CompactStats};
99pub use events::{EventFilters, EventSummary, FormatConfig, FormatRule};
100pub use invocations::InvocationSummary;
101pub use outputs::OutputInfo;
102pub use remote::{parse_since, PullOptions, PullStats, PushOptions, PushStats};
103
/// Everything captured for one command run, persisted as a unit by
/// [`Store::write_batch`].
#[derive(Debug, Default)]
pub struct InvocationBatch {
    // The invocation itself; required — write_batch rejects a batch without one.
    pub invocation: Option<InvocationRecord>,

    // Captured output streams as (stream_name, raw_bytes) pairs.
    pub outputs: Vec<(String, Vec<u8>)>,

    // Session to register first, if it is not already recorded.
    pub session: Option<SessionRecord>,

    // Events extracted from the output; only written when non-empty.
    pub events: Option<Vec<EventRecord>>,
}
126
127impl InvocationBatch {
128 pub fn new(invocation: InvocationRecord) -> Self {
130 Self {
131 invocation: Some(invocation),
132 outputs: Vec::new(),
133 session: None,
134 events: None,
135 }
136 }
137
138 pub fn with_output(mut self, stream: impl Into<String>, content: Vec<u8>) -> Self {
140 self.outputs.push((stream.into(), content));
141 self
142 }
143
144 pub fn with_session(mut self, session: SessionRecord) -> Self {
146 self.session = Some(session);
147 self
148 }
149
150 pub fn with_events(mut self, events: Vec<EventRecord>) -> Self {
152 self.events = Some(events);
153 self
154 }
155}
156
/// Feature toggles consumed by [`Store::connect`]; see the associated
/// constructors (`full`, `minimal`, `for_migration`) for common presets.
#[derive(Debug, Clone, Default)]
pub struct ConnectionOptions {
    // ATTACH configured remote databases and create the remotes_* macros.
    pub attach_remotes: bool,

    // ATTACH the current project's database (read-only) as `project`.
    pub attach_project: bool,

    // Create the TEMPORARY cwd_* macros scoped to the current directory.
    pub create_ephemeral_views: bool,

    // Run the legacy-to-schema migration before any other setup.
    pub run_migration: bool,
}
188
189impl ConnectionOptions {
190 pub fn full() -> Self {
192 Self {
193 attach_remotes: true,
194 attach_project: true,
195 create_ephemeral_views: true,
196 run_migration: false,
197 }
198 }
199
200 pub fn minimal() -> Self {
203 Self {
204 attach_remotes: false,
205 attach_project: false,
206 create_ephemeral_views: false,
207 run_migration: false,
208 }
209 }
210
211 pub fn for_migration() -> Self {
213 Self {
214 attach_remotes: false,
215 attach_project: false,
216 create_ephemeral_views: false,
217 run_migration: true,
218 }
219 }
220}
221
222fn ensure_extension(conn: &Connection, name: &str) -> Result<bool> {
231 if conn.execute(&format!("LOAD {}", name), []).is_ok() {
233 return Ok(true);
234 }
235
236 if conn.execute(&format!("INSTALL {}", name), []).is_ok() {
238 if conn.execute(&format!("LOAD {}", name), []).is_ok() {
239 return Ok(true);
240 }
241 }
242
243 if conn.execute(&format!("INSTALL {} FROM community", name), []).is_ok() {
245 if conn.execute(&format!("LOAD {}", name), []).is_ok() {
246 return Ok(true);
247 }
248 }
249
250 Ok(false)
251}
252
/// Handle to the on-disk database; all reads and writes go through
/// connections produced by [`Store::connect`] and its wrappers.
pub struct Store {
    // Owned configuration; exposed read-only via Store::config().
    config: Config,
}
257
258impl Store {
259 pub fn open(config: Config) -> Result<Self> {
261 if !config.db_path().exists() {
262 return Err(Error::NotInitialized(config.bird_root.clone()));
263 }
264 Ok(Self { config })
265 }
266
267 fn open_connection_with_retry(&self) -> Result<Connection> {
273 const MAX_RETRIES: u32 = 10;
274 const INITIAL_DELAY_MS: u64 = 10;
275 const MAX_DELAY_MS: u64 = 1000;
276
277 let db_path = self.config.db_path();
278 let mut delay_ms = INITIAL_DELAY_MS;
279 let mut last_error = None;
280
281 for attempt in 0..MAX_RETRIES {
282 match Connection::open(&db_path) {
283 Ok(conn) => return Ok(conn),
284 Err(e) => {
285 let err_msg = e.to_string();
286 if err_msg.contains("Could not set lock")
288 || err_msg.contains("Conflicting lock")
289 || err_msg.contains("database is locked")
290 {
291 last_error = Some(e);
292 if attempt < MAX_RETRIES - 1 {
293 let jitter = (attempt as u64 * 7) % 10;
295 thread::sleep(Duration::from_millis(delay_ms + jitter));
296 delay_ms = (delay_ms * 2).min(MAX_DELAY_MS);
297 continue;
298 }
299 } else {
300 return Err(e.into());
302 }
303 }
304 }
305 }
306
307 Err(last_error
309 .map(|e| e.into())
310 .unwrap_or_else(|| Error::Storage("Failed to open database after retries".to_string())))
311 }
312
313 pub fn connection(&self) -> Result<Connection> {
315 self.connect(ConnectionOptions::full())
316 }
317
318 pub fn connection_with_options(&self, attach_remotes: bool) -> Result<Connection> {
320 let opts = if attach_remotes {
321 ConnectionOptions::full()
322 } else {
323 ConnectionOptions::minimal()
324 };
325 self.connect(opts)
326 }
327
328 pub fn connect(&self, opts: ConnectionOptions) -> Result<Connection> {
338 let conn = self.open_connection_with_retry()?;
339
340 conn.execute("SET allow_community_extensions = true", [])?;
344
345 for ext in ["parquet", "icu"] {
346 if !ensure_extension(&conn, ext)? {
347 return Err(Error::Extension(format!(
348 "Required extension '{}' could not be loaded",
349 ext
350 )));
351 }
352 }
353
354 for (ext, desc) in [
356 ("scalarfs", "data: URL support for inline blobs"),
357 ("duck_hunt", "log/output parsing for event extraction"),
358 ] {
359 if !ensure_extension(&conn, ext)? {
360 eprintln!("Warning: {} extension not available ({})", ext, desc);
361 }
362 }
363
364 conn.execute(
366 &format!(
367 "SET file_search_path = '{}'",
368 self.config.data_dir().display()
369 ),
370 [],
371 )?;
372
373 if opts.run_migration {
375 self.migrate_to_new_schema(&conn)?;
376 }
377
378 self.setup_s3_credentials(&conn)?;
381 self.setup_blob_resolution(&conn)?;
382
383 if opts.attach_remotes && !self.config.remotes.is_empty() {
385 self.attach_remotes(&conn)?;
386 self.create_remote_macros(&conn)?;
387 }
388
389 if opts.attach_project {
391 self.attach_project_db(&conn)?;
392 }
393
394 if opts.create_ephemeral_views {
397 self.create_cwd_macros(&conn)?;
398 }
399
400 Ok(conn)
401 }
402
403 fn attach_project_db(&self, conn: &Connection) -> Result<()> {
408 use crate::project::find_current_project;
409
410 let Some(project) = find_current_project() else {
411 return Ok(()); };
413
414 if !project.is_initialized() {
415 return Ok(()); }
417
418 if project.db_path == self.config.db_path() {
420 return Ok(());
421 }
422
423 let attach_sql = format!(
425 "ATTACH '{}' AS project (READ_ONLY)",
426 project.db_path.display()
427 );
428
429 if let Err(e) = conn.execute(&attach_sql, []) {
430 eprintln!("Note: Could not attach project database: {}", e);
432 }
433
434 Ok(())
435 }
436
    /// One-time migration from the flat legacy layout to the schema-per-scope
    /// layout (`local`, `caches`, `remotes`, `unified`, `cwd`, plus the
    /// placeholder schemas that keep the union views well-formed).
    ///
    /// Idempotent: the presence of the `local` schema marks an already
    /// migrated database and causes an early return.
    fn migrate_to_new_schema(&self, conn: &Connection) -> Result<()> {
        // Probe for the `local` schema; on any query error, assume not migrated.
        let local_exists: bool = conn
            .query_row(
                "SELECT COUNT(*) > 0 FROM information_schema.schemata WHERE schema_name = 'local'",
                [],
                |row| row.get(0),
            )
            .unwrap_or(false);

        if local_exists {
            return Ok(());
        }

        eprintln!("Note: Migrating to new schema architecture...");

        // Create every schema up front. The *_placeholder schemas hold empty,
        // correctly-typed tables so the UNION views below are always valid
        // even when no caches or remotes exist.
        conn.execute_batch(
            r#"
            CREATE SCHEMA IF NOT EXISTS local;
            CREATE SCHEMA IF NOT EXISTS cached_placeholder;
            CREATE SCHEMA IF NOT EXISTS remote_placeholder;
            CREATE SCHEMA IF NOT EXISTS caches;
            CREATE SCHEMA IF NOT EXISTS remotes;
            CREATE SCHEMA IF NOT EXISTS unified;
            CREATE SCHEMA IF NOT EXISTS cwd;
            "#,
        )?;

        if self.config.storage_mode == crate::StorageMode::DuckDB {
            // DuckDB mode: materialize real tables under `local`.
            conn.execute_batch(
                r#"
                CREATE TABLE IF NOT EXISTS local.sessions (
                    session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
                    invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE
                );
                CREATE TABLE IF NOT EXISTS local.invocations (
                    id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
                    cwd VARCHAR, cmd VARCHAR, executable VARCHAR, exit_code INTEGER,
                    format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR,
                    tag VARCHAR, date DATE
                );
                CREATE TABLE IF NOT EXISTS local.outputs (
                    id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
                    byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
                    content_type VARCHAR, date DATE
                );
                CREATE TABLE IF NOT EXISTS local.events (
                    id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
                    event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
                    ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
                    status VARCHAR, format_used VARCHAR, date DATE
                );
                "#,
            )?;

            // Legacy flat tables (e.g. `sessions_table`) are copied into the
            // new schema when present; column order is assumed compatible.
            let old_tables_exist: bool = conn
                .query_row(
                    "SELECT COUNT(*) > 0 FROM duckdb_tables() WHERE table_name = 'sessions_table'",
                    [],
                    |row| row.get(0),
                )
                .unwrap_or(false);

            if old_tables_exist {
                conn.execute_batch(
                    r#"
                    INSERT INTO local.sessions SELECT * FROM sessions_table;
                    INSERT INTO local.invocations SELECT * FROM invocations_table;
                    INSERT INTO local.outputs SELECT * FROM outputs_table;
                    INSERT INTO local.events SELECT * FROM events_table;
                    "#,
                )?;
            }
        } else {
            // Parquet mode: `local` tables are views over the partitioned
            // parquet files under `recent/` (relative paths are resolved via
            // the file_search_path configured in `connect`).
            conn.execute_batch(
                r#"
                CREATE OR REPLACE VIEW local.sessions AS
                    SELECT * EXCLUDE (filename) FROM read_parquet(
                        'recent/sessions/**/*.parquet',
                        union_by_name = true, hive_partitioning = true, filename = true
                    );
                CREATE OR REPLACE VIEW local.invocations AS
                    SELECT * EXCLUDE (filename) FROM read_parquet(
                        'recent/invocations/**/*.parquet',
                        union_by_name = true, hive_partitioning = true, filename = true
                    );
                CREATE OR REPLACE VIEW local.outputs AS
                    SELECT * EXCLUDE (filename) FROM read_parquet(
                        'recent/outputs/**/*.parquet',
                        union_by_name = true, hive_partitioning = true, filename = true
                    );
                CREATE OR REPLACE VIEW local.events AS
                    SELECT * EXCLUDE (filename) FROM read_parquet(
                        'recent/events/**/*.parquet',
                        union_by_name = true, hive_partitioning = true, filename = true
                    );
                "#,
            )?;
        }

        // Empty placeholder tables carry a `_source` column so that the
        // caches/remotes views (and everything unioned over them) have a
        // stable shape before any real cache or remote data exists.
        conn.execute_batch(
            r#"
            CREATE TABLE IF NOT EXISTS cached_placeholder.sessions (
                session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
                invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS cached_placeholder.invocations (
                id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
                cwd VARCHAR, cmd VARCHAR, executable VARCHAR, exit_code INTEGER,
                format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS cached_placeholder.outputs (
                id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
                byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
                content_type VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS cached_placeholder.events (
                id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
                event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
                ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
                status VARCHAR, format_used VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.sessions (
                session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
                invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.invocations (
                id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
                cwd VARCHAR, cmd VARCHAR, executable VARCHAR, exit_code INTEGER,
                format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.outputs (
                id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
                byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
                content_type VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.events (
                id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
                event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
                ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
                status VARCHAR, format_used VARCHAR, date DATE, _source VARCHAR
            );
            "#,
        )?;

        // Layered views:
        //   caches/remotes -> placeholders (replaced elsewhere when real data attaches)
        //   main.*         -> local + caches, tagging local rows with _source = 'local'
        //   unified.*      -> main + remotes
        //   unified.qualified_* -> deduplicated rows with the list of sources they came from
        conn.execute_batch(
            r#"
            CREATE OR REPLACE VIEW caches.sessions AS SELECT * FROM cached_placeholder.sessions;
            CREATE OR REPLACE VIEW caches.invocations AS SELECT * FROM cached_placeholder.invocations;
            CREATE OR REPLACE VIEW caches.outputs AS SELECT * FROM cached_placeholder.outputs;
            CREATE OR REPLACE VIEW caches.events AS SELECT * FROM cached_placeholder.events;

            CREATE OR REPLACE VIEW remotes.sessions AS SELECT * FROM remote_placeholder.sessions;
            CREATE OR REPLACE VIEW remotes.invocations AS SELECT * FROM remote_placeholder.invocations;
            CREATE OR REPLACE VIEW remotes.outputs AS SELECT * FROM remote_placeholder.outputs;
            CREATE OR REPLACE VIEW remotes.events AS SELECT * FROM remote_placeholder.events;

            CREATE OR REPLACE VIEW main.sessions AS
                SELECT *, 'local' as _source FROM local.sessions
                UNION ALL BY NAME SELECT * FROM caches.sessions;
            CREATE OR REPLACE VIEW main.invocations AS
                SELECT *, 'local' as _source FROM local.invocations
                UNION ALL BY NAME SELECT * FROM caches.invocations;
            CREATE OR REPLACE VIEW main.outputs AS
                SELECT *, 'local' as _source FROM local.outputs
                UNION ALL BY NAME SELECT * FROM caches.outputs;
            CREATE OR REPLACE VIEW main.events AS
                SELECT *, 'local' as _source FROM local.events
                UNION ALL BY NAME SELECT * FROM caches.events;

            CREATE OR REPLACE VIEW unified.sessions AS
                SELECT * FROM main.sessions UNION ALL BY NAME SELECT * FROM remotes.sessions;
            CREATE OR REPLACE VIEW unified.invocations AS
                SELECT * FROM main.invocations UNION ALL BY NAME SELECT * FROM remotes.invocations;
            CREATE OR REPLACE VIEW unified.outputs AS
                SELECT * FROM main.outputs UNION ALL BY NAME SELECT * FROM remotes.outputs;
            CREATE OR REPLACE VIEW unified.events AS
                SELECT * FROM main.events UNION ALL BY NAME SELECT * FROM remotes.events;

            -- Qualified views: deduplicated with source list
            CREATE OR REPLACE VIEW unified.qualified_sessions AS
                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
                FROM unified.sessions GROUP BY ALL;
            CREATE OR REPLACE VIEW unified.qualified_invocations AS
                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
                FROM unified.invocations GROUP BY ALL;
            CREATE OR REPLACE VIEW unified.qualified_outputs AS
                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
                FROM unified.outputs GROUP BY ALL;
            CREATE OR REPLACE VIEW unified.qualified_events AS
                SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
                FROM unified.events GROUP BY ALL;
            "#,
        )?;

        Ok(())
    }
649
650 fn setup_s3_credentials(&self, conn: &Connection) -> Result<()> {
653 let has_s3 = self.config.remotes.iter().any(|r| {
655 r.remote_type == crate::config::RemoteType::S3
656 });
657
658 if !has_s3 {
659 return Ok(());
660 }
661
662 conn.execute("LOAD httpfs", [])?;
664
665 for remote in &self.config.remotes {
667 if remote.remote_type == crate::config::RemoteType::S3 {
668 if let Some(provider) = &remote.credential_provider {
669 let secret_sql = format!(
670 "CREATE SECRET IF NOT EXISTS \"bird_{}\" (TYPE s3, PROVIDER {})",
671 remote.name, provider
672 );
673 if let Err(e) = conn.execute(&secret_sql, []) {
674 eprintln!("Warning: Failed to create S3 secret for {}: {}", remote.name, e);
675 }
676 }
677 }
678 }
679
680 Ok(())
681 }
682
683 fn setup_blob_resolution(&self, conn: &Connection) -> Result<()> {
690 let blob_roots = self.config.blob_roots();
691
692 let roots_sql: String = blob_roots
694 .iter()
695 .map(|r| format!("'{}'", r.replace('\'', "''")))
696 .collect::<Vec<_>>()
697 .join(", ");
698
699 conn.execute(&format!("SET VARIABLE blob_roots = [{}]", roots_sql), [])?;
701
702 conn.execute(
704 r#"CREATE OR REPLACE MACRO is_inline_data(ref) AS (
705 ref[:5] = 'data:' OR ref[:5] = 'data+'
706 )"#,
707 [],
708 )?;
709
710 conn.execute(
712 r#"CREATE OR REPLACE MACRO is_file_ref(ref) AS (
713 ref[:5] = 'file:'
714 )"#,
715 [],
716 )?;
717
718 conn.execute(
723 r#"CREATE OR REPLACE MACRO resolve_storage_ref(ref) AS (
724 CASE
725 WHEN is_inline_data(ref) THEN [ref]
726 WHEN is_file_ref(ref) THEN
727 [format('{}/{}*', root, ref[6:]) FOR root IN getvariable('blob_roots')]
728 ELSE [ref]
729 END
730 )"#,
731 [],
732 )?;
733
734 Ok(())
735 }
736
737 fn attach_remotes(&self, conn: &Connection) -> Result<()> {
740 for remote in self.config.auto_attach_remotes() {
741 let attach_sql = remote.attach_sql();
743 if let Err(e) = conn.execute(&attach_sql, []) {
744 eprintln!("Warning: Failed to attach remote {}: {}", remote.name, e);
745 }
746 }
747
748 Ok(())
749 }
750
751 fn create_remote_macros(&self, conn: &Connection) -> Result<()> {
759 let remotes = self.config.auto_attach_remotes();
760 if remotes.is_empty() {
761 return Ok(());
762 }
763
764 for remote in &remotes {
766 let schema = remote.quoted_schema_name();
767 let name = &remote.name;
768 let safe_name = name.replace('-', "_").replace('.', "_");
770
771 for table in &["sessions", "invocations", "outputs", "events"] {
772 let macro_name = format!("\"remote_{safe_name}_{table}\"");
773 let sql = format!(
774 r#"CREATE OR REPLACE TEMPORARY MACRO {macro_name}() AS TABLE (
775 SELECT *, '{name}' as _source FROM {schema}.{table}
776 )"#,
777 macro_name = macro_name,
778 name = name,
779 schema = schema,
780 table = table
781 );
782 if let Err(e) = conn.execute(&sql, []) {
783 eprintln!("Warning: Failed to create macro {}: {}", macro_name, e);
784 }
785 }
786 }
787
788 for table in &["sessions", "invocations", "outputs", "events"] {
790 let mut union_parts: Vec<String> = remotes
791 .iter()
792 .map(|r| {
793 let safe_name = r.name.replace('-', "_").replace('.', "_");
794 format!("SELECT * FROM \"remote_{safe_name}_{table}\"()", safe_name = safe_name, table = table)
795 })
796 .collect();
797
798 union_parts.push(format!("SELECT * FROM remote_placeholder.{}", table));
800
801 let sql = format!(
802 r#"CREATE OR REPLACE TEMPORARY MACRO remotes_{table}() AS TABLE (
803 {union}
804 )"#,
805 table = table,
806 union = union_parts.join(" UNION ALL BY NAME ")
807 );
808 if let Err(e) = conn.execute(&sql, []) {
809 eprintln!("Warning: Failed to create remotes_{} macro: {}", table, e);
810 }
811 }
812
813 Ok(())
814 }
815
816 fn create_cwd_macros(&self, conn: &Connection) -> Result<()> {
823 let cwd = std::env::current_dir()
824 .map(|p| p.to_string_lossy().to_string())
825 .unwrap_or_default();
826 let cwd_escaped = cwd.replace('\'', "''");
827
828 let macros = format!(
830 r#"
831 CREATE OR REPLACE TEMPORARY MACRO cwd_sessions() AS TABLE (
832 SELECT * FROM main.sessions WHERE cwd LIKE '{}%'
833 );
834 CREATE OR REPLACE TEMPORARY MACRO cwd_invocations() AS TABLE (
835 SELECT * FROM main.invocations WHERE cwd LIKE '{}%'
836 );
837 CREATE OR REPLACE TEMPORARY MACRO cwd_outputs() AS TABLE (
838 SELECT o.* FROM main.outputs o
839 JOIN main.invocations i ON o.invocation_id = i.id
840 WHERE i.cwd LIKE '{}%'
841 );
842 CREATE OR REPLACE TEMPORARY MACRO cwd_events() AS TABLE (
843 SELECT e.* FROM main.events e
844 JOIN main.invocations i ON e.invocation_id = i.id
845 WHERE i.cwd LIKE '{}%'
846 );
847 "#,
848 cwd_escaped, cwd_escaped, cwd_escaped, cwd_escaped
849 );
850
851 conn.execute_batch(¯os)?;
852 Ok(())
853 }
854
855 pub fn attach_remote(&self, conn: &Connection, remote: &crate::RemoteConfig) -> Result<()> {
857 conn.execute("LOAD httpfs", [])?;
859
860 if let Some(provider) = &remote.credential_provider {
862 if remote.remote_type == crate::config::RemoteType::S3 {
863 let secret_sql = format!(
864 "CREATE SECRET IF NOT EXISTS \"bird_{}\" (TYPE s3, PROVIDER {})",
865 remote.name, provider
866 );
867 conn.execute(&secret_sql, [])?;
868 }
869 }
870
871 conn.execute(&remote.attach_sql(), [])?;
873 Ok(())
874 }
875
876 pub fn detach_remote(&self, conn: &Connection, name: &str) -> Result<()> {
878 conn.execute(&format!("DETACH \"remote_{}\"", name), [])?;
879 Ok(())
880 }
881
882 pub fn test_remote(&self, remote: &crate::RemoteConfig) -> Result<()> {
884 let conn = self.connection_with_options(false)?;
885 self.attach_remote(&conn, remote)?;
886
887 let test_sql = format!(
889 "SELECT 1 FROM {}.invocations LIMIT 1",
890 remote.quoted_schema_name()
891 );
892 conn.execute(&test_sql, [])?;
893
894 Ok(())
895 }
896
897 pub fn config(&self) -> &Config {
899 &self.config
900 }
901
    /// Execute arbitrary SQL on a full connection and return every cell
    /// rendered as a display string.
    ///
    /// NULL becomes `"NULL"`, timestamps/times are normalized to
    /// microseconds and formatted in UTC, blobs become `<blob N bytes>`,
    /// and any other variant falls back to [`format_value`] on an owned
    /// copy.
    ///
    /// # Errors
    /// Propagates connection, preparation, and row-decoding errors.
    pub fn query(&self, sql: &str) -> Result<QueryResult> {
        let conn = self.connection()?;
        let mut stmt = conn.prepare(sql)?;

        let mut rows_iter = stmt.query([])?;

        // Column metadata is only reachable through the statement backing
        // the rows handle; as_ref() is None when no statement is attached.
        let column_count = rows_iter.as_ref().map(|r| r.column_count()).unwrap_or(0);
        let column_names: Vec<String> = if let Some(row_ref) = rows_iter.as_ref() {
            (0..column_count)
                .map(|i| {
                    // Fall back to a positional name if the driver has none.
                    row_ref
                        .column_name(i)
                        .map(|s| s.to_string())
                        .unwrap_or_else(|_| format!("col{}", i))
                })
                .collect()
        } else {
            Vec::new()
        };

        let mut result_rows = Vec::new();
        while let Some(row) = rows_iter.next()? {
            let mut values = Vec::with_capacity(column_count);
            for i in 0..column_count {
                let value = match row.get_ref(i)? {
                    ValueRef::Null => "NULL".to_string(),
                    ValueRef::Boolean(b) => b.to_string(),
                    ValueRef::TinyInt(n) => n.to_string(),
                    ValueRef::SmallInt(n) => n.to_string(),
                    ValueRef::Int(n) => n.to_string(),
                    ValueRef::BigInt(n) => n.to_string(),
                    ValueRef::HugeInt(n) => n.to_string(),
                    ValueRef::UTinyInt(n) => n.to_string(),
                    ValueRef::USmallInt(n) => n.to_string(),
                    ValueRef::UInt(n) => n.to_string(),
                    ValueRef::UBigInt(n) => n.to_string(),
                    ValueRef::Float(f) => f.to_string(),
                    ValueRef::Double(f) => f.to_string(),
                    ValueRef::Decimal(d) => d.to_string(),
                    ValueRef::Timestamp(unit, val) => {
                        // Normalize every unit to microseconds before
                        // conversion; nanoseconds truncate toward zero.
                        let micros = match unit {
                            TimeUnit::Second => val * 1_000_000,
                            TimeUnit::Millisecond => val * 1_000,
                            TimeUnit::Microsecond => val,
                            TimeUnit::Nanosecond => val / 1_000,
                        };
                        DateTime::<Utc>::from_timestamp_micros(micros)
                            .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
                            .unwrap_or_else(|| format!("<invalid timestamp {}>", val))
                    }
                    ValueRef::Date32(days) => {
                        // Date32 counts days since the Unix epoch.
                        NaiveDate::from_ymd_opt(1970, 1, 1)
                            .and_then(|epoch| epoch.checked_add_signed(TimeDelta::days(days as i64)))
                            .map(|d| d.format("%Y-%m-%d").to_string())
                            .unwrap_or_else(|| format!("<invalid date {}>", days))
                    }
                    ValueRef::Time64(unit, val) => {
                        // Same microsecond normalization as timestamps.
                        let micros = match unit {
                            TimeUnit::Second => val * 1_000_000,
                            TimeUnit::Millisecond => val * 1_000,
                            TimeUnit::Microsecond => val,
                            TimeUnit::Nanosecond => val / 1_000,
                        };
                        let secs = (micros / 1_000_000) as u32;
                        let micro_part = (micros % 1_000_000) as u32;
                        NaiveTime::from_num_seconds_from_midnight_opt(secs, micro_part * 1000)
                            .map(|t| t.format("%H:%M:%S").to_string())
                            .unwrap_or_else(|| format!("<invalid time {}>", val))
                    }
                    ValueRef::Interval { months, days, nanos } => {
                        format!("{} months {} days {} ns", months, days, nanos)
                    }
                    ValueRef::Text(s) => String::from_utf8_lossy(s).to_string(),
                    ValueRef::Blob(b) => format!("<blob {} bytes>", b.len()),
                    other => {
                        // Nested/complex types: convert to an owned Value and
                        // delegate to the recursive formatter.
                        let owned: Value = other.into();
                        format_value(&owned)
                    }
                };
                values.push(value);
            }
            result_rows.push(values);
        }

        Ok(QueryResult {
            columns: column_names,
            rows: result_rows,
        })
    }
1002
1003 pub fn last_invocation_with_output(
1005 &self,
1006 ) -> Result<Option<(InvocationSummary, Option<OutputInfo>)>> {
1007 if let Some(inv) = self.last_invocation()? {
1008 let output = self.get_output(&inv.id)?;
1009 Ok(Some((inv, output)))
1010 } else {
1011 Ok(None)
1012 }
1013 }
1014
1015 pub fn write_batch(&self, batch: &InvocationBatch) -> Result<()> {
1021 let invocation = batch
1022 .invocation
1023 .as_ref()
1024 .ok_or_else(|| Error::Storage("Batch must contain an invocation".to_string()))?;
1025
1026 match self.config.storage_mode {
1027 StorageMode::Parquet => self.write_batch_parquet(batch, invocation),
1028 StorageMode::DuckDB => self.write_batch_duckdb(batch, invocation),
1029 }
1030 }
1031
1032 fn write_batch_parquet(
1034 &self,
1035 batch: &InvocationBatch,
1036 invocation: &InvocationRecord,
1037 ) -> Result<()> {
1038 if let Some(ref session) = batch.session {
1044 self.ensure_session(session)?;
1045 }
1046
1047 self.write_invocation(invocation)?;
1049
1050 let date = invocation.date();
1051 let inv_id = invocation.id;
1052
1053 for (stream, content) in &batch.outputs {
1055 self.store_output(
1056 inv_id,
1057 stream,
1058 content,
1059 date,
1060 invocation.executable.as_deref(),
1061 )?;
1062 }
1063
1064 if let Some(ref events) = batch.events {
1066 if !events.is_empty() {
1067 self.write_events(events)?;
1068 }
1069 }
1070
1071 Ok(())
1072 }
1073
1074 fn write_batch_duckdb(
1076 &self,
1077 batch: &InvocationBatch,
1078 invocation: &InvocationRecord,
1079 ) -> Result<()> {
1080 let conn = self.connection()?;
1081
1082 conn.execute("BEGIN TRANSACTION", [])?;
1084
1085 let result = self.write_batch_duckdb_inner(&conn, batch, invocation);
1086
1087 match result {
1088 Ok(()) => {
1089 conn.execute("COMMIT", [])?;
1090 Ok(())
1091 }
1092 Err(e) => {
1093 let _ = conn.execute("ROLLBACK", []);
1095 Err(e)
1096 }
1097 }
1098 }
1099
    /// Insert every record of `batch` into the `local.*` tables on `conn`.
    ///
    /// Runs inside the transaction opened by [`Store::write_batch_duckdb`];
    /// any error here triggers a rollback in the caller. Output payloads are
    /// content-addressed by BLAKE3: small ones are inlined as base64 `data:`
    /// URLs, larger ones are written to blob files tracked in
    /// `blob_registry`.
    fn write_batch_duckdb_inner(
        &self,
        conn: &Connection,
        batch: &InvocationBatch,
        invocation: &InvocationRecord,
    ) -> Result<()> {
        use base64::Engine;

        let date = invocation.date();
        let inv_id = invocation.id;

        // Register the session only if it has not been recorded before; a
        // failed existence probe is treated as "not present".
        if let Some(ref session) = batch.session {
            let exists: i64 = conn
                .query_row(
                    "SELECT COUNT(*) FROM local.sessions WHERE session_id = ?",
                    params![&session.session_id],
                    |row| row.get(0),
                )
                .unwrap_or(0);

            if exists == 0 {
                conn.execute(
                    r#"INSERT INTO local.sessions VALUES (?, ?, ?, ?, ?, ?, ?, ?)"#,
                    params![
                        session.session_id,
                        session.client_id,
                        session.invoker,
                        session.invoker_pid,
                        session.invoker_type,
                        session.registered_at.to_rfc3339(),
                        session.cwd,
                        session.date.to_string(),
                    ],
                )?;
            }
        }

        // Positional insert — the order must match local.invocations's
        // column order as created by the migration.
        conn.execute(
            r#"INSERT INTO local.invocations VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
            params![
                invocation.id.to_string(),
                invocation.session_id,
                invocation.timestamp.to_rfc3339(),
                invocation.duration_ms,
                invocation.cwd,
                invocation.cmd,
                invocation.executable,
                invocation.exit_code,
                invocation.format_hint,
                invocation.client_id,
                invocation.hostname,
                invocation.username,
                invocation.tag,
                date.to_string(),
            ],
        )?;

        for (stream, content) in &batch.outputs {
            // Content hash doubles as the dedup key for blob storage.
            let hash = blake3::hash(content);
            let hash_hex = hash.to_hex().to_string();

            // Below the inline threshold the payload is embedded directly as
            // a data: URL; otherwise it is written to a blob file.
            let (storage_type, storage_ref) = if content.len() < self.config.inline_threshold {
                let b64 = base64::engine::general_purpose::STANDARD.encode(content);
                let data_url = format!("data:application/octet-stream;base64,{}", b64);
                ("inline".to_string(), data_url)
            } else {
                let cmd_hint = invocation.executable.as_deref().unwrap_or("output");
                let blob_path = self.config.blob_path(&hash_hex, cmd_hint);

                if let Some(parent) = blob_path.parent() {
                    fs::create_dir_all(parent)?;
                }

                // Store the path relative to the data dir when possible so
                // the reference stays valid if the data dir moves.
                let rel_path = blob_path
                    .strip_prefix(&self.config.data_dir())
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_else(|_| blob_path.to_string_lossy().to_string());

                // atomic::write_file presumably returns whether a new file
                // was created; the registry insert/refcount split relies on
                // that — TODO confirm against the atomic module.
                let wrote_new = atomic::write_file(&blob_path, content)?;

                if wrote_new {
                    conn.execute(
                        "INSERT INTO blob_registry (content_hash, byte_length, storage_path) VALUES (?, ?, ?)",
                        params![&hash_hex, content.len() as i64, &rel_path],
                    )?;
                } else {
                    // Dedup hit: bump the refcount instead of re-inserting.
                    conn.execute(
                        "UPDATE blob_registry SET ref_count = ref_count + 1, last_accessed = CURRENT_TIMESTAMP WHERE content_hash = ?",
                        params![&hash_hex],
                    )?;
                }

                ("blob".to_string(), format!("file://{}", rel_path))
            };

            // UUIDv7 keeps output ids roughly time-ordered.
            let output_id = uuid::Uuid::now_v7();
            conn.execute(
                r#"INSERT INTO local.outputs VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
                params![
                    output_id.to_string(),
                    inv_id.to_string(),
                    stream,
                    hash_hex,
                    content.len() as i64,
                    storage_type,
                    storage_ref,
                    // content_type is unknown at capture time.
                    Option::<String>::None,
                    date.to_string(),
                ],
            )?;
        }

        if let Some(ref events) = batch.events {
            for event in events {
                conn.execute(
                    r#"INSERT INTO local.events VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
                    params![
                        event.id.to_string(),
                        event.invocation_id.to_string(),
                        event.client_id,
                        event.hostname,
                        event.event_type,
                        event.severity,
                        event.ref_file,
                        event.ref_line,
                        event.ref_column,
                        event.message,
                        event.error_code,
                        event.test_name,
                        event.status,
                        event.format_used,
                        event.date.to_string(),
                    ],
                )?;
            }
        }

        Ok(())
    }
1251
1252 pub fn load_format_hints(&self) -> Result<crate::FormatHints> {
1254 let path = self.config.format_hints_path();
1255
1256 if path.exists() {
1258 return crate::FormatHints::load(&path);
1259 }
1260
1261 let legacy_path = self.config.event_formats_path();
1263 if legacy_path.exists() {
1264 return crate::FormatHints::load(&legacy_path);
1265 }
1266
1267 Ok(crate::FormatHints::new())
1268 }
1269
1270 pub fn save_format_hints(&self, hints: &crate::FormatHints) -> Result<()> {
1272 hints.save(&self.config.format_hints_path())
1273 }
1274
1275 pub fn detect_format_for_command(&self, cmd: &str) -> Result<String> {
1284 let hints = self.load_format_hints()?;
1285 Ok(hints.detect(cmd).to_string())
1286 }
1287
1288 pub fn list_builtin_formats(&self) -> Result<Vec<BuiltinFormat>> {
1293 let conn = self.connection()?;
1294
1295 let mut stmt = conn.prepare(
1296 "SELECT format, description, priority FROM duck_hunt_formats() ORDER BY priority DESC, format"
1297 )?;
1298
1299 let rows = stmt.query_map([], |row| {
1300 Ok(BuiltinFormat {
1301 format: row.get(0)?,
1302 pattern: row.get::<_, String>(1)?, priority: row.get(2)?,
1304 })
1305 })?;
1306
1307 let results: Vec<_> = rows.filter_map(|r| r.ok()).collect();
1308 Ok(results)
1309 }
1310
1311 pub fn check_format(&self, cmd: &str) -> Result<FormatMatch> {
1317 let hints = self.load_format_hints()?;
1318
1319 for hint in hints.hints() {
1321 if crate::format_hints::pattern_matches(&hint.pattern, cmd) {
1322 return Ok(FormatMatch {
1323 format: hint.format.clone(),
1324 source: FormatSource::UserDefined {
1325 pattern: hint.pattern.clone(),
1326 priority: hint.priority,
1327 },
1328 });
1329 }
1330 }
1331
1332 Ok(FormatMatch {
1334 format: hints.default_format().to_string(),
1335 source: FormatSource::Default,
1336 })
1337 }
1338}
1339
/// A format advertised by the `duck_hunt` extension's `duck_hunt_formats()`
/// table function (see `list_builtin_formats`).
#[derive(Debug, Clone)]
pub struct BuiltinFormat {
    // Format name (first column of duck_hunt_formats()).
    pub format: String,
    // Populated from the `description` column of duck_hunt_formats() —
    // NOTE(review): confirm whether this should hold a match pattern.
    pub pattern: String,
    // Match priority; higher values are listed first.
    pub priority: i32,
}
1347
/// The outcome of `check_format`: the chosen format plus the provenance of
/// the decision.
#[derive(Debug, Clone)]
pub struct FormatMatch {
    // The resolved format name.
    pub format: String,
    // Where the format came from (user hint, builtin, or default).
    pub source: FormatSource,
}
1354
/// Provenance of a resolved format (see `FormatMatch`).
#[derive(Debug, Clone)]
pub enum FormatSource {
    // Matched a user-defined hint pattern.
    UserDefined { pattern: String, priority: i32 },
    // Matched a builtin pattern. NOTE(review): not constructed by the
    // visible `check_format` path — confirm it is produced elsewhere.
    Builtin { pattern: String, priority: i32 },
    // No pattern matched; the configured default format was used.
    Default,
}
1362
/// A fully-materialized, stringly-typed query result: column names plus one
/// `Vec<String>` per row (values pre-rendered, e.g. via `format_value`).
#[derive(Debug)]
pub struct QueryResult {
    pub columns: Vec<String>,
    pub rows: Vec<Vec<String>>,
}
1369
/// Produce a filesystem-safe name from an arbitrary string.
///
/// Spaces become `-`; characters that are unsafe in filenames (path
/// separators, Windows-reserved punctuation, and anything that is not
/// alphanumeric, `-`, `_`, or `.`) become `_`. The result is capped at
/// 64 characters.
fn sanitize_filename(s: &str) -> String {
    let mut out = String::with_capacity(s.len().min(64));
    for c in s.chars().take(64) {
        let mapped = if c == ' ' {
            '-'
        } else if c.is_alphanumeric() || matches!(c, '-' | '_' | '.') {
            c
        } else {
            // Covers '/', '\\', ':', '*', '?', '"', '<', '>', '|' and any
            // other non-allowed character.
            '_'
        };
        out.push(mapped);
    }
    out
}
1382
#[cfg(test)]
mod tests {
    use super::*;
    use crate::init::initialize;
    use crate::schema::SessionRecord;
    use tempfile::TempDir;

    // Build an initialized store in a fresh temp dir using the default
    // (`with_root`) storage mode. The TempDir must stay alive for the
    // store's lifetime, so it is returned to the caller.
    fn setup_store() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let config = Config::with_root(tmp.path());
        initialize(&config).unwrap();
        let store = Store::open(config).unwrap();
        (tmp, store)
    }

    // Same as `setup_store`, but with DuckDB-table storage mode.
    fn setup_store_duckdb() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let config = Config::with_duckdb_mode(tmp.path());
        initialize(&config).unwrap();
        let store = Store::open(config).unwrap();
        (tmp, store)
    }

    // Opening a store on an uninitialized root must fail with
    // `Error::NotInitialized` rather than creating state implicitly.
    #[test]
    fn test_store_open_uninitialized_fails() {
        let tmp = TempDir::new().unwrap();
        let config = Config::with_root(tmp.path());

        let result = Store::open(config);
        assert!(matches!(result, Err(Error::NotInitialized(_))));
    }

    #[test]
    fn test_sanitize_filename() {
        assert_eq!(sanitize_filename("make test"), "make-test");
        assert_eq!(sanitize_filename("/usr/bin/gcc"), "_usr_bin_gcc");
        assert_eq!(sanitize_filename("a:b*c?d"), "a_b_c_d");
    }

    // --- write_batch: parquet-mode storage ---

    #[test]
    fn test_batch_write_parquet_invocation_only() {
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");

        let batch = InvocationBatch::new(inv);
        store.write_batch(&batch).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
    }

    #[test]
    fn test_batch_write_parquet_with_output() {
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let inv_id = inv.id;

        let batch = InvocationBatch::new(inv)
            .with_output("stdout", b"hello world\n".to_vec());

        store.write_batch(&batch).unwrap();

        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].stream, "stdout");
    }

    #[test]
    fn test_batch_write_parquet_with_session() {
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(inv).with_session(session);
        store.write_batch(&batch).unwrap();

        assert!(store.session_exists("test-session").unwrap());
    }

    // A batch carrying session + both output streams should persist all
    // three record kinds in one write.
    #[test]
    fn test_batch_write_parquet_full() {
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new("test-session", "make test", "/home/user", 1, "test@client");
        let inv_id = inv.id;
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(inv)
            .with_session(session)
            .with_output("stdout", b"Building...\n".to_vec())
            .with_output("stderr", b"error: failed\n".to_vec());

        store.write_batch(&batch).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
        assert!(store.session_exists("test-session").unwrap());

        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 2);
    }

    // --- write_batch: duckdb-mode storage (mirrors the parquet cases) ---

    #[test]
    fn test_batch_write_duckdb_invocation_only() {
        let (_tmp, store) = setup_store_duckdb();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");

        let batch = InvocationBatch::new(inv);
        store.write_batch(&batch).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
    }

    #[test]
    fn test_batch_write_duckdb_with_output() {
        let (_tmp, store) = setup_store_duckdb();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let inv_id = inv.id;

        let batch = InvocationBatch::new(inv)
            .with_output("stdout", b"hello world\n".to_vec());

        store.write_batch(&batch).unwrap();

        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].stream, "stdout");
    }

    #[test]
    fn test_batch_write_duckdb_with_session() {
        let (_tmp, store) = setup_store_duckdb();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(inv).with_session(session);
        store.write_batch(&batch).unwrap();

        assert!(store.session_exists("test-session").unwrap());
    }

    #[test]
    fn test_batch_write_duckdb_full() {
        let (_tmp, store) = setup_store_duckdb();

        let inv = InvocationRecord::new("test-session", "make test", "/home/user", 1, "test@client");
        let inv_id = inv.id;
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(inv)
            .with_session(session)
            .with_output("stdout", b"Building...\n".to_vec())
            .with_output("stderr", b"error: failed\n".to_vec());

        store.write_batch(&batch).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
        assert!(store.session_exists("test-session").unwrap());

        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 2);
    }

    // An empty batch (no invocation) is invalid and must be rejected.
    #[test]
    fn test_batch_requires_invocation() {
        let (_tmp, store) = setup_store();

        let batch = InvocationBatch::default();
        let result = store.write_batch(&batch);

        assert!(result.is_err());
    }

    // --- ensure_extension: extension loading helper ---
    // NOTE(review): the community-extension tests below hit the network
    // to fetch extensions — they may fail in sandboxed CI.

    #[test]
    fn test_ensure_extension_parquet() {
        let conn = duckdb::Connection::open_in_memory().unwrap();
        let result = ensure_extension(&conn, "parquet").unwrap();
        assert!(result, "parquet extension should be loadable");
    }

    #[test]
    fn test_ensure_extension_icu() {
        let conn = duckdb::Connection::open_in_memory().unwrap();
        let result = ensure_extension(&conn, "icu").unwrap();
        assert!(result, "icu extension should be loadable");
    }

    #[test]
    fn test_ensure_extension_community() {
        let conn = duckdb::Connection::open_in_memory().unwrap();
        conn.execute("SET allow_community_extensions = true", []).unwrap();

        let result = ensure_extension(&conn, "scalarfs").unwrap();
        assert!(result, "scalarfs extension should be loadable from community");

        let result = ensure_extension(&conn, "duck_hunt").unwrap();
        assert!(result, "duck_hunt extension should be loadable from community");
    }

    // A missing extension reports `false` rather than returning an error.
    #[test]
    fn test_ensure_extension_nonexistent() {
        let conn = duckdb::Connection::open_in_memory().unwrap();
        conn.execute("SET allow_community_extensions = true", []).unwrap();

        let result = ensure_extension(&conn, "nonexistent_fake_extension_xyz").unwrap();
        assert!(!result, "nonexistent extension should return false");
    }

    // Loading an already-loaded extension should be near-instant; the 100ms
    // bound is a coarse check that the cached path avoids re-installation.
    #[test]
    fn test_extension_loading_is_cached() {
        let conn = duckdb::Connection::open_in_memory().unwrap();

        ensure_extension(&conn, "parquet").unwrap();

        let start = std::time::Instant::now();
        ensure_extension(&conn, "parquet").unwrap();
        let elapsed = start.elapsed();

        assert!(elapsed.as_millis() < 100, "cached extension load took {:?}", elapsed);
    }
}