1use std::fs;
20use std::path::{Path, PathBuf};
21
22use chrono::{NaiveDate, TimeDelta, Utc};
23use duckdb::Connection;
24
25use crate::config::RemoteType;
26use crate::{Error, RemoteConfig, Result};
27
/// Counters describing blob files handled during a push or pull.
///
/// For a real transfer, `count == linked + copied`; for a dry-run push only
/// `count` and `bytes` are filled in (an estimate of what would be sent).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct BlobStats {
    /// Number of blobs transferred (or, for a dry run, that would be transferred).
    pub count: usize,
    /// Total size in bytes of those blobs.
    pub bytes: u64,
    /// Blobs transferred by creating a hard link.
    pub linked: usize,
    /// Blobs transferred by copying, used when hard-linking failed.
    pub copied: usize,
    /// Blobs not transferred because the destination file already existed.
    pub skipped: usize,
}
42
43impl std::fmt::Display for BlobStats {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 if self.count == 0 {
46 write!(f, "0 blobs")
47 } else {
48 let kb = self.bytes / 1024;
49 write!(
50 f,
51 "{} blobs ({}KB, {} linked, {} copied, {} skipped)",
52 self.count, kb, self.linked, self.copied, self.skipped
53 )
54 }
55 }
56}
57
/// Counts of records inserted by a push (or, for a dry run, that would be inserted).
#[derive(Debug, Default)]
pub struct PushStats {
    /// Sessions inserted into the remote.
    pub sessions: usize,
    /// Attempt rows inserted (outcome rows are pushed too but not counted here).
    pub invocations: usize,
    /// Output rows inserted.
    pub outputs: usize,
    /// Event rows inserted.
    pub events: usize,
    /// Blob transfer counters; left at default unless blob syncing was requested.
    pub blobs: BlobStats,
}
67
68impl std::fmt::Display for PushStats {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 write!(
71 f,
72 "{} sessions, {} invocations, {} outputs, {} events",
73 self.sessions, self.invocations, self.outputs, self.events
74 )?;
75 if self.blobs.count > 0 {
76 write!(f, ", {}", self.blobs)?;
77 }
78 Ok(())
79 }
80}
81
/// Counts of records copied from a remote into the local cache by a pull.
#[derive(Debug, Default)]
pub struct PullStats {
    /// Sessions inserted into the cached schema.
    pub sessions: usize,
    /// Attempt rows inserted (outcome rows are pulled too but not counted here).
    pub invocations: usize,
    /// Output rows inserted.
    pub outputs: usize,
    /// Event rows inserted.
    pub events: usize,
    /// Blob transfer counters; left at default unless blob syncing was requested.
    pub blobs: BlobStats,
}
91
92impl std::fmt::Display for PullStats {
93 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 write!(
95 f,
96 "{} sessions, {} invocations, {} outputs, {} events",
97 self.sessions, self.invocations, self.outputs, self.events
98 )?;
99 if self.blobs.count > 0 {
100 write!(f, ", {}", self.blobs)?;
101 }
102 Ok(())
103 }
104}
105
106#[derive(Debug, Default)]
108pub struct PushOptions {
109 pub since: Option<NaiveDate>,
111 pub dry_run: bool,
113 pub sync_blobs: bool,
115}
116
117#[derive(Debug, Default)]
119pub struct PullOptions {
120 pub since: Option<NaiveDate>,
122 pub client_id: Option<String>,
124 pub sync_blobs: bool,
126}
127
128pub fn parse_since(s: &str) -> Result<NaiveDate> {
134 let s = s.trim();
135
136 if let Some(days) = parse_duration_days(s) {
138 let date = Utc::now().date_naive() - TimeDelta::days(days);
139 return Ok(date);
140 }
141
142 NaiveDate::parse_from_str(s, "%Y-%m-%d")
144 .map_err(|e| Error::Config(format!("Invalid date '{}': {}", s, e)))
145}
146
/// Parse a relative duration such as `7d`, `2w` or `1m` into a number of days.
///
/// Suffixes are case-insensitive: `d` = days, `w` = weeks (7 days), `m` =
/// months (approximated as 30 days). Returns `None` for any other input, for
/// negative amounts, and when the day count would overflow `i64` (which the
/// previous unchecked multiplication turned into a panic in debug builds).
fn parse_duration_days(s: &str) -> Option<i64> {
    let s = s.trim().to_lowercase();

    let (amount, days_per_unit) = if let Some(n) = s.strip_suffix('d') {
        (n, 1)
    } else if let Some(n) = s.strip_suffix('w') {
        (n, 7)
    } else if let Some(n) = s.strip_suffix('m') {
        (n, 30)
    } else {
        return None;
    };

    amount
        .parse::<i64>()
        .ok()
        .filter(|n| *n >= 0)
        .and_then(|n| n.checked_mul(days_per_unit))
}
161
/// Unquoted name of the local schema that caches data pulled from `remote_name`.
// NOTE(review): not referenced elsewhere in this module; `dead_code` is allowed
// presumably so the helper can be kept available for other callers/tests.
#[allow(dead_code)]
pub fn cached_schema_name(remote_name: &str) -> String {
    let mut name = String::from("cached_");
    name.push_str(remote_name);
    name
}
167
/// Double-quoted SQL identifier for the cache schema of `remote_name`.
///
/// Embedded double quotes are doubled, per SQL identifier quoting rules, so a
/// remote name containing `"` cannot terminate the identifier early and break
/// (or inject into) the surrounding statement.
pub fn quoted_cached_schema_name(remote_name: &str) -> String {
    format!("\"cached_{}\"", remote_name.replace('"', "\"\""))
}
172
173fn file_remote_data_dir(remote: &RemoteConfig) -> Option<PathBuf> {
178 if remote.remote_type != RemoteType::File {
179 return None;
180 }
181
182 let db_path = remote.uri.strip_prefix("file://")?;
184 let db_path = Path::new(db_path);
185
186 db_path.parent().map(PathBuf::from)
189}
190
/// A blob selected for transfer between the local store and a remote.
#[derive(Debug)]
struct BlobInfo {
    /// Content hash; the key used to look the blob up in `blob_registry`.
    content_hash: String,
    /// Storage path, joined onto the local or remote data directory to locate the file.
    storage_path: String,
    /// Size in bytes, as read from the selected output row's `byte_length`.
    byte_length: i64,
}
198
199fn sync_blob_file(src: &Path, dst: &Path, stats: &mut BlobStats) -> Result<bool> {
204 if dst.exists() {
206 stats.skipped += 1;
207 return Ok(false);
208 }
209
210 if let Some(parent) = dst.parent() {
212 fs::create_dir_all(parent)?;
213 }
214
215 match fs::hard_link(src, dst) {
217 Ok(()) => {
218 stats.linked += 1;
219 stats.count += 1;
220 if let Ok(meta) = fs::metadata(dst) {
221 stats.bytes += meta.len();
222 }
223 Ok(true)
224 }
225 Err(_) => {
226 fs::copy(src, dst)?;
228 stats.copied += 1;
229 stats.count += 1;
230 if let Ok(meta) = fs::metadata(dst) {
231 stats.bytes += meta.len();
232 }
233 Ok(true)
234 }
235 }
236}
237
238impl super::Store {
239 pub fn push(&self, remote: &RemoteConfig, opts: PushOptions) -> Result<PushStats> {
245 use crate::config::RemoteMode;
246
247 if remote.mode == RemoteMode::ReadOnly {
249 if opts.dry_run {
250 return Ok(PushStats::default());
252 } else {
253 return Err(Error::Config(format!(
254 "Cannot push to read-only remote '{}'",
255 remote.name
256 )));
257 }
258 }
259
260 let conn = self.connection_with_options(false)?;
262
263 self.attach_remote(&conn, remote)?;
265
266 let remote_schema = remote.quoted_schema_name();
267
268 ensure_remote_schema(&conn, &remote_schema)?;
270
271 let mut stats = PushStats::default();
272
273 if opts.dry_run {
274 stats.sessions = count_sessions_to_push(&conn, &remote_schema, opts.since)?;
276 stats.invocations = count_table_to_push(&conn, "attempts", &remote_schema, opts.since)?;
278 stats.outputs = count_table_to_push(&conn, "outputs", &remote_schema, opts.since)?;
279 stats.events = count_table_to_push(&conn, "events", &remote_schema, opts.since)?;
280 if opts.sync_blobs {
281 stats.blobs = count_blobs_to_push(&conn, &remote_schema, opts.since)?;
282 }
283 } else {
284 if opts.sync_blobs {
286 stats.blobs = self.push_blobs(&conn, remote, &remote_schema, opts.since)?;
287 }
288
289 stats.sessions = push_sessions(&conn, &remote_schema, opts.since)?;
291 stats.invocations = push_table(&conn, "attempts", &remote_schema, opts.since)?;
293 let _ = push_table(&conn, "outcomes", &remote_schema, opts.since)?;
294 stats.outputs = push_outputs(&conn, &remote_schema, opts.since, opts.sync_blobs)?;
295 stats.events = push_table(&conn, "events", &remote_schema, opts.since)?;
296 }
297
298 Ok(stats)
299 }
300
301 fn push_blobs(
306 &self,
307 conn: &Connection,
308 remote: &RemoteConfig,
309 remote_schema: &str,
310 since: Option<NaiveDate>,
311 ) -> Result<BlobStats> {
312 let mut stats = BlobStats::default();
313
314 let remote_data_dir = match file_remote_data_dir(remote) {
316 Some(dir) => dir,
317 None => return Ok(stats), };
319
320 let blobs = get_blobs_to_push(conn, remote_schema, since)?;
322 if blobs.is_empty() {
323 return Ok(stats);
324 }
325
326 let local_data_dir = self.config.data_dir();
327
328 for blob in &blobs {
329 let src = local_data_dir.join(&blob.storage_path);
332 let dst = remote_data_dir.join(&blob.storage_path);
333
334 if !src.exists() {
335 continue;
337 }
338
339 sync_blob_file(&src, &dst, &mut stats)?;
341
342 let escaped_hash = blob.content_hash.replace('\'', "''");
344 let escaped_path = blob.storage_path.replace('\'', "''");
345 conn.execute(
346 &format!(
347 r#"
348 INSERT INTO {schema}.blob_registry (content_hash, byte_length, storage_path)
349 SELECT '{hash}', {len}, '{path}'
350 WHERE NOT EXISTS (
351 SELECT 1 FROM {schema}.blob_registry WHERE content_hash = '{hash}'
352 )
353 "#,
354 schema = remote_schema,
355 hash = escaped_hash,
356 len = blob.byte_length,
357 path = escaped_path,
358 ),
359 [],
360 )?;
361 }
362
363 Ok(stats)
364 }
365
366 pub fn pull(&self, remote: &RemoteConfig, opts: PullOptions) -> Result<PullStats> {
373 let conn = self.connection_with_options(false)?;
375
376 self.attach_remote(&conn, remote)?;
378
379 let remote_schema = remote.quoted_schema_name();
380 let cached_schema = quoted_cached_schema_name(&remote.name);
381
382 ensure_cached_schema(&conn, &cached_schema, &remote.name)?;
384
385 let attempts_pulled = pull_table(&conn, "attempts", &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref())?;
388 let _ = pull_table(&conn, "outcomes", &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref())?;
389 let mut stats = PullStats {
390 sessions: pull_sessions(&conn, &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref())?,
391 invocations: attempts_pulled,
392 outputs: pull_outputs(&conn, &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref(), opts.sync_blobs)?,
393 events: pull_table(&conn, "events", &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref())?,
394 blobs: BlobStats::default(),
395 };
396
397 if opts.sync_blobs {
399 stats.blobs = self.pull_blobs(&conn, remote, &remote_schema, &cached_schema)?;
400 }
401
402 self.rebuild_caches_schema(&conn)?;
404
405 Ok(stats)
406 }
407
408 fn pull_blobs(
413 &self,
414 conn: &Connection,
415 remote: &RemoteConfig,
416 remote_schema: &str,
417 cached_schema: &str,
418 ) -> Result<BlobStats> {
419 let mut stats = BlobStats::default();
420
421 let remote_data_dir = match file_remote_data_dir(remote) {
423 Some(dir) => dir,
424 None => return Ok(stats), };
426
427 let blobs = get_blobs_to_pull(conn, remote_schema, cached_schema)?;
429 if blobs.is_empty() {
430 return Ok(stats);
431 }
432
433 let local_data_dir = self.config.data_dir();
434
435 for blob in &blobs {
436 let src = remote_data_dir.join(&blob.storage_path);
439 let dst = local_data_dir.join(&blob.storage_path);
440
441 if !src.exists() {
442 continue;
444 }
445
446 let synced = sync_blob_file(&src, &dst, &mut stats)?;
448
449 if synced {
451 let escaped_hash = blob.content_hash.replace('\'', "''");
452 let escaped_path = blob.storage_path.replace('\'', "''");
453 conn.execute(
454 &format!(
455 r#"
456 INSERT INTO blob_registry (content_hash, byte_length, storage_path)
457 SELECT '{hash}', {len}, '{path}'
458 WHERE NOT EXISTS (
459 SELECT 1 FROM blob_registry WHERE content_hash = '{hash}'
460 )
461 "#,
462 hash = escaped_hash,
463 len = blob.byte_length,
464 path = escaped_path,
465 ),
466 [],
467 )?;
468 }
469 }
470
471 Ok(stats)
472 }
473
474 pub fn rebuild_caches_schema(&self, conn: &Connection) -> Result<()> {
481 let schemas: Vec<String> = conn
483 .prepare("SELECT schema_name FROM information_schema.schemata WHERE schema_name LIKE 'cached_%'")?
484 .query_map([], |row| row.get(0))?
485 .filter_map(|r| r.ok())
486 .collect();
487
488 conn.execute("BEGIN TRANSACTION", [])?;
490
491 let result = (|| -> std::result::Result<(), duckdb::Error> {
492 for table in &["sessions", "attempts", "outcomes", "outputs", "events"] {
494 let mut union_parts: Vec<String> = schemas
495 .iter()
496 .map(|s| format!("SELECT * FROM \"{}\".{}", s, table))
497 .collect();
498
499 if !schemas.iter().any(|s| s == "cached_placeholder") {
501 union_parts.push(format!("SELECT * FROM cached_placeholder.{}", table));
502 }
503
504 let sql = format!(
505 "CREATE OR REPLACE VIEW caches.{} AS {}",
506 table,
507 union_parts.join(" UNION ALL BY NAME ")
508 );
509 conn.execute(&sql, [])?;
510 }
511
512 conn.execute(
516 r#"
517 CREATE OR REPLACE VIEW caches.invocations AS
518 SELECT
519 a.id, a.timestamp, a.cmd, a.cwd, a.session_id,
520 a.tag, a.source_client, a.machine_id, a.hostname,
521 a.executable, a.format_hint,
522 o.completed_at, o.exit_code, o.duration_ms, o.signal, o.timeout,
523 a.date,
524 CASE
525 WHEN o.attempt_id IS NULL THEN 'pending'
526 WHEN o.exit_code IS NULL THEN 'orphaned'
527 ELSE 'completed'
528 END AS status,
529 a._source
530 FROM caches.attempts a
531 LEFT JOIN caches.outcomes o ON a.id = o.attempt_id
532 "#,
533 [],
534 )?;
535
536 Ok(())
537 })();
538
539 match result {
540 Ok(()) => {
541 conn.execute("COMMIT", [])?;
542 Ok(())
543 }
544 Err(e) => {
545 let _ = conn.execute("ROLLBACK", []);
546 Err(crate::Error::DuckDb(e))
547 }
548 }
549 }
550}
551
552fn ensure_remote_schema(conn: &Connection, schema: &str) -> Result<()> {
556 let sql = format!(
557 r#"
558 CREATE TABLE IF NOT EXISTS {schema}.sessions (
559 session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
560 invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE
561 );
562 -- V5: attempts table (invocation start)
563 CREATE TABLE IF NOT EXISTS {schema}.attempts (
564 id UUID, timestamp TIMESTAMP, cmd VARCHAR, cwd VARCHAR, session_id VARCHAR,
565 tag VARCHAR, source_client VARCHAR, machine_id VARCHAR, hostname VARCHAR,
566 executable VARCHAR, format_hint VARCHAR, metadata MAP(VARCHAR, JSON), date DATE
567 );
568 -- V5: outcomes table (invocation completion)
569 CREATE TABLE IF NOT EXISTS {schema}.outcomes (
570 attempt_id UUID, completed_at TIMESTAMP, exit_code INTEGER, duration_ms BIGINT,
571 signal INTEGER, timeout BOOLEAN, metadata MAP(VARCHAR, JSON), date DATE
572 );
573 -- V5: invocations VIEW for compatibility
574 -- Note: metadata not included due to MAP_CONCAT complexity; use attempts/outcomes directly
575 CREATE OR REPLACE VIEW {schema}.invocations AS
576 SELECT
577 a.id, a.timestamp, a.cmd, a.cwd, a.session_id,
578 a.tag, a.source_client, a.machine_id, a.hostname,
579 a.executable, a.format_hint,
580 o.completed_at, o.exit_code, o.duration_ms, o.signal, o.timeout,
581 a.date,
582 CASE
583 WHEN o.attempt_id IS NULL THEN 'pending'
584 WHEN o.exit_code IS NULL THEN 'orphaned'
585 ELSE 'completed'
586 END AS status
587 FROM {schema}.attempts a
588 LEFT JOIN {schema}.outcomes o ON a.id = o.attempt_id;
589 CREATE TABLE IF NOT EXISTS {schema}.outputs (
590 id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
591 byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
592 content_type VARCHAR, date DATE
593 );
594 CREATE TABLE IF NOT EXISTS {schema}.events (
595 id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
596 event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
597 ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
598 status VARCHAR, format_used VARCHAR, date DATE
599 );
600 CREATE TABLE IF NOT EXISTS {schema}.blob_registry (
601 content_hash VARCHAR PRIMARY KEY,
602 byte_length BIGINT NOT NULL,
603 ref_count INTEGER DEFAULT 1,
604 first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
605 last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
606 storage_path VARCHAR NOT NULL
607 );
608 "#,
609 schema = schema
610 );
611 conn.execute_batch(&sql)?;
612 Ok(())
613}
614
615fn ensure_cached_schema(conn: &Connection, schema: &str, remote_name: &str) -> Result<()> {
619 conn.execute(&format!("CREATE SCHEMA IF NOT EXISTS {}", schema), [])?;
621
622 let sql = format!(
624 r#"
625 CREATE TABLE IF NOT EXISTS {schema}.sessions (
626 session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
627 invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE,
628 _source VARCHAR DEFAULT '{remote_name}'
629 );
630 -- V5: attempts table (invocation start)
631 CREATE TABLE IF NOT EXISTS {schema}.attempts (
632 id UUID, timestamp TIMESTAMP, cmd VARCHAR, cwd VARCHAR, session_id VARCHAR,
633 tag VARCHAR, source_client VARCHAR, machine_id VARCHAR, hostname VARCHAR,
634 executable VARCHAR, format_hint VARCHAR, metadata MAP(VARCHAR, JSON), date DATE,
635 _source VARCHAR DEFAULT '{remote_name}'
636 );
637 -- V5: outcomes table (invocation completion)
638 CREATE TABLE IF NOT EXISTS {schema}.outcomes (
639 attempt_id UUID, completed_at TIMESTAMP, exit_code INTEGER, duration_ms BIGINT,
640 signal INTEGER, timeout BOOLEAN, metadata MAP(VARCHAR, JSON), date DATE,
641 _source VARCHAR DEFAULT '{remote_name}'
642 );
643 -- V5: invocations VIEW for compatibility
644 -- Note: metadata not included due to MAP_CONCAT complexity; use attempts/outcomes directly
645 CREATE OR REPLACE VIEW {schema}.invocations AS
646 SELECT
647 a.id, a.timestamp, a.cmd, a.cwd, a.session_id,
648 a.tag, a.source_client, a.machine_id, a.hostname,
649 a.executable, a.format_hint,
650 o.completed_at, o.exit_code, o.duration_ms, o.signal, o.timeout,
651 a.date,
652 CASE
653 WHEN o.attempt_id IS NULL THEN 'pending'
654 WHEN o.exit_code IS NULL THEN 'orphaned'
655 ELSE 'completed'
656 END AS status,
657 a._source
658 FROM {schema}.attempts a
659 LEFT JOIN {schema}.outcomes o ON a.id = o.attempt_id;
660 CREATE TABLE IF NOT EXISTS {schema}.outputs (
661 id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
662 byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
663 content_type VARCHAR, date DATE,
664 _source VARCHAR DEFAULT '{remote_name}'
665 );
666 CREATE TABLE IF NOT EXISTS {schema}.events (
667 id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
668 event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
669 ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
670 status VARCHAR, format_used VARCHAR, date DATE,
671 _source VARCHAR DEFAULT '{remote_name}'
672 );
673 "#,
674 schema = schema,
675 remote_name = remote_name.replace('\'', "''")
676 );
677 conn.execute_batch(&sql)?;
678 Ok(())
679}
680
681fn since_clause(since: Option<NaiveDate>, timestamp_col: &str) -> String {
683 since
684 .map(|d| format!("AND {} >= '{}'", timestamp_col, d))
685 .unwrap_or_default()
686}
687
/// SQL fragment `AND client_id = '<id>'` (single quotes doubled), or an empty
/// string when no client filter is requested.
fn client_clause(client_id: Option<&str>) -> String {
    match client_id {
        Some(id) => {
            let escaped = id.replace('\'', "''");
            format!("AND client_id = '{escaped}'")
        }
        None => String::new(),
    }
}
694
695fn count_sessions_to_push(
699 conn: &Connection,
700 remote_schema: &str,
701 since: Option<NaiveDate>,
702) -> Result<usize> {
703 let since_filter = since_clause(since, "a.timestamp");
704
705 let sql = format!(
706 r#"
707 SELECT COUNT(DISTINCT s.session_id)
708 FROM local.sessions s
709 JOIN local.attempts a ON a.session_id = s.session_id
710 WHERE NOT EXISTS (
711 SELECT 1 FROM {remote}.sessions r WHERE r.session_id = s.session_id
712 )
713 {since}
714 "#,
715 remote = remote_schema,
716 since = since_filter,
717 );
718
719 let count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
720 Ok(count as usize)
721}
722
723fn count_table_to_push(
727 conn: &Connection,
728 table: &str,
729 remote_schema: &str,
730 since: Option<NaiveDate>,
731) -> Result<usize> {
732 let sql = match table {
733 "attempts" => {
735 let since_filter = since_clause(since, "l.timestamp");
736 format!(
737 r#"
738 SELECT COUNT(*)
739 FROM local.attempts l
740 WHERE NOT EXISTS (
741 SELECT 1 FROM {remote}.attempts r WHERE r.id = l.id
742 )
743 {since}
744 "#,
745 remote = remote_schema,
746 since = since_filter,
747 )
748 }
749 "outcomes" => {
751 let since_filter = since_clause(since, "a.timestamp");
752 format!(
753 r#"
754 SELECT COUNT(*)
755 FROM local.outcomes l
756 JOIN local.attempts a ON a.id = l.attempt_id
757 WHERE NOT EXISTS (
758 SELECT 1 FROM {remote}.outcomes r WHERE r.attempt_id = l.attempt_id
759 )
760 {since}
761 "#,
762 remote = remote_schema,
763 since = since_filter,
764 )
765 }
766 "outputs" | "events" => {
767 let since_filter = since_clause(since, "a.timestamp");
769 format!(
770 r#"
771 SELECT COUNT(*)
772 FROM local.{table} l
773 JOIN local.attempts a ON a.id = l.invocation_id
774 WHERE NOT EXISTS (
775 SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
776 )
777 {since}
778 "#,
779 table = table,
780 remote = remote_schema,
781 since = since_filter,
782 )
783 }
784 _ => {
785 format!(
786 r#"
787 SELECT COUNT(*)
788 FROM local.{table} l
789 WHERE NOT EXISTS (
790 SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
791 )
792 "#,
793 table = table,
794 remote = remote_schema,
795 )
796 }
797 };
798
799 let count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
800 Ok(count as usize)
801}
802
803fn push_sessions(
806 conn: &Connection,
807 remote_schema: &str,
808 since: Option<NaiveDate>,
809) -> Result<usize> {
810 let since_filter = since_clause(since, "a.timestamp");
811
812 let sql = format!(
813 r#"
814 INSERT INTO {remote}.sessions
815 SELECT DISTINCT s.*
816 FROM local.sessions s
817 JOIN local.attempts a ON a.session_id = s.session_id
818 WHERE NOT EXISTS (
819 SELECT 1 FROM {remote}.sessions r WHERE r.session_id = s.session_id
820 )
821 {since}
822 "#,
823 remote = remote_schema,
824 since = since_filter,
825 );
826
827 let count = conn.execute(&sql, [])?;
828 Ok(count)
829}
830
831fn push_table(
834 conn: &Connection,
835 table: &str,
836 remote_schema: &str,
837 since: Option<NaiveDate>,
838) -> Result<usize> {
839 let sql = match table {
840 "attempts" => {
842 let since_filter = since_clause(since, "l.timestamp");
843 format!(
844 r#"
845 INSERT INTO {remote}.attempts
846 SELECT *
847 FROM local.attempts l
848 WHERE NOT EXISTS (
849 SELECT 1 FROM {remote}.attempts r WHERE r.id = l.id
850 )
851 {since}
852 "#,
853 remote = remote_schema,
854 since = since_filter,
855 )
856 }
857 "outcomes" => {
859 let since_filter = since_clause(since, "a.timestamp");
860 format!(
861 r#"
862 INSERT INTO {remote}.outcomes
863 SELECT l.*
864 FROM local.outcomes l
865 JOIN local.attempts a ON a.id = l.attempt_id
866 WHERE NOT EXISTS (
867 SELECT 1 FROM {remote}.outcomes r WHERE r.attempt_id = l.attempt_id
868 )
869 {since}
870 "#,
871 remote = remote_schema,
872 since = since_filter,
873 )
874 }
875 "outputs" | "events" => {
876 let since_filter = since_clause(since, "a.timestamp");
878 format!(
879 r#"
880 INSERT INTO {remote}.{table}
881 SELECT l.*
882 FROM local.{table} l
883 JOIN local.attempts a ON a.id = l.invocation_id
884 WHERE NOT EXISTS (
885 SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
886 )
887 {since}
888 "#,
889 table = table,
890 remote = remote_schema,
891 since = since_filter,
892 )
893 }
894 _ => {
895 format!(
896 r#"
897 INSERT INTO {remote}.{table}
898 SELECT *
899 FROM local.{table} l
900 WHERE NOT EXISTS (
901 SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
902 )
903 "#,
904 table = table,
905 remote = remote_schema,
906 )
907 }
908 };
909
910 let count = conn.execute(&sql, [])?;
911 Ok(count)
912}
913
914fn pull_sessions(
916 conn: &Connection,
917 remote_schema: &str,
918 cached_schema: &str,
919 since: Option<NaiveDate>,
920 client_id: Option<&str>,
921) -> Result<usize> {
922 let since_filter = since_clause(since, "r.registered_at");
923 let client_filter = client_clause(client_id);
924
925 let sql = format!(
926 r#"
927 INSERT INTO {cached}.sessions (session_id, client_id, invoker, invoker_pid, invoker_type, registered_at, cwd, date)
928 SELECT r.*
929 FROM {remote}.sessions r
930 WHERE NOT EXISTS (
931 SELECT 1 FROM {cached}.sessions l WHERE l.session_id = r.session_id
932 )
933 {since}
934 {client}
935 "#,
936 cached = cached_schema,
937 remote = remote_schema,
938 since = since_filter,
939 client = client_filter,
940 );
941
942 let count = conn.execute(&sql, [])?;
943 Ok(count)
944}
945
946fn pull_table(
949 conn: &Connection,
950 table: &str,
951 remote_schema: &str,
952 cached_schema: &str,
953 since: Option<NaiveDate>,
954 client_id: Option<&str>,
955) -> Result<usize> {
956 let client_filter = client_clause(client_id);
957
958 let sql = match table {
959 "attempts" => {
961 let since_filter = since_clause(since, "r.timestamp");
962 format!(
963 r#"
964 INSERT INTO {cached}.attempts (id, timestamp, cmd, cwd, session_id, tag, source_client, machine_id, hostname, executable, format_hint, metadata, date)
965 SELECT r.*
966 FROM {remote}.attempts r
967 WHERE NOT EXISTS (
968 SELECT 1 FROM {cached}.attempts l WHERE l.id = r.id
969 )
970 {since}
971 {client}
972 "#,
973 cached = cached_schema,
974 remote = remote_schema,
975 since = since_filter,
976 client = if client_id.is_some() {
977 format!("AND r.source_client = '{}'", client_id.unwrap().replace('\'', "''"))
978 } else {
979 String::new()
980 },
981 )
982 }
983 "outcomes" => {
985 let since_filter = since_clause(since, "a.timestamp");
986 format!(
987 r#"
988 INSERT INTO {cached}.outcomes (attempt_id, completed_at, exit_code, duration_ms, signal, timeout, metadata, date)
989 SELECT r.*
990 FROM {remote}.outcomes r
991 JOIN {remote}.attempts a ON a.id = r.attempt_id
992 WHERE NOT EXISTS (
993 SELECT 1 FROM {cached}.outcomes l WHERE l.attempt_id = r.attempt_id
994 )
995 {since}
996 {client}
997 "#,
998 cached = cached_schema,
999 remote = remote_schema,
1000 since = since_filter,
1001 client = if client_id.is_some() {
1002 format!("AND a.source_client = '{}'", client_id.unwrap().replace('\'', "''"))
1003 } else {
1004 String::new()
1005 },
1006 )
1007 }
1008 "outputs" => {
1010 let since_filter = since_clause(since, "a.timestamp");
1011 format!(
1012 r#"
1013 INSERT INTO {cached}.outputs (id, invocation_id, stream, content_hash, byte_length, storage_type, storage_ref, content_type, date)
1014 SELECT r.*
1015 FROM {remote}.outputs r
1016 JOIN {remote}.attempts a ON a.id = r.invocation_id
1017 WHERE NOT EXISTS (
1018 SELECT 1 FROM {cached}.outputs l WHERE l.id = r.id
1019 )
1020 {since}
1021 {client}
1022 "#,
1023 cached = cached_schema,
1024 remote = remote_schema,
1025 since = since_filter,
1026 client = if client_id.is_some() {
1027 format!("AND a.source_client = '{}'", client_id.unwrap().replace('\'', "''"))
1028 } else {
1029 String::new()
1030 },
1031 )
1032 }
1033 "events" => {
1034 let since_filter = since_clause(since, "a.timestamp");
1035 format!(
1036 r#"
1037 INSERT INTO {cached}.events (id, invocation_id, client_id, hostname, event_type, severity, ref_file, ref_line, ref_column, message, error_code, test_name, status, format_used, date)
1038 SELECT r.*
1039 FROM {remote}.events r
1040 JOIN {remote}.attempts a ON a.id = r.invocation_id
1041 WHERE NOT EXISTS (
1042 SELECT 1 FROM {cached}.events l WHERE l.id = r.id
1043 )
1044 {since}
1045 {client}
1046 "#,
1047 cached = cached_schema,
1048 remote = remote_schema,
1049 since = since_filter,
1050 client = if client_id.is_some() {
1051 format!("AND a.source_client = '{}'", client_id.unwrap().replace('\'', "''"))
1052 } else {
1053 String::new()
1054 },
1055 )
1056 }
1057 _ => {
1058 format!(
1059 r#"
1060 INSERT INTO {cached}.{table}
1061 SELECT r.*
1062 FROM {remote}.{table} r
1063 WHERE NOT EXISTS (
1064 SELECT 1 FROM {cached}.{table} l WHERE l.id = r.id
1065 )
1066 {client}
1067 "#,
1068 table = table,
1069 cached = cached_schema,
1070 remote = remote_schema,
1071 client = client_filter,
1072 )
1073 }
1074 };
1075
1076 let count = conn.execute(&sql, [])?;
1077 Ok(count)
1078}
1079
1080fn count_blobs_to_push(
1083 conn: &Connection,
1084 remote_schema: &str,
1085 since: Option<NaiveDate>,
1086) -> Result<BlobStats> {
1087 let since_filter = since_clause(since, "a.timestamp");
1088
1089 let sql = format!(
1090 r#"
1091 SELECT COUNT(DISTINCT o.content_hash), COALESCE(SUM(o.byte_length), 0)
1092 FROM local.outputs o
1093 JOIN local.attempts a ON a.id = o.invocation_id
1094 WHERE o.storage_type = 'blob'
1095 AND NOT EXISTS (
1096 SELECT 1 FROM {remote}.blob_registry r WHERE r.content_hash = o.content_hash
1097 )
1098 {since}
1099 "#,
1100 remote = remote_schema,
1101 since = since_filter,
1102 );
1103
1104 let (count, bytes): (i64, i64) = conn.query_row(&sql, [], |row| Ok((row.get(0)?, row.get(1)?)))?;
1105 Ok(BlobStats {
1106 count: count as usize,
1107 bytes: bytes as u64,
1108 ..Default::default()
1109 })
1110}
1111
1112fn get_blobs_to_push(
1115 conn: &Connection,
1116 remote_schema: &str,
1117 since: Option<NaiveDate>,
1118) -> Result<Vec<BlobInfo>> {
1119 let since_filter = since_clause(since, "a.timestamp");
1120
1121 let sql = format!(
1122 r#"
1123 SELECT DISTINCT o.content_hash, b.storage_path, o.byte_length
1124 FROM local.outputs o
1125 JOIN local.attempts a ON a.id = o.invocation_id
1126 JOIN blob_registry b ON b.content_hash = o.content_hash
1127 WHERE o.storage_type = 'blob'
1128 AND NOT EXISTS (
1129 SELECT 1 FROM {remote}.blob_registry r WHERE r.content_hash = o.content_hash
1130 )
1131 {since}
1132 "#,
1133 remote = remote_schema,
1134 since = since_filter,
1135 );
1136
1137 let mut stmt = conn.prepare(&sql)?;
1138 let blobs = stmt
1139 .query_map([], |row| {
1140 Ok(BlobInfo {
1141 content_hash: row.get(0)?,
1142 storage_path: row.get(1)?,
1143 byte_length: row.get(2)?,
1144 })
1145 })?
1146 .filter_map(|r| r.ok())
1147 .collect();
1148
1149 Ok(blobs)
1150}
1151
1152fn get_blobs_to_pull(
1154 conn: &Connection,
1155 remote_schema: &str,
1156 cached_schema: &str,
1157) -> Result<Vec<BlobInfo>> {
1158 let sql = format!(
1159 r#"
1160 SELECT DISTINCT o.content_hash, b.storage_path, o.byte_length
1161 FROM {cached}.outputs o
1162 JOIN {remote}.blob_registry b ON b.content_hash = o.content_hash
1163 WHERE o.storage_type = 'blob'
1164 AND NOT EXISTS (
1165 SELECT 1 FROM blob_registry r WHERE r.content_hash = o.content_hash
1166 )
1167 "#,
1168 cached = cached_schema,
1169 remote = remote_schema,
1170 );
1171
1172 let mut stmt = conn.prepare(&sql)?;
1173 let blobs = stmt
1174 .query_map([], |row| {
1175 Ok(BlobInfo {
1176 content_hash: row.get(0)?,
1177 storage_path: row.get(1)?,
1178 byte_length: row.get(2)?,
1179 })
1180 })?
1181 .filter_map(|r| r.ok())
1182 .collect();
1183
1184 Ok(blobs)
1185}
1186
1187fn push_outputs(
1190 conn: &Connection,
1191 remote_schema: &str,
1192 since: Option<NaiveDate>,
1193 _sync_blobs: bool,
1194) -> Result<usize> {
1195 let since_filter = since_clause(since, "a.timestamp");
1196
1197 let sql = format!(
1200 r#"
1201 INSERT INTO {remote}.outputs
1202 SELECT l.*
1203 FROM local.outputs l
1204 JOIN local.attempts a ON a.id = l.invocation_id
1205 WHERE NOT EXISTS (
1206 SELECT 1 FROM {remote}.outputs r WHERE r.id = l.id
1207 )
1208 {since}
1209 "#,
1210 remote = remote_schema,
1211 since = since_filter,
1212 );
1213
1214 let count = conn.execute(&sql, [])?;
1215 Ok(count)
1216}
1217
1218fn pull_outputs(
1221 conn: &Connection,
1222 remote_schema: &str,
1223 cached_schema: &str,
1224 since: Option<NaiveDate>,
1225 client_id: Option<&str>,
1226 _sync_blobs: bool,
1227) -> Result<usize> {
1228 let since_filter = since_clause(since, "a.timestamp");
1229 let client_filter = if client_id.is_some() {
1230 format!("AND a.source_client = '{}'", client_id.unwrap().replace('\'', "''"))
1231 } else {
1232 String::new()
1233 };
1234
1235 let sql = format!(
1238 r#"
1239 INSERT INTO {cached}.outputs (id, invocation_id, stream, content_hash, byte_length, storage_type, storage_ref, content_type, date)
1240 SELECT r.*
1241 FROM {remote}.outputs r
1242 JOIN {remote}.attempts a ON a.id = r.invocation_id
1243 WHERE NOT EXISTS (
1244 SELECT 1 FROM {cached}.outputs l WHERE l.id = r.id
1245 )
1246 {since}
1247 {client}
1248 "#,
1249 cached = cached_schema,
1250 remote = remote_schema,
1251 since = since_filter,
1252 client = client_filter,
1253 );
1254
1255 let count = conn.execute(&sql, [])?;
1256 Ok(count)
1257}
1258
#[cfg(test)]
mod tests {
    //! Tests for `parse_since`, push/pull against file remotes, and the
    //! connection/schema views (`local`, `caches`, `main`, `unified`) they rely on.

