// magic_bird/store/remote.rs
1//! Remote sync operations (push/pull).
2//!
3//! Provides functionality to sync data between local and remote DuckDB databases.
4//!
5//! # Schema Architecture
6//!
7//! - **Push**: Reads from `local` schema, writes to `remote_<name>` schema tables
8//! - **Pull**: Reads from `remote_<name>` schema, writes to `cached_<name>` schema tables
9//!
10//! Remote databases have tables: `sessions`, `invocations`, `outputs`, `events`
11//! (no `_table` suffix - consistent naming across all schemas).
12//!
13//! # Blob Sync
14//!
15//! When `sync_blobs` is enabled, blob files (outputs stored as file:// refs) are
16//! also synced. For file remotes, we prefer hard links (fast, no disk duplication)
17//! and fall back to copying when hard links fail (cross-filesystem).
18
19use std::fs;
20use std::path::{Path, PathBuf};
21
22use chrono::{NaiveDate, TimeDelta, Utc};
23use duckdb::Connection;
24
25use crate::config::RemoteType;
26use crate::{Error, RemoteConfig, Result};
27
/// Statistics from blob sync operations.
#[derive(Debug, Default)]
pub struct BlobStats {
    /// Number of blobs synced.
    pub count: usize,
    /// Total bytes synced.
    pub bytes: u64,
    /// Number of blobs hard-linked (no copy needed).
    pub linked: usize,
    /// Number of blobs copied (hard link failed).
    pub copied: usize,
    /// Number of blobs skipped (already exist).
    pub skipped: usize,
}

impl std::fmt::Display for BlobStats {
    /// Renders either `"0 blobs"` or a one-line summary with sizes in whole KB.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self.count {
            0 => f.write_str("0 blobs"),
            n => write!(
                f,
                "{} blobs ({}KB, {} linked, {} copied, {} skipped)",
                n,
                self.bytes / 1024, // integer KB, truncating
                self.linked,
                self.copied,
                self.skipped
            ),
        }
    }
}
57
58/// Statistics from a push operation.
59#[derive(Debug, Default)]
60pub struct PushStats {
61    pub sessions: usize,
62    pub invocations: usize,
63    pub outputs: usize,
64    pub events: usize,
65    pub blobs: BlobStats,
66}
67
68impl std::fmt::Display for PushStats {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        write!(
71            f,
72            "{} sessions, {} invocations, {} outputs, {} events",
73            self.sessions, self.invocations, self.outputs, self.events
74        )?;
75        if self.blobs.count > 0 {
76            write!(f, ", {}", self.blobs)?;
77        }
78        Ok(())
79    }
80}
81
82/// Statistics from a pull operation.
83#[derive(Debug, Default)]
84pub struct PullStats {
85    pub sessions: usize,
86    pub invocations: usize,
87    pub outputs: usize,
88    pub events: usize,
89    pub blobs: BlobStats,
90}
91
92impl std::fmt::Display for PullStats {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        write!(
95            f,
96            "{} sessions, {} invocations, {} outputs, {} events",
97            self.sessions, self.invocations, self.outputs, self.events
98        )?;
99        if self.blobs.count > 0 {
100            write!(f, ", {}", self.blobs)?;
101        }
102        Ok(())
103    }
104}
105
/// Options for push operation.
#[derive(Debug, Default)]
pub struct PushOptions {
    /// Only push data since this date.
    /// Typically produced by [`parse_since`] ("7d"/"2w"/"1m" or "YYYY-MM-DD").
    pub since: Option<NaiveDate>,
    /// Show what would be pushed without actually pushing.
    pub dry_run: bool,
    /// Sync blob files (not just metadata).
    pub sync_blobs: bool,
}
116
/// Options for pull operation.
#[derive(Debug, Default)]
pub struct PullOptions {
    /// Only pull data since this date.
    /// Typically produced by [`parse_since`] ("7d"/"2w"/"1m" or "YYYY-MM-DD").
    pub since: Option<NaiveDate>,
    /// Only pull data from this client.
    /// Compared against the `client_id` column by exact string equality.
    pub client_id: Option<String>,
    /// Sync blob files (not just metadata).
    pub sync_blobs: bool,
}
127
128/// Parse a "since" string into a date.
129///
130/// Supports:
131/// - Duration: "7d", "2w", "1m" (days, weeks, months)
132/// - Date: "2024-01-15"
133pub fn parse_since(s: &str) -> Result<NaiveDate> {
134    let s = s.trim();
135
136    // Try duration first (7d, 2w, 1m)
137    if let Some(days) = parse_duration_days(s) {
138        let date = Utc::now().date_naive() - TimeDelta::days(days);
139        return Ok(date);
140    }
141
142    // Try date format (YYYY-MM-DD)
143    NaiveDate::parse_from_str(s, "%Y-%m-%d")
144        .map_err(|e| Error::Config(format!("Invalid date '{}': {}", s, e)))
145}
146
/// Parse a duration string into days.
///
/// Accepts a number followed by a unit: `d` (days), `w` (weeks, 7 days),
/// or `m` (months, approximated as 30 days). Case-insensitive; returns
/// `None` for anything else.
fn parse_duration_days(s: &str) -> Option<i64> {
    let s = s.trim().to_lowercase();

    // Split the unit suffix off and map it to a day multiplier.
    let (digits, days_per_unit) = if let Some(n) = s.strip_suffix('d') {
        (n, 1)
    } else if let Some(n) = s.strip_suffix('w') {
        (n, 7)
    } else if let Some(n) = s.strip_suffix('m') {
        (n, 30)
    } else {
        return None;
    };

    digits.parse::<i64>().ok().map(|n| n * days_per_unit)
}
161
/// Get the cached schema name for a remote (e.g., "cached_team" for remote "team").
#[allow(dead_code)]
pub fn cached_schema_name(remote_name: &str) -> String {
    ["cached_", remote_name].concat()
}
167
/// Get the quoted cached schema name for SQL.
///
/// Returns the schema identifier wrapped in double quotes, e.g. `"cached_team"`.
pub fn quoted_cached_schema_name(remote_name: &str) -> String {
    let mut quoted = String::with_capacity(remote_name.len() + 9);
    quoted.push_str("\"cached_");
    quoted.push_str(remote_name);
    quoted.push('"');
    quoted
}
172
173/// Get the data directory for a file remote.
174///
175/// For a remote URI like `file:///path/to/remote.duckdb`, returns `/path/to`.
176/// This is where blob paths (like `recent/blobs/content/...`) are relative to.
177fn file_remote_data_dir(remote: &RemoteConfig) -> Option<PathBuf> {
178    if remote.remote_type != RemoteType::File {
179        return None;
180    }
181
182    // Parse file:// URI to get the database path
183    let db_path = remote.uri.strip_prefix("file://")?;
184    let db_path = Path::new(db_path);
185
186    // Data directory is the parent of the .duckdb file
187    // e.g., /path/to/remote.duckdb -> /path/to
188    db_path.parent().map(PathBuf::from)
189}
190
/// Information about a blob to sync.
#[derive(Debug)]
struct BlobInfo {
    // Content hash identifying the blob; primary key in `blob_registry`.
    content_hash: String,
    // Path relative to the data directory (e.g. "recent/blobs/content/ab/hash.bin").
    storage_path: String,
    // Size in bytes as recorded in the blob_registry/outputs tables.
    byte_length: i64,
}
198
199/// Sync a single blob file using hard link or copy.
200///
201/// Returns `Ok(true)` if the blob was synced (linked or copied),
202/// `Ok(false)` if it already exists at destination.
203fn sync_blob_file(src: &Path, dst: &Path, stats: &mut BlobStats) -> Result<bool> {
204    // Check if destination already exists
205    if dst.exists() {
206        stats.skipped += 1;
207        return Ok(false);
208    }
209
210    // Ensure parent directory exists
211    if let Some(parent) = dst.parent() {
212        fs::create_dir_all(parent)?;
213    }
214
215    // Try hard link first (fast, no disk duplication)
216    match fs::hard_link(src, dst) {
217        Ok(()) => {
218            stats.linked += 1;
219            stats.count += 1;
220            if let Ok(meta) = fs::metadata(dst) {
221                stats.bytes += meta.len();
222            }
223            Ok(true)
224        }
225        Err(_) => {
226            // Fall back to copy (cross-filesystem)
227            fs::copy(src, dst)?;
228            stats.copied += 1;
229            stats.count += 1;
230            if let Ok(meta) = fs::metadata(dst) {
231                stats.bytes += meta.len();
232            }
233            Ok(true)
234        }
235    }
236}
237
impl super::Store {
    /// Push local data to a remote.
    ///
    /// Reads from `local` schema, writes to remote's tables.
    /// Only pushes records that don't already exist on the remote (by id).
    /// When `sync_blobs` is enabled, also syncs blob files for file remotes.
    ///
    /// With `dry_run`, counts what would be pushed without modifying anything.
    /// Pushing to a read-only remote is an error (empty stats under `dry_run`).
    pub fn push(&self, remote: &RemoteConfig, opts: PushOptions) -> Result<PushStats> {
        use crate::config::RemoteMode;

        // Read-only remotes can't be pushed to - return empty stats for dry_run
        if remote.mode == RemoteMode::ReadOnly {
            if opts.dry_run {
                // Nothing to push to a read-only remote
                return Ok(PushStats::default());
            } else {
                return Err(Error::Config(format!(
                    "Cannot push to read-only remote '{}'",
                    remote.name
                )));
            }
        }

        // Use connection without auto-attach to avoid conflicts and unnecessary views
        let conn = self.connection_with_options(false)?;

        // Attach only the target remote
        self.attach_remote(&conn, remote)?;

        let remote_schema = remote.quoted_schema_name();

        // Ensure remote has the required tables (including blob_registry)
        ensure_remote_schema(&conn, &remote_schema)?;

        let mut stats = PushStats::default();

        if opts.dry_run {
            // Count what would be pushed
            stats.sessions = count_sessions_to_push(&conn, &remote_schema, opts.since)?;
            stats.invocations = count_table_to_push(&conn, "invocations", &remote_schema, opts.since)?;
            stats.outputs = count_table_to_push(&conn, "outputs", &remote_schema, opts.since)?;
            stats.events = count_table_to_push(&conn, "events", &remote_schema, opts.since)?;
            if opts.sync_blobs {
                stats.blobs = count_blobs_to_push(&conn, &remote_schema, opts.since)?;
            }
        } else {
            // Sync blobs first (before pushing output metadata)
            if opts.sync_blobs {
                stats.blobs = self.push_blobs(&conn, remote, &remote_schema, opts.since)?;
            }

            // Actually push in dependency order
            stats.sessions = push_sessions(&conn, &remote_schema, opts.since)?;
            stats.invocations = push_table(&conn, "invocations", &remote_schema, opts.since)?;
            stats.outputs = push_outputs(&conn, &remote_schema, opts.since, opts.sync_blobs)?;
            stats.events = push_table(&conn, "events", &remote_schema, opts.since)?;
        }

        Ok(stats)
    }

