1mod atomic;
6mod compact;
7mod events;
8mod invocations;
9mod outputs;
10mod remote;
11mod sessions;
12
13use std::fs;
14use std::thread;
15use std::time::Duration;
16
17use chrono::{DateTime, NaiveDate, NaiveTime, TimeDelta, Utc};
18use duckdb::{
19 params,
20 types::{TimeUnit, Value, ValueRef},
21 Connection,
22};
23
24use crate::config::StorageMode;
25use crate::schema::{EventRecord, InvocationRecord, SessionRecord};
26use crate::{Config, Error, Result};
27
28fn format_value(value: &Value) -> String {
31 match value {
32 Value::Null => "NULL".to_string(),
33 Value::Boolean(b) => b.to_string(),
34 Value::TinyInt(n) => n.to_string(),
35 Value::SmallInt(n) => n.to_string(),
36 Value::Int(n) => n.to_string(),
37 Value::BigInt(n) => n.to_string(),
38 Value::HugeInt(n) => n.to_string(),
39 Value::UTinyInt(n) => n.to_string(),
40 Value::USmallInt(n) => n.to_string(),
41 Value::UInt(n) => n.to_string(),
42 Value::UBigInt(n) => n.to_string(),
43 Value::Float(f) => f.to_string(),
44 Value::Double(f) => f.to_string(),
45 Value::Decimal(d) => d.to_string(),
46 Value::Timestamp(_, micros) => {
47 DateTime::<Utc>::from_timestamp_micros(*micros)
48 .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
49 .unwrap_or_else(|| format!("<timestamp {}>", micros))
50 }
51 Value::Text(s) => s.clone(),
52 Value::Blob(b) => format!("<blob {} bytes>", b.len()),
53 Value::Date32(days) => {
54 NaiveDate::from_ymd_opt(1970, 1, 1)
55 .and_then(|epoch| epoch.checked_add_signed(TimeDelta::days(*days as i64)))
56 .map(|d| d.format("%Y-%m-%d").to_string())
57 .unwrap_or_else(|| format!("<date {}>", days))
58 }
59 Value::Time64(_, micros) => {
60 let secs = (*micros / 1_000_000) as u32;
61 let micro_part = (*micros % 1_000_000) as u32;
62 NaiveTime::from_num_seconds_from_midnight_opt(secs, micro_part * 1000)
63 .map(|t| t.format("%H:%M:%S").to_string())
64 .unwrap_or_else(|| format!("<time {}>", micros))
65 }
66 Value::Interval { months, days, nanos } => {
67 format!("{} months {} days {} ns", months, days, nanos)
68 }
69 Value::List(items) => {
71 let formatted: Vec<String> = items.iter().map(format_value).collect();
72 format!("[{}]", formatted.join(", "))
73 }
74 Value::Array(items) => {
75 let formatted: Vec<String> = items.iter().map(format_value).collect();
76 format!("[{}]", formatted.join(", "))
77 }
78 Value::Map(map) => {
79 let formatted: Vec<String> = map
80 .iter()
81 .map(|(k, v)| format!("{}: {}", format_value(k), format_value(v)))
82 .collect();
83 format!("{{{}}}", formatted.join(", "))
84 }
85 Value::Struct(fields) => {
86 let formatted: Vec<String> = fields
87 .iter()
88 .map(|(k, v)| format!("{}: {}", k, format_value(v)))
89 .collect();
90 format!("{{{}}}", formatted.join(", "))
91 }
92 Value::Enum(s) => s.clone(),
93 _ => "<unknown>".to_string(),
94 }
95}
96
97pub use compact::{ArchiveStats, AutoCompactOptions, CompactOptions, CompactStats};
99pub use events::{EventFilters, EventSummary, FormatConfig, FormatRule};
100pub use invocations::InvocationSummary;
101pub use outputs::OutputInfo;
102pub use remote::{parse_since, PullOptions, PullStats, PushOptions, PushStats};
103
/// A single invocation plus everything captured alongside it, persisted
/// as one unit by `Store::write_batch`.
#[derive(Debug, Default)]
pub struct InvocationBatch {
    /// The invocation record itself; `write_batch` rejects a batch
    /// without one.
    pub invocation: Option<InvocationRecord>,

    /// Captured output streams as `(stream_name, bytes)` pairs.
    pub outputs: Vec<(String, Vec<u8>)>,

    /// Session to register if it is not already stored.
    pub session: Option<SessionRecord>,

    /// Events extracted from the invocation's output, if any.
    pub events: Option<Vec<EventRecord>>,
}
126
127impl InvocationBatch {
128 pub fn new(invocation: InvocationRecord) -> Self {
130 Self {
131 invocation: Some(invocation),
132 outputs: Vec::new(),
133 session: None,
134 events: None,
135 }
136 }
137
138 pub fn with_output(mut self, stream: impl Into<String>, content: Vec<u8>) -> Self {
140 self.outputs.push((stream.into(), content));
141 self
142 }
143
144 pub fn with_session(mut self, session: SessionRecord) -> Self {
146 self.session = Some(session);
147 self
148 }
149
150 pub fn with_events(mut self, events: Vec<EventRecord>) -> Self {
152 self.events = Some(events);
153 self
154 }
155}
156
/// Flags selecting how much setup `Store::connect` performs on a freshly
/// opened connection.
#[derive(Debug, Clone, Default)]
pub struct ConnectionOptions {
    /// Attach configured remote databases and create their macros.
    pub attach_remotes: bool,

    /// Attach the current project's database (read-only) if one exists.
    pub attach_project: bool,

    /// Create temporary cwd-scoped convenience macros.
    pub create_ephemeral_views: bool,

