1use chrono::{NaiveDate, TimeDelta, Utc};
14use duckdb::Connection;
15
16use crate::{Error, RemoteConfig, Result};
17
/// Per-table row counts reported by a push.
///
/// In a dry run the counters hold the number of rows that would be pushed;
/// otherwise they hold the number of rows actually inserted.
#[derive(Debug, Default)]
pub struct PushStats {
    /// Session rows.
    pub sessions: usize,
    /// Invocation rows.
    pub invocations: usize,
    /// Output rows.
    pub outputs: usize,
    /// Event rows.
    pub events: usize,
}

impl std::fmt::Display for PushStats {
    /// Writes the four counters as a single comma-separated summary line.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            sessions,
            invocations,
            outputs,
            events,
        } = self;
        write!(
            f,
            "{} sessions, {} invocations, {} outputs, {} events",
            sessions, invocations, outputs, events
        )
    }
}
36
/// Per-table row counts reported by a pull.
#[derive(Debug, Default)]
pub struct PullStats {
    /// Session rows copied into the cache.
    pub sessions: usize,
    /// Invocation rows copied into the cache.
    pub invocations: usize,
    /// Output rows copied into the cache.
    pub outputs: usize,
    /// Event rows copied into the cache.
    pub events: usize,
}

impl std::fmt::Display for PullStats {
    /// Writes the four counters as a single comma-separated summary line.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            sessions,
            invocations,
            outputs,
            events,
        } = self;
        write!(
            f,
            "{} sessions, {} invocations, {} outputs, {} events",
            sessions, invocations, outputs, events
        )
    }
}
55
/// Options accepted by `Store::push`.
#[derive(Debug, Default)]
pub struct PushOptions {
    /// Optional lower bound. When set, the push queries add a
    /// `timestamp >= since` condition on the invocation timestamp.
    pub since: Option<NaiveDate>,
    /// When `true`, `push` only counts the rows that are missing on the
    /// remote instead of inserting them.
    pub dry_run: bool,
}
64
/// Options accepted by `Store::pull`.
#[derive(Debug, Default)]
pub struct PullOptions {
    /// Optional lower bound. Sessions are filtered on `registered_at`; the
    /// other tables are filtered on the invocation timestamp.
    pub since: Option<NaiveDate>,
    /// When set, only rows whose `client_id` equals this value are pulled
    /// (for outputs and events, the owning invocation's `client_id`).
    pub client_id: Option<String>,
}
73
74pub fn parse_since(s: &str) -> Result<NaiveDate> {
80 let s = s.trim();
81
82 if let Some(days) = parse_duration_days(s) {
84 let date = Utc::now().date_naive() - TimeDelta::days(days);
85 return Ok(date);
86 }
87
88 NaiveDate::parse_from_str(s, "%Y-%m-%d")
90 .map_err(|e| Error::Config(format!("Invalid date '{}': {}", s, e)))
91}
92
/// Parses a relative duration such as `"7d"`, `"2w"` or `"1m"` into a number
/// of days.
///
/// The input is trimmed and lower-cased first. The suffix selects the unit:
/// `d` = 1 day, `w` = 7 days, `m` = 30 days. The part before the suffix must
/// parse as an `i64`.
///
/// Returns `None` when there is no recognised suffix, when the numeric part
/// does not parse, or when the day count would overflow `i64`.
fn parse_duration_days(s: &str) -> Option<i64> {
    let s = s.trim().to_lowercase();

    // The first matching suffix decides the unit; there is no fall-through to
    // a later suffix if the numeric part fails to parse.
    let (digits, days_per_unit) = if let Some(num) = s.strip_suffix('d') {
        (num, 1)
    } else if let Some(num) = s.strip_suffix('w') {
        (num, 7)
    } else if let Some(num) = s.strip_suffix('m') {
        (num, 30)
    } else {
        return None;
    };

    // checked_mul: a plain `n * 7` panics in debug builds (and wraps in
    // release builds) for very large user-supplied values.
    digits.parse::<i64>().ok()?.checked_mul(days_per_unit)
}
107
/// Returns the unquoted name of the schema that caches rows pulled from
/// `remote_name`, i.e. `cached_<remote_name>`.
#[allow(dead_code)]
pub fn cached_schema_name(remote_name: &str) -> String {
    let mut name = String::from("cached_");
    name.push_str(remote_name);
    name
}
113
/// Returns the cache schema name for `remote_name` as a double-quoted SQL
/// identifier, ready to be interpolated into a statement.
///
/// Quoting lets names containing characters such as `-` or `.` be used as
/// schema names. Any `"` inside `remote_name` is doubled so that it cannot
/// terminate the quoted identifier early.
pub fn quoted_cached_schema_name(remote_name: &str) -> String {
    format!("\"cached_{}\"", remote_name.replace('"', "\"\""))
}
118
119impl super::Store {
120 pub fn push(&self, remote: &RemoteConfig, opts: PushOptions) -> Result<PushStats> {
125 let conn = self.connection_with_options(false)?;
127
128 self.attach_remote(&conn, remote)?;
130
131 let remote_schema = remote.quoted_schema_name();
132
133 ensure_remote_schema(&conn, &remote_schema)?;
135
136 let mut stats = PushStats::default();
137
138 if opts.dry_run {
139 stats.sessions = count_sessions_to_push(&conn, &remote_schema, opts.since)?;
141 stats.invocations = count_table_to_push(&conn, "invocations", &remote_schema, opts.since)?;
142 stats.outputs = count_table_to_push(&conn, "outputs", &remote_schema, opts.since)?;
143 stats.events = count_table_to_push(&conn, "events", &remote_schema, opts.since)?;
144 } else {
145 stats.sessions = push_sessions(&conn, &remote_schema, opts.since)?;
147 stats.invocations = push_table(&conn, "invocations", &remote_schema, opts.since)?;
148 stats.outputs = push_table(&conn, "outputs", &remote_schema, opts.since)?;
149 stats.events = push_table(&conn, "events", &remote_schema, opts.since)?;
150 }
151
152 Ok(stats)
153 }
154
155 pub fn pull(&self, remote: &RemoteConfig, opts: PullOptions) -> Result<PullStats> {
161 let conn = self.connection_with_options(false)?;
163
164 self.attach_remote(&conn, remote)?;
166
167 let remote_schema = remote.quoted_schema_name();
168 let cached_schema = quoted_cached_schema_name(&remote.name);
169
170 ensure_cached_schema(&conn, &cached_schema, &remote.name)?;
172
173 let stats = PullStats {
175 sessions: pull_sessions(&conn, &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref())?,
176 invocations: pull_table(&conn, "invocations", &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref())?,
177 outputs: pull_table(&conn, "outputs", &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref())?,
178 events: pull_table(&conn, "events", &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref())?,
179 };
180
181 self.rebuild_caches_schema(&conn)?;
183
184 Ok(stats)
185 }
186
187 pub fn rebuild_caches_schema(&self, conn: &Connection) -> Result<()> {
193 let schemas: Vec<String> = conn
195 .prepare("SELECT schema_name FROM information_schema.schemata WHERE schema_name LIKE 'cached_%'")?
196 .query_map([], |row| row.get(0))?
197 .filter_map(|r| r.ok())
198 .collect();
199
200 conn.execute("BEGIN TRANSACTION", [])?;
202
203 let result = (|| -> std::result::Result<(), duckdb::Error> {
204 for table in &["sessions", "invocations", "outputs", "events"] {
205 let mut union_parts: Vec<String> = schemas
206 .iter()
207 .map(|s| format!("SELECT * FROM \"{}\".{}", s, table))
208 .collect();
209
210 if !schemas.iter().any(|s| s == "cached_placeholder") {
212 union_parts.push(format!("SELECT * FROM cached_placeholder.{}", table));
213 }
214
215 let sql = format!(
216 "CREATE OR REPLACE VIEW caches.{} AS {}",
217 table,
218 union_parts.join(" UNION ALL BY NAME ")
219 );
220 conn.execute(&sql, [])?;
221 }
222 Ok(())
223 })();
224
225 match result {
226 Ok(()) => {
227 conn.execute("COMMIT", [])?;
228 Ok(())
229 }
230 Err(e) => {
231 let _ = conn.execute("ROLLBACK", []);
232 Err(crate::Error::DuckDb(e))
233 }
234 }
235 }
236}
237
238fn ensure_remote_schema(conn: &Connection, schema: &str) -> Result<()> {
241 let sql = format!(
242 r#"
243 CREATE TABLE IF NOT EXISTS {schema}.sessions (
244 session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
245 invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE
246 );
247 CREATE TABLE IF NOT EXISTS {schema}.invocations (
248 id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
249 cwd VARCHAR, cmd VARCHAR, executable VARCHAR, exit_code INTEGER,
250 format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR,
251 tag VARCHAR, date DATE
252 );
253 CREATE TABLE IF NOT EXISTS {schema}.outputs (
254 id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
255 byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
256 content_type VARCHAR, date DATE
257 );
258 CREATE TABLE IF NOT EXISTS {schema}.events (
259 id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
260 event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
261 ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
262 status VARCHAR, format_used VARCHAR, date DATE
263 );
264 "#,
265 schema = schema
266 );
267 conn.execute_batch(&sql)?;
268 Ok(())
269}
270
271fn ensure_cached_schema(conn: &Connection, schema: &str, remote_name: &str) -> Result<()> {
274 conn.execute(&format!("CREATE SCHEMA IF NOT EXISTS {}", schema), [])?;
276
277 let sql = format!(
279 r#"
280 CREATE TABLE IF NOT EXISTS {schema}.sessions (
281 session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
282 invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE,
283 _source VARCHAR DEFAULT '{remote_name}'
284 );
285 CREATE TABLE IF NOT EXISTS {schema}.invocations (
286 id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
287 cwd VARCHAR, cmd VARCHAR, executable VARCHAR, exit_code INTEGER,
288 format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR,
289 tag VARCHAR, date DATE,
290 _source VARCHAR DEFAULT '{remote_name}'
291 );
292 CREATE TABLE IF NOT EXISTS {schema}.outputs (
293 id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
294 byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
295 content_type VARCHAR, date DATE,
296 _source VARCHAR DEFAULT '{remote_name}'
297 );
298 CREATE TABLE IF NOT EXISTS {schema}.events (
299 id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
300 event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
301 ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
302 status VARCHAR, format_used VARCHAR, date DATE,
303 _source VARCHAR DEFAULT '{remote_name}'
304 );
305 "#,
306 schema = schema,
307 remote_name = remote_name.replace('\'', "''")
308 );
309 conn.execute_batch(&sql)?;
310 Ok(())
311}
312
313fn since_clause(since: Option<NaiveDate>, timestamp_col: &str) -> String {
315 since
316 .map(|d| format!("AND {} >= '{}'", timestamp_col, d))
317 .unwrap_or_default()
318}
319
/// Builds an optional `AND client_id = '<id>'` SQL fragment.
///
/// Single quotes in the id are doubled so the value stays inside the string
/// literal. Returns an empty string when `client_id` is `None`.
fn client_clause(client_id: Option<&str>) -> String {
    match client_id {
        Some(raw) => {
            let escaped = raw.replace('\'', "''");
            format!("AND client_id = '{}'", escaped)
        }
        None => String::new(),
    }
}
326
327fn count_sessions_to_push(
330 conn: &Connection,
331 remote_schema: &str,
332 since: Option<NaiveDate>,
333) -> Result<usize> {
334 let since_filter = since_clause(since, "i.timestamp");
335
336 let sql = format!(
337 r#"
338 SELECT COUNT(DISTINCT s.session_id)
339 FROM local.sessions s
340 JOIN local.invocations i ON i.session_id = s.session_id
341 WHERE NOT EXISTS (
342 SELECT 1 FROM {remote}.sessions r WHERE r.session_id = s.session_id
343 )
344 {since}
345 "#,
346 remote = remote_schema,
347 since = since_filter,
348 );
349
350 let count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
351 Ok(count as usize)
352}
353
354fn count_table_to_push(
357 conn: &Connection,
358 table: &str,
359 remote_schema: &str,
360 since: Option<NaiveDate>,
361) -> Result<usize> {
362 let sql = match table {
363 "invocations" => {
364 let since_filter = since_clause(since, "l.timestamp");
365 format!(
366 r#"
367 SELECT COUNT(*)
368 FROM local.{table} l
369 WHERE NOT EXISTS (
370 SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
371 )
372 {since}
373 "#,
374 table = table,
375 remote = remote_schema,
376 since = since_filter,
377 )
378 }
379 "outputs" | "events" => {
380 let since_filter = since_clause(since, "i.timestamp");
381 format!(
382 r#"
383 SELECT COUNT(*)
384 FROM local.{table} l
385 JOIN local.invocations i ON i.id = l.invocation_id
386 WHERE NOT EXISTS (
387 SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
388 )
389 {since}
390 "#,
391 table = table,
392 remote = remote_schema,
393 since = since_filter,
394 )
395 }
396 _ => {
397 format!(
398 r#"
399 SELECT COUNT(*)
400 FROM local.{table} l
401 WHERE NOT EXISTS (
402 SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
403 )
404 "#,
405 table = table,
406 remote = remote_schema,
407 )
408 }
409 };
410
411 let count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
412 Ok(count as usize)
413}
414
415fn push_sessions(
417 conn: &Connection,
418 remote_schema: &str,
419 since: Option<NaiveDate>,
420) -> Result<usize> {
421 let since_filter = since_clause(since, "i.timestamp");
422
423 let sql = format!(
424 r#"
425 INSERT INTO {remote}.sessions
426 SELECT DISTINCT s.*
427 FROM local.sessions s
428 JOIN local.invocations i ON i.session_id = s.session_id
429 WHERE NOT EXISTS (
430 SELECT 1 FROM {remote}.sessions r WHERE r.session_id = s.session_id
431 )
432 {since}
433 "#,
434 remote = remote_schema,
435 since = since_filter,
436 );
437
438 let count = conn.execute(&sql, [])?;
439 Ok(count)
440}
441
442fn push_table(
444 conn: &Connection,
445 table: &str,
446 remote_schema: &str,
447 since: Option<NaiveDate>,
448) -> Result<usize> {
449 let sql = match table {
450 "invocations" => {
451 let since_filter = since_clause(since, "l.timestamp");
452 format!(
453 r#"
454 INSERT INTO {remote}.{table}
455 SELECT *
456 FROM local.{table} l
457 WHERE NOT EXISTS (
458 SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
459 )
460 {since}
461 "#,
462 table = table,
463 remote = remote_schema,
464 since = since_filter,
465 )
466 }
467 "outputs" | "events" => {
468 let since_filter = since_clause(since, "i.timestamp");
469 format!(
470 r#"
471 INSERT INTO {remote}.{table}
472 SELECT l.*
473 FROM local.{table} l
474 JOIN local.invocations i ON i.id = l.invocation_id
475 WHERE NOT EXISTS (
476 SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
477 )
478 {since}
479 "#,
480 table = table,
481 remote = remote_schema,
482 since = since_filter,
483 )
484 }
485 _ => {
486 format!(
487 r#"
488 INSERT INTO {remote}.{table}
489 SELECT *
490 FROM local.{table} l
491 WHERE NOT EXISTS (
492 SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
493 )
494 "#,
495 table = table,
496 remote = remote_schema,
497 )
498 }
499 };
500
501 let count = conn.execute(&sql, [])?;
502 Ok(count)
503}
504
505fn pull_sessions(
507 conn: &Connection,
508 remote_schema: &str,
509 cached_schema: &str,
510 since: Option<NaiveDate>,
511 client_id: Option<&str>,
512) -> Result<usize> {
513 let since_filter = since_clause(since, "r.registered_at");
514 let client_filter = client_clause(client_id);
515
516 let sql = format!(
517 r#"
518 INSERT INTO {cached}.sessions (session_id, client_id, invoker, invoker_pid, invoker_type, registered_at, cwd, date)
519 SELECT r.*
520 FROM {remote}.sessions r
521 WHERE NOT EXISTS (
522 SELECT 1 FROM {cached}.sessions l WHERE l.session_id = r.session_id
523 )
524 {since}
525 {client}
526 "#,
527 cached = cached_schema,
528 remote = remote_schema,
529 since = since_filter,
530 client = client_filter,
531 );
532
533 let count = conn.execute(&sql, [])?;
534 Ok(count)
535}
536
537fn pull_table(
539 conn: &Connection,
540 table: &str,
541 remote_schema: &str,
542 cached_schema: &str,
543 since: Option<NaiveDate>,
544 client_id: Option<&str>,
545) -> Result<usize> {
546 let client_filter = client_clause(client_id);
547
548 let sql = match table {
549 "invocations" => {
550 let since_filter = since_clause(since, "r.timestamp");
551 format!(
552 r#"
553 INSERT INTO {cached}.{table} (id, session_id, timestamp, duration_ms, cwd, cmd, executable, exit_code, format_hint, client_id, hostname, username, tag, date)
554 SELECT r.*
555 FROM {remote}.{table} r
556 WHERE NOT EXISTS (
557 SELECT 1 FROM {cached}.{table} l WHERE l.id = r.id
558 )
559 {since}
560 {client}
561 "#,
562 table = table,
563 cached = cached_schema,
564 remote = remote_schema,
565 since = since_filter,
566 client = client_filter,
567 )
568 }
569 "outputs" => {
570 let since_filter = since_clause(since, "i.timestamp");
571 format!(
572 r#"
573 INSERT INTO {cached}.{table} (id, invocation_id, stream, content_hash, byte_length, storage_type, storage_ref, content_type, date)
574 SELECT r.*
575 FROM {remote}.{table} r
576 JOIN {remote}.invocations i ON i.id = r.invocation_id
577 WHERE NOT EXISTS (
578 SELECT 1 FROM {cached}.{table} l WHERE l.id = r.id
579 )
580 {since}
581 {client}
582 "#,
583 table = table,
584 cached = cached_schema,
585 remote = remote_schema,
586 since = since_filter,
587 client = if client_id.is_some() {
588 format!("AND i.client_id = '{}'", client_id.unwrap().replace('\'', "''"))
589 } else {
590 String::new()
591 },
592 )
593 }
594 "events" => {
595 let since_filter = since_clause(since, "i.timestamp");
596 format!(
597 r#"
598 INSERT INTO {cached}.{table} (id, invocation_id, client_id, hostname, event_type, severity, ref_file, ref_line, ref_column, message, error_code, test_name, status, format_used, date)
599 SELECT r.*
600 FROM {remote}.{table} r
601 JOIN {remote}.invocations i ON i.id = r.invocation_id
602 WHERE NOT EXISTS (
603 SELECT 1 FROM {cached}.{table} l WHERE l.id = r.id
604 )
605 {since}
606 {client}
607 "#,
608 table = table,
609 cached = cached_schema,
610 remote = remote_schema,
611 since = since_filter,
612 client = if client_id.is_some() {
613 format!("AND i.client_id = '{}'", client_id.unwrap().replace('\'', "''"))
614 } else {
615 String::new()
616 },
617 )
618 }
619 _ => {
620 format!(
621 r#"
622 INSERT INTO {cached}.{table}
623 SELECT r.*
624 FROM {remote}.{table} r
625 WHERE NOT EXISTS (
626 SELECT 1 FROM {cached}.{table} l WHERE l.id = r.id
627 )
628 {client}
629 "#,
630 table = table,
631 cached = cached_schema,
632 remote = remote_schema,
633 client = client_filter,
634 )
635 }
636 };
637
638 let count = conn.execute(&sql, [])?;
639 Ok(count)
640}
641
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{RemoteConfig, RemoteMode, RemoteType};
    use crate::init::initialize;
    use crate::schema::InvocationRecord;
    use crate::store::{ConnectionOptions, Store};
    use crate::Config;
    use tempfile::TempDir;

    /// Creates an initialized DuckDB-mode store rooted in a fresh temp dir.
    /// The `TempDir` is returned so the directory outlives the test body.
    fn setup_store_duckdb() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let config = Config::with_duckdb_mode(tmp.path());
        initialize(&config).unwrap();
        (tmp, Store::open(config).unwrap())
    }

    /// Builds a read-write, auto-attached file remote pointing at `path`.
    fn create_file_remote(name: &str, path: &std::path::Path) -> RemoteConfig {
        RemoteConfig {
            name: name.to_string(),
            remote_type: RemoteType::File,
            uri: path.to_string_lossy().to_string(),
            mode: RemoteMode::ReadWrite,
            auto_attach: true,
            credential_provider: None,
        }
    }

    /// Writes the standard single invocation most tests start from.
    fn write_sample_invocation(store: &Store) {
        let record = InvocationRecord::new(
            "test-session",
            "echo hello",
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&record).unwrap();
    }

    // --- parse_since ---

    #[test]
    fn test_parse_since_days() {
        let expected = Utc::now().date_naive() - TimeDelta::days(7);
        assert_eq!(parse_since("7d").unwrap(), expected);
    }

    #[test]
    fn test_parse_since_weeks() {
        let expected = Utc::now().date_naive() - TimeDelta::days(14);
        assert_eq!(parse_since("2w").unwrap(), expected);
    }

    #[test]
    fn test_parse_since_months() {
        let expected = Utc::now().date_naive() - TimeDelta::days(30);
        assert_eq!(parse_since("1m").unwrap(), expected);
    }

    #[test]
    fn test_parse_since_date() {
        let expected = NaiveDate::from_ymd_opt(2024, 1, 15).unwrap();
        assert_eq!(parse_since("2024-01-15").unwrap(), expected);
    }

    #[test]
    fn test_parse_since_invalid() {
        assert!(parse_since("invalid").is_err());
    }

    // --- push / pull against a file remote ---

    #[test]
    fn test_push_to_file_remote() {
        let (tmp, store) = setup_store_duckdb();
        write_sample_invocation(&store);

        let db_file = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &db_file);

        let pushed = store.push(&remote, PushOptions::default()).unwrap();

        assert_eq!(pushed.invocations, 1);
        assert!(db_file.exists(), "Remote database file should be created");
    }

    #[test]
    fn test_push_is_idempotent() {
        let (tmp, store) = setup_store_duckdb();
        write_sample_invocation(&store);

        let db_file = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &db_file);

        let first = store.push(&remote, PushOptions::default()).unwrap();
        let second = store.push(&remote, PushOptions::default()).unwrap();

        assert_eq!(first.invocations, 1);
        assert_eq!(second.invocations, 0, "Second push should be idempotent");
    }

    #[test]
    fn test_push_dry_run() {
        let (tmp, store) = setup_store_duckdb();
        write_sample_invocation(&store);

        let db_file = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &db_file);

        let dry_opts = PushOptions {
            dry_run: true,
            ..Default::default()
        };
        let dry_stats = store.push(&remote, dry_opts).unwrap();

        assert_eq!(dry_stats.invocations, 1, "Dry run should count invocations");

        // The dry run must not have pushed anything, so a real push still
        // finds the row.
        let actual_stats = store.push(&remote, PushOptions::default()).unwrap();
        assert_eq!(actual_stats.invocations, 1);
    }

    #[test]
    fn test_pull_from_file_remote() {
        let (tmp, store) = setup_store_duckdb();
        write_sample_invocation(&store);

        let db_file = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &db_file);
        store.push(&remote, PushOptions::default()).unwrap();

        {
            let conn = store.connection().unwrap();
            conn.execute("DELETE FROM local.invocations", []).unwrap();
        }

        let pulled = store.pull(&remote, PullOptions::default()).unwrap();

        assert_eq!(pulled.invocations, 1, "Should pull the invocation back");

        let conn = store.connection().unwrap();
        let cached_rows: i64 = conn
            .query_row("SELECT COUNT(*) FROM caches.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(cached_rows, 1, "Data should be in cached schema");
    }

    #[test]
    fn test_pull_is_idempotent() {
        let (tmp, store) = setup_store_duckdb();
        write_sample_invocation(&store);

        let db_file = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &db_file);
        store.push(&remote, PushOptions::default()).unwrap();

        let first = store.pull(&remote, PullOptions::default()).unwrap();
        let second = store.pull(&remote, PullOptions::default()).unwrap();

        assert_eq!(first.invocations, 1);
        assert_eq!(second.invocations, 0, "Second pull should be idempotent");
    }

    // --- remote names that need identifier quoting ---

    #[test]
    fn test_remote_name_with_hyphen() {
        let (tmp, store) = setup_store_duckdb();
        write_sample_invocation(&store);

        let db_file = tmp.path().join("my-team-remote.duckdb");
        let remote = create_file_remote("my-team", &db_file);

        let pushed = store.push(&remote, PushOptions::default()).unwrap();
        assert_eq!(pushed.invocations, 1);

        let pulled = store.pull(&remote, PullOptions::default()).unwrap();
        assert_eq!(pulled.invocations, 1);
    }

    #[test]
    fn test_remote_name_with_dots() {
        let (tmp, store) = setup_store_duckdb();
        write_sample_invocation(&store);

        let db_file = tmp.path().join("team.v2.duckdb");
        let remote = create_file_remote("team.v2", &db_file);

        let pushed = store.push(&remote, PushOptions::default()).unwrap();
        assert_eq!(pushed.invocations, 1);
    }

    // --- connections ---

    #[test]
    fn test_connection_minimal_vs_full() {
        let (_tmp, store) = setup_store_duckdb();

        {
            let conn_minimal = store.connect(ConnectionOptions::minimal()).unwrap();
            let rows: i64 = conn_minimal
                .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
                .unwrap();
            assert_eq!(rows, 0);
        }

        let conn_full = store.connect(ConnectionOptions::full()).unwrap();
        let rows: i64 = conn_full
            .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(rows, 0);
    }

    #[test]
    fn test_multiple_sequential_connections() {
        let (_tmp, store) = setup_store_duckdb();

        for i in 0..5 {
            let conn = store.connection().unwrap();
            let rows: i64 = conn
                .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
                .unwrap();
            assert_eq!(rows, 0, "Connection {} should work", i);
        }

        write_sample_invocation(&store);

        for i in 0..3 {
            let conn = store.connection().unwrap();
            let rows: i64 = conn
                .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
                .unwrap();
            assert_eq!(rows, 1, "Connection {} should see the data", i);
        }
    }

    // --- caches / main views ---

    #[test]
    fn test_caches_schema_views_work() {
        let (tmp, store) = setup_store_duckdb();

        {
            let conn = store.connection().unwrap();
            let rows: i64 = conn
                .query_row("SELECT COUNT(*) FROM caches.invocations", [], |r| r.get(0))
                .unwrap();
            assert_eq!(rows, 0);
        }

        write_sample_invocation(&store);

        let db_file = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &db_file);
        store.push(&remote, PushOptions::default()).unwrap();

        store.pull(&remote, PullOptions::default()).unwrap();

        let conn = store.connection().unwrap();
        store.rebuild_caches_schema(&conn).unwrap();

        let rows: i64 = conn
            .query_row("SELECT COUNT(*) FROM caches.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(rows, 1, "caches should include pulled data after rebuild");
    }

    #[test]
    fn test_main_schema_unions_local_and_caches() {
        let (tmp, store) = setup_store_duckdb();

        let first_local = InvocationRecord::new(
            "test-session",
            "local command",
            "/home/user",
            0,
            "local@client",
        );
        store.write_invocation(&first_local).unwrap();

        let db_file = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("team", &db_file);

        store.push(&remote, PushOptions::default()).unwrap();

        {
            let conn = store.connection().unwrap();
            conn.execute("DELETE FROM local.invocations", []).unwrap();
        }

        store.pull(&remote, PullOptions::default()).unwrap();

        let conn = store.connection().unwrap();
        store.rebuild_caches_schema(&conn).unwrap();

        let second_local = InvocationRecord::new(
            "test-session-2",
            "new local command",
            "/home/user",
            0,
            "local@client",
        );
        drop(conn);
        store.write_invocation(&second_local).unwrap();

        let conn = store.connection().unwrap();
        let rows: i64 = conn
            .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(rows, 2, "main should union local + caches");
    }
}