    /// Push blob files to a file remote.
    ///
    /// Syncs blob files using hard links (preferred) or copies (fallback).
    /// Also syncs blob_registry entries.
    ///
    /// No-op (empty stats) for non-file remotes. Blobs whose source file is
    /// missing locally are silently skipped.
    fn push_blobs(
        &self,
        conn: &Connection,
        remote: &RemoteConfig,
        remote_schema: &str,
        since: Option<NaiveDate>,
    ) -> Result<BlobStats> {
        let mut stats = BlobStats::default();

        // Only file remotes support blob sync for now
        let remote_data_dir = match file_remote_data_dir(remote) {
            Some(dir) => dir,
            None => return Ok(stats), // Not a file remote, skip blob sync
        };

        // Find blobs that need to be synced
        let blobs = get_blobs_to_push(conn, remote_schema, since)?;
        if blobs.is_empty() {
            return Ok(stats);
        }

        let local_data_dir = self.config.data_dir();

        for blob in &blobs {
            // Build source and destination paths
            // storage_path is relative to data_dir (e.g., "recent/blobs/content/ab/hash.bin")
            let src = local_data_dir.join(&blob.storage_path);
            let dst = remote_data_dir.join(&blob.storage_path);

            if !src.exists() {
                // Source blob missing, skip
                continue;
            }

            // Sync the blob file
            sync_blob_file(&src, &dst, &mut stats)?;

            // Sync blob_registry entry
            // NOTE(review): values are inlined with manual '' escaping rather than
            // bound parameters; fine for hashes/paths we generate, but bound
            // parameters would be more robust.
            let escaped_hash = blob.content_hash.replace('\'', "''");
            let escaped_path = blob.storage_path.replace('\'', "''");
            conn.execute(
                &format!(
                    r#"
                    INSERT INTO {schema}.blob_registry (content_hash, byte_length, storage_path)
                    SELECT '{hash}', {len}, '{path}'
                    WHERE NOT EXISTS (
                        SELECT 1 FROM {schema}.blob_registry WHERE content_hash = '{hash}'
                    )
                    "#,
                    schema = remote_schema,
                    hash = escaped_hash,
                    len = blob.byte_length,
                    path = escaped_path,
                ),
                [],
            )?;
        }

        Ok(stats)
    }

    /// Pull data from a remote into local cached_<name> schema.
    ///
    /// Reads from remote's tables, writes to `cached_<name>` schema.
    /// Only pulls records that don't already exist in the cached schema (by id).
    /// After pulling, rebuilds the `caches` union views.
    /// When `sync_blobs` is enabled, also syncs blob files for file remotes.
    pub fn pull(&self, remote: &RemoteConfig, opts: PullOptions) -> Result<PullStats> {
        // Use connection without auto-attach to avoid conflicts
        let conn = self.connection_with_options(false)?;

        // Attach only the target remote
        self.attach_remote(&conn, remote)?;

        let remote_schema = remote.quoted_schema_name();
        let cached_schema = quoted_cached_schema_name(&remote.name);

        // Ensure cached schema exists with required tables
        ensure_cached_schema(&conn, &cached_schema, &remote.name)?;

        // Pull in dependency order (sessions first, then invocations, outputs, events)
        let mut stats = PullStats {
            sessions: pull_sessions(&conn, &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref())?,
            invocations: pull_table(&conn, "invocations", &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref())?,
            outputs: pull_outputs(&conn, &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref(), opts.sync_blobs)?,
            events: pull_table(&conn, "events", &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref())?,
            blobs: BlobStats::default(),
        };

        // Sync blob files after pulling output metadata
        if opts.sync_blobs {
            stats.blobs = self.pull_blobs(&conn, remote, &remote_schema, &cached_schema)?;
        }

        // Rebuild caches union views to include this cached schema
        self.rebuild_caches_schema(&conn)?;

        Ok(stats)
    }