    /// Run the legacy-to-`local` schema migration during setup.
    pub run_migration: bool,
}
188
189impl ConnectionOptions {
190 pub fn full() -> Self {
192 Self {
193 attach_remotes: true,
194 attach_project: true,
195 create_ephemeral_views: true,
196 run_migration: false,
197 }
198 }
199
200 pub fn minimal() -> Self {
203 Self {
204 attach_remotes: false,
205 attach_project: false,
206 create_ephemeral_views: false,
207 run_migration: false,
208 }
209 }
210
211 pub fn for_migration() -> Self {
213 Self {
214 attach_remotes: false,
215 attach_project: false,
216 create_ephemeral_views: false,
217 run_migration: true,
218 }
219 }
220}
221
222fn ensure_extension(conn: &Connection, name: &str) -> Result<bool> {
231 if conn.execute(&format!("LOAD {}", name), []).is_ok() {
233 return Ok(true);
234 }
235
236 if conn.execute(&format!("INSTALL {}", name), []).is_ok()
238 && conn.execute(&format!("LOAD {}", name), []).is_ok()
239 {
240 return Ok(true);
241 }
242
243 if conn.execute(&format!("INSTALL {} FROM community", name), []).is_ok()
245 && conn.execute(&format!("LOAD {}", name), []).is_ok()
246 {
247 return Ok(true);
248 }
249
250 Ok(false)
251}
252
/// Handle to the on-disk bird database; all reads and writes go through
/// connections opened from this store.
pub struct Store {
    // Paths, remotes, storage mode, thresholds — fixed for the store's lifetime.
    config: Config,
}
257
258impl Store {
259 pub fn open(config: Config) -> Result<Self> {
261 if !config.db_path().exists() {
262 return Err(Error::NotInitialized(config.bird_root.clone()));
263 }
264 Ok(Self { config })
265 }
266
    /// Open the DuckDB database, retrying with exponential backoff when
    /// another process holds the file lock.
    ///
    /// Lock errors are retried up to `MAX_RETRIES` times with a doubling
    /// delay (capped at `MAX_DELAY_MS`) plus a small deterministic jitter;
    /// any other error is returned immediately.
    fn open_connection_with_retry(&self) -> Result<Connection> {
        const MAX_RETRIES: u32 = 10;
        const INITIAL_DELAY_MS: u64 = 10;
        const MAX_DELAY_MS: u64 = 1000;

        let db_path = self.config.db_path();
        let mut delay_ms = INITIAL_DELAY_MS;
        let mut last_error = None;

        for attempt in 0..MAX_RETRIES {
            match Connection::open(&db_path) {
                Ok(conn) => return Ok(conn),
                Err(e) => {
                    // Lock contention is only visible as message text here,
                    // so classify by substring.
                    let err_msg = e.to_string();
                    if err_msg.contains("Could not set lock")
                        || err_msg.contains("Conflicting lock")
                        || err_msg.contains("database is locked")
                    {
                        last_error = Some(e);
                        if attempt < MAX_RETRIES - 1 {
                            // Deterministic pseudo-jitter; keeps concurrent
                            // waiters from sleeping in perfect lockstep.
                            let jitter = (attempt as u64 * 7) % 10;
                            thread::sleep(Duration::from_millis(delay_ms + jitter));
                            delay_ms = (delay_ms * 2).min(MAX_DELAY_MS);
                            continue;
                        }
                    } else {
                        // Non-lock failures are not retryable.
                        return Err(e.into());
                    }
                }
            }
        }

        // All retries exhausted on a lock error (the fallback message is
        // unreachable in practice since last_error is set on every retry).
        Err(last_error
            .map(|e| e.into())
            .unwrap_or_else(|| Error::Storage("Failed to open database after retries".to_string())))
    }
312
313 pub fn connection(&self) -> Result<Connection> {
315 self.connect(ConnectionOptions::full())
316 }
317
318 pub fn connection_with_options(&self, attach_remotes: bool) -> Result<Connection> {
320 let opts = if attach_remotes {
321 ConnectionOptions::full()
322 } else {
323 ConnectionOptions::minimal()
324 };
325 self.connect(opts)
326 }
327
    /// Open a connection and perform the setup selected by `opts`:
    /// extension loading, file search path, optional migration, S3
    /// secrets, blob-resolution macros, remote/project attachment, and
    /// cwd macros — in that order.
    pub fn connect(&self, opts: ConnectionOptions) -> Result<Connection> {
        let conn = self.open_connection_with_retry()?;

        // Must be set before the community-repo installs attempted below.
        conn.execute("SET allow_community_extensions = true", [])?;

        // Hard requirements: fail the whole connection if these can't load.
        for ext in ["parquet", "icu"] {
            if !ensure_extension(&conn, ext)? {
                return Err(Error::Extension(format!(
                    "Required extension '{}' could not be loaded",
                    ext
                )));
            }
        }

        // Optional extensions: degrade to a warning when unavailable.
        for (ext, desc) in [
            ("scalarfs", "data: URL support for inline blobs"),
            ("duck_hunt", "log/output parsing for event extraction"),
        ] {
            if !ensure_extension(&conn, ext)? {
                eprintln!("Warning: {} extension not available ({})", ext, desc);
            }
        }

        // Let relative file globs (e.g. 'recent/...') resolve against the
        // store's data directory.
        conn.execute(
            &format!(
                "SET file_search_path = '{}'",
                self.config.data_dir().display()
            ),
            [],
        )?;

        if opts.run_migration {
            self.migrate_to_new_schema(&conn)?;
        }

        // Run unconditionally regardless of opts.
        self.setup_s3_credentials(&conn)?;
        self.setup_blob_resolution(&conn)?;

        if opts.attach_remotes && !self.config.remotes.is_empty() {
            self.attach_remotes(&conn)?;
            self.create_remote_macros(&conn)?;
        }

        if opts.attach_project {
            self.attach_project_db(&conn)?;
        }

        if opts.create_ephemeral_views {
            self.create_cwd_macros(&conn)?;
        }

        Ok(conn)
    }
402
403 fn attach_project_db(&self, conn: &Connection) -> Result<()> {
408 use crate::project::find_current_project;
409
410 let Some(project) = find_current_project() else {
411 return Ok(()); };
413
414 if !project.is_initialized() {
415 return Ok(()); }
417
418 if project.db_path == self.config.db_path() {
420 return Ok(());
421 }
422
423 let attach_sql = format!(
425 "ATTACH '{}' AS project (READ_ONLY)",
426 project.db_path.display()
427 );
428
429 if let Err(e) = conn.execute(&attach_sql, []) {
430 eprintln!("Note: Could not attach project database: {}", e);
432 }
433
434 Ok(())
435 }
436
    /// One-time, idempotent migration from the flat legacy layout to the
    /// schema-per-source architecture (`local`, `caches`, `remotes`,
    /// `unified`, `cwd`, plus empty placeholder schemas).
    ///
    /// The existence of the `local` schema is the "already migrated"
    /// marker; when present this is a no-op.
    fn migrate_to_new_schema(&self, conn: &Connection) -> Result<()> {
        // Detect a previous migration via information_schema; on query
        // failure we assume not migrated and proceed.
        let local_exists: bool = conn
            .query_row(
                "SELECT COUNT(*) > 0 FROM information_schema.schemata WHERE schema_name = 'local'",
                [],
                |row| row.get(0),
            )
            .unwrap_or(false);

        if local_exists {
            return Ok(());
        }

        eprintln!("Note: Migrating to new schema architecture...");

        // Create all namespaces up front so later statements never hit a
        // missing schema.
        conn.execute_batch(
            r#"
            CREATE SCHEMA IF NOT EXISTS local;
            CREATE SCHEMA IF NOT EXISTS cached_placeholder;
            CREATE SCHEMA IF NOT EXISTS remote_placeholder;
            CREATE SCHEMA IF NOT EXISTS caches;
            CREATE SCHEMA IF NOT EXISTS remotes;
            CREATE SCHEMA IF NOT EXISTS unified;
            CREATE SCHEMA IF NOT EXISTS cwd;
            "#,
        )?;

        if self.config.storage_mode == crate::StorageMode::DuckDB {
            // DuckDB mode: local.* are real tables.
            conn.execute_batch(
                r#"
                CREATE TABLE IF NOT EXISTS local.sessions (
                    session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
                    invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE
                );
                CREATE TABLE IF NOT EXISTS local.invocations (
                    id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
                    cwd VARCHAR, cmd VARCHAR, executable VARCHAR, exit_code INTEGER,
                    format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR,
                    tag VARCHAR, date DATE
                );
                CREATE TABLE IF NOT EXISTS local.outputs (
                    id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
                    byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
                    content_type VARCHAR, date DATE
                );
                CREATE TABLE IF NOT EXISTS local.events (
                    id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
                    event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
                    ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
                    status VARCHAR, format_used VARCHAR, date DATE
                );
                "#,
            )?;

            // Copy data from the legacy flat *_table tables, if this
            // database still has them.
            let old_tables_exist: bool = conn
                .query_row(
                    "SELECT COUNT(*) > 0 FROM duckdb_tables() WHERE table_name = 'sessions_table'",
                    [],
                    |row| row.get(0),
                )
                .unwrap_or(false);

            if old_tables_exist {
                conn.execute_batch(
                    r#"
                    INSERT INTO local.sessions SELECT * FROM sessions_table;
                    INSERT INTO local.invocations SELECT * FROM invocations_table;
                    INSERT INTO local.outputs SELECT * FROM outputs_table;
                    INSERT INTO local.events SELECT * FROM events_table;
                    "#,
                )?;
            }
        } else {
            // Parquet mode: local.* are views over hive-partitioned
            // parquet files under recent/ (resolved via file_search_path).
            conn.execute_batch(
                r#"
                CREATE OR REPLACE VIEW local.sessions AS
                SELECT * EXCLUDE (filename) FROM read_parquet(
                    'recent/sessions/**/*.parquet',
                    union_by_name = true, hive_partitioning = true, filename = true
                );
                CREATE OR REPLACE VIEW local.invocations AS
                SELECT * EXCLUDE (filename) FROM read_parquet(
                    'recent/invocations/**/*.parquet',
                    union_by_name = true, hive_partitioning = true, filename = true
                );
                CREATE OR REPLACE VIEW local.outputs AS
                SELECT * EXCLUDE (filename) FROM read_parquet(
                    'recent/outputs/**/*.parquet',
                    union_by_name = true, hive_partitioning = true, filename = true
                );
                CREATE OR REPLACE VIEW local.events AS
                SELECT * EXCLUDE (filename) FROM read_parquet(
                    'recent/events/**/*.parquet',
                    union_by_name = true, hive_partitioning = true, filename = true
                );
                "#,
            )?;
        }

        // Empty placeholder tables pin the column shapes (including
        // `_source`) so the union views below stay valid even when no
        // caches/remotes are attached.
        conn.execute_batch(
            r#"
            CREATE TABLE IF NOT EXISTS cached_placeholder.sessions (
                session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
                invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS cached_placeholder.invocations (
                id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
                cwd VARCHAR, cmd VARCHAR, executable VARCHAR, exit_code INTEGER,
                format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS cached_placeholder.outputs (
                id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
                byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
                content_type VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS cached_placeholder.events (
                id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
                event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
                ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
                status VARCHAR, format_used VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.sessions (
                session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
                invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.invocations (
                id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
                cwd VARCHAR, cmd VARCHAR, executable VARCHAR, exit_code INTEGER,
                format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.outputs (
                id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
                byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
                content_type VARCHAR, date DATE, _source VARCHAR
            );
            CREATE TABLE IF NOT EXISTS remote_placeholder.events (
                id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
                event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
                ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
                status VARCHAR, format_used VARCHAR, date DATE, _source VARCHAR
            );
            "#,
        )?;

        // Layered views: caches/remotes proxy the placeholders; main is
        // local + caches (tagged 'local'); unified adds remotes; the
        // qualified_* views deduplicate and collect the source list.
        conn.execute_batch(
            r#"
            CREATE OR REPLACE VIEW caches.sessions AS SELECT * FROM cached_placeholder.sessions;
            CREATE OR REPLACE VIEW caches.invocations AS SELECT * FROM cached_placeholder.invocations;
            CREATE OR REPLACE VIEW caches.outputs AS SELECT * FROM cached_placeholder.outputs;
            CREATE OR REPLACE VIEW caches.events AS SELECT * FROM cached_placeholder.events;

            CREATE OR REPLACE VIEW remotes.sessions AS SELECT * FROM remote_placeholder.sessions;
            CREATE OR REPLACE VIEW remotes.invocations AS SELECT * FROM remote_placeholder.invocations;
            CREATE OR REPLACE VIEW remotes.outputs AS SELECT * FROM remote_placeholder.outputs;
            CREATE OR REPLACE VIEW remotes.events AS SELECT * FROM remote_placeholder.events;

            CREATE OR REPLACE VIEW main.sessions AS
            SELECT *, 'local' as _source FROM local.sessions
            UNION ALL BY NAME SELECT * FROM caches.sessions;
            CREATE OR REPLACE VIEW main.invocations AS
            SELECT *, 'local' as _source FROM local.invocations
            UNION ALL BY NAME SELECT * FROM caches.invocations;
            CREATE OR REPLACE VIEW main.outputs AS
            SELECT *, 'local' as _source FROM local.outputs
            UNION ALL BY NAME SELECT * FROM caches.outputs;
            CREATE OR REPLACE VIEW main.events AS
            SELECT *, 'local' as _source FROM local.events
            UNION ALL BY NAME SELECT * FROM caches.events;

            CREATE OR REPLACE VIEW unified.sessions AS
            SELECT * FROM main.sessions UNION ALL BY NAME SELECT * FROM remotes.sessions;
            CREATE OR REPLACE VIEW unified.invocations AS
            SELECT * FROM main.invocations UNION ALL BY NAME SELECT * FROM remotes.invocations;
            CREATE OR REPLACE VIEW unified.outputs AS
            SELECT * FROM main.outputs UNION ALL BY NAME SELECT * FROM remotes.outputs;
            CREATE OR REPLACE VIEW unified.events AS
            SELECT * FROM main.events UNION ALL BY NAME SELECT * FROM remotes.events;

            -- Qualified views: deduplicated with source list
            CREATE OR REPLACE VIEW unified.qualified_sessions AS
            SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
            FROM unified.sessions GROUP BY ALL;
            CREATE OR REPLACE VIEW unified.qualified_invocations AS
            SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
            FROM unified.invocations GROUP BY ALL;
            CREATE OR REPLACE VIEW unified.qualified_outputs AS
            SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
            FROM unified.outputs GROUP BY ALL;
            CREATE OR REPLACE VIEW unified.qualified_events AS
            SELECT * EXCLUDE (_source), list(DISTINCT _source) as _sources
            FROM unified.events GROUP BY ALL;
            "#,
        )?;

        Ok(())
    }
649
650 fn setup_s3_credentials(&self, conn: &Connection) -> Result<()> {
653 let has_s3 = self.config.remotes.iter().any(|r| {
655 r.remote_type == crate::config::RemoteType::S3
656 });
657
658 if !has_s3 {
659 return Ok(());
660 }
661
662 for remote in &self.config.remotes {
664 if remote.remote_type == crate::config::RemoteType::S3 {
665 if let Some(provider) = &remote.credential_provider {
666 let secret_sql = format!(
667 "CREATE SECRET IF NOT EXISTS \"bird_{}\" (TYPE s3, PROVIDER {})",
668 remote.name, provider
669 );
670 if let Err(e) = conn.execute(&secret_sql, []) {
671 eprintln!("Warning: Failed to create S3 secret for {}: {}", remote.name, e);
672 }
673 }
674 }
675 }
676
677 Ok(())
678 }
679
    /// Install the SQL variable and macros used to turn
    /// `outputs.storage_ref` values into readable locations:
    /// `blob_roots` lists candidate blob directories,
    /// `resolve_storage_ref(ref)` expands `file:` refs into one glob per
    /// root and passes inline (`data:`/`data+`) refs through untouched.
    fn setup_blob_resolution(&self, conn: &Connection) -> Result<()> {
        let blob_roots = self.config.blob_roots();

        // Build a SQL list literal; single quotes doubled to escape.
        let roots_sql: String = blob_roots
            .iter()
            .map(|r| format!("'{}'", r.replace('\'', "''")))
            .collect::<Vec<_>>()
            .join(", ");

        conn.execute(&format!("SET VARIABLE blob_roots = [{}]", roots_sql), [])?;

        // DuckDB string slicing is 1-based: ref[:5] is the first 5 chars.
        conn.execute(
            r#"CREATE OR REPLACE MACRO is_inline_data(ref) AS (
                ref[:5] = 'data:' OR ref[:5] = 'data+'
            )"#,
            [],
        )?;

        conn.execute(
            r#"CREATE OR REPLACE MACRO is_file_ref(ref) AS (
                ref[:5] = 'file:'
            )"#,
            [],
        )?;

        // NOTE(review): refs are written as 'file://<rel>' elsewhere in
        // this file, so ref[6:] yields '//<rel>' and the glob contains a
        // double slash — usually harmless on POSIX paths, but confirm it
        // is intended.
        conn.execute(
            r#"CREATE OR REPLACE MACRO resolve_storage_ref(ref) AS (
                CASE
                    WHEN is_inline_data(ref) THEN [ref]
                    WHEN is_file_ref(ref) THEN
                        [format('{}/{}*', root, ref[6:]) FOR root IN getvariable('blob_roots')]
                    ELSE [ref]
                END
            )"#,
            [],
        )?;

        Ok(())
    }
733
734 fn attach_remotes(&self, conn: &Connection) -> Result<()> {
737 for remote in self.config.auto_attach_remotes() {
738 let attach_sql = remote.attach_sql();
740 if let Err(e) = conn.execute(&attach_sql, []) {
741 eprintln!("Warning: Failed to attach remote {}: {}", remote.name, e);
742 }
743 }
744
745 Ok(())
746 }
747
748 fn create_remote_macros(&self, conn: &Connection) -> Result<()> {
756 let remotes = self.config.auto_attach_remotes();
757 if remotes.is_empty() {
758 return Ok(());
759 }
760
761 for remote in &remotes {
763 let schema = remote.quoted_schema_name();
764 let name = &remote.name;
765 let safe_name = name.replace(['-', '.'], "_");
767
768 for table in &["sessions", "invocations", "outputs", "events"] {
769 let macro_name = format!("\"remote_{safe_name}_{table}\"");
770 let sql = format!(
771 r#"CREATE OR REPLACE TEMPORARY MACRO {macro_name}() AS TABLE (
772 SELECT *, '{name}' as _source FROM {schema}.{table}
773 )"#,
774 macro_name = macro_name,
775 name = name,
776 schema = schema,
777 table = table
778 );
779 if let Err(e) = conn.execute(&sql, []) {
780 eprintln!("Warning: Failed to create macro {}: {}", macro_name, e);
781 }
782 }
783 }
784
785 for table in &["sessions", "invocations", "outputs", "events"] {
787 let mut union_parts: Vec<String> = remotes
788 .iter()
789 .map(|r| {
790 let safe_name = r.name.replace(['-', '.'], "_");
791 format!("SELECT * FROM \"remote_{safe_name}_{table}\"()", safe_name = safe_name, table = table)
792 })
793 .collect();
794
795 union_parts.push(format!("SELECT * FROM remote_placeholder.{}", table));
797
798 let sql = format!(
799 r#"CREATE OR REPLACE TEMPORARY MACRO remotes_{table}() AS TABLE (
800 {union}
801 )"#,
802 table = table,
803 union = union_parts.join(" UNION ALL BY NAME ")
804 );
805 if let Err(e) = conn.execute(&sql, []) {
806 eprintln!("Warning: Failed to create remotes_{} macro: {}", table, e);
807 }
808 }
809
810 Ok(())
811 }
812
    /// Create TEMPORARY `cwd_*()` macros that scope each `main.*` table
    /// to rows whose working directory is under the process's current
    /// directory (prefix match via LIKE).
    fn create_cwd_macros(&self, conn: &Connection) -> Result<()> {
        // If the cwd can't be read (e.g. deleted directory), the empty
        // string makes the LIKE pattern '%' — i.e. match everything.
        let cwd = std::env::current_dir()
            .map(|p| p.to_string_lossy().to_string())
            .unwrap_or_default();
        // Escape single quotes for embedding in the SQL literal.
        // NOTE(review): LIKE wildcards ('%', '_') inside the cwd are NOT
        // escaped, so such paths over-match — confirm acceptable.
        let cwd_escaped = cwd.replace('\'', "''");

        let macros = format!(
            r#"
            CREATE OR REPLACE TEMPORARY MACRO cwd_sessions() AS TABLE (
                SELECT * FROM main.sessions WHERE cwd LIKE '{}%'
            );
            CREATE OR REPLACE TEMPORARY MACRO cwd_invocations() AS TABLE (
                SELECT * FROM main.invocations WHERE cwd LIKE '{}%'
            );
            CREATE OR REPLACE TEMPORARY MACRO cwd_outputs() AS TABLE (
                SELECT o.* FROM main.outputs o
                JOIN main.invocations i ON o.invocation_id = i.id
                WHERE i.cwd LIKE '{}%'
            );
            CREATE OR REPLACE TEMPORARY MACRO cwd_events() AS TABLE (
                SELECT e.* FROM main.events e
                JOIN main.invocations i ON e.invocation_id = i.id
                WHERE i.cwd LIKE '{}%'
            );
            "#,
            cwd_escaped, cwd_escaped, cwd_escaped, cwd_escaped
        );

        conn.execute_batch(&macros)?;
        Ok(())
    }
851
852 pub fn attach_remote(&self, conn: &Connection, remote: &crate::RemoteConfig) -> Result<()> {
854 if let Some(provider) = &remote.credential_provider {
856 if remote.remote_type == crate::config::RemoteType::S3 {
857 let secret_sql = format!(
858 "CREATE SECRET IF NOT EXISTS \"bird_{}\" (TYPE s3, PROVIDER {})",
859 remote.name, provider
860 );
861 conn.execute(&secret_sql, [])?;
862 }
863 }
864
865 conn.execute(&remote.attach_sql(), [])?;
867 Ok(())
868 }
869
870 pub fn detach_remote(&self, conn: &Connection, name: &str) -> Result<()> {
872 conn.execute(&format!("DETACH \"remote_{}\"", name), [])?;
873 Ok(())
874 }
875
876 pub fn test_remote(&self, remote: &crate::RemoteConfig) -> Result<()> {
878 let conn = self.connection_with_options(false)?;
879 self.attach_remote(&conn, remote)?;
880
881 let test_sql = format!(
883 "SELECT 1 FROM {}.invocations LIMIT 1",
884 remote.quoted_schema_name()
885 );
886 conn.execute(&test_sql, [])?;
887
888 Ok(())
889 }
890
    /// Borrow the store's configuration.
    pub fn config(&self) -> &Config {
        &self.config
    }
895
    /// Execute arbitrary SQL on a fully-configured connection, rendering
    /// every value to a display string.
    ///
    /// Intended for the interactive/CLI query path, not programmatic
    /// access: all typing is erased to `String`.
    pub fn query(&self, sql: &str) -> Result<QueryResult> {
        let conn = self.connection()?;
        let mut stmt = conn.prepare(sql)?;

        let mut rows_iter = stmt.query([])?;

        // Column metadata comes from the underlying statement via
        // `Rows::as_ref()`; if unavailable, fall back to zero columns.
        let column_count = rows_iter.as_ref().map(|r| r.column_count()).unwrap_or(0);
        let column_names: Vec<String> = if let Some(row_ref) = rows_iter.as_ref() {
            (0..column_count)
                .map(|i| {
                    row_ref
                        .column_name(i)
                        .map(|s| s.to_string())
                        // Synthesize "colN" when a name can't be fetched.
                        .unwrap_or_else(|_| format!("col{}", i))
                })
                .collect()
        } else {
            Vec::new()
        };

        let mut result_rows = Vec::new();
        while let Some(row) = rows_iter.next()? {
            let mut values = Vec::with_capacity(column_count);
            for i in 0..column_count {
                // Common scalars are formatted from the borrowed ValueRef;
                // anything else falls through to the owned-Value formatter
                // in the final arm.
                let value = match row.get_ref(i)? {
                    ValueRef::Null => "NULL".to_string(),
                    ValueRef::Boolean(b) => b.to_string(),
                    ValueRef::TinyInt(n) => n.to_string(),
                    ValueRef::SmallInt(n) => n.to_string(),
                    ValueRef::Int(n) => n.to_string(),
                    ValueRef::BigInt(n) => n.to_string(),
                    ValueRef::HugeInt(n) => n.to_string(),
                    ValueRef::UTinyInt(n) => n.to_string(),
                    ValueRef::USmallInt(n) => n.to_string(),
                    ValueRef::UInt(n) => n.to_string(),
                    ValueRef::UBigInt(n) => n.to_string(),
                    ValueRef::Float(f) => f.to_string(),
                    ValueRef::Double(f) => f.to_string(),
                    ValueRef::Decimal(d) => d.to_string(),
                    ValueRef::Timestamp(unit, val) => {
                        // Normalize the raw value to microseconds per unit.
                        let micros = match unit {
                            TimeUnit::Second => val * 1_000_000,
                            TimeUnit::Millisecond => val * 1_000,
                            TimeUnit::Microsecond => val,
                            TimeUnit::Nanosecond => val / 1_000,
                        };
                        DateTime::<Utc>::from_timestamp_micros(micros)
                            .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
                            .unwrap_or_else(|| format!("<invalid timestamp {}>", val))
                    }
                    ValueRef::Date32(days) => {
                        // Days since the Unix epoch.
                        NaiveDate::from_ymd_opt(1970, 1, 1)
                            .and_then(|epoch| epoch.checked_add_signed(TimeDelta::days(days as i64)))
                            .map(|d| d.format("%Y-%m-%d").to_string())
                            .unwrap_or_else(|| format!("<invalid date {}>", days))
                    }
                    ValueRef::Time64(unit, val) => {
                        // Normalize to micros, then split into clock parts.
                        let micros = match unit {
                            TimeUnit::Second => val * 1_000_000,
                            TimeUnit::Millisecond => val * 1_000,
                            TimeUnit::Microsecond => val,
                            TimeUnit::Nanosecond => val / 1_000,
                        };
                        let secs = (micros / 1_000_000) as u32;
                        let micro_part = (micros % 1_000_000) as u32;
                        NaiveTime::from_num_seconds_from_midnight_opt(secs, micro_part * 1000)
                            .map(|t| t.format("%H:%M:%S").to_string())
                            .unwrap_or_else(|| format!("<invalid time {}>", val))
                    }
                    ValueRef::Interval { months, days, nanos } => {
                        format!("{} months {} days {} ns", months, days, nanos)
                    }
                    // Lossy conversion tolerates non-UTF8 text.
                    ValueRef::Text(s) => String::from_utf8_lossy(s).to_string(),
                    ValueRef::Blob(b) => format!("<blob {} bytes>", b.len()),
                    other => {
                        // Nested/uncommon types: convert to an owned Value
                        // and reuse the shared formatter.
                        let owned: Value = other.into();
                        format_value(&owned)
                    }
                };
                values.push(value);
            }
            result_rows.push(values);
        }

        Ok(QueryResult {
            columns: column_names,
            rows: result_rows,
        })
    }
996
997 pub fn last_invocation_with_output(
999 &self,
1000 ) -> Result<Option<(InvocationSummary, Option<OutputInfo>)>> {
1001 if let Some(inv) = self.last_invocation()? {
1002 let output = self.get_output(&inv.id)?;
1003 Ok(Some((inv, output)))
1004 } else {
1005 Ok(None)
1006 }
1007 }
1008
1009 pub fn write_batch(&self, batch: &InvocationBatch) -> Result<()> {
1015 let invocation = batch
1016 .invocation
1017 .as_ref()
1018 .ok_or_else(|| Error::Storage("Batch must contain an invocation".to_string()))?;
1019
1020 match self.config.storage_mode {
1021 StorageMode::Parquet => self.write_batch_parquet(batch, invocation),
1022 StorageMode::DuckDB => self.write_batch_duckdb(batch, invocation),
1023 }
1024 }
1025
1026 fn write_batch_parquet(
1028 &self,
1029 batch: &InvocationBatch,
1030 invocation: &InvocationRecord,
1031 ) -> Result<()> {
1032 if let Some(ref session) = batch.session {
1038 self.ensure_session(session)?;
1039 }
1040
1041 self.write_invocation(invocation)?;
1043
1044 let date = invocation.date();
1045 let inv_id = invocation.id;
1046
1047 for (stream, content) in &batch.outputs {
1049 self.store_output(
1050 inv_id,
1051 stream,
1052 content,
1053 date,
1054 invocation.executable.as_deref(),
1055 )?;
1056 }
1057
1058 if let Some(ref events) = batch.events {
1060 if !events.is_empty() {
1061 self.write_events(events)?;
1062 }
1063 }
1064
1065 Ok(())
1066 }
1067
1068 fn write_batch_duckdb(
1070 &self,
1071 batch: &InvocationBatch,
1072 invocation: &InvocationRecord,
1073 ) -> Result<()> {
1074 let conn = self.connection()?;
1075
1076 conn.execute("BEGIN TRANSACTION", [])?;
1078
1079 let result = self.write_batch_duckdb_inner(&conn, batch, invocation);
1080
1081 match result {
1082 Ok(()) => {
1083 conn.execute("COMMIT", [])?;
1084 Ok(())
1085 }
1086 Err(e) => {
1087 let _ = conn.execute("ROLLBACK", []);
1089 Err(e)
1090 }
1091 }
1092 }
1093
    /// Body of a DuckDB-mode batch write; runs inside the transaction
    /// opened by `write_batch_duckdb`.
    ///
    /// Inserts the session (only if new), the invocation, each output
    /// (inlined as a data: URL below `inline_threshold`, otherwise stored
    /// as a content-addressed blob file tracked in `blob_registry`), and
    /// any events.
    fn write_batch_duckdb_inner(
        &self,
        conn: &Connection,
        batch: &InvocationBatch,
        invocation: &InvocationRecord,
    ) -> Result<()> {
        use base64::Engine;

        let date = invocation.date();
        let inv_id = invocation.id;

        // Register the session at most once per session_id.
        if let Some(ref session) = batch.session {
            let exists: i64 = conn
                .query_row(
                    "SELECT COUNT(*) FROM local.sessions WHERE session_id = ?",
                    params![&session.session_id],
                    |row| row.get(0),
                )
                .unwrap_or(0);

            if exists == 0 {
                conn.execute(
                    r#"INSERT INTO local.sessions VALUES (?, ?, ?, ?, ?, ?, ?, ?)"#,
                    params![
                        session.session_id,
                        session.client_id,
                        session.invoker,
                        session.invoker_pid,
                        session.invoker_type,
                        session.registered_at.to_rfc3339(),
                        session.cwd,
                        session.date.to_string(),
                    ],
                )?;
            }
        }

        conn.execute(
            r#"INSERT INTO local.invocations VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
            params![
                invocation.id.to_string(),
                invocation.session_id,
                invocation.timestamp.to_rfc3339(),
                invocation.duration_ms,
                invocation.cwd,
                invocation.cmd,
                invocation.executable,
                invocation.exit_code,
                invocation.format_hint,
                invocation.client_id,
                invocation.hostname,
                invocation.username,
                invocation.tag,
                date.to_string(),
            ],
        )?;

        for (stream, content) in &batch.outputs {
            // Content-addressing: the blake3 hash keys the blob registry
            // and deduplicates identical outputs.
            let hash = blake3::hash(content);
            let hash_hex = hash.to_hex().to_string();

            let (storage_type, storage_ref) = if content.len() < self.config.inline_threshold {
                // Small output: embed directly as a base64 data: URL.
                let b64 = base64::engine::general_purpose::STANDARD.encode(content);
                let data_url = format!("data:application/octet-stream;base64,{}", b64);
                ("inline".to_string(), data_url)
            } else {
                // Large output: write a blob file named by hash; the
                // executable name only serves as a path hint.
                let cmd_hint = invocation.executable.as_deref().unwrap_or("output");
                let blob_path = self.config.blob_path(&hash_hex, cmd_hint);

                if let Some(parent) = blob_path.parent() {
                    fs::create_dir_all(parent)?;
                }

                // Prefer a data-dir-relative path so refs survive the
                // store being relocated.
                let rel_path = blob_path
                    .strip_prefix(self.config.data_dir())
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_else(|_| blob_path.to_string_lossy().to_string());

                // Per the branch below, `wrote_new` distinguishes a first
                // write from a dedup hit on an existing blob.
                let wrote_new = atomic::write_file(&blob_path, content)?;

                if wrote_new {
                    conn.execute(
                        "INSERT INTO blob_registry (content_hash, byte_length, storage_path) VALUES (?, ?, ?)",
                        params![&hash_hex, content.len() as i64, &rel_path],
                    )?;
                } else {
                    // Existing blob: bump its refcount instead of rewriting.
                    conn.execute(
                        "UPDATE blob_registry SET ref_count = ref_count + 1, last_accessed = CURRENT_TIMESTAMP WHERE content_hash = ?",
                        params![&hash_hex],
                    )?;
                }

                ("blob".to_string(), format!("file://{}", rel_path))
            };

            // UUIDv7 keeps output ids roughly time-ordered.
            let output_id = uuid::Uuid::now_v7();
            conn.execute(
                r#"INSERT INTO local.outputs VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
                params![
                    output_id.to_string(),
                    inv_id.to_string(),
                    stream,
                    hash_hex,
                    content.len() as i64,
                    storage_type,
                    storage_ref,
                    Option::<String>::None, // content_type: always NULL here
                    date.to_string(),
                ],
            )?;
        }

        if let Some(ref events) = batch.events {
            for event in events {
                conn.execute(
                    r#"INSERT INTO local.events VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
                    params![
                        event.id.to_string(),
                        event.invocation_id.to_string(),
                        event.client_id,
                        event.hostname,
                        event.event_type,
                        event.severity,
                        event.ref_file,
                        event.ref_line,
                        event.ref_column,
                        event.message,
                        event.error_code,
                        event.test_name,
                        event.status,
                        event.format_used,
                        event.date.to_string(),
                    ],
                )?;
            }
        }

        Ok(())
    }
1245
1246 pub fn load_format_hints(&self) -> Result<crate::FormatHints> {
1248 let path = self.config.format_hints_path();
1249
1250 if path.exists() {
1252 return crate::FormatHints::load(&path);
1253 }
1254
1255 let legacy_path = self.config.event_formats_path();
1257 if legacy_path.exists() {
1258 return crate::FormatHints::load(&legacy_path);
1259 }
1260
1261 Ok(crate::FormatHints::new())
1262 }
1263
    /// Persist format hints to the current (non-legacy) hints path.
    pub fn save_format_hints(&self, hints: &crate::FormatHints) -> Result<()> {
        hints.save(&self.config.format_hints_path())
    }
1268
1269 pub fn detect_format_for_command(&self, cmd: &str) -> Result<String> {
1278 let hints = self.load_format_hints()?;
1279 Ok(hints.detect(cmd).to_string())
1280 }
1281
1282 pub fn list_builtin_formats(&self) -> Result<Vec<BuiltinFormat>> {
1287 let conn = self.connection()?;
1288
1289 let mut stmt = conn.prepare(
1290 "SELECT format, description, priority FROM duck_hunt_formats() ORDER BY priority DESC, format"
1291 )?;
1292
1293 let rows = stmt.query_map([], |row| {
1294 Ok(BuiltinFormat {
1295 format: row.get(0)?,
1296 pattern: row.get::<_, String>(1)?, priority: row.get(2)?,
1298 })
1299 })?;
1300
1301 let results: Vec<_> = rows.filter_map(|r| r.ok()).collect();
1302 Ok(results)
1303 }
1304
1305 pub fn check_format(&self, cmd: &str) -> Result<FormatMatch> {
1311 let hints = self.load_format_hints()?;
1312
1313 for hint in hints.hints() {
1315 if crate::format_hints::pattern_matches(&hint.pattern, cmd) {
1316 return Ok(FormatMatch {
1317 format: hint.format.clone(),
1318 source: FormatSource::UserDefined {
1319 pattern: hint.pattern.clone(),
1320 priority: hint.priority,
1321 },
1322 });
1323 }
1324 }
1325
1326 Ok(FormatMatch {
1328 format: hints.default_format().to_string(),
1329 source: FormatSource::Default,
1330 })
1331 }
1332}
1333
/// One format definition reported by the `duck_hunt_formats()` table
/// function (see `list_builtin_formats`).
#[derive(Debug, Clone)]
pub struct BuiltinFormat {
    // Format identifier (the `format` column).
    pub format: String,
    // NOTE(review): populated from the query's `description` column in
    // `list_builtin_formats` — confirm whether this should hold a match
    // pattern or a human-readable description.
    pub pattern: String,
    // Relative priority; results are ordered by this value, descending.
    pub priority: i32,
}
1341
/// Result of resolving a command string to an output format
/// (see `check_format`).
#[derive(Debug, Clone)]
pub struct FormatMatch {
    // Name of the resolved format.
    pub format: String,
    // Where the resolution came from: user hint, builtin, or the default.
    pub source: FormatSource,
}
1348
/// Provenance of a `FormatMatch`.
#[derive(Debug, Clone)]
pub enum FormatSource {
    // A user-defined hint matched; carries the winning pattern and priority.
    UserDefined { pattern: String, priority: i32 },
    // A builtin pattern matched.
    // NOTE(review): this variant is never constructed in this file —
    // confirm whether builtin matching is implemented elsewhere or pending.
    Builtin { pattern: String, priority: i32 },
    // No pattern matched; the configured default format was used.
    Default,
}
1356
/// A fully materialized query result with values rendered as strings.
#[derive(Debug)]
pub struct QueryResult {
    // Column names, in select order.
    pub columns: Vec<String>,
    // One inner `Vec` per row, values stringified.
    pub rows: Vec<Vec<String>>,
}
1363
/// Make `s` safe for use as a single file-name component.
///
/// Spaces become `-`; alphanumerics plus `-`, `_`, and `.` pass through
/// unchanged; everything else (path separators, shell metacharacters, and
/// any other symbol) becomes `_`. The result is truncated to at most 64
/// characters.
fn sanitize_filename(s: &str) -> String {
    let map_char = |c: char| {
        if c == ' ' {
            '-'
        } else if c.is_alphanumeric() || matches!(c, '-' | '_' | '.') {
            c
        } else {
            '_'
        }
    };
    s.chars().map(map_char).take(64).collect()
}
1376
#[cfg(test)]
mod tests {
    use super::*;
    use crate::init::initialize;
    use crate::schema::SessionRecord;
    use tempfile::TempDir;

    // Build an initialized store in the default (Parquet) storage mode,
    // backed by a temp dir. The TempDir is returned alongside the Store so
    // the directory outlives the test body.
    fn setup_store() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let config = Config::with_root(tmp.path());
        initialize(&config).unwrap();
        let store = Store::open(config).unwrap();
        (tmp, store)
    }

    // Same as `setup_store`, but with DuckDB-native storage mode.
    fn setup_store_duckdb() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let config = Config::with_duckdb_mode(tmp.path());
        initialize(&config).unwrap();
        let store = Store::open(config).unwrap();
        (tmp, store)
    }

    // Opening a store whose root was never initialized must fail with
    // the dedicated NotInitialized error, not a generic I/O error.
    #[test]
    fn test_store_open_uninitialized_fails() {
        let tmp = TempDir::new().unwrap();
        let config = Config::with_root(tmp.path());

        let result = Store::open(config);
        assert!(matches!(result, Err(Error::NotInitialized(_))));
    }

    #[test]
    fn test_sanitize_filename() {
        assert_eq!(sanitize_filename("make test"), "make-test");
        assert_eq!(sanitize_filename("/usr/bin/gcc"), "_usr_bin_gcc");
        assert_eq!(sanitize_filename("a:b*c?d"), "a_b_c_d");
    }

    // --- write_batch, Parquet mode ---

    // A batch with only an invocation writes exactly one invocation row.
    #[test]
    fn test_batch_write_parquet_invocation_only() {
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");

        let batch = InvocationBatch::new(inv);
        store.write_batch(&batch).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
    }

    // An attached output must be retrievable by the invocation id and keep
    // its stream label.
    #[test]
    fn test_batch_write_parquet_with_output() {
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let inv_id = inv.id;

        let batch = InvocationBatch::new(inv)
            .with_output("stdout", b"hello world\n".to_vec());

        store.write_batch(&batch).unwrap();

        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].stream, "stdout");
    }

    // A batch carrying a session record creates/records that session.
    #[test]
    fn test_batch_write_parquet_with_session() {
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(inv).with_session(session);
        store.write_batch(&batch).unwrap();

        assert!(store.session_exists("test-session").unwrap());
    }

    // Full batch: session + invocation + two output streams, all persisted.
    #[test]
    fn test_batch_write_parquet_full() {
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new("test-session", "make test", "/home/user", 1, "test@client");
        let inv_id = inv.id;
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(inv)
            .with_session(session)
            .with_output("stdout", b"Building...\n".to_vec())
            .with_output("stderr", b"error: failed\n".to_vec());

        store.write_batch(&batch).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
        assert!(store.session_exists("test-session").unwrap());

        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 2);
    }

    // --- write_batch, DuckDB mode (mirrors the Parquet cases above) ---

    #[test]
    fn test_batch_write_duckdb_invocation_only() {
        let (_tmp, store) = setup_store_duckdb();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");

        let batch = InvocationBatch::new(inv);
        store.write_batch(&batch).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
    }

    #[test]
    fn test_batch_write_duckdb_with_output() {
        let (_tmp, store) = setup_store_duckdb();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let inv_id = inv.id;

        let batch = InvocationBatch::new(inv)
            .with_output("stdout", b"hello world\n".to_vec());

        store.write_batch(&batch).unwrap();

        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].stream, "stdout");
    }

    #[test]
    fn test_batch_write_duckdb_with_session() {
        let (_tmp, store) = setup_store_duckdb();

        let inv = InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(inv).with_session(session);
        store.write_batch(&batch).unwrap();

        assert!(store.session_exists("test-session").unwrap());
    }

    #[test]
    fn test_batch_write_duckdb_full() {
        let (_tmp, store) = setup_store_duckdb();

        let inv = InvocationRecord::new("test-session", "make test", "/home/user", 1, "test@client");
        let inv_id = inv.id;
        let session = SessionRecord::new("test-session", "test@client", "bash", 12345, "shell");

        let batch = InvocationBatch::new(inv)
            .with_session(session)
            .with_output("stdout", b"Building...\n".to_vec())
            .with_output("stderr", b"error: failed\n".to_vec());

        store.write_batch(&batch).unwrap();

        assert_eq!(store.invocation_count().unwrap(), 1);
        assert!(store.session_exists("test-session").unwrap());

        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 2);
    }

    // A default (empty) batch must be rejected — an invocation is mandatory.
    #[test]
    fn test_batch_requires_invocation() {
        let (_tmp, store) = setup_store();

        let batch = InvocationBatch::default();
        let result = store.write_batch(&batch);

        assert!(result.is_err());
    }

    // --- ensure_extension ---

    #[test]
    fn test_ensure_extension_parquet() {
        let conn = duckdb::Connection::open_in_memory().unwrap();
        let result = ensure_extension(&conn, "parquet").unwrap();
        assert!(result, "parquet extension should be loadable");
    }

    #[test]
    fn test_ensure_extension_icu() {
        let conn = duckdb::Connection::open_in_memory().unwrap();
        let result = ensure_extension(&conn, "icu").unwrap();
        assert!(result, "icu extension should be loadable");
    }

    // NOTE(review): community extensions are presumably fetched from the
    // community repository on first use, making this test network- and
    // environment-dependent — confirm and consider gating it.
    #[test]
    fn test_ensure_extension_community() {
        let conn = duckdb::Connection::open_in_memory().unwrap();
        conn.execute("SET allow_community_extensions = true", []).unwrap();

        let result = ensure_extension(&conn, "scalarfs").unwrap();
        assert!(result, "scalarfs extension should be loadable from community");

        let result = ensure_extension(&conn, "duck_hunt").unwrap();
        assert!(result, "duck_hunt extension should be loadable from community");
    }

    // An unknown extension name must yield Ok(false), not an Err.
    #[test]
    fn test_ensure_extension_nonexistent() {
        let conn = duckdb::Connection::open_in_memory().unwrap();
        conn.execute("SET allow_community_extensions = true", []).unwrap();

        let result = ensure_extension(&conn, "nonexistent_fake_extension_xyz").unwrap();
        assert!(!result, "nonexistent extension should return false");
    }

    // NOTE(review): the <100ms wall-clock bound can be flaky on a loaded CI
    // machine — a call-count based check would be more robust.
    #[test]
    fn test_extension_loading_is_cached() {
        let conn = duckdb::Connection::open_in_memory().unwrap();

        ensure_extension(&conn, "parquet").unwrap();

        let start = std::time::Instant::now();
        ensure_extension(&conn, "parquet").unwrap();
        let elapsed = start.elapsed();

        assert!(elapsed.as_millis() < 100, "cached extension load took {:?}", elapsed);
    }
}