use std::time::Duration;
use crate::replication::QuorumPolicy;
use super::{FederationConfig, PeerEndpoint};
/// Idle timeout (five minutes) handed to the federation HTTP client's
/// `pool_idle_timeout` setting.
const FED_CLIENT_POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(5 * 60);

/// Interval (one minute) handed to the federation HTTP client's
/// `tcp_keepalive` setting.
const FED_CLIENT_TCP_KEEPALIVE: Duration = Duration::from_secs(60);
impl FederationConfig {
    /// Builds a federation configuration from operator-supplied settings.
    ///
    /// Returns `Ok(None)` when `quorum_writes` is `0` or `peer_urls` is empty;
    /// no peers are registered and no HTTP client is built in that case.
    ///
    /// Otherwise the function:
    /// 1. rejects duplicate peer URLs (compared after stripping trailing `/`
    ///    and ASCII-lowercasing),
    /// 2. builds a [`QuorumPolicy`] for `1 + peer_urls.len()` nodes,
    /// 3. derives one [`PeerEndpoint`] per URL (`peer-{index}`, pushing to
    ///    `{url}/api/v1/sync/push`),
    /// 4. builds a rustls-backed `reqwest::Client`, optionally trusting the CA
    ///    at `ca_cert_path` and presenting the client identity assembled from
    ///    `client_cert_path` + `client_key_path`,
    /// 5. resolves the daemon signing key for `sender_agent_id` (a load
    ///    failure degrades to `None` rather than failing the build).
    ///
    /// A client identity is only installed when BOTH `client_cert_path` and
    /// `client_key_path` are supplied. If exactly one is supplied a warning is
    /// logged and the client is built without an identity.
    ///
    /// # Errors
    ///
    /// Returns an error when a peer URL is duplicated, the quorum policy is
    /// rejected by [`QuorumPolicy::new`], a certificate/key file cannot be
    /// read, the CA file has no `-----BEGIN ` marker or fails to parse, the
    /// mTLS identity cannot be built, or the HTTP client cannot be built.
    // One argument per setting; the flat signature is kept so existing callers
    // continue to compile.
    #[allow(clippy::too_many_arguments)]
    pub fn build(
        quorum_writes: usize,
        peer_urls: &[String],
        timeout: Duration,
        client_cert_path: Option<&std::path::Path>,
        client_key_path: Option<&std::path::Path>,
        ca_cert_path: Option<&std::path::Path>,
        sender_agent_id: String,
        api_key: Option<String>,
    ) -> anyhow::Result<Option<Self>> {
        /// TCP connect timeout applied to every federation request.
        const FED_CLIENT_CONNECT_TIMEOUT: Duration = Duration::from_secs(2);

        if quorum_writes == 0 || peer_urls.is_empty() {
            return Ok(None);
        }

        // Reject duplicates up front. `HashSet::replace` hands back the
        // previously stored value when one was already present, which is the
        // duplicate signal and avoids cloning every normalized URL.
        let mut seen_urls: std::collections::HashSet<String> =
            std::collections::HashSet::with_capacity(peer_urls.len());
        for raw in peer_urls {
            let normalized = raw.trim_end_matches('/').to_ascii_lowercase();
            if let Some(normalized) = seen_urls.replace(normalized) {
                return Err(anyhow::anyhow!(
                    "duplicate peer URL in --quorum-peers: {raw} (normalized: {normalized}) \
                     — duplicates would let a single peer contribute to quorum more than once"
                ));
            }
        }

        // Cluster size is every peer plus one (presumably the local node).
        let n = 1 + peer_urls.len();
        // NOTE(review): the meaning of the trailing 30 s argument is defined by
        // `QuorumPolicy::new`, which is not visible from this file.
        let policy = QuorumPolicy::new(n, quorum_writes, timeout, Duration::from_secs(30))
            .map_err(|e| anyhow::anyhow!("invalid quorum policy: {e}"))?;

        let peers: Vec<PeerEndpoint> = peer_urls
            .iter()
            .enumerate()
            .map(|(i, raw)| {
                // Strip trailing slashes so the push path is not joined with `//`.
                let trimmed = raw.trim_end_matches('/');
                tracing::debug!(
                    target: "federation",
                    peer_index = i,
                    url = trimmed,
                    "registered peer"
                );
                PeerEndpoint {
                    id: format!("peer-{i}"),
                    sync_push_url: format!("{trimmed}/api/v1/sync/push"),
                }
            })
            .collect();

        let mut client_builder = reqwest::Client::builder()
            .timeout(timeout)
            .connect_timeout(FED_CLIENT_CONNECT_TIMEOUT)
            .pool_idle_timeout(FED_CLIENT_POOL_IDLE_TIMEOUT)
            .tcp_keepalive(FED_CLIENT_TCP_KEEPALIVE)
            .use_rustls_tls();

        if let Some(ca_path) = ca_cert_path {
            let ca_pem = std::fs::read(ca_path)
                .map_err(|e| anyhow::anyhow!("read --quorum-ca-cert: {e}"))?;
            // Explicit marker check: input without any PEM block is rejected
            // here with a message that names the offending path.
            let has_pem_marker = ca_pem.windows(11).any(|w| w == b"-----BEGIN ");
            if !has_pem_marker {
                anyhow::bail!(
                    "parse --quorum-ca-cert: input at {} contains no PEM `-----BEGIN ` marker",
                    ca_path.display()
                );
            }
            let ca = reqwest::Certificate::from_pem(&ca_pem)
                .map_err(|e| anyhow::anyhow!("parse --quorum-ca-cert: {e}"))?;
            client_builder = client_builder.add_root_certificate(ca);
        }

        match (client_cert_path, client_key_path) {
            (Some(cert), Some(key)) => {
                let cert_pem = std::fs::read(cert)
                    .map_err(|e| anyhow::anyhow!("read --client-cert: {e}"))?;
                let key_pem =
                    std::fs::read(key).map_err(|e| anyhow::anyhow!("read --client-key: {e}"))?;
                // The identity is parsed from one PEM buffer: certificate,
                // a separating newline, then the private key.
                let mut pem = cert_pem;
                pem.extend_from_slice(b"\n");
                pem.extend_from_slice(&key_pem);
                let identity = reqwest::Identity::from_pem(&pem)
                    .map_err(|e| anyhow::anyhow!("build mTLS identity: {e}"))?;
                client_builder = client_builder.identity(identity);
            }
            (Some(_), None) | (None, Some(_)) => {
                // A half-configured identity used to be dropped silently,
                // leaving the operator believing mTLS was active. Keep the
                // lenient behaviour (no error) but make it visible.
                tracing::warn!(
                    target: "federation",
                    client_cert_set = client_cert_path.is_some(),
                    client_key_set = client_key_path.is_some(),
                    "only one of --client-cert / --client-key was supplied; \
                     no mTLS client identity will be presented to peers — \
                     supply both to enable mTLS"
                );
            }
            (None, None) => {}
        }

        let client = client_builder
            .build()
            .map_err(|e| anyhow::anyhow!("build federation client: {e}"))?;

        // A key-load failure is logged and degrades to `None` inside
        // `resolve_daemon_signing_key`; it never fails the build.
        let signing_key = resolve_daemon_signing_key(
            crate::governance::audit::load_daemon_signing_key(&sender_agent_id),
            &sender_agent_id,
        );

        Ok(Some(Self {
            policy,
            peers,
            client,
            sender_agent_id,
            api_key,
            signing_key,
            #[cfg(feature = "sal")]
            dlq_sink: None,
        }))
    }