    /// Pull blob files from a file remote.
    ///
    /// Syncs blob files using hard links (preferred) or copies (fallback).
    /// Also registers blobs in the local blob_registry.
    ///
    /// No-op (empty stats) for non-file remotes. Blobs missing on the remote
    /// filesystem are silently skipped.
    fn pull_blobs(
        &self,
        conn: &Connection,
        remote: &RemoteConfig,
        remote_schema: &str,
        cached_schema: &str,
    ) -> Result<BlobStats> {
        let mut stats = BlobStats::default();

        // Only file remotes support blob sync for now
        let remote_data_dir = match file_remote_data_dir(remote) {
            Some(dir) => dir,
            None => return Ok(stats), // Not a file remote, skip blob sync
        };

        // Find blobs that were pulled (in cached outputs but not in local blob_registry)
        let blobs = get_blobs_to_pull(conn, remote_schema, cached_schema)?;
        if blobs.is_empty() {
            return Ok(stats);
        }

        let local_data_dir = self.config.data_dir();

        for blob in &blobs {
            // Build source and destination paths
            // storage_path is relative to data_dir (e.g., "recent/blobs/content/ab/hash.bin")
            let src = remote_data_dir.join(&blob.storage_path);
            let dst = local_data_dir.join(&blob.storage_path);

            if !src.exists() {
                // Source blob missing on remote, skip
                continue;
            }

            // Sync the blob file
            let synced = sync_blob_file(&src, &dst, &mut stats)?;

            // Register in local blob_registry if we synced a new blob
            // (blobs that already existed locally are assumed registered).
            if synced {
                let escaped_hash = blob.content_hash.replace('\'', "''");
                let escaped_path = blob.storage_path.replace('\'', "''");
                conn.execute(
                    &format!(
                        r#"
                        INSERT INTO blob_registry (content_hash, byte_length, storage_path)
                        SELECT '{hash}', {len}, '{path}'
                        WHERE NOT EXISTS (
                            SELECT 1 FROM blob_registry WHERE content_hash = '{hash}'
                        )
                        "#,
                        hash = escaped_hash,
                        len = blob.byte_length,
                        path = escaped_path,
                    ),
                    [],
                )?;
            }
        }

        Ok(stats)
    }

