// magic_bird/store/remote.rs
1//! Remote sync operations (push/pull).
2//!
3//! Provides functionality to sync data between local and remote DuckDB databases.
4//!
5//! # Schema Architecture
6//!
7//! - **Push**: Reads from `local` schema, writes to `remote_<name>` schema tables
8//! - **Pull**: Reads from `remote_<name>` schema, writes to `cached_<name>` schema tables
9//!
10//! Remote databases have tables: `sessions`, `invocations`, `outputs`, `events`
11//! (no `_table` suffix - consistent naming across all schemas).
12//!
13//! # Blob Sync
14//!
15//! When `sync_blobs` is enabled, blob files (outputs stored as file:// refs) are
16//! also synced. For file remotes, we prefer hard links (fast, no disk duplication)
17//! and fall back to copying when hard links fail (cross-filesystem).
18
19use std::fs;
20use std::path::{Path, PathBuf};
21
22use chrono::{NaiveDate, TimeDelta, Utc};
23use duckdb::Connection;
24
25use crate::config::RemoteType;
26use crate::{Error, RemoteConfig, Result};
27
/// Statistics from blob sync operations.
#[derive(Debug, Default)]
pub struct BlobStats {
    /// Number of blobs synced.
    pub count: usize,
    /// Total bytes synced.
    pub bytes: u64,
    /// Number of blobs hard-linked (no copy needed).
    pub linked: usize,
    /// Number of blobs copied (hard link failed).
    pub copied: usize,
    /// Number of blobs skipped (already exist).
    pub skipped: usize,
}

