Skip to main content

ai_memory/cli/
sync.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! `cmd_sync` and `cmd_sync_daemon` migrations. The daemon-mode body
5//! delegates to `daemon_runtime::run_sync_daemon_with_shutdown_using_client`
6//! (W3 work); this module owns only the wrapper + the in-process sync
7//! body (pull/push/merge/dry-run).
8
9use crate::cli::CliOutput;
10use crate::{db, identity, models, tls, validate};
11use anyhow::Result;
12use clap::Args;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use tracing_subscriber::EnvFilter;
16
17#[derive(Args)]
18pub struct SyncArgs {
19    /// Path to the remote database to sync with
20    pub remote_db: PathBuf,
21    /// Direction: pull, push, or merge
22    #[arg(long, short, default_value = "merge")]
23    pub direction: String,
24    /// Trust `metadata.agent_id` in remote memories (default: restamp with caller's id).
25    /// Only use this when syncing between databases you fully control (e.g., your own backup).
26    #[arg(long, default_value_t = false)]
27    pub trust_source: bool,
28    /// Phase 3 foundation (issue #224): preview what would change without
29    /// writing anything. Counts new / updated / unchanged memories and
30    /// links in each direction. Uses today's timestamp-aware merge
31    /// semantics; CRDT-lite field-level diagnostics land with #224 Task 3a.1.
32    #[arg(long, default_value_t = false)]
33    pub dry_run: bool,
34}
35
36#[derive(Args)]
37pub struct SyncDaemonArgs {
38    /// Comma-separated list of peer HTTP endpoints to mesh with.
39    #[arg(long, value_delimiter = ',')]
40    pub peers: Vec<String>,
41    /// Seconds between sync cycles. Minimum 1.
42    #[arg(long, default_value_t = 2)]
43    pub interval: u64,
44    /// Optional `X-API-Key` to present to peers that have api-key auth enabled.
45    #[arg(long)]
46    pub api_key: Option<String>,
47    /// Cap on the number of memories transferred per peer per cycle.
48    #[arg(long, default_value_t = 500)]
49    pub batch_size: usize,
50    /// Layer 2 client-cert PEM used when the peer demands mTLS.
51    #[arg(long, requires = "client_key")]
52    pub client_cert: Option<PathBuf>,
53    /// Layer 2 client-key PEM. Must pair with `--client-cert`.
54    #[arg(long, requires = "client_cert")]
55    pub client_key: Option<PathBuf>,
56    /// Disable server-cert verification on outbound HTTPS to peers.
57    /// **DANGEROUS** — accepts any server cert without validation.
58    #[arg(long, default_value_t = false)]
59    pub insecure_skip_server_verify: bool,
60}
61
62/// NHI: restamp `metadata.agent_id` to the caller's id, preserving the
63/// original as `imported_from_agent_id`. Mirrors `main.rs::restamp_agent_id`
64/// (W5 had to extract it because main.rs version is private).
65fn restamp_agent_id(mem: &mut models::Memory, caller_id: &str) {
66    let original = mem
67        .metadata
68        .get("agent_id")
69        .and_then(serde_json::Value::as_str)
70        .map(ToString::to_string);
71    if let Some(obj) = mem.metadata.as_object_mut() {
72        obj.insert(
73            "agent_id".to_string(),
74            serde_json::Value::String(caller_id.to_string()),
75        );
76        if let Some(orig) = original
77            && orig != caller_id
78        {
79            obj.insert(
80                crate::models::field_names::IMPORTED_FROM_AGENT_ID.to_string(),
81                serde_json::Value::String(orig),
82            );
83        }
84    }
85}
86
87#[derive(Default)]
88struct SyncPreview {
89    would_pull_new: usize,
90    would_pull_update: usize,
91    would_pull_noop: usize,
92    would_push_new: usize,
93    would_push_update: usize,
94    would_push_noop: usize,
95    would_pull_links: usize,
96    would_push_links: usize,
97}
98
99impl SyncPreview {
100    fn classify(local: Option<&models::Memory>, remote: &models::Memory) -> MergeOutcome {
101        match local {
102            None => MergeOutcome::New,
103            Some(existing) => {
104                if remote.updated_at > existing.updated_at {
105                    MergeOutcome::Update
106                } else {
107                    MergeOutcome::Noop
108                }
109            }
110        }
111    }
112}
113
114enum MergeOutcome {
115    New,
116    Update,
117    Noop,
118}
119
120/// `sync` handler.
121#[allow(clippy::too_many_lines)]
122pub fn run(
123    db_path: &Path,
124    args: &SyncArgs,
125    json_out: bool,
126    cli_agent_id: Option<&str>,
127    out: &mut CliOutput<'_>,
128) -> Result<()> {
129    let local_conn = db::open(db_path)?;
130    let remote_conn = db::open(&args.remote_db)?;
131    let caller_id = identity::resolve_agent_id(cli_agent_id, None)?;
132
133    if args.dry_run {
134        return cmd_sync_dry_run(&local_conn, &remote_conn, &args.direction, json_out, out);
135    }
136
137    match args.direction.as_str() {
138        "pull" => {
139            let mems = db::export_all(&remote_conn)?;
140            let links = db::export_links(&remote_conn)?;
141            let mut n = 0;
142            for mem in &mems {
143                let mut owned = mem.clone();
144                if !args.trust_source {
145                    restamp_agent_id(&mut owned, &caller_id);
146                }
147                if let Err(e) = validate::validate_memory(&owned) {
148                    tracing::warn!("sync: skipping invalid memory {}: {}", owned.id, e);
149                    continue;
150                }
151                if db::insert(&local_conn, &owned).is_ok() {
152                    n += 1;
153                }
154            }
155            for link in &links {
156                if validate::validate_link(&link.source_id, &link.target_id, link.relation.as_str())
157                    .is_err()
158                {
159                    continue;
160                }
161                let _ = db::create_link(
162                    &local_conn,
163                    &link.source_id,
164                    &link.target_id,
165                    link.relation.as_str(),
166                );
167            }
168            if json_out {
169                writeln!(
170                    out.stdout,
171                    "{}",
172                    serde_json::json!({"direction": "pull", "imported": n})
173                )?;
174            } else {
175                writeln!(out.stdout, "pulled {n} memories from remote")?;
176            }
177        }
178        "push" => {
179            let mems = db::export_all(&local_conn)?;
180            let links = db::export_links(&local_conn)?;
181            let mut n = 0;
182            for mem in &mems {
183                if let Err(e) = validate::validate_memory(mem) {
184                    tracing::warn!("sync: skipping invalid memory {}: {}", mem.id, e);
185                    continue;
186                }
187                if db::insert(&remote_conn, mem).is_ok() {
188                    n += 1;
189                }
190            }
191            for link in &links {
192                if validate::validate_link(&link.source_id, &link.target_id, link.relation.as_str())
193                    .is_err()
194                {
195                    continue;
196                }
197                let _ = db::create_link(
198                    &remote_conn,
199                    &link.source_id,
200                    &link.target_id,
201                    link.relation.as_str(),
202                );
203            }
204            if json_out {
205                writeln!(
206                    out.stdout,
207                    "{}",
208                    serde_json::json!({"direction": "push", "exported": n})
209                )?;
210            } else {
211                writeln!(out.stdout, "pushed {n} memories to remote")?;
212            }
213        }
214        "merge" => {
215            let r_mems = db::export_all(&remote_conn)?;
216            let r_links = db::export_links(&remote_conn)?;
217            let l_mems = db::export_all(&local_conn)?;
218            let l_links = db::export_links(&local_conn)?;
219            let (mut pulled, mut pushed) = (0, 0);
220            for mem in &r_mems {
221                let mut owned = mem.clone();
222                if !args.trust_source {
223                    restamp_agent_id(&mut owned, &caller_id);
224                }
225                if validate::validate_memory(&owned).is_err() {
226                    continue;
227                }
228                if db::insert_if_newer(&local_conn, &owned).is_ok() {
229                    pulled += 1;
230                }
231            }
232            for link in &r_links {
233                if validate::validate_link(&link.source_id, &link.target_id, link.relation.as_str())
234                    .is_err()
235                {
236                    continue;
237                }
238                let _ = db::create_link(
239                    &local_conn,
240                    &link.source_id,
241                    &link.target_id,
242                    link.relation.as_str(),
243                );
244            }
245            for mem in &l_mems {
246                if validate::validate_memory(mem).is_err() {
247                    continue;
248                }
249                if db::insert_if_newer(&remote_conn, mem).is_ok() {
250                    pushed += 1;
251                }
252            }
253            for link in &l_links {
254                if validate::validate_link(&link.source_id, &link.target_id, link.relation.as_str())
255                    .is_err()
256                {
257                    continue;
258                }
259                let _ = db::create_link(
260                    &remote_conn,
261                    &link.source_id,
262                    &link.target_id,
263                    link.relation.as_str(),
264                );
265            }
266            if json_out {
267                writeln!(
268                    out.stdout,
269                    "{}",
270                    serde_json::json!({"direction": "merge", "pulled": pulled, "pushed": pushed})
271                )?;
272            } else {
273                writeln!(out.stdout, "merged: pulled {pulled}, pushed {pushed}")?;
274            }
275        }
276        _ => anyhow::bail!(
277            "invalid direction: {} (use pull, push, merge)",
278            args.direction
279        ),
280    }
281    Ok(())
282}
283
284fn cmd_sync_dry_run(
285    local_conn: &rusqlite::Connection,
286    remote_conn: &rusqlite::Connection,
287    direction: &str,
288    json_out: bool,
289    out: &mut CliOutput<'_>,
290) -> Result<()> {
291    let l_mems = db::export_all(local_conn)?;
292    let r_mems = db::export_all(remote_conn)?;
293    let l_links = db::export_links(local_conn)?;
294    let r_links = db::export_links(remote_conn)?;
295
296    let local_by_id: std::collections::HashMap<&str, &models::Memory> =
297        l_mems.iter().map(|m| (m.id.as_str(), m)).collect();
298    let remote_by_id: std::collections::HashMap<&str, &models::Memory> =
299        r_mems.iter().map(|m| (m.id.as_str(), m)).collect();
300
301    let mut preview = SyncPreview::default();
302
303    let classify_pull = direction != "push";
304    let classify_push = direction != "pull";
305
306    if classify_pull {
307        for mem in &r_mems {
308            match SyncPreview::classify(local_by_id.get(mem.id.as_str()).copied(), mem) {
309                MergeOutcome::New => preview.would_pull_new += 1,
310                MergeOutcome::Update => preview.would_pull_update += 1,
311                MergeOutcome::Noop => preview.would_pull_noop += 1,
312            }
313        }
314        preview.would_pull_links = r_links.len();
315    }
316
317    if classify_push {
318        for mem in &l_mems {
319            match SyncPreview::classify(remote_by_id.get(mem.id.as_str()).copied(), mem) {
320                MergeOutcome::New => preview.would_push_new += 1,
321                MergeOutcome::Update => preview.would_push_update += 1,
322                MergeOutcome::Noop => preview.would_push_noop += 1,
323            }
324        }
325        preview.would_push_links = l_links.len();
326    }
327
328    if json_out {
329        writeln!(
330            out.stdout,
331            "{}",
332            serde_json::json!({
333                "dry_run": true,
334                "direction": direction,
335                "pull": {
336                    "new": preview.would_pull_new,
337                    "update": preview.would_pull_update,
338                    "noop": preview.would_pull_noop,
339                    "links": preview.would_pull_links,
340                },
341                "push": {
342                    "new": preview.would_push_new,
343                    "update": preview.would_push_update,
344                    "noop": preview.would_push_noop,
345                    "links": preview.would_push_links,
346                }
347            })
348        )?;
349    } else {
350        writeln!(
351            out.stdout,
352            "DRY RUN — no changes written. Direction: {direction}"
353        )?;
354        if classify_pull {
355            writeln!(
356                out.stdout,
357                "  pull: {} new, {} update, {} noop, {} links",
358                preview.would_pull_new,
359                preview.would_pull_update,
360                preview.would_pull_noop,
361                preview.would_pull_links
362            )?;
363        }
364        if classify_push {
365            writeln!(
366                out.stdout,
367                "  push: {} new, {} update, {} noop, {} links",
368                preview.would_push_new,
369                preview.would_push_update,
370                preview.would_push_noop,
371                preview.would_push_links
372            )?;
373        }
374    }
375    Ok(())
376}
377
378/// `sync-daemon` handler. Delegates the inner loop to `daemon_runtime`.
379pub async fn run_daemon(
380    db_path: &Path,
381    args: SyncDaemonArgs,
382    cli_agent_id: Option<&str>,
383) -> Result<()> {
384    if args.peers.is_empty() {
385        anyhow::bail!("at least one --peers URL is required");
386    }
387    let interval = args.interval.max(1);
388    let batch_size = args.batch_size.max(1);
389    let local_agent_id = identity::resolve_agent_id(cli_agent_id, None)?;
390
391    let _ = tracing_subscriber::fmt()
392        .with_env_filter(
393            EnvFilter::from_default_env()
394                .add_directive(crate::logging::DEFAULT_LOG_DIRECTIVE.parse()?)
395                .add_directive("tower_http=info".parse()?),
396        )
397        .try_init();
398
399    let _ = rustls::crypto::ring::default_provider().install_default();
400    if args.insecure_skip_server_verify && (args.client_cert.is_none() || args.client_key.is_none())
401    {
402        anyhow::bail!(
403            "sync-daemon: --insecure-skip-server-verify requires both --client-cert \
404             and --client-key as a compensating mTLS control. Running with neither side \
405             of the TLS handshake verified is an open MITM surface and is refused."
406        );
407    }
408
409    let client = if let (Some(cert_path), Some(key_path)) = (&args.client_cert, &args.client_key) {
410        let rustls_config = tls::build_rustls_client_config(cert_path, key_path).await?;
411        let mut builder = reqwest::Client::builder()
412            .timeout(std::time::Duration::from_secs(30))
413            .use_preconfigured_tls(rustls_config);
414        if args.insecure_skip_server_verify {
415            tracing::warn!(
416                "sync-daemon: --insecure-skip-server-verify set with --client-cert — \
417                 peer server certificates will NOT be validated; peer authenticates us \
418                 via mTLS allowlist (compensating control). Do NOT use in production."
419            );
420            builder = builder.danger_accept_invalid_certs(true);
421        }
422        builder.build()?
423    } else {
424        reqwest::Client::builder()
425            .timeout(std::time::Duration::from_secs(30))
426            .build()?
427    };
428
429    tracing::info!(
430        "sync-daemon: local_agent_id={local_agent_id} peers={peers:?} interval={interval}s",
431        peers = args.peers
432    );
433
434    let shutdown = Arc::new(tokio::sync::Notify::new());
435    let shutdown_for_signal = shutdown.clone();
436    tokio::spawn(async move {
437        let _ = tokio::signal::ctrl_c().await;
438        shutdown_for_signal.notify_one();
439    });
440
441    crate::daemon_runtime::run_sync_daemon_with_shutdown_using_client(
442        client,
443        db_path.to_path_buf(),
444        local_agent_id,
445        args.peers,
446        args.api_key,
447        interval,
448        batch_size,
449        shutdown,
450    )
451    .await
452}
453
454#[cfg(test)]
455mod tests {
456    use super::*;
457    use crate::cli::test_utils::{TestEnv, seed_memory};
458
459    fn args_for(remote_db: PathBuf, direction: &str) -> SyncArgs {
460        SyncArgs {
461            remote_db,
462            direction: direction.to_string(),
463            trust_source: false,
464            dry_run: false,
465        }
466    }
467
468    #[test]
469    fn test_sync_dry_run_merge() {
470        let mut env = TestEnv::fresh();
471        let local = env.db_path.clone();
472        let remote_env = TestEnv::fresh();
473        let remote = remote_env.db_path.clone();
474        seed_memory(&local, "ns", "local-only", "L");
475        seed_memory(&remote, "ns", "remote-only", "R");
476        let mut args = args_for(remote, "merge");
477        args.dry_run = true;
478        {
479            let mut out = env.output();
480            run(&local, &args, true, Some("test-agent"), &mut out).unwrap();
481        }
482        let v: serde_json::Value = serde_json::from_str(env.stdout_str().trim()).unwrap();
483        assert_eq!(v["dry_run"].as_bool().unwrap(), true);
484        assert_eq!(v["direction"].as_str().unwrap(), "merge");
485    }
486
487    #[test]
488    fn test_sync_pull_direction() {
489        let mut env = TestEnv::fresh();
490        let local = env.db_path.clone();
491        let remote_env = TestEnv::fresh();
492        let remote = remote_env.db_path.clone();
493        seed_memory(&remote, "ns", "from-remote", "data");
494        let args = args_for(remote, "pull");
495        {
496            let mut out = env.output();
497            run(&local, &args, false, Some("test-agent"), &mut out).unwrap();
498        }
499        assert!(env.stdout_str().contains("pulled"));
500    }
501
502    #[test]
503    fn test_sync_push_direction() {
504        let mut env = TestEnv::fresh();
505        let local = env.db_path.clone();
506        let remote_env = TestEnv::fresh();
507        let remote = remote_env.db_path.clone();
508        seed_memory(&local, "ns", "to-remote", "data");
509        let args = args_for(remote, "push");
510        {
511            let mut out = env.output();
512            run(&local, &args, false, Some("test-agent"), &mut out).unwrap();
513        }
514        assert!(env.stdout_str().contains("pushed"));
515    }
516
517    #[test]
518    fn test_sync_merge_direction() {
519        let mut env = TestEnv::fresh();
520        let local = env.db_path.clone();
521        let remote_env = TestEnv::fresh();
522        let remote = remote_env.db_path.clone();
523        seed_memory(&local, "ns", "L", "L");
524        seed_memory(&remote, "ns", "R", "R");
525        let args = args_for(remote, "merge");
526        {
527            let mut out = env.output();
528            run(&local, &args, false, Some("test-agent"), &mut out).unwrap();
529        }
530        assert!(env.stdout_str().contains("merged:"));
531    }
532
533    #[test]
534    fn test_sync_invalid_direction_errors() {
535        let mut env = TestEnv::fresh();
536        let local = env.db_path.clone();
537        let remote_env = TestEnv::fresh();
538        let remote = remote_env.db_path.clone();
539        let args = args_for(remote, "sideways");
540        let mut out = env.output();
541        let res = run(&local, &args, false, Some("test-agent"), &mut out);
542        assert!(res.is_err());
543    }
544
545    #[test]
546    fn test_sync_dry_run_pull_only() {
547        let mut env = TestEnv::fresh();
548        let local = env.db_path.clone();
549        let remote_env = TestEnv::fresh();
550        let remote = remote_env.db_path.clone();
551        seed_memory(&remote, "ns", "remote", "x");
552        let mut args = args_for(remote, "pull");
553        args.dry_run = true;
554        {
555            let mut out = env.output();
556            run(&local, &args, true, Some("test-agent"), &mut out).unwrap();
557        }
558        let v: serde_json::Value = serde_json::from_str(env.stdout_str().trim()).unwrap();
559        assert!(v["pull"]["new"].as_u64().unwrap() >= 1);
560    }
561
562    #[test]
563    fn test_restamp_agent_id_preserves_original() {
564        let mut mem = models::Memory {
565            id: "m1".to_string(),
566            tier: models::Tier::Mid,
567            namespace: "ns".to_string(),
568            title: "t".to_string(),
569            content: "c".to_string(),
570            tags: vec![],
571            priority: 5,
572            confidence: 1.0,
573            source: "test".to_string(),
574            access_count: 0,
575            created_at: "2026-01-01T00:00:00Z".to_string(),
576            updated_at: "2026-01-01T00:00:00Z".to_string(),
577            last_accessed_at: None,
578            expires_at: None,
579            metadata: serde_json::json!({"agent_id": "remote-agent"}),
580            reflection_depth: 0,
581            memory_kind: crate::models::MemoryKind::Observation,
582            entity_id: None,
583            persona_version: None,
584            citations: Vec::new(),
585            source_uri: None,
586            source_span: None,
587            confidence_source: crate::models::ConfidenceSource::CallerProvided,
588            confidence_signals: None,
589            confidence_decayed_at: None,
590            version: 1,
591        };
592        restamp_agent_id(&mut mem, "local-agent");
593        assert_eq!(mem.metadata["agent_id"].as_str().unwrap(), "local-agent");
594        assert_eq!(
595            mem.metadata["imported_from_agent_id"].as_str().unwrap(),
596            "remote-agent"
597        );
598    }
599
600    #[test]
601    fn test_restamp_same_agent_no_imported_from() {
602        let mut mem = models::Memory {
603            id: "m1".to_string(),
604            tier: models::Tier::Mid,
605            namespace: "ns".to_string(),
606            title: "t".to_string(),
607            content: "c".to_string(),
608            tags: vec![],
609            priority: 5,
610            confidence: 1.0,
611            source: "test".to_string(),
612            access_count: 0,
613            created_at: "2026-01-01T00:00:00Z".to_string(),
614            updated_at: "2026-01-01T00:00:00Z".to_string(),
615            last_accessed_at: None,
616            expires_at: None,
617            metadata: serde_json::json!({"agent_id": "same-agent"}),
618            reflection_depth: 0,
619            memory_kind: crate::models::MemoryKind::Observation,
620            entity_id: None,
621            persona_version: None,
622            citations: Vec::new(),
623            source_uri: None,
624            source_span: None,
625            confidence_source: crate::models::ConfidenceSource::CallerProvided,
626            confidence_signals: None,
627            confidence_decayed_at: None,
628            version: 1,
629        };
630        restamp_agent_id(&mut mem, "same-agent");
631        assert_eq!(mem.metadata["agent_id"].as_str().unwrap(), "same-agent");
632        assert!(mem.metadata.get("imported_from_agent_id").is_none());
633    }
634
635    #[tokio::test]
636    async fn test_sync_daemon_empty_peers_errors() {
637        let env = TestEnv::fresh();
638        let db = env.db_path.clone();
639        let args = SyncDaemonArgs {
640            peers: Vec::new(),
641            interval: 2,
642            api_key: None,
643            batch_size: 500,
644            client_cert: None,
645            client_key: None,
646            insecure_skip_server_verify: false,
647        };
648        let res = run_daemon(&db, args, Some("test-agent")).await;
649        assert!(res.is_err());
650        assert!(res.unwrap_err().to_string().contains("--peers"));
651    }
652
653    #[tokio::test]
654    async fn test_sync_daemon_insecure_without_mtls_errors() {
655        let env = TestEnv::fresh();
656        let db = env.db_path.clone();
657        let args = SyncDaemonArgs {
658            peers: vec!["http://example.com:9077".to_string()],
659            interval: 2,
660            api_key: None,
661            batch_size: 500,
662            client_cert: None,
663            client_key: None,
664            insecure_skip_server_verify: true,
665        };
666        let res = run_daemon(&db, args, Some("test-agent")).await;
667        assert!(res.is_err());
668        assert!(
669            res.unwrap_err()
670                .to_string()
671                .contains("insecure-skip-server-verify")
672        );
673    }
674
675    // PR-9i — buffer coverage uplift. Targets previously-uncovered branches
676    // in run() / cmd_sync_dry_run: link-sync paths in pull/push/merge,
677    // text-mode dry_run output, restamp_agent_id with no original agent_id.
678
679    #[test]
680    fn pr9i_pull_propagates_links() {
681        let mut env = TestEnv::fresh();
682        let local = env.db_path.clone();
683        let remote_env = TestEnv::fresh();
684        let remote = remote_env.db_path.clone();
685        let id1 = seed_memory(&remote, "ns", "src", "src-content");
686        let id2 = seed_memory(&remote, "ns", "tgt", "tgt-content");
687        {
688            let conn = db::open(&remote).unwrap();
689            db::create_link(&conn, &id1, &id2, "related_to").unwrap();
690        }
691        let args = args_for(remote, "pull");
692        {
693            let mut out = env.output();
694            run(&local, &args, true, Some("test-agent"), &mut out).unwrap();
695        }
696        let v: serde_json::Value = serde_json::from_str(env.stdout_str().trim()).unwrap();
697        assert_eq!(v["direction"].as_str().unwrap(), "pull");
698        let local_conn = db::open(&local).unwrap();
699        let local_links = db::export_links(&local_conn).unwrap();
700        assert!(
701            local_links
702                .iter()
703                .any(|l| l.relation == crate::models::MemoryLinkRelation::RelatedTo),
704            "expected pulled link to land in local: {local_links:?}"
705        );
706    }
707
708    #[test]
709    fn pr9i_push_propagates_links() {
710        let mut env = TestEnv::fresh();
711        let local = env.db_path.clone();
712        let remote_env = TestEnv::fresh();
713        let remote = remote_env.db_path.clone();
714        let id1 = seed_memory(&local, "ns", "a", "a");
715        let id2 = seed_memory(&local, "ns", "b", "b");
716        {
717            let conn = db::open(&local).unwrap();
718            db::create_link(&conn, &id1, &id2, "supersedes").unwrap();
719        }
720        let args = args_for(remote.clone(), "push");
721        {
722            let mut out = env.output();
723            run(&local, &args, true, Some("test-agent"), &mut out).unwrap();
724        }
725        let v: serde_json::Value = serde_json::from_str(env.stdout_str().trim()).unwrap();
726        assert_eq!(v["direction"].as_str().unwrap(), "push");
727        let remote_conn = db::open(&remote).unwrap();
728        let remote_links = db::export_links(&remote_conn).unwrap();
729        assert!(
730            remote_links
731                .iter()
732                .any(|l| l.relation == crate::models::MemoryLinkRelation::Supersedes)
733        );
734    }
735
736    #[test]
737    fn pr9i_merge_propagates_links_both_directions() {
738        let mut env = TestEnv::fresh();
739        let local = env.db_path.clone();
740        let remote_env = TestEnv::fresh();
741        let remote = remote_env.db_path.clone();
742        let l1 = seed_memory(&local, "ns", "l1", "l1");
743        let l2 = seed_memory(&local, "ns", "l2", "l2");
744        {
745            let conn = db::open(&local).unwrap();
746            db::create_link(&conn, &l1, &l2, "related_to").unwrap();
747        }
748        let r1 = seed_memory(&remote, "ns", "r1", "r1");
749        let r2 = seed_memory(&remote, "ns", "r2", "r2");
750        {
751            let conn = db::open(&remote).unwrap();
752            db::create_link(&conn, &r1, &r2, "derived_from").unwrap();
753        }
754        let args = args_for(remote.clone(), "merge");
755        {
756            let mut out = env.output();
757            run(&local, &args, false, Some("test-agent"), &mut out).unwrap();
758        }
759        assert!(env.stdout_str().contains("merged:"));
760        let lconn = db::open(&local).unwrap();
761        let rconn = db::open(&remote).unwrap();
762        let l_relations: Vec<String> = db::export_links(&lconn)
763            .unwrap()
764            .into_iter()
765            .map(|l| l.relation.as_str().to_string())
766            .collect();
767        let r_relations: Vec<String> = db::export_links(&rconn)
768            .unwrap()
769            .into_iter()
770            .map(|l| l.relation.as_str().to_string())
771            .collect();
772        assert!(l_relations.iter().any(|r| r == "derived_from"));
773        assert!(r_relations.iter().any(|r| r == "related_to"));
774    }
775
776    #[test]
777    fn pr9i_dry_run_text_mode_merge() {
778        let mut env = TestEnv::fresh();
779        let local = env.db_path.clone();
780        let remote_env = TestEnv::fresh();
781        let remote = remote_env.db_path.clone();
782        seed_memory(&local, "ns", "L", "L");
783        seed_memory(&remote, "ns", "R", "R");
784        let mut args = args_for(remote, "merge");
785        args.dry_run = true;
786        {
787            let mut out = env.output();
788            run(&local, &args, false, Some("test-agent"), &mut out).unwrap();
789        }
790        let s = env.stdout_str();
791        assert!(s.contains("DRY RUN"));
792        assert!(s.contains("pull:"));
793        assert!(s.contains("push:"));
794        assert!(s.contains("noop"));
795        assert!(s.contains("links"));
796    }
797
798    #[test]
799    fn pr9i_dry_run_text_mode_pull_only() {
800        let mut env = TestEnv::fresh();
801        let local = env.db_path.clone();
802        let remote_env = TestEnv::fresh();
803        let remote = remote_env.db_path.clone();
804        seed_memory(&remote, "ns", "remote-only", "rr");
805        let mut args = args_for(remote, "pull");
806        args.dry_run = true;
807        {
808            let mut out = env.output();
809            run(&local, &args, false, Some("test-agent"), &mut out).unwrap();
810        }
811        let s = env.stdout_str();
812        assert!(s.contains("DRY RUN"));
813        assert!(s.contains("pull:"));
814        assert!(!s.contains("push:"));
815    }
816
817    #[test]
818    fn pr9i_dry_run_text_mode_push_only() {
819        let mut env = TestEnv::fresh();
820        let local = env.db_path.clone();
821        let remote_env = TestEnv::fresh();
822        let remote = remote_env.db_path.clone();
823        seed_memory(&local, "ns", "local-only", "ll");
824        let mut args = args_for(remote, "push");
825        args.dry_run = true;
826        {
827            let mut out = env.output();
828            run(&local, &args, false, Some("test-agent"), &mut out).unwrap();
829        }
830        let s = env.stdout_str();
831        assert!(s.contains("DRY RUN"));
832        assert!(s.contains("push:"));
833        assert!(!s.contains("pull:"));
834    }
835
836    #[test]
837    fn pr9i_dry_run_classify_update_branch() {
838        let mut env = TestEnv::fresh();
839        let local = env.db_path.clone();
840        let remote_env = TestEnv::fresh();
841        let remote = remote_env.db_path.clone();
842        let id = seed_memory(&local, "ns", "shared", "old-content");
843        let conn = db::open(&remote).unwrap();
844        let mem = models::Memory {
845            id: id.clone(),
846            tier: models::Tier::Mid,
847            namespace: "ns".to_string(),
848            title: "shared".to_string(),
849            content: "newer-content".to_string(),
850            tags: vec![],
851            priority: 5,
852            confidence: 1.0,
853            source: "test".to_string(),
854            access_count: 0,
855            // created_at is `now` so the #1466 tier-default expiry
856            // backfill on this Mid row (created_at + 7d) lands in the
857            // future and the row survives the sync pull; updated_at stays
858            // far-future so it still classifies as the "newer" side.
859            created_at: chrono::Utc::now().to_rfc3339(),
860            updated_at: "2099-01-01T00:00:00Z".to_string(),
861            last_accessed_at: None,
862            expires_at: None,
863            metadata: serde_json::json!({}),
864            reflection_depth: 0,
865            memory_kind: crate::models::MemoryKind::Observation,
866            entity_id: None,
867            persona_version: None,
868            citations: Vec::new(),
869            source_uri: None,
870            source_span: None,
871            confidence_source: crate::models::ConfidenceSource::CallerProvided,
872            confidence_signals: None,
873            confidence_decayed_at: None,
874            version: 1,
875        };
876        db::insert(&conn, &mem).unwrap();
877        drop(conn);
878        let mut args = args_for(remote, "merge");
879        args.dry_run = true;
880        {
881            let mut out = env.output();
882            run(&local, &args, true, Some("test-agent"), &mut out).unwrap();
883        }
884        let v: serde_json::Value = serde_json::from_str(env.stdout_str().trim()).unwrap();
885        assert!(v["pull"]["update"].as_u64().unwrap() >= 1);
886    }
887
888    #[test]
889    fn pr9i_restamp_no_original_agent_id() {
890        let mut mem = models::Memory {
891            id: "m-noid".to_string(),
892            tier: models::Tier::Mid,
893            namespace: "ns".to_string(),
894            title: "t".to_string(),
895            content: "c".to_string(),
896            tags: vec![],
897            priority: 5,
898            confidence: 1.0,
899            source: "test".to_string(),
900            access_count: 0,
901            created_at: "2026-01-01T00:00:00Z".to_string(),
902            updated_at: "2026-01-01T00:00:00Z".to_string(),
903            last_accessed_at: None,
904            expires_at: None,
905            metadata: serde_json::json!({}),
906            reflection_depth: 0,
907            memory_kind: crate::models::MemoryKind::Observation,
908            entity_id: None,
909            persona_version: None,
910            citations: Vec::new(),
911            source_uri: None,
912            source_span: None,
913            confidence_source: crate::models::ConfidenceSource::CallerProvided,
914            confidence_signals: None,
915            confidence_decayed_at: None,
916            version: 1,
917        };
918        restamp_agent_id(&mut mem, "caller-agent");
919        assert_eq!(mem.metadata["agent_id"].as_str().unwrap(), "caller-agent");
920        assert!(mem.metadata.get("imported_from_agent_id").is_none());
921    }
922
923    #[test]
924    fn pr9i_pull_skips_invalid_link() {
925        // v0.7.0 fix campaign R1-M2 — the substrate CHECK trigger now
926        // refuses an empty / off-closed-set relation at the SQL layer,
927        // so the original "seed a bad row then pull and verify skip"
928        // shape can no longer be set up: the seed itself fails. Verify
929        // both halves of the defense-in-depth contract instead —
930        //
931        //   1. The CHECK trigger refuses to seed an invalid relation
932        //      directly via SQL (this is the new R1-M2 guarantee).
933        //   2. The pull path's `validate_link` filter (kept in place
934        //      as a second line of defense for legacy DBs where the
935        //      trigger hasn't run yet) is still wired up, asserted
936        //      indirectly by the same call returning a successful
937        //      `direction: pull` envelope after seeing the seed
938        //      attempt fail.
939        let mut env = TestEnv::fresh();
940        let local = env.db_path.clone();
941        let remote_env = TestEnv::fresh();
942        let remote = remote_env.db_path.clone();
943        let id1 = seed_memory(&remote, "ns", "src", "src");
944        let id2 = seed_memory(&remote, "ns", "tgt", "tgt");
945        let conn = db::open(&remote).unwrap();
946        let seed = conn.execute(
947            "INSERT INTO memory_links (source_id, target_id, relation, created_at) VALUES (?, ?, '', datetime('now'))",
948            rusqlite::params![id1, id2],
949        );
950        assert!(
951            seed.is_err(),
952            "R1-M2 CHECK trigger must refuse to land an empty relation; \
953             a successful seed would mean defense-in-depth has regressed"
954        );
955        drop(conn);
956        let args = args_for(remote, "pull");
957        {
958            let mut out = env.output();
959            run(&local, &args, true, Some("test-agent"), &mut out).unwrap();
960        }
961        let v: serde_json::Value = serde_json::from_str(env.stdout_str().trim()).unwrap();
962        assert_eq!(v["direction"].as_str().unwrap(), "pull");
963    }
964}