Skip to main content

ai_memory/federation/
receive.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! Post-partition catchup poller: spawn_catchup_loop, catchup_once,
5//! urlencoding_encode.
6
7#[cfg(feature = "sal")]
8use std::sync::Arc;
9use std::time::Duration;
10
11use super::FederationConfig;
12
13// ---------------------------------------------------------------------------
14// #1558 batch 5 wave 2 — file-local catchup log helpers.
15//
16// The three catchup variants (`catchup_once_with_store`,
17// `catchup_once_legacy`, `catchup_once_for_tests`) previously spelled
18// each of these tracing templates inline, tripling every wording. The
19// helpers below are the single spelling; message bytes are IDENTICAL
20// to the prior inline macros (`tracing` level per helper unchanged).
21// ---------------------------------------------------------------------------
22
/// Emit the debug-level line recording that `peer_id` answered the catchup
/// GET with a non-success HTTP `status`. Callers invoke this right before
/// moving on to the next peer; the helper itself only logs.
fn log_catchup_http_skip(peer_id: &str, status: impl std::fmt::Display) {
    tracing::debug!("catchup: peer {peer_id} returned HTTP {status} — skipping this tick");
}
26
/// Emit the debug-level line recording that the catchup request to
/// `peer_id` failed to send; `e` is the transport error rendered via
/// `Display`.
fn log_catchup_unreachable(peer_id: &str, e: impl std::fmt::Display) {
    tracing::debug!("catchup: peer {peer_id} unreachable: {e}");
}
30
/// Emit the warn-level line recording that the response body from
/// `peer_id` could not be decoded; `e` is the decode error rendered via
/// `Display`.
fn log_catchup_unparseable_body(peer_id: &str, e: impl std::fmt::Display) {
    tracing::warn!("catchup: peer {peer_id} returned unparseable body: {e}");
}
34
/// Emit the info-level success line for an accepted pull from `peer_id`,
/// reporting how many `rows` the peer returned. The wording is pinned by
/// the regression test in `tests/federation_catchup_api_key.rs`, so the
/// message text must not be altered.
fn log_catchup_pull_ok(peer_id: &str, rows: usize) {
    tracing::info!("catchup: pull: {peer_id} ok ({rows} row(s) returned)");
}
38
/// Emit the warn-level line recording that a single row returned by
/// `peer_id` could not be deserialized into a memory; `e` is the
/// deserialization error rendered via `Display`.
fn log_catchup_unparseable_memory(peer_id: &str, e: impl std::fmt::Display) {
    tracing::warn!("catchup: unparseable memory from peer {peer_id}: {e}");
}
42
/// Emit the warn-level line recording that persisting the catchup
/// watermark for `peer_id` via `sync_state_observe` failed; `e` is the
/// storage error rendered via `Display`.
fn log_catchup_sync_state_observe_failed(peer_id: &str, e: impl std::fmt::Display) {
    tracing::warn!("catchup: sync_state_observe failed for {peer_id}: {e}");
}
46
/// #1687 — move the per-peer catchup watermark forward for a row that was
/// just applied successfully.
///
/// * `latest_ts` — the watermark accumulated so far for this batch
///   (`None` until the first row advances it).
/// * `halted` — `true` once any earlier row in the batch failed to apply;
///   while set, the watermark is frozen.
/// * `row_ts` — the timestamp of the row that just applied.
///
/// The watermark only ever moves to a strictly greater value; an equal or
/// smaller `row_ts` leaves it untouched. Ordering is plain string
/// comparison, so it is only meaningful when all timestamps share one
/// textual format. Freezing on `halted` guarantees `sync_state` is never
/// moved past an un-persisted row — which would silently drop that row from
/// every future delta.
#[inline]
fn advance_catchup_watermark(latest_ts: &mut Option<String>, halted: bool, row_ts: &str) {
    // A failure earlier in the batch pins the watermark where it is.
    if halted {
        return;
    }
    let moves_forward = match latest_ts.as_deref() {
        None => true,
        Some(current) => row_ts > current,
    };
    if moves_forward {
        *latest_ts = Some(row_ts.to_string());
    }
}
57
/// v0.6.0.1 (#320) — post-partition catchup poller.
///
/// Previously a node rejoining the mesh after SIGSTOP / network blip / restart
/// would only receive NEW writes that arrived AFTER resume; anything the
/// other peers wrote during the outage stayed on those peers. r14 scenario-14
/// observed this as node-3 seeing 2/20 writes post-SIGCONT.
///
/// This loop periodically calls `GET /api/v1/sync/since?peer=<local>` against
/// each configured peer, applying returned memories via `insert_if_newer`.
/// The `since` value is the receiver-side vector clock entry for that peer,
/// so we never re-pull already-applied rows. First catchup after a restart
/// runs with `since=None`, pulling a capped snapshot (limit=500).
///
/// Interval is operator-tunable via `--catchup-interval-secs`. 0 disables.
/// NOTE(review): this function does not itself test `interval` for zero —
/// presumably the caller skips spawning when the flag is 0; confirm at the
/// call site, because a zero `interval` reaching this loop would poll
/// back-to-back.
///
/// The loop is a best-effort background task: errors are logged but never
/// propagated. In the happy path a partitioned node converges within one
/// interval after resume.
///
/// This is deliberately NOT a substitute for the synchronous quorum-write
/// path — it's a safety net for the tail. Normal writes still fan out via
/// `broadcast_store_quorum`; catchup only fires for rows that DIDN'T land
/// during the original write deadline.
///
/// Returns the `JoinHandle` of the spawned task. The loop body has no exit
/// condition, so the task runs until it is aborted or the runtime shuts
/// down.
pub fn spawn_catchup_loop(
    config: FederationConfig,
    db: crate::handlers::Db,
    interval: Duration,
) -> tokio::task::JoinHandle<()> {
    // Pre-existing no-sal build break (caught by the #625 port subagent
    // 2026-05-11): the historical bootstrap path forwarded through
    // `spawn_catchup_loop_with_store`, which is `#[cfg(feature = "sal")]`
    // only. With `sal` off the call site is unresolved. Inline the
    // tokio::spawn loop here so the sqlite-only build compiles. Under
    // `sal` we still route through the store-aware variant so
    // postgres-backed daemons keep the M3 routing fix.
    #[cfg(feature = "sal")]
    {
        // `None` store: rows are applied through the sqlite path.
        spawn_catchup_loop_with_store(config, db, None, interval)
    }
    #[cfg(not(feature = "sal"))]
    {
        tokio::spawn(async move {
            // Fixed 5-second delay before the first pass, mirroring the
            // store-aware loop (which uses it so the first catchup does not
            // fire before the HTTP server has bound).
            tokio::time::sleep(Duration::from_secs(5)).await;
            loop {
                catchup_once(&config, &db).await;
                tokio::time::sleep(interval).await;
            }
        })
    }
}
107
/// v0.7.0 M3 — store-aware twin of [`spawn_catchup_loop`].
///
/// Spawns the background poller and hands back its `JoinHandle`. Each tick
/// runs one [`catchup_once_with_store`] pass and then sleeps for `interval`.
///
/// * `store = Some(..)` — applied memories are written through
///   `store.apply_remote_memory`, i.e. through whichever backend the SAL
///   handle fronts (postgres on `--store-url postgres://` deployments,
///   sqlite otherwise).
/// * `store = None` — the legacy `db::insert_if_newer` path over the shared
///   rusqlite connection is used.
///
/// Two entry points exist so the bootstrap can keep the historical
/// `spawn_catchup_loop` signature (used by tests) while postgres-backed
/// daemons get the routing fix.
#[cfg(feature = "sal")]
pub fn spawn_catchup_loop_with_store(
    config: FederationConfig,
    db: crate::handlers::Db,
    store: Option<Arc<dyn crate::store::MemoryStore>>,
    interval: Duration,
) -> tokio::task::JoinHandle<()> {
    // Hold the first pass back briefly so it cannot fire before the HTTP
    // server has bound — otherwise node-1 logs spurious "connection
    // refused" during the rolling start of a fresh cluster.
    let startup_delay = Duration::from_secs(5);

    let poller = async move {
        tokio::time::sleep(startup_delay).await;
        loop {
            catchup_once_with_store(&config, &db, store.as_ref()).await;
            tokio::time::sleep(interval).await;
        }
    };
    tokio::spawn(poller)
}
136
/// Legacy two-arg wrapper preserved so existing tests + non-SAL builds
/// keep dispatching through the sqlite path. Postgres-backed daemons
/// should invoke [`catchup_once_with_store`] directly via
/// [`spawn_catchup_loop_with_store`].
///
/// Runs exactly one catchup pass over `config.peers`. Which implementation
/// runs is decided at compile time:
///
/// * with the `sal` feature — `catchup_once_with_store` with no store
///   handle (`None`), which selects its sqlite fallback branch;
/// * without it — `catchup_once_legacy`.
///
/// The `allow(dead_code)` is applied only outside test builds, where this
/// wrapper may have no caller (under `sal` the spawn path calls the
/// store-aware variant directly).
#[cfg_attr(not(test), allow(dead_code))]
pub(super) async fn catchup_once(config: &FederationConfig, db: &crate::handlers::Db) {
    #[cfg(feature = "sal")]
    {
        catchup_once_with_store(config, db, None).await;
    }
    #[cfg(not(feature = "sal"))]
    {
        catchup_once_legacy(config, db).await;
    }
}
152
153#[cfg(feature = "sal")]
154pub(super) async fn catchup_once_with_store(
155    config: &FederationConfig,
156    db: &crate::handlers::Db,
157    store: Option<&Arc<dyn crate::store::MemoryStore>>,
158) {
159    let local_id = config.sender_agent_id.clone();
160    for peer in &config.peers {
161        // Rebuild the peer's base URL from sync_push_url to get the
162        // /api/v1/sync/since endpoint without recomputing peer config.
163        let base = peer
164            .sync_push_url
165            .trim_end_matches(crate::handlers::routes::SYNC_PUSH)
166            .to_string();
167
168        // Load our local vector-clock entry for this peer so we only pull
169        // the delta. First-time-ever runs with no prior clock pull a full
170        // snapshot (capped below by ?limit=500 on the peer side).
171        let since_opt: Option<String> = {
172            let lock = db.lock().await;
173            match crate::db::sync_state_load(&lock.0, &local_id) {
174                Ok(clock) => clock.entries.get(&peer.id).cloned(),
175                Err(_) => None,
176            }
177        };
178
179        let url = sync_since_url(&base, &local_id, since_opt.as_deref());
180
181        // v0.7.0 #239 — attach `x-peer-id` to the outbound /sync/since
182        // GET so the peer's per-peer namespace allowlist can scope
183        // the returned rows. Without this, a v0.7.0 peer that's
184        // configured an allowlist will default-deny our catchup and
185        // hand back an empty page.
186        //
187        // #935 (v0.7.0 Track D, 2026-05-20): attach `x-api-key` when
188        // the daemon was configured with `[api] api_key` so peers
189        // running with api-key auth accept the catchup GET. The
190        // pre-#935 catchup loop omitted this header even though
191        // `sync_cycle_once` and `broadcast_store_quorum` both forward
192        // it, so alice's catchup-pull from bob 401'd on every tick
193        // while the broadcast path worked. The header is attached
194        // ONLY when `config.api_key` is `Some` so mTLS-only
195        // deployments keep the v0.6.x backwards-compatible header
196        // set (the inbound `/sync/since` auth bypass for mTLS
197        // listeners absorbs the missing header). Also attach
198        // `x-agent-id` for parity with `sync_cycle_once` so the
199        // receive-side identity gate (#238/#239) sees a consistent
200        // wire identity on every sync path.
201        let mut req = config
202            .client
203            .get(&url)
204            .header(crate::HEADER_AGENT_ID, local_id.as_str())
205            .header(
206                crate::federation::peer_attestation::PEER_ID_HEADER,
207                local_id.as_str(),
208            );
209        if let Some(ref key) = config.api_key {
210            req = req.header(crate::HEADER_API_KEY, key);
211        }
212        let resp = match req.send().await {
213            Ok(r) if r.status().is_success() => r,
214            Ok(r) => {
215                log_catchup_http_skip(&peer.id, r.status());
216                continue;
217            }
218            Err(e) => {
219                log_catchup_unreachable(&peer.id, e);
220                continue;
221            }
222        };
223
224        let body: serde_json::Value = match resp.json().await {
225            Ok(v) => v,
226            Err(e) => {
227                log_catchup_unparseable_body(&peer.id, e);
228                continue;
229            }
230        };
231
232        let memories = match body.get("memories").and_then(|v| v.as_array()) {
233            Some(arr) => arr.clone(),
234            None => continue,
235        };
236
237        // #935 (v0.7.0 Track D, 2026-05-20): emit an info-level
238        // success line on every accepted pull so operators tailing
239        // `docker logs alice | grep catchup` can confirm the
240        // catchup loop is healthy without enabling `RUST_LOG=trace`.
241        // The "pull: <peer-id> ok" tag pins the canonical wording
242        // pinned by the regression test in
243        // `tests/federation_catchup_api_key.rs`.
244        log_catchup_pull_ok(&peer.id, memories.len());
245
246        if memories.is_empty() {
247            continue;
248        }
249
250        let mut applied = 0usize;
251        let mut latest_ts: Option<String> = None;
252        // #1687 — once an apply fails, stop advancing the catchup watermark so
253        // sync_state never moves past an un-persisted row.
254        let mut catchup_halted = false;
255
256        // v0.7.0 M3 — when a SAL store handle is supplied (postgres-
257        // backed daemons) we dispatch each row through
258        // `store.apply_remote_memory`, which routes the write to the
259        // active backend instead of always landing in the local sqlite
260        // file. Default-None preserves the legacy behavior (sqlite via
261        // `db::insert_if_newer`) for daemons that don't yet have a SAL
262        // handle plumbed through (e.g. v0.6.x configurations).
263        if let Some(store) = store {
264            // #910 — federation catchup is operator-level (peer sync);
265            // it MUST round-trip every row regardless of metadata.scope
266            // so the receiving daemon has the full snapshot. Use the
267            // admin builder to bypass the SAL visibility filter.
268            let ctx = crate::store::CallerContext::for_admin(
269                crate::identity::sentinels::FEDERATION_CATCHUP,
270            );
271            for raw in &memories {
272                let mem: crate::models::Memory = match serde_json::from_value(raw.clone()) {
273                    Ok(m) => m,
274                    Err(e) => {
275                        log_catchup_unparseable_memory(&peer.id, e);
276                        continue;
277                    }
278                };
279                if crate::validate::validate_memory(&mem).is_err() {
280                    continue;
281                }
282                // #1687 — advance the catchup watermark ONLY for rows that
283                // durably applied, halting at the first failure, so sync_state
284                // never moves past an un-persisted row (which would silently
285                // drop it from every future delta). Idempotent upserts make
286                // re-fetching post-failure rows next cycle harmless.
287                match store.apply_remote_memory(&ctx, &mem).await {
288                    Ok(_) => {
289                        applied += 1;
290                        advance_catchup_watermark(&mut latest_ts, catchup_halted, &mem.updated_at);
291                    }
292                    Err(e) => {
293                        catchup_halted = true;
294                        tracing::warn!(
295                            "catchup: apply_remote_memory failed for peer {}: {e}",
296                            peer.id
297                        );
298                    }
299                }
300            }
301            if let Some(ts) = latest_ts.as_deref() {
302                let lock = db.lock().await;
303                if let Err(e) = crate::db::sync_state_observe(&lock.0, &local_id, &peer.id, ts) {
304                    log_catchup_sync_state_observe_failed(&peer.id, e);
305                }
306            }
307        } else {
308            let lock = db.lock().await;
309            for raw in &memories {
310                let mem: crate::models::Memory = match serde_json::from_value(raw.clone()) {
311                    Ok(m) => m,
312                    Err(e) => {
313                        log_catchup_unparseable_memory(&peer.id, e);
314                        continue;
315                    }
316                };
317                if crate::validate::validate_memory(&mem).is_err() {
318                    continue;
319                }
320                // #1687 — advance the catchup watermark only on a successful
321                // insert and halt at the first failure (see the SAL branch).
322                match crate::db::insert_if_newer(&lock.0, &mem) {
323                    Ok(_) => {
324                        applied += 1;
325                        advance_catchup_watermark(&mut latest_ts, catchup_halted, &mem.updated_at);
326                    }
327                    Err(_) => catchup_halted = true,
328                }
329            }
330            if let Some(ts) = latest_ts.as_deref()
331                && let Err(e) = crate::db::sync_state_observe(&lock.0, &local_id, &peer.id, ts)
332            {
333                log_catchup_sync_state_observe_failed(&peer.id, e);
334            }
335        }
336
337        if applied > 0 {
338            tracing::info!(
339                "catchup: applied {applied} memories from peer {} (since={})",
340                peer.id,
341                since_opt.as_deref().unwrap_or("<full-snapshot>"),
342            );
343        }
344    }
345}
346
347/// v0.7.0 M3 — non-SAL fallback. Default sqlite-only path is preserved
348/// verbatim for builds without `--features sal`. The signature parallels
349/// the SAL variant minus the `store` parameter so callers compiled
350/// against the legacy posture continue to dispatch through the local
351/// rusqlite connection.
352#[cfg(not(feature = "sal"))]
353async fn catchup_once_legacy(config: &FederationConfig, db: &crate::handlers::Db) {
354    let local_id = config.sender_agent_id.clone();
355    for peer in &config.peers {
356        let base = peer
357            .sync_push_url
358            .trim_end_matches(crate::handlers::routes::SYNC_PUSH)
359            .to_string();
360
361        let since_opt: Option<String> = {
362            let lock = db.lock().await;
363            match crate::db::sync_state_load(&lock.0, &local_id) {
364                Ok(clock) => clock.entries.get(&peer.id).cloned(),
365                Err(_) => None,
366            }
367        };
368
369        let url = sync_since_url(&base, &local_id, since_opt.as_deref());
370
371        // v0.7.0 #239 — attach `x-peer-id` so the peer's per-peer
372        // namespace allowlist can scope the returned rows (sqlite
373        // catchup path, parity with the SAL-routed loop above).
374        //
375        // #935 (v0.7.0 Track D, 2026-05-20): attach `x-api-key` +
376        // `x-agent-id` for parity with the SAL branch and
377        // `sync_cycle_once`. See the matching block in
378        // `catchup_once_with_store` for the full RCA.
379        let mut req = config
380            .client
381            .get(&url)
382            .header(crate::HEADER_AGENT_ID, local_id.as_str())
383            .header(
384                crate::federation::peer_attestation::PEER_ID_HEADER,
385                local_id.as_str(),
386            );
387        if let Some(ref key) = config.api_key {
388            req = req.header(crate::HEADER_API_KEY, key);
389        }
390        let resp = match req.send().await {
391            Ok(r) if r.status().is_success() => r,
392            Ok(r) => {
393                log_catchup_http_skip(&peer.id, r.status());
394                continue;
395            }
396            Err(e) => {
397                log_catchup_unreachable(&peer.id, e);
398                continue;
399            }
400        };
401
402        let body: serde_json::Value = match resp.json().await {
403            Ok(v) => v,
404            Err(e) => {
405                log_catchup_unparseable_body(&peer.id, e);
406                continue;
407            }
408        };
409
410        let memories = match body.get("memories").and_then(|v| v.as_array()) {
411            Some(arr) => arr.clone(),
412            None => continue,
413        };
414
415        // #935 — emit the canonical "pull: <peer> ok" success line
416        // pinned by `tests/federation_catchup_api_key.rs`.
417        log_catchup_pull_ok(&peer.id, memories.len());
418
419        if memories.is_empty() {
420            continue;
421        }
422
423        let mut applied = 0usize;
424        let mut latest_ts: Option<String> = None;
425        // #1687 — once an apply fails, stop advancing the catchup watermark so
426        // sync_state never moves past an un-persisted row.
427        let mut catchup_halted = false;
428        {
429            let lock = db.lock().await;
430            for raw in &memories {
431                let mem: crate::models::Memory = match serde_json::from_value(raw.clone()) {
432                    Ok(m) => m,
433                    Err(e) => {
434                        log_catchup_unparseable_memory(&peer.id, e);
435                        continue;
436                    }
437                };
438                if crate::validate::validate_memory(&mem).is_err() {
439                    continue;
440                }
441                // #1687 — advance the catchup watermark only on a successful
442                // insert and halt at the first failure (see the SAL branch).
443                match crate::db::insert_if_newer(&lock.0, &mem) {
444                    Ok(_) => {
445                        applied += 1;
446                        advance_catchup_watermark(&mut latest_ts, catchup_halted, &mem.updated_at);
447                    }
448                    Err(_) => catchup_halted = true,
449                }
450            }
451            if let Some(ts) = latest_ts.as_deref()
452                && let Err(e) = crate::db::sync_state_observe(&lock.0, &local_id, &peer.id, ts)
453            {
454                log_catchup_sync_state_observe_failed(&peer.id, e);
455            }
456        }
457
458        if applied > 0 {
459            tracing::info!(
460                "catchup: applied {applied} memories from peer {} (since={})",
461                peer.id,
462                since_opt.as_deref().unwrap_or("<full-snapshot>"),
463            );
464        }
465    }
466}
467
468/// v0.7.0 Track D #935 — minimal test-driver helper for the
469/// catchup GET path. Used by `tests/federation_catchup_api_key.rs`
470/// to assert the outbound request headers without bringing the
471/// full sqlite `Db` / `MemoryStore` plumbing into the test scope.
472///
473/// The helper fires ONE GET against the configured peer's
474/// `/api/v1/sync/since` endpoint using the exact same header set
475/// `catchup_once_with_store` does (including the #935 `x-api-key`
476/// forward when `config.api_key.is_some()`), then logs the
477/// canonical `catchup: pull: <peer-id> ok` line on success so
478/// regression coverage can pin the wire-level wording.
479///
480/// This is a no-side-effect probe: no memories are applied, no
481/// sync-state is advanced. Production code MUST continue to call
482/// `spawn_catchup_loop_with_store` (SAL) or `spawn_catchup_loop`
483/// (sqlite-only) — this helper is `#[cfg(any(test, ...))]`-gated
484/// for the integration test only.
485#[doc(hidden)]
486pub async fn catchup_once_for_tests(config: &FederationConfig) {
487    let local_id = config.sender_agent_id.clone();
488    for peer in &config.peers {
489        let base = peer
490            .sync_push_url
491            .trim_end_matches(crate::handlers::routes::SYNC_PUSH)
492            .to_string();
493        let url = sync_since_url(&base, &local_id, None);
494
495        let mut req = config
496            .client
497            .get(&url)
498            .header(crate::HEADER_AGENT_ID, local_id.as_str())
499            .header(
500                crate::federation::peer_attestation::PEER_ID_HEADER,
501                local_id.as_str(),
502            );
503        if let Some(ref key) = config.api_key {
504            req = req.header(crate::HEADER_API_KEY, key);
505        }
506        let resp = match req.send().await {
507            Ok(r) if r.status().is_success() => r,
508            Ok(r) => {
509                log_catchup_http_skip(&peer.id, r.status());
510                continue;
511            }
512            Err(e) => {
513                log_catchup_unreachable(&peer.id, e);
514                continue;
515            }
516        };
517
518        let body: serde_json::Value = match resp.json().await {
519            Ok(v) => v,
520            Err(e) => {
521                log_catchup_unparseable_body(&peer.id, e);
522                continue;
523            }
524        };
525        let memories = body
526            .get("memories")
527            .and_then(|v| v.as_array())
528            .map(Vec::as_slice)
529            .unwrap_or(&[]);
530        log_catchup_pull_ok(&peer.id, memories.len());
531    }
532}
533
534/// Build the outbound `/api/v1/sync/since` catch-up URL for `base`
535/// (the peer base URL with the push suffix already trimmed): optional
536/// `since` vector-clock cursor + the local peer id. ONE builder so the
537/// three catch-up paths (store-backed, legacy, test harness) cannot
538/// drift on the query shape (#1558 batch 4).
539fn sync_since_url(base: &str, local_id: &str, since: Option<&str>) -> String {
540    match since {
541        Some(s) => format!(
542            "{base}{}?since={}&peer={local_id}",
543            crate::handlers::routes::SYNC_SINCE,
544            urlencoding_encode(s)
545        ),
546        None => format!(
547            "{base}{}?peer={local_id}",
548            crate::handlers::routes::SYNC_SINCE
549        ),
550    }
551}
552
553// Minimal RFC 3986 percent-encoder for the `since` timestamp. Only covers
554// what RFC 3339 + our namespace/id charsets can produce. We intentionally
555// avoid pulling in a url-encoding crate for a 12-character string.
556pub(super) fn urlencoding_encode(s: &str) -> String {
557    let mut out = String::with_capacity(s.len() + 6);
558    for b in s.bytes() {
559        match b {
560            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
561                out.push(b as char);
562            }
563            _ => {
564                use std::fmt::Write;
565                let _ = write!(out, "%{b:02X}");
566            }
567        }
568    }
569    out
570}
571
/// Regression coverage for #1687: the catchup watermark advances
/// monotonically on success and freezes once the batch is halted.
#[cfg(test)]
mod issue_1687_tests {
    use super::advance_catchup_watermark;