    /// Rebuild the `caches` schema views to union all `cached_*` schemas.
    ///
    /// Uses explicit transaction for DDL safety. The caches.* views reference
    /// local cached_* schemas (not attached databases), so they should be safe
    /// to persist.
    pub fn rebuild_caches_schema(&self, conn: &Connection) -> Result<()> {
        // Find all cached_* schemas
        let schemas: Vec<String> = conn
            .prepare("SELECT schema_name FROM information_schema.schemata WHERE schema_name LIKE 'cached_%'")?
            .query_map([], |row| row.get(0))?
            .filter_map(|r| r.ok())
            .collect();

        // Use transaction for DDL safety
        conn.execute("BEGIN TRANSACTION", [])?;

        // Build all four views inside the transaction; any failure aborts the lot.
        let result = (|| -> std::result::Result<(), duckdb::Error> {
            for table in &["sessions", "invocations", "outputs", "events"] {
                let mut union_parts: Vec<String> = schemas
                    .iter()
                    .map(|s| format!("SELECT * FROM \"{}\".{}", s, table))
                    .collect();

                // Always include placeholder (ensures view is valid even with no cached schemas)
                if !schemas.iter().any(|s| s == "cached_placeholder") {
                    union_parts.push(format!("SELECT * FROM cached_placeholder.{}", table));
                }

                // UNION ALL BY NAME (DuckDB) matches columns by name, so cached
                // schemas with differing column order still line up.
                let sql = format!(
                    "CREATE OR REPLACE VIEW caches.{} AS {}",
                    table,
                    union_parts.join(" UNION ALL BY NAME ")
                );
                conn.execute(&sql, [])?;
            }
            Ok(())
        })();

        match result {
            Ok(()) => {
                conn.execute("COMMIT", [])?;
                Ok(())
            }
            Err(e) => {
                // Best-effort rollback; the original error is what we report.
                let _ = conn.execute("ROLLBACK", []);
                Err(crate::Error::DuckDb(e))
            }
        }
    }
}
518
/// Ensure the remote schema has the required tables.
/// Tables use consistent naming (no `_table` suffix).
///
/// Idempotent (`CREATE TABLE IF NOT EXISTS` throughout), so it is safe to
/// call on every push. `schema` is expected to arrive already quoted
/// (see `quoted_schema_name` usage in `push`).
fn ensure_remote_schema(conn: &Connection, schema: &str) -> Result<()> {
    let sql = format!(
        r#"
        CREATE TABLE IF NOT EXISTS {schema}.sessions (
            session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
            invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE
        );
        CREATE TABLE IF NOT EXISTS {schema}.invocations (
            id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
            cwd VARCHAR, cmd VARCHAR, executable VARCHAR, runner_id VARCHAR,
            exit_code INTEGER, status VARCHAR, format_hint VARCHAR, client_id VARCHAR,
            hostname VARCHAR, username VARCHAR, tag VARCHAR, date DATE
        );
        CREATE TABLE IF NOT EXISTS {schema}.outputs (
            id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
            byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
            content_type VARCHAR, date DATE
        );
        CREATE TABLE IF NOT EXISTS {schema}.events (
            id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
            event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
            ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
            status VARCHAR, format_used VARCHAR, date DATE
        );
        CREATE TABLE IF NOT EXISTS {schema}.blob_registry (
            content_hash VARCHAR PRIMARY KEY,
            byte_length BIGINT NOT NULL,
            ref_count INTEGER DEFAULT 1,
            first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            storage_path VARCHAR NOT NULL
        );
        "#,
        schema = schema
    );
    conn.execute_batch(&sql)?;
    Ok(())
}
559
/// Ensure the cached schema exists with required tables.
/// Tables include a `_source` column to track which remote the data came from.
///
/// Idempotent. The `_source` column defaults to `remote_name`, so pull queries
/// that omit it from their column lists get it filled automatically.
fn ensure_cached_schema(conn: &Connection, schema: &str, remote_name: &str) -> Result<()> {
    // Create the schema if it doesn't exist
    conn.execute(&format!("CREATE SCHEMA IF NOT EXISTS {}", schema), [])?;

    // Create tables with _source column
    let sql = format!(
        r#"
        CREATE TABLE IF NOT EXISTS {schema}.sessions (
            session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
            invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE,
            _source VARCHAR DEFAULT '{remote_name}'
        );
        CREATE TABLE IF NOT EXISTS {schema}.invocations (
            id UUID, session_id VARCHAR, timestamp TIMESTAMP, duration_ms BIGINT,
            cwd VARCHAR, cmd VARCHAR, executable VARCHAR, runner_id VARCHAR,
            exit_code INTEGER, status VARCHAR, format_hint VARCHAR, client_id VARCHAR,
            hostname VARCHAR, username VARCHAR, tag VARCHAR, date DATE,
            _source VARCHAR DEFAULT '{remote_name}'
        );
        CREATE TABLE IF NOT EXISTS {schema}.outputs (
            id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
            byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
            content_type VARCHAR, date DATE,
            _source VARCHAR DEFAULT '{remote_name}'
        );
        CREATE TABLE IF NOT EXISTS {schema}.events (
            id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
            event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
            ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
            status VARCHAR, format_used VARCHAR, date DATE,
            _source VARCHAR DEFAULT '{remote_name}'
        );
        "#,
        schema = schema,
        // Escape single quotes so the name is safe inside the DEFAULT literal.
        remote_name = remote_name.replace('\'', "''")
    );
    conn.execute_batch(&sql)?;
    Ok(())
}
601
602/// Build the WHERE clause for time filtering.
603fn since_clause(since: Option<NaiveDate>, timestamp_col: &str) -> String {
604    since
605        .map(|d| format!("AND {} >= '{}'", timestamp_col, d))
606        .unwrap_or_default()
607}
608
/// Build the WHERE clause for client filtering.
///
/// Returns an `AND client_id = '<id>'` fragment (single quotes escaped), or
/// an empty string when no client filter was given.
fn client_clause(client_id: Option<&str>) -> String {
    match client_id {
        Some(c) => format!("AND client_id = '{}'", c.replace('\'', "''")),
        None => String::new(),
    }
}
615
/// Count sessions that would be pushed.
/// Reads from `local` schema.
///
/// Mirrors `push_sessions`: a session counts when it is absent from the
/// remote AND has at least one invocation inside the `since` window (the
/// JOIN on invocations drives the time filter, not `registered_at`).
fn count_sessions_to_push(
    conn: &Connection,
    remote_schema: &str,
    since: Option<NaiveDate>,
) -> Result<usize> {
    let since_filter = since_clause(since, "i.timestamp");

    let sql = format!(
        r#"
        SELECT COUNT(DISTINCT s.session_id)
        FROM local.sessions s
        JOIN local.invocations i ON i.session_id = s.session_id
        WHERE NOT EXISTS (
            SELECT 1 FROM {remote}.sessions r WHERE r.session_id = s.session_id
        )
        {since}
        "#,
        remote = remote_schema,
        since = since_filter,
    );

    let count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
    Ok(count as usize)
}
642
/// Count records that would be pushed for a table.
/// Reads from `local` schema.
///
/// Mirrors `push_table`: `invocations` filters on its own timestamp,
/// `outputs`/`events` filter via their parent invocation's timestamp.
fn count_table_to_push(
    conn: &Connection,
    table: &str,
    remote_schema: &str,
    since: Option<NaiveDate>,
) -> Result<usize> {
    let sql = match table {
        "invocations" => {
            // Invocations carry their own timestamp column.
            let since_filter = since_clause(since, "l.timestamp");
            format!(
                r#"
                SELECT COUNT(*)
                FROM local.{table} l
                WHERE NOT EXISTS (
                    SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
                )
                {since}
                "#,
                table = table,
                remote = remote_schema,
                since = since_filter,
            )
        }
        "outputs" | "events" => {
            // No timestamp on these rows; filter via the owning invocation.
            let since_filter = since_clause(since, "i.timestamp");
            format!(
                r#"
                SELECT COUNT(*)
                FROM local.{table} l
                JOIN local.invocations i ON i.id = l.invocation_id
                WHERE NOT EXISTS (
                    SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
                )
                {since}
                "#,
                table = table,
                remote = remote_schema,
                since = since_filter,
            )
        }
        _ => {
            // Defensive fallback for unknown tables; note that `since` is
            // ignored here (no known timestamp column to filter on).
            format!(
                r#"
                SELECT COUNT(*)
                FROM local.{table} l
                WHERE NOT EXISTS (
                    SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
                )
                "#,
                table = table,
                remote = remote_schema,
            )
        }
    };

    let count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
    Ok(count as usize)
}
703
/// Push sessions from `local` to remote.
///
/// Only sessions absent from the remote (by `session_id`) are inserted, and
/// only those with at least one invocation in the `since` window (the JOIN
/// on invocations drives the time filter). Returns the number of rows
/// inserted (from `conn.execute`).
fn push_sessions(
    conn: &Connection,
    remote_schema: &str,
    since: Option<NaiveDate>,
) -> Result<usize> {
    let since_filter = since_clause(since, "i.timestamp");

    let sql = format!(
        r#"
        INSERT INTO {remote}.sessions
        SELECT DISTINCT s.*
        FROM local.sessions s
        JOIN local.invocations i ON i.session_id = s.session_id
        WHERE NOT EXISTS (
            SELECT 1 FROM {remote}.sessions r WHERE r.session_id = s.session_id
        )
        {since}
        "#,
        remote = remote_schema,
        since = since_filter,
    );

    let count = conn.execute(&sql, [])?;
    Ok(count)
}
730
/// Push records from `local` to remote.
///
/// Inserts rows absent from the remote (by `id`). Time filtering depends on
/// the table: `invocations` uses its own timestamp; `outputs`/`events` use
/// the parent invocation's timestamp. Returns the number of rows inserted.
fn push_table(
    conn: &Connection,
    table: &str,
    remote_schema: &str,
    since: Option<NaiveDate>,
) -> Result<usize> {
    let sql = match table {
        "invocations" => {
            // Invocations carry their own timestamp column.
            let since_filter = since_clause(since, "l.timestamp");
            format!(
                r#"
                INSERT INTO {remote}.{table}
                SELECT *
                FROM local.{table} l
                WHERE NOT EXISTS (
                    SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
                )
                {since}
                "#,
                table = table,
                remote = remote_schema,
                since = since_filter,
            )
        }
        "outputs" | "events" => {
            // No timestamp on these rows; filter via the owning invocation.
            let since_filter = since_clause(since, "i.timestamp");
            format!(
                r#"
                INSERT INTO {remote}.{table}
                SELECT l.*
                FROM local.{table} l
                JOIN local.invocations i ON i.id = l.invocation_id
                WHERE NOT EXISTS (
                    SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
                )
                {since}
                "#,
                table = table,
                remote = remote_schema,
                since = since_filter,
            )
        }
        _ => {
            // Defensive fallback for unknown tables; note that `since` is
            // ignored here (no known timestamp column to filter on).
            format!(
                r#"
                INSERT INTO {remote}.{table}
                SELECT *
                FROM local.{table} l
                WHERE NOT EXISTS (
                    SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
                )
                "#,
                table = table,
                remote = remote_schema,
            )
        }
    };

    let count = conn.execute(&sql, [])?;
    Ok(count)
}
793
/// Pull sessions from remote into cached schema.
///
/// Filters by `registered_at` (unlike push, which filters via invocation
/// timestamps) and, optionally, by the remote row's `client_id`. The column
/// list omits `_source`, so inserted rows pick up that column's DEFAULT
/// (the remote name) set by `ensure_cached_schema`. Returns rows inserted.
fn pull_sessions(
    conn: &Connection,
    remote_schema: &str,
    cached_schema: &str,
    since: Option<NaiveDate>,
    client_id: Option<&str>,
) -> Result<usize> {
    let since_filter = since_clause(since, "r.registered_at");
    let client_filter = client_clause(client_id);

    let sql = format!(
        r#"
        INSERT INTO {cached}.sessions (session_id, client_id, invoker, invoker_pid, invoker_type, registered_at, cwd, date)
        SELECT r.*
        FROM {remote}.sessions r
        WHERE NOT EXISTS (
            SELECT 1 FROM {cached}.sessions l WHERE l.session_id = r.session_id
        )
        {since}
        {client}
        "#,
        cached = cached_schema,
        remote = remote_schema,
        since = since_filter,
        client = client_filter,
    );

    let count = conn.execute(&sql, [])?;
    Ok(count)
}
825
826/// Pull records from remote into cached schema.
827fn pull_table(
828    conn: &Connection,
829    table: &str,
830    remote_schema: &str,
831    cached_schema: &str,
832    since: Option<NaiveDate>,
833    client_id: Option<&str>,
834) -> Result<usize> {
835    let client_filter = client_clause(client_id);
836
837    let sql = match table {
838        "invocations" => {
839            let since_filter = since_clause(since, "r.timestamp");
840            format!(
841                r#"
842                INSERT INTO {cached}.{table} (id, session_id, timestamp, duration_ms, cwd, cmd, executable, runner_id, exit_code, status, format_hint, client_id, hostname, username, tag, date)
843                SELECT r.*
844                FROM {remote}.{table} r
845                WHERE NOT EXISTS (
846                    SELECT 1 FROM {cached}.{table} l WHERE l.id = r.id
847                )
848                {since}
849                {client}
850                "#,
851                table = table,
852                cached = cached_schema,
853                remote = remote_schema,
854                since = since_filter,
855                client = client_filter,
856            )
857        }
858        "outputs" => {
859            let since_filter = since_clause(since, "i.timestamp");
860            format!(
861                r#"
862                INSERT INTO {cached}.{table} (id, invocation_id, stream, content_hash, byte_length, storage_type, storage_ref, content_type, date)
863                SELECT r.*
864                FROM {remote}.{table} r
865                JOIN {remote}.invocations i ON i.id = r.invocation_id
866                WHERE NOT EXISTS (
867                    SELECT 1 FROM {cached}.{table} l WHERE l.id = r.id
868                )
869                {since}
870                {client}
871                "#,
872                table = table,
873                cached = cached_schema,
874                remote = remote_schema,
875                since = since_filter,
876                client = if client_id.is_some() {
877                    format!("AND i.client_id = '{}'", client_id.unwrap().replace('\'', "''"))
878                } else {
879                    String::new()
880                },
881            )
882        }
883        "events" => {
884            let since_filter = since_clause(since, "i.timestamp");
885            format!(
886                r#"
887                INSERT INTO {cached}.{table} (id, invocation_id, client_id, hostname, event_type, severity, ref_file, ref_line, ref_column, message, error_code, test_name, status, format_used, date)
888                SELECT r.*
889                FROM {remote}.{table} r
890                JOIN {remote}.invocations i ON i.id = r.invocation_id
891                WHERE NOT EXISTS (
892                    SELECT 1 FROM {cached}.{table} l WHERE l.id = r.id
893                )
894                {since}
895                {client}
896                "#,
897                table = table,
898                cached = cached_schema,
899                remote = remote_schema,
900                since = since_filter,
901                client = if client_id.is_some() {
902                    format!("AND i.client_id = '{}'", client_id.unwrap().replace('\'', "''"))
903                } else {
904                    String::new()
905                },
906            )
907        }
908        _ => {
909            format!(
910                r#"
911                INSERT INTO {cached}.{table}
912                SELECT r.*
913                FROM {remote}.{table} r
914                WHERE NOT EXISTS (
915                    SELECT 1 FROM {cached}.{table} l WHERE l.id = r.id
916                )
917                {client}
918                "#,
919                table = table,
920                cached = cached_schema,
921                remote = remote_schema,
922                client = client_filter,
923            )
924        }
925    };
926
927    let count = conn.execute(&sql, [])?;
928    Ok(count)
929}
930
931/// Count blobs that would be pushed (for dry run).
932fn count_blobs_to_push(
933    conn: &Connection,
934    remote_schema: &str,
935    since: Option<NaiveDate>,
936) -> Result<BlobStats> {
937    let since_filter = since_clause(since, "i.timestamp");
938
939    let sql = format!(
940        r#"
941        SELECT COUNT(DISTINCT o.content_hash), COALESCE(SUM(o.byte_length), 0)
942        FROM local.outputs o
943        JOIN local.invocations i ON i.id = o.invocation_id
944        WHERE o.storage_type = 'blob'
945          AND NOT EXISTS (
946              SELECT 1 FROM {remote}.blob_registry r WHERE r.content_hash = o.content_hash
947          )
948        {since}
949        "#,
950        remote = remote_schema,
951        since = since_filter,
952    );
953
954    let (count, bytes): (i64, i64) = conn.query_row(&sql, [], |row| Ok((row.get(0)?, row.get(1)?)))?;
955    Ok(BlobStats {
956        count: count as usize,
957        bytes: bytes as u64,
958        ..Default::default()
959    })
960}
961
962/// Get blobs that need to be pushed to remote.
963fn get_blobs_to_push(
964    conn: &Connection,
965    remote_schema: &str,
966    since: Option<NaiveDate>,
967) -> Result<Vec<BlobInfo>> {
968    let since_filter = since_clause(since, "i.timestamp");
969
970    let sql = format!(
971        r#"
972        SELECT DISTINCT o.content_hash, b.storage_path, o.byte_length
973        FROM local.outputs o
974        JOIN local.invocations i ON i.id = o.invocation_id
975        JOIN blob_registry b ON b.content_hash = o.content_hash
976        WHERE o.storage_type = 'blob'
977          AND NOT EXISTS (
978              SELECT 1 FROM {remote}.blob_registry r WHERE r.content_hash = o.content_hash
979          )
980        {since}
981        "#,
982        remote = remote_schema,
983        since = since_filter,
984    );
985
986    let mut stmt = conn.prepare(&sql)?;
987    let blobs = stmt
988        .query_map([], |row| {
989            Ok(BlobInfo {
990                content_hash: row.get(0)?,
991                storage_path: row.get(1)?,
992                byte_length: row.get(2)?,
993            })
994        })?
995        .filter_map(|r| r.ok())
996        .collect();
997
998    Ok(blobs)
999}
1000
1001/// Get blobs that were pulled (in remote outputs but not in local blob_registry).
1002fn get_blobs_to_pull(
1003    conn: &Connection,
1004    remote_schema: &str,
1005    cached_schema: &str,
1006) -> Result<Vec<BlobInfo>> {
1007    let sql = format!(
1008        r#"
1009        SELECT DISTINCT o.content_hash, b.storage_path, o.byte_length
1010        FROM {cached}.outputs o
1011        JOIN {remote}.blob_registry b ON b.content_hash = o.content_hash
1012        WHERE o.storage_type = 'blob'
1013          AND NOT EXISTS (
1014              SELECT 1 FROM blob_registry r WHERE r.content_hash = o.content_hash
1015          )
1016        "#,
1017        cached = cached_schema,
1018        remote = remote_schema,
1019    );
1020
1021    let mut stmt = conn.prepare(&sql)?;
1022    let blobs = stmt
1023        .query_map([], |row| {
1024            Ok(BlobInfo {
1025                content_hash: row.get(0)?,
1026                storage_path: row.get(1)?,
1027                byte_length: row.get(2)?,
1028            })
1029        })?
1030        .filter_map(|r| r.ok())
1031        .collect();
1032
1033    Ok(blobs)
1034}
1035
1036/// Push outputs to remote, optionally converting storage_ref paths.
1037fn push_outputs(
1038    conn: &Connection,
1039    remote_schema: &str,
1040    since: Option<NaiveDate>,
1041    _sync_blobs: bool,
1042) -> Result<usize> {
1043    let since_filter = since_clause(since, "i.timestamp");
1044
1045    // For now, we keep storage_ref as-is. The blob files are synced separately.
1046    // The storage_ref format (file://recent/blobs/...) is relative and works on both sides.
1047    let sql = format!(
1048        r#"
1049        INSERT INTO {remote}.outputs
1050        SELECT l.*
1051        FROM local.outputs l
1052        JOIN local.invocations i ON i.id = l.invocation_id
1053        WHERE NOT EXISTS (
1054            SELECT 1 FROM {remote}.outputs r WHERE r.id = l.id
1055        )
1056        {since}
1057        "#,
1058        remote = remote_schema,
1059        since = since_filter,
1060    );
1061
1062    let count = conn.execute(&sql, [])?;
1063    Ok(count)
1064}
1065
1066/// Pull outputs from remote, optionally handling storage_ref paths.
1067fn pull_outputs(
1068    conn: &Connection,
1069    remote_schema: &str,
1070    cached_schema: &str,
1071    since: Option<NaiveDate>,
1072    client_id: Option<&str>,
1073    _sync_blobs: bool,
1074) -> Result<usize> {
1075    let since_filter = since_clause(since, "i.timestamp");
1076    let client_filter = if client_id.is_some() {
1077        format!("AND i.client_id = '{}'", client_id.unwrap().replace('\'', "''"))
1078    } else {
1079        String::new()
1080    };
1081
1082    // For now, we keep storage_ref as-is. The blob files are synced separately.
1083    // The storage_ref format (file://recent/blobs/...) is relative and works on both sides.
1084    let sql = format!(
1085        r#"
1086        INSERT INTO {cached}.outputs (id, invocation_id, stream, content_hash, byte_length, storage_type, storage_ref, content_type, date)
1087        SELECT r.*
1088        FROM {remote}.outputs r
1089        JOIN {remote}.invocations i ON i.id = r.invocation_id
1090        WHERE NOT EXISTS (
1091            SELECT 1 FROM {cached}.outputs l WHERE l.id = r.id
1092        )
1093        {since}
1094        {client}
1095        "#,
1096        cached = cached_schema,
1097        remote = remote_schema,
1098        since = since_filter,
1099        client = client_filter,
1100    );
1101
1102    let count = conn.execute(&sql, [])?;
1103    Ok(count)
1104}
1105
1106#[cfg(test)]
1107mod tests {
1108    use super::*;
1109    use crate::config::{RemoteConfig, RemoteMode, RemoteType};
1110    use crate::init::initialize;
1111    use crate::schema::InvocationRecord;
1112    use crate::store::{ConnectionOptions, Store};
1113    use crate::Config;
1114    use tempfile::TempDir;
1115
1116    fn setup_store_duckdb() -> (TempDir, Store) {
1117        let tmp = TempDir::new().unwrap();
1118        let config = Config::with_duckdb_mode(tmp.path());
1119        initialize(&config).unwrap();
1120        let store = Store::open(config).unwrap();
1121        (tmp, store)
1122    }
1123
1124    fn create_file_remote(name: &str, path: &std::path::Path) -> RemoteConfig {
1125        RemoteConfig {
1126            name: name.to_string(),
1127            remote_type: RemoteType::File,
1128            uri: path.to_string_lossy().to_string(),
1129            mode: RemoteMode::ReadWrite,
1130            auto_attach: true,
1131            credential_provider: None,
1132        }
1133    }
1134
1135    // ===== Date parsing tests =====
1136
1137    #[test]
1138    fn test_parse_since_days() {
1139        let today = Utc::now().date_naive();
1140        let result = parse_since("7d").unwrap();
1141        assert_eq!(result, today - TimeDelta::days(7));
1142    }
1143
1144    #[test]
1145    fn test_parse_since_weeks() {
1146        let today = Utc::now().date_naive();
1147        let result = parse_since("2w").unwrap();
1148        assert_eq!(result, today - TimeDelta::days(14));
1149    }
1150
1151    #[test]
1152    fn test_parse_since_months() {
1153        let today = Utc::now().date_naive();
1154        let result = parse_since("1m").unwrap();
1155        assert_eq!(result, today - TimeDelta::days(30));
1156    }
1157
1158    #[test]
1159    fn test_parse_since_date() {
1160        let result = parse_since("2024-01-15").unwrap();
1161        assert_eq!(result, NaiveDate::from_ymd_opt(2024, 1, 15).unwrap());
1162    }
1163
1164    #[test]
1165    fn test_parse_since_invalid() {
1166        assert!(parse_since("invalid").is_err());
1167    }
1168
1169    // ===== Push/Pull integration tests =====
1170
    // Pushing local invocations to a fresh file remote creates the remote
    // database file and transfers each record exactly once.
    #[test]
    fn test_push_to_file_remote() {
        let (tmp, store) = setup_store_duckdb();

        // Write some local data
        let inv = InvocationRecord::new(
            "test-session",
            "echo hello",
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&inv).unwrap();

        // Create a file remote
        let remote_path = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &remote_path);

        // Push to remote
        let stats = store.push(&remote, PushOptions::default()).unwrap();

        assert_eq!(stats.invocations, 1);
        assert!(remote_path.exists(), "Remote database file should be created");
    }
