ai_memory/federation/peer.rs
1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! Peer construction and `FederationConfig::build`.
5
6use std::time::Duration;
7
8use crate::replication::QuorumPolicy;
9
10use super::{FederationConfig, PeerEndpoint};
11
/// #1579 B5 — idle federation connections stay in reqwest's pool for five
/// minutes before eviction.
///
/// Why not reqwest's 90s default: federation load arrives in bursts. Quorum
/// pushes follow the operator's write cadence, and the DLQ replay and
/// catchup workers wake every 30–60s, so a 90s idle limit kept dropping the
/// warm connection between bursts. The following push then had to redo the
/// mTLS handshake — about 4.3×RTT, i.e. 0.39s cross-region against ~0.09s
/// on a reused connection (measured on do-1461, P3 leg 1/leg 2).
///
/// Five minutes outlasts every worker interval yet still lets sockets
/// expire once the mesh is reconfigured.
const FED_CLIENT_POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(5 * 60);
23
/// #1579 B5 — send OS-level TCP keepalive probes on pooled federation
/// sockets once every 60 seconds.
///
/// The interval is intentionally relaxed. PR #314 paired
/// `tcp_keepalive(1s)` with `pool_idle_timeout(5s)`, and ship-gate run 21
/// traced 40+ minute hangs to the resulting connection-pool churn and
/// per-socket probe traffic (the revert is documented in
/// [`FederationConfig::build`]). A 60s probe costs next to nothing yet still
/// spots half-open sockets — e.g. NAT/conntrack drops on cross-region
/// paths — well before the 5-minute pool window closes, so a dead pooled
/// connection is reaped rather than surfacing as a connect error on the
/// next quorum push.
const FED_CLIENT_TCP_KEEPALIVE: Duration = Duration::from_millis(60_000);
37
38impl FederationConfig {
39 /// Build a `FederationConfig` from the serve-time CLI flags. Returns
40 /// `None` when federation is disabled (`quorum_writes == 0` or the
41 /// peer list is empty).
42 ///
43 /// `api_key` carries the local daemon's configured `[api] api_key`
44 /// (issue #702, v0.7.0 fold-A2A1.4). When `Some`, every outbound
45 /// federation POST attaches an `x-api-key` header so peers that
46 /// themselves run with api-key auth accept the request. `None`
47 /// preserves the backwards-compatible header set used by mTLS-only
48 /// deployments — outbound POSTs stay unauthenticated at the
49 /// application layer and rely on the TLS layer for trust.
50 ///
51 /// # Errors
52 ///
53 /// Returns an error if the reqwest client cannot be constructed
54 /// with the supplied certificate material.
55 #[allow(clippy::too_many_arguments)]
56 pub fn build(
57 quorum_writes: usize,
58 peer_urls: &[String],
59 timeout: Duration,
60 client_cert_path: Option<&std::path::Path>,
61 client_key_path: Option<&std::path::Path>,
62 ca_cert_path: Option<&std::path::Path>,
63 sender_agent_id: String,
64 api_key: Option<String>,
65 ) -> anyhow::Result<Option<Self>> {
66 if quorum_writes == 0 || peer_urls.is_empty() {
67 return Ok(None);
68 }
69 // Ultrareview #341: reject duplicate peer URLs at build time.
70 // If the same peer URL appears twice under different indices,
71 // both would count as distinct ack sources and the quorum
72 // guarantee is violated. Normalize (trim trailing slash,
73 // lowercase scheme+host) before comparing.
74 let mut seen_urls: std::collections::HashSet<String> = std::collections::HashSet::new();
75 for raw in peer_urls {
76 let normalized = raw.trim_end_matches('/').to_ascii_lowercase();
77 if !seen_urls.insert(normalized.clone()) {
78 return Err(anyhow::anyhow!(
79 "duplicate peer URL in --quorum-peers: {raw} (normalized: {normalized}) \
80 — duplicates would let a single peer contribute to quorum more than once"
81 ));
82 }
83 }
84 let n = 1 + peer_urls.len(); // local node + remotes
85 let policy = QuorumPolicy::new(n, quorum_writes, timeout, Duration::from_secs(30))
86 .map_err(|e| anyhow::anyhow!("invalid quorum policy: {e}"))?;
87 let peers: Vec<PeerEndpoint> = peer_urls
88 .iter()
89 .enumerate()
90 .map(|(i, raw)| {
91 // `id` is used as a Prometheus metric label; keep it
92 // low-cardinality. The full URL is logged separately.
93 // (#304 nit — prior form `peer-{i}:{url}` blew up the
94 // label space as deployment size grew.)
95 let trimmed = raw.trim_end_matches('/');
96 tracing::debug!(
97 target: "federation",
98 peer_index = i,
99 url = trimmed,
100 "registered peer"
101 );
102 PeerEndpoint {
103 id: format!("peer-{i}"),
104 sync_push_url: format!("{trimmed}/api/v1/sync/push"),
105 }
106 })
107 .collect();
108
109 // Federation client tuning.
110 //
111 // An earlier PR #314 attempted tight `tcp_keepalive(1s)` +
112 // `pool_idle_timeout(5s)` on this builder to close the Phase
113 // 4 partition_minority convergence gap. Ship-gate run 21
114 // showed that combination caused Phase 4 to hang for 40+
115 // minutes — suspected cause was connection-pool churn on the
116 // chaos-client's local 3-process mesh exhausting ephemeral
117 // ports under continuous close+reopen cycles with the tight
118 // keepalive generating probe traffic on every idle socket.
119 //
120 // Reverted to the conservative-default client here. Partition-
121 // recovery under chaos is moved out of the required ship-gate
122 // and into an opt-in campaign shape. Real partition resilience
123 // is a v0.6.0.1+ investigation with instrumented cycle data
124 // (cycles_by_fault now landed in ship-gate, giving us per-cycle
125 // visibility the next time we attempt this).
126 //
127 // #1579 B5 (2026-06-10) — persistent outbound connections. This
128 // single client is shared by every outbound federation surface
129 // (quorum push, bulk catchup, DLQ replay, /sync/since pull), so
130 // tuning it is the one-stop connection-lifecycle fix. The two
131 // knobs below are an order of magnitude more conservative than
132 // the reverted #314 attempt (300s/60s vs 5s/1s) — they EXTEND
133 // pooling rather than churn it. Measured basis: fresh mTLS ≈
134 // 4.3×RTT (1.0s cold cross-globe), reused ≈ 1×RTT (P3 leg 1);
135 // DLQ replay serialized at ~0.3s/row/peer on fresh handshakes.
136 let mut client_builder = reqwest::Client::builder()
137 .timeout(timeout)
138 .connect_timeout(Duration::from_secs(2))
139 .pool_idle_timeout(FED_CLIENT_POOL_IDLE_TIMEOUT)
140 .tcp_keepalive(FED_CLIENT_TCP_KEEPALIVE)
141 .use_rustls_tls();
142 // --quorum-ca-cert: trust a caller-supplied root CA for outbound
143 // federation POSTs. Required whenever peers present a cert NOT
144 // rooted in webpki-roots (Mozilla CA bundle) — e.g. a self-
145 // signed / ephemeral CA generated for an isolated test fleet.
146 // Without this, reqwest's rustls-tls feature (webpki-roots
147 // only) rejects the peer cert and every quorum write times
148 // out as quorum_not_met. See alphaonedev/ai-memory-mcp#333.
149 if let Some(ca_path) = ca_cert_path {
150 let ca_pem = std::fs::read(ca_path)
151 .map_err(|e| anyhow::anyhow!("read --quorum-ca-cert: {e}"))?;
152 // v0.7.0 #1070 follow-up — `reqwest::Certificate::from_pem`
153 // post-#1070 (rustls-only backend, no native-tls fallback)
154 // accepts a file with no PEM markers as an empty cert chain
155 // without erroring. That breaks the strict-validation
156 // contract `config_build_rejects_invalid_ca_cert_pem` pins.
157 // Pre-flight the file content for a `-----BEGIN ` marker so
158 // a non-PEM operator-supplied path produces the same
159 // explicit error message the legacy native-tls parser
160 // emitted.
161 let has_pem_marker = ca_pem.windows(11).any(|w| w == b"-----BEGIN ");
162 if !has_pem_marker {
163 anyhow::bail!(
164 "parse --quorum-ca-cert: input at {} contains no PEM `-----BEGIN ` marker",
165 ca_path.display()
166 );
167 }
168 let ca = reqwest::Certificate::from_pem(&ca_pem)
169 .map_err(|e| anyhow::anyhow!("parse --quorum-ca-cert: {e}"))?;
170 client_builder = client_builder.add_root_certificate(ca);
171 }
172 if let (Some(cert), Some(key)) = (client_cert_path, client_key_path) {
173 let cert_pem =
174 std::fs::read(cert).map_err(|e| anyhow::anyhow!("read --client-cert: {e}"))?;
175 let key_pem =
176 std::fs::read(key).map_err(|e| anyhow::anyhow!("read --client-key: {e}"))?;
177 let mut pem = cert_pem;
178 pem.extend_from_slice(b"\n");
179 pem.extend_from_slice(&key_pem);
180 let identity = reqwest::Identity::from_pem(&pem)
181 .map_err(|e| anyhow::anyhow!("build mTLS identity: {e}"))?;
182 client_builder = client_builder.identity(identity);
183 }
184 let client = client_builder
185 .build()
186 .map_err(|e| anyhow::anyhow!("build federation client: {e}"))?;
187
188 // v0.7.0 #791 — load the daemon's Ed25519 signing key (no-op
189 // stub here; the full #697 audit/identity module loads from
190 // disk). NON-FATAL: peers running `AI_MEMORY_FED_REQUIRE_SIG=0`
191 // accept unsigned posts even when the signing key is missing.
192 let signing_key = resolve_daemon_signing_key(
193 crate::governance::audit::load_daemon_signing_key(&sender_agent_id),
194 &sender_agent_id,
195 );
196 Ok(Some(Self {
197 policy,
198 peers,
199 client,
200 sender_agent_id,
201 api_key,
202 signing_key,
203 // v0.7.0 Track D #933 — federation push DLQ sink is
204 // populated by the daemon bootstrap AFTER the SAL store
205 // handle resolves (see `daemon_runtime.rs`). The build()
206 // path here returns `None` so an `ai-memory serve`
207 // invocation that never bootstraps a store (none today;
208 // belt-and-braces) doesn't trip a half-wired DLQ.
209 #[cfg(feature = "sal")]
210 dlq_sink: None,
211 }))
212 }
213
214 /// Count of peers in the mesh (excludes the local node). Useful for
215 /// metrics labels.
216 #[must_use]
217 pub fn peer_count(&self) -> usize {
218 self.peers.len()
219 }
220}
221
222/// Resolve the optional daemon signing key from a
223/// [`crate::governance::audit::load_daemon_signing_key`] result.
224///
225/// On success the key (if any) is wrapped in an `Arc`. On error the daemon
226/// degrades to UNSIGNED — it logs the key-directory resolution failure and
227/// returns `None` rather than failing the whole `build`, because peers
228/// running `AI_MEMORY_FED_REQUIRE_SIG=0` still accept unsigned posts.
229///
230/// Extracted from [`FederationConfig::build`] so the error branch is
231/// unit-testable: forcing `load_daemon_signing_key` to actually error
232/// requires an unresolvable host config dir, which is not deterministic in
233/// tests (the `dirs` crate falls back to `getpwuid` when `HOME` is unset).
234fn resolve_daemon_signing_key(
235 loaded: anyhow::Result<Option<ed25519_dalek::SigningKey>>,
236 sender_agent_id: &str,
237) -> Option<std::sync::Arc<ed25519_dalek::SigningKey>> {
238 match loaded {
239 Ok(maybe_key) => maybe_key.map(std::sync::Arc::new),
240 Err(e) => {
241 tracing::warn!(
242 target: "federation",
243 sender_agent_id = %sender_agent_id,
244 error = %e,
245 "could not resolve the daemon key directory; federation \
246 posts will be sent UNSIGNED — peers with require_sig \
247 enabled will silently reject them (partition risk). \
248 Fix the key directory permissions/path."
249 );
250 None
251 }
252 }
253}
254
#[cfg(test)]
mod resolve_signing_key_tests {
    use ed25519_dalek::SigningKey;