    /// Returns the number of configured peers.
    #[must_use]
    pub fn peer_count(&self) -> usize {
        self.peers.len()
    }
}
/// Turns the outcome of loading the daemon signing key into the optional,
/// shareable key stored on the federation config.
///
/// * `Ok(Some(key))` → `Some(Arc<key>)`
/// * `Ok(None)` → `None`
/// * `Err(_)` → `None`, after logging a warning on the `federation` target
///   that carries `sender_agent_id` and the error.
///
/// The error case never propagates: the caller proceeds without a signing key.
fn resolve_daemon_signing_key(
    loaded: anyhow::Result<Option<ed25519_dalek::SigningKey>>,
    sender_agent_id: &str,
) -> Option<std::sync::Arc<ed25519_dalek::SigningKey>> {
    // Collapse the error arm to "no key" first, then wrap whatever remains.
    let maybe_key = loaded.unwrap_or_else(|load_err| {
        tracing::warn!(
            target: "federation",
            sender_agent_id = %sender_agent_id,
            error = %load_err,
            "could not resolve the daemon key directory; federation \
             posts will be sent UNSIGNED — peers with require_sig \
             enabled will silently reject them (partition risk). \
             Fix the key directory permissions/path."
        );
        None
    });

    maybe_key.map(std::sync::Arc::new)
}
#[cfg(test)]
mod resolve_signing_key_tests {
    //! Covers the three possible inputs to `resolve_daemon_signing_key`:
    //! a load error, an absent key, and a present key.

    use super::resolve_daemon_signing_key;

    /// A loader error yields `None` instead of propagating.
    #[test]
    fn key_dir_error_degrades_to_unsigned() {
        let failure = anyhow::anyhow!("synthetic key-dir resolution failure");
        let resolved = resolve_daemon_signing_key(Err(failure), "ai:unsigned-builder");
        assert!(resolved.is_none(), "key-dir error must degrade to unsigned");
    }

    /// A successful load that found no key yields `None`.
    #[test]
    fn absent_key_resolves_to_none() {
        let resolved = resolve_daemon_signing_key(Ok(None), "ai:no-key");
        assert!(resolved.is_none(), "no enrolled key resolves to None");
    }

    /// A loaded key comes back unchanged, wrapped in an `Arc`.
    #[test]
    fn present_key_is_wrapped_in_arc() {
        use ed25519_dalek::SigningKey;

        let seed = [7u8; 32];
        let original = SigningKey::from_bytes(&seed);
        let want = original.to_bytes();

        let resolved = resolve_daemon_signing_key(Ok(Some(original)), "ai:signed")
            .expect("present key must resolve to Some");
        assert_eq!(resolved.to_bytes(), want, "the loaded key is preserved");
    }
}