1195
    // Pushing the same data twice must not duplicate rows: the second push
    // sees the records already present on the remote and transfers nothing.
    #[test]
    fn test_push_is_idempotent() {
        let (tmp, store) = setup_store_duckdb();

        // Write local data
        let inv = InvocationRecord::new(
            "test-session",
            "echo hello",
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&inv).unwrap();

        // Create remote
        let remote_path = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &remote_path);

        // Push twice
        let stats1 = store.push(&remote, PushOptions::default()).unwrap();
        let stats2 = store.push(&remote, PushOptions::default()).unwrap();

        // First push should transfer data, second should be idempotent
        assert_eq!(stats1.invocations, 1);
        assert_eq!(stats2.invocations, 0, "Second push should be idempotent");
    }
1222
    // A dry-run push reports what would be transferred without writing to
    // the remote: a subsequent real push still transfers everything.
    #[test]
    fn test_push_dry_run() {
        let (tmp, store) = setup_store_duckdb();

        // Write local data
        let inv = InvocationRecord::new(
            "test-session",
            "echo hello",
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&inv).unwrap();

        // Create remote
        let remote_path = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &remote_path);

        // Dry run push
        let dry_stats = store
            .push(
                &remote,
                PushOptions {
                    dry_run: true,
                    ..Default::default()
                },
            )
            .unwrap();

        assert_eq!(dry_stats.invocations, 1, "Dry run should count invocations");

        // Actual push should still transfer data (dry run didn't modify)
        let actual_stats = store.push(&remote, PushOptions::default()).unwrap();
        assert_eq!(actual_stats.invocations, 1);
    }