impl std::fmt::Display for BlobStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Empty stats render as a short summary; otherwise show the breakdown.
        match self.count {
            0 => f.write_str("0 blobs"),
            n => write!(
                f,
                "{} blobs ({}KB, {} linked, {} copied, {} skipped)",
                n,
                self.bytes / 1024,
                self.linked,
                self.copied,
                self.skipped
            ),
        }
    }
}
57
/// Statistics from a push operation.
#[derive(Debug, Default)]
pub struct PushStats {
    // Number of session rows pushed (or that would be pushed, for dry runs).
    pub sessions: usize,
    // V5: this counts pushed `attempts` rows (reported as "invocations").
    pub invocations: usize,
    // Number of output-metadata rows pushed.
    pub outputs: usize,
    // Number of event rows pushed.
    pub events: usize,
    // Blob file sync statistics; only populated when `sync_blobs` is enabled.
    pub blobs: BlobStats,
}
67
68impl std::fmt::Display for PushStats {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        write!(
71            f,
72            "{} sessions, {} invocations, {} outputs, {} events",
73            self.sessions, self.invocations, self.outputs, self.events
74        )?;
75        if self.blobs.count > 0 {
76            write!(f, ", {}", self.blobs)?;
77        }
78        Ok(())
79    }
80}
81
/// Statistics from a pull operation.
#[derive(Debug, Default)]
pub struct PullStats {
    // Number of session rows pulled into the cached schema.
    pub sessions: usize,
    // V5: this counts pulled `attempts` rows (reported as "invocations").
    pub invocations: usize,
    // Number of output-metadata rows pulled.
    pub outputs: usize,
    // Number of event rows pulled.
    pub events: usize,
    // Blob file sync statistics; only populated when `sync_blobs` is enabled.
    pub blobs: BlobStats,
}
91
92impl std::fmt::Display for PullStats {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        write!(
95            f,
96            "{} sessions, {} invocations, {} outputs, {} events",
97            self.sessions, self.invocations, self.outputs, self.events
98        )?;
99        if self.blobs.count > 0 {
100            write!(f, ", {}", self.blobs)?;
101        }
102        Ok(())
103    }
104}
105
/// Options for push operation.
#[derive(Debug, Default)]
pub struct PushOptions {
    /// Only push data since this date (inclusive; compared against the
    /// attempt timestamp via `>=` in the generated SQL).
    pub since: Option<NaiveDate>,
    /// Show what would be pushed without actually pushing.
    pub dry_run: bool,
    /// Sync blob files (not just metadata).
    pub sync_blobs: bool,
}
116
/// Options for pull operation.
#[derive(Debug, Default)]
pub struct PullOptions {
    /// Only pull data since this date (inclusive; compared against the
    /// attempt timestamp via `>=` in the generated SQL).
    pub since: Option<NaiveDate>,
    /// Only pull data from this client.
    pub client_id: Option<String>,
    /// Sync blob files (not just metadata).
    pub sync_blobs: bool,
}
127
128/// Parse a "since" string into a date.
129///
130/// Supports:
131/// - Duration: "7d", "2w", "1m" (days, weeks, months)
132/// - Date: "2024-01-15"
133pub fn parse_since(s: &str) -> Result<NaiveDate> {
134    let s = s.trim();
135
136    // Try duration first (7d, 2w, 1m)
137    if let Some(days) = parse_duration_days(s) {
138        let date = Utc::now().date_naive() - TimeDelta::days(days);
139        return Ok(date);
140    }
141
142    // Try date format (YYYY-MM-DD)
143    NaiveDate::parse_from_str(s, "%Y-%m-%d")
144        .map_err(|e| Error::Config(format!("Invalid date '{}': {}", s, e)))
145}
146
/// Parse a duration string into days.
///
/// Recognized suffixes (case-insensitive): `d` = days, `w` = weeks (×7),
/// `m` = months (approximated as ×30). Returns `None` for anything else
/// or when the numeric part fails to parse.
fn parse_duration_days(s: &str) -> Option<i64> {
    let normalized = s.trim().to_lowercase();

    // Split off the unit suffix (last char), keep the numeric prefix.
    let mut chars = normalized.chars();
    let unit = chars.next_back()?;
    let factor = match unit {
        'd' => 1,
        'w' => 7,
        'm' => 30, // approximation: one month = 30 days
        _ => return None,
    };
    chars.as_str().parse::<i64>().ok().map(|n| n * factor)
}
161
/// Get the cached schema name for a remote (e.g., "cached_team" for remote "team").
#[allow(dead_code)]
pub fn cached_schema_name(remote_name: &str) -> String {
    let mut name = String::with_capacity("cached_".len() + remote_name.len());
    name.push_str("cached_");
    name.push_str(remote_name);
    name
}
167
/// Get the quoted cached schema name for SQL (double-quoted identifier).
pub fn quoted_cached_schema_name(remote_name: &str) -> String {
    format!(r#""cached_{}""#, remote_name)
}
172
173/// Get the data directory for a file remote.
174///
175/// For a remote URI like `file:///path/to/remote.duckdb`, returns `/path/to`.
176/// This is where blob paths (like `recent/blobs/content/...`) are relative to.
177fn file_remote_data_dir(remote: &RemoteConfig) -> Option<PathBuf> {
178    if remote.remote_type != RemoteType::File {
179        return None;
180    }
181
182    // Parse file:// URI to get the database path
183    let db_path = remote.uri.strip_prefix("file://")?;
184    let db_path = Path::new(db_path);
185
186    // Data directory is the parent of the .duckdb file
187    // e.g., /path/to/remote.duckdb -> /path/to
188    db_path.parent().map(PathBuf::from)
189}
190
/// Information about a blob to sync.
#[derive(Debug)]
struct BlobInfo {
    // Content-addressed hash identifying the blob (key in blob_registry).
    content_hash: String,
    // Path relative to the data directory
    // (e.g. "recent/blobs/content/ab/hash.bin").
    storage_path: String,
    // Size in bytes as recorded in the registry.
    byte_length: i64,
}
198
199/// Sync a single blob file using hard link or copy.
200///
201/// Returns `Ok(true)` if the blob was synced (linked or copied),
202/// `Ok(false)` if it already exists at destination.
203fn sync_blob_file(src: &Path, dst: &Path, stats: &mut BlobStats) -> Result<bool> {
204    // Check if destination already exists
205    if dst.exists() {
206        stats.skipped += 1;
207        return Ok(false);
208    }
209
210    // Ensure parent directory exists
211    if let Some(parent) = dst.parent() {
212        fs::create_dir_all(parent)?;
213    }
214
215    // Try hard link first (fast, no disk duplication)
216    match fs::hard_link(src, dst) {
217        Ok(()) => {
218            stats.linked += 1;
219            stats.count += 1;
220            if let Ok(meta) = fs::metadata(dst) {
221                stats.bytes += meta.len();
222            }
223            Ok(true)
224        }
225        Err(_) => {
226            // Fall back to copy (cross-filesystem)
227            fs::copy(src, dst)?;
228            stats.copied += 1;
229            stats.count += 1;
230            if let Ok(meta) = fs::metadata(dst) {
231                stats.bytes += meta.len();
232            }
233            Ok(true)
234        }
235    }
236}
237
impl super::Store {
    /// Push local data to a remote.
    ///
    /// Reads from `local` schema, writes to remote's tables.
    /// Only pushes records that don't already exist on the remote (by id).
    /// When `sync_blobs` is enabled, also syncs blob files for file remotes.
    ///
    /// # Errors
    ///
    /// Returns `Error::Config` when the remote is read-only (unless `dry_run`
    /// is set, in which case empty stats are returned).
    pub fn push(&self, remote: &RemoteConfig, opts: PushOptions) -> Result<PushStats> {
        use crate::config::RemoteMode;

        // Read-only remotes can't be pushed to - return empty stats for dry_run
        if remote.mode == RemoteMode::ReadOnly {
            if opts.dry_run {
                // Nothing to push to a read-only remote
                return Ok(PushStats::default());
            } else {
                return Err(Error::Config(format!(
                    "Cannot push to read-only remote '{}'",
                    remote.name
                )));
            }
        }

        // Use connection without auto-attach to avoid conflicts and unnecessary views
        let conn = self.connection_with_options(false)?;

        // Attach only the target remote
        self.attach_remote(&conn, remote)?;

        // Pre-quoted schema identifier; interpolated directly into SQL below.
        let remote_schema = remote.quoted_schema_name();

        // Ensure remote has the required tables (including blob_registry)
        ensure_remote_schema(&conn, &remote_schema)?;

        let mut stats = PushStats::default();

        if opts.dry_run {
            // Count what would be pushed
            stats.sessions = count_sessions_to_push(&conn, &remote_schema, opts.since)?;
            // V5: count only attempts (represents invocation count)
            stats.invocations = count_table_to_push(&conn, "attempts", &remote_schema, opts.since)?;
            stats.outputs = count_table_to_push(&conn, "outputs", &remote_schema, opts.since)?;
            stats.events = count_table_to_push(&conn, "events", &remote_schema, opts.since)?;
            if opts.sync_blobs {
                stats.blobs = count_blobs_to_push(&conn, &remote_schema, opts.since)?;
            }
        } else {
            // Sync blobs first (before pushing output metadata)
            if opts.sync_blobs {
                stats.blobs = self.push_blobs(&conn, remote, &remote_schema, opts.since)?;
            }

            // Actually push in dependency order
            stats.sessions = push_sessions(&conn, &remote_schema, opts.since)?;
            // V5: push attempts first, then outcomes (report attempts count as "invocations")
            stats.invocations = push_table(&conn, "attempts", &remote_schema, opts.since)?;
            let _ = push_table(&conn, "outcomes", &remote_schema, opts.since)?;
            stats.outputs = push_outputs(&conn, &remote_schema, opts.since, opts.sync_blobs)?;
            stats.events = push_table(&conn, "events", &remote_schema, opts.since)?;
        }

        Ok(stats)
    }

    /// Push blob files to a file remote.
    ///
    /// Syncs blob files using hard links (preferred) or copies (fallback).
    /// Also syncs blob_registry entries.
    ///
    /// `remote_schema` is the pre-quoted schema name (from
    /// `quoted_schema_name()`); it is interpolated directly into SQL.
    /// Non-file remotes are a no-op and return empty stats.
    fn push_blobs(
        &self,
        conn: &Connection,
        remote: &RemoteConfig,
        remote_schema: &str,
        since: Option<NaiveDate>,
    ) -> Result<BlobStats> {
        let mut stats = BlobStats::default();

        // Only file remotes support blob sync for now
        let remote_data_dir = match file_remote_data_dir(remote) {
            Some(dir) => dir,
            None => return Ok(stats), // Not a file remote, skip blob sync
        };

        // Find blobs that need to be synced
        let blobs = get_blobs_to_push(conn, remote_schema, since)?;
        if blobs.is_empty() {
            return Ok(stats);
        }

        let local_data_dir = self.config.data_dir();

        for blob in &blobs {
            // Build source and destination paths
            // storage_path is relative to data_dir (e.g., "recent/blobs/content/ab/hash.bin")
            let src = local_data_dir.join(&blob.storage_path);
            let dst = remote_data_dir.join(&blob.storage_path);

            if !src.exists() {
                // Source blob missing, skip
                continue;
            }

            // Sync the blob file
            sync_blob_file(&src, &dst, &mut stats)?;

            // Sync blob_registry entry
            // Values are embedded as SQL literals, so single quotes must be doubled.
            let escaped_hash = blob.content_hash.replace('\'', "''");
            let escaped_path = blob.storage_path.replace('\'', "''");
            // Insert is idempotent: WHERE NOT EXISTS skips already-registered hashes.
            conn.execute(
                &format!(
                    r#"
                    INSERT INTO {schema}.blob_registry (content_hash, byte_length, storage_path)
                    SELECT '{hash}', {len}, '{path}'
                    WHERE NOT EXISTS (
                        SELECT 1 FROM {schema}.blob_registry WHERE content_hash = '{hash}'
                    )
                    "#,
                    schema = remote_schema,
                    hash = escaped_hash,
                    len = blob.byte_length,
                    path = escaped_path,
                ),
                [],
            )?;
        }

        Ok(stats)
    }

    /// Pull data from a remote into local cached_<name> schema.
    ///
    /// Reads from remote's tables, writes to `cached_<name>` schema.
    /// Only pulls records that don't already exist in the cached schema (by id).
    /// After pulling, rebuilds the `caches` union views.
    /// When `sync_blobs` is enabled, also syncs blob files for file remotes.
    ///
    /// Unlike `push`, there is no mode check here: pulling only reads from
    /// the remote and writes locally.
    pub fn pull(&self, remote: &RemoteConfig, opts: PullOptions) -> Result<PullStats> {
        // Use connection without auto-attach to avoid conflicts
        let conn = self.connection_with_options(false)?;

        // Attach only the target remote
        self.attach_remote(&conn, remote)?;

        let remote_schema = remote.quoted_schema_name();
        let cached_schema = quoted_cached_schema_name(&remote.name);

        // Ensure cached schema exists with required tables
        ensure_cached_schema(&conn, &cached_schema, &remote.name)?;

        // Pull in dependency order (sessions first, then attempts, outcomes, outputs, events)
        // V5: pull attempts first, then outcomes (report attempts count as "invocations")
        let attempts_pulled = pull_table(&conn, "attempts", &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref())?;
        let _ = pull_table(&conn, "outcomes", &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref())?;
        let mut stats = PullStats {
            sessions: pull_sessions(&conn, &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref())?,
            invocations: attempts_pulled,
            outputs: pull_outputs(&conn, &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref(), opts.sync_blobs)?,
            events: pull_table(&conn, "events", &remote_schema, &cached_schema, opts.since, opts.client_id.as_deref())?,
            blobs: BlobStats::default(),
        };

        // Sync blob files after pulling output metadata
        if opts.sync_blobs {
            stats.blobs = self.pull_blobs(&conn, remote, &remote_schema, &cached_schema)?;
        }

        // Rebuild caches union views to include this cached schema
        self.rebuild_caches_schema(&conn)?;

        Ok(stats)
    }

    /// Pull blob files from a file remote.
    ///
    /// Syncs blob files using hard links (preferred) or copies (fallback).
    /// Also registers blobs in the local blob_registry.
    ///
    /// `remote_schema` and `cached_schema` are pre-quoted schema names;
    /// non-file remotes are a no-op and return empty stats.
    fn pull_blobs(
        &self,
        conn: &Connection,
        remote: &RemoteConfig,
        remote_schema: &str,
        cached_schema: &str,
    ) -> Result<BlobStats> {
        let mut stats = BlobStats::default();

        // Only file remotes support blob sync for now
        let remote_data_dir = match file_remote_data_dir(remote) {
            Some(dir) => dir,
            None => return Ok(stats), // Not a file remote, skip blob sync
        };

        // Find blobs that were pulled (in cached outputs but not in local blob_registry)
        let blobs = get_blobs_to_pull(conn, remote_schema, cached_schema)?;
        if blobs.is_empty() {
            return Ok(stats);
        }

        let local_data_dir = self.config.data_dir();

        for blob in &blobs {
            // Build source and destination paths
            // storage_path is relative to data_dir (e.g., "recent/blobs/content/ab/hash.bin")
            let src = remote_data_dir.join(&blob.storage_path);
            let dst = local_data_dir.join(&blob.storage_path);

            if !src.exists() {
                // Source blob missing on remote, skip
                continue;
            }

            // Sync the blob file
            let synced = sync_blob_file(&src, &dst, &mut stats)?;

            // Register in local blob_registry if we synced a new blob
            if synced {
                // Values are embedded as SQL literals, so single quotes must be doubled.
                let escaped_hash = blob.content_hash.replace('\'', "''");
                let escaped_path = blob.storage_path.replace('\'', "''");
                conn.execute(
                    &format!(
                        r#"
                        INSERT INTO blob_registry (content_hash, byte_length, storage_path)
                        SELECT '{hash}', {len}, '{path}'
                        WHERE NOT EXISTS (
                            SELECT 1 FROM blob_registry WHERE content_hash = '{hash}'
                        )
                        "#,
                        hash = escaped_hash,
                        len = blob.byte_length,
                        path = escaped_path,
                    ),
                    [],
                )?;
            }
        }

        Ok(stats)
    }

    /// Rebuild the `caches` schema views to union all `cached_*` schemas.
    ///
    /// Uses explicit transaction for DDL safety. The caches.* views reference
    /// local cached_* schemas (not attached databases), so they should be safe
    /// to persist.
    /// V5 schema: unions attempts/outcomes tables and creates invocations view.
    pub fn rebuild_caches_schema(&self, conn: &Connection) -> Result<()> {
        // Find all cached_* schemas
        // NOTE(review): row-decode errors are silently dropped by filter_map;
        // a failing row just omits that schema from the union.
        let schemas: Vec<String> = conn
            .prepare("SELECT schema_name FROM information_schema.schemata WHERE schema_name LIKE 'cached_%'")?
            .query_map([], |row| row.get(0))?
            .filter_map(|r| r.ok())
            .collect();

        // Use transaction for DDL safety
        conn.execute("BEGIN TRANSACTION", [])?;

        // All DDL runs inside this closure so a single Err triggers ROLLBACK below.
        let result = (|| -> std::result::Result<(), duckdb::Error> {
            // V5: union attempts and outcomes tables
            for table in &["sessions", "attempts", "outcomes", "outputs", "events"] {
                let mut union_parts: Vec<String> = schemas
                    .iter()
                    .map(|s| format!("SELECT * FROM \"{}\".{}", s, table))
                    .collect();

                // Always include placeholder (ensures view is valid even with no cached schemas)
                if !schemas.iter().any(|s| s == "cached_placeholder") {
                    union_parts.push(format!("SELECT * FROM cached_placeholder.{}", table));
                }

                let sql = format!(
                    "CREATE OR REPLACE VIEW caches.{} AS {}",
                    table,
                    union_parts.join(" UNION ALL BY NAME ")
                );
                conn.execute(&sql, [])?;
            }

            // V5: Create invocations view that joins attempts and outcomes
            // Note: We don't include metadata in this view because merging MAPs from
            // different sources is problematic. Use attempts.metadata or outcomes.metadata directly.
            conn.execute(
                r#"
                CREATE OR REPLACE VIEW caches.invocations AS
                SELECT
                    a.id, a.timestamp, a.cmd, a.cwd, a.session_id,
                    a.tag, a.source_client, a.machine_id, a.hostname,
                    a.executable, a.format_hint,
                    o.completed_at, o.exit_code, o.duration_ms, o.signal, o.timeout,
                    a.date,
                    CASE
                        WHEN o.attempt_id IS NULL THEN 'pending'
                        WHEN o.exit_code IS NULL THEN 'orphaned'
                        ELSE 'completed'
                    END AS status,
                    a._source
                FROM caches.attempts a
                LEFT JOIN caches.outcomes o ON a.id = o.attempt_id
                "#,
                [],
            )?;

            Ok(())
        })();

        match result {
            Ok(()) => {
                conn.execute("COMMIT", [])?;
                Ok(())
            }
            Err(e) => {
                // Best-effort rollback; the original error is what matters.
                let _ = conn.execute("ROLLBACK", []);
                Err(crate::Error::DuckDb(e))
            }
        }
    }
}
551
/// Ensure the remote schema has the required tables.
/// Tables use consistent naming (no `_table` suffix).
/// V5 schema: uses attempts/outcomes tables instead of invocations.
///
/// `schema` is interpolated directly into the DDL, so it is expected to be
/// pre-quoted (callers pass `quoted_schema_name()`). The whole batch is
/// idempotent: `CREATE TABLE IF NOT EXISTS` / `CREATE OR REPLACE VIEW`.
fn ensure_remote_schema(conn: &Connection, schema: &str) -> Result<()> {
    let sql = format!(
        r#"
        CREATE TABLE IF NOT EXISTS {schema}.sessions (
            session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
            invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE
        );
        -- V5: attempts table (invocation start)
        CREATE TABLE IF NOT EXISTS {schema}.attempts (
            id UUID, timestamp TIMESTAMP, cmd VARCHAR, cwd VARCHAR, session_id VARCHAR,
            tag VARCHAR, source_client VARCHAR, machine_id VARCHAR, hostname VARCHAR,
            executable VARCHAR, format_hint VARCHAR, metadata MAP(VARCHAR, JSON), date DATE
        );
        -- V5: outcomes table (invocation completion)
        CREATE TABLE IF NOT EXISTS {schema}.outcomes (
            attempt_id UUID, completed_at TIMESTAMP, exit_code INTEGER, duration_ms BIGINT,
            signal INTEGER, timeout BOOLEAN, metadata MAP(VARCHAR, JSON), date DATE
        );
        -- V5: invocations VIEW for compatibility
        -- Note: metadata not included due to MAP_CONCAT complexity; use attempts/outcomes directly
        CREATE OR REPLACE VIEW {schema}.invocations AS
        SELECT
            a.id, a.timestamp, a.cmd, a.cwd, a.session_id,
            a.tag, a.source_client, a.machine_id, a.hostname,
            a.executable, a.format_hint,
            o.completed_at, o.exit_code, o.duration_ms, o.signal, o.timeout,
            a.date,
            CASE
                WHEN o.attempt_id IS NULL THEN 'pending'
                WHEN o.exit_code IS NULL THEN 'orphaned'
                ELSE 'completed'
            END AS status
        FROM {schema}.attempts a
        LEFT JOIN {schema}.outcomes o ON a.id = o.attempt_id;
        CREATE TABLE IF NOT EXISTS {schema}.outputs (
            id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
            byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
            content_type VARCHAR, date DATE
        );
        CREATE TABLE IF NOT EXISTS {schema}.events (
            id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
            event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
            ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
            status VARCHAR, format_used VARCHAR, date DATE
        );
        CREATE TABLE IF NOT EXISTS {schema}.blob_registry (
            content_hash VARCHAR PRIMARY KEY,
            byte_length BIGINT NOT NULL,
            ref_count INTEGER DEFAULT 1,
            first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            storage_path VARCHAR NOT NULL
        );
        "#,
        schema = schema
    );
    conn.execute_batch(&sql)?;
    Ok(())
}
614
/// Ensure the cached schema exists with required tables.
/// Tables include a `_source` column to track which remote the data came from.
/// V5 schema: uses attempts/outcomes tables instead of invocations.
///
/// `schema` is interpolated directly (expected pre-quoted); `remote_name` is
/// embedded as a SQL string literal for the `_source` DEFAULT, so its single
/// quotes are escaped below.
fn ensure_cached_schema(conn: &Connection, schema: &str, remote_name: &str) -> Result<()> {
    // Create the schema if it doesn't exist
    conn.execute(&format!("CREATE SCHEMA IF NOT EXISTS {}", schema), [])?;

    // Create tables with _source column
    let sql = format!(
        r#"
        CREATE TABLE IF NOT EXISTS {schema}.sessions (
            session_id VARCHAR, client_id VARCHAR, invoker VARCHAR, invoker_pid INTEGER,
            invoker_type VARCHAR, registered_at TIMESTAMP, cwd VARCHAR, date DATE,
            _source VARCHAR DEFAULT '{remote_name}'
        );
        -- V5: attempts table (invocation start)
        CREATE TABLE IF NOT EXISTS {schema}.attempts (
            id UUID, timestamp TIMESTAMP, cmd VARCHAR, cwd VARCHAR, session_id VARCHAR,
            tag VARCHAR, source_client VARCHAR, machine_id VARCHAR, hostname VARCHAR,
            executable VARCHAR, format_hint VARCHAR, metadata MAP(VARCHAR, JSON), date DATE,
            _source VARCHAR DEFAULT '{remote_name}'
        );
        -- V5: outcomes table (invocation completion)
        CREATE TABLE IF NOT EXISTS {schema}.outcomes (
            attempt_id UUID, completed_at TIMESTAMP, exit_code INTEGER, duration_ms BIGINT,
            signal INTEGER, timeout BOOLEAN, metadata MAP(VARCHAR, JSON), date DATE,
            _source VARCHAR DEFAULT '{remote_name}'
        );
        -- V5: invocations VIEW for compatibility
        -- Note: metadata not included due to MAP_CONCAT complexity; use attempts/outcomes directly
        CREATE OR REPLACE VIEW {schema}.invocations AS
        SELECT
            a.id, a.timestamp, a.cmd, a.cwd, a.session_id,
            a.tag, a.source_client, a.machine_id, a.hostname,
            a.executable, a.format_hint,
            o.completed_at, o.exit_code, o.duration_ms, o.signal, o.timeout,
            a.date,
            CASE
                WHEN o.attempt_id IS NULL THEN 'pending'
                WHEN o.exit_code IS NULL THEN 'orphaned'
                ELSE 'completed'
            END AS status,
            a._source
        FROM {schema}.attempts a
        LEFT JOIN {schema}.outcomes o ON a.id = o.attempt_id;
        CREATE TABLE IF NOT EXISTS {schema}.outputs (
            id UUID, invocation_id UUID, stream VARCHAR, content_hash VARCHAR,
            byte_length BIGINT, storage_type VARCHAR, storage_ref VARCHAR,
            content_type VARCHAR, date DATE,
            _source VARCHAR DEFAULT '{remote_name}'
        );
        CREATE TABLE IF NOT EXISTS {schema}.events (
            id UUID, invocation_id UUID, client_id VARCHAR, hostname VARCHAR,
            event_type VARCHAR, severity VARCHAR, ref_file VARCHAR, ref_line INTEGER,
            ref_column INTEGER, message VARCHAR, error_code VARCHAR, test_name VARCHAR,
            status VARCHAR, format_used VARCHAR, date DATE,
            _source VARCHAR DEFAULT '{remote_name}'
        );
        "#,
        schema = schema,
        remote_name = remote_name.replace('\'', "''")
    );
    conn.execute_batch(&sql)?;
    Ok(())
}
680
681/// Build the WHERE clause for time filtering.
682fn since_clause(since: Option<NaiveDate>, timestamp_col: &str) -> String {
683    since
684        .map(|d| format!("AND {} >= '{}'", timestamp_col, d))
685        .unwrap_or_default()
686}
687
/// Build the WHERE clause for client filtering.
///
/// Returns `"AND client_id = '<id>'"` (single quotes escaped) or an empty
/// string when no client filter is requested.
fn client_clause(client_id: Option<&str>) -> String {
    match client_id {
        Some(id) => format!("AND client_id = '{}'", id.replace('\'', "''")),
        None => String::new(),
    }
}
694
695/// Count sessions that would be pushed.
696/// Reads from `local` schema.
697/// V5 schema: joins on attempts instead of invocations.
698fn count_sessions_to_push(
699    conn: &Connection,
700    remote_schema: &str,
701    since: Option<NaiveDate>,
702) -> Result<usize> {
703    let since_filter = since_clause(since, "a.timestamp");
704
705    let sql = format!(
706        r#"
707        SELECT COUNT(DISTINCT s.session_id)
708        FROM local.sessions s
709        JOIN local.attempts a ON a.session_id = s.session_id
710        WHERE NOT EXISTS (
711            SELECT 1 FROM {remote}.sessions r WHERE r.session_id = s.session_id
712        )
713        {since}
714        "#,
715        remote = remote_schema,
716        since = since_filter,
717    );
718
719    let count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
720    Ok(count as usize)
721}
722
/// Count records that would be pushed for a table.
/// Reads from `local` schema.
/// V5 schema: uses attempts/outcomes tables instead of invocations.
///
/// Query shape depends on `table`:
/// - `"attempts"`: anti-join by `id`, time-filtered by the attempt's own timestamp.
/// - `"outcomes"`: anti-join by `attempt_id`, time-filtered via the parent attempt.
/// - `"outputs"` / `"events"`: anti-join by `id`, joined to attempts via `invocation_id`
///   for the time filter.
/// - anything else: plain anti-join by `id`, no time filter applied.
fn count_table_to_push(
    conn: &Connection,
    table: &str,
    remote_schema: &str,
    since: Option<NaiveDate>,
) -> Result<usize> {
    let sql = match table {
        // V5: count attempts
        "attempts" => {
            let since_filter = since_clause(since, "l.timestamp");
            format!(
                r#"
                SELECT COUNT(*)
                FROM local.attempts l
                WHERE NOT EXISTS (
                    SELECT 1 FROM {remote}.attempts r WHERE r.id = l.id
                )
                {since}
                "#,
                remote = remote_schema,
                since = since_filter,
            )
        }
        // V5: count outcomes
        "outcomes" => {
            let since_filter = since_clause(since, "a.timestamp");
            format!(
                r#"
                SELECT COUNT(*)
                FROM local.outcomes l
                JOIN local.attempts a ON a.id = l.attempt_id
                WHERE NOT EXISTS (
                    SELECT 1 FROM {remote}.outcomes r WHERE r.attempt_id = l.attempt_id
                )
                {since}
                "#,
                remote = remote_schema,
                since = since_filter,
            )
        }
        "outputs" | "events" => {
            // V5: join on attempts instead of invocations
            let since_filter = since_clause(since, "a.timestamp");
            format!(
                r#"
                SELECT COUNT(*)
                FROM local.{table} l
                JOIN local.attempts a ON a.id = l.invocation_id
                WHERE NOT EXISTS (
                    SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
                )
                {since}
                "#,
                table = table,
                remote = remote_schema,
                since = since_filter,
            )
        }
        _ => {
            // Fallback: no time filter for unknown tables.
            format!(
                r#"
                SELECT COUNT(*)
                FROM local.{table} l
                WHERE NOT EXISTS (
                    SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
                )
                "#,
                table = table,
                remote = remote_schema,
            )
        }
    };

    let count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
    Ok(count as usize)
}
802
803/// Push sessions from `local` to remote.
804/// V5 schema: joins on attempts instead of invocations.
805fn push_sessions(
806    conn: &Connection,
807    remote_schema: &str,
808    since: Option<NaiveDate>,
809) -> Result<usize> {
810    let since_filter = since_clause(since, "a.timestamp");
811
812    let sql = format!(
813        r#"
814        INSERT INTO {remote}.sessions
815        SELECT DISTINCT s.*
816        FROM local.sessions s
817        JOIN local.attempts a ON a.session_id = s.session_id
818        WHERE NOT EXISTS (
819            SELECT 1 FROM {remote}.sessions r WHERE r.session_id = s.session_id
820        )
821        {since}
822        "#,
823        remote = remote_schema,
824        since = since_filter,
825    );
826
827    let count = conn.execute(&sql, [])?;
828    Ok(count)
829}
830
/// Push records from `local` to remote.
/// V5 schema: uses attempts/outcomes tables instead of invocations.
///
/// Every arm uses a `NOT EXISTS` anti-join keyed on the table's identity
/// column, so repeated pushes are idempotent. Note that the fallback arm
/// (any table other than attempts/outcomes/outputs/events) applies no date
/// filter — `since` is ignored for those tables.
fn push_table(
    conn: &Connection,
    table: &str,
    remote_schema: &str,
    since: Option<NaiveDate>,
) -> Result<usize> {
    let sql = match table {
        // V5: Push attempts table
        "attempts" => {
            // Attempts carry their own timestamp, so the date filter applies
            // directly to l.timestamp.
            let since_filter = since_clause(since, "l.timestamp");
            format!(
                r#"
                INSERT INTO {remote}.attempts
                SELECT *
                FROM local.attempts l
                WHERE NOT EXISTS (
                    SELECT 1 FROM {remote}.attempts r WHERE r.id = l.id
                )
                {since}
                "#,
                remote = remote_schema,
                since = since_filter,
            )
        }
        // V5: Push outcomes table
        "outcomes" => {
            // Outcomes have no timestamp column; the date filter goes through
            // the owning attempt (keyed on attempt_id, not id).
            let since_filter = since_clause(since, "a.timestamp");
            format!(
                r#"
                INSERT INTO {remote}.outcomes
                SELECT l.*
                FROM local.outcomes l
                JOIN local.attempts a ON a.id = l.attempt_id
                WHERE NOT EXISTS (
                    SELECT 1 FROM {remote}.outcomes r WHERE r.attempt_id = l.attempt_id
                )
                {since}
                "#,
                remote = remote_schema,
                since = since_filter,
            )
        }
        "outputs" | "events" => {
            // V5: Join on attempts instead of invocations
            let since_filter = since_clause(since, "a.timestamp");
            format!(
                r#"
                INSERT INTO {remote}.{table}
                SELECT l.*
                FROM local.{table} l
                JOIN local.attempts a ON a.id = l.invocation_id
                WHERE NOT EXISTS (
                    SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
                )
                {since}
                "#,
                table = table,
                remote = remote_schema,
                since = since_filter,
            )
        }
        _ => {
            // Generic path: plain id-keyed copy with no date filter.
            format!(
                r#"
                INSERT INTO {remote}.{table}
                SELECT *
                FROM local.{table} l
                WHERE NOT EXISTS (
                    SELECT 1 FROM {remote}.{table} r WHERE r.id = l.id
                )
                "#,
                table = table,
                remote = remote_schema,
            )
        }
    };

    let count = conn.execute(&sql, [])?;
    Ok(count)
}
913
914/// Pull sessions from remote into cached schema.
915fn pull_sessions(
916    conn: &Connection,
917    remote_schema: &str,
918    cached_schema: &str,
919    since: Option<NaiveDate>,
920    client_id: Option<&str>,
921) -> Result<usize> {
922    let since_filter = since_clause(since, "r.registered_at");
923    let client_filter = client_clause(client_id);
924
925    let sql = format!(
926        r#"
927        INSERT INTO {cached}.sessions (session_id, client_id, invoker, invoker_pid, invoker_type, registered_at, cwd, date)
928        SELECT r.*
929        FROM {remote}.sessions r
930        WHERE NOT EXISTS (
931            SELECT 1 FROM {cached}.sessions l WHERE l.session_id = r.session_id
932        )
933        {since}
934        {client}
935        "#,
936        cached = cached_schema,
937        remote = remote_schema,
938        since = since_filter,
939        client = client_filter,
940    );
941
942    let count = conn.execute(&sql, [])?;
943    Ok(count)
944}
945
946/// Pull records from remote into cached schema.
947/// V5 schema: uses attempts/outcomes tables instead of invocations.
948fn pull_table(
949    conn: &Connection,
950    table: &str,
951    remote_schema: &str,
952    cached_schema: &str,
953    since: Option<NaiveDate>,
954    client_id: Option<&str>,
955) -> Result<usize> {
956    let client_filter = client_clause(client_id);
957
958    let sql = match table {
959        // V5: pull attempts
960        "attempts" => {
961            let since_filter = since_clause(since, "r.timestamp");
962            format!(
963                r#"
964                INSERT INTO {cached}.attempts (id, timestamp, cmd, cwd, session_id, tag, source_client, machine_id, hostname, executable, format_hint, metadata, date)
965                SELECT r.*
966                FROM {remote}.attempts r
967                WHERE NOT EXISTS (
968                    SELECT 1 FROM {cached}.attempts l WHERE l.id = r.id
969                )
970                {since}
971                {client}
972                "#,
973                cached = cached_schema,
974                remote = remote_schema,
975                since = since_filter,
976                client = if client_id.is_some() {
977                    format!("AND r.source_client = '{}'", client_id.unwrap().replace('\'', "''"))
978                } else {
979                    String::new()
980                },
981            )
982        }
983        // V5: pull outcomes
984        "outcomes" => {
985            let since_filter = since_clause(since, "a.timestamp");
986            format!(
987                r#"
988                INSERT INTO {cached}.outcomes (attempt_id, completed_at, exit_code, duration_ms, signal, timeout, metadata, date)
989                SELECT r.*
990                FROM {remote}.outcomes r
991                JOIN {remote}.attempts a ON a.id = r.attempt_id
992                WHERE NOT EXISTS (
993                    SELECT 1 FROM {cached}.outcomes l WHERE l.attempt_id = r.attempt_id
994                )
995                {since}
996                {client}
997                "#,
998                cached = cached_schema,
999                remote = remote_schema,
1000                since = since_filter,
1001                client = if client_id.is_some() {
1002                    format!("AND a.source_client = '{}'", client_id.unwrap().replace('\'', "''"))
1003                } else {
1004                    String::new()
1005                },
1006            )
1007        }
1008        // V5: join on attempts instead of invocations
1009        "outputs" => {
1010            let since_filter = since_clause(since, "a.timestamp");
1011            format!(
1012                r#"
1013                INSERT INTO {cached}.outputs (id, invocation_id, stream, content_hash, byte_length, storage_type, storage_ref, content_type, date)
1014                SELECT r.*
1015                FROM {remote}.outputs r
1016                JOIN {remote}.attempts a ON a.id = r.invocation_id
1017                WHERE NOT EXISTS (
1018                    SELECT 1 FROM {cached}.outputs l WHERE l.id = r.id
1019                )
1020                {since}
1021                {client}
1022                "#,
1023                cached = cached_schema,
1024                remote = remote_schema,
1025                since = since_filter,
1026                client = if client_id.is_some() {
1027                    format!("AND a.source_client = '{}'", client_id.unwrap().replace('\'', "''"))
1028                } else {
1029                    String::new()
1030                },
1031            )
1032        }
1033        "events" => {
1034            let since_filter = since_clause(since, "a.timestamp");
1035            format!(
1036                r#"
1037                INSERT INTO {cached}.events (id, invocation_id, client_id, hostname, event_type, severity, ref_file, ref_line, ref_column, message, error_code, test_name, status, format_used, date)
1038                SELECT r.*
1039                FROM {remote}.events r
1040                JOIN {remote}.attempts a ON a.id = r.invocation_id
1041                WHERE NOT EXISTS (
1042                    SELECT 1 FROM {cached}.events l WHERE l.id = r.id
1043                )
1044                {since}
1045                {client}
1046                "#,
1047                cached = cached_schema,
1048                remote = remote_schema,
1049                since = since_filter,
1050                client = if client_id.is_some() {
1051                    format!("AND a.source_client = '{}'", client_id.unwrap().replace('\'', "''"))
1052                } else {
1053                    String::new()
1054                },
1055            )
1056        }
1057        _ => {
1058            format!(
1059                r#"
1060                INSERT INTO {cached}.{table}
1061                SELECT r.*
1062                FROM {remote}.{table} r
1063                WHERE NOT EXISTS (
1064                    SELECT 1 FROM {cached}.{table} l WHERE l.id = r.id
1065                )
1066                {client}
1067                "#,
1068                table = table,
1069                cached = cached_schema,
1070                remote = remote_schema,
1071                client = client_filter,
1072            )
1073        }
1074    };
1075
1076    let count = conn.execute(&sql, [])?;
1077    Ok(count)
1078}
1079
1080/// Count blobs that would be pushed (for dry run).
1081/// V5 schema: joins on attempts instead of invocations.
1082fn count_blobs_to_push(
1083    conn: &Connection,
1084    remote_schema: &str,
1085    since: Option<NaiveDate>,
1086) -> Result<BlobStats> {
1087    let since_filter = since_clause(since, "a.timestamp");
1088
1089    let sql = format!(
1090        r#"
1091        SELECT COUNT(DISTINCT o.content_hash), COALESCE(SUM(o.byte_length), 0)
1092        FROM local.outputs o
1093        JOIN local.attempts a ON a.id = o.invocation_id
1094        WHERE o.storage_type = 'blob'
1095          AND NOT EXISTS (
1096              SELECT 1 FROM {remote}.blob_registry r WHERE r.content_hash = o.content_hash
1097          )
1098        {since}
1099        "#,
1100        remote = remote_schema,
1101        since = since_filter,
1102    );
1103
1104    let (count, bytes): (i64, i64) = conn.query_row(&sql, [], |row| Ok((row.get(0)?, row.get(1)?)))?;
1105    Ok(BlobStats {
1106        count: count as usize,
1107        bytes: bytes as u64,
1108        ..Default::default()
1109    })
1110}
1111
1112/// Get blobs that need to be pushed to remote.
1113/// V5 schema: joins on attempts instead of invocations.
1114fn get_blobs_to_push(
1115    conn: &Connection,
1116    remote_schema: &str,
1117    since: Option<NaiveDate>,
1118) -> Result<Vec<BlobInfo>> {
1119    let since_filter = since_clause(since, "a.timestamp");
1120
1121    let sql = format!(
1122        r#"
1123        SELECT DISTINCT o.content_hash, b.storage_path, o.byte_length
1124        FROM local.outputs o
1125        JOIN local.attempts a ON a.id = o.invocation_id
1126        JOIN blob_registry b ON b.content_hash = o.content_hash
1127        WHERE o.storage_type = 'blob'
1128          AND NOT EXISTS (
1129              SELECT 1 FROM {remote}.blob_registry r WHERE r.content_hash = o.content_hash
1130          )
1131        {since}
1132        "#,
1133        remote = remote_schema,
1134        since = since_filter,
1135    );
1136
1137    let mut stmt = conn.prepare(&sql)?;
1138    let blobs = stmt
1139        .query_map([], |row| {
1140            Ok(BlobInfo {
1141                content_hash: row.get(0)?,
1142                storage_path: row.get(1)?,
1143                byte_length: row.get(2)?,
1144            })
1145        })?
1146        .filter_map(|r| r.ok())
1147        .collect();
1148
1149    Ok(blobs)
1150}
1151
1152/// Get blobs that were pulled (in remote outputs but not in local blob_registry).
1153fn get_blobs_to_pull(
1154    conn: &Connection,
1155    remote_schema: &str,
1156    cached_schema: &str,
1157) -> Result<Vec<BlobInfo>> {
1158    let sql = format!(
1159        r#"
1160        SELECT DISTINCT o.content_hash, b.storage_path, o.byte_length
1161        FROM {cached}.outputs o
1162        JOIN {remote}.blob_registry b ON b.content_hash = o.content_hash
1163        WHERE o.storage_type = 'blob'
1164          AND NOT EXISTS (
1165              SELECT 1 FROM blob_registry r WHERE r.content_hash = o.content_hash
1166          )
1167        "#,
1168        cached = cached_schema,
1169        remote = remote_schema,
1170    );
1171
1172    let mut stmt = conn.prepare(&sql)?;
1173    let blobs = stmt
1174        .query_map([], |row| {
1175            Ok(BlobInfo {
1176                content_hash: row.get(0)?,
1177                storage_path: row.get(1)?,
1178                byte_length: row.get(2)?,
1179            })
1180        })?
1181        .filter_map(|r| r.ok())
1182        .collect();
1183
1184    Ok(blobs)
1185}
1186
1187/// Push outputs to remote, optionally converting storage_ref paths.
1188/// V5 schema: joins on attempts instead of invocations.
1189fn push_outputs(
1190    conn: &Connection,
1191    remote_schema: &str,
1192    since: Option<NaiveDate>,
1193    _sync_blobs: bool,
1194) -> Result<usize> {
1195    let since_filter = since_clause(since, "a.timestamp");
1196
1197    // For now, we keep storage_ref as-is. The blob files are synced separately.
1198    // The storage_ref format (file://recent/blobs/...) is relative and works on both sides.
1199    let sql = format!(
1200        r#"
1201        INSERT INTO {remote}.outputs
1202        SELECT l.*
1203        FROM local.outputs l
1204        JOIN local.attempts a ON a.id = l.invocation_id
1205        WHERE NOT EXISTS (
1206            SELECT 1 FROM {remote}.outputs r WHERE r.id = l.id
1207        )
1208        {since}
1209        "#,
1210        remote = remote_schema,
1211        since = since_filter,
1212    );
1213
1214    let count = conn.execute(&sql, [])?;
1215    Ok(count)
1216}
1217
1218/// Pull outputs from remote, optionally handling storage_ref paths.
1219/// V5 schema: joins on attempts instead of invocations.
1220fn pull_outputs(
1221    conn: &Connection,
1222    remote_schema: &str,
1223    cached_schema: &str,
1224    since: Option<NaiveDate>,
1225    client_id: Option<&str>,
1226    _sync_blobs: bool,
1227) -> Result<usize> {
1228    let since_filter = since_clause(since, "a.timestamp");
1229    let client_filter = if client_id.is_some() {
1230        format!("AND a.source_client = '{}'", client_id.unwrap().replace('\'', "''"))
1231    } else {
1232        String::new()
1233    };
1234
1235    // For now, we keep storage_ref as-is. The blob files are synced separately.
1236    // The storage_ref format (file://recent/blobs/...) is relative and works on both sides.
1237    let sql = format!(
1238        r#"
1239        INSERT INTO {cached}.outputs (id, invocation_id, stream, content_hash, byte_length, storage_type, storage_ref, content_type, date)
1240        SELECT r.*
1241        FROM {remote}.outputs r
1242        JOIN {remote}.attempts a ON a.id = r.invocation_id
1243        WHERE NOT EXISTS (
1244            SELECT 1 FROM {cached}.outputs l WHERE l.id = r.id
1245        )
1246        {since}
1247        {client}
1248        "#,
1249        cached = cached_schema,
1250        remote = remote_schema,
1251        since = since_filter,
1252        client = client_filter,
1253    );
1254
1255    let count = conn.execute(&sql, [])?;
1256    Ok(count)
1257}
1258
1259#[cfg(test)]
1260mod tests {
1261    use super::*;
1262    use crate::config::{RemoteConfig, RemoteMode, RemoteType};
1263    use crate::init::initialize;
1264    use crate::schema::InvocationRecord;
1265    use crate::store::{ConnectionOptions, Store};
1266    use crate::Config;
1267    use tempfile::TempDir;
1268
1269    fn setup_store_duckdb() -> (TempDir, Store) {
1270        let tmp = TempDir::new().unwrap();
1271        let config = Config::with_duckdb_mode(tmp.path());
1272        initialize(&config).unwrap();
1273        let store = Store::open(config).unwrap();
1274        (tmp, store)
1275    }
1276
    /// Build a read-write, auto-attach file remote pointing at `path`.
    fn create_file_remote(name: &str, path: &std::path::Path) -> RemoteConfig {
        RemoteConfig {
            name: name.to_string(),
            remote_type: RemoteType::File,
            uri: path.to_string_lossy().to_string(),
            mode: RemoteMode::ReadWrite,
            auto_attach: true,
            credential_provider: None,
        }
    }
1287
1288    // ===== Date parsing tests =====
1289
1290    #[test]
1291    fn test_parse_since_days() {
1292        let today = Utc::now().date_naive();
1293        let result = parse_since("7d").unwrap();
1294        assert_eq!(result, today - TimeDelta::days(7));
1295    }
1296
1297    #[test]
1298    fn test_parse_since_weeks() {
1299        let today = Utc::now().date_naive();
1300        let result = parse_since("2w").unwrap();
1301        assert_eq!(result, today - TimeDelta::days(14));
1302    }
1303
1304    #[test]
1305    fn test_parse_since_months() {
1306        let today = Utc::now().date_naive();
1307        let result = parse_since("1m").unwrap();
1308        assert_eq!(result, today - TimeDelta::days(30));
1309    }
1310
1311    #[test]
1312    fn test_parse_since_date() {
1313        let result = parse_since("2024-01-15").unwrap();
1314        assert_eq!(result, NaiveDate::from_ymd_opt(2024, 1, 15).unwrap());
1315    }
1316
1317    #[test]
1318    fn test_parse_since_invalid() {
1319        assert!(parse_since("invalid").is_err());
1320    }
1321
1322    // ===== Push/Pull integration tests =====
1323
    #[test]
    fn test_push_to_file_remote() {
        // Pushing one invocation to a fresh file remote should create the
        // remote database file and report exactly one transferred record.
        let (tmp, store) = setup_store_duckdb();

        // Write some local data
        let inv = InvocationRecord::new(
            "test-session",
            "echo hello",
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&inv).unwrap();

        // Create a file remote
        let remote_path = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &remote_path);

        // Push to remote
        let stats = store.push(&remote, PushOptions::default()).unwrap();

        assert_eq!(stats.invocations, 1);
        assert!(remote_path.exists(), "Remote database file should be created");
    }
1348
    #[test]
    fn test_push_is_idempotent() {
        // The anti-join in push_table should make a repeated push a no-op.
        let (tmp, store) = setup_store_duckdb();

        // Write local data
        let inv = InvocationRecord::new(
            "test-session",
            "echo hello",
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&inv).unwrap();

        // Create remote
        let remote_path = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &remote_path);

        // Push twice
        let stats1 = store.push(&remote, PushOptions::default()).unwrap();
        let stats2 = store.push(&remote, PushOptions::default()).unwrap();

        // First push should transfer data, second should be idempotent
        assert_eq!(stats1.invocations, 1);
        assert_eq!(stats2.invocations, 0, "Second push should be idempotent");
    }
