ai_memory/federation/receive.rs
1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! Post-partition catchup poller: spawn_catchup_loop, catchup_once,
5//! urlencoding_encode.
6
7#[cfg(feature = "sal")]
8use std::sync::Arc;
9use std::time::Duration;
10
11use super::FederationConfig;
12
13// ---------------------------------------------------------------------------
14// #1558 batch 5 wave 2 — file-local catchup log helpers.
15//
16// The three catchup variants (`catchup_once_with_store`,
17// `catchup_once_legacy`, `catchup_once_for_tests`) previously spelled
18// each of these tracing templates inline, tripling every wording. The
19// helpers below are the single spelling; message bytes are IDENTICAL
20// to the prior inline macros (`tracing` level per helper unchanged).
21// ---------------------------------------------------------------------------
22
23fn log_catchup_http_skip(peer_id: &str, status: impl std::fmt::Display) {
24 tracing::debug!("catchup: peer {peer_id} returned HTTP {status} — skipping this tick");
25}
26
27fn log_catchup_unreachable(peer_id: &str, e: impl std::fmt::Display) {
28 tracing::debug!("catchup: peer {peer_id} unreachable: {e}");
29}
30
31fn log_catchup_unparseable_body(peer_id: &str, e: impl std::fmt::Display) {
32 tracing::warn!("catchup: peer {peer_id} returned unparseable body: {e}");
33}
34
35fn log_catchup_pull_ok(peer_id: &str, rows: usize) {
36 tracing::info!("catchup: pull: {peer_id} ok ({rows} row(s) returned)");
37}
38
39fn log_catchup_unparseable_memory(peer_id: &str, e: impl std::fmt::Display) {
40 tracing::warn!("catchup: unparseable memory from peer {peer_id}: {e}");
41}
42
43fn log_catchup_sync_state_observe_failed(peer_id: &str, e: impl std::fmt::Display) {
44 tracing::warn!("catchup: sync_state_observe failed for {peer_id}: {e}");
45}
46
/// #1687 — raise the per-peer catchup watermark to `row_ts` after a row
/// applied successfully.
///
/// This is a no-op in two cases:
/// - `halted` is set, meaning an earlier row in the same batch failed to apply;
/// - `row_ts` is not strictly newer than the current watermark (compared as
///   plain strings).
///
/// Freezing the watermark at the first failure keeps `sync_state` from moving
/// past an un-persisted row, which would silently drop that row from every
/// future delta.
///
/// NOTE(review): this guarantee assumes the peer returns rows in ascending
/// `updated_at` order. Confirm this against the `/sync/since` handler.
#[inline]
fn advance_catchup_watermark(latest_ts: &mut Option<String>, halted: bool, row_ts: &str) {
    if halted {
        return;
    }
    let newer = match latest_ts.as_deref() {
        Some(current) => row_ts > current,
        None => true,
    };
    if newer {
        *latest_ts = Some(row_ts.to_owned());
    }
}
57
58/// v0.6.0.1 (#320) — post-partition catchup poller.
59///
60/// Previously a node rejoining the mesh after SIGSTOP / network blip / restart
61/// would only receive NEW writes that arrived AFTER resume; anything the
62/// other peers wrote during the outage stayed on those peers. r14 scenario-14
63/// observed this as node-3 seeing 2/20 writes post-SIGCONT.
64///
65/// This loop periodically calls `GET /api/v1/sync/since?peer=<local>` against
66/// each configured peer, applying returned memories via `insert_if_newer`.
67/// The `since` value is the receiver-side vector clock entry for that peer,
68/// so we never re-pull already-applied rows. First catchup after a restart
69/// runs with `since=None`, pulling a capped snapshot (limit=500).
70///
71/// Interval is operator-tunable via `--catchup-interval-secs`. 0 disables.
72/// The loop is a best-effort background task: errors are logged but never
73/// propagated. In the happy path a partitioned node converges within one
74/// interval after resume.
75///
76/// This is deliberately NOT a substitute for the synchronous quorum-write
77/// path — it's a safety net for the tail. Normal writes still fan out via
78/// `broadcast_store_quorum`; catchup only fires for rows that DIDN'T land
79/// during the original write deadline.
80pub fn spawn_catchup_loop(
81 config: FederationConfig,
82 db: crate::handlers::Db,
83 interval: Duration,
84) -> tokio::task::JoinHandle<()> {
85 // Pre-existing no-sal build break (caught by the #625 port subagent
86 // 2026-05-11): the historical bootstrap path forwarded through
87 // `spawn_catchup_loop_with_store`, which is `#[cfg(feature = "sal")]`
88 // only. With `sal` off the call site is unresolved. Inline the
89 // tokio::spawn loop here so the sqlite-only build compiles. Under
90 // `sal` we still route through the store-aware variant so
91 // postgres-backed daemons keep the M3 routing fix.
92 #[cfg(feature = "sal")]
93 {
94 spawn_catchup_loop_with_store(config, db, None, interval)
95 }
96 #[cfg(not(feature = "sal"))]
97 {
98 tokio::spawn(async move {
99 tokio::time::sleep(Duration::from_secs(5)).await;
100 loop {
101 catchup_once(&config, &db).await;
102 tokio::time::sleep(interval).await;
103 }
104 })
105 }
106}
107
/// v0.7.0 M3 — same as [`spawn_catchup_loop`] but accepts an optional
/// SAL-trait store handle. When `store` is `Some`, applied memories are
/// written through `store.apply_remote_memory` (which routes through the
/// active backend — postgres on `--store-url postgres://` deployments,
/// sqlite otherwise). When `None`, the legacy `db::insert_if_newer` path
/// over the shared rusqlite connection is preserved verbatim.
///
/// The split exists so the bootstrap can keep the historical
/// `spawn_catchup_loop` signature (used by tests) intact while
/// postgres-backed daemons get the routing fix.
///
/// A zero `interval` means catchup is disabled. The returned task then parks
/// forever without contacting any peer, instead of re-polling every peer
/// back-to-back.
#[cfg(feature = "sal")]
pub fn spawn_catchup_loop_with_store(
    config: FederationConfig,
    db: crate::handlers::Db,
    store: Option<Arc<dyn crate::store::MemoryStore>>,
    interval: Duration,
) -> tokio::task::JoinHandle<()> {
    // `--catchup-interval-secs 0` is the operator's "disabled" setting.
    // Park forever rather than return an already-finished handle so callers
    // holding/aborting the handle keep the live loop's "never completes"
    // contract.
    if interval.is_zero() {
        return tokio::spawn(std::future::pending::<()>());
    }
    tokio::spawn(async move {
        // Small upfront delay so the first catchup doesn't fire before the
        // HTTP server has bound — avoids spurious "connection refused" on
        // node-1 during rolling start of a fresh cluster.
        tokio::time::sleep(Duration::from_secs(5)).await;
        loop {
            catchup_once_with_store(&config, &db, store.as_ref()).await;
            tokio::time::sleep(interval).await;
        }
    })
}
136
137/// Legacy two-arg wrapper preserved so existing tests + non-SAL builds
138/// keep dispatching through the sqlite path. Postgres-backed daemons
139/// should invoke [`catchup_once_with_store`] directly via
140/// [`spawn_catchup_loop_with_store`].
141#[cfg_attr(not(test), allow(dead_code))]
142pub(super) async fn catchup_once(config: &FederationConfig, db: &crate::handlers::Db) {
143 #[cfg(feature = "sal")]
144 {
145 catchup_once_with_store(config, db, None).await;
146 }
147 #[cfg(not(feature = "sal"))]
148 {
149 catchup_once_legacy(config, db).await;
150 }
151}
152
153#[cfg(feature = "sal")]
154pub(super) async fn catchup_once_with_store(
155 config: &FederationConfig,
156 db: &crate::handlers::Db,
157 store: Option<&Arc<dyn crate::store::MemoryStore>>,
158) {
159 let local_id = config.sender_agent_id.clone();
160 for peer in &config.peers {
161 // Rebuild the peer's base URL from sync_push_url to get the
162 // /api/v1/sync/since endpoint without recomputing peer config.
163 let base = peer
164 .sync_push_url
165 .trim_end_matches(crate::handlers::routes::SYNC_PUSH)
166 .to_string();
167
168 // Load our local vector-clock entry for this peer so we only pull
169 // the delta. First-time-ever runs with no prior clock pull a full
170 // snapshot (capped below by ?limit=500 on the peer side).
171 let since_opt: Option<String> = {
172 let lock = db.lock().await;
173 match crate::db::sync_state_load(&lock.0, &local_id) {
174 Ok(clock) => clock.entries.get(&peer.id).cloned(),
175 Err(_) => None,
176 }
177 };
178
179 let url = sync_since_url(&base, &local_id, since_opt.as_deref());
180
181 // v0.7.0 #239 — attach `x-peer-id` to the outbound /sync/since
182 // GET so the peer's per-peer namespace allowlist can scope
183 // the returned rows. Without this, a v0.7.0 peer that's
184 // configured an allowlist will default-deny our catchup and
185 // hand back an empty page.
186 //
187 // #935 (v0.7.0 Track D, 2026-05-20): attach `x-api-key` when
188 // the daemon was configured with `[api] api_key` so peers
189 // running with api-key auth accept the catchup GET. The
190 // pre-#935 catchup loop omitted this header even though
191 // `sync_cycle_once` and `broadcast_store_quorum` both forward
192 // it, so alice's catchup-pull from bob 401'd on every tick
193 // while the broadcast path worked. The header is attached
194 // ONLY when `config.api_key` is `Some` so mTLS-only
195 // deployments keep the v0.6.x backwards-compatible header
196 // set (the inbound `/sync/since` auth bypass for mTLS
197 // listeners absorbs the missing header). Also attach
198 // `x-agent-id` for parity with `sync_cycle_once` so the
199 // receive-side identity gate (#238/#239) sees a consistent
200 // wire identity on every sync path.
201 let mut req = config
202 .client
203 .get(&url)
204 .header(crate::HEADER_AGENT_ID, local_id.as_str())
205 .header(
206 crate::federation::peer_attestation::PEER_ID_HEADER,
207 local_id.as_str(),
208 );
209 if let Some(ref key) = config.api_key {
210 req = req.header(crate::HEADER_API_KEY, key);
211 }
212 let resp = match req.send().await {
213 Ok(r) if r.status().is_success() => r,
214 Ok(r) => {
215 log_catchup_http_skip(&peer.id, r.status());
216 continue;
217 }
218 Err(e) => {
219 log_catchup_unreachable(&peer.id, e);
220 continue;
221 }
222 };
223
224 let body: serde_json::Value = match resp.json().await {
225 Ok(v) => v,
226 Err(e) => {
227 log_catchup_unparseable_body(&peer.id, e);
228 continue;
229 }
230 };
231
232 let memories = match body.get("memories").and_then(|v| v.as_array()) {
233 Some(arr) => arr.clone(),
234 None => continue,
235 };
236
237 // #935 (v0.7.0 Track D, 2026-05-20): emit an info-level
238 // success line on every accepted pull so operators tailing
239 // `docker logs alice | grep catchup` can confirm the
240 // catchup loop is healthy without enabling `RUST_LOG=trace`.
241 // The "pull: <peer-id> ok" tag pins the canonical wording
242 // pinned by the regression test in
243 // `tests/federation_catchup_api_key.rs`.
244 log_catchup_pull_ok(&peer.id, memories.len());
245
246 if memories.is_empty() {
247 continue;
248 }
249
250 let mut applied = 0usize;
251 let mut latest_ts: Option<String> = None;
252 // #1687 — once an apply fails, stop advancing the catchup watermark so
253 // sync_state never moves past an un-persisted row.
254 let mut catchup_halted = false;
255
256 // v0.7.0 M3 — when a SAL store handle is supplied (postgres-
257 // backed daemons) we dispatch each row through
258 // `store.apply_remote_memory`, which routes the write to the
259 // active backend instead of always landing in the local sqlite
260 // file. Default-None preserves the legacy behavior (sqlite via
261 // `db::insert_if_newer`) for daemons that don't yet have a SAL
262 // handle plumbed through (e.g. v0.6.x configurations).
263 if let Some(store) = store {
264 // #910 — federation catchup is operator-level (peer sync);
265 // it MUST round-trip every row regardless of metadata.scope
266 // so the receiving daemon has the full snapshot. Use the
267 // admin builder to bypass the SAL visibility filter.
268 let ctx = crate::store::CallerContext::for_admin(
269 crate::identity::sentinels::FEDERATION_CATCHUP,
270 );
271 for raw in &memories {
272 let mem: crate::models::Memory = match serde_json::from_value(raw.clone()) {
273 Ok(m) => m,
274 Err(e) => {
275 log_catchup_unparseable_memory(&peer.id, e);
276 continue;
277 }
278 };
279 if crate::validate::validate_memory(&mem).is_err() {
280 continue;
281 }
282 // #1687 — advance the catchup watermark ONLY for rows that
283 // durably applied, halting at the first failure, so sync_state
284 // never moves past an un-persisted row (which would silently
285 // drop it from every future delta). Idempotent upserts make
286 // re-fetching post-failure rows next cycle harmless.
287 match store.apply_remote_memory(&ctx, &mem).await {
288 Ok(_) => {
289 applied += 1;
290 advance_catchup_watermark(&mut latest_ts, catchup_halted, &mem.updated_at);
291 }
292 Err(e) => {
293 catchup_halted = true;
294 tracing::warn!(
295 "catchup: apply_remote_memory failed for peer {}: {e}",
296 peer.id
297 );
298 }
299 }
300 }
301 if let Some(ts) = latest_ts.as_deref() {
302 let lock = db.lock().await;
303 if let Err(e) = crate::db::sync_state_observe(&lock.0, &local_id, &peer.id, ts) {
304 log_catchup_sync_state_observe_failed(&peer.id, e);
305 }
306 }
307 } else {
308 let lock = db.lock().await;
309 for raw in &memories {
310 let mem: crate::models::Memory = match serde_json::from_value(raw.clone()) {
311 Ok(m) => m,
312 Err(e) => {
313 log_catchup_unparseable_memory(&peer.id, e);
314 continue;
315 }
316 };
317 if crate::validate::validate_memory(&mem).is_err() {
318 continue;
319 }
320 // #1687 — advance the catchup watermark only on a successful
321 // insert and halt at the first failure (see the SAL branch).
322 match crate::db::insert_if_newer(&lock.0, &mem) {
323 Ok(_) => {
324 applied += 1;
325 advance_catchup_watermark(&mut latest_ts, catchup_halted, &mem.updated_at);
326 }
327 Err(_) => catchup_halted = true,
328 }
329 }
330 if let Some(ts) = latest_ts.as_deref()
331 && let Err(e) = crate::db::sync_state_observe(&lock.0, &local_id, &peer.id, ts)
332 {
333 log_catchup_sync_state_observe_failed(&peer.id, e);
334 }
335 }
336
337 if applied > 0 {
338 tracing::info!(
339 "catchup: applied {applied} memories from peer {} (since={})",
340 peer.id,
341 since_opt.as_deref().unwrap_or("<full-snapshot>"),
342 );
343 }
344 }
345}
346
347/// v0.7.0 M3 — non-SAL fallback. Default sqlite-only path is preserved
348/// verbatim for builds without `--features sal`. The signature parallels
349/// the SAL variant minus the `store` parameter so callers compiled
350/// against the legacy posture continue to dispatch through the local
351/// rusqlite connection.
352#[cfg(not(feature = "sal"))]
353async fn catchup_once_legacy(config: &FederationConfig, db: &crate::handlers::Db) {
354 let local_id = config.sender_agent_id.clone();
355 for peer in &config.peers {
356 let base = peer
357 .sync_push_url
358 .trim_end_matches(crate::handlers::routes::SYNC_PUSH)
359 .to_string();
360
361 let since_opt: Option<String> = {
362 let lock = db.lock().await;
363 match crate::db::sync_state_load(&lock.0, &local_id) {
364 Ok(clock) => clock.entries.get(&peer.id).cloned(),
365 Err(_) => None,
366 }
367 };
368
369 let url = sync_since_url(&base, &local_id, since_opt.as_deref());
370
371 // v0.7.0 #239 — attach `x-peer-id` so the peer's per-peer
372 // namespace allowlist can scope the returned rows (sqlite
373 // catchup path, parity with the SAL-routed loop above).
374 //
375 // #935 (v0.7.0 Track D, 2026-05-20): attach `x-api-key` +
376 // `x-agent-id` for parity with the SAL branch and
377 // `sync_cycle_once`. See the matching block in
378 // `catchup_once_with_store` for the full RCA.
379 let mut req = config
380 .client
381 .get(&url)
382 .header(crate::HEADER_AGENT_ID, local_id.as_str())
383 .header(
384 crate::federation::peer_attestation::PEER_ID_HEADER,
385 local_id.as_str(),
386 );
387 if let Some(ref key) = config.api_key {
388 req = req.header(crate::HEADER_API_KEY, key);
389 }
390 let resp = match req.send().await {
391 Ok(r) if r.status().is_success() => r,
392 Ok(r) => {
393 log_catchup_http_skip(&peer.id, r.status());
394 continue;
395 }
396 Err(e) => {
397 log_catchup_unreachable(&peer.id, e);
398 continue;
399 }
400 };
401
402 let body: serde_json::Value = match resp.json().await {
403 Ok(v) => v,
404 Err(e) => {
405 log_catchup_unparseable_body(&peer.id, e);
406 continue;
407 }
408 };
409
410 let memories = match body.get("memories").and_then(|v| v.as_array()) {
411 Some(arr) => arr.clone(),
412 None => continue,
413 };
414
415 // #935 — emit the canonical "pull: <peer> ok" success line
416 // pinned by `tests/federation_catchup_api_key.rs`.
417 log_catchup_pull_ok(&peer.id, memories.len());
418
419 if memories.is_empty() {
420 continue;
421 }
422
423 let mut applied = 0usize;
424 let mut latest_ts: Option<String> = None;
425 // #1687 — once an apply fails, stop advancing the catchup watermark so
426 // sync_state never moves past an un-persisted row.
427 let mut catchup_halted = false;
428 {
429 let lock = db.lock().await;
430 for raw in &memories {
431 let mem: crate::models::Memory = match serde_json::from_value(raw.clone()) {
432 Ok(m) => m,
433 Err(e) => {
434 log_catchup_unparseable_memory(&peer.id, e);
435 continue;
436 }
437 };
438 if crate::validate::validate_memory(&mem).is_err() {
439 continue;
440 }
441 // #1687 — advance the catchup watermark only on a successful
442 // insert and halt at the first failure (see the SAL branch).
443 match crate::db::insert_if_newer(&lock.0, &mem) {
444 Ok(_) => {
445 applied += 1;
446 advance_catchup_watermark(&mut latest_ts, catchup_halted, &mem.updated_at);
447 }
448 Err(_) => catchup_halted = true,
449 }
450 }
451 if let Some(ts) = latest_ts.as_deref()
452 && let Err(e) = crate::db::sync_state_observe(&lock.0, &local_id, &peer.id, ts)
453 {
454 log_catchup_sync_state_observe_failed(&peer.id, e);
455 }
456 }
457
458 if applied > 0 {
459 tracing::info!(
460 "catchup: applied {applied} memories from peer {} (since={})",
461 peer.id,
462 since_opt.as_deref().unwrap_or("<full-snapshot>"),
463 );
464 }
465 }
466}
467
468/// v0.7.0 Track D #935 — minimal test-driver helper for the
469/// catchup GET path. Used by `tests/federation_catchup_api_key.rs`
470/// to assert the outbound request headers without bringing the
471/// full sqlite `Db` / `MemoryStore` plumbing into the test scope.
472///
473/// The helper fires ONE GET against the configured peer's
474/// `/api/v1/sync/since` endpoint using the exact same header set
475/// `catchup_once_with_store` does (including the #935 `x-api-key`
476/// forward when `config.api_key.is_some()`), then logs the
477/// canonical `catchup: pull: <peer-id> ok` line on success so
478/// regression coverage can pin the wire-level wording.
479///
480/// This is a no-side-effect probe: no memories are applied, no
481/// sync-state is advanced. Production code MUST continue to call
482/// `spawn_catchup_loop_with_store` (SAL) or `spawn_catchup_loop`
483/// (sqlite-only) — this helper is `#[cfg(any(test, ...))]`-gated
484/// for the integration test only.
485#[doc(hidden)]
486pub async fn catchup_once_for_tests(config: &FederationConfig) {
487 let local_id = config.sender_agent_id.clone();
488 for peer in &config.peers {
489 let base = peer
490 .sync_push_url
491 .trim_end_matches(crate::handlers::routes::SYNC_PUSH)
492 .to_string();
493 let url = sync_since_url(&base, &local_id, None);
494
495 let mut req = config
496 .client
497 .get(&url)
498 .header(crate::HEADER_AGENT_ID, local_id.as_str())
499 .header(
500 crate::federation::peer_attestation::PEER_ID_HEADER,
501 local_id.as_str(),
502 );
503 if let Some(ref key) = config.api_key {
504 req = req.header(crate::HEADER_API_KEY, key);
505 }
506 let resp = match req.send().await {
507 Ok(r) if r.status().is_success() => r,
508 Ok(r) => {
509 log_catchup_http_skip(&peer.id, r.status());
510 continue;
511 }
512 Err(e) => {
513 log_catchup_unreachable(&peer.id, e);
514 continue;
515 }
516 };
517
518 let body: serde_json::Value = match resp.json().await {
519 Ok(v) => v,
520 Err(e) => {
521 log_catchup_unparseable_body(&peer.id, e);
522 continue;
523 }
524 };
525 let memories = body
526 .get("memories")
527 .and_then(|v| v.as_array())
528 .map(Vec::as_slice)
529 .unwrap_or(&[]);
530 log_catchup_pull_ok(&peer.id, memories.len());
531 }
532}
533
534/// Build the outbound `/api/v1/sync/since` catch-up URL for `base`
535/// (the peer base URL with the push suffix already trimmed): optional
536/// `since` vector-clock cursor + the local peer id. ONE builder so the
537/// three catch-up paths (store-backed, legacy, test harness) cannot
538/// drift on the query shape (#1558 batch 4).
539fn sync_since_url(base: &str, local_id: &str, since: Option<&str>) -> String {
540 match since {
541 Some(s) => format!(
542 "{base}{}?since={}&peer={local_id}",
543 crate::handlers::routes::SYNC_SINCE,
544 urlencoding_encode(s)
545 ),
546 None => format!(
547 "{base}{}?peer={local_id}",
548 crate::handlers::routes::SYNC_SINCE
549 ),
550 }
551}
552
553// Minimal RFC 3986 percent-encoder for the `since` timestamp. Only covers
554// what RFC 3339 + our namespace/id charsets can produce. We intentionally
555// avoid pulling in a url-encoding crate for a 12-character string.
556pub(super) fn urlencoding_encode(s: &str) -> String {
557 let mut out = String::with_capacity(s.len() + 6);
558 for b in s.bytes() {
559 match b {
560 b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
561 out.push(b as char);
562 }
563 _ => {
564 use std::fmt::Write;
565 let _ = write!(out, "%{b:02X}");
566 }
567 }
568 }
569 out
570}
571
/// #1687 regression tests for the catchup watermark helper.
#[cfg(test)]
mod issue_1687_tests {
    use super::advance_catchup_watermark;