1258
    // Round trip: push to a file remote, wipe local, then pull the data back
    // into the cached schema.
    #[test]
    fn test_pull_from_file_remote() {
        let (tmp, store) = setup_store_duckdb();

        // Write local data and push to remote
        let inv = InvocationRecord::new(
            "test-session",
            "echo hello",
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&inv).unwrap();

        let remote_path = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &remote_path);
        store.push(&remote, PushOptions::default()).unwrap();

        // Clear local data (simulate different client)
        let conn = store.connection().unwrap();
        conn.execute("DELETE FROM local.invocations", []).unwrap();
        drop(conn);

        // Pull from remote
        let stats = store.pull(&remote, PullOptions::default()).unwrap();

        assert_eq!(stats.invocations, 1, "Should pull the invocation back");

        // Verify data in cached schema
        // NOTE(review): unlike test_caches_schema_views_work there is no
        // explicit rebuild_caches_schema call here — presumably a fresh
        // connection already exposes cached_test via caches.*; confirm.
        let conn = store.connection().unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM caches.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 1, "Data should be in cached schema");
    }
1294
    // Pulling twice from the same remote must not duplicate cached rows:
    // the second pull finds everything already cached.
    #[test]
    fn test_pull_is_idempotent() {
        let (tmp, store) = setup_store_duckdb();

        // Setup: write, push, clear local
        let inv = InvocationRecord::new(
            "test-session",
            "echo hello",
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&inv).unwrap();

        let remote_path = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &remote_path);
        store.push(&remote, PushOptions::default()).unwrap();

        // Pull twice
        let stats1 = store.pull(&remote, PullOptions::default()).unwrap();
        let stats2 = store.pull(&remote, PullOptions::default()).unwrap();

        assert_eq!(stats1.invocations, 1);
        assert_eq!(stats2.invocations, 0, "Second pull should be idempotent");
    }
1320
1321    // ===== Remote name handling tests =====
1322
    // Remote names containing hyphens must be handled when deriving schema
    // names (hyphens are not valid in bare SQL identifiers).
    #[test]
    fn test_remote_name_with_hyphen() {
        let (tmp, store) = setup_store_duckdb();

        // Write local data
        let inv = InvocationRecord::new(
            "test-session",
            "echo hello",
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&inv).unwrap();

        // Create remote with hyphen in name
        let remote_path = tmp.path().join("my-team-remote.duckdb");
        let remote = create_file_remote("my-team", &remote_path);

        // Push should work despite hyphen
        let stats = store.push(&remote, PushOptions::default()).unwrap();
        assert_eq!(stats.invocations, 1);

        // Pull should also work (hyphen in name handled correctly)
        let pull_stats = store.pull(&remote, PullOptions::default()).unwrap();
        // Pull brings data from remote into cached_my_team schema
        assert_eq!(pull_stats.invocations, 1);
    }
1350
    // Remote names containing dots must also survive schema-name derivation
    // (a bare dot would read as a schema/table separator in SQL).
    #[test]
    fn test_remote_name_with_dots() {
        let (tmp, store) = setup_store_duckdb();

        let inv = InvocationRecord::new(
            "test-session",
            "echo hello",
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&inv).unwrap();

        // Remote with dots in name
        let remote_path = tmp.path().join("team.v2.duckdb");
        let remote = create_file_remote("team.v2", &remote_path);

        let stats = store.push(&remote, PushOptions::default()).unwrap();
        assert_eq!(stats.invocations, 1);
    }