1375
    #[test]
    fn test_push_dry_run() {
        // A dry run must report what would be pushed without writing to the
        // remote, so a subsequent real push still transfers everything.
        let (tmp, store) = setup_store_duckdb();

        // Write local data
        let inv = InvocationRecord::new(
            "test-session",
            "echo hello",
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&inv).unwrap();

        // Create remote
        let remote_path = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &remote_path);

        // Dry run push
        let dry_stats = store
            .push(
                &remote,
                PushOptions {
                    dry_run: true,
                    ..Default::default()
                },
            )
            .unwrap();

        assert_eq!(dry_stats.invocations, 1, "Dry run should count invocations");

        // Actual push should still transfer data (dry run didn't modify)
        let actual_stats = store.push(&remote, PushOptions::default()).unwrap();
        assert_eq!(actual_stats.invocations, 1);
    }
1411
    #[test]
    fn test_pull_from_file_remote() {
        // Round-trip: push local data to a remote, wipe the local tables,
        // then pull — the record should land in the cached (caches) schema.
        let (tmp, store) = setup_store_duckdb();

        // Write local data and push to remote
        let inv = InvocationRecord::new(
            "test-session",
            "echo hello",
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&inv).unwrap();

        let remote_path = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &remote_path);
        store.push(&remote, PushOptions::default()).unwrap();

        // Clear local data (simulate different client)
        let conn = store.connection().unwrap();
        conn.execute("DELETE FROM local.outcomes; DELETE FROM local.attempts", []).unwrap();
        drop(conn);

        // Pull from remote
        let stats = store.pull(&remote, PullOptions::default()).unwrap();

        assert_eq!(stats.invocations, 1, "Should pull the invocation back");

        // Verify data in cached schema
        let conn = store.connection().unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM caches.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 1, "Data should be in cached schema");
    }
