1use std::fs;
20use std::path::{Path, PathBuf};
21
22use chrono::{NaiveDate, TimeDelta, Utc};
23use duckdb::Connection;
24
25use crate::config::RemoteType;
26use crate::{Error, RemoteConfig, Result};
27
/// Counters describing blob files handled by a push or pull.
///
/// For a dry run only `count` and `bytes` are filled in (with the pending
/// totals); the remaining counters stay at zero.
#[derive(Default, Debug)]
pub struct BlobStats {
    /// Number of blobs transferred (linked or copied), or pending for a dry run.
    pub count: usize,
    /// Total size in bytes of the counted blobs.
    pub bytes: u64,
    /// Blobs placed at the destination via a hard link.
    pub linked: usize,
    /// Blobs placed at the destination via a file copy.
    pub copied: usize,
    /// Blobs skipped because the destination file already existed.
    pub skipped: usize,
}

impl std::fmt::Display for BlobStats {
    /// Renders `"0 blobs"` when nothing was counted, otherwise a one-line
    /// summary with the size in whole kilobytes and the per-method breakdown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if self.count == 0 {
            return f.write_str("0 blobs");
        }

        let Self {
            count,
            bytes,
            linked,
            copied,
            skipped,
        } = self;
        // Integer division: sizes are reported in whole kilobytes, rounded down.
        let kilobytes = *bytes / 1024;
        write!(
            f,
            "{} blobs ({}KB, {} linked, {} copied, {} skipped)",
            count, kilobytes, linked, copied, skipped
        )
    }
}
57
58#[derive(Debug, Default)]
60pub struct PushStats {
61 pub sessions: usize,
62 pub invocations: usize,
63 pub outputs: usize,
64 pub events: usize,
65 pub blobs: BlobStats,
66}
67
68impl std::fmt::Display for PushStats {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 write!(
71 f,
72 "{} sessions, {} invocations, {} outputs, {} events",
73 self.sessions, self.invocations, self.outputs, self.events
74 )?;
75 if self.blobs.count > 0 {
76 write!(f, ", {}", self.blobs)?;
77 }
78 Ok(())
79 }
80}
81
82#[derive(Debug, Default)]
84pub struct PullStats {
85 pub sessions: usize,
86 pub invocations: usize,
87 pub outputs: usize,
88 pub events: usize,
89 pub blobs: BlobStats,
90}
91
92impl std::fmt::Display for PullStats {
93 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 write!(
95 f,
96 "{} sessions, {} invocations, {} outputs, {} events",
97 self.sessions, self.invocations, self.outputs, self.events
98 )?;
99 if self.blobs.count > 0 {
100 write!(f, ", {}", self.blobs)?;
101 }
102 Ok(())
103 }
104}
105
106#[derive(Debug, Default)]
108pub struct PushOptions {
109 pub since: Option<NaiveDate>,
111 pub dry_run: bool,
113 pub sync_blobs: bool,
115}
116
117#[derive(Debug, Default)]
119pub struct PullOptions {
120 pub since: Option<NaiveDate>,
122 pub client_id: Option<String>,
124 pub sync_blobs: bool,
126}
127
128pub fn parse_since(s: &str) -> Result<NaiveDate> {
134 let s = s.trim();
135
136 if let Some(days) = parse_duration_days(s) {
138 let date = Utc::now().date_naive() - TimeDelta::days(days);
139 return Ok(date);
140 }
141
142 NaiveDate::parse_from_str(s, "%Y-%m-%d")
144 .map_err(|e| Error::Config(format!("Invalid date '{}': {}", s, e)))
145}
146
/// Parses a relative duration (`<n>d`, `<n>w`, `<n>m`) into a number of days.
///
/// The unit suffix is case-insensitive; a week counts as 7 days and a month
/// as 30 days. Returns `None` when the suffix is not recognised, the numeric
/// part does not parse as an `i64`, or the day count would overflow `i64`.
fn parse_duration_days(s: &str) -> Option<i64> {
    let s = s.trim().to_lowercase();

    let (digits, days_per_unit) = if let Some(num) = s.strip_suffix('d') {
        (num, 1)
    } else if let Some(num) = s.strip_suffix('w') {
        (num, 7)
    } else if let Some(num) = s.strip_suffix('m') {
        (num, 30)
    } else {
        return None;
    };

    // checked_mul: a huge user-supplied count must not panic or wrap around.
    digits.parse::<i64>().ok()?.checked_mul(days_per_unit)
}
161
/// Returns the unquoted name of the schema that caches data pulled from
/// `remote_name`, i.e. `cached_<remote_name>`.
#[allow(dead_code)]
pub fn cached_schema_name(remote_name: &str) -> String {
    let mut name = String::from("cached_");
    name.push_str(remote_name);
    name
}
167
/// Returns the cached-schema name for `remote_name` as a double-quoted SQL
/// identifier, e.g. `"cached_my-team"`.
///
/// Any double quote inside `remote_name` is doubled so the result is always a
/// single well-formed quoted identifier, whatever characters the name contains.
pub fn quoted_cached_schema_name(remote_name: &str) -> String {
    format!("\"cached_{}\"", remote_name.replace('"', "\"\""))
}
172
173fn file_remote_data_dir(remote: &RemoteConfig) -> Option<PathBuf> {
178 if remote.remote_type != RemoteType::File {
179 return None;
180 }
181
182 let db_path = remote.uri.strip_prefix("file://")?;
184 let db_path = Path::new(db_path);
185
186 db_path.parent().map(PathBuf::from)
189}
190
/// One blob selected for syncing, as returned by the blob lookup queries.
#[derive(Debug)]
struct BlobInfo {
    /// Content hash that identifies the blob in `blob_registry`.
    content_hash: String,
    /// Path of the blob file; joined onto a data directory when syncing.
    storage_path: String,
    /// Blob size in bytes as recorded on the output row.
    byte_length: i64,
}
198
199fn sync_blob_file(src: &Path, dst: &Path, stats: &mut BlobStats) -> Result<bool> {
204 if dst.exists() {
206 stats.skipped += 1;
207 return Ok(false);
208 }
209
210 if let Some(parent) = dst.parent() {
212 fs::create_dir_all(parent)?;
213 }
214
215 match fs::hard_link(src, dst) {
217 Ok(()) => {
218 stats.linked += 1;
219 stats.count += 1;
220 if let Ok(meta) = fs::metadata(dst) {
221 stats.bytes += meta.len();
222 }
223 Ok(true)
224 }
225 Err(_) => {
226 fs::copy(src, dst)?;
228 stats.copied += 1;
229 stats.count += 1;
230 if let Ok(meta) = fs::metadata(dst) {
231 stats.bytes += meta.len();
232 }
233 Ok(true)
234 }
235 }
236}
237
238impl super::Store {
239 pub fn push(&self, remote: &RemoteConfig, opts: PushOptions) -> Result<PushStats> {
245 use crate::config::RemoteMode;
246
247 if remote.mode == RemoteMode::ReadOnly {
249 if opts.dry_run {
250 return Ok(PushStats::default());
252 } else {
253 return Err(Error::Config(format!(
254 "Cannot push to read-only remote '{}'",
255 remote.name
256 )));
257 }
258 }
259
260 let conn = self.connection_with_options(false)?;
262
263 self.attach_remote(&conn, remote)?;
265
266 let remote_schema = remote.quoted_schema_name();
267
268 ensure_remote_schema(&conn, &remote_schema)?;
270
271 let mut stats = PushStats::default();
272
273 if opts.dry_run {
274 stats.sessions = count_sessions_to_push(&conn, &remote_schema, opts.since)?;
276 stats.invocations = count_table_to_push(&conn, "invocations", &remote_schema, opts.since)?;
277 stats.outputs = count_table_to_push(&conn, "outputs", &remote_schema, opts.since)?;
278 stats.events = count_table_to_push(&conn, "events", &remote_schema, opts.since)?;
279 if opts.sync_blobs {
280 stats.blobs = count_blobs_to_push(&conn, &remote_schema, opts.since)?;
281 }
282 } else {
283 if opts.sync_blobs {
285 stats.blobs = self.push_blobs(&conn, remote, &remote_schema, opts.since)?;
286 }
287
288 stats.sessions = push_sessions(&conn, &remote_schema, opts.since)?;
290 stats.invocations = push_table(&conn, "invocations", &remote_schema, opts.since)?;
291 stats.outputs = push_outputs(&conn, &remote_schema, opts.since, opts.sync_blobs)?;
292 stats.events = push_table(&conn, "events", &remote_schema, opts.since)?;
293 }
294
295 Ok(stats)
296 }
297
298 fn push_blobs(
303 &self,
304 conn: &Connection,
305 remote: &RemoteConfig,
306 remote_schema: &str,
307 since: Option<NaiveDate>,
308 ) -> Result<BlobStats> {
309 let mut stats = BlobStats::default();
310
311 let remote_data_dir = match file_remote_data_dir(remote) {
313 Some(dir) => dir,
314 None => return Ok(stats), };
316
317 let blobs = get_blobs_to_push(conn, remote_schema, since)?;
319 if blobs.is_empty() {
320 return Ok(stats);
321 }
322
323 let local_data_dir = self.config.data_dir();
324
325 for blob in &blobs {
326 let src = local_data_dir.join(&blob.storage_path);
329 let dst = remote_data_dir.join(&blob.storage_path);
330
331 if !src.exists() {
332 continue;
334 }
335
336 sync_blob_file(&src, &dst, &mut stats)?;
338
339 let escaped_hash = blob.content_hash.replace('\'', "''");
341 let escaped_path = blob.storage_path.replace('\'', "''");
342 conn.execute(
343 &format!(
344 r#"
345 INSERT INTO {schema}.blob_registry (content_hash, byte_length, storage_path)
346 SELECT '{hash}', {len}, '{path}'
347 WHERE NOT EXISTS (
348 SELECT 1 FROM {schema}.blob_registry WHERE content_hash = '{hash}'
349 )
350 "#,
351 schema = remote_schema,
352 hash = escaped_hash,
353 len = blob.byte_length,
354 path = escaped_path,
355 ),
356 [],
357 )?;
358 }
359
360 Ok(stats)
361 }
362
363 pub fn pull(&self, remote: &RemoteConfig, opts: PullOptions) -> Result<PullStats> {
370 let conn = self.connection_with_options(false)?;
372
373 self.attach_remote(&conn, remote)?;
375
376 let remote_schema = remote.quoted_schema_name();
377 let cached_schema = quoted_cached_schema_name(&remote.name);
378
379 ensure_cached_schema(&conn, &cached_schema, &remote.name)?;
381
382 let mut stats = PullStats {
384 sessions: pull_sessions(&conn, &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref())?,
385 invocations: pull_table(&conn, "invocations", &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref())?,
386 outputs: pull_outputs(&conn, &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref(), opts.sync_blobs)?,
387 events: pull_table(&conn, "events", &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref())?,
388 blobs: BlobStats::default(),
389 };
390
391 if opts.sync_blobs {
393 stats.blobs = self.pull_blobs(&conn, remote, &remote_schema, &cached_schema)?;
394 }
395
396 self.rebuild_caches_schema(&conn)?;
398
399 Ok(stats)
400 }
401
402 fn pull_blobs(
407 &self,
408 conn: &Connection,
409 remote: &RemoteConfig,
410 remote_schema: &str,
411 cached_schema: &str,
412 ) -> Result<BlobStats> {
413 let mut stats = BlobStats::default();
414
415 let remote_data_dir = match file_remote_data_dir(remote) {
417 Some(dir) => dir,
418 None => return Ok(stats), };
420
421 let blobs = get_blobs_to_pull(conn, remote_schema, cached_schema)?;
423 if blobs.is_empty() {
424 return Ok(stats);
425 }
426
427 let local_data_dir = self.config.data_dir();
428
429 for blob in &blobs {
430 let src = remote_data_dir.join(&blob.storage_path);
433 let dst = local_data_dir.join(&blob.storage_path);
434
435 if !src.exists() {
436 continue;
438 }
439
440 let synced = sync_blob_file(&src, &dst, &mut stats)?;
442
443 if synced {
445 let escaped_hash = blob.content_hash.replace('\'', "''");
446 let escaped_path = blob.storage_path.replace('\'', "''");
447 conn.execute(
448 &format!(
449 r#"
450 INSERT INTO blob_registry (content_hash, byte_length, storage_path)
451 SELECT '{hash}', {len}, '{path}'
452 WHERE NOT EXISTS (
453 SELECT 1 FROM blob_registry WHERE content_hash = '{hash}'
454 )
455 "#,
456 hash = escaped_hash,
457 len = blob.byte_length,
458 path = escaped_path,
459 ),
460 [],
461 )?;
462 }
463 }
464
465 Ok(stats)
466 }
467
468 pub fn rebuild_caches_schema(&self, conn: &Connection) -> Result<()> {
474 let schemas: Vec<String> = conn
476 .prepare("SELECT schema_name FROM information_schema.schemata WHERE schema_name LIKE 'cached_%'")?
477 .query_map([], |row| row.get(0))?
478 .filter_map(|r| r.ok())
479 .collect();
480
481 conn.execute("BEGIN TRANSACTION", [])?;
483
484 let result = (|| -> std::result::Result<(), duckdb::Error> {
485 for table in &["sessions", "invocations", "outputs", "events"] {
486 let mut union_parts: Vec<String> = schemas
487 .iter()
488 .map(|s| format!("SELECT * FROM \"{}\".{}", s, table))
489 .collect();
490
491 if !schemas.iter().any(|s| s == "cached_placeholder") {
493 union_parts.push(format!("SELECT * FROM cached_placeholder.{}", table));
494 }
495
496 let sql = format!(
497 "CREATE OR REPLACE VIEW caches.{} AS {}",
498 table,
499 union_parts.join(" UNION ALL BY NAME ")
500 );
501 conn.execute(&sql, [])?;
502 }
503 Ok(())
504 })();
505
506 match result {
507 Ok(()) => {
508 conn.execute("COMMIT", [])?;
509 Ok(())
510 }
511 Err(e) => {
512 let _ = conn.execute("ROLLBACK", []);
513 Err(crate::Error::DuckDb(e))
514 }
515 }
516 }
517}
518
519fn ensure_remote_schema(conn: &Connection, schema: &str) -> Result<()> {
522 let sql = format!(
523 r#"
524 CREATE TABLE IF NOT EXISTS {schema}.sessions (
525 session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
526 invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE
527 );
528 CREATE TABLE IF NOT EXISTS {schema}.invocations (
529 id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
530 cwd VARCHAR, cmd VARCHAR, executable VARCHAR, runner_id VARCHAR,
531 exit_code INTEGER, status VARCHAR, format_hint VARCHAR, client_id VARCHAR,
532 hostname VARCHAR, username VARCHAR, tag VARCHAR, date DATE
533 );
534 CREATE TABLE IF NOT EXISTS {schema}.outputs (
535 id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
536 byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
537 content_type VARCHAR, date DATE
538 );
539 CREATE TABLE IF NOT EXISTS {schema}.events (
540 id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
541 event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
542 ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
543 status VARCHAR, format_used VARCHAR, date DATE
544 );
545 CREATE TABLE IF NOT EXISTS {schema}.blob_registry (
546 content_hash VARCHAR PRIMARY KEY,
547 byte_length BIGINT NOT NULL,
548 ref_count INTEGER DEFAULT 1,
549 first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
550 last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
551 storage_path VARCHAR NOT NULL
552 );
553 "#,
554 schema = schema
555 );
556 conn.execute_batch(&sql)?;
557 Ok(())
558}
559
560fn ensure_cached_schema(conn: &Connection, schema: &str, remote_name: &str) -> Result<()> {
563 conn.execute(&format!("CREATE SCHEMA IF NOT EXISTS {}", schema), [])?;
565
566 let sql = format!(
568 r#"
569 CREATE TABLE IF NOT EXISTS {schema}.sessions (
570 session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
571 invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE,
572 _source VARCHAR DEFAULT '{remote_name}'
573 );
574 CREATE TABLE IF NOT EXISTS {schema}.invocations (
575 id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
576 cwd VARCHAR, cmd VARCHAR, executable VARCHAR, runner_id VARCHAR,
577 exit_code INTEGER, status VARCHAR, format_hint VARCHAR, client_id VARCHAR,
578 hostname VARCHAR, username VARCHAR, tag VARCHAR, date DATE,
579 _source VARCHAR DEFAULT '{remote_name}'
580 );
581 CREATE TABLE IF NOT EXISTS {schema}.outputs (
582 id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
583 byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
584 content_type VARCHAR, date DATE,
585 _source VARCHAR DEFAULT '{remote_name}'
586 );
587 CREATE TABLE IF NOT EXISTS {schema}.events (
588 id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
589 event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
590 ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
591 status VARCHAR, format_used VARCHAR, date DATE,
592 _source VARCHAR DEFAULT '{remote_name}'
593 );
594 "#,
595 schema = schema,
596 remote_name = remote_name.replace('\'', "''")
597 );
598 conn.execute_batch(&sql)?;
599 Ok(())
600}
601
602fn since_clause(since: Option<NaiveDate>, timestamp_col: &str) -> String {
604 since
605 .map(|d| format!("AND {} >= '{}'", timestamp_col, d))
606 .unwrap_or_default()
607}
608
/// Builds an `AND client_id = '<id>'` SQL fragment for an optional client
/// filter, or an empty string when no client is given.
///
/// Single quotes in the id are doubled so it is safe inside the literal.
fn client_clause(client_id: Option<&str>) -> String {
    match client_id {
        Some(id) => {
            let escaped = id.replace('\'', "''");
            format!("AND client_id = '{}'", escaped)
        }
        None => String::new(),
    }
}
615
616fn count_sessions_to_push(
619 conn: &Connection,
620 remote_schema: &str,
621 since: Option<NaiveDate>,
622) -> Result<usize> {
623 let since_filter = since_clause(since, "i.timestamp");
624
625 let sql = format!(
626 r#"
627 SELECT COUNT(DISTINCT s.session_id)
628 FROM local.sessions s
629 JOIN local.invocations i ON i.session_id = s.session_id
630 WHERE NOT EXISTS (
631 SELECT 1 FROM {remote}.sessions r WHERE r.session_id = s.session_id
632 )
633 {since}
634 "#,
635 remote = remote_schema,
636 since = since_filter,
637 );
638
639 let count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
640 Ok(count as usize)
641}
642
643fn count_table_to_push(
646 conn: &Connection,
647 table: &str,
648 remote_schema: &str,
649 since: Option<NaiveDate>,
650) -> Result<usize> {
651 let sql = match table {
652 "invocations" => {
653 let since_filter = since_clause(since, "l.timestamp");
654 format!(
655 r#"
656 SELECT COUNT(*)
657 FROM local.{table} l
658 WHERE NOT EXISTS (
659 SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
660 )
661 {since}
662 "#,
663 table = table,
664 remote = remote_schema,
665 since = since_filter,
666 )
667 }
668 "outputs" | "events" => {
669 let since_filter = since_clause(since, "i.timestamp");
670 format!(
671 r#"
672 SELECT COUNT(*)
673 FROM local.{table} l
674 JOIN local.invocations i ON i.id = l.invocation_id
675 WHERE NOT EXISTS (
676 SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
677 )
678 {since}
679 "#,
680 table = table,
681 remote = remote_schema,
682 since = since_filter,
683 )
684 }
685 _ => {
686 format!(
687 r#"
688 SELECT COUNT(*)
689 FROM local.{table} l
690 WHERE NOT EXISTS (
691 SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
692 )
693 "#,
694 table = table,
695 remote = remote_schema,
696 )
697 }
698 };
699
700 let count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
701 Ok(count as usize)
702}
703
704fn push_sessions(
706 conn: &Connection,
707 remote_schema: &str,
708 since: Option<NaiveDate>,
709) -> Result<usize> {
710 let since_filter = since_clause(since, "i.timestamp");
711
712 let sql = format!(
713 r#"
714 INSERT INTO {remote}.sessions
715 SELECT DISTINCT s.*
716 FROM local.sessions s
717 JOIN local.invocations i ON i.session_id = s.session_id
718 WHERE NOT EXISTS (
719 SELECT 1 FROM {remote}.sessions r WHERE r.session_id = s.session_id
720 )
721 {since}
722 "#,
723 remote = remote_schema,
724 since = since_filter,
725 );
726
727 let count = conn.execute(&sql, [])?;
728 Ok(count)
729}
730
731fn push_table(
733 conn: &Connection,
734 table: &str,
735 remote_schema: &str,
736 since: Option<NaiveDate>,
737) -> Result<usize> {
738 let sql = match table {
739 "invocations" => {
740 let since_filter = since_clause(since, "l.timestamp");
741 format!(
742 r#"
743 INSERT INTO {remote}.{table}
744 SELECT *
745 FROM local.{table} l
746 WHERE NOT EXISTS (
747 SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
748 )
749 {since}
750 "#,
751 table = table,
752 remote = remote_schema,
753 since = since_filter,
754 )
755 }
756 "outputs" | "events" => {
757 let since_filter = since_clause(since, "i.timestamp");
758 format!(
759 r#"
760 INSERT INTO {remote}.{table}
761 SELECT l.*
762 FROM local.{table} l
763 JOIN local.invocations i ON i.id = l.invocation_id
764 WHERE NOT EXISTS (
765 SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
766 )
767 {since}
768 "#,
769 table = table,
770 remote = remote_schema,
771 since = since_filter,
772 )
773 }
774 _ => {
775 format!(
776 r#"
777 INSERT INTO {remote}.{table}
778 SELECT *
779 FROM local.{table} l
780 WHERE NOT EXISTS (
781 SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
782 )
783 "#,
784 table = table,
785 remote = remote_schema,
786 )
787 }
788 };
789
790 let count = conn.execute(&sql, [])?;
791 Ok(count)
792}
793
794fn pull_sessions(
796 conn: &Connection,
797 remote_schema: &str,
798 cached_schema: &str,
799 since: Option<NaiveDate>,
800 client_id: Option<&str>,
801) -> Result<usize> {
802 let since_filter = since_clause(since, "r.registered_at");
803 let client_filter = client_clause(client_id);
804
805 let sql = format!(
806 r#"
807 INSERT INTO {cached}.sessions (session_id, client_id, invoker, invoker_pid, invoker_type, registered_at, cwd, date)
808 SELECT r.*
809 FROM {remote}.sessions r
810 WHERE NOT EXISTS (
811 SELECT 1 FROM {cached}.sessions l WHERE l.session_id = r.session_id
812 )
813 {since}
814 {client}
815 "#,
816 cached = cached_schema,
817 remote = remote_schema,
818 since = since_filter,
819 client = client_filter,
820 );
821
822 let count = conn.execute(&sql, [])?;
823 Ok(count)
824}
825
826fn pull_table(
828 conn: &Connection,
829 table: &str,
830 remote_schema: &str,
831 cached_schema: &str,
832 since: Option<NaiveDate>,
833 client_id: Option<&str>,
834) -> Result<usize> {
835 let client_filter = client_clause(client_id);
836
837 let sql = match table {
838 "invocations" => {
839 let since_filter = since_clause(since, "r.timestamp");
840 format!(
841 r#"
842 INSERT INTO {cached}.{table} (id, session_id, timestamp, duration_ms, cwd, cmd, executable, runner_id, exit_code, status, format_hint, client_id, hostname, username, tag, date)
843 SELECT r.*
844 FROM {remote}.{table} r
845 WHERE NOT EXISTS (
846 SELECT 1 FROM {cached}.{table} l WHERE l.id = r.id
847 )
848 {since}
849 {client}
850 "#,
851 table = table,
852 cached = cached_schema,
853 remote = remote_schema,
854 since = since_filter,
855 client = client_filter,
856 )
857 }
858 "outputs" => {
859 let since_filter = since_clause(since, "i.timestamp");
860 format!(
861 r#"
862 INSERT INTO {cached}.{table} (id, invocation_id, stream, content_hash, byte_length, storage_type, storage_ref, content_type, date)
863 SELECT r.*
864 FROM {remote}.{table} r
865 JOIN {remote}.invocations i ON i.id = r.invocation_id
866 WHERE NOT EXISTS (
867 SELECT 1 FROM {cached}.{table} l WHERE l.id = r.id
868 )
869 {since}
870 {client}
871 "#,
872 table = table,
873 cached = cached_schema,
874 remote = remote_schema,
875 since = since_filter,
876 client = if client_id.is_some() {
877 format!("AND i.client_id = '{}'", client_id.unwrap().replace('\'', "''"))
878 } else {
879 String::new()
880 },
881 )
882 }
883 "events" => {
884 let since_filter = since_clause(since, "i.timestamp");
885 format!(
886 r#"
887 INSERT INTO {cached}.{table} (id, invocation_id, client_id, hostname, event_type, severity, ref_file, ref_line, ref_column, message, error_code, test_name, status, format_used, date)
888 SELECT r.*
889 FROM {remote}.{table} r
890 JOIN {remote}.invocations i ON i.id = r.invocation_id
891 WHERE NOT EXISTS (
892 SELECT 1 FROM {cached}.{table} l WHERE l.id = r.id
893 )
894 {since}
895 {client}
896 "#,
897 table = table,
898 cached = cached_schema,
899 remote = remote_schema,
900 since = since_filter,
901 client = if client_id.is_some() {
902 format!("AND i.client_id = '{}'", client_id.unwrap().replace('\'', "''"))
903 } else {
904 String::new()
905 },
906 )
907 }
908 _ => {
909 format!(
910 r#"
911 INSERT INTO {cached}.{table}
912 SELECT r.*
913 FROM {remote}.{table} r
914 WHERE NOT EXISTS (
915 SELECT 1 FROM {cached}.{table} l WHERE l.id = r.id
916 )
917 {client}
918 "#,
919 table = table,
920 cached = cached_schema,
921 remote = remote_schema,
922 client = client_filter,
923 )
924 }
925 };
926
927 let count = conn.execute(&sql, [])?;
928 Ok(count)
929}
930
931fn count_blobs_to_push(
933 conn: &Connection,
934 remote_schema: &str,
935 since: Option<NaiveDate>,
936) -> Result<BlobStats> {
937 let since_filter = since_clause(since, "i.timestamp");
938
939 let sql = format!(
940 r#"
941 SELECT COUNT(DISTINCT o.content_hash), COALESCE(SUM(o.byte_length), 0)
942 FROM local.outputs o
943 JOIN local.invocations i ON i.id = o.invocation_id
944 WHERE o.storage_type = 'blob'
945 AND NOT EXISTS (
946 SELECT 1 FROM {remote}.blob_registry r WHERE r.content_hash = o.content_hash
947 )
948 {since}
949 "#,
950 remote = remote_schema,
951 since = since_filter,
952 );
953
954 let (count, bytes): (i64, i64) = conn.query_row(&sql, [], |row| Ok((row.get(0)?, row.get(1)?)))?;
955 Ok(BlobStats {
956 count: count as usize,
957 bytes: bytes as u64,
958 ..Default::default()
959 })
960}
961
962fn get_blobs_to_push(
964 conn: &Connection,
965 remote_schema: &str,
966 since: Option<NaiveDate>,
967) -> Result<Vec<BlobInfo>> {
968 let since_filter = since_clause(since, "i.timestamp");
969
970 let sql = format!(
971 r#"
972 SELECT DISTINCT o.content_hash, b.storage_path, o.byte_length
973 FROM local.outputs o
974 JOIN local.invocations i ON i.id = o.invocation_id
975 JOIN blob_registry b ON b.content_hash = o.content_hash
976 WHERE o.storage_type = 'blob'
977 AND NOT EXISTS (
978 SELECT 1 FROM {remote}.blob_registry r WHERE r.content_hash = o.content_hash
979 )
980 {since}
981 "#,
982 remote = remote_schema,
983 since = since_filter,
984 );
985
986 let mut stmt = conn.prepare(&sql)?;
987 let blobs = stmt
988 .query_map([], |row| {
989 Ok(BlobInfo {
990 content_hash: row.get(0)?,
991 storage_path: row.get(1)?,
992 byte_length: row.get(2)?,
993 })
994 })?
995 .filter_map(|r| r.ok())
996 .collect();
997
998 Ok(blobs)
999}
1000
1001fn get_blobs_to_pull(
1003 conn: &Connection,
1004 remote_schema: &str,
1005 cached_schema: &str,
1006) -> Result<Vec<BlobInfo>> {
1007 let sql = format!(
1008 r#"
1009 SELECT DISTINCT o.content_hash, b.storage_path, o.byte_length
1010 FROM {cached}.outputs o
1011 JOIN {remote}.blob_registry b ON b.content_hash = o.content_hash
1012 WHERE o.storage_type = 'blob'
1013 AND NOT EXISTS (
1014 SELECT 1 FROM blob_registry r WHERE r.content_hash = o.content_hash
1015 )
1016 "#,
1017 cached = cached_schema,
1018 remote = remote_schema,
1019 );
1020
1021 let mut stmt = conn.prepare(&sql)?;
1022 let blobs = stmt
1023 .query_map([], |row| {
1024 Ok(BlobInfo {
1025 content_hash: row.get(0)?,
1026 storage_path: row.get(1)?,
1027 byte_length: row.get(2)?,
1028 })
1029 })?
1030 .filter_map(|r| r.ok())
1031 .collect();
1032
1033 Ok(blobs)
1034}
1035
1036fn push_outputs(
1038 conn: &Connection,
1039 remote_schema: &str,
1040 since: Option<NaiveDate>,
1041 _sync_blobs: bool,
1042) -> Result<usize> {
1043 let since_filter = since_clause(since, "i.timestamp");
1044
1045 let sql = format!(
1048 r#"
1049 INSERT INTO {remote}.outputs
1050 SELECT l.*
1051 FROM local.outputs l
1052 JOIN local.invocations i ON i.id = l.invocation_id
1053 WHERE NOT EXISTS (
1054 SELECT 1 FROM {remote}.outputs r WHERE r.id = l.id
1055 )
1056 {since}
1057 "#,
1058 remote = remote_schema,
1059 since = since_filter,
1060 );
1061
1062 let count = conn.execute(&sql, [])?;
1063 Ok(count)
1064}
1065
1066fn pull_outputs(
1068 conn: &Connection,
1069 remote_schema: &str,
1070 cached_schema: &str,
1071 since: Option<NaiveDate>,
1072 client_id: Option<&str>,
1073 _sync_blobs: bool,
1074) -> Result<usize> {
1075 let since_filter = since_clause(since, "i.timestamp");
1076 let client_filter = if client_id.is_some() {
1077 format!("AND i.client_id = '{}'", client_id.unwrap().replace('\'', "''"))
1078 } else {
1079 String::new()
1080 };
1081
1082 let sql = format!(
1085 r#"
1086 INSERT INTO {cached}.outputs (id, invocation_id, stream, content_hash, byte_length, storage_type, storage_ref, content_type, date)
1087 SELECT r.*
1088 FROM {remote}.outputs r
1089 JOIN {remote}.invocations i ON i.id = r.invocation_id
1090 WHERE NOT EXISTS (
1091 SELECT 1 FROM {cached}.outputs l WHERE l.id = r.id
1092 )
1093 {since}
1094 {client}
1095 "#,
1096 cached = cached_schema,
1097 remote = remote_schema,
1098 since = since_filter,
1099 client = client_filter,
1100 );
1101
1102 let count = conn.execute(&sql, [])?;
1103 Ok(count)
1104}
1105
1106#[cfg(test)]
1107mod tests {
1108 use super::*;
1109 use crate::config::{RemoteConfig, RemoteMode, RemoteType};
1110 use crate::init::initialize;
1111 use crate::schema::InvocationRecord;
1112 use crate::store::{ConnectionOptions, Store};
1113 use crate::Config;
1114 use tempfile::TempDir;
1115
1116 fn setup_store_duckdb() -> (TempDir, Store) {
1117 let tmp = TempDir::new().unwrap();
1118 let config = Config::with_duckdb_mode(tmp.path());
1119 initialize(&config).unwrap();
1120 let store = Store::open(config).unwrap();
1121 (tmp, store)
1122 }
1123
1124 fn create_file_remote(name: &str, path: &std::path::Path) -> RemoteConfig {
1125 RemoteConfig {
1126 name: name.to_string(),
1127 remote_type: RemoteType::File,
1128 uri: path.to_string_lossy().to_string(),
1129 mode: RemoteMode::ReadWrite,
1130 auto_attach: true,
1131 credential_provider: None,
1132 }
1133 }
1134
1135 #[test]
1138 fn test_parse_since_days() {
1139 let today = Utc::now().date_naive();
1140 let result = parse_since("7d").unwrap();
1141 assert_eq!(result, today - TimeDelta::days(7));
1142 }
1143
1144 #[test]
1145 fn test_parse_since_weeks() {
1146 let today = Utc::now().date_naive();
1147 let result = parse_since("2w").unwrap();
1148 assert_eq!(result, today - TimeDelta::days(14));
1149 }
1150
1151 #[test]
1152 fn test_parse_since_months() {
1153 let today = Utc::now().date_naive();
1154 let result = parse_since("1m").unwrap();
1155 assert_eq!(result, today - TimeDelta::days(30));
1156 }
1157
1158 #[test]
1159 fn test_parse_since_date() {
1160 let result = parse_since("2024-01-15").unwrap();
1161 assert_eq!(result, NaiveDate::from_ymd_opt(2024, 1, 15).unwrap());
1162 }
1163
1164 #[test]
1165 fn test_parse_since_invalid() {
1166 assert!(parse_since("invalid").is_err());
1167 }
1168
1169 #[test]
1172 fn test_push_to_file_remote() {
1173 let (tmp, store) = setup_store_duckdb();
1174
1175 let inv = InvocationRecord::new(
1177 "test-session",
1178 "echo hello",
1179 "/home/user",
1180 0,
1181 "test@client",
1182 );
1183 store.write_invocation(&inv).unwrap();
1184
1185 let remote_path = tmp.path().join("remote.duckdb");
1187 let remote = create_file_remote("test", &remote_path);
1188
1189 let stats = store.push(&remote, PushOptions::default()).unwrap();
1191
1192 assert_eq!(stats.invocations, 1);
1193 assert!(remote_path.exists(), "Remote database file should be created");
1194 }
1195
1196 #[test]
1197 fn test_push_is_idempotent() {
1198 let (tmp, store) = setup_store_duckdb();
1199
1200 let inv = InvocationRecord::new(
1202 "test-session",
1203 "echo hello",
1204 "/home/user",
1205 0,
1206 "test@client",
1207 );
1208 store.write_invocation(&inv).unwrap();
1209
1210 let remote_path = tmp.path().join("remote.duckdb");
1212 let remote = create_file_remote("test", &remote_path);
1213
1214 let stats1 = store.push(&remote, PushOptions::default()).unwrap();
1216 let stats2 = store.push(&remote, PushOptions::default()).unwrap();
1217
1218 assert_eq!(stats1.invocations, 1);
1220 assert_eq!(stats2.invocations, 0, "Second push should be idempotent");
1221 }
1222
1223 #[test]
1224 fn test_push_dry_run() {
1225 let (tmp, store) = setup_store_duckdb();
1226
1227 let inv = InvocationRecord::new(
1229 "test-session",
1230 "echo hello",
1231 "/home/user",
1232 0,
1233 "test@client",
1234 );
1235 store.write_invocation(&inv).unwrap();
1236
1237 let remote_path = tmp.path().join("remote.duckdb");
1239 let remote = create_file_remote("test", &remote_path);
1240
1241 let dry_stats = store
1243 .push(
1244 &remote,
1245 PushOptions {
1246 dry_run: true,
1247 ..Default::default()
1248 },
1249 )
1250 .unwrap();
1251
1252 assert_eq!(dry_stats.invocations, 1, "Dry run should count invocations");
1253
1254 let actual_stats = store.push(&remote, PushOptions::default()).unwrap();
1256 assert_eq!(actual_stats.invocations, 1);
1257 }
1258
1259 #[test]
1260 fn test_pull_from_file_remote() {
1261 let (tmp, store) = setup_store_duckdb();
1262
1263 let inv = InvocationRecord::new(
1265 "test-session",
1266 "echo hello",
1267 "/home/user",
1268 0,
1269 "test@client",
1270 );
1271 store.write_invocation(&inv).unwrap();
1272
1273 let remote_path = tmp.path().join("remote.duckdb");
1274 let remote = create_file_remote("test", &remote_path);
1275 store.push(&remote, PushOptions::default()).unwrap();
1276
1277 let conn = store.connection().unwrap();
1279 conn.execute("DELETE FROM local.invocations", []).unwrap();
1280 drop(conn);
1281
1282 let stats = store.pull(&remote, PullOptions::default()).unwrap();
1284
1285 assert_eq!(stats.invocations, 1, "Should pull the invocation back");
1286
1287 let conn = store.connection().unwrap();
1289 let count: i64 = conn
1290 .query_row("SELECT COUNT(*) FROM caches.invocations", [], |r| r.get(0))
1291 .unwrap();
1292 assert_eq!(count, 1, "Data should be in cached schema");
1293 }
1294
1295 #[test]
1296 fn test_pull_is_idempotent() {
1297 let (tmp, store) = setup_store_duckdb();
1298
1299 let inv = InvocationRecord::new(
1301 "test-session",
1302 "echo hello",
1303 "/home/user",
1304 0,
1305 "test@client",
1306 );
1307 store.write_invocation(&inv).unwrap();
1308
1309 let remote_path = tmp.path().join("remote.duckdb");
1310 let remote = create_file_remote("test", &remote_path);
1311 store.push(&remote, PushOptions::default()).unwrap();
1312
1313 let stats1 = store.pull(&remote, PullOptions::default()).unwrap();
1315 let stats2 = store.pull(&remote, PullOptions::default()).unwrap();
1316
1317 assert_eq!(stats1.invocations, 1);
1318 assert_eq!(stats2.invocations, 0, "Second pull should be idempotent");
1319 }
1320
1321 #[test]
1324 fn test_remote_name_with_hyphen() {
1325 let (tmp, store) = setup_store_duckdb();
1326
1327 let inv = InvocationRecord::new(
1329 "test-session",
1330 "echo hello",
1331 "/home/user",
1332 0,
1333 "test@client",
1334 );
1335 store.write_invocation(&inv).unwrap();
1336
1337 let remote_path = tmp.path().join("my-team-remote.duckdb");
1339 let remote = create_file_remote("my-team", &remote_path);
1340
1341 let stats = store.push(&remote, PushOptions::default()).unwrap();
1343 assert_eq!(stats.invocations, 1);
1344
1345 let pull_stats = store.pull(&remote, PullOptions::default()).unwrap();
1347 assert_eq!(pull_stats.invocations, 1);
1349 }
1350
1351 #[test]
1352 fn test_remote_name_with_dots() {
1353 let (tmp, store) = setup_store_duckdb();
1354
1355 let inv = InvocationRecord::new(
1356 "test-session",
1357 "echo hello",
1358 "/home/user",
1359 0,
1360 "test@client",
1361 );
1362 store.write_invocation(&inv).unwrap();
1363
1364 let remote_path = tmp.path().join("team.v2.duckdb");
1366 let remote = create_file_remote("team.v2", &remote_path);
1367
1368 let stats = store.push(&remote, PushOptions::default()).unwrap();
1369 assert_eq!(stats.invocations, 1);
1370 }
1371
1372 #[test]
1375 fn test_connection_minimal_vs_full() {
1376 let (_tmp, store) = setup_store_duckdb();
1377
1378 let conn_minimal = store.connect(ConnectionOptions::minimal()).unwrap();
1380 let count: i64 = conn_minimal
1381 .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
1382 .unwrap();
1383 assert_eq!(count, 0);
1384 drop(conn_minimal);
1385
1386 let conn_full = store.connect(ConnectionOptions::full()).unwrap();
1388 let count: i64 = conn_full
1389 .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
1390 .unwrap();
1391 assert_eq!(count, 0);
1392 }
1393
1394 #[test]
1395 fn test_multiple_sequential_connections() {
1396 let (_tmp, store) = setup_store_duckdb();
1397
1398 for i in 0..5 {
1401 let conn = store.connection().unwrap();
1402 let count: i64 = conn
1403 .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
1404 .unwrap();
1405 assert_eq!(count, 0, "Connection {} should work", i);
1406 drop(conn);
1407 }
1408
1409 let inv = InvocationRecord::new(
1411 "test-session",
1412 "echo hello",
1413 "/home/user",
1414 0,
1415 "test@client",
1416 );
1417 store.write_invocation(&inv).unwrap();
1418
1419 for i in 0..3 {
1421 let conn = store.connection().unwrap();
1422 let count: i64 = conn
1423 .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
1424 .unwrap();
1425 assert_eq!(count, 1, "Connection {} should see the data", i);
1426 drop(conn);
1427 }
1428 }
1429
1430 #[test]
1433 fn test_caches_schema_views_work() {
1434 let (tmp, store) = setup_store_duckdb();
1435
1436 let conn = store.connection().unwrap();
1438 let count: i64 = conn
1439 .query_row("SELECT COUNT(*) FROM caches.invocations", [], |r| r.get(0))
1440 .unwrap();
1441 assert_eq!(count, 0);
1442 drop(conn);
1443
1444 let inv = InvocationRecord::new(
1446 "test-session",
1447 "echo hello",
1448 "/home/user",
1449 0,
1450 "test@client",
1451 );
1452 store.write_invocation(&inv).unwrap();
1453
1454 let remote_path = tmp.path().join("remote.duckdb");
1455 let remote = create_file_remote("test", &remote_path);
1456 store.push(&remote, PushOptions::default()).unwrap();
1457
1458 store.pull(&remote, PullOptions::default()).unwrap();
1460
1461 let conn = store.connection().unwrap();
1463 store.rebuild_caches_schema(&conn).unwrap();
1464
1465 let count: i64 = conn
1467 .query_row("SELECT COUNT(*) FROM caches.invocations", [], |r| r.get(0))
1468 .unwrap();
1469 assert_eq!(count, 1, "caches should include pulled data after rebuild");
1470 }
1471
1472 #[test]
1473 fn test_main_schema_unions_local_and_caches() {
1474 let (tmp, store) = setup_store_duckdb();
1475
1476 let inv1 = InvocationRecord::new(
1478 "test-session",
1479 "local command",
1480 "/home/user",
1481 0,
1482 "local@client",
1483 );
1484 store.write_invocation(&inv1).unwrap();
1485
1486 let remote_path = tmp.path().join("remote.duckdb");
1488 let remote = create_file_remote("team", &remote_path);
1489
1490 store.push(&remote, PushOptions::default()).unwrap();
1492
1493 let conn = store.connection().unwrap();
1495 conn.execute("DELETE FROM local.invocations", []).unwrap();
1496 drop(conn);
1497
1498 store.pull(&remote, PullOptions::default()).unwrap();
1499
1500 let conn = store.connection().unwrap();
1502 store.rebuild_caches_schema(&conn).unwrap();
1503
1504 let inv2 = InvocationRecord::new(
1506 "test-session-2",
1507 "new local command",
1508 "/home/user",
1509 0,
1510 "local@client",
1511 );
1512 drop(conn);
1513 store.write_invocation(&inv2).unwrap();
1514
1515 let conn = store.connection().unwrap();
1517 let count: i64 = conn
1518 .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
1519 .unwrap();
1520 assert_eq!(count, 2, "main should union local + caches");
1521 }
1522
1523 fn setup_store_parquet() -> (TempDir, Store) {
1527 let tmp = TempDir::new().unwrap();
1528 let config = Config::with_root(tmp.path()); initialize(&config).unwrap();
1530 let store = Store::open(config).unwrap();
1531 (tmp, store)
1532 }
1533
1534 #[test]
1535 fn test_heterogeneous_parquet_local_duckdb_remote() {
1536 let (_local_tmp, local_store) = setup_store_parquet();
1538
1539 let remote_tmp = TempDir::new().unwrap();
1541 let remote_config = Config::with_duckdb_mode(remote_tmp.path());
1542 initialize(&remote_config).unwrap();
1543 let remote_store = Store::open(remote_config).unwrap();
1544
1545 let remote_inv = InvocationRecord::new(
1547 "remote-session",
1548 "remote command",
1549 "/home/remote",
1550 0,
1551 "remote@client",
1552 );
1553 remote_store.write_invocation(&remote_inv).unwrap();
1554
1555 let remote_conn = remote_store.connection().unwrap();
1557 let remote_count: i64 = remote_conn
1558 .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
1559 .unwrap();
1560 assert_eq!(remote_count, 1, "Remote should have data in local schema");
1561 drop(remote_conn);
1562
1563 let local_inv = InvocationRecord::new(
1565 "local-session",
1566 "local command",
1567 "/home/local",
1568 0,
1569 "local@client",
1570 );
1571 local_store.write_invocation(&local_inv).unwrap();
1572
1573 let remote_db_path = remote_tmp.path().join("db/bird.duckdb");
1575 let remote_config = RemoteConfig {
1576 name: "duckdb-store".to_string(),
1577 remote_type: RemoteType::File,
1578 uri: format!("file://{}", remote_db_path.display()),
1579 mode: RemoteMode::ReadOnly,
1580 auto_attach: true,
1581 credential_provider: None,
1582 };
1583
1584 let conn = local_store.connection_with_options(false).unwrap();
1586 local_store.attach_remote(&conn, &remote_config).unwrap();
1587
1588 let schema = remote_config.quoted_schema_name();
1591 let table_prefix = local_store.detect_remote_table_path(&conn, &schema);
1592 assert_eq!(table_prefix, "local.", "Should detect DuckDB mode remote has local. prefix");
1593
1594 let remote_count: i64 = conn
1596 .query_row(
1597 &format!("SELECT COUNT(*) FROM {}.local.invocations", schema),
1598 [],
1599 |r| r.get(0),
1600 )
1601 .unwrap();
1602 assert_eq!(remote_count, 1, "Should be able to query DuckDB remote from Parquet local");
1603 }
1604
1605 #[test]
1606 fn test_heterogeneous_duckdb_local_parquet_remote() {
1607 let (_local_tmp, local_store) = setup_store_duckdb();
1609
1610 let remote_tmp = TempDir::new().unwrap();
1612 let remote_config = Config::with_root(remote_tmp.path());
1613 initialize(&remote_config).unwrap();
1614 let remote_store = Store::open(remote_config).unwrap();
1615
1616 let remote_inv = InvocationRecord::new(
1618 "remote-session",
1619 "remote command",
1620 "/home/remote",
1621 0,
1622 "remote@client",
1623 );
1624 remote_store.write_invocation(&remote_inv).unwrap();
1625
1626 let local_inv = InvocationRecord::new(
1628 "local-session",
1629 "local command",
1630 "/home/local",
1631 0,
1632 "local@client",
1633 );
1634 local_store.write_invocation(&local_inv).unwrap();
1635
1636 let remote_db_path = remote_tmp.path().join("db/bird.duckdb");
1638 let remote_config = RemoteConfig {
1639 name: "parquet-store".to_string(),
1640 remote_type: RemoteType::File,
1641 uri: format!("file://{}", remote_db_path.display()),
1642 mode: RemoteMode::ReadOnly,
1643 auto_attach: true,
1644 credential_provider: None,
1645 };
1646
1647 let conn = local_store.connection_with_options(false).unwrap();
1649 local_store.attach_remote(&conn, &remote_config).unwrap();
1650
1651 let schema = remote_config.quoted_schema_name();
1653 let table_prefix = local_store.detect_remote_table_path(&conn, &schema);
1654 assert_eq!(table_prefix, "local.", "BIRD databases have local schema in both modes");
1655
1656 let remote_count: i64 = conn
1658 .query_row(
1659 &format!("SELECT COUNT(*) FROM {}.local.invocations", schema),
1660 [],
1661 |r| r.get(0),
1662 )
1663 .unwrap();
1664 assert_eq!(remote_count, 1, "Should be able to query Parquet remote from DuckDB local");
1665 }
1666
1667 #[test]
1668 fn test_heterogeneous_unified_views() {
1669 let (local_tmp, local_store) = setup_store_parquet();
1671
1672 let remote_tmp = TempDir::new().unwrap();
1674 let remote_config = Config::with_duckdb_mode(remote_tmp.path());
1675 initialize(&remote_config).unwrap();
1676 let remote_store = Store::open(remote_config).unwrap();
1677
1678 let remote_inv = InvocationRecord::new(
1680 "remote-session",
1681 "remote-specific-cmd",
1682 "/home/remote",
1683 42,
1684 "remote@client",
1685 );
1686 remote_store.write_invocation(&remote_inv).unwrap();
1687
1688 let local_inv = InvocationRecord::new(
1690 "local-session",
1691 "local-specific-cmd",
1692 "/home/local",
1693 0,
1694 "local@client",
1695 );
1696 local_store.write_invocation(&local_inv).unwrap();
1697
1698 let remote_db_path = remote_tmp.path().join("db/bird.duckdb");
1700 let mut config = Config::with_root(local_tmp.path());
1701 config.remotes.push(RemoteConfig {
1702 name: "heterogeneous-test".to_string(),
1703 remote_type: RemoteType::File,
1704 uri: format!("file://{}", remote_db_path.display()),
1705 mode: RemoteMode::ReadOnly,
1706 auto_attach: true,
1707 credential_provider: None,
1708 });
1709
1710 let store = Store::open(config).unwrap();
1712
1713 let conn = store.connection().unwrap();
1715
1716 let local_count: i64 = conn
1718 .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
1719 .unwrap();
1720 assert_eq!(local_count, 1, "Local should have 1 record");
1721
1722 let unified_count: i64 = conn
1724 .query_row("SELECT COUNT(*) FROM unified.invocations", [], |r| r.get(0))
1725 .unwrap();
1726 assert_eq!(unified_count, 2, "Unified view should have local + remote records");
1727
1728 let cmds: Vec<String> = conn
1730 .prepare("SELECT cmd FROM unified.invocations ORDER BY cmd")
1731 .unwrap()
1732 .query_map([], |r| r.get(0))
1733 .unwrap()
1734 .filter_map(|r| r.ok())
1735 .collect();
1736 assert_eq!(cmds.len(), 2);
1737 assert!(cmds.contains(&"local-specific-cmd".to_string()));
1738 assert!(cmds.contains(&"remote-specific-cmd".to_string()));
1739 }
1740
1741 #[test]
1742 fn test_detect_remote_table_path_standalone_db() {
1743 let (_tmp, store) = setup_store_duckdb();
1745
1746 let standalone_tmp = TempDir::new().unwrap();
1748 let standalone_db_path = standalone_tmp.path().join("standalone.duckdb");
1749 {
1750 let conn = duckdb::Connection::open(&standalone_db_path).unwrap();
1751 conn.execute(
1752 "CREATE TABLE invocations (id UUID, cmd VARCHAR)",
1753 [],
1754 )
1755 .unwrap();
1756 conn.execute(
1757 "INSERT INTO invocations VALUES (gen_random_uuid(), 'test')",
1758 [],
1759 )
1760 .unwrap();
1761 }
1762
1763 let remote = RemoteConfig {
1765 name: "standalone".to_string(),
1766 remote_type: RemoteType::File,
1767 uri: format!("file://{}", standalone_db_path.display()),
1768 mode: RemoteMode::ReadOnly,
1769 auto_attach: true,
1770 credential_provider: None,
1771 };
1772
1773 let conn = store.connection_with_options(false).unwrap();
1774 store.attach_remote(&conn, &remote).unwrap();
1775
1776 let schema = remote.quoted_schema_name();
1778 let table_prefix = store.detect_remote_table_path(&conn, &schema);
1779 assert_eq!(table_prefix, "", "Standalone DB should have no prefix");
1780
1781 let count: i64 = conn
1783 .query_row(
1784 &format!("SELECT COUNT(*) FROM {}.invocations", schema),
1785 [],
1786 |r| r.get(0),
1787 )
1788 .unwrap();
1789 assert_eq!(count, 1);
1790 }
1791
1792 #[test]
1793 fn test_push_to_readonly_remote_fails() {
1794 let (_tmp, store) = setup_store_duckdb();
1795
1796 let inv = InvocationRecord::new(
1798 "test-session",
1799 "echo hello",
1800 "/home/user",
1801 0,
1802 "test@client",
1803 );
1804 store.write_invocation(&inv).unwrap();
1805
1806 let remote_tmp = TempDir::new().unwrap();
1808 let remote_path = remote_tmp.path().join("remote.duckdb");
1809 let remote = RemoteConfig {
1810 name: "readonly".to_string(),
1811 remote_type: RemoteType::File,
1812 uri: format!("file://{}", remote_path.display()),
1813 mode: RemoteMode::ReadOnly,
1814 auto_attach: true,
1815 credential_provider: None,
1816 };
1817
1818 let result = store.push(&remote, PushOptions::default());
1820 assert!(result.is_err(), "Push to read-only remote should fail");
1821 assert!(
1822 result.unwrap_err().to_string().contains("Cannot push to read-only"),
1823 "Error should mention read-only"
1824 );
1825 }
1826
1827 #[test]
1828 fn test_push_to_readonly_remote_dry_run_returns_empty() {
1829 let (_tmp, store) = setup_store_duckdb();
1830
1831 let inv = InvocationRecord::new(
1833 "test-session",
1834 "echo hello",
1835 "/home/user",
1836 0,
1837 "test@client",
1838 );
1839 store.write_invocation(&inv).unwrap();
1840
1841 let remote_tmp = TempDir::new().unwrap();
1843 let remote_path = remote_tmp.path().join("remote.duckdb");
1844 let remote = RemoteConfig {
1845 name: "readonly".to_string(),
1846 remote_type: RemoteType::File,
1847 uri: format!("file://{}", remote_path.display()),
1848 mode: RemoteMode::ReadOnly,
1849 auto_attach: true,
1850 credential_provider: None,
1851 };
1852
1853 let stats = store.push(&remote, PushOptions { dry_run: true, ..Default::default() }).unwrap();
1855 assert_eq!(stats.invocations, 0);
1856 assert_eq!(stats.sessions, 0);
1857 }
1858}