use std::sync::Arc;
use std::time::{Duration, Instant};
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use tokio::task::JoinSet;
use crate::models::{Memory, MemoryLink, NamespaceMetaEntry, PendingAction, PendingDecision};
use crate::replication::{AckTracker, QuorumError, QuorumFailureReason, QuorumPolicy};
/// Settings shared by every federation broadcast and by the catch-up loop.
#[derive(Clone)]
pub struct FederationConfig {
    /// Quorum policy cloned into each `AckTracker` the broadcast functions create.
    pub policy: QuorumPolicy,
    /// Peers that receive every broadcast.
    pub peers: Vec<PeerEndpoint>,
    /// HTTP client used for all peer requests.
    pub client: reqwest::Client,
    /// Sent as `sender_agent_id` in every push body.
    pub sender_agent_id: String,
}
/// One remote peer that broadcasts are pushed to.
#[derive(Clone, Debug)]
pub struct PeerEndpoint {
    /// Label for this peer in ack tracking and log lines
    /// (`peer-{index}` when created by `FederationConfig::build`).
    pub id: String,
    /// Full URL push bodies are POSTed to (the peer's base URL followed by
    /// `/api/v1/sync/push` when created by `FederationConfig::build`).
    pub sync_push_url: String,
}
impl FederationConfig {
    /// Builds a federation configuration from the quorum size, peer URLs and TLS material.
    ///
    /// Returns `Ok(None)` when `quorum_writes` is zero or `peer_urls` is empty, i.e. when
    /// there is nothing to federate with.
    ///
    /// Peer URLs are compared after stripping trailing slashes and ASCII-lowercasing, so
    /// the same peer cannot be listed twice and counted twice towards quorum.
    ///
    /// A client identity (mTLS) is installed only when BOTH `client_cert_path` and
    /// `client_key_path` are given. If exactly one of them is given, a warning is logged
    /// and the client is built without an identity.
    ///
    /// # Errors
    ///
    /// Returns an error when a peer URL is duplicated, the quorum policy is rejected by
    /// `QuorumPolicy::new`, any certificate/key file cannot be read or parsed, or the HTTP
    /// client cannot be built.
    pub fn build(
        quorum_writes: usize,
        peer_urls: &[String],
        timeout: Duration,
        client_cert_path: Option<&std::path::Path>,
        client_key_path: Option<&std::path::Path>,
        ca_cert_path: Option<&std::path::Path>,
        sender_agent_id: String,
    ) -> anyhow::Result<Option<Self>> {
        if quorum_writes == 0 || peer_urls.is_empty() {
            return Ok(None);
        }

        let mut seen_urls: std::collections::HashSet<String> =
            std::collections::HashSet::with_capacity(peer_urls.len());
        for raw in peer_urls {
            let normalized = raw.trim_end_matches('/').to_ascii_lowercase();
            if !seen_urls.insert(normalized.clone()) {
                return Err(anyhow::anyhow!(
                    "duplicate peer URL in --quorum-peers: {raw} (normalized: {normalized}) \
                     — duplicates would let a single peer contribute to quorum more than once"
                ));
            }
        }

        // The local node counts as one member alongside the peers.
        let n = 1 + peer_urls.len();
        // NOTE(review): the meaning of the fixed 30 s argument is defined by
        // `QuorumPolicy::new`, which is not visible here — confirm before changing it.
        let policy = QuorumPolicy::new(n, quorum_writes, timeout, Duration::from_secs(30))
            .map_err(|e| anyhow::anyhow!("invalid quorum policy: {e}"))?;

        let peers: Vec<PeerEndpoint> = peer_urls
            .iter()
            .enumerate()
            .map(|(i, raw)| {
                let trimmed = raw.trim_end_matches('/');
                // NOTE(review): `target = ...` records an ordinary field named `target`;
                // the tracing syntax for overriding the event target is `target: ...`.
                // Left untouched because changing it would alter log filtering — confirm
                // which was intended.
                tracing::debug!(
                    target = "federation",
                    peer_index = i,
                    url = trimmed,
                    "registered peer"
                );
                PeerEndpoint {
                    id: format!("peer-{i}"),
                    sync_push_url: format!("{trimmed}/api/v1/sync/push"),
                }
            })
            .collect();

        let mut client_builder = reqwest::Client::builder()
            .timeout(timeout)
            .connect_timeout(Duration::from_secs(2))
            .use_rustls_tls();

        if let Some(ca_path) = ca_cert_path {
            let ca_pem = std::fs::read(ca_path)
                .map_err(|e| anyhow::anyhow!("read --quorum-ca-cert: {e}"))?;
            let ca = reqwest::Certificate::from_pem(&ca_pem)
                .map_err(|e| anyhow::anyhow!("parse --quorum-ca-cert: {e}"))?;
            client_builder = client_builder.add_root_certificate(ca);
        }

        match (client_cert_path, client_key_path) {
            (Some(cert), Some(key)) => {
                let cert_pem = std::fs::read(cert)
                    .map_err(|e| anyhow::anyhow!("read --client-cert: {e}"))?;
                let key_pem =
                    std::fs::read(key).map_err(|e| anyhow::anyhow!("read --client-key: {e}"))?;
                // The identity is parsed from a single PEM buffer: certificate, newline, key.
                let mut pem = cert_pem;
                pem.extend_from_slice(b"\n");
                pem.extend_from_slice(&key_pem);
                let identity = reqwest::Identity::from_pem(&pem)
                    .map_err(|e| anyhow::anyhow!("build mTLS identity: {e}"))?;
                client_builder = client_builder.identity(identity);
            }
            // A half-configured identity used to be dropped without any trace; make it visible.
            (Some(_), None) => {
                tracing::warn!(
                    "federation: --client-cert given without --client-key — mTLS identity not configured"
                );
            }
            (None, Some(_)) => {
                tracing::warn!(
                    "federation: --client-key given without --client-cert — mTLS identity not configured"
                );
            }
            (None, None) => {}
        }

        let client = client_builder
            .build()
            .map_err(|e| anyhow::anyhow!("build federation client: {e}"))?;

        Ok(Some(Self {
            policy,
            peers,
            client,
            sender_agent_id,
        }))
    }