1447
    #[test]
    fn test_pull_is_idempotent() {
        // The anti-join in pull_table should make a repeated pull a no-op.
        let (tmp, store) = setup_store_duckdb();

        // Setup: write, push, clear local
        let inv = InvocationRecord::new(
            "test-session",
            "echo hello",
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&inv).unwrap();

        let remote_path = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &remote_path);
        store.push(&remote, PushOptions::default()).unwrap();

        // Pull twice
        let stats1 = store.pull(&remote, PullOptions::default()).unwrap();
        let stats2 = store.pull(&remote, PullOptions::default()).unwrap();

        assert_eq!(stats1.invocations, 1);
        assert_eq!(stats2.invocations, 0, "Second pull should be idempotent");
    }
1473
1474    // ===== Remote name handling tests =====
1475
    #[test]
    fn test_remote_name_with_hyphen() {
        // Remote names containing hyphens must be sanitized when used as
        // schema identifiers (e.g. remote_my_team / cached_my_team).
        let (tmp, store) = setup_store_duckdb();

        // Write local data
        let inv = InvocationRecord::new(
            "test-session",
            "echo hello",
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&inv).unwrap();

        // Create remote with hyphen in name
        let remote_path = tmp.path().join("my-team-remote.duckdb");
        let remote = create_file_remote("my-team", &remote_path);

        // Push should work despite hyphen
        let stats = store.push(&remote, PushOptions::default()).unwrap();
        assert_eq!(stats.invocations, 1);

        // Pull should also work (hyphen in name handled correctly)
        let pull_stats = store.pull(&remote, PullOptions::default()).unwrap();
        // Pull brings data from remote into cached_my_team schema
        assert_eq!(pull_stats.invocations, 1);
    }