    use super::resolve_daemon_signing_key;

    #[test]
    fn key_dir_error_degrades_to_unsigned() {
        // #1533 — when the key load fails, build() must log a warning and
        // carry on UNSIGNED; the error must never propagate. A synthetic
        // Err exercises that branch deterministically, because a real
        // failure needs an unresolvable host config dir that tests cannot
        // arrange.
        let synthetic_failure = anyhow::anyhow!("synthetic key-dir resolution failure");
        let resolved = resolve_daemon_signing_key(Err(synthetic_failure), "ai:unsigned-builder");
        assert!(resolved.is_none(), "key-dir error must degrade to unsigned");
    }

    #[test]
    fn absent_key_resolves_to_none() {
        assert!(
            resolve_daemon_signing_key(Ok(None), "ai:no-key").is_none(),
            "no enrolled key resolves to None"
        );
    }

    #[test]
    fn present_key_is_wrapped_in_arc() {
        let signing_key = SigningKey::from_bytes(&[7u8; 32]);
        let original_bytes = signing_key.to_bytes();
        let wrapped = resolve_daemon_signing_key(Ok(Some(signing_key)), "ai:signed")
            .expect("present key must resolve to Some");
        assert_eq!(wrapped.to_bytes(), original_bytes, "the loaded key is preserved");
    }
}