    /// Number of configured peers (the local node is not included).
    #[must_use]
    pub fn peer_count(&self) -> usize {
        self.peers.len()
    }
}
/// Pushes `mem` to every configured peer and collects acknowledgements.
///
/// The local write is recorded on a fresh `AckTracker`, then one POST task per peer is
/// spawned. Results are consumed until the tracker reports quorum, every task has
/// finished, or `config.policy.ack_timeout` has elapsed since the call started. Tasks
/// still running at that point are handed to a detached drain task, which logs their
/// outcomes and increments `federation_fanout_dropped_total` for late id-drift, failure
/// and join-error results.
///
/// The returned tracker holds whatever was recorded; this function does not itself
/// decide whether quorum was reached.
///
/// # Errors
///
/// Returns `QuorumError::LocalWriteFailed` if the tracker's `Arc` is still shared when
/// it is unwrapped.
pub async fn broadcast_store_quorum(
    config: &FederationConfig,
    mem: &Memory,
) -> Result<AckTracker, QuorumError> {
    let started = Instant::now();
    let shared = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), started)));
    shared.lock().await.record_local();

    let payload = serde_json::json!({
        "sender_agent_id": config.sender_agent_id,
        "memories": [mem],
        "dry_run": false,
    });

    // One POST task per peer; each resolves to (peer id, classified outcome).
    let mut in_flight: JoinSet<(String, AckOutcome)> = JoinSet::new();
    for peer in &config.peers {
        let http = config.client.clone();
        let endpoint = peer.sync_push_url.clone();
        let peer_id = peer.id.clone();
        let key = mem.id.clone();
        let request_body = payload.clone();
        in_flight.spawn(async move {
            let outcome =
                post_and_classify(&http, &endpoint, &request_body, &key, Some(&key)).await;
            (peer_id, outcome)
        });
    }

    let cutoff = started + config.policy.ack_timeout;
    loop {
        let budget = cutoff.saturating_duration_since(Instant::now());
        if budget.is_zero() {
            break;
        }
        // Stop on timeout or once no tasks are left.
        let Ok(Some(joined)) = tokio::time::timeout(budget, in_flight.join_next()).await else {
            break;
        };
        match joined {
            Ok((peer_id, AckOutcome::Ack)) => {
                shared.lock().await.record_peer_ack(peer_id);
            }
            Ok((peer_id, AckOutcome::IdDrift)) => {
                shared.lock().await.record_id_drift(peer_id);
            }
            Ok((peer_id, AckOutcome::Fail(reason))) => {
                tracing::warn!("federation: peer {peer_id} failed for {}: {reason}", mem.id);
            }
            Err(e) => {
                tracing::warn!("federation: peer join error: {e}");
            }
        }
        if shared.lock().await.is_quorum_met(Instant::now()) {
            break;
        }
    }

    // Keep draining stragglers in the background so late outcomes are still observed.
    if !in_flight.is_empty() {
        let mem_id = mem.id.clone();
        tokio::spawn(async move {
            while let Some(res) = in_flight.join_next().await {
                let dropped_label = match res {
                    Ok((peer_id, AckOutcome::Ack)) => {
                        tracing::debug!("federation: post-quorum ack from {peer_id}");
                        continue;
                    }
                    Ok((peer_id, AckOutcome::IdDrift)) => {
                        tracing::warn!(
                            "federation: post-quorum id-drift from {peer_id} (peer rewrote id)"
                        );
                        "id_drift"
                    }
                    Ok((peer_id, AckOutcome::Fail(reason))) => {
                        tracing::warn!(
                            "federation: post-quorum peer {peer_id} did not ack for {mem_id}: {reason}"
                        );
                        "peer_fail"
                    }
                    Err(e) => {
                        tracing::warn!("federation: post-quorum join error for {mem_id}: {e}");
                        "join_error"
                    }
                };
                crate::metrics::registry()
                    .federation_fanout_dropped_total
                    .with_label_values(&[dropped_label])
                    .inc();
            }
        });
    }

    match Arc::try_unwrap(shared) {
        Ok(mutex) => Ok(mutex.into_inner()),
        Err(_) => Err(QuorumError::LocalWriteFailed {
            detail: "tracker arc still referenced at finalise".to_string(),
        }),
    }
}
/// Classified result of one peer POST, as produced by `post_once`.
#[derive(Debug)]
enum AckOutcome {
    /// The peer answered with a success status (and did not trigger `IdDrift`).
    Ack,
    /// The peer answered with a success status, but its JSON reply carried a non-empty
    /// `ids` array that does not contain the expected id.
    IdDrift,
    /// The request failed; the string is `http <status>` or `network: <error>`.
    Fail(String),
}
/// Sends a single JSON POST to `url` and classifies the peer's reply.
///
/// * A transport error becomes `Fail("network: …")`.
/// * A non-success status becomes `Fail("http …")`.
/// * A success status whose body is not JSON is still an `Ack`.
/// * A success status whose JSON has a non-empty `ids` array that lacks
///   `expected_id` is `IdDrift`; everything else is `Ack`.
///
/// When `idempotency_key` is present it is sent as the `Idempotency-Key` header.
async fn post_once(
    client: &reqwest::Client,
    url: &str,
    body: &serde_json::Value,
    expected_id: &str,
    idempotency_key: Option<&str>,
) -> AckOutcome {
    let builder = client.post(url).json(body);
    let builder = match idempotency_key {
        Some(key) => builder.header("Idempotency-Key", key),
        None => builder,
    };

    let resp = match builder.send().await {
        Ok(resp) => resp,
        Err(e) => return AckOutcome::Fail(format!("network: {e}")),
    };
    let status = resp.status();
    if !status.is_success() {
        return AckOutcome::Fail(format!("http {status}"));
    }

    // An unparseable body on a 2xx reply is treated as an acknowledgement.
    let Ok(reply) = resp.json::<serde_json::Value>().await else {
        return AckOutcome::Ack;
    };

    let drifted = reply
        .get("ids")
        .and_then(|v| v.as_array())
        .is_some_and(|ids| {
            !ids.is_empty() && !ids.iter().any(|x| x.as_str() == Some(expected_id))
        });
    if drifted {
        AckOutcome::IdDrift
    } else {
        AckOutcome::Ack
    }
}
/// Pause between a failed peer POST and its single retry in `post_and_classify`.
const FANOUT_RETRY_BACKOFF: Duration = Duration::from_millis(250);
/// POSTs to a peer, retrying exactly once after `FANOUT_RETRY_BACKOFF` if the first
/// attempt fails.
///
/// `Ack` and `IdDrift` from the first attempt are returned as-is. After a retry, the
/// `federation_fanout_retry_total` counter is incremented with label `ok`, `id_drift`
/// or `fail` according to the retry's outcome; a double failure reports both reasons.
async fn post_and_classify(
    client: &reqwest::Client,
    url: &str,
    body: &serde_json::Value,
    expected_id: &str,
    idempotency_key: Option<&str>,
) -> AckOutcome {
    let first_reason = match post_once(client, url, body, expected_id, idempotency_key).await {
        AckOutcome::Fail(reason) => reason,
        settled @ (AckOutcome::Ack | AckOutcome::IdDrift) => return settled,
    };

    tokio::time::sleep(FANOUT_RETRY_BACKOFF).await;

    let (retry_label, outcome) =
        match post_once(client, url, body, expected_id, idempotency_key).await {
            AckOutcome::Ack => {
                tracing::debug!(
                    "federation: peer POST retry succeeded for {expected_id} (first attempt: {first_reason})"
                );
                ("ok", AckOutcome::Ack)
            }
            AckOutcome::IdDrift => ("id_drift", AckOutcome::IdDrift),
            AckOutcome::Fail(retry_reason) => (
                "fail",
                AckOutcome::Fail(format!("first: {first_reason}; retry: {retry_reason}")),
            ),
        };

    crate::metrics::registry()
        .federation_fanout_retry_total
        .with_label_values(&[retry_label])
        .inc();
    outcome
}
/// Pushes `memories` to every peer in one request each and waits for all of them.
///
/// Each request carries the header `X-Catchup: bulk`. There is no retry and no quorum
/// accounting here. Returns one `(peer id, error text)` pair per peer that failed; a
/// task that could not be joined is reported under the peer id `"unknown"`. An empty
/// vector means every peer accepted the push (or there was nothing to do).
pub async fn bulk_catchup_push(
    config: &FederationConfig,
    memories: &[Memory],
) -> Vec<(String, String)> {
    if config.peers.is_empty() || memories.is_empty() {
        return Vec::new();
    }

    let payload = serde_json::json!({
        "sender_agent_id": config.sender_agent_id,
        "memories": memories,
        "dry_run": false,
    });

    let mut in_flight: JoinSet<(String, Result<(), String>)> = JoinSet::new();
    for peer in &config.peers {
        let http = config.client.clone();
        let endpoint = peer.sync_push_url.clone();
        let peer_id = peer.id.clone();
        let request_body = payload.clone();
        in_flight.spawn(async move {
            let sent = http
                .post(&endpoint)
                .json(&request_body)
                .header("X-Catchup", "bulk")
                .send()
                .await;
            let outcome = match sent {
                Err(e) => Err(format!("network: {e}")),
                Ok(resp) => {
                    let status = resp.status();
                    if status.is_success() {
                        Ok(())
                    } else {
                        Err(format!("http {status}"))
                    }
                }
            };
            (peer_id, outcome)
        });
    }

    let mut failures = Vec::new();
    while let Some(joined) = in_flight.join_next().await {
        let failure = match joined {
            Ok((_, Ok(()))) => continue,
            Ok((peer_id, Err(err))) => {
                tracing::warn!("bulk_catchup_push: peer {peer_id} failed: {err}");
                (peer_id, err)
            }
            Err(e) => {
                tracing::warn!("bulk_catchup_push: join error: {e:?}");
                ("unknown".to_string(), e.to_string())
            }
        };
        failures.push(failure);
    }
    failures
}
/// Finalises `tracker` against the current instant and returns whatever
/// `AckTracker::finalise` reports.
pub fn finalise_quorum(tracker: &AckTracker) -> Result<usize, QuorumError> {
    let now = Instant::now();
    tracker.finalise(now)
}
/// Broadcasts the deletion of `id` to every configured peer and collects acknowledgements.
///
/// The push body carries an empty `memories` list and `deletions: [id]`. Results are
/// consumed until the tracker reports quorum, every peer task has finished, or
/// `config.policy.ack_timeout` has elapsed since the call started; tasks still running
/// are then drained by a detached task that only logs late failures.
///
/// # Errors
///
/// Returns `QuorumError::LocalWriteFailed` if the tracker's `Arc` is still shared when
/// it is unwrapped.
pub async fn broadcast_delete_quorum(
    config: &FederationConfig,
    id: &str,
) -> Result<AckTracker, QuorumError> {
    let started = Instant::now();
    let shared = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), started)));
    shared.lock().await.record_local();

    let payload = serde_json::json!({
        "sender_agent_id": config.sender_agent_id,
        "memories": [],
        "deletions": [id],
        "dry_run": false,
    });

    let mut in_flight: JoinSet<(String, AckOutcome)> = JoinSet::new();
    for peer in &config.peers {
        let http = config.client.clone();
        let endpoint = peer.sync_push_url.clone();
        let peer_id = peer.id.clone();
        let request_body = payload.clone();
        let key = id.to_string();
        in_flight.spawn(async move {
            let outcome =
                post_and_classify(&http, &endpoint, &request_body, &key, Some(&key)).await;
            (peer_id, outcome)
        });
    }

    let cutoff = started + config.policy.ack_timeout;
    loop {
        let budget = cutoff.saturating_duration_since(Instant::now());
        if budget.is_zero() {
            break;
        }
        // Stop on timeout or once no tasks are left.
        let Ok(Some(joined)) = tokio::time::timeout(budget, in_flight.join_next()).await else {
            break;
        };
        match joined {
            Ok((peer_id, AckOutcome::Ack)) => {
                shared.lock().await.record_peer_ack(peer_id);
            }
            Ok((peer_id, AckOutcome::IdDrift)) => {
                shared.lock().await.record_id_drift(peer_id);
            }
            Ok((peer_id, AckOutcome::Fail(reason))) => {
                tracing::warn!("federation: delete peer {peer_id} failed for {id}: {reason}");
            }
            Err(e) => {
                tracing::warn!("federation: delete peer join error: {e}");
            }
        }
        if shared.lock().await.is_quorum_met(Instant::now()) {
            break;
        }
    }

    // Let stragglers finish in the background; only late failures are logged.
    if !in_flight.is_empty() {
        tokio::spawn(async move {
            while let Some(res) = in_flight.join_next().await {
                let Ok((peer_id, AckOutcome::Fail(reason))) = res else {
                    continue;
                };
                tracing::debug!(
                    "federation: post-quorum delete peer {peer_id} did not ack: {reason}"
                );
            }
        });
    }

    match Arc::try_unwrap(shared) {
        Ok(mutex) => Ok(mutex.into_inner()),
        Err(_) => Err(QuorumError::LocalWriteFailed {
            detail: "tracker arc still referenced at finalise".to_string(),
        }),
    }
}
/// Broadcasts the archiving of `id` to every configured peer and collects acknowledgements.
///
/// The push body carries an empty `memories` list and `archives: [id]`. Results are
/// consumed until the tracker reports quorum, every peer task has finished, or
/// `config.policy.ack_timeout` has elapsed since the call started; tasks still running
/// are then drained by a detached task that only logs late failures.
///
/// # Errors
///
/// Returns `QuorumError::LocalWriteFailed` if the tracker's `Arc` is still shared when
/// it is unwrapped.
pub async fn broadcast_archive_quorum(
    config: &FederationConfig,
    id: &str,
) -> Result<AckTracker, QuorumError> {
    let started = Instant::now();
    let shared = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), started)));
    shared.lock().await.record_local();

    let payload = serde_json::json!({
        "sender_agent_id": config.sender_agent_id,
        "memories": [],
        "archives": [id],
        "dry_run": false,
    });

    let mut in_flight: JoinSet<(String, AckOutcome)> = JoinSet::new();
    for peer in &config.peers {
        let http = config.client.clone();
        let endpoint = peer.sync_push_url.clone();
        let peer_id = peer.id.clone();
        let request_body = payload.clone();
        let key = id.to_string();
        in_flight.spawn(async move {
            let outcome =
                post_and_classify(&http, &endpoint, &request_body, &key, Some(&key)).await;
            (peer_id, outcome)
        });
    }

    let cutoff = started + config.policy.ack_timeout;
    loop {
        let budget = cutoff.saturating_duration_since(Instant::now());
        if budget.is_zero() {
            break;
        }
        // Stop on timeout or once no tasks are left.
        let Ok(Some(joined)) = tokio::time::timeout(budget, in_flight.join_next()).await else {
            break;
        };
        match joined {
            Ok((peer_id, AckOutcome::Ack)) => {
                shared.lock().await.record_peer_ack(peer_id);
            }
            Ok((peer_id, AckOutcome::IdDrift)) => {
                shared.lock().await.record_id_drift(peer_id);
            }
            Ok((peer_id, AckOutcome::Fail(reason))) => {
                tracing::warn!("federation: archive peer {peer_id} failed for {id}: {reason}");
            }
            Err(e) => {
                tracing::warn!("federation: archive peer join error: {e}");
            }
        }
        if shared.lock().await.is_quorum_met(Instant::now()) {
            break;
        }
    }

    // Let stragglers finish in the background; only late failures are logged.
    if !in_flight.is_empty() {
        tokio::spawn(async move {
            while let Some(res) = in_flight.join_next().await {
                let Ok((peer_id, AckOutcome::Fail(reason))) = res else {
                    continue;
                };
                tracing::debug!(
                    "federation: post-quorum archive peer {peer_id} did not ack: {reason}"
                );
            }
        });
    }

    match Arc::try_unwrap(shared) {
        Ok(mutex) => Ok(mutex.into_inner()),
        Err(_) => Err(QuorumError::LocalWriteFailed {
            detail: "tracker arc still referenced at finalise".to_string(),
        }),
    }
}
/// Broadcasts the restoring of `id` to every configured peer and collects acknowledgements.
///
/// The push body carries an empty `memories` list and `restores: [id]`. Results are
/// consumed until the tracker reports quorum, every peer task has finished, or
/// `config.policy.ack_timeout` has elapsed since the call started; tasks still running
/// are then drained by a detached task that only logs late failures.
///
/// # Errors
///
/// Returns `QuorumError::LocalWriteFailed` if the tracker's `Arc` is still shared when
/// it is unwrapped.
pub async fn broadcast_restore_quorum(
    config: &FederationConfig,
    id: &str,
) -> Result<AckTracker, QuorumError> {
    let started = Instant::now();
    let shared = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), started)));
    shared.lock().await.record_local();

    let payload = serde_json::json!({
        "sender_agent_id": config.sender_agent_id,
        "memories": [],
        "restores": [id],
        "dry_run": false,
    });

    let mut in_flight: JoinSet<(String, AckOutcome)> = JoinSet::new();
    for peer in &config.peers {
        let http = config.client.clone();
        let endpoint = peer.sync_push_url.clone();
        let peer_id = peer.id.clone();
        let request_body = payload.clone();
        let key = id.to_string();
        in_flight.spawn(async move {
            let outcome =
                post_and_classify(&http, &endpoint, &request_body, &key, Some(&key)).await;
            (peer_id, outcome)
        });
    }

    let cutoff = started + config.policy.ack_timeout;
    loop {
        let budget = cutoff.saturating_duration_since(Instant::now());
        if budget.is_zero() {
            break;
        }
        // Stop on timeout or once no tasks are left.
        let Ok(Some(joined)) = tokio::time::timeout(budget, in_flight.join_next()).await else {
            break;
        };
        match joined {
            Ok((peer_id, AckOutcome::Ack)) => {
                shared.lock().await.record_peer_ack(peer_id);
            }
            Ok((peer_id, AckOutcome::IdDrift)) => {
                shared.lock().await.record_id_drift(peer_id);
            }
            Ok((peer_id, AckOutcome::Fail(reason))) => {
                tracing::warn!("federation: restore peer {peer_id} failed for {id}: {reason}");
            }
            Err(e) => {
                tracing::warn!("federation: restore peer join error: {e}");
            }
        }
        if shared.lock().await.is_quorum_met(Instant::now()) {
            break;
        }
    }

    // Let stragglers finish in the background; only late failures are logged.
    if !in_flight.is_empty() {
        tokio::spawn(async move {
            while let Some(res) = in_flight.join_next().await {
                let Ok((peer_id, AckOutcome::Fail(reason))) = res else {
                    continue;
                };
                tracing::debug!(
                    "federation: post-quorum restore peer {peer_id} did not ack: {reason}"
                );
            }
        });
    }

    match Arc::try_unwrap(shared) {
        Ok(mutex) => Ok(mutex.into_inner()),
        Err(_) => Err(QuorumError::LocalWriteFailed {
            detail: "tracker arc still referenced at finalise".to_string(),
        }),
    }
}
/// Broadcasts `link` to every configured peer and collects acknowledgements.
///
/// The push body carries an empty `memories` list and `links: [link]`. The string
/// `"<source_id>→<target_id>"` serves as log label, expected id and idempotency key.
/// Results are consumed until the tracker reports quorum, every peer task has finished,
/// or `config.policy.ack_timeout` has elapsed since the call started; tasks still
/// running are then drained by a detached task that only logs late failures.
///
/// # Errors
///
/// Returns `QuorumError::LocalWriteFailed` if the tracker's `Arc` is still shared when
/// it is unwrapped.
pub async fn broadcast_link_quorum(
    config: &FederationConfig,
    link: &MemoryLink,
) -> Result<AckTracker, QuorumError> {
    let started = Instant::now();
    let shared = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), started)));
    shared.lock().await.record_local();

    let payload = serde_json::json!({
        "sender_agent_id": config.sender_agent_id,
        "memories": [],
        "links": [link],
        "dry_run": false,
    });
    // NOTE(review): this composite label is also passed as the expected id — presumably
    // peers never echo a non-empty `ids` array for link pushes; confirm.
    let log_id = format!("{}→{}", link.source_id, link.target_id);

    let mut in_flight: JoinSet<(String, AckOutcome)> = JoinSet::new();
    for peer in &config.peers {
        let http = config.client.clone();
        let endpoint = peer.sync_push_url.clone();
        let peer_id = peer.id.clone();
        let request_body = payload.clone();
        let key = log_id.clone();
        in_flight.spawn(async move {
            let outcome =
                post_and_classify(&http, &endpoint, &request_body, &key, Some(&key)).await;
            (peer_id, outcome)
        });
    }

    let cutoff = started + config.policy.ack_timeout;
    loop {
        let budget = cutoff.saturating_duration_since(Instant::now());
        if budget.is_zero() {
            break;
        }
        // Stop on timeout or once no tasks are left.
        let Ok(Some(joined)) = tokio::time::timeout(budget, in_flight.join_next()).await else {
            break;
        };
        match joined {
            Ok((peer_id, AckOutcome::Ack)) => {
                shared.lock().await.record_peer_ack(peer_id);
            }
            Ok((peer_id, AckOutcome::IdDrift)) => {
                shared.lock().await.record_id_drift(peer_id);
            }
            Ok((peer_id, AckOutcome::Fail(reason))) => {
                tracing::warn!("federation: link peer {peer_id} failed for {log_id}: {reason}");
            }
            Err(e) => {
                tracing::warn!("federation: link peer join error: {e}");
            }
        }
        if shared.lock().await.is_quorum_met(Instant::now()) {
            break;
        }
    }

    // Let stragglers finish in the background; only late failures are logged.
    if !in_flight.is_empty() {
        tokio::spawn(async move {
            while let Some(res) = in_flight.join_next().await {
                let Ok((peer_id, AckOutcome::Fail(reason))) = res else {
                    continue;
                };
                tracing::debug!(
                    "federation: post-quorum link peer {peer_id} did not ack: {reason}"
                );
            }
        });
    }

    match Arc::try_unwrap(shared) {
        Ok(mutex) => Ok(mutex.into_inner()),
        Err(_) => Err(QuorumError::LocalWriteFailed {
            detail: "tracker arc still referenced at finalise".to_string(),
        }),
    }
}
/// Broadcasts a consolidation to every configured peer and collects acknowledgements.
///
/// A single push body carries both the new memory (`memories: [new_mem]`) and the ids
/// it replaces (`deletions: source_ids`). Results are consumed until the tracker reports
/// quorum, every peer task has finished, or `config.policy.ack_timeout` has elapsed
/// since the call started; tasks still running are then drained by a detached task that
/// only logs late failures.
///
/// # Errors
///
/// Returns `QuorumError::LocalWriteFailed` if the tracker's `Arc` is still shared when
/// it is unwrapped.
pub async fn broadcast_consolidate_quorum(
    config: &FederationConfig,
    new_mem: &Memory,
    source_ids: &[String],
) -> Result<AckTracker, QuorumError> {
    let started = Instant::now();
    let shared = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), started)));
    shared.lock().await.record_local();

    let payload = serde_json::json!({
        "sender_agent_id": config.sender_agent_id,
        "memories": [new_mem],
        "deletions": source_ids,
        "dry_run": false,
    });

    let mut in_flight: JoinSet<(String, AckOutcome)> = JoinSet::new();
    for peer in &config.peers {
        let http = config.client.clone();
        let endpoint = peer.sync_push_url.clone();
        let peer_id = peer.id.clone();
        let request_body = payload.clone();
        let key = new_mem.id.clone();
        in_flight.spawn(async move {
            let outcome =
                post_and_classify(&http, &endpoint, &request_body, &key, Some(&key)).await;
            (peer_id, outcome)
        });
    }

    let cutoff = started + config.policy.ack_timeout;
    loop {
        let budget = cutoff.saturating_duration_since(Instant::now());
        if budget.is_zero() {
            break;
        }
        // Stop on timeout or once no tasks are left.
        let Ok(Some(joined)) = tokio::time::timeout(budget, in_flight.join_next()).await else {
            break;
        };
        match joined {
            Ok((peer_id, AckOutcome::Ack)) => {
                shared.lock().await.record_peer_ack(peer_id);
            }
            Ok((peer_id, AckOutcome::IdDrift)) => {
                shared.lock().await.record_id_drift(peer_id);
            }
            Ok((peer_id, AckOutcome::Fail(reason))) => {
                tracing::warn!(
                    "federation: consolidate peer {peer_id} failed for {}: {reason}",
                    new_mem.id
                );
            }
            Err(e) => {
                tracing::warn!("federation: consolidate peer join error: {e}");
            }
        }
        if shared.lock().await.is_quorum_met(Instant::now()) {
            break;
        }
    }

    // Let stragglers finish in the background; only late failures are logged.
    if !in_flight.is_empty() {
        tokio::spawn(async move {
            while let Some(res) = in_flight.join_next().await {
                let Ok((peer_id, AckOutcome::Fail(reason))) = res else {
                    continue;
                };
                tracing::debug!(
                    "federation: post-quorum consolidate peer {peer_id} did not ack: {reason}"
                );
            }
        });
    }

    match Arc::try_unwrap(shared) {
        Ok(mutex) => Ok(mutex.into_inner()),
        Err(_) => Err(QuorumError::LocalWriteFailed {
            detail: "tracker arc still referenced at finalise".to_string(),
        }),
    }
}
/// Broadcasts `pending` to every configured peer and collects acknowledgements.
///
/// The push body carries an empty `memories` list and `pendings: [pending]`. Results
/// are consumed until the tracker reports quorum, every peer task has finished, or
/// `config.policy.ack_timeout` has elapsed since the call started; tasks still running
/// are then drained by a detached task that only logs late failures.
///
/// # Errors
///
/// Returns `QuorumError::LocalWriteFailed` if the tracker's `Arc` is still shared when
/// it is unwrapped.
pub async fn broadcast_pending_quorum(
    config: &FederationConfig,
    pending: &PendingAction,
) -> Result<AckTracker, QuorumError> {
    let started = Instant::now();
    let shared = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), started)));
    shared.lock().await.record_local();

    let payload = serde_json::json!({
        "sender_agent_id": config.sender_agent_id,
        "memories": [],
        "pendings": [pending],
        "dry_run": false,
    });

    let mut in_flight: JoinSet<(String, AckOutcome)> = JoinSet::new();
    for peer in &config.peers {
        let http = config.client.clone();
        let endpoint = peer.sync_push_url.clone();
        let peer_id = peer.id.clone();
        let request_body = payload.clone();
        let key = pending.id.clone();
        in_flight.spawn(async move {
            let outcome =
                post_and_classify(&http, &endpoint, &request_body, &key, Some(&key)).await;
            (peer_id, outcome)
        });
    }

    let cutoff = started + config.policy.ack_timeout;
    loop {
        let budget = cutoff.saturating_duration_since(Instant::now());
        if budget.is_zero() {
            break;
        }
        // Stop on timeout or once no tasks are left.
        let Ok(Some(joined)) = tokio::time::timeout(budget, in_flight.join_next()).await else {
            break;
        };
        match joined {
            Ok((peer_id, AckOutcome::Ack)) => {
                shared.lock().await.record_peer_ack(peer_id);
            }
            Ok((peer_id, AckOutcome::IdDrift)) => {
                shared.lock().await.record_id_drift(peer_id);
            }
            Ok((peer_id, AckOutcome::Fail(reason))) => {
                tracing::warn!(
                    "federation: pending peer {peer_id} failed for {}: {reason}",
                    pending.id
                );
            }
            Err(e) => {
                tracing::warn!("federation: pending peer join error: {e}");
            }
        }
        if shared.lock().await.is_quorum_met(Instant::now()) {
            break;
        }
    }

    // Let stragglers finish in the background; only late failures are logged.
    if !in_flight.is_empty() {
        tokio::spawn(async move {
            while let Some(res) = in_flight.join_next().await {
                let Ok((peer_id, AckOutcome::Fail(reason))) = res else {
                    continue;
                };
                tracing::debug!(
                    "federation: post-quorum pending peer {peer_id} did not ack: {reason}"
                );
            }
        });
    }

    match Arc::try_unwrap(shared) {
        Ok(mutex) => Ok(mutex.into_inner()),
        Err(_) => Err(QuorumError::LocalWriteFailed {
            detail: "tracker arc still referenced at finalise".to_string(),
        }),
    }
}
/// Broadcasts `decision` to every configured peer and collects acknowledgements.
///
/// The push body carries an empty `memories` list and `pending_decisions: [decision]`.
/// Results are consumed until the tracker reports quorum, every peer task has finished,
/// or `config.policy.ack_timeout` has elapsed since the call started; tasks still
/// running are then drained by a detached task that only logs late failures.
///
/// # Errors
///
/// Returns `QuorumError::LocalWriteFailed` if the tracker's `Arc` is still shared when
/// it is unwrapped.
pub async fn broadcast_pending_decision_quorum(
    config: &FederationConfig,
    decision: &PendingDecision,
) -> Result<AckTracker, QuorumError> {
    let started = Instant::now();
    let shared = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), started)));
    shared.lock().await.record_local();

    let payload = serde_json::json!({
        "sender_agent_id": config.sender_agent_id,
        "memories": [],
        "pending_decisions": [decision],
        "dry_run": false,
    });

    let mut in_flight: JoinSet<(String, AckOutcome)> = JoinSet::new();
    for peer in &config.peers {
        let http = config.client.clone();
        let endpoint = peer.sync_push_url.clone();
        let peer_id = peer.id.clone();
        let request_body = payload.clone();
        let key = decision.id.clone();
        in_flight.spawn(async move {
            let outcome =
                post_and_classify(&http, &endpoint, &request_body, &key, Some(&key)).await;
            (peer_id, outcome)
        });
    }

    let cutoff = started + config.policy.ack_timeout;
    loop {
        let budget = cutoff.saturating_duration_since(Instant::now());
        if budget.is_zero() {
            break;
        }
        // Stop on timeout or once no tasks are left.
        let Ok(Some(joined)) = tokio::time::timeout(budget, in_flight.join_next()).await else {
            break;
        };
        match joined {
            Ok((peer_id, AckOutcome::Ack)) => {
                shared.lock().await.record_peer_ack(peer_id);
            }
            Ok((peer_id, AckOutcome::IdDrift)) => {
                shared.lock().await.record_id_drift(peer_id);
            }
            Ok((peer_id, AckOutcome::Fail(reason))) => {
                tracing::warn!(
                    "federation: pending-decision peer {peer_id} failed for {}: {reason}",
                    decision.id
                );
            }
            Err(e) => {
                tracing::warn!("federation: pending-decision peer join error: {e}");
            }
        }
        if shared.lock().await.is_quorum_met(Instant::now()) {
            break;
        }
    }

    // Let stragglers finish in the background; only late failures are logged.
    if !in_flight.is_empty() {
        tokio::spawn(async move {
            while let Some(res) = in_flight.join_next().await {
                let Ok((peer_id, AckOutcome::Fail(reason))) = res else {
                    continue;
                };
                tracing::debug!(
                    "federation: post-quorum pending-decision peer {peer_id} did not ack: {reason}"
                );
            }
        });
    }

    match Arc::try_unwrap(shared) {
        Ok(mutex) => Ok(mutex.into_inner()),
        Err(_) => Err(QuorumError::LocalWriteFailed {
            detail: "tracker arc still referenced at finalise".to_string(),
        }),
    }
}
/// Broadcasts a namespace-meta `entry` to every configured peer and collects
/// acknowledgements.
///
/// The push body carries an empty `memories` list and `namespace_meta: [entry]`; the
/// entry's namespace doubles as expected id and idempotency key. Results are consumed
/// until the tracker reports quorum, every peer task has finished, or
/// `config.policy.ack_timeout` has elapsed since the call started; tasks still running
/// are then drained by a detached task that only logs late failures.
///
/// # Errors
///
/// Returns `QuorumError::LocalWriteFailed` if the tracker's `Arc` is still shared when
/// it is unwrapped.
pub async fn broadcast_namespace_meta_quorum(
    config: &FederationConfig,
    entry: &NamespaceMetaEntry,
) -> Result<AckTracker, QuorumError> {
    let started = Instant::now();
    let shared = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), started)));
    shared.lock().await.record_local();

    let payload = serde_json::json!({
        "sender_agent_id": config.sender_agent_id,
        "memories": [],
        "namespace_meta": [entry],
        "dry_run": false,
    });
    let target_id = entry.namespace.clone();

    let mut in_flight: JoinSet<(String, AckOutcome)> = JoinSet::new();
    for peer in &config.peers {
        let http = config.client.clone();
        let endpoint = peer.sync_push_url.clone();
        let peer_id = peer.id.clone();
        let request_body = payload.clone();
        let key = target_id.clone();
        in_flight.spawn(async move {
            let outcome =
                post_and_classify(&http, &endpoint, &request_body, &key, Some(&key)).await;
            (peer_id, outcome)
        });
    }

    let cutoff = started + config.policy.ack_timeout;
    loop {
        let budget = cutoff.saturating_duration_since(Instant::now());
        if budget.is_zero() {
            break;
        }
        // Stop on timeout or once no tasks are left.
        let Ok(Some(joined)) = tokio::time::timeout(budget, in_flight.join_next()).await else {
            break;
        };
        match joined {
            Ok((peer_id, AckOutcome::Ack)) => {
                shared.lock().await.record_peer_ack(peer_id);
            }
            Ok((peer_id, AckOutcome::IdDrift)) => {
                shared.lock().await.record_id_drift(peer_id);
            }
            Ok((peer_id, AckOutcome::Fail(reason))) => {
                tracing::warn!(
                    "federation: namespace_meta peer {peer_id} failed for {}: {reason}",
                    entry.namespace
                );
            }
            Err(e) => {
                tracing::warn!("federation: namespace_meta peer join error: {e}");
            }
        }
        if shared.lock().await.is_quorum_met(Instant::now()) {
            break;
        }
    }

    // Let stragglers finish in the background; only late failures are logged.
    if !in_flight.is_empty() {
        tokio::spawn(async move {
            while let Some(res) = in_flight.join_next().await {
                let Ok((peer_id, AckOutcome::Fail(reason))) = res else {
                    continue;
                };
                tracing::debug!(
                    "federation: post-quorum namespace_meta peer {peer_id} did not ack: {reason}"
                );
            }
        });
    }

    match Arc::try_unwrap(shared) {
        Ok(mutex) => Ok(mutex.into_inner()),
        Err(_) => Err(QuorumError::LocalWriteFailed {
            detail: "tracker arc still referenced at finalise".to_string(),
        }),
    }
}
/// Broadcasts the clearing of namespace meta for `namespaces` to every configured peer
/// and collects acknowledgements.
///
/// The push body carries an empty `memories` list and
/// `namespace_meta_clears: namespaces`; the comma-joined namespace list doubles as
/// expected id, idempotency key and log label. Results are consumed until the tracker
/// reports quorum, every peer task has finished, or `config.policy.ack_timeout` has
/// elapsed since the call started; tasks still running are then drained by a detached
/// task that only logs late failures.
///
/// # Errors
///
/// Returns `QuorumError::LocalWriteFailed` if the tracker's `Arc` is still shared when
/// it is unwrapped.
pub async fn broadcast_namespace_meta_clear_quorum(
    config: &FederationConfig,
    namespaces: &[String],
) -> Result<AckTracker, QuorumError> {
    let started = Instant::now();
    let shared = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), started)));
    shared.lock().await.record_local();

    let payload = serde_json::json!({
        "sender_agent_id": config.sender_agent_id,
        "memories": [],
        "namespace_meta_clears": namespaces,
        "dry_run": false,
    });
    let target_id = namespaces.join(",");

    let mut in_flight: JoinSet<(String, AckOutcome)> = JoinSet::new();
    for peer in &config.peers {
        let http = config.client.clone();
        let endpoint = peer.sync_push_url.clone();
        let peer_id = peer.id.clone();
        let request_body = payload.clone();
        let key = target_id.clone();
        in_flight.spawn(async move {
            let outcome =
                post_and_classify(&http, &endpoint, &request_body, &key, Some(&key)).await;
            (peer_id, outcome)
        });
    }

    let cutoff = started + config.policy.ack_timeout;
    loop {
        let budget = cutoff.saturating_duration_since(Instant::now());
        if budget.is_zero() {
            break;
        }
        // Stop on timeout or once no tasks are left.
        let Ok(Some(joined)) = tokio::time::timeout(budget, in_flight.join_next()).await else {
            break;
        };
        match joined {
            Ok((peer_id, AckOutcome::Ack)) => {
                shared.lock().await.record_peer_ack(peer_id);
            }
            Ok((peer_id, AckOutcome::IdDrift)) => {
                shared.lock().await.record_id_drift(peer_id);
            }
            Ok((peer_id, AckOutcome::Fail(reason))) => {
                tracing::warn!(
                    "federation: namespace_meta_clear peer {peer_id} failed for [{}]: {reason}",
                    target_id
                );
            }
            Err(e) => {
                tracing::warn!("federation: namespace_meta_clear peer join error: {e}");
            }
        }
        if shared.lock().await.is_quorum_met(Instant::now()) {
            break;
        }
    }

    // Let stragglers finish in the background; only late failures are logged.
    if !in_flight.is_empty() {
        tokio::spawn(async move {
            while let Some(res) = in_flight.join_next().await {
                let Ok((peer_id, AckOutcome::Fail(reason))) = res else {
                    continue;
                };
                tracing::debug!(
                    "federation: post-quorum namespace_meta_clear peer {peer_id} did not ack: {reason}"
                );
            }
        });
    }

    match Arc::try_unwrap(shared) {
        Ok(mutex) => Ok(mutex.into_inner()),
        Err(_) => Err(QuorumError::LocalWriteFailed {
            detail: "tracker arc still referenced at finalise".to_string(),
        }),
    }
}
/// Serializable summary of a `QuorumError`, built by `QuorumNotMetPayload::from_err`.
#[derive(Debug, Serialize, Deserialize)]
pub struct QuorumNotMetPayload {
    /// Error code; `from_err` always sets it to `"quorum_not_met"`.
    pub error: &'static str,
    /// Acks received (`0` when the source error was not `QuorumNotMet`).
    pub got: usize,
    /// Acks required (`0` when the source error was not `QuorumNotMet`).
    pub needed: usize,
    /// Short reason: `unreachable`, `timeout`, `id_drift`, or
    /// `invalid_policy:<detail>` / `local_write_failed:<detail>`.
    pub reason: String,
}
impl QuorumNotMetPayload {
    /// Converts a `QuorumError` into its payload form.
    ///
    /// Every variant is reported with `error = "quorum_not_met"`. Only `QuorumNotMet`
    /// carries real `got` / `needed` counts; the other variants report zeros and put
    /// their detail into `reason` behind an `invalid_policy:` or `local_write_failed:`
    /// prefix. `Timeout` and `InFlight` both map to the reason `timeout`.
    #[must_use]
    pub fn from_err(err: &QuorumError) -> Self {
        let (got, needed, reason) = match err {
            QuorumError::QuorumNotMet {
                got,
                needed,
                reason,
            } => {
                let label = match reason {
                    QuorumFailureReason::Unreachable => "unreachable",
                    QuorumFailureReason::Timeout | QuorumFailureReason::InFlight => "timeout",
                    QuorumFailureReason::IdDrift => "id_drift",
                };
                (*got, *needed, label.to_string())
            }
            QuorumError::InvalidPolicy { detail } => (0, 0, format!("invalid_policy:{detail}")),
            QuorumError::LocalWriteFailed { detail } => {
                (0, 0, format!("local_write_failed:{detail}"))
            }
        };

        Self {
            error: "quorum_not_met",
            got,
            needed,
            reason,
        }
    }
}
/// Spawns the background catch-up task and returns its join handle.
///
/// The task waits five seconds, then repeats forever: run one catch-up pass over all
/// peers, sleep for `interval`. The loop itself never exits.
pub fn spawn_catchup_loop(
    config: FederationConfig,
    db: crate::handlers::Db,
    interval: Duration,
) -> tokio::task::JoinHandle<()> {
    let startup_delay = Duration::from_secs(5);
    let task = async move {
        tokio::time::sleep(startup_delay).await;
        loop {
            catchup_once(&config, &db).await;
            tokio::time::sleep(interval).await;
        }
    };
    tokio::spawn(task)
}
/// Runs one catch-up pass: pulls memories from each peer's `/api/v1/sync/since`
/// endpoint and applies them locally.
///
/// For every peer:
/// 1. the stored sync cursor for that peer is loaded (a load error is logged and
///    treated as "no cursor", which requests without a `since` parameter);
/// 2. the peer is queried with `since` (if any) and `peer=<our sender id>`, both
///    percent-encoded;
/// 3. each returned memory is parsed, validated and passed to `insert_if_newer`;
/// 4. the greatest `updated_at` among the validated memories is recorded through
///    `sync_state_observe`.
///
/// Failures for one peer (unreachable, non-success status, bad body) skip that peer for
/// this pass only; nothing is returned or propagated.
async fn catchup_once(config: &FederationConfig, db: &crate::handlers::Db) {
    let local_id = config.sender_agent_id.clone();
    // Encode our own id once: it is placed in the query string exactly like `since`,
    // so reserved characters such as `&`, `#` or spaces must not leak through.
    let local_id_param = urlencoding_encode(&local_id);

    for peer in &config.peers {
        let base = peer
            .sync_push_url
            .trim_end_matches("/api/v1/sync/push")
            .to_string();

        let since_opt: Option<String> = {
            let lock = db.lock().await;
            match crate::db::sync_state_load(&lock.0, &local_id) {
                Ok(clock) => clock.entries.get(&peer.id).cloned(),
                Err(e) => {
                    // Previously swallowed; the fallback is a request without a cursor,
                    // which is worth surfacing.
                    tracing::warn!(
                        "catchup: sync_state_load failed for {}: {e} — requesting without cursor",
                        peer.id
                    );
                    None
                }
            }
        };

        let url = match since_opt.as_deref() {
            Some(s) => format!(
                "{base}/api/v1/sync/since?since={}&peer={local_id_param}",
                urlencoding_encode(s)
            ),
            None => format!("{base}/api/v1/sync/since?peer={local_id_param}"),
        };

        let resp = match config.client.get(&url).send().await {
            Ok(r) if r.status().is_success() => r,
            Ok(r) => {
                tracing::debug!(
                    "catchup: peer {} returned HTTP {} — skipping this tick",
                    peer.id,
                    r.status()
                );
                continue;
            }
            Err(e) => {
                tracing::debug!("catchup: peer {} unreachable: {e}", peer.id);
                continue;
            }
        };

        let body: serde_json::Value = match resp.json().await {
            Ok(v) => v,
            Err(e) => {
                tracing::warn!("catchup: peer {} returned unparseable body: {e}", peer.id);
                continue;
            }
        };

        // Borrow the array straight out of `body`; no copy of the whole list is needed.
        let Some(memories) = body.get("memories").and_then(|v| v.as_array()) else {
            continue;
        };
        if memories.is_empty() {
            continue;
        }

        let mut applied = 0usize;
        let mut latest_ts: Option<String> = None;
        {
            // One lock acquisition covers all inserts and the cursor update.
            let lock = db.lock().await;
            for raw in memories {
                let mem: crate::models::Memory = match serde_json::from_value(raw.clone()) {
                    Ok(m) => m,
                    Err(e) => {
                        tracing::warn!("catchup: unparseable memory from peer {}: {e}", peer.id);
                        continue;
                    }
                };
                if crate::validate::validate_memory(&mem).is_err() {
                    tracing::debug!(
                        "catchup: memory {} from peer {} failed validation — skipped",
                        mem.id,
                        peer.id
                    );
                    continue;
                }
                // Timestamps are compared as strings, as the stored cursor is a string.
                if latest_ts
                    .as_deref()
                    .is_none_or(|cur| mem.updated_at.as_str() > cur)
                {
                    latest_ts = Some(mem.updated_at.clone());
                }
                if crate::db::insert_if_newer(&lock.0, &mem).is_ok() {
                    applied += 1;
                }
            }
            if let Some(ts) = latest_ts.as_deref()
                && let Err(e) = crate::db::sync_state_observe(&lock.0, &local_id, &peer.id, ts)
            {
                tracing::warn!("catchup: sync_state_observe failed for {}: {e}", peer.id);
            }
        }

        if applied > 0 {
            tracing::info!(
                "catchup: applied {applied} memories from peer {} (since={})",
                peer.id,
                since_opt.as_deref().unwrap_or("<full-snapshot>"),
            );
        }
    }
}
/// Percent-encodes `s` for use inside a URL query value.
///
/// ASCII letters, digits and `-`, `_`, `.`, `~` pass through unchanged; every other
/// byte of the UTF-8 input is written as `%XX` with upper-case hex digits.
fn urlencoding_encode(s: &str) -> String {
    use std::fmt::Write;

    let mut encoded = String::with_capacity(s.len() + 6);
    for byte in s.bytes() {
        let unreserved = byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~');
        if unreserved {
            encoded.push(char::from(byte));
        } else {
            // Writing into a `String` cannot fail, so the result is deliberately ignored.
            let _ = write!(encoded, "%{byte:02X}");
        }
    }
    encoded
}
#[cfg(test)]
mod tests {
use super::*;
use axum::Router;
use axum::extract::Json as AxumJson;
use axum::http::StatusCode;
use axum::routing::post;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::net::TcpListener;
fn sample_memory() -> Memory {
let now = chrono::Utc::now().to_rfc3339();
Memory {
id: "fed-test".to_string(),
tier: crate::models::Tier::Mid,
namespace: "app".to_string(),
title: "hello".to_string(),
content: "world for federation test".to_string(),
tags: vec!["t".to_string()],
priority: 5,
confidence: 1.0,
source: "test".to_string(),
access_count: 0,
created_at: now.clone(),
updated_at: now,
last_accessed_at: None,
expires_at: None,
metadata: serde_json::json!({"agent_id":"ai:test"}),
}
}
#[derive(Clone, Copy)]
enum MockBehaviour {
Ack,
Fail,
Hang,
FailThenAck {
fail_until: usize,
},
}
#[derive(Clone)]
struct MockState {
behaviour: MockBehaviour,
count: Arc<AtomicUsize>,
}
/// Axum handler for the mock push endpoint.
///
/// Bumps the shared call counter, then answers according to the configured
/// `MockBehaviour`. The request body is parsed as JSON but otherwise ignored.
async fn mock_handler(
    axum::extract::State(state): axum::extract::State<MockState>,
    AxumJson(_body): AxumJson<serde_json::Value>,
) -> (StatusCode, AxumJson<serde_json::Value>) {
    // Success body shared by `Ack` and the post-failure phase of `FailThenAck`.
    fn full_ack() -> (StatusCode, AxumJson<serde_json::Value>) {
        (
            StatusCode::OK,
            AxumJson(serde_json::json!({"applied":1,"noop":0,"skipped":0})),
        )
    }

    // 1-based number of this request (fetch_add returns the previous value).
    let attempt = 1 + state.count.fetch_add(1, Ordering::Relaxed);
    match state.behaviour {
        MockBehaviour::Ack => full_ack(),
        MockBehaviour::Fail => (
            StatusCode::INTERNAL_SERVER_ERROR,
            AxumJson(serde_json::json!({"error":"stub failure"})),
        ),
        MockBehaviour::Hang => {
            tokio::time::sleep(Duration::from_secs(10)).await;
            (StatusCode::OK, AxumJson(serde_json::json!({"applied":1})))
        }
        MockBehaviour::FailThenAck { fail_until } if attempt <= fail_until => (
            StatusCode::INTERNAL_SERVER_ERROR,
            AxumJson(serde_json::json!({"error":"stub transient failure"})),
        ),
        MockBehaviour::FailThenAck { .. } => full_ack(),
    }
}
/// Starts a mock peer on an ephemeral `127.0.0.1` port that serves
/// `POST /api/v1/sync/push` with `mock_handler`.
///
/// Returns the peer's base URL (`http://<addr>`) and a handle to its request
/// counter. The server runs in a detached tokio task.
async fn spawn_mock_peer(behaviour: MockBehaviour) -> (String, Arc<AtomicUsize>) {
    let hits = Arc::new(AtomicUsize::new(0));
    let router = Router::new()
        .route("/api/v1/sync/push", post(mock_handler))
        .with_state(MockState {
            behaviour,
            count: Arc::clone(&hits),
        });
    let socket = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let base_url = format!("http://{}", socket.local_addr().unwrap());
    tokio::spawn(async move {
        axum::serve(socket, router).await.ok();
    });
    (base_url, hits)
}
/// Assembles a `FederationConfig` for tests without going through
/// `FederationConfig::build`.
///
/// * `peers` — base URLs; each becomes a `PeerEndpoint` with id
///   `peer-{i}:{url}` and push URL `{url}/api/v1/sync/push`.
/// * `w` — second argument passed to `QuorumPolicy::new`; the first is
///   `1 + peers.len()`.
/// * `timeout_ms` — used for both the HTTP client timeout and the policy
///   timeout.
///
/// Panics if the HTTP client or the quorum policy cannot be constructed.
fn build_config(peers: Vec<String>, w: usize, timeout_ms: u64) -> FederationConfig {
    let timeout = Duration::from_millis(timeout_ms);
    let client = reqwest::Client::builder()
        .timeout(timeout)
        .build()
        .unwrap();
    let policy =
        QuorumPolicy::new(1 + peers.len(), w, timeout, Duration::from_secs(30)).unwrap();
    let mut endpoints = Vec::with_capacity(peers.len());
    for (i, url) in peers.into_iter().enumerate() {
        endpoints.push(PeerEndpoint {
            id: format!("peer-{i}:{url}"),
            sync_push_url: format!("{url}/api/v1/sync/push"),
        });
    }
    FederationConfig {
        policy,
        peers: endpoints,
        client,
        sender_agent_id: "ai:fed-test".to_string(),
    }
}
/// Two always-acking peers with W=2: the store broadcast must finalise as
/// quorum-met and at least one mock peer must have been called.
#[tokio::test]
async fn happy_path_two_peers_quorum_met() {
    let (first_url, first_hits) = spawn_mock_peer(MockBehaviour::Ack).await;
    let (second_url, second_hits) = spawn_mock_peer(MockBehaviour::Ack).await;
    let cfg = build_config(vec![first_url, second_url], 2, 2000);
    let memory = sample_memory();
    let tracker = broadcast_store_quorum(&cfg, &memory).await.unwrap();
    let result = finalise_quorum(&tracker);
    assert!(result.is_ok(), "expected quorum met, got {result:?}");
    // Only a lower bound is asserted on the combined call count.
    let calls = first_hits.load(Ordering::Relaxed) + second_hits.load(Ordering::Relaxed);
    assert!(calls >= 1);
}
/// After a W=2 store broadcast to two acking peers, each mock must have
/// recorded exactly one request (polled for at most 20 x 10 ms).
#[tokio::test]
async fn post_quorum_fanout_reaches_all_peers() {
    let (url_a, hits_a) = spawn_mock_peer(MockBehaviour::Ack).await;
    let (url_b, hits_b) = spawn_mock_peer(MockBehaviour::Ack).await;
    let cfg = build_config(vec![url_a, url_b], 2, 2000);
    let _tracker = broadcast_store_quorum(&cfg, &sample_memory())
        .await
        .unwrap();
    // Poll until both counters read 1, sleeping 10 ms between checks.
    let both_seen =
        || hits_a.load(Ordering::Relaxed) == 1 && hits_b.load(Ordering::Relaxed) == 1;
    let mut polls_left = 20;
    while polls_left > 0 && !both_seen() {
        tokio::time::sleep(Duration::from_millis(10)).await;
        polls_left -= 1;
    }
    let seen_a = hits_a.load(Ordering::Relaxed);
    assert_eq!(seen_a, 1, "peer-1 must receive the write post-quorum");
    let seen_b = hits_b.load(Ordering::Relaxed);
    assert_eq!(seen_b, 1, "peer-2 must receive the write post-quorum");
}
/// One peer acks immediately; the other fails its first call and acks from
/// the second. The steady peer must see exactly one request and the flaky
/// peer exactly two (polled for at most 200 x 10 ms).
#[tokio::test]
async fn transient_peer_failure_is_retried_once() {
    let (steady_url, steady_hits) = spawn_mock_peer(MockBehaviour::Ack).await;
    let (flaky_url, flaky_hits) =
        spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
    let cfg = build_config(vec![steady_url, flaky_url], 2, 2000);
    let _tracker = broadcast_store_quorum(&cfg, &sample_memory())
        .await
        .unwrap();
    // Wait until the steady peer was hit and the flaky peer saw its second attempt.
    let settled =
        || steady_hits.load(Ordering::Relaxed) >= 1 && flaky_hits.load(Ordering::Relaxed) >= 2;
    let mut polls_left = 200;
    while polls_left > 0 && !settled() {
        tokio::time::sleep(Duration::from_millis(10)).await;
        polls_left -= 1;
    }
    let steady_seen = steady_hits.load(Ordering::Relaxed);
    assert_eq!(steady_seen, 1, "peer-1 acked first time, no retry");
    let flaky_seen = flaky_hits.load(Ordering::Relaxed);
    assert_eq!(
        flaky_seen, 2,
        "peer-2 must see exactly two attempts (first fail, retry ack)"
    );
}
/// A peer that always answers 500 must be called exactly twice; the count is
/// read after a fixed 800 ms wait following the broadcast.
#[tokio::test]
async fn persistent_peer_failure_stops_after_one_retry() {
    let (healthy_url, _) = spawn_mock_peer(MockBehaviour::Ack).await;
    let (broken_url, broken_hits) = spawn_mock_peer(MockBehaviour::Fail).await;
    let cfg = build_config(vec![healthy_url, broken_url], 2, 2000);
    let _tracker = broadcast_store_quorum(&cfg, &sample_memory())
        .await
        .unwrap();
    // Fixed settle time before sampling the counter.
    tokio::time::sleep(Duration::from_millis(800)).await;
    let attempts = broken_hits.load(Ordering::Relaxed);
    assert_eq!(
        attempts, 2,
        "persistently-failing peer must be called exactly twice (1 + 1 retry)"
    );
}
#[tokio::test]
async fn bulk_catchup_push_hits_every_peer_once() {
let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
let cfg = build_config(vec![url1, url2], 2, 2000);
let mems = vec![sample_memory(), sample_memory(), sample_memory()];
let errors = bulk_catchup_push(&cfg, &mems).await;
assert!(
errors.is_empty(),
"catchup must succeed on healthy peers, got {errors:?}"
);
assert_eq!(
count1.load(Ordering::Relaxed),
1,
"peer-1 must receive exactly one catchup batch"
);
assert_eq!(
count2.load(Ordering::Relaxed),
1,
"peer-2 must receive exactly one catchup batch"
);
}
/// With one acking and one failing peer, the catchup push must report exactly
/// one error whose text mentions `500` or `http`.
#[tokio::test]
async fn bulk_catchup_push_reports_peer_failures() {
    let (healthy_url, _) = spawn_mock_peer(MockBehaviour::Ack).await;
    let (broken_url, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let cfg = build_config(vec![healthy_url, broken_url], 2, 2000);
    let batch = [sample_memory()];
    let errors = bulk_catchup_push(&cfg, &batch).await;
    assert_eq!(errors.len(), 1, "exactly one peer failed the catchup");
    // Safe to index: the length was just asserted to be 1.
    let failure = &errors[0];
    let names_http_failure = failure.1.contains("500") || failure.1.contains("http");
    assert!(
        names_http_failure,
        "error must name the HTTP failure, got {:?}",
        failure
    );
}
/// An empty row set must yield no errors — both with no peers and with one
/// peer — and must not trigger any request to that peer.
#[tokio::test]
async fn bulk_catchup_push_empty_inputs_are_noop() {
    // Case 1: no peers, no rows.
    let cfg = build_config(vec![], 1, 500);
    assert!(bulk_catchup_push(&cfg, &[]).await.is_empty());

    // Case 2: one live peer, still no rows.
    let (peer_url, peer_hits) = spawn_mock_peer(MockBehaviour::Ack).await;
    let cfg = build_config(vec![peer_url], 1, 500);
    assert!(bulk_catchup_push(&cfg, &[]).await.is_empty());
    let posts_seen = peer_hits.load(Ordering::Relaxed);
    assert_eq!(
        posts_seen, 0,
        "no catchup POST must fire when the row set is empty"
    );
}
/// Both peers fail and W=3: finalising the store broadcast must yield
/// `QuorumNotMet` with `got == 1` and `needed == 3`.
#[tokio::test]
async fn partition_minority_fails_quorum() {
    let (down_a, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let (down_b, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let cfg = build_config(vec![down_a, down_b], 3, 500);
    let tracker = broadcast_store_quorum(&cfg, &sample_memory())
        .await
        .unwrap();
    match finalise_quorum(&tracker).unwrap_err() {
        QuorumError::QuorumNotMet { got, needed, .. } => {
            assert_eq!(got, 1, "local commit only");
            assert_eq!(needed, 3);
        }
        other => panic!("expected QuorumNotMet, got {other:?}"),
    }
}
/// A single hanging peer with a 200 ms timeout and W=2: the quorum must fail
/// with reason `Timeout` or `Unreachable`.
#[tokio::test]
async fn timeout_on_hanging_peer_classified_timeout() {
    let (hanging_url, _) = spawn_mock_peer(MockBehaviour::Hang).await;
    let cfg = build_config(vec![hanging_url], 2, 200);
    let tracker = broadcast_store_quorum(&cfg, &sample_memory())
        .await
        .unwrap();
    // Short pause between the broadcast returning and finalising.
    tokio::time::sleep(Duration::from_millis(50)).await;
    match finalise_quorum(&tracker).unwrap_err() {
        QuorumError::QuorumNotMet { reason, .. } => {
            let acceptable = matches!(
                reason,
                QuorumFailureReason::Timeout | QuorumFailureReason::Unreachable
            );
            assert!(acceptable, "unexpected reason {reason:?}");
        }
        other => panic!("expected QuorumNotMet, got {other:?}"),
    }
}
/// One acking and one failing peer with W=2: the store broadcast must still
/// finalise as quorum-met.
#[tokio::test]
async fn majority_quorum_tolerates_one_peer_down() {
    let (healthy_url, _) = spawn_mock_peer(MockBehaviour::Ack).await;
    let (failing_url, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let cfg = build_config(vec![healthy_url, failing_url], 2, 2000);
    let memory = sample_memory();
    let tracker = broadcast_store_quorum(&cfg, &memory).await.unwrap();
    let result = finalise_quorum(&tracker);
    assert!(
        result.is_ok(),
        "majority should tolerate 1 peer down, got {result:?}"
    );
}
/// `FederationConfig::build` returns `Ok(None)` when `quorum_writes` is 0,
/// even though a peer URL is supplied.
#[test]
fn config_build_disabled_when_w_zero() {
    let peer_urls = [String::from("http://example.com")];
    let timeout = Duration::from_millis(500);
    let cfg = FederationConfig::build(
        0,
        &peer_urls,
        timeout,
        None,
        None,
        None,
        String::from("ai:test"),
    )
    .unwrap();
    assert!(cfg.is_none());
}
/// `FederationConfig::build` returns `Ok(None)` when the peer list is empty,
/// even with a non-zero `quorum_writes`.
#[test]
fn config_build_disabled_when_peers_empty() {
    let no_peers: [String; 0] = [];
    let timeout = Duration::from_millis(500);
    let cfg = FederationConfig::build(
        2,
        &no_peers,
        timeout,
        None,
        None,
        None,
        String::from("ai:test"),
    )
    .unwrap();
    assert!(cfg.is_none());
}
/// A `QuorumNotMet { got: 1, needed: 3, reason: Timeout }` error must map to
/// a payload with error `quorum_not_met`, the same counts, and reason
/// `timeout`.
#[test]
fn quorum_not_met_payload_from_err() {
    let failure = QuorumError::QuorumNotMet {
        got: 1,
        needed: 3,
        reason: QuorumFailureReason::Timeout,
    };
    let body = QuorumNotMetPayload::from_err(&failure);
    assert_eq!(body.error, "quorum_not_met");
    assert_eq!(body.got, 1);
    assert_eq!(body.needed, 3);
    assert_eq!(body.reason, "timeout");
}
/// Archive broadcast to two acking peers with W=2: quorum must be met and
/// each peer must have recorded exactly one request (polled up to 20 x 10 ms).
#[tokio::test]
async fn archive_quorum_two_peers_ack_meets_quorum() {
    let (url_a, hits_a) = spawn_mock_peer(MockBehaviour::Ack).await;
    let (url_b, hits_b) = spawn_mock_peer(MockBehaviour::Ack).await;
    let cfg = build_config(vec![url_a, url_b], 2, 2000);
    let tracker = broadcast_archive_quorum(&cfg, "mem-s29").await.unwrap();
    let result = finalise_quorum(&tracker);
    assert!(result.is_ok(), "expected quorum met, got {result:?}");
    // Give the remaining peer request time to land.
    let both_seen =
        || hits_a.load(Ordering::Relaxed) == 1 && hits_b.load(Ordering::Relaxed) == 1;
    let mut polls_left = 20;
    while polls_left > 0 && !both_seen() {
        tokio::time::sleep(Duration::from_millis(10)).await;
        polls_left -= 1;
    }
    let seen_a = hits_a.load(Ordering::Relaxed);
    assert_eq!(seen_a, 1);
    let seen_b = hits_b.load(Ordering::Relaxed);
    assert_eq!(seen_b, 1);
}
/// Archive broadcast with both peers failing and W=3 must finalise as
/// `QuorumNotMet` with `got == 1` and `needed == 3`.
#[tokio::test]
async fn archive_quorum_partition_minority_fails() {
    let (down_a, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let (down_b, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let cfg = build_config(vec![down_a, down_b], 3, 500);
    let tracker = broadcast_archive_quorum(&cfg, "mem-s29").await.unwrap();
    match finalise_quorum(&tracker).unwrap_err() {
        QuorumError::QuorumNotMet { got, needed, .. } => {
            assert_eq!(got, 1);
            assert_eq!(needed, 3);
        }
        other => panic!("expected QuorumNotMet, got {other:?}"),
    }
}
/// Delete broadcast to two acking peers with W=2: quorum must be met and each
/// peer must have recorded exactly one request (polled up to 20 x 10 ms).
#[tokio::test]
async fn delete_quorum_two_peers_ack_meets_quorum() {
    let (url_a, hits_a) = spawn_mock_peer(MockBehaviour::Ack).await;
    let (url_b, hits_b) = spawn_mock_peer(MockBehaviour::Ack).await;
    let cfg = build_config(vec![url_a, url_b], 2, 2000);
    let tracker = broadcast_delete_quorum(&cfg, "mem-del").await.unwrap();
    assert!(finalise_quorum(&tracker).is_ok());
    let both_seen =
        || hits_a.load(Ordering::Relaxed) == 1 && hits_b.load(Ordering::Relaxed) == 1;
    let mut polls_left = 20;
    while polls_left > 0 && !both_seen() {
        tokio::time::sleep(Duration::from_millis(10)).await;
        polls_left -= 1;
    }
    let seen_a = hits_a.load(Ordering::Relaxed);
    assert_eq!(seen_a, 1);
    let seen_b = hits_b.load(Ordering::Relaxed);
    assert_eq!(seen_b, 1);
}
/// Delete broadcast with both peers failing and W=3 must finalise as
/// `QuorumNotMet` with `got == 1` and `needed == 3`.
#[tokio::test]
async fn delete_quorum_partition_minority_fails() {
    let (down_a, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let (down_b, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let cfg = build_config(vec![down_a, down_b], 3, 500);
    let tracker = broadcast_delete_quorum(&cfg, "mem-del").await.unwrap();
    match finalise_quorum(&tracker).unwrap_err() {
        QuorumError::QuorumNotMet { got, needed, .. } => {
            assert_eq!(got, 1);
            assert_eq!(needed, 3);
        }
        other => panic!("expected QuorumNotMet, got {other:?}"),
    }
}
/// Restore broadcast to two acking peers with W=2: quorum must be met and
/// each peer must have recorded exactly one request (polled up to 20 x 10 ms).
#[tokio::test]
async fn restore_quorum_two_peers_ack_meets_quorum() {
    let (url_a, hits_a) = spawn_mock_peer(MockBehaviour::Ack).await;
    let (url_b, hits_b) = spawn_mock_peer(MockBehaviour::Ack).await;
    let cfg = build_config(vec![url_a, url_b], 2, 2000);
    let tracker = broadcast_restore_quorum(&cfg, "mem-restore").await.unwrap();
    assert!(finalise_quorum(&tracker).is_ok());
    let both_seen =
        || hits_a.load(Ordering::Relaxed) == 1 && hits_b.load(Ordering::Relaxed) == 1;
    let mut polls_left = 20;
    while polls_left > 0 && !both_seen() {
        tokio::time::sleep(Duration::from_millis(10)).await;
        polls_left -= 1;
    }
    let seen_a = hits_a.load(Ordering::Relaxed);
    assert_eq!(seen_a, 1);
    let seen_b = hits_b.load(Ordering::Relaxed);
    assert_eq!(seen_b, 1);
}
/// Restore broadcast with both peers failing and W=3 must finalise as a
/// `QuorumNotMet` error.
#[tokio::test]
async fn restore_quorum_partition_minority_fails() {
    let (down_a, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let (down_b, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let peer_urls = vec![down_a, down_b];
    let cfg = build_config(peer_urls, 3, 500);
    let tracker = broadcast_restore_quorum(&cfg, "mem-restore")
        .await
        .unwrap();
    let err = finalise_quorum(&tracker).unwrap_err();
    assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
}
/// Builds a `MemoryLink` fixture from `mem-a` to `mem-b` with relation
/// `related_to`, stamped with the current time in RFC 3339 form.
fn sample_link() -> MemoryLink {
    let created_at = chrono::Utc::now().to_rfc3339();
    MemoryLink {
        source_id: String::from("mem-a"),
        target_id: String::from("mem-b"),
        relation: String::from("related_to"),
        created_at,
    }
}
/// Link broadcast to two acking peers with W=2: quorum must be met and each
/// peer must have recorded exactly one request (polled up to 20 x 10 ms).
#[tokio::test]
async fn link_quorum_two_peers_ack_meets_quorum() {
    let (url_a, hits_a) = spawn_mock_peer(MockBehaviour::Ack).await;
    let (url_b, hits_b) = spawn_mock_peer(MockBehaviour::Ack).await;
    let cfg = build_config(vec![url_a, url_b], 2, 2000);
    let link = sample_link();
    let tracker = broadcast_link_quorum(&cfg, &link).await.unwrap();
    assert!(finalise_quorum(&tracker).is_ok());
    let both_seen =
        || hits_a.load(Ordering::Relaxed) == 1 && hits_b.load(Ordering::Relaxed) == 1;
    let mut polls_left = 20;
    while polls_left > 0 && !both_seen() {
        tokio::time::sleep(Duration::from_millis(10)).await;
        polls_left -= 1;
    }
    let seen_a = hits_a.load(Ordering::Relaxed);
    assert_eq!(seen_a, 1);
    let seen_b = hits_b.load(Ordering::Relaxed);
    assert_eq!(seen_b, 1);
}
/// Link broadcast with both peers failing and W=3 must finalise as a
/// `QuorumNotMet` error.
#[tokio::test]
async fn link_quorum_partition_minority_fails() {
    let (down_a, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let (down_b, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let cfg = build_config(vec![down_a, down_b], 3, 500);
    let link = sample_link();
    let tracker = broadcast_link_quorum(&cfg, &link).await.unwrap();
    let err = finalise_quorum(&tracker).unwrap_err();
    assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
}
/// Consolidate broadcast (one new memory, two source ids) to two acking peers
/// with W=2: quorum must be met and each peer must have recorded exactly one
/// request (polled up to 20 x 10 ms).
#[tokio::test]
async fn consolidate_quorum_two_peers_ack_meets_quorum() {
    let (url_a, hits_a) = spawn_mock_peer(MockBehaviour::Ack).await;
    let (url_b, hits_b) = spawn_mock_peer(MockBehaviour::Ack).await;
    let cfg = build_config(vec![url_a, url_b], 2, 2000);
    let merged = sample_memory();
    let source_ids: Vec<String> = ["src-a", "src-b"].into_iter().map(String::from).collect();
    let tracker = broadcast_consolidate_quorum(&cfg, &merged, &source_ids)
        .await
        .unwrap();
    assert!(finalise_quorum(&tracker).is_ok());
    let both_seen =
        || hits_a.load(Ordering::Relaxed) == 1 && hits_b.load(Ordering::Relaxed) == 1;
    let mut polls_left = 20;
    while polls_left > 0 && !both_seen() {
        tokio::time::sleep(Duration::from_millis(10)).await;
        polls_left -= 1;
    }
    let seen_a = hits_a.load(Ordering::Relaxed);
    assert_eq!(seen_a, 1);
    let seen_b = hits_b.load(Ordering::Relaxed);
    assert_eq!(seen_b, 1);
}
/// Consolidate broadcast (no source ids) with both peers failing and W=3 must
/// finalise as a `QuorumNotMet` error.
#[tokio::test]
async fn consolidate_quorum_partition_minority_fails() {
    let (down_a, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let (down_b, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let cfg = build_config(vec![down_a, down_b], 3, 500);
    let merged = sample_memory();
    let tracker = broadcast_consolidate_quorum(&cfg, &merged, &[])
        .await
        .unwrap();
    let err = finalise_quorum(&tracker).unwrap_err();
    assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
}
/// Builds an undecided `PendingAction` fixture: a `delete` of `mem-x` in
/// namespace `app`, requested by `ai:test` at the current time, with status
/// `pending` and no approvals.
fn sample_pending() -> PendingAction {
    let requested_at = chrono::Utc::now().to_rfc3339();
    PendingAction {
        id: String::from("pa-1"),
        action_type: String::from("delete"),
        memory_id: Some(String::from("mem-x")),
        namespace: String::from("app"),
        payload: serde_json::json!({}),
        requested_by: String::from("ai:test"),
        requested_at,
        status: String::from("pending"),
        decided_by: None,
        decided_at: None,
        approvals: Vec::new(),
    }
}
/// Pending-action broadcast to two acking peers with W=2: quorum must be met
/// and each peer must have recorded exactly one request (polled up to
/// 20 x 10 ms).
#[tokio::test]
async fn pending_quorum_two_peers_ack_meets_quorum() {
    let (url_a, hits_a) = spawn_mock_peer(MockBehaviour::Ack).await;
    let (url_b, hits_b) = spawn_mock_peer(MockBehaviour::Ack).await;
    let cfg = build_config(vec![url_a, url_b], 2, 2000);
    let action = sample_pending();
    let tracker = broadcast_pending_quorum(&cfg, &action).await.unwrap();
    assert!(finalise_quorum(&tracker).is_ok());
    let both_seen =
        || hits_a.load(Ordering::Relaxed) == 1 && hits_b.load(Ordering::Relaxed) == 1;
    let mut polls_left = 20;
    while polls_left > 0 && !both_seen() {
        tokio::time::sleep(Duration::from_millis(10)).await;
        polls_left -= 1;
    }
    let seen_a = hits_a.load(Ordering::Relaxed);
    assert_eq!(seen_a, 1);
    let seen_b = hits_b.load(Ordering::Relaxed);
    assert_eq!(seen_b, 1);
}
/// Pending-action broadcast with both peers failing and W=3 must finalise as
/// a `QuorumNotMet` error.
#[tokio::test]
async fn pending_quorum_partition_minority_fails() {
    let (down_a, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let (down_b, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let cfg = build_config(vec![down_a, down_b], 3, 500);
    let action = sample_pending();
    let tracker = broadcast_pending_quorum(&cfg, &action).await.unwrap();
    let err = finalise_quorum(&tracker).unwrap_err();
    assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
}
/// Builds a `PendingDecision` fixture approving action `pa-1`, decided by
/// `ai:approver`.
fn sample_decision() -> PendingDecision {
    let id = String::from("pa-1");
    let decider = String::from("ai:approver");
    PendingDecision {
        id,
        approved: true,
        decider,
    }
}
/// Pending-decision broadcast to two acking peers with W=2: quorum must be
/// met and each peer must have recorded exactly one request (polled up to
/// 20 x 10 ms).
#[tokio::test]
async fn pending_decision_quorum_two_peers_ack_meets_quorum() {
    let (url_a, hits_a) = spawn_mock_peer(MockBehaviour::Ack).await;
    let (url_b, hits_b) = spawn_mock_peer(MockBehaviour::Ack).await;
    let cfg = build_config(vec![url_a, url_b], 2, 2000);
    let decision = sample_decision();
    let tracker = broadcast_pending_decision_quorum(&cfg, &decision)
        .await
        .unwrap();
    assert!(finalise_quorum(&tracker).is_ok());
    let both_seen =
        || hits_a.load(Ordering::Relaxed) == 1 && hits_b.load(Ordering::Relaxed) == 1;
    let mut polls_left = 20;
    while polls_left > 0 && !both_seen() {
        tokio::time::sleep(Duration::from_millis(10)).await;
        polls_left -= 1;
    }
    let seen_a = hits_a.load(Ordering::Relaxed);
    assert_eq!(seen_a, 1);
    let seen_b = hits_b.load(Ordering::Relaxed);
    assert_eq!(seen_b, 1);
}
/// Pending-decision broadcast with both peers failing and W=3 must finalise
/// as a `QuorumNotMet` error.
#[tokio::test]
async fn pending_decision_quorum_partition_minority_fails() {
    let (down_a, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let (down_b, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let cfg = build_config(vec![down_a, down_b], 3, 500);
    let decision = sample_decision();
    let tracker = broadcast_pending_decision_quorum(&cfg, &decision)
        .await
        .unwrap();
    let err = finalise_quorum(&tracker).unwrap_err();
    assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
}
/// Builds a `NamespaceMetaEntry` fixture for namespace `app/team` (parent
/// `app`, standard id `mem-std-1`), stamped with the current time in
/// RFC 3339 form.
fn sample_namespace_meta() -> NamespaceMetaEntry {
    let updated_at = chrono::Utc::now().to_rfc3339();
    NamespaceMetaEntry {
        namespace: String::from("app/team"),
        standard_id: String::from("mem-std-1"),
        parent_namespace: Some(String::from("app")),
        updated_at,
    }
}
/// Namespace-meta broadcast to two acking peers with W=2: quorum must be met
/// and each peer must have recorded exactly one request (polled up to
/// 20 x 10 ms).
#[tokio::test]
async fn namespace_meta_quorum_two_peers_ack_meets_quorum() {
    let (url_a, hits_a) = spawn_mock_peer(MockBehaviour::Ack).await;
    let (url_b, hits_b) = spawn_mock_peer(MockBehaviour::Ack).await;
    let cfg = build_config(vec![url_a, url_b], 2, 2000);
    let entry = sample_namespace_meta();
    let tracker = broadcast_namespace_meta_quorum(&cfg, &entry)
        .await
        .unwrap();
    assert!(finalise_quorum(&tracker).is_ok());
    let both_seen =
        || hits_a.load(Ordering::Relaxed) == 1 && hits_b.load(Ordering::Relaxed) == 1;
    let mut polls_left = 20;
    while polls_left > 0 && !both_seen() {
        tokio::time::sleep(Duration::from_millis(10)).await;
        polls_left -= 1;
    }
    let seen_a = hits_a.load(Ordering::Relaxed);
    assert_eq!(seen_a, 1);
    let seen_b = hits_b.load(Ordering::Relaxed);
    assert_eq!(seen_b, 1);
}
/// Namespace-meta broadcast with both peers failing and W=3 must finalise as
/// a `QuorumNotMet` error.
#[tokio::test]
async fn namespace_meta_quorum_partition_minority_fails() {
    let (down_a, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let (down_b, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let cfg = build_config(vec![down_a, down_b], 3, 500);
    let entry = sample_namespace_meta();
    let tracker = broadcast_namespace_meta_quorum(&cfg, &entry)
        .await
        .unwrap();
    let err = finalise_quorum(&tracker).unwrap_err();
    assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
}
/// Namespace-meta-clear broadcast (two namespaces) to two acking peers with
/// W=2: quorum must be met and each peer must have recorded exactly one
/// request (polled up to 20 x 10 ms).
#[tokio::test]
async fn namespace_meta_clear_quorum_two_peers_ack_meets_quorum() {
    let (url_a, hits_a) = spawn_mock_peer(MockBehaviour::Ack).await;
    let (url_b, hits_b) = spawn_mock_peer(MockBehaviour::Ack).await;
    let cfg = build_config(vec![url_a, url_b], 2, 2000);
    let cleared: Vec<String> = ["app/team", "app/other"]
        .into_iter()
        .map(String::from)
        .collect();
    let tracker = broadcast_namespace_meta_clear_quorum(&cfg, &cleared)
        .await
        .unwrap();
    assert!(finalise_quorum(&tracker).is_ok());
    let both_seen =
        || hits_a.load(Ordering::Relaxed) == 1 && hits_b.load(Ordering::Relaxed) == 1;
    let mut polls_left = 20;
    while polls_left > 0 && !both_seen() {
        tokio::time::sleep(Duration::from_millis(10)).await;
        polls_left -= 1;
    }
    let seen_a = hits_a.load(Ordering::Relaxed);
    assert_eq!(seen_a, 1);
    let seen_b = hits_b.load(Ordering::Relaxed);
    assert_eq!(seen_b, 1);
}
/// Namespace-meta-clear broadcast (one namespace) with both peers failing and
/// W=3 must finalise as a `QuorumNotMet` error.
#[tokio::test]
async fn namespace_meta_clear_quorum_partition_minority_fails() {
    let (down_a, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let (down_b, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let cfg = build_config(vec![down_a, down_b], 3, 500);
    let cleared = vec![String::from("app/team")];
    let tracker = broadcast_namespace_meta_clear_quorum(&cfg, &cleared)
        .await
        .unwrap();
    let err = finalise_quorum(&tracker).unwrap_err();
    assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
}
/// `QuorumFailureReason::Unreachable` must serialise to the payload reason
/// `unreachable`.
#[test]
fn quorum_not_met_payload_unreachable_reason() {
    let failure = QuorumError::QuorumNotMet {
        got: 1,
        needed: 2,
        reason: QuorumFailureReason::Unreachable,
    };
    let body = QuorumNotMetPayload::from_err(&failure);
    assert_eq!(body.reason, "unreachable");
}
/// `QuorumFailureReason::IdDrift` must serialise to the payload reason
/// `id_drift`.
#[test]
fn quorum_not_met_payload_id_drift_reason() {
    let failure = QuorumError::QuorumNotMet {
        got: 1,
        needed: 2,
        reason: QuorumFailureReason::IdDrift,
    };
    let body = QuorumNotMetPayload::from_err(&failure);
    assert_eq!(body.reason, "id_drift");
}
/// `QuorumFailureReason::InFlight` has no payload string of its own: it must
/// be reported as `timeout`.
#[test]
fn quorum_not_met_payload_in_flight_reason_maps_to_timeout() {
    let failure = QuorumError::QuorumNotMet {
        got: 1,
        needed: 2,
        reason: QuorumFailureReason::InFlight,
    };
    let body = QuorumNotMetPayload::from_err(&failure);
    assert_eq!(body.reason, "timeout");
}
/// `QuorumError::InvalidPolicy` must map to a `quorum_not_met` payload with
/// zero counts and a reason that starts with `invalid_policy:` and carries
/// the detail text.
#[test]
fn quorum_not_met_payload_invalid_policy_branch() {
    let policy_err = QuorumError::InvalidPolicy {
        detail: String::from("bad-thing"),
    };
    let payload = QuorumNotMetPayload::from_err(&policy_err);
    assert_eq!(payload.error, "quorum_not_met");
    assert_eq!(payload.got, 0);
    assert_eq!(payload.needed, 0);
    assert!(payload.reason.starts_with("invalid_policy:"));
    assert!(payload.reason.contains("bad-thing"));
}
/// `QuorumError::LocalWriteFailed` must map to a `quorum_not_met` payload
/// whose reason starts with `local_write_failed:` and carries the detail text.
#[test]
fn quorum_not_met_payload_local_write_failed_branch() {
    let write_err = QuorumError::LocalWriteFailed {
        detail: String::from("disk-full"),
    };
    let payload = QuorumNotMetPayload::from_err(&write_err);
    assert_eq!(payload.error, "quorum_not_met");
    assert!(payload.reason.starts_with("local_write_failed:"));
    assert!(payload.reason.contains("disk-full"));
}
/// With W=2 and two peer URLs, `FederationConfig::build` must return a config
/// whose peers are `peer-0`/`peer-1`, whose push URLs have any trailing slash
/// trimmed before `/api/v1/sync/push` is appended, and whose sender id is
/// the one passed in.
#[test]
fn config_build_constructs_when_w_and_peers_set() {
    // The first URL deliberately carries a trailing slash.
    let peer_urls = [
        String::from("http://peer-a.example/"),
        String::from("http://peer-b.example"),
    ];
    let built = FederationConfig::build(
        2,
        &peer_urls,
        Duration::from_millis(500),
        None,
        None,
        None,
        String::from("ai:builder"),
    );
    let cfg = built
        .unwrap()
        .expect("config should be Some when w>0 and peers nonempty");
    assert_eq!(cfg.peer_count(), 2);
    assert_eq!(cfg.peers[0].id, "peer-0");
    assert_eq!(cfg.peers[1].id, "peer-1");
    let (first, second) = (&cfg.peers[0], &cfg.peers[1]);
    assert_eq!(
        first.sync_push_url,
        "http://peer-a.example/api/v1/sync/push"
    );
    assert_eq!(
        second.sync_push_url,
        "http://peer-b.example/api/v1/sync/push"
    );
    assert_eq!(cfg.sender_agent_id, "ai:builder");
}
/// Two peer URLs that differ only by a trailing slash must be rejected with
/// an error mentioning `duplicate peer URL`.
#[test]
fn config_build_rejects_duplicate_peer_urls() {
    let peer_urls = [
        String::from("http://peer.example"),
        String::from("http://peer.example/"),
    ];
    let result = FederationConfig::build(
        2,
        &peer_urls,
        Duration::from_millis(500),
        None,
        None,
        None,
        String::from("ai:builder"),
    );
    let Err(err) = result else {
        panic!("expected duplicate-URL rejection")
    };
    let msg = err.to_string();
    assert!(
        msg.contains("duplicate peer URL"),
        "expected duplicate-URL rejection, got {msg:?}"
    );
}
/// A CA certificate path that does not exist must make
/// `FederationConfig::build` fail with an error mentioning
/// `read --quorum-ca-cert`.
#[test]
fn config_build_rejects_missing_ca_cert_path() {
    let missing_ca = std::path::PathBuf::from("/definitely/does/not/exist/ca.pem");
    let peer_urls = [String::from("http://peer.example")];
    let result = FederationConfig::build(
        2,
        &peer_urls,
        Duration::from_millis(500),
        None,
        None,
        Some(&missing_ca),
        String::from("ai:builder"),
    );
    let Err(err) = result else {
        panic!("expected ca-cert read error")
    };
    let msg = err.to_string();
    assert!(
        msg.contains("read --quorum-ca-cert"),
        "expected ca-cert read error, got {msg:?}"
    );
}
/// A CA certificate file that exists but is not valid PEM must make
/// `FederationConfig::build` fail with an error mentioning
/// `--quorum-ca-cert`.
#[test]
fn config_build_rejects_invalid_ca_cert_pem() {
    let scratch = tempfile::tempdir().unwrap();
    let bogus_pem = scratch.path().join("not-a-cert.pem");
    std::fs::write(&bogus_pem, b"this is not a valid pem certificate").unwrap();
    let peer_urls = [String::from("http://peer.example")];
    let result = FederationConfig::build(
        2,
        &peer_urls,
        Duration::from_millis(500),
        None,
        None,
        Some(&bogus_pem),
        String::from("ai:builder"),
    );
    let Err(err) = result else {
        panic!("expected ca-cert parse error")
    };
    let msg = err.to_string();
    // NOTE(review): the second clause already matches everything the first
    // one does, so any message naming the flag is accepted.
    let names_flag = msg.contains("parse --quorum-ca-cert") || msg.contains("--quorum-ca-cert");
    assert!(names_flag, "expected ca-cert parse error, got {msg:?}");
}
/// Client certificate and key paths that do not exist must make
/// `FederationConfig::build` fail with an error mentioning
/// `read --client-cert`.
#[test]
fn config_build_rejects_missing_client_cert_path() {
    let missing_cert = std::path::PathBuf::from("/definitely/missing/cert.pem");
    let missing_key = std::path::PathBuf::from("/definitely/missing/key.pem");
    let peer_urls = [String::from("http://peer.example")];
    let result = FederationConfig::build(
        2,
        &peer_urls,
        Duration::from_millis(500),
        Some(&missing_cert),
        Some(&missing_key),
        None,
        String::from("ai:builder"),
    );
    let Err(err) = result else {
        panic!("expected client-cert read error")
    };
    let msg = err.to_string();
    assert!(
        msg.contains("read --client-cert"),
        "expected client-cert read error, got {msg:?}"
    );
}
/// `peer_count()` must equal the number of peers the config was built with.
#[test]
fn peer_count_matches_peer_list() {
    let peer_urls: Vec<String> = ["http://a.example", "http://b.example", "http://c.example"]
        .into_iter()
        .map(String::from)
        .collect();
    let cfg = build_config(peer_urls, 2, 500);
    assert_eq!(cfg.peer_count(), 3);
}
/// Letters, digits and `-`, `_`, `.`, `~` must come back unchanged.
#[test]
fn urlencoding_encode_passthrough_safe_chars() {
    let unreserved = "abcXYZ-09_.~";
    assert_eq!(urlencoding_encode(unreserved), unreserved);
}
/// Reserved ASCII characters (`:`, `+`, `/`, space) must be percent-encoded,
/// `-` must pass through, and bytes with the high bit set must be encoded
/// one byte at a time.
#[test]
fn urlencoding_encode_percent_encodes_reserved_and_high_bits() {
    let encoded = urlencoding_encode("2026-04-26T12:00:00+00:00 / x");
    assert!(
        encoded.contains("%3A"),
        "expected colon to be percent-encoded: {encoded}"
    );
    assert!(
        encoded.contains("%2B"),
        "expected + to be percent-encoded: {encoded}"
    );
    assert!(
        encoded.contains("%2F"),
        "expected / to be percent-encoded: {encoded}"
    );
    assert!(
        encoded.contains("%20"),
        "expected space to be percent-encoded: {encoded}"
    );
    assert!(
        !encoded.contains("%2D"),
        "hyphen must pass through unencoded: {encoded}"
    );
    // The input above is pure ASCII, so it never exercised the "high bits"
    // half of this test's name. `é` is the two UTF-8 bytes 0xC3 0xA9; each
    // must be emitted as its own uppercase `%XX` escape.
    assert_eq!(
        urlencoding_encode("é"),
        "%C3%A9",
        "non-ASCII bytes must be percent-encoded byte by byte"
    );
}
/// Encoding the empty string must yield the empty string.
#[test]
fn urlencoding_encode_empty_string() {
    let encoded = urlencoding_encode("");
    assert_eq!(encoded, "");
}
/// Mock push handler that always answers `200 OK` with a body whose `ids`
/// array holds only `some-other-id`. The request body is ignored.
async fn id_drift_handler(
    AxumJson(_body): AxumJson<serde_json::Value>,
) -> (StatusCode, AxumJson<serde_json::Value>) {
    let mismatched = serde_json::json!({"ids": ["some-other-id"], "applied": 1});
    (StatusCode::OK, AxumJson(mismatched))
}
/// Starts a mock peer on an ephemeral `127.0.0.1` port that serves
/// `POST /api/v1/sync/push` with `id_drift_handler`, and returns its base URL.
async fn spawn_id_drift_peer() -> String {
    let router = Router::new().route("/api/v1/sync/push", post(id_drift_handler));
    let socket = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let base_url = format!("http://{}", socket.local_addr().unwrap());
    tokio::spawn(async move {
        axum::serve(socket, router).await.ok();
    });
    base_url
}
/// Two peers that answer 200 but report a different id must not count toward
/// quorum: with W=2 the result must be `QuorumNotMet { got: 1, needed: 2 }`
/// with reason `IdDrift`, `Timeout` or `InFlight`.
#[tokio::test]
async fn id_drift_peer_does_not_count_as_ack() {
    let drifting_a = spawn_id_drift_peer().await;
    let drifting_b = spawn_id_drift_peer().await;
    let cfg = build_config(vec![drifting_a, drifting_b], 2, 1000);
    let tracker = broadcast_store_quorum(&cfg, &sample_memory())
        .await
        .unwrap();
    match finalise_quorum(&tracker).unwrap_err() {
        QuorumError::QuorumNotMet {
            got,
            needed,
            reason,
        } => {
            assert_eq!(got, 1, "only local should count");
            assert_eq!(needed, 2);
            let acceptable = matches!(
                reason,
                QuorumFailureReason::IdDrift
                    | QuorumFailureReason::Timeout
                    | QuorumFailureReason::InFlight
            );
            assert!(
                acceptable,
                "expected IdDrift / Timeout / InFlight, got {reason:?}"
            );
        }
        other => panic!("expected QuorumNotMet, got {other:?}"),
    }
}
/// Canned response strategies for the mock `/api/v1/sync/since` peer served
/// by `since_handler`.
#[derive(Clone)]
enum SinceMockBehaviour {
/// Answer `200 OK` with `{"memories": [...]}` containing these memories.
ReturnMemories(Vec<Memory>),
/// Answer `500 Internal Server Error` with an error body.
Error500,
/// Sleep for the given duration, then answer `200 OK` with an empty list.
Hang(Duration),
/// Answer with a body labelled `application/json` that is not valid JSON.
MalformedBody,
}
/// Shared axum state for the mock `since` peer.
#[derive(Clone)]
struct SinceMockState {
/// Which canned response `since_handler` should produce.
behaviour: SinceMockBehaviour,
/// Number of requests received; incremented once per call.
hits: Arc<AtomicUsize>,
/// `since` query parameter of the most recent request (`None` if absent).
last_since: Arc<Mutex<Option<String>>>,
/// `peer` query parameter of the most recent request (`None` if absent).
last_peer: Arc<Mutex<Option<String>>>,
}
async fn since_handler(
axum::extract::Query(q): axum::extract::Query<std::collections::HashMap<String, String>>,
axum::extract::State(state): axum::extract::State<SinceMockState>,
) -> axum::response::Response {
use axum::response::IntoResponse;
state.hits.fetch_add(1, Ordering::Relaxed);
{
let mut s = state.last_since.lock().await;
*s = q.get("since").cloned();
}
{
let mut p = state.last_peer.lock().await;
*p = q.get("peer").cloned();
}
match &state.behaviour {
SinceMockBehaviour::ReturnMemories(mems) => {
let body = serde_json::json!({"memories": mems});
(StatusCode::OK, AxumJson(body)).into_response()
}
SinceMockBehaviour::Error500 => (
StatusCode::INTERNAL_SERVER_ERROR,
AxumJson(serde_json::json!({"error":"oops"})),
)
.into_response(),
SinceMockBehaviour::Hang(d) => {
tokio::time::sleep(*d).await;
(
StatusCode::OK,
AxumJson(serde_json::json!({"memories": []})),
)
.into_response()
}
SinceMockBehaviour::MalformedBody => {
(
[(axum::http::header::CONTENT_TYPE, "application/json")],
"this is not json {{{",
)
.into_response()
}
}
}
/// Starts a mock peer on an ephemeral `127.0.0.1` port that serves
/// `GET /api/v1/sync/since` with `since_handler`.
///
/// Returns, in order: the base URL, the request counter, the last-seen
/// `since` parameter and the last-seen `peer` parameter.
async fn spawn_since_peer(
    behaviour: SinceMockBehaviour,
) -> (
    String,
    Arc<AtomicUsize>,
    Arc<Mutex<Option<String>>>,
    Arc<Mutex<Option<String>>>,
) {
    let state = SinceMockState {
        behaviour,
        hits: Arc::new(AtomicUsize::new(0)),
        last_since: Arc::new(Mutex::new(None)),
        last_peer: Arc::new(Mutex::new(None)),
    };
    // Keep handles for the caller before the state moves into the router.
    let hits = Arc::clone(&state.hits);
    let last_since = Arc::clone(&state.last_since);
    let last_peer = Arc::clone(&state.last_peer);
    let router = Router::new()
        .route("/api/v1/sync/since", axum::routing::get(since_handler))
        .with_state(state);
    let socket = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let base_url = format!("http://{}", socket.local_addr().unwrap());
    tokio::spawn(async move {
        axum::serve(socket, router).await.ok();
    });
    (base_url, hits, last_since, last_peer)
}
/// Opens a `:memory:` database through `crate::db::open` and wraps it in the
/// handler `Db` tuple: connection, path, default `ResolvedTtl`, and a `true`
/// flag (its meaning is not visible from this module).
///
/// Panics if the database cannot be opened.
fn build_test_db() -> crate::handlers::Db {
    let db_path = std::path::PathBuf::from(":memory:");
    let conn = crate::db::open(db_path.as_path()).unwrap();
    let ttl = crate::config::ResolvedTtl::default();
    Arc::new(Mutex::new((conn, db_path, ttl, true)))
}
/// Assembles a single-peer `FederationConfig` for the catchup tests.
///
/// The peer has id `peer-0` and push URL `{peer_url}/api/v1/sync/push`; the
/// sender id is `ai:catchup-test`. `timeout_ms` is used for both the HTTP
/// client timeout and the policy timeout. The policy is built with
/// `QuorumPolicy::new(2, 1, ..)`.
///
/// Panics if the HTTP client or the quorum policy cannot be constructed.
fn build_catchup_cfg(peer_url: &str, timeout_ms: u64) -> FederationConfig {
    let timeout = Duration::from_millis(timeout_ms);
    let client = reqwest::Client::builder()
        .timeout(timeout)
        .build()
        .unwrap();
    let policy = QuorumPolicy::new(2, 1, timeout, Duration::from_secs(30)).unwrap();
    let only_peer = PeerEndpoint {
        id: String::from("peer-0"),
        sync_push_url: format!("{peer_url}/api/v1/sync/push"),
    };
    FederationConfig {
        policy,
        peers: vec![only_peer],
        client,
        sender_agent_id: String::from("ai:catchup-test"),
    }
}
/// Builds a `Memory` fixture in namespace `catchup` with id `cat-{title}`.
/// Both `created_at` and `updated_at` are set to `updated_at`.
fn catchup_memory(title: &str, updated_at: &str) -> Memory {
    let stamp = updated_at.to_owned();
    Memory {
        id: format!("cat-{title}"),
        tier: crate::models::Tier::Mid,
        namespace: String::from("catchup"),
        title: title.to_owned(),
        content: format!("content for {title}"),
        tags: vec![String::from("catchup")],
        priority: 5,
        confidence: 1.0,
        source: String::from("system"),
        access_count: 0,
        created_at: stamp.clone(),
        updated_at: stamp,
        last_accessed_at: None,
        expires_at: None,
        metadata: serde_json::json!({"agent_id":"ai:peer-0"}),
    }
}
/// A first catchup against a peer returning five memories must hit the peer
/// once, omit `since`, send the sender id as `peer`, store all five rows, and
/// record the newest `updated_at` as the sync state for `peer-0`.
#[tokio::test]
async fn test_catchup_once_pulls_since_cursor_advances_state() {
    let mems = vec![
        catchup_memory("a", "2026-04-26T10:00:00Z"),
        catchup_memory("b", "2026-04-26T10:00:01Z"),
        catchup_memory("c", "2026-04-26T10:00:02Z"),
        catchup_memory("d", "2026-04-26T10:00:03Z"),
        catchup_memory("e", "2026-04-26T10:00:04Z"),
    ];
    // The list is in ascending timestamp order, so the last entry is newest.
    let latest_ts = mems.last().unwrap().updated_at.clone();
    let (url, hits, last_since, last_peer) =
        spawn_since_peer(SinceMockBehaviour::ReturnMemories(mems)).await;
    let cfg = build_catchup_cfg(&url, 2000);
    let db = build_test_db();

    catchup_once(&cfg, &db).await;

    let requests = hits.load(Ordering::Relaxed);
    assert_eq!(requests, 1, "peer hit exactly once");
    assert!(
        last_since.lock().await.is_none(),
        "first catchup must omit since"
    );
    assert_eq!(last_peer.lock().await.as_deref(), Some("ai:catchup-test"));

    let guard = db.lock().await;
    let clock =
        crate::db::sync_state_load(&guard.0, "ai:catchup-test").expect("load sync state");
    let recorded = clock.entries.get("peer-0").map(String::as_str);
    assert_eq!(
        recorded,
        Some(latest_ts.as_str()),
        "sync state advanced to latest pulled memory's updated_at"
    );
    let row_count: i64 = guard
        .0
        .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
        .unwrap();
    assert_eq!(row_count, 5, "all five memories inserted");
}
/// An empty `memories` response must hit the peer once, leave the sync state
/// for `peer-0` unset and insert no rows.
#[tokio::test]
async fn test_catchup_once_no_new_memories_no_op() {
    let (url, hits, _, _) =
        spawn_since_peer(SinceMockBehaviour::ReturnMemories(Vec::new())).await;
    let cfg = build_catchup_cfg(&url, 2000);
    let db = build_test_db();

    catchup_once(&cfg, &db).await;

    let requests = hits.load(Ordering::Relaxed);
    assert_eq!(requests, 1);
    let guard = db.lock().await;
    let clock = crate::db::sync_state_load(&guard.0, "ai:catchup-test").unwrap();
    let cursor_unset = clock.entries.get("peer-0").is_none();
    assert!(cursor_unset, "empty response must not advance sync_state");
    let row_count: i64 = guard
        .0
        .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
        .unwrap();
    assert_eq!(row_count, 0);
}
/// A peer answering 500 must be hit once, must not make `catchup_once` panic,
/// and must leave the sync state for `peer-0` unset.
#[tokio::test]
async fn test_catchup_once_peer_500_error_logged_no_panic() {
    let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::Error500).await;
    let cfg = build_catchup_cfg(&url, 2000);
    let db = build_test_db();

    catchup_once(&cfg, &db).await;

    let requests = hits.load(Ordering::Relaxed);
    assert_eq!(requests, 1);
    let guard = db.lock().await;
    let clock = crate::db::sync_state_load(&guard.0, "ai:catchup-test").unwrap();
    let cursor_unset = clock.entries.get("peer-0").is_none();
    assert!(cursor_unset, "500 must not advance sync state");
}
/// With a 200 ms client timeout against a peer that hangs for 2 s,
/// `catchup_once` must return in under 1.5 s, the request must have been
/// sent, and the sync state for `peer-0` must stay unset.
#[tokio::test]
async fn test_catchup_once_peer_timeout_handled() {
    let hang_for = Duration::from_secs(2);
    let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::Hang(hang_for)).await;
    let cfg = build_catchup_cfg(&url, 200);
    let db = build_test_db();

    let started = Instant::now();
    catchup_once(&cfg, &db).await;
    let elapsed = started.elapsed();

    assert!(
        elapsed < Duration::from_millis(1500),
        "catchup_once should honour the client timeout, took {elapsed:?}"
    );
    let requests = hits.load(Ordering::Relaxed);
    assert_eq!(requests, 1, "request was sent");
    let lock = db.lock().await;
    let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
    assert!(clock.entries.get("peer-0").is_none());
}
/// A peer answering with the `MalformedBody` mock behaviour is hit once and
/// must not advance the `peer-0` sync-state entry.
#[tokio::test]
async fn test_catchup_once_malformed_response_handled() {
    let (peer_url, hit_counter, _, _) =
        spawn_since_peer(SinceMockBehaviour::MalformedBody).await;
    let cfg = build_catchup_cfg(&peer_url, 2000);
    let db = build_test_db();

    catchup_once(&cfg, &db).await;

    assert_eq!(hit_counter.load(Ordering::Relaxed), 1);

    let guard = db.lock().await;
    let sync_state = crate::db::sync_state_load(&guard.0, "ai:catchup-test").unwrap();
    let peer_entry = sync_state.entries.get("peer-0");
    assert!(
        peer_entry.is_none(),
        "malformed body must not advance sync state"
    );
}
/// Catch-up must not let an older remote copy of a row replace a newer local
/// one, while still inserting genuinely new rows and advancing the sync state
/// to the newest timestamp in the response.
#[tokio::test]
async fn test_catchup_once_inserts_only_newer_memories() {
    let db = build_test_db();

    // Pre-seed the local DB with the newer version of the "shared" memory.
    {
        let guard = db.lock().await;
        let local_shared = catchup_memory("shared", "2026-04-26T10:00:01Z");
        crate::db::insert_if_newer(&guard.0, &local_shared).unwrap();
        let seeded: i64 = guard
            .0
            .query_row("SELECT COUNT(*) FROM memories", [], |row| row.get(0))
            .unwrap();
        assert_eq!(seeded, 1, "pre-seeded shared row");
    }

    // The peer offers an older "shared" row (distinct id and content) plus a new one.
    let mut stale_shared = catchup_memory("shared", "2026-04-26T10:00:00Z");
    stale_shared.content = "stale-from-catchup-peer".to_string();
    stale_shared.id = "cat-shared-OLD".to_string();
    let stale_shared_content = stale_shared.content.clone();
    let new_fresh = catchup_memory("fresh", "2026-04-26T10:00:02Z");

    let served = vec![stale_shared, new_fresh];
    let (peer_url, _, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(served)).await;
    let cfg = build_catchup_cfg(&peer_url, 2000);

    catchup_once(&cfg, &db).await;

    let guard = db.lock().await;
    let total: i64 = guard
        .0
        .query_row("SELECT COUNT(*) FROM memories", [], |row| row.get(0))
        .unwrap();
    assert_eq!(total, 2, "fresh row inserted, shared kept");

    let shared_content: String = guard
        .0
        .query_row(
            "SELECT content FROM memories WHERE title = 'shared' AND namespace = 'catchup'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_ne!(
        shared_content, stale_shared_content,
        "older catchup memory must NOT overwrite newer local row"
    );

    let sync_state = crate::db::sync_state_load(&guard.0, "ai:catchup-test").unwrap();
    let recorded = sync_state.entries.get("peer-0").map(String::as_str);
    assert_eq!(recorded, Some("2026-04-26T10:00:02Z"),);
}
// Verifies that the loop spawned by `spawn_catchup_loop` contacts the mock
// peer at least once. The runtime starts with the tokio clock paused, so time
// only moves when the test calls `tokio::time::advance`.
#[tokio::test(start_paused = true)]
async fn test_spawn_catchup_loop_runs_at_interval() {
let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![])).await;
let cfg = build_catchup_cfg(&url, 5000);
let db = build_test_db();
let handle = spawn_catchup_loop(cfg, db, Duration::from_secs(60));
// Step the paused clock forward six seconds, yielding after each step so the
// spawned task gets a chance to run.
// NOTE(review): presumably this covers an initial delay before the first tick —
// confirm against `spawn_catchup_loop`.
for _ in 0..6 {
tokio::time::advance(Duration::from_secs(1)).await;
tokio::task::yield_now().await;
}
// Bounded poll (at most 50 rounds) until the peer records a hit; each round
// yields and nudges the clock by 10 ms.
for _ in 0..50 {
if hits.load(Ordering::Relaxed) >= 1 {
break;
}
tokio::task::yield_now().await;
tokio::time::advance(Duration::from_millis(10)).await;
}
assert!(
hits.load(Ordering::Relaxed) >= 1,
"first catchup tick must hit the mock peer (got {})",
hits.load(Ordering::Relaxed),
);
// Stop the background loop so it does not outlive the test.
handle.abort();
}
/// Aborting the handle returned by `spawn_catchup_loop` must resolve the
/// handle within 500 ms with a join error that reports cancellation.
#[tokio::test]
async fn test_spawn_catchup_loop_aborts_cleanly_on_handle_drop() {
    let (peer_url, _, _, _) =
        spawn_since_peer(SinceMockBehaviour::ReturnMemories(Vec::new())).await;
    let cfg = build_catchup_cfg(&peer_url, 2000);
    let db = build_test_db();

    let task = spawn_catchup_loop(cfg, db, Duration::from_secs(3600));
    task.abort();

    let joined = tokio::time::timeout(Duration::from_millis(500), task)
        .await
        .expect("aborted handle must resolve within 500ms");
    let was_cancelled = matches!(&joined, Err(e) if e.is_cancelled());
    assert!(
        was_cancelled,
        "handle.abort() must surface as is_cancelled() == true"
    );
}
/// `FederationConfig::build` must succeed with `Some(config)` when given the
/// certificate and PKCS#8 key fixtures under `tests/fixtures/tls`.
#[test]
fn test_build_config_mtls_with_valid_files() {
    // Resolve a fixture file relative to the crate root.
    let fixture = |name: &str| {
        std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join(format!("tests/fixtures/tls/{name}"))
    };
    let cert = fixture("valid_cert.pem");
    let key = fixture("valid_key_pkcs8.pem");
    assert!(cert.exists(), "missing test fixture: {cert:?}");
    assert!(key.exists(), "missing test fixture: {key:?}");

    let peers = ["http://peer.example".to_string()];
    let cfg = FederationConfig::build(
        2,
        &peers,
        Duration::from_millis(500),
        Some(&cert),
        Some(&key),
        None,
        "ai:builder".to_string(),
    )
    .unwrap_or_else(|e| panic!("expected Ok, got Err: {e}"))
    .expect("expected Some(FederationConfig), got None");

    assert_eq!(cfg.peer_count(), 1);
}
/// A valid certificate paired with a non-existent key path must make
/// `FederationConfig::build` fail with an error mentioning `read --client-key`.
#[test]
fn test_build_config_mtls_with_missing_files_returns_error() {
    let cert = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
        .join("tests/fixtures/tls/valid_cert.pem");
    assert!(cert.exists(), "missing test fixture: {cert:?}");
    let bogus_key = std::path::PathBuf::from("/definitely/missing/key.pem");

    let peers = ["http://peer.example".to_string()];
    let outcome = FederationConfig::build(
        2,
        &peers,
        Duration::from_millis(500),
        Some(&cert),
        Some(&bogus_key),
        None,
        "ai:builder".to_string(),
    );

    // `.err()` is used rather than `expect_err` so the Ok type needs no `Debug` impl.
    let err = outcome
        .err()
        .unwrap_or_else(|| panic!("expected client-key read error"));
    let msg = err.to_string();
    assert!(
        msg.contains("read --client-key"),
        "expected client-key read error, got {msg:?}"
    );
}
/// Against a peer using `MockBehaviour::Fail`, `post_and_classify` must issue
/// exactly two POSTs and return a `Fail` reason that carries both the `first:`
/// and `retry:` parts along with the `http 500` status.
#[tokio::test]
async fn post_and_classify_persistent_fail_concatenates_both_reasons() {
    let (base_url, post_count) = spawn_mock_peer(MockBehaviour::Fail).await;
    let http = reqwest::Client::builder()
        .timeout(Duration::from_millis(2000))
        .build()
        .unwrap();
    let payload = serde_json::json!({"sender_agent_id":"ai:test","memories":[]});
    let endpoint = format!("{base_url}/api/v1/sync/push");

    let outcome = post_and_classify(&http, &endpoint, &payload, "mem-x", Some("mem-x")).await;

    let reason = match outcome {
        AckOutcome::Fail(reason) => reason,
        other => panic!("expected AckOutcome::Fail, got {other:?}"),
    };
    let has_both_attempts = reason.contains("first:") && reason.contains("retry:");
    assert!(
        has_both_attempts,
        "expected both attempts in reason, got {reason:?}"
    );
    assert!(
        reason.contains("http 500"),
        "expected 5xx in reason, got {reason:?}"
    );

    assert_eq!(
        post_count.load(Ordering::Relaxed),
        2,
        "first attempt + one retry = exactly two POSTs"
    );
}
/// A 200 response whose `ids` list names a different id than the one sent
/// must classify as `IdDrift` and must not be retried (a single POST only).
#[tokio::test]
async fn post_and_classify_id_drift_does_not_retry() {
    let post_count = Arc::new(AtomicUsize::new(0));

    // Inline mock peer: counts each POST and always replies with a foreign id.
    let handler_count = post_count.clone();
    let handler = move |AxumJson(_body): AxumJson<serde_json::Value>| {
        let counter = handler_count.clone();
        async move {
            counter.fetch_add(1, Ordering::Relaxed);
            (
                StatusCode::OK,
                AxumJson(serde_json::json!({"ids":["other-id"],"applied":1})),
            )
        }
    };
    let app = Router::new().route("/api/v1/sync/push", post(handler));

    // Bind to an ephemeral port and serve in the background.
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();
    tokio::spawn(async move {
        axum::serve(listener, app).await.ok();
    });

    let endpoint = format!("http://{addr}/api/v1/sync/push");
    let http = reqwest::Client::builder()
        .timeout(Duration::from_millis(2000))
        .build()
        .unwrap();
    let payload = serde_json::json!({"sender_agent_id":"ai:test","memories":[]});

    let outcome = post_and_classify(&http, &endpoint, &payload, "mem-x", Some("mem-x")).await;

    assert!(
        matches!(outcome, AckOutcome::IdDrift),
        "expected IdDrift, got {outcome:?}"
    );
    assert_eq!(
        post_count.load(Ordering::Relaxed),
        1,
        "IdDrift must NOT trigger the retry path (only one POST)"
    );
}
/// With an empty peer list, `bulk_catchup_push` must return an empty error
/// vector.
#[tokio::test]
async fn bulk_catchup_push_no_peers_is_noop() {
    let timeout = Duration::from_millis(500);
    let http = reqwest::Client::builder().timeout(timeout).build().unwrap();
    let policy = QuorumPolicy::new(1, 1, timeout, Duration::from_secs(30)).unwrap();
    let cfg = FederationConfig {
        policy,
        peers: Vec::new(),
        client: http,
        sender_agent_id: "ai:no-peers".to_string(),
    };
    let batch = vec![sample_memory()];

    let errors = bulk_catchup_push(&cfg, &batch).await;

    assert!(
        errors.is_empty(),
        "no-peers catchup must return empty error vec immediately, got {errors:?}"
    );
}
/// With one acking and one failing peer, `bulk_catchup_push` must report only
/// the failing peer (second in the list, id prefixed `peer-1`) with an
/// `http 500` reason, and each peer must have been POSTed to exactly once.
#[tokio::test]
async fn bulk_catchup_push_mixed_outcomes_only_failing_peer_in_errors() {
    let (ok_url, ok_hits) = spawn_mock_peer(MockBehaviour::Ack).await;
    let (bad_url, bad_hits) = spawn_mock_peer(MockBehaviour::Fail).await;
    let cfg = build_config(vec![ok_url, bad_url], 2, 2000);
    let batch = vec![sample_memory()];

    let errors = bulk_catchup_push(&cfg, &batch).await;

    assert_eq!(
        errors.len(),
        1,
        "exactly one failing peer should be in errors, got {errors:?}"
    );
    let (peer_id, reason) = &errors[0];
    assert!(
        peer_id.starts_with("peer-1"),
        "failing peer should be peer-1, got {peer_id}"
    );
    assert!(
        reason.contains("http 500"),
        "expected http 500 reason, got {reason}"
    );
    assert_eq!(ok_hits.load(Ordering::Relaxed), 1);
    assert_eq!(bad_hits.load(Ordering::Relaxed), 1);
}
/// When `build_config` is given 1 as its second argument (presumably W=1),
/// two failing peers must not prevent `finalise_quorum` from succeeding with
/// a count of one.
#[tokio::test]
async fn quorum_w1_local_commit_alone_is_sufficient() {
    let (first_url, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let (second_url, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let cfg = build_config(vec![first_url, second_url], 1, 1000);
    let memory = sample_memory();

    let tracker = broadcast_store_quorum(&cfg, &memory).await.unwrap();

    let acked = finalise_quorum(&tracker).expect("W=1 must succeed on local commit alone");
    assert_eq!(acked, 1, "W=1 quorum returns local-only count");
}
/// `QuorumPolicy::majority(n)` must require 2 acks for N=3 and 3 acks for N=5,
/// counting the local commit as one ack.
#[test]
fn quorum_policy_majority_builds_with_ceil_n_plus_1_div_2() {
    // N = 3: local alone is not enough, local + one peer is.
    let policy_of_three = QuorumPolicy::majority(3).expect("N=3 majority builds");
    let mut three = AckTracker::new(policy_of_three, Instant::now());
    three.record_local();
    let met_with_local_only = three.is_quorum_met(Instant::now());
    assert!(!met_with_local_only, "majority-of-3 needs more than local");
    three.record_peer_ack("peer-a");
    assert!(
        three.is_quorum_met(Instant::now()),
        "local + 1 peer ack = 2 = majority of 3"
    );

    // N = 5: local + one peer is not enough, local + two peers is.
    let policy_of_five = QuorumPolicy::majority(5).expect("N=5 majority builds");
    let mut five = AckTracker::new(policy_of_five, Instant::now());
    five.record_local();
    five.record_peer_ack("a");
    let met_with_two = five.is_quorum_met(Instant::now());
    assert!(!met_with_two, "majority-of-5 needs 3 acks");
    five.record_peer_ack("b");
    assert!(five.is_quorum_met(Instant::now()), "local + 2 peers = 3");
}
/// `QuorumPolicy::majority(0)` must fail with `InvalidPolicy` whose detail
/// text contains "n must be".
#[test]
fn quorum_policy_majority_rejects_zero() {
    let detail = match QuorumPolicy::majority(0).expect_err("n=0 must be rejected") {
        QuorumError::InvalidPolicy { detail } => detail,
        other => panic!("expected InvalidPolicy, got {other:?}"),
    };
    assert!(
        detail.contains("n must be"),
        "expected n>=1 message, got {detail}"
    );
}
/// Two peer URLs that differ only by a trailing `/` must be rejected by
/// `FederationConfig::build` as duplicates.
#[test]
fn config_build_rejects_duplicate_peers_differing_only_in_trailing_slash() {
    let peers = [
        "http://peer.example".to_string(),
        "http://peer.example/".to_string(),
    ];
    let outcome = FederationConfig::build(
        2,
        &peers,
        Duration::from_millis(500),
        None,
        None,
        None,
        "ai:dup-test".to_string(),
    );

    // `.err()` is used rather than `expect_err` so the Ok type needs no `Debug` impl.
    let err = outcome
        .err()
        .unwrap_or_else(|| panic!("trailing-slash dup must be rejected"));
    let msg = err.to_string();
    assert!(
        msg.contains("duplicate peer URL"),
        "expected duplicate-peer error, got {msg}"
    );
}
/// Two peer URLs that differ only in ASCII letter case must be rejected by
/// `FederationConfig::build` as duplicates.
#[test]
fn config_build_rejects_duplicate_peers_differing_only_in_case() {
    let peers = [
        "http://Peer.Example".to_string(),
        "http://peer.example".to_string(),
    ];
    let outcome = FederationConfig::build(
        2,
        &peers,
        Duration::from_millis(500),
        None,
        None,
        None,
        "ai:dup-case-test".to_string(),
    );

    // `.err()` is used rather than `expect_err` so the Ok type needs no `Debug` impl.
    let err = outcome
        .err()
        .unwrap_or_else(|| panic!("case-only dup must be rejected"));
    let msg = err.to_string();
    assert!(
        msg.contains("duplicate peer URL"),
        "expected duplicate-peer error, got {msg}"
    );
}
/// With two hanging peers, `broadcast_archive_quorum` must return within two
/// seconds and the tracker must finalise as `QuorumNotMet`.
#[tokio::test]
async fn archive_quorum_hanging_peer_times_out_to_break_arm() {
    let (first_url, _) = spawn_mock_peer(MockBehaviour::Hang).await;
    let (second_url, _) = spawn_mock_peer(MockBehaviour::Hang).await;
    // NOTE(review): 200 is presumably the quorum timeout in milliseconds — confirm
    // against `build_config`.
    let cfg = build_config(vec![first_url, second_url], 2, 200);

    let started = Instant::now();
    let tracker = broadcast_archive_quorum(&cfg, "mem-arch-id").await.unwrap();
    let elapsed = started.elapsed();

    let ceiling = Duration::from_secs(2);
    assert!(
        elapsed < ceiling,
        "archive_quorum must exit at deadline, took {elapsed:?}"
    );

    let err = finalise_quorum(&tracker).unwrap_err();
    let not_met = matches!(err, QuorumError::QuorumNotMet { .. });
    assert!(not_met, "expected QuorumNotMet, got {err:?}");
}
/// A failed store broadcast (two failing peers, W=2) must convert into a
/// `QuorumNotMetPayload` with `got = 1`, `needed = 2` and a reason of either
/// `unreachable` or `timeout`.
#[tokio::test]
async fn quorum_not_met_payload_unreachable_round_trip_from_broadcast() {
    let (first_url, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let (second_url, _) = spawn_mock_peer(MockBehaviour::Fail).await;
    let cfg = build_config(vec![first_url, second_url], 2, 100);
    let memory = sample_memory();

    let tracker = broadcast_store_quorum(&cfg, &memory).await.unwrap();
    // Sleep 150 ms before finalising.
    // NOTE(review): presumably this is to get past the 100 passed to
    // `build_config` (assumed to be a timeout in ms) — confirm.
    tokio::time::sleep(Duration::from_millis(150)).await;

    let err = finalise_quorum(&tracker).unwrap_err();
    let payload = QuorumNotMetPayload::from_err(&err);

    assert_eq!(payload.error, "quorum_not_met");
    assert_eq!(payload.got, 1, "only local commit");
    assert_eq!(payload.needed, 2);
    let reason_ok = payload.reason == "unreachable" || payload.reason == "timeout";
    assert!(
        reason_ok,
        "expected unreachable/timeout, got {}",
        payload.reason
    );
}
/// A peer whose `sync_push_url` is the bare base URL (no `/api/v1/sync/push`
/// suffix) must still be contacted by `catchup_once`, and the local
/// `sender_agent_id` must be the value the mock peer records for the request.
#[tokio::test]
async fn catchup_once_peer_url_without_push_suffix_still_builds_since() {
    let (url, hits, _, last_peer) =
        spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![])).await;
    let client = reqwest::Client::builder()
        .timeout(Duration::from_millis(2000))
        .build()
        .unwrap();
    // The config is built by hand so the push URL is used exactly as the mock
    // returned it, bypassing any URL shaping done by the config helpers.
    let cfg = FederationConfig {
        policy: QuorumPolicy::new(2, 1, Duration::from_millis(2000), Duration::from_secs(30))
            .unwrap(),
        peers: vec![PeerEndpoint {
            id: "peer-0".to_string(),
            // `url` is not used again, so it is moved rather than cloned.
            sync_push_url: url,
        }],
        client,
        sender_agent_id: "ai:no-suffix".to_string(),
    };
    let db = build_test_db();

    catchup_once(&cfg, &db).await;

    assert_eq!(hits.load(Ordering::Relaxed), 1);
    assert_eq!(
        last_peer.lock().await.as_deref(),
        Some("ai:no-suffix"),
        "local agent id should be forwarded as ?peer="
    );
}
/// When a response mixes a valid memory with one carrying an unrecognised
/// `source`, only the valid one may be stored, and the sync state must record
/// the valid row's timestamp rather than the rejected row's later one.
#[tokio::test]
async fn catchup_once_skips_invalid_memory_but_applies_valid_neighbour() {
    let valid = catchup_memory("ok-mem", "2026-04-26T10:00:00Z");
    let mut bad = catchup_memory("bad-source", "2026-04-26T10:00:01Z");
    bad.source = "made-up-source-not-in-allowlist".to_string();
    // `valid` is not used again, so it is moved into the vector rather than cloned.
    let mems = vec![valid, bad];
    let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(mems)).await;
    let cfg = build_catchup_cfg(&url, 2000);
    let db = build_test_db();

    catchup_once(&cfg, &db).await;

    assert_eq!(hits.load(Ordering::Relaxed), 1);
    let lock = db.lock().await;
    let count: i64 = lock
        .0
        .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
        .unwrap();
    assert_eq!(count, 1, "only the valid memory should land");
    let title: String = lock
        .0
        .query_row(
            "SELECT title FROM memories WHERE namespace='catchup' LIMIT 1",
            [],
            |r| r.get(0),
        )
        .unwrap();
    assert_eq!(title, "ok-mem");
    let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
    assert_eq!(
        clock.entries.get("peer-0").map(String::as_str),
        Some("2026-04-26T10:00:00Z"),
        "sync_state tracks latest_ts of validate-passing rows"
    );
}
/// Recording the same peer's ack twice must count as a single ack.
///
/// The policy uses W = N = 3 on purpose: after `local + peer-a + peer-a` an
/// idempotent tracker holds two acks (quorum not met), whereas one that
/// double-counts would hold three and wrongly report quorum. With a lower W
/// the duplicate would be indistinguishable from a single ack.
#[test]
fn ack_tracker_record_peer_ack_is_idempotent() {
    let policy = QuorumPolicy::new(3, 3, Duration::from_secs(1), Duration::from_secs(30))
        .expect("policy");
    let mut t = AckTracker::new(policy, Instant::now());
    t.record_local();
    t.record_peer_ack("peer-a");
    t.record_peer_ack("peer-a");
    assert!(
        !t.is_quorum_met(Instant::now()),
        "duplicate ack from the same peer must not be counted twice"
    );
    // A second, distinct peer supplies the third ack.
    t.record_peer_ack("peer-b");
    assert!(
        t.is_quorum_met(Instant::now()),
        "local + two distinct peers = 3 acks"
    );
}
/// A 200 response whose JSON body has no `memories` key must result in no
/// inserts and no sync-state entry for `peer-0`.
#[tokio::test]
async fn catchup_once_body_without_memories_key_is_skipped() {
    // Inline mock peer: valid JSON, but without the `memories` array.
    let since_handler = || async {
        (
            StatusCode::OK,
            AxumJson(serde_json::json!({"applied":0,"note":"empty cluster"})),
        )
    };
    let app = Router::new().route("/api/v1/sync/since", axum::routing::get(since_handler));

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();
    tokio::spawn(async move {
        axum::serve(listener, app).await.ok();
    });

    let base_url = format!("http://{addr}");
    let cfg = build_catchup_cfg(&base_url, 2000);
    let db = build_test_db();

    catchup_once(&cfg, &db).await;

    let guard = db.lock().await;
    let row_count: i64 = guard
        .0
        .query_row("SELECT COUNT(*) FROM memories", [], |row| row.get(0))
        .unwrap();
    assert_eq!(row_count, 0, "no memories key → no inserts");

    let sync_state = crate::db::sync_state_load(&guard.0, "ai:catchup-test").unwrap();
    let peer_entry = sync_state.entries.get("peer-0");
    assert!(
        peer_entry.is_none(),
        "no memories key → sync_state untouched"
    );
}
/// When the `memories` array mixes one well-formed entry with one JSON object
/// that lacks the memory fields, exactly one row must be inserted.
#[tokio::test]
async fn catchup_once_unparseable_individual_memory_is_skipped() {
    let good_json = serde_json::to_value(catchup_memory("ok", "2026-04-26T10:00:00Z")).unwrap();
    let junk_json = serde_json::json!({"id":"oops","not_a_memory_field": true});

    // Inline mock peer serving both entries on every request.
    let since_handler = move || {
        let good = good_json.clone();
        let junk = junk_json.clone();
        async move {
            (
                StatusCode::OK,
                AxumJson(serde_json::json!({"memories": [good, junk]})),
            )
        }
    };
    let app = Router::new().route("/api/v1/sync/since", axum::routing::get(since_handler));

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();
    tokio::spawn(async move {
        axum::serve(listener, app).await.ok();
    });

    let base_url = format!("http://{addr}");
    let cfg = build_catchup_cfg(&base_url, 2000);
    let db = build_test_db();

    catchup_once(&cfg, &db).await;

    let guard = db.lock().await;
    let row_count: i64 = guard
        .0
        .query_row("SELECT COUNT(*) FROM memories", [], |row| row.get(0))
        .unwrap();
    assert_eq!(row_count, 1, "only parseable memory inserted");
}
/// Two peers from `spawn_id_drift_peer` must each be recorded as id drift by a
/// delete broadcast, leaving the quorum at `got = 1`.
// NOTE(review): `spawn_id_drift_peer` is presumably a mock that acks with a
// mismatched id — confirm against the helper.
#[tokio::test]
async fn delete_quorum_id_drift_peer_records_drift_not_ack() {
    let first_peer = spawn_id_drift_peer().await;
    let second_peer = spawn_id_drift_peer().await;
    let cfg = build_config(vec![first_peer, second_peer], 2, 1000);

    let tracker = broadcast_delete_quorum(&cfg, "mem-del-x").await.unwrap();

    let err = finalise_quorum(&tracker).unwrap_err();
    let only_local_counted = matches!(err, QuorumError::QuorumNotMet { got: 1, .. });
    assert!(
        only_local_counted,
        "expected QuorumNotMet got=1, got {err:?}"
    );
    let drifted = tracker.id_drift_count();
    assert_eq!(drifted, 2, "both peers should be recorded as drift");
}
/// An archive broadcast to a single `spawn_id_drift_peer` peer must finalise
/// as `QuorumNotMet` and record one drift.
#[tokio::test]
async fn archive_quorum_id_drift_peer_records_drift_not_ack() {
    let peers = vec![spawn_id_drift_peer().await];
    let cfg = build_config(peers, 2, 1000);

    let tracker = broadcast_archive_quorum(&cfg, "mem-arch-x").await.unwrap();

    let outcome = finalise_quorum(&tracker);
    assert!(matches!(
        outcome.unwrap_err(),
        QuorumError::QuorumNotMet { .. }
    ));
    assert_eq!(tracker.id_drift_count(), 1);
}
/// A restore broadcast to a single `spawn_id_drift_peer` peer must finalise as
/// `QuorumNotMet` and record one drift.
#[tokio::test]
async fn restore_quorum_id_drift_peer_records_drift_not_ack() {
    let peers = vec![spawn_id_drift_peer().await];
    let cfg = build_config(peers, 2, 1000);

    let tracker = broadcast_restore_quorum(&cfg, "mem-res-x").await.unwrap();

    let outcome = finalise_quorum(&tracker);
    assert!(matches!(
        outcome.unwrap_err(),
        QuorumError::QuorumNotMet { .. }
    ));
    assert_eq!(tracker.id_drift_count(), 1);
}
/// A link broadcast to a single `spawn_id_drift_peer` peer must finalise as
/// `QuorumNotMet` and record one drift.
#[tokio::test]
async fn link_quorum_id_drift_peer_records_drift_not_ack() {
    let peers = vec![spawn_id_drift_peer().await];
    let cfg = build_config(peers, 2, 1000);
    let link = sample_link();

    let tracker = broadcast_link_quorum(&cfg, &link).await.unwrap();

    let outcome = finalise_quorum(&tracker);
    assert!(matches!(
        outcome.unwrap_err(),
        QuorumError::QuorumNotMet { .. }
    ));
    assert_eq!(tracker.id_drift_count(), 1);
}
/// A consolidate broadcast (new memory, empty third argument) to a single
/// `spawn_id_drift_peer` peer must finalise as `QuorumNotMet` and record one
/// drift.
#[tokio::test]
async fn consolidate_quorum_id_drift_peer_records_drift_not_ack() {
    let peers = vec![spawn_id_drift_peer().await];
    let cfg = build_config(peers, 2, 1000);
    let consolidated = sample_memory();

    let tracker = broadcast_consolidate_quorum(&cfg, &consolidated, &[])
        .await
        .unwrap();

    let outcome = finalise_quorum(&tracker);
    assert!(matches!(
        outcome.unwrap_err(),
        QuorumError::QuorumNotMet { .. }
    ));
    assert_eq!(tracker.id_drift_count(), 1);
}
/// A pending-action broadcast to a single `spawn_id_drift_peer` peer must
/// finalise as `QuorumNotMet` and record one drift.
#[tokio::test]
async fn pending_quorum_id_drift_peer_records_drift_not_ack() {
    let peers = vec![spawn_id_drift_peer().await];
    let cfg = build_config(peers, 2, 1000);
    let pending = sample_pending();

    let tracker = broadcast_pending_quorum(&cfg, &pending).await.unwrap();

    let outcome = finalise_quorum(&tracker);
    assert!(matches!(
        outcome.unwrap_err(),
        QuorumError::QuorumNotMet { .. }
    ));
    assert_eq!(tracker.id_drift_count(), 1);
}
/// A pending-decision broadcast to a single `spawn_id_drift_peer` peer must
/// finalise as `QuorumNotMet` and record one drift.
#[tokio::test]
async fn pending_decision_quorum_id_drift_peer_records_drift_not_ack() {
    let peers = vec![spawn_id_drift_peer().await];
    let cfg = build_config(peers, 2, 1000);
    let decision = sample_decision();

    let tracker = broadcast_pending_decision_quorum(&cfg, &decision)
        .await
        .unwrap();

    let outcome = finalise_quorum(&tracker);
    assert!(matches!(
        outcome.unwrap_err(),
        QuorumError::QuorumNotMet { .. }
    ));
    assert_eq!(tracker.id_drift_count(), 1);
}
/// A namespace-meta broadcast to a single `spawn_id_drift_peer` peer must
/// finalise as `QuorumNotMet` and record one drift.
#[tokio::test]
async fn namespace_meta_quorum_id_drift_peer_records_drift_not_ack() {
    let peers = vec![spawn_id_drift_peer().await];
    let cfg = build_config(peers, 2, 1000);
    let meta = sample_namespace_meta();

    let tracker = broadcast_namespace_meta_quorum(&cfg, &meta)
        .await
        .unwrap();

    let outcome = finalise_quorum(&tracker);
    assert!(matches!(
        outcome.unwrap_err(),
        QuorumError::QuorumNotMet { .. }
    ));
    assert_eq!(tracker.id_drift_count(), 1);
}
/// A namespace-meta-clear broadcast for one namespace to a single
/// `spawn_id_drift_peer` peer must finalise as `QuorumNotMet` and record one
/// drift.
#[tokio::test]
async fn namespace_meta_clear_quorum_id_drift_peer_records_drift_not_ack() {
    let peers = vec![spawn_id_drift_peer().await];
    let cfg = build_config(peers, 2, 1000);
    let cleared = vec!["app/team".to_string()];

    let tracker = broadcast_namespace_meta_clear_quorum(&cfg, &cleared)
        .await
        .unwrap();

    let outcome = finalise_quorum(&tracker);
    assert!(matches!(
        outcome.unwrap_err(),
        QuorumError::QuorumNotMet { .. }
    ));
    assert_eq!(tracker.id_drift_count(), 1);
}
#[tokio::test]
async fn delete_quorum_post_quorum_detach_drains_remaining_peer() {
let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url3, count3) = spawn_mock_peer(MockBehaviour::Fail).await;
let cfg = build_config(vec![url1, url2, url3], 2, 2000);
let _tracker = broadcast_delete_quorum(&cfg, "mem-detach").await.unwrap();
for _ in 0..100 {
if count1.load(Ordering::Relaxed) >= 1
&& count2.load(Ordering::Relaxed) >= 1
&& count3.load(Ordering::Relaxed) >= 1
{
break;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
assert!(
count3.load(Ordering::Relaxed) >= 1,
"failing peer must be reached by the detached fanout"
);
}
/// Finalising at the tracker's own start instant with only the local ack
/// (W=2) must yield `QuorumNotMet { got: 1, needed: 2 }` classified as
/// `InFlight`.
#[test]
fn ack_tracker_finalise_pre_deadline_returns_in_flight() {
    let policy = QuorumPolicy::new(3, 2, Duration::from_secs(60), Duration::from_secs(30))
        .expect("policy");
    let started = Instant::now();
    let mut tracker = AckTracker::new(policy, started);
    tracker.record_local();

    let (got, needed, reason) = match tracker.finalise(started).unwrap_err() {
        QuorumError::QuorumNotMet {
            got,
            needed,
            reason,
        } => (got, needed, reason),
        other => panic!("expected QuorumNotMet, got {other:?}"),
    };

    assert_eq!(got, 1);
    assert_eq!(needed, 2);
    assert_eq!(
        reason,
        QuorumFailureReason::InFlight,
        "pre-deadline insufficient-ack must classify as InFlight"
    );
}
}