Skip to main content

ai_memory/federation/
peer.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! Peer construction and `FederationConfig::build`.
5
6use std::time::Duration;
7
8use crate::replication::QuorumPolicy;
9
10use super::{FederationConfig, PeerEndpoint};
11
/// How long an idle pooled per-peer connection is kept before eviction:
/// 5 minutes (#1579 B5).
///
/// reqwest's default `pool_idle_timeout` is 90s. Federation traffic comes
/// in bursts — quorum pushes follow the operator's write cadence, and the
/// DLQ replay / catchup workers tick on 30–60s intervals — so with the 90s
/// default the warm connection was regularly evicted between bursts and
/// the next push paid a fresh mTLS handshake (~4.3×RTT — 0.39s
/// cross-region vs ~0.09s reused, measured on do-1461, P3 leg 1/leg 2).
/// Five minutes comfortably spans the worker cadences without holding
/// sockets forever on a reconfigured mesh.
const FED_CLIENT_POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(5 * 60);
23
/// #1579 B5 — OS-level TCP keepalive on pooled federation connections,
/// configured with a 60s duration (passed to reqwest's `tcp_keepalive`).
///
/// Deliberately conservative: PR #314 attempted `tcp_keepalive(1s)` +
/// `pool_idle_timeout(5s)` and ship-gate run 21 showed that
/// combination caused 40+ minute hangs from connection-pool churn +
/// per-socket probe traffic (see the revert note in
/// [`FederationConfig::build`]). 60s generates negligible probe load
/// while still detecting half-open connections (NAT/conntrack drops on
/// cross-region paths) well inside the 5-minute pool window, so a
/// dead pooled socket is reaped instead of donating a connect error to
/// the next quorum push.
///
/// NOTE(review): as far as reqwest's documentation describes it,
/// `tcp_keepalive` sets how long a socket must sit idle before the first
/// probe is sent; the interval between probes and the number of failed
/// probes before the socket is declared dead presumably fall back to OS
/// defaults unless configured separately. Confirm that dead-peer
/// detection really completes inside the 5-minute pool window on the
/// deployment targets — TODO verify.
const FED_CLIENT_TCP_KEEPALIVE: Duration = Duration::from_secs(60);
37
38impl FederationConfig {
39    /// Build a `FederationConfig` from the serve-time CLI flags. Returns
40    /// `None` when federation is disabled (`quorum_writes == 0` or the
41    /// peer list is empty).
42    ///
43    /// `api_key` carries the local daemon's configured `[api] api_key`
44    /// (issue #702, v0.7.0 fold-A2A1.4). When `Some`, every outbound
45    /// federation POST attaches an `x-api-key` header so peers that
46    /// themselves run with api-key auth accept the request. `None`
47    /// preserves the backwards-compatible header set used by mTLS-only
48    /// deployments — outbound POSTs stay unauthenticated at the
49    /// application layer and rely on the TLS layer for trust.
50    ///
51    /// # Errors
52    ///
53    /// Returns an error if the reqwest client cannot be constructed
54    /// with the supplied certificate material.
55    #[allow(clippy::too_many_arguments)]
56    pub fn build(
57        quorum_writes: usize,
58        peer_urls: &[String],
59        timeout: Duration,
60        client_cert_path: Option<&std::path::Path>,
61        client_key_path: Option<&std::path::Path>,
62        ca_cert_path: Option<&std::path::Path>,
63        sender_agent_id: String,
64        api_key: Option<String>,
65    ) -> anyhow::Result<Option<Self>> {
66        if quorum_writes == 0 || peer_urls.is_empty() {
67            return Ok(None);
68        }
69        // Ultrareview #341: reject duplicate peer URLs at build time.
70        // If the same peer URL appears twice under different indices,
71        // both would count as distinct ack sources and the quorum
72        // guarantee is violated. Normalize (trim trailing slash,
73        // lowercase scheme+host) before comparing.
74        let mut seen_urls: std::collections::HashSet<String> = std::collections::HashSet::new();
75        for raw in peer_urls {
76            let normalized = raw.trim_end_matches('/').to_ascii_lowercase();
77            if !seen_urls.insert(normalized.clone()) {
78                return Err(anyhow::anyhow!(
79                    "duplicate peer URL in --quorum-peers: {raw} (normalized: {normalized}) \
80                     — duplicates would let a single peer contribute to quorum more than once"
81                ));
82            }
83        }
84        let n = 1 + peer_urls.len(); // local node + remotes
85        let policy = QuorumPolicy::new(n, quorum_writes, timeout, Duration::from_secs(30))
86            .map_err(|e| anyhow::anyhow!("invalid quorum policy: {e}"))?;
87        let peers: Vec<PeerEndpoint> = peer_urls
88            .iter()
89            .enumerate()
90            .map(|(i, raw)| {
91                // `id` is used as a Prometheus metric label; keep it
92                // low-cardinality. The full URL is logged separately.
93                // (#304 nit — prior form `peer-{i}:{url}` blew up the
94                // label space as deployment size grew.)
95                let trimmed = raw.trim_end_matches('/');
96                tracing::debug!(
97                    target: "federation",
98                    peer_index = i,
99                    url = trimmed,
100                    "registered peer"
101                );
102                PeerEndpoint {
103                    id: format!("peer-{i}"),
104                    sync_push_url: format!("{trimmed}/api/v1/sync/push"),
105                }
106            })
107            .collect();
108
109        // Federation client tuning.
110        //
111        // An earlier PR #314 attempted tight `tcp_keepalive(1s)` +
112        // `pool_idle_timeout(5s)` on this builder to close the Phase
113        // 4 partition_minority convergence gap. Ship-gate run 21
114        // showed that combination caused Phase 4 to hang for 40+
115        // minutes — suspected cause was connection-pool churn on the
116        // chaos-client's local 3-process mesh exhausting ephemeral
117        // ports under continuous close+reopen cycles with the tight
118        // keepalive generating probe traffic on every idle socket.
119        //
120        // Reverted to the conservative-default client here. Partition-
121        // recovery under chaos is moved out of the required ship-gate
122        // and into an opt-in campaign shape. Real partition resilience
123        // is a v0.6.0.1+ investigation with instrumented cycle data
124        // (cycles_by_fault now landed in ship-gate, giving us per-cycle
125        // visibility the next time we attempt this).
126        //
127        // #1579 B5 (2026-06-10) — persistent outbound connections. This
128        // single client is shared by every outbound federation surface
129        // (quorum push, bulk catchup, DLQ replay, /sync/since pull), so
130        // tuning it is the one-stop connection-lifecycle fix. The two
131        // knobs below are an order of magnitude more conservative than
132        // the reverted #314 attempt (300s/60s vs 5s/1s) — they EXTEND
133        // pooling rather than churn it. Measured basis: fresh mTLS ≈
134        // 4.3×RTT (1.0s cold cross-globe), reused ≈ 1×RTT (P3 leg 1);
135        // DLQ replay serialized at ~0.3s/row/peer on fresh handshakes.
136        let mut client_builder = reqwest::Client::builder()
137            .timeout(timeout)
138            .connect_timeout(Duration::from_secs(2))
139            .pool_idle_timeout(FED_CLIENT_POOL_IDLE_TIMEOUT)
140            .tcp_keepalive(FED_CLIENT_TCP_KEEPALIVE)
141            .use_rustls_tls();
142        // --quorum-ca-cert: trust a caller-supplied root CA for outbound
143        // federation POSTs. Required whenever peers present a cert NOT
144        // rooted in webpki-roots (Mozilla CA bundle) — e.g. a self-
145        // signed / ephemeral CA generated for an isolated test fleet.
146        // Without this, reqwest's rustls-tls feature (webpki-roots
147        // only) rejects the peer cert and every quorum write times
148        // out as quorum_not_met. See alphaonedev/ai-memory-mcp#333.
149        if let Some(ca_path) = ca_cert_path {
150            let ca_pem = std::fs::read(ca_path)
151                .map_err(|e| anyhow::anyhow!("read --quorum-ca-cert: {e}"))?;
152            // v0.7.0 #1070 follow-up — `reqwest::Certificate::from_pem`
153            // post-#1070 (rustls-only backend, no native-tls fallback)
154            // accepts a file with no PEM markers as an empty cert chain
155            // without erroring. That breaks the strict-validation
156            // contract `config_build_rejects_invalid_ca_cert_pem` pins.
157            // Pre-flight the file content for a `-----BEGIN ` marker so
158            // a non-PEM operator-supplied path produces the same
159            // explicit error message the legacy native-tls parser
160            // emitted.
161            let has_pem_marker = ca_pem.windows(11).any(|w| w == b"-----BEGIN ");
162            if !has_pem_marker {
163                anyhow::bail!(
164                    "parse --quorum-ca-cert: input at {} contains no PEM `-----BEGIN ` marker",
165                    ca_path.display()
166                );
167            }
168            let ca = reqwest::Certificate::from_pem(&ca_pem)
169                .map_err(|e| anyhow::anyhow!("parse --quorum-ca-cert: {e}"))?;
170            client_builder = client_builder.add_root_certificate(ca);
171        }
172        if let (Some(cert), Some(key)) = (client_cert_path, client_key_path) {
173            let cert_pem =
174                std::fs::read(cert).map_err(|e| anyhow::anyhow!("read --client-cert: {e}"))?;
175            let key_pem =
176                std::fs::read(key).map_err(|e| anyhow::anyhow!("read --client-key: {e}"))?;
177            let mut pem = cert_pem;
178            pem.extend_from_slice(b"\n");
179            pem.extend_from_slice(&key_pem);
180            let identity = reqwest::Identity::from_pem(&pem)
181                .map_err(|e| anyhow::anyhow!("build mTLS identity: {e}"))?;
182            client_builder = client_builder.identity(identity);
183        }
184        let client = client_builder
185            .build()
186            .map_err(|e| anyhow::anyhow!("build federation client: {e}"))?;
187
188        // v0.7.0 #791 — load the daemon's Ed25519 signing key (no-op
189        // stub here; the full #697 audit/identity module loads from
190        // disk). NON-FATAL: peers running `AI_MEMORY_FED_REQUIRE_SIG=0`
191        // accept unsigned posts even when the signing key is missing.
192        let signing_key = resolve_daemon_signing_key(
193            crate::governance::audit::load_daemon_signing_key(&sender_agent_id),
194            &sender_agent_id,
195        );
196        Ok(Some(Self {
197            policy,
198            peers,
199            client,
200            sender_agent_id,
201            api_key,
202            signing_key,
203            // v0.7.0 Track D #933 — federation push DLQ sink is
204            // populated by the daemon bootstrap AFTER the SAL store
205            // handle resolves (see `daemon_runtime.rs`). The build()
206            // path here returns `None` so an `ai-memory serve`
207            // invocation that never bootstraps a store (none today;
208            // belt-and-braces) doesn't trip a half-wired DLQ.
209            #[cfg(feature = "sal")]
210            dlq_sink: None,
211        }))
212    }
213
214    /// Count of peers in the mesh (excludes the local node). Useful for
215    /// metrics labels.
216    #[must_use]
217    pub fn peer_count(&self) -> usize {
218        self.peers.len()
219    }
220}
221
222/// Resolve the optional daemon signing key from a
223/// [`crate::governance::audit::load_daemon_signing_key`] result.
224///
225/// On success the key (if any) is wrapped in an `Arc`. On error the daemon
226/// degrades to UNSIGNED — it logs the key-directory resolution failure and
227/// returns `None` rather than failing the whole `build`, because peers
228/// running `AI_MEMORY_FED_REQUIRE_SIG=0` still accept unsigned posts.
229///
230/// Extracted from [`FederationConfig::build`] so the error branch is
231/// unit-testable: forcing `load_daemon_signing_key` to actually error
232/// requires an unresolvable host config dir, which is not deterministic in
233/// tests (the `dirs` crate falls back to `getpwuid` when `HOME` is unset).
234fn resolve_daemon_signing_key(
235    loaded: anyhow::Result<Option<ed25519_dalek::SigningKey>>,
236    sender_agent_id: &str,
237) -> Option<std::sync::Arc<ed25519_dalek::SigningKey>> {
238    match loaded {
239        Ok(maybe_key) => maybe_key.map(std::sync::Arc::new),
240        Err(e) => {
241            tracing::warn!(
242                target: "federation",
243                sender_agent_id = %sender_agent_id,
244                error = %e,
245                "could not resolve the daemon key directory; federation \
246                 posts will be sent UNSIGNED — peers with require_sig \
247                 enabled will silently reject them (partition risk). \
248                 Fix the key directory permissions/path."
249            );
250            None
251        }
252    }
253}
254
#[cfg(test)]
mod resolve_signing_key_tests {
    use super::resolve_daemon_signing_key;

