Skip to main content

magic_bird/store/
remote.rs

1//! Remote sync operations (push/pull).
2//!
3//! Provides functionality to sync data between local and remote DuckDB databases.
4//!
5//! # Schema Architecture
6//!
7//! - **Push**: Reads from `local` schema, writes to `remote_<name>` schema tables
8//! - **Pull**: Reads from `remote_<name>` schema, writes to `cached_<name>` schema tables
9//!
10//! Remote databases have tables: `sessions`, `invocations`, `outputs`, `events`
11//! (no `_table` suffix - consistent naming across all schemas).
12
13use chrono::{NaiveDate, TimeDelta, Utc};
14use duckdb::Connection;
15
16use crate::{Error, RemoteConfig, Result};
17
/// Row counts reported by a push, one field per synced table.
///
/// For a dry run these are the rows that *would* have been pushed.
#[derive(Debug, Default)]
pub struct PushStats {
    /// Sessions pushed (or counted, for a dry run).
    pub sessions: usize,
    /// Invocations pushed (or counted).
    pub invocations: usize,
    /// Outputs pushed (or counted).
    pub outputs: usize,
    /// Events pushed (or counted).
    pub events: usize,
}

impl std::fmt::Display for PushStats {
    /// Renders as `"<n> sessions, <n> invocations, <n> outputs, <n> events"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            sessions,
            invocations,
            outputs,
            events,
        } = self;
        write!(
            f,
            "{sessions} sessions, {invocations} invocations, {outputs} outputs, {events} events"
        )
    }
}
36
/// Row counts reported by a pull: rows inserted into the local `cached_<name>`
/// schema, one field per synced table.
#[derive(Debug, Default)]
pub struct PullStats {
    /// Sessions newly cached.
    pub sessions: usize,
    /// Invocations newly cached.
    pub invocations: usize,
    /// Outputs newly cached.
    pub outputs: usize,
    /// Events newly cached.
    pub events: usize,
}