    use super::*;
    use crate::config::{RemoteConfig, RemoteMode, RemoteType};
    use crate::init::initialize;
    use crate::schema::InvocationRecord;
    use crate::store::{ConnectionOptions, Store};
    use crate::Config;
    use tempfile::TempDir;

    /// Creates and initializes a DuckDB-mode store in a fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the store so the caller keeps it alive;
    /// dropping it removes the directory.
    fn setup_store_duckdb() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let config = Config::with_duckdb_mode(tmp.path());
        initialize(&config).unwrap();
        let store = Store::open(config).unwrap();
        (tmp, store)
    }

    /// Creates and initializes a store using `Config::with_root` in a fresh temporary
    /// directory. The heterogeneous tests below use it as the Parquet-mode store.
    fn setup_store_parquet() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let config = Config::with_root(tmp.path());
        initialize(&config).unwrap();
        let store = Store::open(config).unwrap();
        (tmp, store)
    }

    /// Builds a read-write, auto-attached file remote whose URI is the bare path.
    fn create_file_remote(name: &str, path: &std::path::Path) -> RemoteConfig {
        RemoteConfig {
            name: name.to_string(),
            remote_type: RemoteType::File,
            uri: path.to_string_lossy().to_string(),
            mode: RemoteMode::ReadWrite,
            auto_attach: true,
            credential_provider: None,
        }
    }

    /// Builds a read-only, auto-attached file remote addressed with a `file://` URI.
    fn readonly_file_remote(name: &str, path: &std::path::Path) -> RemoteConfig {
        RemoteConfig {
            name: name.to_string(),
            remote_type: RemoteType::File,
            uri: format!("file://{}", path.display()),
            mode: RemoteMode::ReadOnly,
            auto_attach: true,
            credential_provider: None,
        }
    }

    /// Writes the canonical `echo hello` invocation shared by most tests into `store`.
    fn write_sample_invocation(store: &Store) {
        let inv = InvocationRecord::new(
            "test-session",
            "echo hello",
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&inv).unwrap();
    }

    /// Asserts that `parse_since(spec)` resolves to exactly `days` days before today (UTC).
    ///
    /// "Today" is sampled both before and after the call, so the assertion does not
    /// fail spuriously when the test runs across a UTC midnight boundary.
    fn assert_days_before_today(spec: &str, days: i64) {
        let before = Utc::now().date_naive();
        let result = parse_since(spec).unwrap();
        let after = Utc::now().date_naive();
        let delta = TimeDelta::days(days);
        assert!(
            result == before - delta || result == after - delta,
            "parse_since({:?}) returned {}, expected {} days before today",
            spec,
            result,
            days
        );
    }

    #[test]
    fn test_parse_since_days() {
        assert_days_before_today("7d", 7);
    }

    #[test]
    fn test_parse_since_weeks() {
        assert_days_before_today("2w", 14);
    }

    #[test]
    fn test_parse_since_months() {
        // A month is expected to be treated as 30 days.
        assert_days_before_today("1m", 30);
    }

    #[test]
    fn test_parse_since_date() {
        let result = parse_since("2024-01-15").unwrap();
        assert_eq!(result, NaiveDate::from_ymd_opt(2024, 1, 15).unwrap());
    }

    #[test]
    fn test_parse_since_invalid() {
        assert!(parse_since("invalid").is_err());
    }

    /// Pushing to a file remote copies the invocation and creates the remote database file.
    #[test]
    fn test_push_to_file_remote() {
        let (tmp, store) = setup_store_duckdb();
        write_sample_invocation(&store);

        let remote_path = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &remote_path);

        let stats = store.push(&remote, PushOptions::default()).unwrap();

        assert_eq!(stats.invocations, 1);
        assert!(remote_path.exists(), "Remote database file should be created");
    }

    /// A second push of unchanged data transfers nothing.
    #[test]
    fn test_push_is_idempotent() {
        let (tmp, store) = setup_store_duckdb();
        write_sample_invocation(&store);

        let remote_path = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &remote_path);

        let stats1 = store.push(&remote, PushOptions::default()).unwrap();
        let stats2 = store.push(&remote, PushOptions::default()).unwrap();

        assert_eq!(stats1.invocations, 1);
        assert_eq!(stats2.invocations, 0, "Second push should be idempotent");
    }

    /// A dry run reports what would be pushed without writing it, so a real push
    /// afterwards still transfers the invocation.
    #[test]
    fn test_push_dry_run() {
        let (tmp, store) = setup_store_duckdb();
        write_sample_invocation(&store);

        let remote_path = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &remote_path);

        let dry_stats = store
            .push(
                &remote,
                PushOptions {
                    dry_run: true,
                    ..Default::default()
                },
            )
            .unwrap();
        assert_eq!(dry_stats.invocations, 1, "Dry run should count invocations");

        let actual_stats = store.push(&remote, PushOptions::default()).unwrap();
        assert_eq!(actual_stats.invocations, 1);
    }

    /// Data pushed to a remote can be pulled back into the cached schema after the
    /// local copy has been removed.
    #[test]
    fn test_pull_from_file_remote() {
        let (tmp, store) = setup_store_duckdb();
        write_sample_invocation(&store);

        let remote_path = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &remote_path);
        store.push(&remote, PushOptions::default()).unwrap();

        // Remove the local rows so the remote is the only source of the invocation.
        // `execute` prepares a single statement; `execute_batch` runs several.
        let conn = store.connection().unwrap();
        conn.execute_batch("DELETE FROM local.outcomes; DELETE FROM local.attempts")
            .unwrap();
        drop(conn);

        let stats = store.pull(&remote, PullOptions::default()).unwrap();
        assert_eq!(stats.invocations, 1, "Should pull the invocation back");

        let conn = store.connection().unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM caches.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 1, "Data should be in cached schema");
    }

    /// A second pull of unchanged remote data transfers nothing.
    #[test]
    fn test_pull_is_idempotent() {
        let (tmp, store) = setup_store_duckdb();
        write_sample_invocation(&store);

        let remote_path = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &remote_path);
        store.push(&remote, PushOptions::default()).unwrap();

        let stats1 = store.pull(&remote, PullOptions::default()).unwrap();
        let stats2 = store.pull(&remote, PullOptions::default()).unwrap();

        assert_eq!(stats1.invocations, 1);
        assert_eq!(stats2.invocations, 0, "Second pull should be idempotent");
    }

    /// Remote names containing hyphens work for both push and pull.
    #[test]
    fn test_remote_name_with_hyphen() {
        let (tmp, store) = setup_store_duckdb();
        write_sample_invocation(&store);

        let remote_path = tmp.path().join("my-team-remote.duckdb");
        let remote = create_file_remote("my-team", &remote_path);

        let stats = store.push(&remote, PushOptions::default()).unwrap();
        assert_eq!(stats.invocations, 1);

        let pull_stats = store.pull(&remote, PullOptions::default()).unwrap();
        assert_eq!(pull_stats.invocations, 1);
    }

    /// Remote names containing dots work for push.
    #[test]
    fn test_remote_name_with_dots() {
        let (tmp, store) = setup_store_duckdb();
        write_sample_invocation(&store);

        let remote_path = tmp.path().join("team.v2.duckdb");
        let remote = create_file_remote("team.v2", &remote_path);

        let stats = store.push(&remote, PushOptions::default()).unwrap();
        assert_eq!(stats.invocations, 1);
    }

    /// A minimal connection can query `local.invocations`, and a full connection can
    /// query `main.invocations`.
    #[test]
    fn test_connection_minimal_vs_full() {
        let (_tmp, store) = setup_store_duckdb();

        let conn_minimal = store.connect(ConnectionOptions::minimal()).unwrap();
        let count: i64 = conn_minimal
            .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);
        drop(conn_minimal);

        let conn_full = store.connect(ConnectionOptions::full()).unwrap();
        let count: i64 = conn_full
            .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);
    }

    /// Connections can be opened and dropped repeatedly, and later connections see
    /// data written in between.
    #[test]
    fn test_multiple_sequential_connections() {
        let (_tmp, store) = setup_store_duckdb();

        for i in 0..5 {
            let conn = store.connection().unwrap();
            let count: i64 = conn
                .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
                .unwrap();
            assert_eq!(count, 0, "Connection {} should work", i);
            drop(conn);
        }

        write_sample_invocation(&store);

        for i in 0..3 {
            let conn = store.connection().unwrap();
            let count: i64 = conn
                .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
                .unwrap();
            assert_eq!(count, 1, "Connection {} should see the data", i);
            drop(conn);
        }
    }

    /// `caches.invocations` is queryable while empty and includes pulled data after
    /// `rebuild_caches_schema`.
    #[test]
    fn test_caches_schema_views_work() {
        let (tmp, store) = setup_store_duckdb();

        let conn = store.connection().unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM caches.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);
        drop(conn);

        write_sample_invocation(&store);

        let remote_path = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &remote_path);
        store.push(&remote, PushOptions::default()).unwrap();

        store.pull(&remote, PullOptions::default()).unwrap();

        let conn = store.connection().unwrap();
        store.rebuild_caches_schema(&conn).unwrap();

        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM caches.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 1, "caches should include pulled data after rebuild");
    }

    /// `main.invocations` combines rows pulled into the caches with rows written locally.
    #[test]
    fn test_main_schema_unions_local_and_caches() {
        let (tmp, store) = setup_store_duckdb();

        let inv1 = InvocationRecord::new(
            "test-session",
            "local command",
            "/home/user",
            0,
            "local@client",
        );
        store.write_invocation(&inv1).unwrap();

        let remote_path = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("team", &remote_path);

        store.push(&remote, PushOptions::default()).unwrap();

        // Remove the local rows so inv1 only comes back through the pull.
        let conn = store.connection().unwrap();
        conn.execute_batch("DELETE FROM local.outcomes; DELETE FROM local.attempts")
            .unwrap();
        drop(conn);

        store.pull(&remote, PullOptions::default()).unwrap();

        let conn = store.connection().unwrap();
        store.rebuild_caches_schema(&conn).unwrap();
        drop(conn);

        let inv2 = InvocationRecord::new(
            "test-session-2",
            "new local command",
            "/home/user",
            0,
            "local@client",
        );
        store.write_invocation(&inv2).unwrap();

        let conn = store.connection().unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 2, "main should union local + caches");
    }

    /// A Parquet-mode local store can attach a DuckDB-mode store as a read-only remote
    /// and query its `local` schema.
    #[test]
    fn test_heterogeneous_parquet_local_duckdb_remote() {
        let (_local_tmp, local_store) = setup_store_parquet();

        let remote_tmp = TempDir::new().unwrap();
        let remote_store_config = Config::with_duckdb_mode(remote_tmp.path());
        initialize(&remote_store_config).unwrap();
        let remote_store = Store::open(remote_store_config).unwrap();

        let remote_inv = InvocationRecord::new(
            "remote-session",
            "remote command",
            "/home/remote",
            0,
            "remote@client",
        );
        remote_store.write_invocation(&remote_inv).unwrap();

        let remote_conn = remote_store.connection().unwrap();
        let remote_count: i64 = remote_conn
            .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(remote_count, 1, "Remote should have data in local schema");
        drop(remote_conn);

        let local_inv = InvocationRecord::new(
            "local-session",
            "local command",
            "/home/local",
            0,
            "local@client",
        );
        local_store.write_invocation(&local_inv).unwrap();

        let remote_db_path = remote_tmp.path().join("db/bird.duckdb");
        let remote = readonly_file_remote("duckdb-store", &remote_db_path);

        let conn = local_store.connection_with_options(false).unwrap();
        local_store.attach_remote(&conn, &remote).unwrap();

        let schema = remote.quoted_schema_name();
        let table_prefix = local_store.detect_remote_table_path(&conn, &schema);
        assert_eq!(
            table_prefix, "local.",
            "Should detect DuckDB mode remote has local. prefix"
        );

        let remote_count: i64 = conn
            .query_row(
                &format!("SELECT COUNT(*) FROM {}.local.invocations", schema),
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(
            remote_count, 1,
            "Should be able to query DuckDB remote from Parquet local"
        );
    }

    /// A DuckDB-mode local store can attach a Parquet-mode store as a read-only remote
    /// and query its `local` schema.
    #[test]
    fn test_heterogeneous_duckdb_local_parquet_remote() {
        let (_local_tmp, local_store) = setup_store_duckdb();

        let remote_tmp = TempDir::new().unwrap();
        let remote_store_config = Config::with_root(remote_tmp.path());
        initialize(&remote_store_config).unwrap();
        let remote_store = Store::open(remote_store_config).unwrap();

        let remote_inv = InvocationRecord::new(
            "remote-session",
            "remote command",
            "/home/remote",
            0,
            "remote@client",
        );
        remote_store.write_invocation(&remote_inv).unwrap();

        let local_inv = InvocationRecord::new(
            "local-session",
            "local command",
            "/home/local",
            0,
            "local@client",
        );
        local_store.write_invocation(&local_inv).unwrap();

        let remote_db_path = remote_tmp.path().join("db/bird.duckdb");
        let remote = readonly_file_remote("parquet-store", &remote_db_path);

        let conn = local_store.connection_with_options(false).unwrap();
        local_store.attach_remote(&conn, &remote).unwrap();

        let schema = remote.quoted_schema_name();
        let table_prefix = local_store.detect_remote_table_path(&conn, &schema);
        assert_eq!(
            table_prefix, "local.",
            "BIRD databases have local schema in both modes"
        );

        let remote_count: i64 = conn
            .query_row(
                &format!("SELECT COUNT(*) FROM {}.local.invocations", schema),
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(
            remote_count, 1,
            "Should be able to query Parquet remote from DuckDB local"
        );
    }

    /// With a DuckDB-mode remote configured on a Parquet-mode store, `unified.invocations`
    /// returns both the local and the remote records.
    #[test]
    fn test_heterogeneous_unified_views() {
        let (local_tmp, local_store) = setup_store_parquet();

        let remote_tmp = TempDir::new().unwrap();
        let remote_store_config = Config::with_duckdb_mode(remote_tmp.path());
        initialize(&remote_store_config).unwrap();
        let remote_store = Store::open(remote_store_config).unwrap();

        let remote_inv = InvocationRecord::new(
            "remote-session",
            "remote-specific-cmd",
            "/home/remote",
            42,
            "remote@client",
        );
        remote_store.write_invocation(&remote_inv).unwrap();

        let local_inv = InvocationRecord::new(
            "local-session",
            "local-specific-cmd",
            "/home/local",
            0,
            "local@client",
        );
        local_store.write_invocation(&local_inv).unwrap();

        let remote_db_path = remote_tmp.path().join("db/bird.duckdb");
        let mut config = Config::with_root(local_tmp.path());
        config
            .remotes
            .push(readonly_file_remote("heterogeneous-test", &remote_db_path));

        let store = Store::open(config).unwrap();

        let conn = store.connection().unwrap();

        let local_count: i64 = conn
            .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(local_count, 1, "Local should have 1 record");

        let unified_count: i64 = conn
            .query_row("SELECT COUNT(*) FROM unified.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(unified_count, 2, "Unified view should have local + remote records");

        let cmds: Vec<String> = conn
            .prepare("SELECT cmd FROM unified.invocations ORDER BY cmd")
            .unwrap()
            .query_map([], |r| r.get(0))
            .unwrap()
            .filter_map(|r| r.ok())
            .collect();
        assert_eq!(cmds.len(), 2);
        assert!(cmds.contains(&"local-specific-cmd".to_string()));
        assert!(cmds.contains(&"remote-specific-cmd".to_string()));
    }

    /// A plain DuckDB file without BIRD's `local` schema is detected as having no
    /// table prefix, and its tables are queried directly under the attached schema.
    #[test]
    fn test_detect_remote_table_path_standalone_db() {
        let (_tmp, store) = setup_store_duckdb();

        let standalone_tmp = TempDir::new().unwrap();
        let standalone_db_path = standalone_tmp.path().join("standalone.duckdb");
        {
            // Scoped so the standalone database is closed before it is attached.
            let conn = duckdb::Connection::open(&standalone_db_path).unwrap();
            conn.execute("CREATE TABLE invocations (id UUID, cmd VARCHAR)", [])
                .unwrap();
            conn.execute(
                "INSERT INTO invocations VALUES (gen_random_uuid(), 'test')",
                [],
            )
            .unwrap();
        }

        let remote = readonly_file_remote("standalone", &standalone_db_path);

        let conn = store.connection_with_options(false).unwrap();
        store.attach_remote(&conn, &remote).unwrap();

        let schema = remote.quoted_schema_name();
        let table_prefix = store.detect_remote_table_path(&conn, &schema);
        assert_eq!(table_prefix, "", "Standalone DB should have no prefix");

        let count: i64 = conn
            .query_row(
                &format!("SELECT COUNT(*) FROM {}.invocations", schema),
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(count, 1);
    }

    /// Pushing to a read-only remote is rejected with a read-only error.
    #[test]
    fn test_push_to_readonly_remote_fails() {
        let (_tmp, store) = setup_store_duckdb();
        write_sample_invocation(&store);

        let remote_tmp = TempDir::new().unwrap();
        let remote_path = remote_tmp.path().join("remote.duckdb");
        let remote = readonly_file_remote("readonly", &remote_path);

        let err = store
            .push(&remote, PushOptions::default())
            .expect_err("Push to read-only remote should fail");
        assert!(
            err.to_string().contains("Cannot push to read-only"),
            "Error should mention read-only, got: {}",
            err
        );
    }

    /// A dry-run push to a read-only remote succeeds but reports nothing to push.
    #[test]
    fn test_push_to_readonly_remote_dry_run_returns_empty() {
        let (_tmp, store) = setup_store_duckdb();
        write_sample_invocation(&store);

        let remote_tmp = TempDir::new().unwrap();
        let remote_path = remote_tmp.path().join("remote.duckdb");
        let remote = readonly_file_remote("readonly", &remote_path);

        let stats = store
            .push(
                &remote,
                PushOptions {
                    dry_run: true,
                    ..Default::default()
                },
            )
            .unwrap();
        assert_eq!(stats.invocations, 0);
        assert_eq!(stats.sessions, 0);
    }
}