    /// #1533 — the key-load error branch used by `build()` must warn and
    /// continue UNSIGNED, never propagate. Feeding the resolver a
    /// synthetic `Err` covers that branch deterministically (the live
    /// error needs an unresolvable host config dir, which tests cannot
    /// force).
    #[test]
    fn key_dir_error_degrades_to_unsigned() {
        let failure = Err(anyhow::anyhow!("synthetic key-dir resolution failure"));
        let resolved = resolve_daemon_signing_key(failure, "ai:unsigned-builder");
        assert!(resolved.is_none(), "key-dir error must degrade to unsigned");
    }

    /// A successful load that found no key yields `None`.
    #[test]
    fn absent_key_resolves_to_none() {
        let resolved = resolve_daemon_signing_key(Ok(None), "ai:no-key");
        assert!(resolved.is_none(), "no enrolled key resolves to None");
    }

    /// A successfully loaded key comes back `Arc`-wrapped with its bytes
    /// intact.
    #[test]
    fn present_key_is_wrapped_in_arc() {
        use ed25519_dalek::SigningKey;

        let signing_key = SigningKey::from_bytes(&[7u8; 32]);
        let original_bytes = signing_key.to_bytes();
        let resolved = resolve_daemon_signing_key(Ok(Some(signing_key)), "ai:signed")
            .expect("present key must resolve to Some");
        assert_eq!(
            resolved.to_bytes(),
            original_bytes,
            "the loaded key is preserved"
        );
    }
}