impl std::fmt::Display for PullStats {
    /// Renders as `"<n> sessions, <n> invocations, <n> outputs, <n> events"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            sessions,
            invocations,
            outputs,
            events,
        } = self;
        write!(
            f,
            "{sessions} sessions, {invocations} invocations, {outputs} outputs, {events} events"
        )
    }
}
55
56/// Options for push operation.
57#[derive(Debug, Default)]
58pub struct PushOptions {
59    /// Only push data since this date.
60    pub since: Option<NaiveDate>,
61    /// Show what would be pushed without actually pushing.
62    pub dry_run: bool,
63}
64
65/// Options for pull operation.
66#[derive(Debug, Default)]
67pub struct PullOptions {
68    /// Only pull data since this date.
69    pub since: Option<NaiveDate>,
70    /// Only pull data from this client.
71    pub client_id: Option<String>,
72}
73
74/// Parse a "since" string into a date.
75///
76/// Supports:
77/// - Duration: "7d", "2w", "1m" (days, weeks, months)
78/// - Date: "2024-01-15"
79pub fn parse_since(s: &str) -> Result<NaiveDate> {
80    let s = s.trim();
81
82    // Try duration first (7d, 2w, 1m)
83    if let Some(days) = parse_duration_days(s) {
84        let date = Utc::now().date_naive() - TimeDelta::days(days);
85        return Ok(date);
86    }
87
88    // Try date format (YYYY-MM-DD)
89    NaiveDate::parse_from_str(s, "%Y-%m-%d")
90        .map_err(|e| Error::Config(format!("Invalid date '{}': {}", s, e)))
91}
92
/// Parse a relative duration such as `"7d"`, `"2w"` or `"1m"` into a number of days.
///
/// The unit suffix is case-insensitive: `d` = days, `w` = weeks (7 days),
/// `m` = months (approximated as 30 days). Surrounding whitespace is ignored.
///
/// Returns `None` in three cases:
/// - the string is not a duration, so the caller can try other formats;
/// - the count is negative, because a "since" in the future is meaningless;
/// - converting to days would overflow `i64`.
fn parse_duration_days(s: &str) -> Option<i64> {
    let s = s.trim().to_lowercase();

    let (count, days_per_unit) = if let Some(num) = s.strip_suffix('d') {
        (num, 1)
    } else if let Some(num) = s.strip_suffix('w') {
        (num, 7)
    } else if let Some(num) = s.strip_suffix('m') {
        (num, 30)
    } else {
        return None;
    };

    let count: i64 = count.parse().ok()?;
    if count < 0 {
        return None;
    }
    // checked_mul: e.g. "9223372036854775807w" would otherwise overflow
    // (a panic in debug builds, silent wrap-around in release builds).
    count.checked_mul(days_per_unit)
}
107
/// Returns the unquoted name of the local schema that caches data pulled from
/// `remote_name` (e.g. `cached_team` for remote `team`).
// Not referenced elsewhere in this module; the allow silences the dead-code lint.
#[allow(dead_code)]
pub fn cached_schema_name(remote_name: &str) -> String {
    ["cached_", remote_name].concat()
}
113
/// Get the cached schema name for a remote as a double-quoted SQL identifier
/// (e.g. `"cached_team"` for remote `team`).
///
/// Embedded double quotes are doubled, as standard SQL identifier quoting requires.
/// Without this, a remote name containing `"` would end the identifier early and
/// break (or inject into) the generated SQL.
pub fn quoted_cached_schema_name(remote_name: &str) -> String {
    format!("\"cached_{}\"", remote_name.replace('"', "\"\""))
}
118
119impl super::Store {
120    /// Push local data to a remote.
121    ///
122    /// Reads from `local` schema, writes to remote's tables.
123    /// Only pushes records that don't already exist on the remote (by id).
124    pub fn push(&self, remote: &RemoteConfig, opts: PushOptions) -> Result<PushStats> {
125        // Use connection without auto-attach to avoid conflicts and unnecessary views
126        let conn = self.connection_with_options(false)?;
127
128        // Attach only the target remote
129        self.attach_remote(&conn, remote)?;
130
131        let remote_schema = remote.quoted_schema_name();
132
133        // Ensure remote has the required tables
134        ensure_remote_schema(&conn, &remote_schema)?;
135
136        let mut stats = PushStats::default();
137
138        if opts.dry_run {
139            // Count what would be pushed
140            stats.sessions = count_sessions_to_push(&conn, &remote_schema, opts.since)?;
141            stats.invocations = count_table_to_push(&conn, "invocations", &remote_schema, opts.since)?;
142            stats.outputs = count_table_to_push(&conn, "outputs", &remote_schema, opts.since)?;
143            stats.events = count_table_to_push(&conn, "events", &remote_schema, opts.since)?;
144        } else {
145            // Actually push in dependency order
146            stats.sessions = push_sessions(&conn, &remote_schema, opts.since)?;
147            stats.invocations = push_table(&conn, "invocations", &remote_schema, opts.since)?;
148            stats.outputs = push_table(&conn, "outputs", &remote_schema, opts.since)?;
149            stats.events = push_table(&conn, "events", &remote_schema, opts.since)?;
150        }
151
152        Ok(stats)
153    }
154
155    /// Pull data from a remote into local cached_<name> schema.
156    ///
157    /// Reads from remote's tables, writes to `cached_<name>` schema.
158    /// Only pulls records that don't already exist in the cached schema (by id).
159    /// After pulling, rebuilds the `caches` union views.
160    pub fn pull(&self, remote: &RemoteConfig, opts: PullOptions) -> Result<PullStats> {
161        // Use connection without auto-attach to avoid conflicts
162        let conn = self.connection_with_options(false)?;
163
164        // Attach only the target remote
165        self.attach_remote(&conn, remote)?;
166
167        let remote_schema = remote.quoted_schema_name();
168        let cached_schema = quoted_cached_schema_name(&remote.name);
169
170        // Ensure cached schema exists with required tables
171        ensure_cached_schema(&conn, &cached_schema, &remote.name)?;
172
173        // Pull in dependency order (sessions first, then invocations, outputs, events)
174        let stats = PullStats {
175            sessions: pull_sessions(&conn, &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref())?,
176            invocations: pull_table(&conn, "invocations", &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref())?,
177            outputs: pull_table(&conn, "outputs", &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref())?,
178            events: pull_table(&conn, "events", &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref())?,
179        };
180
181        // Rebuild caches union views to include this cached schema
182        self.rebuild_caches_schema(&conn)?;
183
184        Ok(stats)
185    }
186
187    /// Rebuild the `caches` schema views to union all `cached_*` schemas.
188    ///
189    /// Uses explicit transaction for DDL safety. The caches.* views reference
190    /// local cached_* schemas (not attached databases), so they should be safe
191    /// to persist.
192    pub fn rebuild_caches_schema(&self, conn: &Connection) -> Result<()> {
193        // Find all cached_* schemas
194        let schemas: Vec<String> = conn
195            .prepare("SELECT schema_name FROM information_schema.schemata WHERE schema_name LIKE 'cached_%'")?
196            .query_map([], |row| row.get(0))?
197            .filter_map(|r| r.ok())
198            .collect();
199
200        // Use transaction for DDL safety
201        conn.execute("BEGIN TRANSACTION", [])?;
202
203        let result = (|| -> std::result::Result<(), duckdb::Error> {
204            for table in &["sessions", "invocations", "outputs", "events"] {
205                let mut union_parts: Vec<String> = schemas
206                    .iter()
207                    .map(|s| format!("SELECT * FROM \"{}\".{}", s, table))
208                    .collect();
209
210                // Always include placeholder (ensures view is valid even with no cached schemas)
211                if !schemas.iter().any(|s| s == "cached_placeholder") {
212                    union_parts.push(format!("SELECT * FROM cached_placeholder.{}", table));
213                }
214
215                let sql = format!(
216                    "CREATE OR REPLACE VIEW caches.{} AS {}",
217                    table,
218                    union_parts.join(" UNION ALL BY NAME ")
219                );
220                conn.execute(&sql, [])?;
221            }
222            Ok(())
223        })();
224
225        match result {
226            Ok(()) => {
227                conn.execute("COMMIT", [])?;
228                Ok(())
229            }
230            Err(e) => {
231                let _ = conn.execute("ROLLBACK", []);
232                Err(crate::Error::DuckDb(e))
233            }
234        }
235    }
236}
237
238/// Ensure the remote schema has the required tables.
239/// Tables use consistent naming (no `_table` suffix).
240fn ensure_remote_schema(conn: &Connection, schema: &str) -> Result<()> {
241    let sql = format!(
242        r#"
243        CREATE TABLE IF NOT EXISTS {schema}.sessions (
244            session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
245            invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE
246        );
247        CREATE TABLE IF NOT EXISTS {schema}.invocations (
248            id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
249            cwd VARCHAR, cmd VARCHAR, executable VARCHAR, exit_code INTEGER,
250            format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR,
251            tag VARCHAR, date DATE
252        );
253        CREATE TABLE IF NOT EXISTS {schema}.outputs (
254            id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
255            byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
256            content_type VARCHAR, date DATE
257        );
258        CREATE TABLE IF NOT EXISTS {schema}.events (
259            id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
260            event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
261            ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
262            status VARCHAR, format_used VARCHAR, date DATE
263        );
264        "#,
265        schema = schema
266    );
267    conn.execute_batch(&sql)?;
268    Ok(())
269}
270
271/// Ensure the cached schema exists with required tables.
272/// Tables include a `_source` column to track which remote the data came from.
273fn ensure_cached_schema(conn: &Connection, schema: &str, remote_name: &str) -> Result<()> {
274    // Create the schema if it doesn't exist
275    conn.execute(&format!("CREATE SCHEMA IF NOT EXISTS {}", schema), [])?;
276
277    // Create tables with _source column
278    let sql = format!(
279        r#"
280        CREATE TABLE IF NOT EXISTS {schema}.sessions (
281            session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
282            invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE,
283            _source VARCHAR DEFAULT '{remote_name}'
284        );
285        CREATE TABLE IF NOT EXISTS {schema}.invocations (
286            id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
287            cwd VARCHAR, cmd VARCHAR, executable VARCHAR, exit_code INTEGER,
288            format_hint VARCHAR, client_id VARCHAR, hostname VARCHAR, username VARCHAR,
289            tag VARCHAR, date DATE,
290            _source VARCHAR DEFAULT '{remote_name}'
291        );
292        CREATE TABLE IF NOT EXISTS {schema}.outputs (
293            id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
294            byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
295            content_type VARCHAR, date DATE,
296            _source VARCHAR DEFAULT '{remote_name}'
297        );
298        CREATE TABLE IF NOT EXISTS {schema}.events (
299            id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
300            event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
301            ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
302            status VARCHAR, format_used VARCHAR, date DATE,
303            _source VARCHAR DEFAULT '{remote_name}'
304        );
305        "#,
306        schema = schema,
307        remote_name = remote_name.replace('\'', "''")
308    );
309    conn.execute_batch(&sql)?;
310    Ok(())
311}
312
313/// Build the WHERE clause for time filtering.
314fn since_clause(since: Option<NaiveDate>, timestamp_col: &str) -> String {
315    since
316        .map(|d| format!("AND {} >= '{}'", timestamp_col, d))
317        .unwrap_or_default()
318}
319
/// Build an `AND client_id = '<id>'` fragment (single quotes escaped) to append
/// to an existing WHERE clause; returns an empty string when `client_id` is `None`.
fn client_clause(client_id: Option<&str>) -> String {
    let Some(id) = client_id else {
        return String::new();
    };
    let escaped = id.replace('\'', "''");
    format!("AND client_id = '{escaped}'")
}
326
327/// Count sessions that would be pushed.
328/// Reads from `local` schema.
329fn count_sessions_to_push(
330    conn: &Connection,
331    remote_schema: &str,
332    since: Option<NaiveDate>,
333) -> Result<usize> {
334    let since_filter = since_clause(since, "i.timestamp");
335
336    let sql = format!(
337        r#"
338        SELECT COUNT(DISTINCT s.session_id)
339        FROM local.sessions s
340        JOIN local.invocations i ON i.session_id = s.session_id
341        WHERE NOT EXISTS (
342            SELECT 1 FROM {remote}.sessions r WHERE r.session_id = s.session_id
343        )
344        {since}
345        "#,
346        remote = remote_schema,
347        since = since_filter,
348    );
349
350    let count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
351    Ok(count as usize)
352}
353
354/// Count records that would be pushed for a table.
355/// Reads from `local` schema.
356fn count_table_to_push(
357    conn: &Connection,
358    table: &str,
359    remote_schema: &str,
360    since: Option<NaiveDate>,
361) -> Result<usize> {
362    let sql = match table {
363        "invocations" => {
364            let since_filter = since_clause(since, "l.timestamp");
365            format!(
366                r#"
367                SELECT COUNT(*)
368                FROM local.{table} l
369                WHERE NOT EXISTS (
370                    SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
371                )
372                {since}
373                "#,
374                table = table,
375                remote = remote_schema,
376                since = since_filter,
377            )
378        }
379        "outputs" | "events" => {
380            let since_filter = since_clause(since, "i.timestamp");
381            format!(
382                r#"
383                SELECT COUNT(*)
384                FROM local.{table} l
385                JOIN local.invocations i ON i.id = l.invocation_id
386                WHERE NOT EXISTS (
387                    SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
388                )
389                {since}
390                "#,
391                table = table,
392                remote = remote_schema,
393                since = since_filter,
394            )
395        }
396        _ => {
397            format!(
398                r#"
399                SELECT COUNT(*)
400                FROM local.{table} l
401                WHERE NOT EXISTS (
402                    SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
403                )
404                "#,
405                table = table,
406                remote = remote_schema,
407            )
408        }
409    };
410
411    let count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
412    Ok(count as usize)
413}
414
415/// Push sessions from `local` to remote.
416fn push_sessions(
417    conn: &Connection,
418    remote_schema: &str,
419    since: Option<NaiveDate>,
420) -> Result<usize> {
421    let since_filter = since_clause(since, "i.timestamp");
422
423    let sql = format!(
424        r#"
425        INSERT INTO {remote}.sessions
426        SELECT DISTINCT s.*
427        FROM local.sessions s
428        JOIN local.invocations i ON i.session_id = s.session_id
429        WHERE NOT EXISTS (
430            SELECT 1 FROM {remote}.sessions r WHERE r.session_id = s.session_id
431        )
432        {since}
433        "#,
434        remote = remote_schema,
435        since = since_filter,
436    );
437
438    let count = conn.execute(&sql, [])?;
439    Ok(count)
440}
441
442/// Push records from `local` to remote.
443fn push_table(
444    conn: &Connection,
445    table: &str,
446    remote_schema: &str,
447    since: Option<NaiveDate>,
448) -> Result<usize> {
449    let sql = match table {
450        "invocations" => {
451            let since_filter = since_clause(since, "l.timestamp");
452            format!(
453                r#"
454                INSERT INTO {remote}.{table}
455                SELECT *
456                FROM local.{table} l
457                WHERE NOT EXISTS (
458                    SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
459                )
460                {since}
461                "#,
462                table = table,
463                remote = remote_schema,
464                since = since_filter,
465            )
466        }
467        "outputs" | "events" => {
468            let since_filter = since_clause(since, "i.timestamp");
469            format!(
470                r#"
471                INSERT INTO {remote}.{table}
472                SELECT l.*
473                FROM local.{table} l
474                JOIN local.invocations i ON i.id = l.invocation_id
475                WHERE NOT EXISTS (
476                    SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
477                )
478                {since}
479                "#,
480                table = table,
481                remote = remote_schema,
482                since = since_filter,
483            )
484        }
485        _ => {
486            format!(
487                r#"
488                INSERT INTO {remote}.{table}
489                SELECT *
490                FROM local.{table} l
491                WHERE NOT EXISTS (
492                    SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
493                )
494                "#,
495                table = table,
496                remote = remote_schema,
497            )
498        }
499    };
500
501    let count = conn.execute(&sql, [])?;
502    Ok(count)
503}
504
505/// Pull sessions from remote into cached schema.
506fn pull_sessions(
507    conn: &Connection,
508    remote_schema: &str,
509    cached_schema: &str,
510    since: Option<NaiveDate>,
511    client_id: Option<&str>,
512) -> Result<usize> {
513    let since_filter = since_clause(since, "r.registered_at");
514    let client_filter = client_clause(client_id);
515
516    let sql = format!(
517        r#"
518        INSERT INTO {cached}.sessions (session_id, client_id, invoker, invoker_pid, invoker_type, registered_at, cwd, date)
519        SELECT r.*
520        FROM {remote}.sessions r
521        WHERE NOT EXISTS (
522            SELECT 1 FROM {cached}.sessions l WHERE l.session_id = r.session_id
523        )
524        {since}
525        {client}
526        "#,
527        cached = cached_schema,
528        remote = remote_schema,
529        since = since_filter,
530        client = client_filter,
531    );
532
533    let count = conn.execute(&sql, [])?;
534    Ok(count)
535}
536
537/// Pull records from remote into cached schema.
538fn pull_table(
539    conn: &Connection,
540    table: &str,
541    remote_schema: &str,
542    cached_schema: &str,
543    since: Option<NaiveDate>,
544    client_id: Option<&str>,
545) -> Result<usize> {
546    let client_filter = client_clause(client_id);
547
548    let sql = match table {
549        "invocations" => {
550            let since_filter = since_clause(since, "r.timestamp");
551            format!(
552                r#"
553                INSERT INTO {cached}.{table} (id, session_id, timestamp, duration_ms, cwd, cmd, executable, exit_code, format_hint, client_id, hostname, username, tag, date)
554                SELECT r.*
555                FROM {remote}.{table} r
556                WHERE NOT EXISTS (
557                    SELECT 1 FROM {cached}.{table} l WHERE l.id = r.id
558                )
559                {since}
560                {client}
561                "#,
562                table = table,
563                cached = cached_schema,
564                remote = remote_schema,
565                since = since_filter,
566                client = client_filter,
567            )
568        }
569        "outputs" => {
570            let since_filter = since_clause(since, "i.timestamp");
571            format!(
572                r#"
573                INSERT INTO {cached}.{table} (id, invocation_id, stream, content_hash, byte_length, storage_type, storage_ref, content_type, date)
574                SELECT r.*
575                FROM {remote}.{table} r
576                JOIN {remote}.invocations i ON i.id = r.invocation_id
577                WHERE NOT EXISTS (
578                    SELECT 1 FROM {cached}.{table} l WHERE l.id = r.id
579                )
580                {since}
581                {client}
582                "#,
583                table = table,
584                cached = cached_schema,
585                remote = remote_schema,
586                since = since_filter,
587                client = if client_id.is_some() {
588                    format!("AND i.client_id = '{}'", client_id.unwrap().replace('\'', "''"))
589                } else {
590                    String::new()
591                },
592            )
593        }
594        "events" => {
595            let since_filter = since_clause(since, "i.timestamp");
596            format!(
597                r#"
598                INSERT INTO {cached}.{table} (id, invocation_id, client_id, hostname, event_type, severity, ref_file, ref_line, ref_column, message, error_code, test_name, status, format_used, date)
599                SELECT r.*
600                FROM {remote}.{table} r
601                JOIN {remote}.invocations i ON i.id = r.invocation_id
602                WHERE NOT EXISTS (
603                    SELECT 1 FROM {cached}.{table} l WHERE l.id = r.id
604                )
605                {since}
606                {client}
607                "#,
608                table = table,
609                cached = cached_schema,
610                remote = remote_schema,
611                since = since_filter,
612                client = if client_id.is_some() {
613                    format!("AND i.client_id = '{}'", client_id.unwrap().replace('\'', "''"))
614                } else {
615                    String::new()
616                },
617            )
618        }
619        _ => {
620            format!(
621                r#"
622                INSERT INTO {cached}.{table}
623                SELECT r.*
624                FROM {remote}.{table} r
625                WHERE NOT EXISTS (
626                    SELECT 1 FROM {cached}.{table} l WHERE l.id = r.id
627                )
628                {client}
629                "#,
630                table = table,
631                cached = cached_schema,
632                remote = remote_schema,
633                client = client_filter,
634            )
635        }
636    };
637
638    let count = conn.execute(&sql, [])?;
639    Ok(count)
640}
641
642#[cfg(test)]
643mod tests {
644    use super::*;
645    use crate::config::{RemoteConfig, RemoteMode, RemoteType};
646    use crate::init::initialize;
647    use crate::schema::InvocationRecord;
648    use crate::store::{ConnectionOptions, Store};
649    use crate::Config;
650    use tempfile::TempDir;
651
652    fn setup_store_duckdb() -> (TempDir, Store) {
653        let tmp = TempDir::new().unwrap();
654        let config = Config::with_duckdb_mode(tmp.path());
655        initialize(&config).unwrap();
656        let store = Store::open(config).unwrap();
657        (tmp, store)
658    }
659
660    fn create_file_remote(name: &str, path: &std::path::Path) -> RemoteConfig {
661        RemoteConfig {
662            name: name.to_string(),
663            remote_type: RemoteType::File,
664            uri: path.to_string_lossy().to_string(),
665            mode: RemoteMode::ReadWrite,
666            auto_attach: true,
667            credential_provider: None,
668        }
669    }
670
671    // ===== Date parsing tests =====
672
673    #[test]
674    fn test_parse_since_days() {
675        let today = Utc::now().date_naive();
676        let result = parse_since("7d").unwrap();
677        assert_eq!(result, today - TimeDelta::days(7));
678    }
679
680    #[test]
681    fn test_parse_since_weeks() {
682        let today = Utc::now().date_naive();
683        let result = parse_since("2w").unwrap();
684        assert_eq!(result, today - TimeDelta::days(14));
685    }
686
687    #[test]
688    fn test_parse_since_months() {
689        let today = Utc::now().date_naive();
690        let result = parse_since("1m").unwrap();
691        assert_eq!(result, today - TimeDelta::days(30));
692    }
693
694    #[test]
695    fn test_parse_since_date() {
696        let result = parse_since("2024-01-15").unwrap();
697        assert_eq!(result, NaiveDate::from_ymd_opt(2024, 1, 15).unwrap());
698    }
699
700    #[test]
701    fn test_parse_since_invalid() {
702        assert!(parse_since("invalid").is_err());
703    }
704
705    // ===== Push/Pull integration tests =====
706
707    #[test]
708    fn test_push_to_file_remote() {
709        let (tmp, store) = setup_store_duckdb();
710
711        // Write some local data
712        let inv = InvocationRecord::new(
713            "test-session",
714            "echo hello",
715            "/home/user",
716            0,
717            "test@client",
718        );
719        store.write_invocation(&inv).unwrap();
720
721        // Create a file remote
722        let remote_path = tmp.path().join("remote.duckdb");
723        let remote = create_file_remote("test", &remote_path);
724
725        // Push to remote
726        let stats = store.push(&remote, PushOptions::default()).unwrap();
727
728        assert_eq!(stats.invocations, 1);
729        assert!(remote_path.exists(), "Remote database file should be created");
730    }
731
732    #[test]
733    fn test_push_is_idempotent() {
734        let (tmp, store) = setup_store_duckdb();
735
736        // Write local data
737        let inv = InvocationRecord::new(
738            "test-session",
739            "echo hello",
740            "/home/user",
741            0,
742            "test@client",
743        );
744        store.write_invocation(&inv).unwrap();
745
746        // Create remote
747        let remote_path = tmp.path().join("remote.duckdb");
748        let remote = create_file_remote("test", &remote_path);
749
750        // Push twice
751        let stats1 = store.push(&remote, PushOptions::default()).unwrap();
752        let stats2 = store.push(&remote, PushOptions::default()).unwrap();
753
754        // First push should transfer data, second should be idempotent
755        assert_eq!(stats1.invocations, 1);
756        assert_eq!(stats2.invocations, 0, "Second push should be idempotent");
757    }
758
759    #[test]
760    fn test_push_dry_run() {
761        let (tmp, store) = setup_store_duckdb();
762
763        // Write local data
764        let inv = InvocationRecord::new(
765            "test-session",
766            "echo hello",
767            "/home/user",
768            0,
769            "test@client",
770        );
771        store.write_invocation(&inv).unwrap();
772
773        // Create remote
774        let remote_path = tmp.path().join("remote.duckdb");
775        let remote = create_file_remote("test", &remote_path);
776
777        // Dry run push
778        let dry_stats = store
779            .push(
780                &remote,
781                PushOptions {
782                    dry_run: true,
783                    ..Default::default()
784                },
785            )
786            .unwrap();
787
788        assert_eq!(dry_stats.invocations, 1, "Dry run should count invocations");
789
790        // Actual push should still transfer data (dry run didn't modify)
791        let actual_stats = store.push(&remote, PushOptions::default()).unwrap();
792        assert_eq!(actual_stats.invocations, 1);
793    }
794
795    #[test]
796    fn test_pull_from_file_remote() {
797        let (tmp, store) = setup_store_duckdb();
798
799        // Write local data and push to remote
800        let inv = InvocationRecord::new(
801            "test-session",
802            "echo hello",
803            "/home/user",
804            0,
805            "test@client",
806        );
807        store.write_invocation(&inv).unwrap();
808
809        let remote_path = tmp.path().join("remote.duckdb");
810        let remote = create_file_remote("test", &remote_path);
811        store.push(&remote, PushOptions::default()).unwrap();
812
813        // Clear local data (simulate different client)
814        let conn = store.connection().unwrap();
815        conn.execute("DELETE FROM local.invocations", []).unwrap();
816        drop(conn);
817
818        // Pull from remote
819        let stats = store.pull(&remote, PullOptions::default()).unwrap();
820
821        assert_eq!(stats.invocations, 1, "Should pull the invocation back");
822
823        // Verify data in cached schema
824        let conn = store.connection().unwrap();
825        let count: i64 = conn
826            .query_row("SELECT COUNT(*) FROM caches.invocations", [], |r| r.get(0))
827            .unwrap();
828        assert_eq!(count, 1, "Data should be in cached schema");
829    }
830
831    #[test]
832    fn test_pull_is_idempotent() {
833        let (tmp, store) = setup_store_duckdb();
834
835        // Setup: write, push, clear local
836        let inv = InvocationRecord::new(
837            "test-session",
838            "echo hello",
839            "/home/user",
840            0,
841            "test@client",
842        );
843        store.write_invocation(&inv).unwrap();
844
845        let remote_path = tmp.path().join("remote.duckdb");
846        let remote = create_file_remote("test", &remote_path);
847        store.push(&remote, PushOptions::default()).unwrap();
848
849        // Pull twice
850        let stats1 = store.pull(&remote, PullOptions::default()).unwrap();
851        let stats2 = store.pull(&remote, PullOptions::default()).unwrap();
852
853        assert_eq!(stats1.invocations, 1);
854        assert_eq!(stats2.invocations, 0, "Second pull should be idempotent");
855    }
856
857    // ===== Remote name handling tests =====
858
859    #[test]
860    fn test_remote_name_with_hyphen() {
861        let (tmp, store) = setup_store_duckdb();
862
863        // Write local data
864        let inv = InvocationRecord::new(
865            "test-session",
866            "echo hello",
867            "/home/user",
868            0,
869            "test@client",
870        );
871        store.write_invocation(&inv).unwrap();
872
873        // Create remote with hyphen in name
874        let remote_path = tmp.path().join("my-team-remote.duckdb");
875        let remote = create_file_remote("my-team", &remote_path);
876
877        // Push should work despite hyphen
878        let stats = store.push(&remote, PushOptions::default()).unwrap();
879        assert_eq!(stats.invocations, 1);
880
881        // Pull should also work (hyphen in name handled correctly)
882        let pull_stats = store.pull(&remote, PullOptions::default()).unwrap();
883        // Pull brings data from remote into cached_my_team schema
884        assert_eq!(pull_stats.invocations, 1);
885    }
886
887    #[test]
888    fn test_remote_name_with_dots() {
889        let (tmp, store) = setup_store_duckdb();
890
891        let inv = InvocationRecord::new(
892            "test-session",
893            "echo hello",
894            "/home/user",
895            0,
896            "test@client",
897        );
898        store.write_invocation(&inv).unwrap();
899
900        // Remote with dots in name
901        let remote_path = tmp.path().join("team.v2.duckdb");
902        let remote = create_file_remote("team.v2", &remote_path);
903
904        let stats = store.push(&remote, PushOptions::default()).unwrap();
905        assert_eq!(stats.invocations, 1);
906    }
907
908    // ===== Connection options tests =====
909
910    #[test]
911    fn test_connection_minimal_vs_full() {
912        let (_tmp, store) = setup_store_duckdb();
913
914        // Minimal connection should work
915        let conn_minimal = store.connect(ConnectionOptions::minimal()).unwrap();
916        let count: i64 = conn_minimal
917            .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
918            .unwrap();
919        assert_eq!(count, 0);
920        drop(conn_minimal);
921
922        // Full connection should also work
923        let conn_full = store.connect(ConnectionOptions::full()).unwrap();
924        let count: i64 = conn_full
925            .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
926            .unwrap();
927        assert_eq!(count, 0);
928    }
929
930    #[test]
931    fn test_multiple_sequential_connections() {
932        let (_tmp, store) = setup_store_duckdb();
933
934        // Open and close multiple connections sequentially
935        // This tests for database corruption issues
936        for i in 0..5 {
937            let conn = store.connection().unwrap();
938            let count: i64 = conn
939                .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
940                .unwrap();
941            assert_eq!(count, 0, "Connection {} should work", i);
942            drop(conn);
943        }
944
945        // Write some data
946        let inv = InvocationRecord::new(
947            "test-session",
948            "echo hello",
949            "/home/user",
950            0,
951            "test@client",
952        );
953        store.write_invocation(&inv).unwrap();
954
955        // More connections should still work and see the data
956        for i in 0..3 {
957            let conn = store.connection().unwrap();
958            let count: i64 = conn
959                .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
960                .unwrap();
961            assert_eq!(count, 1, "Connection {} should see the data", i);
962            drop(conn);
963        }
964    }
965
966    // ===== Cached schema tests =====
967
968    #[test]
969    fn test_caches_schema_views_work() {
970        let (tmp, store) = setup_store_duckdb();
971
972        // Initially caches should be empty
973        let conn = store.connection().unwrap();
974        let count: i64 = conn
975            .query_row("SELECT COUNT(*) FROM caches.invocations", [], |r| r.get(0))
976            .unwrap();
977        assert_eq!(count, 0);
978        drop(conn);
979
980        // Write and push data
981        let inv = InvocationRecord::new(
982            "test-session",
983            "echo hello",
984            "/home/user",
985            0,
986            "test@client",
987        );
988        store.write_invocation(&inv).unwrap();
989
990        let remote_path = tmp.path().join("remote.duckdb");
991        let remote = create_file_remote("test", &remote_path);
992        store.push(&remote, PushOptions::default()).unwrap();
993
994        // Pull creates cached_test schema
995        store.pull(&remote, PullOptions::default()).unwrap();
996
997        // Rebuild caches views to include cached_test
998        let conn = store.connection().unwrap();
999        store.rebuild_caches_schema(&conn).unwrap();
1000
1001        // caches.invocations should now have data
1002        let count: i64 = conn
1003            .query_row("SELECT COUNT(*) FROM caches.invocations", [], |r| r.get(0))
1004            .unwrap();
1005        assert_eq!(count, 1, "caches should include pulled data after rebuild");
1006    }
1007
1008    #[test]
1009    fn test_main_schema_unions_local_and_caches() {
1010        let (tmp, store) = setup_store_duckdb();
1011
1012        // Write local data
1013        let inv1 = InvocationRecord::new(
1014            "test-session",
1015            "local command",
1016            "/home/user",
1017            0,
1018            "local@client",
1019        );
1020        store.write_invocation(&inv1).unwrap();
1021
1022        // Push to remote, then pull (simulating another client's data)
1023        let remote_path = tmp.path().join("remote.duckdb");
1024        let remote = create_file_remote("team", &remote_path);
1025
1026        // Push local data to remote
1027        store.push(&remote, PushOptions::default()).unwrap();
1028
1029        // Delete local, pull from remote to create cached data
1030        let conn = store.connection().unwrap();
1031        conn.execute("DELETE FROM local.invocations", []).unwrap();
1032        drop(conn);
1033
1034        store.pull(&remote, PullOptions::default()).unwrap();
1035
1036        // Rebuild caches schema
1037        let conn = store.connection().unwrap();
1038        store.rebuild_caches_schema(&conn).unwrap();
1039
1040        // Write new local data
1041        let inv2 = InvocationRecord::new(
1042            "test-session-2",
1043            "new local command",
1044            "/home/user",
1045            0,
1046            "local@client",
1047        );
1048        drop(conn);
1049        store.write_invocation(&inv2).unwrap();
1050
1051        // main.invocations should have both local and cached
1052        let conn = store.connection().unwrap();
1053        let count: i64 = conn
1054            .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
1055            .unwrap();
1056        assert_eq!(count, 2, "main should union local + caches");
1057    }
1058}