1503
    #[test]
    fn test_remote_name_with_dots() {
        // Dots in remote names must also survive schema-name sanitization.
        let (tmp, store) = setup_store_duckdb();

        let inv = InvocationRecord::new(
            "test-session",
            "echo hello",
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&inv).unwrap();

        // Remote with dots in name
        let remote_path = tmp.path().join("team.v2.duckdb");
        let remote = create_file_remote("team.v2", &remote_path);

        let stats = store.push(&remote, PushOptions::default()).unwrap();
        assert_eq!(stats.invocations, 1);
    }
1524
1525    // ===== Connection options tests =====
1526
    #[test]
    fn test_connection_minimal_vs_full() {
        // Both connection modes should open successfully and expose the
        // invocations view (local.* for minimal, main.* for full).
        let (_tmp, store) = setup_store_duckdb();

        // Minimal connection should work
        let conn_minimal = store.connect(ConnectionOptions::minimal()).unwrap();
        let count: i64 = conn_minimal
            .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);
        drop(conn_minimal);

        // Full connection should also work
        let conn_full = store.connect(ConnectionOptions::full()).unwrap();
        let count: i64 = conn_full
            .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);
    }
1546
    #[test]
    fn test_multiple_sequential_connections() {
        let (_tmp, store) = setup_store_duckdb();

        // Open and close multiple connections sequentially
        // This tests for database corruption issues
        for i in 0..5 {
            let conn = store.connection().unwrap();
            let count: i64 = conn
                .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
                .unwrap();
            assert_eq!(count, 0, "Connection {} should work", i);
            drop(conn);
        }

        // Write some data
        let inv = InvocationRecord::new(
            "test-session",
            "echo hello",
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&inv).unwrap();

        // More connections should still work and see the data
        // (i.e. the write is durable across connection open/close cycles)
        for i in 0..3 {
            let conn = store.connection().unwrap();
            let count: i64 = conn
                .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
                .unwrap();
            assert_eq!(count, 1, "Connection {} should see the data", i);
            drop(conn);
        }
    }