1371
1372    // ===== Connection options tests =====
1373
    // Both connection flavors work: minimal exposes local.*, full exposes
    // the unified main.* views as well.
    #[test]
    fn test_connection_minimal_vs_full() {
        let (_tmp, store) = setup_store_duckdb();

        // Minimal connection should work
        let conn_minimal = store.connect(ConnectionOptions::minimal()).unwrap();
        let count: i64 = conn_minimal
            .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);
        drop(conn_minimal);

        // Full connection should also work
        let conn_full = store.connect(ConnectionOptions::full()).unwrap();
        let count: i64 = conn_full
            .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);
    }
1393
    // Repeatedly opening and dropping connections must neither corrupt the
    // database nor lose visibility of previously written data.
    #[test]
    fn test_multiple_sequential_connections() {
        let (_tmp, store) = setup_store_duckdb();

        // Open and close multiple connections sequentially
        // This tests for database corruption issues
        for i in 0..5 {
            let conn = store.connection().unwrap();
            let count: i64 = conn
                .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
                .unwrap();
            assert_eq!(count, 0, "Connection {} should work", i);
            drop(conn);
        }

        // Write some data
        let inv = InvocationRecord::new(
            "test-session",
            "echo hello",
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&inv).unwrap();

        // More connections should still work and see the data
        for i in 0..3 {
            let conn = store.connection().unwrap();
            let count: i64 = conn
                .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
                .unwrap();
            assert_eq!(count, 1, "Connection {} should see the data", i);
            drop(conn);
        }
    }
1429
1430    // ===== Cached schema tests =====
1431
    // The caches.* union views start empty and include a newly created
    // cached_<name> schema after a pull followed by a view rebuild.
    #[test]
    fn test_caches_schema_views_work() {
        let (tmp, store) = setup_store_duckdb();

        // Initially caches should be empty
        let conn = store.connection().unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM caches.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);
        drop(conn);

        // Write and push data
        let inv = InvocationRecord::new(
            "test-session",
            "echo hello",
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&inv).unwrap();

        let remote_path = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &remote_path);
        store.push(&remote, PushOptions::default()).unwrap();

        // Pull creates cached_test schema
        store.pull(&remote, PullOptions::default()).unwrap();

        // Rebuild caches views to include cached_test
        let conn = store.connection().unwrap();
        store.rebuild_caches_schema(&conn).unwrap();

        // caches.invocations should now have data
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM caches.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 1, "caches should include pulled data after rebuild");
    }
1471
    // main.invocations is the union of local.* and caches.*: one row pulled
    // into a cached schema plus one fresh local row yields a count of two.
    #[test]
    fn test_main_schema_unions_local_and_caches() {
        let (tmp, store) = setup_store_duckdb();

        // Write local data
        let inv1 = InvocationRecord::new(
            "test-session",
            "local command",
            "/home/user",
            0,
            "local@client",
        );
        store.write_invocation(&inv1).unwrap();

        // Push to remote, then pull (simulating another client's data)
        let remote_path = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("team", &remote_path);

        // Push local data to remote
        store.push(&remote, PushOptions::default()).unwrap();

        // Delete local, pull from remote to create cached data
        let conn = store.connection().unwrap();
        conn.execute("DELETE FROM local.invocations", []).unwrap();
        drop(conn);

        store.pull(&remote, PullOptions::default()).unwrap();

        // Rebuild caches schema
        let conn = store.connection().unwrap();
        store.rebuild_caches_schema(&conn).unwrap();

        // Write new local data
        let inv2 = InvocationRecord::new(
            "test-session-2",
            "new local command",
            "/home/user",
            0,
            "local@client",
        );
        // Connection is dropped before writing so the store can open its own.
        drop(conn);
        store.write_invocation(&inv2).unwrap();

        // main.invocations should have both local and cached
        let conn = store.connection().unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 2, "main should union local + caches");
    }