    #[test]
    fn advances_on_success_monotonically_when_not_halted() {
        let mut ts = None;
        advance_catchup_watermark(&mut ts, false, "2026-06-15T00:00:01Z");
        assert_eq!(ts.as_deref(), Some("2026-06-15T00:00:01Z"));
        advance_catchup_watermark(&mut ts, false, "2026-06-15T00:00:02Z");
        assert_eq!(ts.as_deref(), Some("2026-06-15T00:00:02Z"));
        // an older ts never moves the watermark backward
        advance_catchup_watermark(&mut ts, false, "2026-06-15T00:00:01Z");
        assert_eq!(ts.as_deref(), Some("2026-06-15T00:00:02Z"));
    }

    #[test]
    fn does_not_advance_past_a_failed_row_once_halted() {
        // row1 ok -> t1; row2 FAILED (caller sets halted); row3 ok but later ts
        // -> watermark MUST stay at t1 so row2 is re-fetched next delta (#1687).
        let mut ts = None;
        advance_catchup_watermark(&mut ts, false, "t1");
        advance_catchup_watermark(&mut ts, true, "t3");
        assert_eq!(
            ts.as_deref(),
            Some("t1"),
            "watermark must stop at the last pre-failure success"
        );
    }

    #[test]
    fn halted_before_any_success_leaves_watermark_unset() {
        // row1 FAILED (halted from the start); row2 ok -> no watermark at all,
        // so the caller skips sync_state_observe and re-pulls the batch.
        let mut ts: Option<String> = None;
        advance_catchup_watermark(&mut ts, true, "t2");
        assert_eq!(ts, None, "a halted batch must never seed the watermark");
    }

    #[test]
    fn equal_timestamp_keeps_existing_watermark() {
        // Only strictly newer rows advance the watermark.
        let mut ts = None;
        advance_catchup_watermark(&mut ts, false, "t1");
        advance_catchup_watermark(&mut ts, false, "t1");
        assert_eq!(ts.as_deref(), Some("t1"));
    }
}
601}