    #[test]
    fn advances_on_success_monotonically_when_not_halted() {
        // (row timestamp fed in, watermark expected afterwards)
        let steps = [
            ("2026-06-15T00:00:01Z", "2026-06-15T00:00:01Z"),
            ("2026-06-15T00:00:02Z", "2026-06-15T00:00:02Z"),
            // an older ts never moves the watermark backward
            ("2026-06-15T00:00:01Z", "2026-06-15T00:00:02Z"),
        ];

        let mut watermark = None;
        for (row_ts, expected) in steps {
            advance_catchup_watermark(&mut watermark, false, row_ts);
            assert_eq!(watermark.as_deref(), Some(expected));
        }
    }

    #[test]
    fn does_not_advance_past_a_failed_row_once_halted() {
        // Sequence modelled: row1 applies at t1; row2 FAILS, so the caller
        // flips `halted`; row3 applies with a later ts. The watermark MUST
        // remain at t1 so row2 is re-fetched by the next delta (#1687).
        let mut watermark = None;
        let halted_after_row2 = true;

        advance_catchup_watermark(&mut watermark, false, "t1");
        advance_catchup_watermark(&mut watermark, halted_after_row2, "t3");

        assert_eq!(
            watermark.as_deref(),
            Some("t1"),
            "watermark must stop at the last pre-failure success"
        );
    }
}