1522
1523    // ===== Heterogeneous Storage Mode Tests =====
1524    // Test querying across different storage modes (parquet and duckdb)
1525
1526    fn setup_store_parquet() -> (TempDir, Store) {
1527        let tmp = TempDir::new().unwrap();
1528        let config = Config::with_root(tmp.path()); // Parquet is the default
1529        initialize(&config).unwrap();
1530        let store = Store::open(config).unwrap();
1531        (tmp, store)
1532    }
1533
    // A parquet-mode local store can attach and query a DuckDB-mode remote:
    // both modes expose their tables under a `local` schema, so the remote's
    // data is reachable as <schema>.local.invocations.
    #[test]
    fn test_heterogeneous_parquet_local_duckdb_remote() {
        // Local store uses parquet mode
        let (_local_tmp, local_store) = setup_store_parquet();

        // Remote store uses duckdb mode
        let remote_tmp = TempDir::new().unwrap();
        let remote_config = Config::with_duckdb_mode(remote_tmp.path());
        initialize(&remote_config).unwrap();
        let remote_store = Store::open(remote_config).unwrap();

        // Write data to remote (DuckDB mode - stored in local.invocations table)
        let remote_inv = InvocationRecord::new(
            "remote-session",
            "remote command",
            "/home/remote",
            0,
            "remote@client",
        );
        remote_store.write_invocation(&remote_inv).unwrap();

        // Verify remote has data
        let remote_conn = remote_store.connection().unwrap();
        let remote_count: i64 = remote_conn
            .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(remote_count, 1, "Remote should have data in local schema");
        drop(remote_conn);

        // Write data to local (Parquet mode - stored in parquet files)
        let local_inv = InvocationRecord::new(
            "local-session",
            "local command",
            "/home/local",
            0,
            "local@client",
        );
        local_store.write_invocation(&local_inv).unwrap();

        // Configure local to attach the remote (read-only)
        // NOTE(review): db/bird.duckdb appears to be the fixed database path
        // under a store root — confirm against Config.
        let remote_db_path = remote_tmp.path().join("db/bird.duckdb");
        let remote_config = RemoteConfig {
            name: "duckdb-store".to_string(),
            remote_type: RemoteType::File,
            uri: format!("file://{}", remote_db_path.display()),
            mode: RemoteMode::ReadOnly,
            auto_attach: true,
            credential_provider: None,
        };

        // Manually attach the remote to test heterogeneous querying
        let conn = local_store.connection_with_options(false).unwrap();
        local_store.attach_remote(&conn, &remote_config).unwrap();

        // Create remote macros (this tests detect_remote_table_path)
        // The remote is a DuckDB-mode BIRD database, so tables are in local schema
        let schema = remote_config.quoted_schema_name();
        let table_prefix = local_store.detect_remote_table_path(&conn, &schema);
        assert_eq!(table_prefix, "local.", "Should detect DuckDB mode remote has local. prefix");

        // Query the remote directly
        let remote_count: i64 = conn
            .query_row(
                &format!("SELECT COUNT(*) FROM {}.local.invocations", schema),
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(remote_count, 1, "Should be able to query DuckDB remote from Parquet local");
    }
1604
    // The mirror case: a DuckDB-mode local store attaches and queries a
    // parquet-mode remote; parquet-backed views resolve via file_search_path.
    #[test]
    fn test_heterogeneous_duckdb_local_parquet_remote() {
        // Local store uses duckdb mode
        let (_local_tmp, local_store) = setup_store_duckdb();

        // Remote store uses parquet mode
        let remote_tmp = TempDir::new().unwrap();
        let remote_config = Config::with_root(remote_tmp.path());
        initialize(&remote_config).unwrap();
        let remote_store = Store::open(remote_config).unwrap();

        // Write data to remote (Parquet mode - stored in parquet files)
        let remote_inv = InvocationRecord::new(
            "remote-session",
            "remote command",
            "/home/remote",
            0,
            "remote@client",
        );
        remote_store.write_invocation(&remote_inv).unwrap();

        // Write data to local (DuckDB mode - stored in local.invocations table)
        let local_inv = InvocationRecord::new(
            "local-session",
            "local command",
            "/home/local",
            0,
            "local@client",
        );
        local_store.write_invocation(&local_inv).unwrap();

        // Configure local to attach the remote (read-only)
        let remote_db_path = remote_tmp.path().join("db/bird.duckdb");
        let remote_config = RemoteConfig {
            name: "parquet-store".to_string(),
            remote_type: RemoteType::File,
            uri: format!("file://{}", remote_db_path.display()),
            mode: RemoteMode::ReadOnly,
            auto_attach: true,
            credential_provider: None,
        };

        // Manually attach the remote (this should also set file_search_path)
        let conn = local_store.connection_with_options(false).unwrap();
        local_store.attach_remote(&conn, &remote_config).unwrap();

        // Verify detection works - both modes have local.invocations
        let schema = remote_config.quoted_schema_name();
        let table_prefix = local_store.detect_remote_table_path(&conn, &schema);
        assert_eq!(table_prefix, "local.", "BIRD databases have local schema in both modes");

        // Query the remote directly (parquet mode views should resolve via file_search_path)
        let remote_count: i64 = conn
            .query_row(
                &format!("SELECT COUNT(*) FROM {}.local.invocations", schema),
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(remote_count, 1, "Should be able to query Parquet remote from DuckDB local");
    }
1666
    // End-to-end heterogeneous setup: a parquet local with an auto-attached
    // DuckDB remote exposes both datasets through the unified.* views.
    #[test]
    fn test_heterogeneous_unified_views() {
        // This tests the full heterogeneous setup with unified views
        let (local_tmp, local_store) = setup_store_parquet();

        // Create a DuckDB-mode remote
        let remote_tmp = TempDir::new().unwrap();
        let remote_config = Config::with_duckdb_mode(remote_tmp.path());
        initialize(&remote_config).unwrap();
        let remote_store = Store::open(remote_config).unwrap();

        // Write unique data to remote
        let remote_inv = InvocationRecord::new(
            "remote-session",
            "remote-specific-cmd",
            "/home/remote",
            42,
            "remote@client",
        );
        remote_store.write_invocation(&remote_inv).unwrap();

        // Write unique data to local
        let local_inv = InvocationRecord::new(
            "local-session",
            "local-specific-cmd",
            "/home/local",
            0,
            "local@client",
        );
        local_store.write_invocation(&local_inv).unwrap();

        // Create config with remote
        let remote_db_path = remote_tmp.path().join("db/bird.duckdb");
        let mut config = Config::with_root(local_tmp.path());
        config.remotes.push(RemoteConfig {
            name: "heterogeneous-test".to_string(),
            remote_type: RemoteType::File,
            uri: format!("file://{}", remote_db_path.display()),
            mode: RemoteMode::ReadOnly,
            auto_attach: true,
            credential_provider: None,
        });

        // Open store with remote config
        let store = Store::open(config).unwrap();

        // Connection with auto-attach should set up unified views
        let conn = store.connection().unwrap();

        // Query local data
        let local_count: i64 = conn
            .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(local_count, 1, "Local should have 1 record");

        // Query unified view - should include both local and remote
        let unified_count: i64 = conn
            .query_row("SELECT COUNT(*) FROM unified.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(unified_count, 2, "Unified view should have local + remote records");

        // Verify we can see both commands
        let cmds: Vec<String> = conn
            .prepare("SELECT cmd FROM unified.invocations ORDER BY cmd")
            .unwrap()
            .query_map([], |r| r.get(0))
            .unwrap()
            .filter_map(|r| r.ok())
            .collect();
        assert_eq!(cmds.len(), 2);
        assert!(cmds.contains(&"local-specific-cmd".to_string()));
        assert!(cmds.contains(&"remote-specific-cmd".to_string()));
    }
1740
1741    #[test]
1742    fn test_detect_remote_table_path_standalone_db() {
1743        // Test detection of standalone databases (not BIRD, no local schema)
1744        let (_tmp, store) = setup_store_duckdb();
1745
1746        // Create a standalone database (not a BIRD database)
1747        let standalone_tmp = TempDir::new().unwrap();
1748        let standalone_db_path = standalone_tmp.path().join("standalone.duckdb");
1749        {
1750            let conn = duckdb::Connection::open(&standalone_db_path).unwrap();
1751            conn.execute(
1752                "CREATE TABLE invocations (id UUID, cmd VARCHAR)",
1753                [],
1754            )
1755            .unwrap();
1756            conn.execute(
1757                "INSERT INTO invocations VALUES (gen_random_uuid(), 'test')",
1758                [],
1759            )
1760            .unwrap();
1761        }
1762
1763        // Attach as remote
1764        let remote = RemoteConfig {
1765            name: "standalone".to_string(),
1766            remote_type: RemoteType::File,
1767            uri: format!("file://{}", standalone_db_path.display()),
1768            mode: RemoteMode::ReadOnly,
1769            auto_attach: true,
1770            credential_provider: None,
1771        };
1772
1773        let conn = store.connection_with_options(false).unwrap();
1774        store.attach_remote(&conn, &remote).unwrap();
1775
1776        // Detect table path (should be empty - no local schema)
1777        let schema = remote.quoted_schema_name();
1778        let table_prefix = store.detect_remote_table_path(&conn, &schema);
1779        assert_eq!(table_prefix, "", "Standalone DB should have no prefix");
1780
1781        // Query should work
1782        let count: i64 = conn
1783            .query_row(
1784                &format!("SELECT COUNT(*) FROM {}.invocations", schema),
1785                [],
1786                |r| r.get(0),
1787            )
1788            .unwrap();
1789        assert_eq!(count, 1);
1790    }
1791
1792    #[test]
1793    fn test_push_to_readonly_remote_fails() {
1794        let (_tmp, store) = setup_store_duckdb();
1795
1796        // Write local data
1797        let inv = InvocationRecord::new(
1798            "test-session",
1799            "echo hello",
1800            "/home/user",
1801            0,
1802            "test@client",
1803        );
1804        store.write_invocation(&inv).unwrap();
1805
1806        // Create a read-only remote
1807        let remote_tmp = TempDir::new().unwrap();
1808        let remote_path = remote_tmp.path().join("remote.duckdb");
1809        let remote = RemoteConfig {
1810            name: "readonly".to_string(),
1811            remote_type: RemoteType::File,
1812            uri: format!("file://{}", remote_path.display()),
1813            mode: RemoteMode::ReadOnly,
1814            auto_attach: true,
1815            credential_provider: None,
1816        };
1817
1818        // Push to read-only should fail
1819        let result = store.push(&remote, PushOptions::default());
1820        assert!(result.is_err(), "Push to read-only remote should fail");
1821        assert!(
1822            result.unwrap_err().to_string().contains("Cannot push to read-only"),
1823            "Error should mention read-only"
1824        );
1825    }
1826
1827    #[test]
1828    fn test_push_to_readonly_remote_dry_run_returns_empty() {
1829        let (_tmp, store) = setup_store_duckdb();
1830
1831        // Write local data
1832        let inv = InvocationRecord::new(
1833            "test-session",
1834            "echo hello",
1835            "/home/user",
1836            0,
1837            "test@client",
1838        );
1839        store.write_invocation(&inv).unwrap();
1840
1841        // Create a read-only remote
1842        let remote_tmp = TempDir::new().unwrap();
1843        let remote_path = remote_tmp.path().join("remote.duckdb");
1844        let remote = RemoteConfig {
1845            name: "readonly".to_string(),
1846            remote_type: RemoteType::File,
1847            uri: format!("file://{}", remote_path.display()),
1848            mode: RemoteMode::ReadOnly,
1849            auto_attach: true,
1850            credential_provider: None,
1851        };
1852
1853        // Dry run on read-only should return empty stats (nothing to push)
1854        let stats = store.push(&remote, PushOptions { dry_run: true, ..Default::default() }).unwrap();
1855        assert_eq!(stats.invocations, 0);
1856        assert_eq!(stats.sessions, 0);
1857    }
1858}