1582
1583    // ===== Cached schema tests =====
1584
    #[test]
    fn test_caches_schema_views_work() {
        // Pulled data lands in a per-remote cached_<name> schema; after
        // rebuild_caches_schema, the caches.* views should expose it.
        let (tmp, store) = setup_store_duckdb();

        // Initially caches should be empty
        let conn = store.connection().unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM caches.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);
        drop(conn);

        // Write and push data
        let inv = InvocationRecord::new(
            "test-session",
            "echo hello",
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&inv).unwrap();

        let remote_path = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("test", &remote_path);
        store.push(&remote, PushOptions::default()).unwrap();

        // Pull creates cached_test schema
        store.pull(&remote, PullOptions::default()).unwrap();

        // Rebuild caches views to include cached_test
        let conn = store.connection().unwrap();
        store.rebuild_caches_schema(&conn).unwrap();

        // caches.invocations should now have data
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM caches.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 1, "caches should include pulled data after rebuild");
    }
1624
    #[test]
    fn test_main_schema_unions_local_and_caches() {
        // main.invocations should be the union of local data and all cached
        // remote data: one pulled record + one fresh local record = 2 rows.
        let (tmp, store) = setup_store_duckdb();

        // Write local data
        let inv1 = InvocationRecord::new(
            "test-session",
            "local command",
            "/home/user",
            0,
            "local@client",
        );
        store.write_invocation(&inv1).unwrap();

        // Push to remote, then pull (simulating another client's data)
        let remote_path = tmp.path().join("remote.duckdb");
        let remote = create_file_remote("team", &remote_path);

        // Push local data to remote
        store.push(&remote, PushOptions::default()).unwrap();

        // Delete local, pull from remote to create cached data
        let conn = store.connection().unwrap();
        conn.execute("DELETE FROM local.outcomes; DELETE FROM local.attempts", []).unwrap();
        drop(conn);

        store.pull(&remote, PullOptions::default()).unwrap();

        // Rebuild caches schema
        let conn = store.connection().unwrap();
        store.rebuild_caches_schema(&conn).unwrap();

        // Write new local data
        let inv2 = InvocationRecord::new(
            "test-session-2",
            "new local command",
            "/home/user",
            0,
            "local@client",
        );
        drop(conn);
        store.write_invocation(&inv2).unwrap();

        // main.invocations should have both local and cached
        let conn = store.connection().unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM main.invocations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 2, "main should union local + caches");
    }
1675
1676    // ===== Heterogeneous Storage Mode Tests =====
1677    // Test querying across different storage modes (parquet and duckdb)
1678
    /// Create a temp directory and a `Store` initialized in Parquet mode
    /// (the default storage mode for `Config::with_root`).
    fn setup_store_parquet() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let config = Config::with_root(tmp.path()); // Parquet is the default
        initialize(&config).unwrap();
        let store = Store::open(config).unwrap();
        (tmp, store)
    }
1686
1687    #[test]
1688    fn test_heterogeneous_parquet_local_duckdb_remote() {
1689        // Local store uses parquet mode
1690        let (_local_tmp, local_store) = setup_store_parquet();
1691
1692        // Remote store uses duckdb mode
1693        let remote_tmp = TempDir::new().unwrap();
1694        let remote_config = Config::with_duckdb_mode(remote_tmp.path());
1695        initialize(&remote_config).unwrap();
1696        let remote_store = Store::open(remote_config).unwrap();
1697
1698        // Write data to remote (DuckDB mode - stored in local.invocations table)
1699        let remote_inv = InvocationRecord::new(
1700            "remote-session",
1701            "remote command",
1702            "/home/remote",
1703            0,
1704            "remote@client",
1705        );
1706        remote_store.write_invocation(&remote_inv).unwrap();
1707
1708        // Verify remote has data
1709        let remote_conn = remote_store.connection().unwrap();
1710        let remote_count: i64 = remote_conn
1711            .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
1712            .unwrap();
1713        assert_eq!(remote_count, 1, "Remote should have data in local schema");
1714        drop(remote_conn);
1715
1716        // Write data to local (Parquet mode - stored in parquet files)
1717        let local_inv = InvocationRecord::new(
1718            "local-session",
1719            "local command",
1720            "/home/local",
1721            0,
1722            "local@client",
1723        );
1724        local_store.write_invocation(&local_inv).unwrap();
1725
1726        // Configure local to attach the remote (read-only)
1727        let remote_db_path = remote_tmp.path().join("db/bird.duckdb");
1728        let remote_config = RemoteConfig {
1729            name: "duckdb-store".to_string(),
1730            remote_type: RemoteType::File,
1731            uri: format!("file://{}", remote_db_path.display()),
1732            mode: RemoteMode::ReadOnly,
1733            auto_attach: true,
1734            credential_provider: None,
1735        };
1736
1737        // Manually attach the remote to test heterogeneous querying
1738        let conn = local_store.connection_with_options(false).unwrap();
1739        local_store.attach_remote(&conn, &remote_config).unwrap();
1740
1741        // Create remote macros (this tests detect_remote_table_path)
1742        // The remote is a DuckDB-mode BIRD database, so tables are in local schema
1743        let schema = remote_config.quoted_schema_name();
1744        let table_prefix = local_store.detect_remote_table_path(&conn, &schema);
1745        assert_eq!(table_prefix, "local.", "Should detect DuckDB mode remote has local. prefix");
1746
1747        // Query the remote directly
1748        let remote_count: i64 = conn
1749            .query_row(
1750                &format!("SELECT COUNT(*) FROM {}.local.invocations", schema),
1751                [],
1752                |r| r.get(0),
1753            )
1754            .unwrap();
1755        assert_eq!(remote_count, 1, "Should be able to query DuckDB remote from Parquet local");
1756    }
1757
1758    #[test]
1759    fn test_heterogeneous_duckdb_local_parquet_remote() {
1760        // Local store uses duckdb mode
1761        let (_local_tmp, local_store) = setup_store_duckdb();
1762
1763        // Remote store uses parquet mode
1764        let remote_tmp = TempDir::new().unwrap();
1765        let remote_config = Config::with_root(remote_tmp.path());
1766        initialize(&remote_config).unwrap();
1767        let remote_store = Store::open(remote_config).unwrap();
1768
1769        // Write data to remote (Parquet mode - stored in parquet files)
1770        let remote_inv = InvocationRecord::new(
1771            "remote-session",
1772            "remote command",
1773            "/home/remote",
1774            0,
1775            "remote@client",
1776        );
1777        remote_store.write_invocation(&remote_inv).unwrap();
1778
1779        // Write data to local (DuckDB mode - stored in local.invocations table)
1780        let local_inv = InvocationRecord::new(
1781            "local-session",
1782            "local command",
1783            "/home/local",
1784            0,
1785            "local@client",
1786        );
1787        local_store.write_invocation(&local_inv).unwrap();
1788
1789        // Configure local to attach the remote (read-only)
1790        let remote_db_path = remote_tmp.path().join("db/bird.duckdb");
1791        let remote_config = RemoteConfig {
1792            name: "parquet-store".to_string(),
1793            remote_type: RemoteType::File,
1794            uri: format!("file://{}", remote_db_path.display()),
1795            mode: RemoteMode::ReadOnly,
1796            auto_attach: true,
1797            credential_provider: None,
1798        };
1799
1800        // Manually attach the remote (this should also set file_search_path)
1801        let conn = local_store.connection_with_options(false).unwrap();
1802        local_store.attach_remote(&conn, &remote_config).unwrap();
1803
1804        // Verify detection works - both modes have local.invocations
1805        let schema = remote_config.quoted_schema_name();
1806        let table_prefix = local_store.detect_remote_table_path(&conn, &schema);
1807        assert_eq!(table_prefix, "local.", "BIRD databases have local schema in both modes");
1808
1809        // Query the remote directly (parquet mode views should resolve via file_search_path)
1810        let remote_count: i64 = conn
1811            .query_row(
1812                &format!("SELECT COUNT(*) FROM {}.local.invocations", schema),
1813                [],
1814                |r| r.get(0),
1815            )
1816            .unwrap();
1817        assert_eq!(remote_count, 1, "Should be able to query Parquet remote from DuckDB local");
1818    }
1819
1820    #[test]
1821    fn test_heterogeneous_unified_views() {
1822        // This tests the full heterogeneous setup with unified views
1823        let (local_tmp, local_store) = setup_store_parquet();
1824
1825        // Create a DuckDB-mode remote
1826        let remote_tmp = TempDir::new().unwrap();
1827        let remote_config = Config::with_duckdb_mode(remote_tmp.path());
1828        initialize(&remote_config).unwrap();
1829        let remote_store = Store::open(remote_config).unwrap();
1830
1831        // Write unique data to remote
1832        let remote_inv = InvocationRecord::new(
1833            "remote-session",
1834            "remote-specific-cmd",
1835            "/home/remote",
1836            42,
1837            "remote@client",
1838        );
1839        remote_store.write_invocation(&remote_inv).unwrap();
1840
1841        // Write unique data to local
1842        let local_inv = InvocationRecord::new(
1843            "local-session",
1844            "local-specific-cmd",
1845            "/home/local",
1846            0,
1847            "local@client",
1848        );
1849        local_store.write_invocation(&local_inv).unwrap();
1850
1851        // Create config with remote
1852        let remote_db_path = remote_tmp.path().join("db/bird.duckdb");
1853        let mut config = Config::with_root(local_tmp.path());
1854        config.remotes.push(RemoteConfig {
1855            name: "heterogeneous-test".to_string(),
1856            remote_type: RemoteType::File,
1857            uri: format!("file://{}", remote_db_path.display()),
1858            mode: RemoteMode::ReadOnly,
1859            auto_attach: true,
1860            credential_provider: None,
1861        });
1862
1863        // Open store with remote config
1864        let store = Store::open(config).unwrap();
1865
1866        // Connection with auto-attach should set up unified views
1867        let conn = store.connection().unwrap();
1868
1869        // Query local data
1870        let local_count: i64 = conn
1871            .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
1872            .unwrap();
1873        assert_eq!(local_count, 1, "Local should have 1 record");
1874
1875        // Query unified view - should include both local and remote
1876        let unified_count: i64 = conn
1877            .query_row("SELECT COUNT(*) FROM unified.invocations", [], |r| r.get(0))
1878            .unwrap();
1879        assert_eq!(unified_count, 2, "Unified view should have local + remote records");
1880
1881        // Verify we can see both commands
1882        let cmds: Vec<String> = conn
1883            .prepare("SELECT cmd FROM unified.invocations ORDER BY cmd")
1884            .unwrap()
1885            .query_map([], |r| r.get(0))
1886            .unwrap()
1887            .filter_map(|r| r.ok())
1888            .collect();
1889        assert_eq!(cmds.len(), 2);
1890        assert!(cmds.contains(&"local-specific-cmd".to_string()));
1891        assert!(cmds.contains(&"remote-specific-cmd".to_string()));
1892    }
1893
1894    #[test]
1895    fn test_detect_remote_table_path_standalone_db() {
1896        // Test detection of standalone databases (not BIRD, no local schema)
1897        let (_tmp, store) = setup_store_duckdb();
1898
1899        // Create a standalone database (not a BIRD database)
1900        let standalone_tmp = TempDir::new().unwrap();
1901        let standalone_db_path = standalone_tmp.path().join("standalone.duckdb");
1902        {
1903            let conn = duckdb::Connection::open(&standalone_db_path).unwrap();
1904            conn.execute(
1905                "CREATE TABLE invocations (id UUID, cmd VARCHAR)",
1906                [],
1907            )
1908            .unwrap();
1909            conn.execute(
1910                "INSERT INTO invocations VALUES (gen_random_uuid(), 'test')",
1911                [],
1912            )
1913            .unwrap();
1914        }
1915
1916        // Attach as remote
1917        let remote = RemoteConfig {
1918            name: "standalone".to_string(),
1919            remote_type: RemoteType::File,
1920            uri: format!("file://{}", standalone_db_path.display()),
1921            mode: RemoteMode::ReadOnly,
1922            auto_attach: true,
1923            credential_provider: None,
1924        };
1925
1926        let conn = store.connection_with_options(false).unwrap();
1927        store.attach_remote(&conn, &remote).unwrap();
1928
1929        // Detect table path (should be empty - no local schema)
1930        let schema = remote.quoted_schema_name();
1931        let table_prefix = store.detect_remote_table_path(&conn, &schema);
1932        assert_eq!(table_prefix, "", "Standalone DB should have no prefix");
1933
1934        // Query should work
1935        let count: i64 = conn
1936            .query_row(
1937                &format!("SELECT COUNT(*) FROM {}.invocations", schema),
1938                [],
1939                |r| r.get(0),
1940            )
1941            .unwrap();
1942        assert_eq!(count, 1);
1943    }
1944
1945    #[test]
1946    fn test_push_to_readonly_remote_fails() {
1947        let (_tmp, store) = setup_store_duckdb();
1948
1949        // Write local data
1950        let inv = InvocationRecord::new(
1951            "test-session",
1952            "echo hello",
1953            "/home/user",
1954            0,
1955            "test@client",
1956        );
1957        store.write_invocation(&inv).unwrap();
1958
1959        // Create a read-only remote
1960        let remote_tmp = TempDir::new().unwrap();
1961        let remote_path = remote_tmp.path().join("remote.duckdb");
1962        let remote = RemoteConfig {
1963            name: "readonly".to_string(),
1964            remote_type: RemoteType::File,
1965            uri: format!("file://{}", remote_path.display()),
1966            mode: RemoteMode::ReadOnly,
1967            auto_attach: true,
1968            credential_provider: None,
1969        };
1970
1971        // Push to read-only should fail
1972        let result = store.push(&remote, PushOptions::default());
1973        assert!(result.is_err(), "Push to read-only remote should fail");
1974        assert!(
1975            result.unwrap_err().to_string().contains("Cannot push to read-only"),
1976            "Error should mention read-only"
1977        );
1978    }
1979
1980    #[test]
1981    fn test_push_to_readonly_remote_dry_run_returns_empty() {
1982        let (_tmp, store) = setup_store_duckdb();
1983
1984        // Write local data
1985        let inv = InvocationRecord::new(
1986            "test-session",
1987            "echo hello",
1988            "/home/user",
1989            0,
1990            "test@client",
1991        );
1992        store.write_invocation(&inv).unwrap();
1993
1994        // Create a read-only remote
1995        let remote_tmp = TempDir::new().unwrap();
1996        let remote_path = remote_tmp.path().join("remote.duckdb");
1997        let remote = RemoteConfig {
1998            name: "readonly".to_string(),
1999            remote_type: RemoteType::File,
2000            uri: format!("file://{}", remote_path.display()),
2001            mode: RemoteMode::ReadOnly,
2002            auto_attach: true,
2003            credential_provider: None,
2004        };
2005
2006        // Dry run on read-only should return empty stats (nothing to push)
2007        let stats = store.push(&remote, PushOptions { dry_run: true, ..Default::default() }).unwrap();
2008        assert_eq!(stats.invocations, 0);
2009        assert_eq!(stats.sessions, 0);
2010    }
2011}