pub mod identity;
pub mod peer;
pub mod peer_attestation;
#[cfg(feature = "sal")]
pub mod push_dlq;
pub mod quorum;
pub mod receive;
pub mod reflection_bookkeeping;
pub mod signing;
pub mod sync;
pub mod vector_clock;
pub use quorum::*;
pub use receive::spawn_catchup_loop;
#[cfg(feature = "sal")]
pub use receive::spawn_catchup_loop_with_store;
pub use receive::catchup_once_for_tests;
pub use sync::*;
#[cfg(feature = "sal")]
pub use push_dlq::{
FederationDlqSink, FederationPushDlqRow, REPLAY_BATCH_SIZE, replay_once,
spawn_replay_federation_push_dlq,
};
use crate::replication::QuorumPolicy;
/// Target string for federation sync `tracing` events (presumably passed as
/// `target:` by the `sync` module).
///
/// NOTE(review): this target carries an `ai_memory::` crate prefix while
/// `SIGNING_TRACE_TARGET` below does not. Confirm whether any log filters
/// depend on the difference before unifying them.
pub(crate) const SYNC_TRACE_TARGET: &str = "ai_memory::federation::sync";
/// Target string for federation signing `tracing` events (presumably passed
/// as `target:` by the `signing` module).
pub(crate) const SIGNING_TRACE_TARGET: &str = "federation::signing";
/// Runtime settings for replicating writes to federation peers under a
/// quorum policy.
///
/// Only `Clone` is derived — there is no `Debug` impl. This keeps `api_key`
/// and `signing_key` from being printed via `{:?}` (presumably intentional;
/// keep it that way if `Debug` is ever added by hand).
#[derive(Clone)]
pub struct FederationConfig {
    /// Quorum policy applied to broadcast writes. The tests' `build_config`
    /// helper sizes it as `n = 1 + peers.len()`, i.e. the local node counts
    /// toward N but does not appear in `peers`.
    pub policy: QuorumPolicy,
    /// Remote peers that writes are fanned out to.
    pub peers: Vec<PeerEndpoint>,
    /// HTTP client used for outbound peer requests.
    pub client: reqwest::Client,
    /// Agent id this node presents when pushing to peers (presumably placed
    /// in the push payload/headers — confirm in `sync`).
    pub sender_agent_id: String,
    /// Optional API key for authenticating to peers (how it is transmitted
    /// is decided elsewhere — TODO confirm).
    pub api_key: Option<String>,
    /// Optional Ed25519 key, presumably used to sign outgoing pushes.
    pub signing_key: Option<std::sync::Arc<ed25519_dalek::SigningKey>>,
    /// Optional dead-letter sink for pushes that could not be delivered
    /// (only compiled with the `sal` feature).
    #[cfg(feature = "sal")]
    pub dlq_sink: Option<std::sync::Arc<dyn push_dlq::FederationDlqSink>>,
}
/// One remote federation peer.
#[derive(Clone, Debug)]
pub struct PeerEndpoint {
    /// Identifier for this peer, as used in logs and ack tracking
    /// (`FederationConfig::build` assigns `peer-<index>` per the tests).
    pub id: String,
    /// Full URL of the peer's push endpoint, including the
    /// `/api/v1/sync/push` path.
    pub sync_push_url: String,
}
/// An embedding vector shipped between peers alongside the memory it
/// belongs to.
///
/// NOTE(review): `ShippedEmbedding::new` sets `dim` from `vector.len()`, but
/// a deserialized value is not checked here, so `dim` may disagree with
/// `vector.len()` when the payload comes from a peer. Validate on receipt,
/// e.g. together with `sanitize_shipped_vector`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct ShippedEmbedding {
    /// Id of the memory this embedding belongs to.
    pub memory_id: String,
    /// Name of the embedding model that produced `vector`.
    pub model: String,
    /// Declared dimensionality of `vector`.
    pub dim: usize,
    /// The raw embedding components.
    pub vector: Vec<f32>,
}
impl ShippedEmbedding {
    /// Creates a shipped embedding for `memory_id` produced by `model`.
    ///
    /// `dim` is taken from the vector's length rather than passed in, so a
    /// freshly constructed value is always self-consistent.
    #[must_use]
    pub fn new(memory_id: String, model: String, vector: Vec<f32>) -> Self {
        let dim = vector.len();
        Self {
            memory_id,
            model,
            dim,
            vector,
        }
    }
}
/// Maximum deviation from unit L2 norm for which a shipped vector is
/// accepted verbatim by [`sanitize_shipped_vector`] instead of being
/// re-normalized.
pub const SHIPPED_VECTOR_NORM_TOLERANCE: f32 = 1e-3;

/// Validates an embedding vector received from a peer and L2-normalizes it.
///
/// Returns `None` when the vector:
/// - is empty,
/// - contains a non-finite component (NaN or ±infinity), or
/// - has zero magnitude.
///
/// If the L2 norm is already within [`SHIPPED_VECTOR_NORM_TOLERANCE`] of
/// 1.0, the vector is returned unchanged. Otherwise it is scaled to unit
/// norm.
///
/// The norm is accumulated in `f64`, so finite-but-extreme inputs are
/// handled correctly. In `f32`, squaring components around `1e19` or larger
/// overflows to infinity, and squaring components below roughly `1e-23`
/// underflows to zero. Either case would wrongly reject a valid vector, or
/// normalize it with a precision-starved subnormal norm.
#[must_use]
// The only narrowing cast divides each component by the vector's own norm,
// so every value lies in [-1, 1] and cannot overflow `f32`.
#[allow(clippy::cast_possible_truncation)]
pub fn sanitize_shipped_vector(vector: &[f32]) -> Option<Vec<f32>> {
    if vector.is_empty() || vector.iter().any(|x| !x.is_finite()) {
        return None;
    }
    // Every finite f32 squared fits comfortably in f64 (max ~1.2e77, smallest
    // nonzero ~2e-90), so this sum neither overflows nor underflows for any
    // realistic vector length.
    let norm_sq: f64 = vector
        .iter()
        .map(|&x| f64::from(x) * f64::from(x))
        .sum();
    // Defensive: only reachable for an all-zero vector (or an absurdly long
    // one that overflows f64).
    if !norm_sq.is_finite() || norm_sq <= 0.0 {
        return None;
    }
    let norm = norm_sq.sqrt();
    if (norm - 1.0).abs() <= f64::from(SHIPPED_VECTOR_NORM_TOLERANCE) {
        return Some(vector.to_vec());
    }
    Some(
        vector
            .iter()
            .map(|&x| (f64::from(x) / norm) as f32)
            .collect(),
    )
}
#[cfg(test)]
mod sanitize_shipped_vector_tests {
    //! Unit tests for the peer-embedding sanitizer.
    use super::sanitize_shipped_vector;

    /// A single NaN or infinite component must reject the whole vector.
    #[test]
    fn rejects_non_finite_components() {
        let poisoned: [&[f32]; 3] = [
            &[0.6, f32::NAN, 0.8],
            &[f32::INFINITY, 0.0],
            &[1.0, f32::NEG_INFINITY],
        ];
        for candidate in poisoned {
            assert!(sanitize_shipped_vector(candidate).is_none());
        }
    }

    /// Vectors with no direction (empty, or all zeros) are rejected.
    #[test]
    fn rejects_zero_and_empty() {
        let degenerate: [&[f32]; 2] = [&[], &[0.0, 0.0, 0.0]];
        for candidate in degenerate {
            assert!(sanitize_shipped_vector(candidate).is_none());
        }
    }

    /// A vector that is already unit length comes back untouched.
    #[test]
    fn unit_norm_vector_passes_through() {
        let input = vec![0.6_f32, 0.8];
        let sanitized = sanitize_shipped_vector(&input).expect("unit-norm accepted");
        assert_eq!(sanitized, input);
    }

    /// A finite vector far from unit length is rescaled onto the unit sphere
    /// while keeping its direction.
    #[test]
    fn high_magnitude_vector_is_normalized() {
        let input = vec![30.0_f32, 40.0];
        let sanitized = sanitize_shipped_vector(&input).expect("finite vector normalized");
        let norm = sanitized.iter().map(|c| c * c).sum::<f32>().sqrt();
        assert!(
            (norm - 1.0).abs() < 1e-6,
            "normalized to unit norm; got {norm}"
        );
        let (first, second) = (sanitized[0], sanitized[1]);
        assert!((first - 0.6).abs() < 1e-6 && (second - 0.8).abs() < 1e-6);
    }
}
#[cfg(test)]
mod tests {
use super::receive::{catchup_once, urlencoding_encode};
use super::sync::AckOutcome;
use super::*;
use crate::models::{Memory, MemoryLink, NamespaceMetaEntry, PendingAction, PendingDecision};
use crate::replication::{AckTracker, QuorumError, QuorumFailureReason, QuorumPolicy};
use axum::Router;
use axum::extract::Json as AxumJson;
use axum::http::StatusCode;
use axum::routing::post;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use tokio::net::TcpListener;
use tokio::sync::Mutex;
fn sample_memory() -> Memory {
let now = chrono::Utc::now().to_rfc3339();
Memory {
id: "fed-test".to_string(),
tier: crate::models::Tier::Mid,
namespace: "app".to_string(),
title: "hello".to_string(),
content: "world for federation test".to_string(),
tags: vec!["t".to_string()],
priority: 5,
confidence: 1.0,
source: "test".to_string(),
access_count: 0,
created_at: now.clone(),
updated_at: now,
last_accessed_at: None,
expires_at: None,
metadata: serde_json::json!({"agent_id":"ai:test"}),
reflection_depth: 0,
memory_kind: crate::models::MemoryKind::Observation,
entity_id: None,
persona_version: None,
citations: Vec::new(),
source_uri: None,
source_span: None,
confidence_source: crate::models::ConfidenceSource::CallerProvided,
confidence_signals: None,
confidence_decayed_at: None,
version: 1,
}
}
#[derive(Clone, Copy)]
enum MockBehaviour {
Ack,
Fail,
Hang,
FailThenAck {
fail_until: usize,
},
}
#[derive(Clone)]
struct MockState {
behaviour: MockBehaviour,
count: Arc<AtomicUsize>,
}
async fn mock_handler(
axum::extract::State(state): axum::extract::State<MockState>,
AxumJson(_body): AxumJson<serde_json::Value>,
) -> (StatusCode, AxumJson<serde_json::Value>) {
let call = state.count.fetch_add(1, Ordering::Relaxed) + 1;
match state.behaviour {
MockBehaviour::Ack => (
StatusCode::OK,
AxumJson(serde_json::json!({"applied":1,"noop":0,"skipped":0})),
),
MockBehaviour::Fail => (
StatusCode::INTERNAL_SERVER_ERROR,
AxumJson(serde_json::json!({"error":"stub failure"})),
),
MockBehaviour::Hang => {
tokio::time::sleep(Duration::from_secs(10)).await;
(StatusCode::OK, AxumJson(serde_json::json!({"applied":1})))
}
MockBehaviour::FailThenAck { fail_until } => {
if call <= fail_until {
(
StatusCode::INTERNAL_SERVER_ERROR,
AxumJson(serde_json::json!({"error":"stub transient failure"})),
)
} else {
(
StatusCode::OK,
AxumJson(serde_json::json!({"applied":1,"noop":0,"skipped":0})),
)
}
}
}
}
async fn spawn_mock_peer(behaviour: MockBehaviour) -> (String, Arc<AtomicUsize>) {
let call_count = Arc::new(AtomicUsize::new(0));
let state = MockState {
behaviour,
count: call_count.clone(),
};
let app = Router::new()
.route("/api/v1/sync/push", post(mock_handler))
.with_state(state);
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.ok();
});
(format!("http://{addr}"), call_count)
}
fn build_config(peers: Vec<String>, w: usize, timeout_ms: u64) -> FederationConfig {
let client = reqwest::Client::builder()
.timeout(Duration::from_millis(timeout_ms))
.build()
.unwrap();
let n = 1 + peers.len();
FederationConfig {
policy: QuorumPolicy::new(
n,
w,
Duration::from_millis(timeout_ms),
Duration::from_secs(30),
)
.unwrap(),
peers: peers
.into_iter()
.enumerate()
.map(|(i, url)| PeerEndpoint {
id: format!("peer-{i}:{url}"),
sync_push_url: format!("{url}/api/v1/sync/push"),
})
.collect(),
client,
sender_agent_id: "ai:fed-test".to_string(),
api_key: None,
signing_key: None,
#[cfg(feature = "sal")]
dlq_sink: None,
}
}
#[tokio::test]
async fn happy_path_two_peers_quorum_met() {
let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
let cfg = build_config(vec![url1, url2], 2, 2000);
let tracker = broadcast_store_quorum(&cfg, &sample_memory())
.await
.unwrap();
let result = finalise_quorum(&tracker);
assert!(result.is_ok(), "expected quorum met, got {result:?}");
let calls = count1.load(Ordering::Relaxed) + count2.load(Ordering::Relaxed);
assert!(calls >= 1);
}
#[tokio::test]
async fn broadcast_emits_entry_line_log_for_track_d_grep() {
use tracing_subscriber::Registry;
use tracing_subscriber::layer::SubscriberExt;
#[derive(Clone, Default)]
struct CaptureLayer(Arc<std::sync::Mutex<Vec<String>>>);
impl<S: tracing::Subscriber> tracing_subscriber::Layer<S> for CaptureLayer {
fn on_event(
&self,
event: &tracing::Event<'_>,
_ctx: tracing_subscriber::layer::Context<'_, S>,
) {
struct Visit<'a>(&'a mut Vec<String>);
impl tracing::field::Visit for Visit<'_> {
fn record_debug(
&mut self,
field: &tracing::field::Field,
value: &dyn std::fmt::Debug,
) {
if field.name() == "message" {
self.0.push(format!("{value:?}"));
}
}
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
if field.name() == "message" {
self.0.push(value.to_string());
}
}
}
let mut local: Vec<String> = Vec::new();
event.record(&mut Visit(&mut local));
if let Ok(mut buf) = self.0.lock() {
buf.extend(local);
}
}
}
let (url1, _) = spawn_mock_peer(MockBehaviour::Ack).await;
let cfg = build_config(vec![url1], 1, 1000);
let layer = CaptureLayer::default();
let messages = layer.0.clone();
let dispatch = tracing::Dispatch::new(Registry::default().with(layer));
{
let _guard = tracing::dispatcher::set_default(&dispatch);
let _ = broadcast_store_quorum(&cfg, &sample_memory())
.await
.expect("broadcast must succeed");
}
let captured = messages.lock().unwrap().clone();
let joined = captured.join("\n");
assert!(
joined.contains("federation::broadcast: store"),
"expected entry-line log `federation::broadcast: store ... -> 1 peer(s)`; got:\n{joined}"
);
}
#[tokio::test]
async fn post_quorum_fanout_reaches_all_peers() {
let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
let cfg = build_config(vec![url1, url2], 2, 2000);
let _tracker = broadcast_store_quorum(&cfg, &sample_memory())
.await
.unwrap();
for _ in 0..20 {
if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(
count1.load(Ordering::Relaxed),
1,
"peer-1 must receive the write post-quorum"
);
assert_eq!(
count2.load(Ordering::Relaxed),
1,
"peer-2 must receive the write post-quorum"
);
}
#[tokio::test]
async fn transient_peer_failure_is_retried_once() {
let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
let cfg = build_config(vec![url1, url2], 2, 2000);
let _tracker = broadcast_store_quorum(&cfg, &sample_memory())
.await
.unwrap();
for _ in 0..200 {
if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(
count1.load(Ordering::Relaxed),
1,
"peer-1 acked first time, no retry"
);
assert_eq!(
count2.load(Ordering::Relaxed),
2,
"peer-2 must see exactly two attempts (first fail, retry ack)"
);
}
#[tokio::test]
async fn persistent_peer_failure_stops_after_one_retry() {
let (url1, _) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url2, count2) = spawn_mock_peer(MockBehaviour::Fail).await;
let cfg = build_config(vec![url1, url2], 2, 2000);
let _tracker = broadcast_store_quorum(&cfg, &sample_memory())
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(800)).await;
assert_eq!(
count2.load(Ordering::Relaxed),
2,
"persistently-failing peer must be called exactly twice (1 + 1 retry)"
);
}
#[tokio::test]
async fn bulk_catchup_push_hits_every_peer_once() {
let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
let cfg = build_config(vec![url1, url2], 2, 2000);
let mems = vec![sample_memory(), sample_memory(), sample_memory()];
let errors = bulk_catchup_push(&cfg, &mems).await;
assert!(
errors.is_empty(),
"catchup must succeed on healthy peers, got {errors:?}"
);
assert_eq!(
count1.load(Ordering::Relaxed),
1,
"peer-1 must receive exactly one catchup batch"
);
assert_eq!(
count2.load(Ordering::Relaxed),
1,
"peer-2 must receive exactly one catchup batch"
);
}
#[tokio::test]
async fn bulk_catchup_push_reports_peer_failures() {
let (url1, _) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let cfg = build_config(vec![url1, url2], 2, 2000);
let mems = vec![sample_memory()];
let errors = bulk_catchup_push(&cfg, &mems).await;
assert_eq!(errors.len(), 1, "exactly one peer failed the catchup");
assert!(
errors[0].1.contains("500") || errors[0].1.contains("http"),
"error must name the HTTP failure, got {:?}",
errors[0]
);
}
#[tokio::test]
async fn bulk_catchup_push_empty_inputs_are_noop() {
let cfg = build_config(vec![], 1, 500);
assert!(bulk_catchup_push(&cfg, &[]).await.is_empty());
let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
let cfg = build_config(vec![url1], 1, 500);
assert!(bulk_catchup_push(&cfg, &[]).await.is_empty());
assert_eq!(
count1.load(Ordering::Relaxed),
0,
"no catchup POST must fire when the row set is empty"
);
}
#[tokio::test]
async fn partition_minority_fails_quorum() {
let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let cfg = build_config(vec![url1, url2], 3, 500);
let tracker = broadcast_store_quorum(&cfg, &sample_memory())
.await
.unwrap();
let err = finalise_quorum(&tracker).unwrap_err();
match err {
QuorumError::QuorumNotMet { got, needed, .. } => {
assert_eq!(got, 1, "local commit only");
assert_eq!(needed, 3);
}
other => panic!("expected QuorumNotMet, got {other:?}"),
}
}
#[tokio::test]
async fn timeout_on_hanging_peer_classified_timeout() {
let (url1, _) = spawn_mock_peer(MockBehaviour::Hang).await;
let cfg = build_config(vec![url1], 2, 200);
let tracker = broadcast_store_quorum(&cfg, &sample_memory())
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
let err = finalise_quorum(&tracker).unwrap_err();
match err {
QuorumError::QuorumNotMet { reason, .. } => {
assert!(
matches!(
reason,
QuorumFailureReason::Timeout | QuorumFailureReason::Unreachable
),
"unexpected reason {reason:?}"
);
}
other => panic!("expected QuorumNotMet, got {other:?}"),
}
}
#[tokio::test]
async fn majority_quorum_tolerates_one_peer_down() {
let (url_up, _) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url_down, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let cfg = build_config(vec![url_up, url_down], 2, 2000);
let tracker = broadcast_store_quorum(&cfg, &sample_memory())
.await
.unwrap();
let result = finalise_quorum(&tracker);
assert!(
result.is_ok(),
"majority should tolerate 1 peer down, got {result:?}"
);
}
#[test]
fn config_build_disabled_when_w_zero() {
let cfg = FederationConfig::build(
0,
&["http://example.com".to_string()],
Duration::from_millis(500),
None,
None,
None,
"ai:test".to_string(),
None,
)
.unwrap();
assert!(cfg.is_none());
}
#[test]
fn config_build_disabled_when_peers_empty() {
let cfg = FederationConfig::build(
2,
&[],
Duration::from_millis(500),
None,
None,
None,
"ai:test".to_string(),
None,
)
.unwrap();
assert!(cfg.is_none());
}
#[test]
fn quorum_not_met_payload_from_err() {
let err = QuorumError::QuorumNotMet {
got: 1,
needed: 3,
reason: QuorumFailureReason::Timeout,
};
let payload = QuorumNotMetPayload::from_err(&err);
assert_eq!(payload.error, "quorum_not_met");
assert_eq!(payload.got, 1);
assert_eq!(payload.needed, 3);
assert_eq!(payload.reason, "timeout");
}
#[tokio::test]
async fn archive_quorum_two_peers_ack_meets_quorum() {
let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
let cfg = build_config(vec![url1, url2], 2, 2000);
let tracker = broadcast_archive_quorum(&cfg, "mem-s29").await.unwrap();
let result = finalise_quorum(&tracker);
assert!(result.is_ok(), "expected quorum met, got {result:?}");
for _ in 0..20 {
if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(count1.load(Ordering::Relaxed), 1);
assert_eq!(count2.load(Ordering::Relaxed), 1);
}
#[tokio::test]
async fn archive_quorum_partition_minority_fails() {
let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let cfg = build_config(vec![url1, url2], 3, 500);
let tracker = broadcast_archive_quorum(&cfg, "mem-s29").await.unwrap();
let err = finalise_quorum(&tracker).unwrap_err();
match err {
QuorumError::QuorumNotMet { got, needed, .. } => {
assert_eq!(got, 1);
assert_eq!(needed, 3);
}
other => panic!("expected QuorumNotMet, got {other:?}"),
}
}
#[tokio::test]
async fn delete_quorum_two_peers_ack_meets_quorum() {
let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
let cfg = build_config(vec![url1, url2], 2, 2000);
let tracker = broadcast_delete_quorum(&cfg, "mem-del").await.unwrap();
assert!(finalise_quorum(&tracker).is_ok());
for _ in 0..20 {
if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(count1.load(Ordering::Relaxed), 1);
assert_eq!(count2.load(Ordering::Relaxed), 1);
}
#[tokio::test]
async fn delete_quorum_partition_minority_fails() {
let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let cfg = build_config(vec![url1, url2], 3, 500);
let tracker = broadcast_delete_quorum(&cfg, "mem-del").await.unwrap();
let err = finalise_quorum(&tracker).unwrap_err();
match err {
QuorumError::QuorumNotMet { got, needed, .. } => {
assert_eq!(got, 1);
assert_eq!(needed, 3);
}
other => panic!("expected QuorumNotMet, got {other:?}"),
}
}
#[tokio::test]
async fn restore_quorum_two_peers_ack_meets_quorum() {
let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
let cfg = build_config(vec![url1, url2], 2, 2000);
let tracker = broadcast_restore_quorum(&cfg, "mem-restore").await.unwrap();
assert!(finalise_quorum(&tracker).is_ok());
for _ in 0..20 {
if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(count1.load(Ordering::Relaxed), 1);
assert_eq!(count2.load(Ordering::Relaxed), 1);
}
#[tokio::test]
async fn restore_quorum_partition_minority_fails() {
let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let cfg = build_config(vec![url1, url2], 3, 500);
let tracker = broadcast_restore_quorum(&cfg, "mem-restore").await.unwrap();
let err = finalise_quorum(&tracker).unwrap_err();
assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
}
fn sample_link() -> MemoryLink {
MemoryLink {
source_id: "mem-a".to_string(),
target_id: "mem-b".to_string(),
relation: crate::models::MemoryLinkRelation::RelatedTo,
created_at: chrono::Utc::now().to_rfc3339(),
signature: None,
observed_by: None,
valid_from: None,
valid_until: None,
attest_level: None,
}
}
#[tokio::test]
async fn link_quorum_two_peers_ack_meets_quorum() {
let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
let cfg = build_config(vec![url1, url2], 2, 2000);
let tracker = broadcast_link_quorum(&cfg, &sample_link()).await.unwrap();
assert!(finalise_quorum(&tracker).is_ok());
for _ in 0..20 {
if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(count1.load(Ordering::Relaxed), 1);
assert_eq!(count2.load(Ordering::Relaxed), 1);
}
#[tokio::test]
async fn link_quorum_partition_minority_fails() {
let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let cfg = build_config(vec![url1, url2], 3, 500);
let tracker = broadcast_link_quorum(&cfg, &sample_link()).await.unwrap();
let err = finalise_quorum(&tracker).unwrap_err();
assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
}
#[tokio::test]
async fn consolidate_quorum_two_peers_ack_meets_quorum() {
let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
let cfg = build_config(vec![url1, url2], 2, 2000);
let new_mem = sample_memory();
let sources = vec!["src-a".to_string(), "src-b".to_string()];
let tracker = broadcast_consolidate_quorum(&cfg, &new_mem, &sources)
.await
.unwrap();
assert!(finalise_quorum(&tracker).is_ok());
for _ in 0..20 {
if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(count1.load(Ordering::Relaxed), 1);
assert_eq!(count2.load(Ordering::Relaxed), 1);
}
#[tokio::test]
async fn consolidate_quorum_partition_minority_fails() {
let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let cfg = build_config(vec![url1, url2], 3, 500);
let new_mem = sample_memory();
let tracker = broadcast_consolidate_quorum(&cfg, &new_mem, &[])
.await
.unwrap();
let err = finalise_quorum(&tracker).unwrap_err();
assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
}
fn sample_pending() -> PendingAction {
PendingAction {
id: "pa-1".to_string(),
action_type: "delete".to_string(),
memory_id: Some("mem-x".to_string()),
namespace: "app".to_string(),
payload: serde_json::json!({}),
requested_by: "ai:test".to_string(),
requested_at: chrono::Utc::now().to_rfc3339(),
status: "pending".to_string(),
decided_by: None,
decided_at: None,
approvals: vec![],
}
}
#[tokio::test]
async fn pending_quorum_two_peers_ack_meets_quorum() {
let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
let cfg = build_config(vec![url1, url2], 2, 2000);
let tracker = broadcast_pending_quorum(&cfg, &sample_pending())
.await
.unwrap();
assert!(finalise_quorum(&tracker).is_ok());
for _ in 0..20 {
if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(count1.load(Ordering::Relaxed), 1);
assert_eq!(count2.load(Ordering::Relaxed), 1);
}
#[tokio::test]
async fn pending_quorum_partition_minority_fails() {
let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let cfg = build_config(vec![url1, url2], 3, 500);
let tracker = broadcast_pending_quorum(&cfg, &sample_pending())
.await
.unwrap();
let err = finalise_quorum(&tracker).unwrap_err();
assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
}
fn sample_decision() -> PendingDecision {
PendingDecision {
id: "pa-1".to_string(),
approved: true,
decider: "ai:approver".to_string(),
}
}
#[tokio::test]
async fn pending_decision_quorum_two_peers_ack_meets_quorum() {
let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
let cfg = build_config(vec![url1, url2], 2, 2000);
let tracker = broadcast_pending_decision_quorum(&cfg, &sample_decision())
.await
.unwrap();
assert!(finalise_quorum(&tracker).is_ok());
for _ in 0..20 {
if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(count1.load(Ordering::Relaxed), 1);
assert_eq!(count2.load(Ordering::Relaxed), 1);
}
#[tokio::test]
async fn pending_decision_quorum_partition_minority_fails() {
let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let cfg = build_config(vec![url1, url2], 3, 500);
let tracker = broadcast_pending_decision_quorum(&cfg, &sample_decision())
.await
.unwrap();
let err = finalise_quorum(&tracker).unwrap_err();
assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
}
fn sample_namespace_meta() -> NamespaceMetaEntry {
NamespaceMetaEntry {
namespace: "app/team".to_string(),
standard_id: "mem-std-1".to_string(),
parent_namespace: Some("app".to_string()),
updated_at: chrono::Utc::now().to_rfc3339(),
}
}
#[tokio::test]
async fn namespace_meta_quorum_two_peers_ack_meets_quorum() {
let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
let cfg = build_config(vec![url1, url2], 2, 2000);
let tracker = broadcast_namespace_meta_quorum(&cfg, &sample_namespace_meta())
.await
.unwrap();
assert!(finalise_quorum(&tracker).is_ok());
for _ in 0..20 {
if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(count1.load(Ordering::Relaxed), 1);
assert_eq!(count2.load(Ordering::Relaxed), 1);
}
#[tokio::test]
async fn namespace_meta_quorum_partition_minority_fails() {
let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let cfg = build_config(vec![url1, url2], 3, 500);
let tracker = broadcast_namespace_meta_quorum(&cfg, &sample_namespace_meta())
.await
.unwrap();
let err = finalise_quorum(&tracker).unwrap_err();
assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
}
#[tokio::test]
async fn namespace_meta_clear_quorum_two_peers_ack_meets_quorum() {
let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
let cfg = build_config(vec![url1, url2], 2, 2000);
let namespaces = vec!["app/team".to_string(), "app/other".to_string()];
let tracker = broadcast_namespace_meta_clear_quorum(&cfg, &namespaces)
.await
.unwrap();
assert!(finalise_quorum(&tracker).is_ok());
for _ in 0..20 {
if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(count1.load(Ordering::Relaxed), 1);
assert_eq!(count2.load(Ordering::Relaxed), 1);
}
#[tokio::test]
async fn namespace_meta_clear_quorum_partition_minority_fails() {
let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let cfg = build_config(vec![url1, url2], 3, 500);
let namespaces = vec!["app/team".to_string()];
let tracker = broadcast_namespace_meta_clear_quorum(&cfg, &namespaces)
.await
.unwrap();
let err = finalise_quorum(&tracker).unwrap_err();
assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
}
#[test]
fn quorum_not_met_payload_unreachable_reason() {
let err = QuorumError::QuorumNotMet {
got: 1,
needed: 2,
reason: QuorumFailureReason::Unreachable,
};
let payload = QuorumNotMetPayload::from_err(&err);
assert_eq!(payload.reason, "unreachable");
}
#[test]
fn quorum_not_met_payload_id_drift_reason() {
let err = QuorumError::QuorumNotMet {
got: 1,
needed: 2,
reason: QuorumFailureReason::IdDrift,
};
let payload = QuorumNotMetPayload::from_err(&err);
assert_eq!(payload.reason, "id_drift");
}
#[test]
fn quorum_not_met_payload_in_flight_reason_maps_to_timeout() {
let err = QuorumError::QuorumNotMet {
got: 1,
needed: 2,
reason: QuorumFailureReason::InFlight,
};
let payload = QuorumNotMetPayload::from_err(&err);
assert_eq!(payload.reason, "timeout");
}
#[test]
fn quorum_not_met_payload_invalid_policy_branch() {
let err = QuorumError::InvalidPolicy {
detail: "bad-thing".to_string(),
};
let payload = QuorumNotMetPayload::from_err(&err);
assert_eq!(payload.error, "quorum_not_met");
assert_eq!(payload.got, 0);
assert_eq!(payload.needed, 0);
assert!(payload.reason.starts_with("invalid_policy:"));
assert!(payload.reason.contains("bad-thing"));
}
#[test]
fn quorum_not_met_payload_local_write_failed_branch() {
let err = QuorumError::LocalWriteFailed {
detail: "disk-full".to_string(),
};
let payload = QuorumNotMetPayload::from_err(&err);
assert_eq!(payload.error, "quorum_not_met");
assert!(payload.reason.starts_with("local_write_failed:"));
assert!(payload.reason.contains("disk-full"));
}
#[test]
fn config_build_constructs_when_w_and_peers_set() {
let cfg = FederationConfig::build(
2,
&[
"http://peer-a.example/".to_string(),
"http://peer-b.example".to_string(),
],
Duration::from_millis(500),
None,
None,
None,
"ai:builder".to_string(),
None,
)
.unwrap()
.expect("config should be Some when w>0 and peers nonempty");
assert_eq!(cfg.peer_count(), 2);
assert_eq!(cfg.peers[0].id, "peer-0");
assert_eq!(cfg.peers[1].id, "peer-1");
assert_eq!(
cfg.peers[0].sync_push_url,
"http://peer-a.example/api/v1/sync/push"
);
assert_eq!(
cfg.peers[1].sync_push_url,
"http://peer-b.example/api/v1/sync/push"
);
assert_eq!(cfg.sender_agent_id, "ai:builder");
}
#[test]
fn config_build_rejects_duplicate_peer_urls() {
let result = FederationConfig::build(
2,
&[
"http://peer.example".to_string(),
"http://peer.example/".to_string(),
],
Duration::from_millis(500),
None,
None,
None,
"ai:builder".to_string(),
None,
);
let err = match result {
Ok(_) => panic!("expected duplicate-URL rejection"),
Err(e) => e,
};
let msg = format!("{err}");
assert!(
msg.contains("duplicate peer URL"),
"expected duplicate-URL rejection, got {msg:?}"
);
}
#[test]
fn config_build_rejects_missing_ca_cert_path() {
let bogus = std::path::PathBuf::from("/definitely/does/not/exist/ca.pem");
let result = FederationConfig::build(
2,
&["http://peer.example".to_string()],
Duration::from_millis(500),
None,
None,
Some(&bogus),
"ai:builder".to_string(),
None,
);
let err = match result {
Ok(_) => panic!("expected ca-cert read error"),
Err(e) => e,
};
let msg = format!("{err}");
assert!(
msg.contains("read --quorum-ca-cert"),
"expected ca-cert read error, got {msg:?}"
);
}
#[test]
fn config_build_rejects_invalid_ca_cert_pem() {
let dir = tempfile::tempdir().unwrap();
let bad = dir.path().join("not-a-cert.pem");
std::fs::write(&bad, b"this is not a valid pem certificate").unwrap();
let result = FederationConfig::build(
2,
&["http://peer.example".to_string()],
Duration::from_millis(500),
None,
None,
Some(&bad),
"ai:builder".to_string(),
None,
);
let err = match result {
Ok(_) => panic!("expected ca-cert parse error"),
Err(e) => e,
};
let msg = format!("{err}");
assert!(
msg.contains("parse --quorum-ca-cert") || msg.contains("--quorum-ca-cert"),
"expected ca-cert parse error, got {msg:?}"
);
}
#[test]
fn config_build_rejects_missing_client_cert_path() {
let bogus_cert = std::path::PathBuf::from("/definitely/missing/cert.pem");
let bogus_key = std::path::PathBuf::from("/definitely/missing/key.pem");
let result = FederationConfig::build(
2,
&["http://peer.example".to_string()],
Duration::from_millis(500),
Some(&bogus_cert),
Some(&bogus_key),
None,
"ai:builder".to_string(),
None,
);
let err = match result {
Ok(_) => panic!("expected client-cert read error"),
Err(e) => e,
};
let msg = format!("{err}");
assert!(
msg.contains("read --client-cert"),
"expected client-cert read error, got {msg:?}"
);
}
#[test]
fn peer_count_matches_peer_list() {
let cfg = build_config(
vec![
"http://a.example".to_string(),
"http://b.example".to_string(),
"http://c.example".to_string(),
],
2,
500,
);
assert_eq!(cfg.peer_count(), 3);
}
#[test]
fn urlencoding_encode_passthrough_safe_chars() {
let encoded = urlencoding_encode("abcXYZ-09_.~");
assert_eq!(encoded, "abcXYZ-09_.~");
}
#[test]
fn urlencoding_encode_percent_encodes_reserved_and_high_bits() {
let encoded = urlencoding_encode("2026-04-26T12:00:00+00:00 / x");
assert!(
encoded.contains("%3A"),
"expected colon to be percent-encoded: {encoded}"
);
assert!(
encoded.contains("%2B"),
"expected + to be percent-encoded: {encoded}"
);
assert!(
encoded.contains("%2F"),
"expected / to be percent-encoded: {encoded}"
);
assert!(
encoded.contains("%20"),
"expected space to be percent-encoded: {encoded}"
);
assert!(
!encoded.contains("%2D"),
"hyphen must pass through unencoded: {encoded}"
);
}
#[test]
fn urlencoding_encode_empty_string() {
assert_eq!(urlencoding_encode(""), "");
}
async fn id_drift_handler(
AxumJson(_body): AxumJson<serde_json::Value>,
) -> (StatusCode, AxumJson<serde_json::Value>) {
(
StatusCode::OK,
AxumJson(serde_json::json!({"ids": ["some-other-id"], "applied": 1})),
)
}
async fn spawn_id_drift_peer() -> String {
let app = Router::new().route("/api/v1/sync/push", post(id_drift_handler));
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.ok();
});
format!("http://{addr}")
}
/// Two peers that ack with an id other than the one pushed must not count
/// toward a W=2 store quorum; only the local commit is counted.
#[tokio::test]
async fn id_drift_peer_does_not_count_as_ack() {
let url1 = spawn_id_drift_peer().await;
let url2 = spawn_id_drift_peer().await;
let cfg = build_config(vec![url1, url2], 2, 1000);
let tracker = broadcast_store_quorum(&cfg, &sample_memory())
.await
.unwrap();
let result = finalise_quorum(&tracker);
let err = result.unwrap_err();
match err {
QuorumError::QuorumNotMet {
got,
needed,
reason,
} => {
assert_eq!(got, 1, "only local should count");
assert_eq!(needed, 2);
// The classified reason is allowed to vary. It may be IdDrift, or
// presumably Timeout / InFlight depending on where the tracker stands
// relative to its deadline when it is finalised.
assert!(
matches!(
reason,
QuorumFailureReason::IdDrift
| QuorumFailureReason::Timeout
| QuorumFailureReason::InFlight
),
"expected IdDrift / Timeout / InFlight, got {reason:?}"
);
}
other => panic!("expected QuorumNotMet, got {other:?}"),
}
}
/// Canned responses for the `/api/v1/sync/since` mock peer (see `since_handler`).
#[derive(Clone)]
enum SinceMockBehaviour {
/// Respond 200 with `{"memories": [...]}` containing these rows.
ReturnMemories(Vec<Memory>),
/// Respond 500 with a small JSON error body.
Error500,
/// Sleep for the given duration, then respond 200 with an empty list.
Hang(Duration),
/// Respond with a JSON content type but a body that is not valid JSON.
MalformedBody,
}
/// Router state shared with `since_handler`; the `Arc` handles are also
/// returned to the test so it can inspect what the mock peer observed.
#[derive(Clone)]
struct SinceMockState {
/// Which canned response to return.
behaviour: SinceMockBehaviour,
/// Number of requests received.
hits: Arc<AtomicUsize>,
/// `since` query parameter of the most recent request (None if absent).
last_since: Arc<Mutex<Option<String>>>,
/// `peer` query parameter of the most recent request (None if absent).
last_peer: Arc<Mutex<Option<String>>>,
}
async fn since_handler(
axum::extract::Query(q): axum::extract::Query<std::collections::HashMap<String, String>>,
axum::extract::State(state): axum::extract::State<SinceMockState>,
) -> axum::response::Response {
use axum::response::IntoResponse;
state.hits.fetch_add(1, Ordering::Relaxed);
{
let mut s = state.last_since.lock().await;
*s = q.get("since").cloned();
}
{
let mut p = state.last_peer.lock().await;
*p = q.get("peer").cloned();
}
match &state.behaviour {
SinceMockBehaviour::ReturnMemories(mems) => {
let body = serde_json::json!({"memories": mems});
(StatusCode::OK, AxumJson(body)).into_response()
}
SinceMockBehaviour::Error500 => (
StatusCode::INTERNAL_SERVER_ERROR,
AxumJson(serde_json::json!({"error":"oops"})),
)
.into_response(),
SinceMockBehaviour::Hang(d) => {
tokio::time::sleep(*d).await;
(
StatusCode::OK,
AxumJson(serde_json::json!({"memories": []})),
)
.into_response()
}
SinceMockBehaviour::MalformedBody => {
(
[(axum::http::header::CONTENT_TYPE, crate::MIME_JSON)],
"this is not json {{{",
)
.into_response()
}
}
}
/// Starts a `/api/v1/sync/since` mock peer with the given behaviour on an
/// ephemeral localhost port.
///
/// Returns `(base_url, hit_counter, last_since, last_peer)`. The three
/// handles are shared with the handler, so tests can inspect what it saw.
async fn spawn_since_peer(
    behaviour: SinceMockBehaviour,
) -> (
    String,
    Arc<AtomicUsize>,
    Arc<Mutex<Option<String>>>,
    Arc<Mutex<Option<String>>>,
) {
    let state = SinceMockState {
        behaviour,
        hits: Arc::new(AtomicUsize::new(0)),
        last_since: Arc::new(Mutex::new(None)),
        last_peer: Arc::new(Mutex::new(None)),
    };
    let (hits, last_since, last_peer) = (
        Arc::clone(&state.hits),
        Arc::clone(&state.last_since),
        Arc::clone(&state.last_peer),
    );
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();
    let app = Router::new()
        .route("/api/v1/sync/since", axum::routing::get(since_handler))
        .with_state(state);
    tokio::spawn(async move {
        axum::serve(listener, app).await.ok();
    });
    (format!("http://{addr}"), hits, last_since, last_peer)
}
/// Opens a fresh `:memory:` database and wraps it as a handler `Db`
/// tuple `(connection, path, default ResolvedTtl, true)`.
///
/// NOTE(review): the meaning of the trailing `true` is defined by
/// `crate::handlers::Db`; confirm it there.
fn build_test_db() -> crate::handlers::Db {
    let path = std::path::PathBuf::from(":memory:");
    let conn = crate::db::open(&path).unwrap();
    let ttl = crate::config::ResolvedTtl::default();
    Arc::new(Mutex::new((conn, path, ttl, true)))
}
/// Builds a federation config with a single peer, `peer-0`, whose push URL is
/// `{peer_url}/api/v1/sync/push`. Local agent id is `ai:catchup-test`.
///
/// `timeout_ms` is used both as the HTTP client timeout and as the quorum
/// policy timeout.
fn build_catchup_cfg(peer_url: &str, timeout_ms: u64) -> FederationConfig {
    let timeout = Duration::from_millis(timeout_ms);
    let client = reqwest::Client::builder().timeout(timeout).build().unwrap();
    let policy = QuorumPolicy::new(2, 1, timeout, Duration::from_secs(30)).unwrap();
    let peer = PeerEndpoint {
        id: "peer-0".to_string(),
        sync_push_url: format!("{peer_url}/api/v1/sync/push"),
    };
    FederationConfig {
        policy,
        peers: vec![peer],
        client,
        sender_agent_id: "ai:catchup-test".to_string(),
        api_key: None,
        signing_key: None,
        #[cfg(feature = "sal")]
        dlq_sink: None,
    }
}
/// Builds a `catchup`-namespace observation for the catch-up tests.
///
/// Id is `cat-{title}`, content is `content for {title}`, and both
/// `created_at` and `updated_at` are `updated_at`. Metadata carries
/// `agent_id: ai:peer-0` and `scope: collective`.
fn catchup_memory(title: &str, updated_at: &str) -> Memory {
    let stamp = updated_at.to_string();
    Memory {
        id: format!("cat-{title}"),
        tier: crate::models::Tier::Mid,
        namespace: "catchup".to_string(),
        title: title.to_string(),
        content: format!("content for {title}"),
        tags: vec!["catchup".to_string()],
        priority: 5,
        confidence: 1.0,
        source: "system".to_string(),
        access_count: 0,
        created_at: stamp.clone(),
        updated_at: stamp,
        last_accessed_at: None,
        expires_at: None,
        metadata: serde_json::json!({
            "agent_id": "ai:peer-0",
            "scope": "collective",
        }),
        reflection_depth: 0,
        memory_kind: crate::models::MemoryKind::Observation,
        entity_id: None,
        persona_version: None,
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        confidence_source: crate::models::ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        version: 1,
    }
}
/// First catch-up against a peer serving five memories: the request omits
/// `since`, forwards the local agent id as `peer`, inserts all five rows, and
/// stores the newest `updated_at` as the `peer-0` cursor.
#[tokio::test]
async fn test_catchup_once_pulls_since_cursor_advances_state() {
let mems = vec![
catchup_memory("a", "2026-04-26T10:00:00Z"),
catchup_memory("b", "2026-04-26T10:00:01Z"),
catchup_memory("c", "2026-04-26T10:00:02Z"),
catchup_memory("d", "2026-04-26T10:00:03Z"),
catchup_memory("e", "2026-04-26T10:00:04Z"),
];
// Rows are listed oldest to newest, so the last one's timestamp is the
// cursor expected after the pull.
let latest_ts = mems.last().unwrap().updated_at.clone();
let (url, hits, last_since, last_peer) =
spawn_since_peer(SinceMockBehaviour::ReturnMemories(mems.clone())).await;
let cfg = build_catchup_cfg(&url, 2000);
let db = build_test_db();
catchup_once(&cfg, &db).await;
assert_eq!(hits.load(Ordering::Relaxed), 1, "peer hit exactly once");
// A fresh database has no stored cursor, so no `since` parameter is sent.
assert!(
last_since.lock().await.is_none(),
"first catchup must omit since"
);
assert_eq!(last_peer.lock().await.as_deref(), Some("ai:catchup-test"));
let lock = db.lock().await;
// Cursors are loaded by local agent id and keyed by peer id.
let clock =
crate::db::sync_state_load(&lock.0, "ai:catchup-test").expect("load sync state");
assert_eq!(
clock.entries.get("peer-0").map(String::as_str),
Some(latest_ts.as_str()),
"sync state advanced to latest pulled memory's updated_at"
);
let count: i64 = lock
.0
.query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
.unwrap();
assert_eq!(count, 5, "all five memories inserted");
}
/// An empty `memories` list is a no-op: the peer is hit once, nothing is
/// inserted, and no cursor is recorded for `peer-0`.
#[tokio::test]
async fn test_catchup_once_no_new_memories_no_op() {
    let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![])).await;
    let cfg = build_catchup_cfg(&url, 2000);
    let db = build_test_db();
    catchup_once(&cfg, &db).await;
    assert_eq!(hits.load(Ordering::Relaxed), 1);

    let guard = db.lock().await;
    let conn = &guard.0;
    let no_cursor = crate::db::sync_state_load(conn, "ai:catchup-test")
        .unwrap()
        .entries
        .get("peer-0")
        .is_none();
    assert!(no_cursor, "empty response must not advance sync_state");
    let rows: i64 = conn
        .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
        .unwrap();
    assert_eq!(rows, 0);
}
/// An HTTP 500 from the peer is tolerated (no panic), and the `peer-0` cursor
/// stays unset.
#[tokio::test]
async fn test_catchup_once_peer_500_error_logged_no_panic() {
    let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::Error500).await;
    let cfg = build_catchup_cfg(&url, 2000);
    let db = build_test_db();
    catchup_once(&cfg, &db).await;
    assert_eq!(hits.load(Ordering::Relaxed), 1);

    let guard = db.lock().await;
    let no_cursor = crate::db::sync_state_load(&guard.0, "ai:catchup-test")
        .unwrap()
        .entries
        .get("peer-0")
        .is_none();
    assert!(no_cursor, "500 must not advance sync state");
}
/// A peer that stalls for 2s must not hold `catchup_once` past the 200ms
/// client timeout, and the failed pull must leave the `peer-0` cursor unset.
#[tokio::test]
async fn test_catchup_once_peer_timeout_handled() {
let (url, hits, _, _) =
spawn_since_peer(SinceMockBehaviour::Hang(Duration::from_secs(2))).await;
let cfg = build_catchup_cfg(&url, 200);
let db = build_test_db();
let start = Instant::now();
catchup_once(&cfg, &db).await;
let elapsed = start.elapsed();
// 1500ms leaves headroom over the 200ms timeout while staying under the 2s
// hang, so only a client that honours the timeout passes.
assert!(
elapsed < Duration::from_millis(1500),
"catchup_once should honour the client timeout, took {elapsed:?}"
);
assert_eq!(hits.load(Ordering::Relaxed), 1, "request was sent");
let lock = db.lock().await;
let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
assert!(clock.entries.get("peer-0").is_none());
}
/// A 200 response whose body is not valid JSON is tolerated, and the `peer-0`
/// cursor stays unset.
#[tokio::test]
async fn test_catchup_once_malformed_response_handled() {
    let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::MalformedBody).await;
    let cfg = build_catchup_cfg(&url, 2000);
    let db = build_test_db();
    catchup_once(&cfg, &db).await;
    assert_eq!(hits.load(Ordering::Relaxed), 1);

    let guard = db.lock().await;
    let no_cursor = crate::db::sync_state_load(&guard.0, "ai:catchup-test")
        .unwrap()
        .entries
        .get("peer-0")
        .is_none();
    assert!(no_cursor, "malformed body must not advance sync state");
}
/// A catch-up row that is older than the local copy of the same memory must
/// not overwrite it. A genuinely new row is still inserted, and the cursor
/// advances to the newest timestamp in the batch.
#[tokio::test]
async fn test_catchup_once_inserts_only_newer_memories() {
let db = build_test_db();
// Seed "shared" locally at 10:00:01. The scope drops the DB guard before
// catchup_once runs, which presumably locks `db` itself.
{
let lock = db.lock().await;
let local = catchup_memory("shared", "2026-04-26T10:00:01Z");
crate::db::insert_if_newer(&lock.0, &local).unwrap();
let cnt: i64 = lock
.0
.query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
.unwrap();
assert_eq!(cnt, 1, "pre-seeded shared row");
}
// The peer offers an OLDER "shared" (10:00:00) under a different id, plus a
// new "fresh" row (10:00:02).
// NOTE(review): since the id differs, the conflict is presumably detected
// on (title, namespace); the content check below queries by those columns.
let mut stale_shared = catchup_memory("shared", "2026-04-26T10:00:00Z");
stale_shared.content = "stale-from-catchup-peer".to_string();
stale_shared.id = "cat-shared-OLD".to_string();
let stale_shared_content = stale_shared.content.clone();
let new_fresh = catchup_memory("fresh", "2026-04-26T10:00:02Z");
let (url, _, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![
stale_shared,
new_fresh,
]))
.await;
let cfg = build_catchup_cfg(&url, 2000);
catchup_once(&cfg, &db).await;
let lock = db.lock().await;
let cnt: i64 = lock
.0
.query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
.unwrap();
assert_eq!(cnt, 2, "fresh row inserted, shared kept");
let shared_content: String = lock
.0
.query_row(
"SELECT content FROM memories WHERE title = 'shared' AND namespace = 'catchup'",
[],
|r| r.get(0),
)
.unwrap();
assert_ne!(
shared_content, stale_shared_content,
"older catchup memory must NOT overwrite newer local row"
);
// The cursor follows the newest row in the batch (the "fresh" one).
let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
assert_eq!(
clock.entries.get("peer-0").map(String::as_str),
Some("2026-04-26T10:00:02Z"),
);
}
/// With tokio's clock paused, a few seconds of virtual time must be enough for
/// the spawned catch-up loop to reach the mock peer at least once, even though
/// the configured interval is 60s.
#[tokio::test(start_paused = true)]
async fn test_spawn_catchup_loop_runs_at_interval() {
// NOTE(review): this relies on the loop's first pass running well before a
// full 60s interval elapses; confirm against spawn_catchup_loop.
let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![])).await;
let cfg = build_catchup_cfg(&url, 5000);
let db = build_test_db();
let handle = spawn_catchup_loop(cfg, db, Duration::from_secs(60));
// Advance virtual time in 1s steps, yielding so the spawned loop can run.
for _ in 0..6 {
tokio::time::advance(Duration::from_secs(1)).await;
tokio::task::yield_now().await;
}
// The mock peer does real localhost socket I/O, so keep yielding and
// nudging the clock in small steps until the hit is observed (bounded).
for _ in 0..50 {
if hits.load(Ordering::Relaxed) >= 1 {
break;
}
tokio::task::yield_now().await;
tokio::time::advance(Duration::from_millis(10)).await;
}
assert!(
hits.load(Ordering::Relaxed) >= 1,
"first catchup tick must hit the mock peer (got {})",
hits.load(Ordering::Relaxed),
);
handle.abort();
}
/// Aborting the catch-up loop's `JoinHandle` must resolve within 500ms with a
/// cancellation `JoinError`.
#[tokio::test]
async fn test_spawn_catchup_loop_aborts_cleanly_on_handle_drop() {
    let (url, _, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![])).await;
    let cfg = build_catchup_cfg(&url, 2000);
    let db = build_test_db();
    let handle = spawn_catchup_loop(cfg, db, Duration::from_secs(crate::SECS_PER_HOUR as u64));
    handle.abort();
    let join = tokio::time::timeout(Duration::from_millis(500), handle)
        .await
        .expect("aborted handle must resolve within 500ms");
    assert!(
        matches!(&join, Err(e) if e.is_cancelled()),
        "handle.abort() must surface as is_cancelled() == true"
    );
}
/// `FederationConfig::build` accepts the fixture client cert plus PKCS#8 key
/// and returns a config with the single requested peer.
#[test]
fn test_build_config_mtls_with_valid_files() {
    let fixture = |rel: &str| std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(rel);
    let cert = fixture("tests/fixtures/tls/valid_cert.pem");
    let key = fixture("tests/fixtures/tls/valid_key_pkcs8.pem");
    for path in [&cert, &key] {
        assert!(path.exists(), "missing test fixture: {path:?}");
    }
    let peers = ["http://peer.example".to_string()];
    let result = FederationConfig::build(
        2,
        &peers,
        Duration::from_millis(500),
        Some(&cert),
        Some(&key),
        None,
        "ai:builder".to_string(),
        None,
    );
    let cfg = match result {
        Ok(Some(built)) => built,
        Ok(None) => panic!("expected Some(FederationConfig), got None"),
        Err(e) => panic!("expected Ok, got Err: {e}"),
    };
    assert_eq!(cfg.peer_count(), 1);
}
/// A readable cert paired with a nonexistent key path must fail, and the
/// error must name `--client-key`.
#[test]
fn test_build_config_mtls_with_missing_files_returns_error() {
    let cert = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
        .join("tests/fixtures/tls/valid_cert.pem");
    assert!(cert.exists(), "missing test fixture: {cert:?}");
    let bogus_key = std::path::PathBuf::from("/definitely/missing/key.pem");
    let peers = ["http://peer.example".to_string()];
    let err = FederationConfig::build(
        2,
        &peers,
        Duration::from_millis(500),
        Some(&cert),
        Some(&bogus_key),
        None,
        "ai:builder".to_string(),
        None,
    )
    .err()
    .expect("expected client-key read error");
    let msg = err.to_string();
    assert!(
        msg.contains("read --client-key"),
        "expected client-key read error, got {msg:?}"
    );
}
/// A peer that keeps answering HTTP 500 is retried exactly once, and the
/// resulting `Fail` reason records both attempts (`first:` and `retry:`).
#[tokio::test]
async fn post_and_classify_persistent_fail_concatenates_both_reasons() {
let (url, count) = spawn_mock_peer(MockBehaviour::Fail).await;
let client = reqwest::Client::builder()
.timeout(Duration::from_millis(2000))
.build()
.unwrap();
// Minimal push payload with no memories.
let body = serde_json::json!({"sender_agent_id":"ai:test","memories":[]});
let target = format!("{url}/api/v1/sync/push");
let outcome =
post_and_classify(&client, &target, &body, "mem-x", Some("mem-x"), None, None).await;
match outcome {
AckOutcome::Fail(reason) => {
assert!(
reason.contains("first:") && reason.contains("retry:"),
"expected both attempts in reason, got {reason:?}"
);
assert!(
reason.contains("http 500"),
"expected 5xx in reason, got {reason:?}"
);
}
other => panic!("expected AckOutcome::Fail, got {other:?}"),
}
// The mock peer's counter shows exactly one retry happened.
assert_eq!(
count.load(Ordering::Relaxed),
2,
"first attempt + one retry = exactly two POSTs"
);
}
/// A 200 response whose `ids` do not contain the pushed id (`mem-x`) is
/// classified as `IdDrift` and is NOT retried: the peer sees exactly one POST.
#[tokio::test]
async fn post_and_classify_id_drift_does_not_retry() {
let count = Arc::new(AtomicUsize::new(0));
let cnt_clone = count.clone();
// Inline mock peer: counts each POST and always echoes a foreign id.
let app = Router::new().route(
"/api/v1/sync/push",
post(move |AxumJson(_b): AxumJson<serde_json::Value>| {
let c = cnt_clone.clone();
async move {
c.fetch_add(1, Ordering::Relaxed);
(
StatusCode::OK,
AxumJson(serde_json::json!({"ids":["other-id"],"applied":1})),
)
}
}),
);
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.ok();
});
let url = format!("http://{addr}/api/v1/sync/push");
let client = reqwest::Client::builder()
.timeout(Duration::from_millis(2000))
.build()
.unwrap();
let body = serde_json::json!({"sender_agent_id":"ai:test","memories":[]});
let outcome =
post_and_classify(&client, &url, &body, "mem-x", Some("mem-x"), None, None).await;
assert!(
matches!(outcome, AckOutcome::IdDrift),
"expected IdDrift, got {outcome:?}"
);
assert_eq!(
count.load(Ordering::Relaxed),
1,
"IdDrift must NOT trigger the retry path (only one POST)"
);
}
/// With no peers configured, `bulk_catchup_push` returns an empty error list.
#[tokio::test]
async fn bulk_catchup_push_no_peers_is_noop() {
    let timeout = Duration::from_millis(500);
    let client = reqwest::Client::builder().timeout(timeout).build().unwrap();
    let policy = QuorumPolicy::new(1, 1, timeout, Duration::from_secs(30)).unwrap();
    let cfg = FederationConfig {
        policy,
        peers: Vec::new(),
        client,
        sender_agent_id: "ai:no-peers".to_string(),
        api_key: None,
        signing_key: None,
        #[cfg(feature = "sal")]
        dlq_sink: None,
    };
    let mems = vec![sample_memory()];
    let errors = bulk_catchup_push(&cfg, &mems).await;
    assert!(
        errors.is_empty(),
        "no-peers catchup must return empty error vec immediately, got {errors:?}"
    );
}
/// `bulk_catchup_push` against one acking and one failing peer reports only
/// the failing peer. Each peer receives exactly one request.
#[tokio::test]
async fn bulk_catchup_push_mixed_outcomes_only_failing_peer_in_errors() {
let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url2, count2) = spawn_mock_peer(MockBehaviour::Fail).await;
let cfg = build_config(vec![url1, url2], 2, 2000);
let mems = vec![sample_memory()];
let errors = bulk_catchup_push(&cfg, &mems).await;
assert_eq!(
errors.len(),
1,
"exactly one failing peer should be in errors, got {errors:?}"
);
// Errors are `(peer_id, reason)` pairs; the second configured URL is peer-1.
let (peer_id, reason) = &errors[0];
assert!(
peer_id.starts_with("peer-1"),
"failing peer should be peer-1, got {peer_id}"
);
assert!(
reason.contains("http 500"),
"expected http 500 reason, got {reason}"
);
// One request each: this path does not retry the failing peer.
assert_eq!(count1.load(Ordering::Relaxed), 1);
assert_eq!(count2.load(Ordering::Relaxed), 1);
}
/// With W=1 the local commit alone satisfies quorum, even though every peer
/// fails.
#[tokio::test]
async fn quorum_w1_local_commit_alone_is_sufficient() {
    let mut peer_urls = Vec::with_capacity(2);
    for _ in 0..2 {
        let (url, _) = spawn_mock_peer(MockBehaviour::Fail).await;
        peer_urls.push(url);
    }
    let cfg = build_config(peer_urls, 1, 1000);
    let tracker = broadcast_store_quorum(&cfg, &sample_memory())
        .await
        .unwrap();
    let acks = finalise_quorum(&tracker).expect("W=1 must succeed on local commit alone");
    assert_eq!(acks, 1, "W=1 quorum returns local-only count");
}
/// `QuorumPolicy::majority(n)` requires a strict majority of acks, counting
/// the local commit as one: 2 of 3 and 3 of 5.
#[test]
fn quorum_policy_majority_builds_with_ceil_n_plus_1_div_2() {
let p3 = QuorumPolicy::majority(3).expect("N=3 majority builds");
let mut t = AckTracker::new(p3, Instant::now());
t.record_local();
assert!(
!t.is_quorum_met(Instant::now()),
"majority-of-3 needs more than local"
);
t.record_peer_ack("peer-a");
assert!(
t.is_quorum_met(Instant::now()),
"local + 1 peer ack = 2 = majority of 3"
);
let p5 = QuorumPolicy::majority(5).expect("N=5 majority builds");
let mut t5 = AckTracker::new(p5, Instant::now());
t5.record_local();
t5.record_peer_ack("a");
assert!(
!t5.is_quorum_met(Instant::now()),
"majority-of-5 needs 3 acks"
);
t5.record_peer_ack("b");
assert!(t5.is_quorum_met(Instant::now()), "local + 2 peers = 3");
}
/// `QuorumPolicy::majority(0)` is rejected with `InvalidPolicy`, and the
/// detail message mentions the bound on `n`.
#[test]
fn quorum_policy_majority_rejects_zero() {
let err = QuorumPolicy::majority(0).expect_err("n=0 must be rejected");
match err {
QuorumError::InvalidPolicy { detail } => {
assert!(
detail.contains("n must be"),
"expected n>=1 message, got {detail}"
);
}
other => panic!("expected InvalidPolicy, got {other:?}"),
}
}
/// Peer URLs that differ only by a trailing `/` are rejected as duplicates.
#[test]
fn config_build_rejects_duplicate_peers_differing_only_in_trailing_slash() {
    let peers = [
        "http://peer.example".to_string(),
        "http://peer.example/".to_string(),
    ];
    let err = FederationConfig::build(
        2,
        &peers,
        Duration::from_millis(500),
        None,
        None,
        None,
        "ai:dup-test".to_string(),
        None,
    )
    .err()
    .expect("trailing-slash dup must be rejected");
    let msg = err.to_string();
    assert!(
        msg.contains("duplicate peer URL"),
        "expected duplicate-peer error, got {msg}"
    );
}
/// Peer URLs that differ only in host-name case are rejected as duplicates.
#[test]
fn config_build_rejects_duplicate_peers_differing_only_in_case() {
    let peers = [
        "http://Peer.Example".to_string(),
        "http://peer.example".to_string(),
    ];
    let err = FederationConfig::build(
        2,
        &peers,
        Duration::from_millis(500),
        None,
        None,
        None,
        "ai:dup-case-test".to_string(),
        None,
    )
    .err()
    .expect("case-only dup must be rejected");
    let msg = err.to_string();
    assert!(
        msg.contains("duplicate peer URL"),
        "expected duplicate-peer error, got {msg}"
    );
}
/// Two peers that never answer must not hold `broadcast_archive_quorum` past
/// its deadline; finalising then yields `QuorumNotMet`.
#[tokio::test]
async fn archive_quorum_hanging_peer_times_out_to_break_arm() {
let (url1, _) = spawn_mock_peer(MockBehaviour::Hang).await;
let (url2, _) = spawn_mock_peer(MockBehaviour::Hang).await;
// NOTE(review): `200` is presumably the quorum/ack timeout in ms; the 2s
// bound below leaves ample headroom over it.
let cfg = build_config(vec![url1, url2], 2, 200);
let start = Instant::now();
let tracker = broadcast_archive_quorum(&cfg, "mem-arch-id").await.unwrap();
let elapsed = start.elapsed();
assert!(
elapsed < Duration::from_secs(2),
"archive_quorum must exit at deadline, took {elapsed:?}"
);
let err = finalise_quorum(&tracker).unwrap_err();
assert!(
matches!(err, QuorumError::QuorumNotMet { .. }),
"expected QuorumNotMet, got {err:?}"
);
}
/// A failed store quorum converts into a `QuorumNotMetPayload` with
/// `error = "quorum_not_met"`, `got = 1`, `needed = 2`, and an
/// `unreachable`/`timeout` reason.
#[tokio::test]
async fn quorum_not_met_payload_unreachable_round_trip_from_broadcast() {
let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
let cfg = build_config(vec![url1, url2], 2, 100);
let tracker = broadcast_store_quorum(&cfg, &sample_memory())
.await
.unwrap();
// Wait past the 100ms configured above before finalising.
tokio::time::sleep(Duration::from_millis(150)).await;
let err = finalise_quorum(&tracker).unwrap_err();
let payload = QuorumNotMetPayload::from_err(&err);
assert_eq!(payload.error, "quorum_not_met");
assert_eq!(payload.got, 1, "only local commit");
assert_eq!(payload.needed, 2);
assert!(
payload.reason == "unreachable" || payload.reason == "timeout",
"expected unreachable/timeout, got {}",
payload.reason
);
}
/// A peer configured with a bare base URL (no `/api/v1/sync/push` suffix) is
/// still reached for catch-up, and the local agent id is sent as `?peer=`.
#[tokio::test]
async fn catchup_once_peer_url_without_push_suffix_still_builds_since() {
    let (url, hits, _, last_peer) =
        spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![])).await;
    let timeout = Duration::from_millis(2000);
    let client = reqwest::Client::builder().timeout(timeout).build().unwrap();
    let policy = QuorumPolicy::new(2, 1, timeout, Duration::from_secs(30)).unwrap();
    let cfg = FederationConfig {
        policy,
        peers: vec![PeerEndpoint {
            id: "peer-0".to_string(),
            // Bare base URL: deliberately no push-path suffix.
            sync_push_url: url,
        }],
        client,
        sender_agent_id: "ai:no-suffix".to_string(),
        api_key: None,
        signing_key: None,
        #[cfg(feature = "sal")]
        dlq_sink: None,
    };
    let db = build_test_db();
    catchup_once(&cfg, &db).await;
    assert_eq!(hits.load(Ordering::Relaxed), 1);
    assert_eq!(
        last_peer.lock().await.as_deref(),
        Some("ai:no-suffix"),
        "local agent id should be forwarded as ?peer="
    );
}
/// A memory with a `source` outside the allowlist is skipped while its valid
/// neighbour is applied. The cursor only advances to the newest row that was
/// actually accepted.
#[tokio::test]
async fn catchup_once_skips_invalid_memory_but_applies_valid_neighbour() {
let valid = catchup_memory("ok-mem", "2026-04-26T10:00:00Z");
let mut bad = catchup_memory("bad-source", "2026-04-26T10:00:01Z");
bad.source = "made-up-source-not-in-allowlist".to_string();
let mems = vec![valid.clone(), bad];
let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(mems)).await;
let cfg = build_catchup_cfg(&url, 2000);
let db = build_test_db();
catchup_once(&cfg, &db).await;
assert_eq!(hits.load(Ordering::Relaxed), 1);
let lock = db.lock().await;
let count: i64 = lock
.0
.query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
.unwrap();
assert_eq!(count, 1, "only the valid memory should land");
let title: String = lock
.0
.query_row(
"SELECT title FROM memories WHERE namespace='catchup' LIMIT 1",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(title, "ok-mem");
// The rejected row is newer (10:00:01), yet the cursor stays at the
// accepted row's 10:00:00.
let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
assert_eq!(
clock.entries.get("peer-0").map(String::as_str),
Some("2026-04-26T10:00:00Z"),
"sync_state tracks latest_ts of validate-passing rows"
);
}
/// Catch-up must store a replicated memory's `metadata.agent_id` verbatim
/// (the author's id, not the receiver's), together with its sibling keys.
#[tokio::test]
async fn l11_catchup_preserves_original_agent_id_through_replication() {
    let mut alice_mem = catchup_memory("alice-note", "2026-05-10T10:00:00Z");
    alice_mem.metadata = serde_json::json!({
        "agent_id": "ai:alice@plan-c",
        "shared": "alice wrote this"
    });
    let (url, hits, _, _) =
        spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![alice_mem])).await;
    let cfg = build_catchup_cfg(&url, 2000);
    let db = build_test_db();
    catchup_once(&cfg, &db).await;
    assert_eq!(hits.load(Ordering::Relaxed), 1, "catchup should hit once");

    let guard = db.lock().await;
    let conn = &guard.0;
    let count: i64 = conn
        .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
        .unwrap();
    assert_eq!(count, 1, "alice's row must land on the receiver");
    let raw_metadata: String = conn
        .query_row(
            "SELECT metadata FROM memories WHERE title='alice-note'",
            [],
            |r| r.get(0),
        )
        .unwrap();
    let stored: serde_json::Value = serde_json::from_str(&raw_metadata).unwrap();
    assert_eq!(
        stored.get("agent_id").and_then(serde_json::Value::as_str),
        Some("ai:alice@plan-c"),
        "agent_id must survive federation replication verbatim — \
         observed rewrite to receiver identity is the L11 NHI-D \
         regression"
    );
    assert_eq!(
        stored.get("shared").and_then(serde_json::Value::as_str),
        Some("alice wrote this"),
        "sibling metadata fields must round-trip alongside agent_id"
    );
}
/// Recording the same peer's ack twice must count it only once.
///
/// The policy uses W == N == 3, so local + peer-a reaches 2 acks and only a
/// double-counted duplicate could reach quorum before peer-b acks. With W=2,
/// local + a single peer-a ack already met quorum, the duplicate was never
/// observable, and this test could not fail.
#[test]
fn ack_tracker_record_peer_ack_is_idempotent() {
    let policy = QuorumPolicy::new(3, 3, Duration::from_secs(1), Duration::from_secs(30))
        .expect("policy");
    let mut t = AckTracker::new(policy, Instant::now());
    t.record_local();
    t.record_peer_ack("peer-a");
    t.record_peer_ack("peer-a");
    assert!(
        !t.is_quorum_met(Instant::now()),
        "duplicate ack from peer-a must not be counted twice"
    );
    t.record_peer_ack("peer-b");
    assert!(
        t.is_quorum_met(Instant::now()),
        "local + peer-a + peer-b = 3 distinct acks"
    );
}
/// A 200 JSON body that lacks a `memories` key is treated as nothing to apply:
/// no rows are inserted and the `peer-0` cursor stays unset.
#[tokio::test]
async fn catchup_once_body_without_memories_key_is_skipped() {
// Inline mock peer that always returns valid JSON without `memories`.
let app = Router::new().route(
"/api/v1/sync/since",
axum::routing::get(|| async {
(
StatusCode::OK,
AxumJson(serde_json::json!({"applied":0,"note":"empty cluster"})),
)
}),
);
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.ok();
});
let url = format!("http://{addr}");
let cfg = build_catchup_cfg(&url, 2000);
let db = build_test_db();
catchup_once(&cfg, &db).await;
let lock = db.lock().await;
let count: i64 = lock
.0
.query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
.unwrap();
assert_eq!(count, 0, "no memories key → no inserts");
let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
assert!(
clock.entries.get("peer-0").is_none(),
"no memories key → sync_state untouched"
);
}
/// One element of `memories` that does not deserialize as a `Memory` is
/// skipped without discarding the rest of the batch.
#[tokio::test]
async fn catchup_once_unparseable_individual_memory_is_skipped() {
let valid_mem = serde_json::to_value(catchup_memory("ok", "2026-04-26T10:00:00Z")).unwrap();
// Missing required `Memory` fields, so it cannot deserialize as a `Memory`.
let bad_mem = serde_json::json!({"id":"oops","not_a_memory_field": true});
let app = Router::new().route(
"/api/v1/sync/since",
axum::routing::get(move || {
// Clone per request: the handler closure may be invoked repeatedly.
let valid = valid_mem.clone();
let bad = bad_mem.clone();
async move {
(
StatusCode::OK,
AxumJson(serde_json::json!({"memories": [valid, bad]})),
)
}
}),
);
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.ok();
});
let url = format!("http://{addr}");
let cfg = build_catchup_cfg(&url, 2000);
let db = build_test_db();
catchup_once(&cfg, &db).await;
let lock = db.lock().await;
let count: i64 = lock
.0
.query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
.unwrap();
assert_eq!(count, 1, "only parseable memory inserted");
}
/// On the delete path, two id-drifting peers are each recorded as drift rather
/// than as acks, leaving only the local commit (got = 1).
#[tokio::test]
async fn delete_quorum_id_drift_peer_records_drift_not_ack() {
let url1 = spawn_id_drift_peer().await;
let url2 = spawn_id_drift_peer().await;
let cfg = build_config(vec![url1, url2], 2, 1000);
let tracker = broadcast_delete_quorum(&cfg, "mem-del-x").await.unwrap();
let err = finalise_quorum(&tracker).unwrap_err();
assert!(
matches!(err, QuorumError::QuorumNotMet { got: 1, .. }),
"expected QuorumNotMet got=1, got {err:?}"
);
assert_eq!(
tracker.id_drift_count(),
2,
"both peers should be recorded as drift"
);
}
/// On the archive path, an id-drifting peer is counted as drift, not as an ack.
#[tokio::test]
async fn archive_quorum_id_drift_peer_records_drift_not_ack() {
    let drift_url = spawn_id_drift_peer().await;
    let cfg = build_config(vec![drift_url], 2, 1000);
    let tracker = broadcast_archive_quorum(&cfg, "mem-arch-x").await.unwrap();
    let outcome = finalise_quorum(&tracker).unwrap_err();
    assert!(matches!(outcome, QuorumError::QuorumNotMet { .. }));
    assert_eq!(tracker.id_drift_count(), 1);
}
/// On the restore path, an id-drifting peer is counted as drift, not as an ack.
#[tokio::test]
async fn restore_quorum_id_drift_peer_records_drift_not_ack() {
    let drift_url = spawn_id_drift_peer().await;
    let cfg = build_config(vec![drift_url], 2, 1000);
    let tracker = broadcast_restore_quorum(&cfg, "mem-res-x").await.unwrap();
    let outcome = finalise_quorum(&tracker).unwrap_err();
    assert!(matches!(outcome, QuorumError::QuorumNotMet { .. }));
    assert_eq!(tracker.id_drift_count(), 1);
}
/// On the link path, an id-drifting peer is counted as drift, not as an ack.
#[tokio::test]
async fn link_quorum_id_drift_peer_records_drift_not_ack() {
    let drift_url = spawn_id_drift_peer().await;
    let cfg = build_config(vec![drift_url], 2, 1000);
    let link = sample_link();
    let tracker = broadcast_link_quorum(&cfg, &link).await.unwrap();
    let outcome = finalise_quorum(&tracker).unwrap_err();
    assert!(matches!(outcome, QuorumError::QuorumNotMet { .. }));
    assert_eq!(tracker.id_drift_count(), 1);
}
/// On the consolidate path (no source ids), an id-drifting peer is counted as
/// drift, not as an ack.
#[tokio::test]
async fn consolidate_quorum_id_drift_peer_records_drift_not_ack() {
    let drift_url = spawn_id_drift_peer().await;
    let cfg = build_config(vec![drift_url], 2, 1000);
    let consolidated = sample_memory();
    let tracker = broadcast_consolidate_quorum(&cfg, &consolidated, &[])
        .await
        .unwrap();
    let outcome = finalise_quorum(&tracker).unwrap_err();
    assert!(matches!(outcome, QuorumError::QuorumNotMet { .. }));
    assert_eq!(tracker.id_drift_count(), 1);
}
/// On the pending-action path, an id-drifting peer is counted as drift, not
/// as an ack.
#[tokio::test]
async fn pending_quorum_id_drift_peer_records_drift_not_ack() {
    let drift_url = spawn_id_drift_peer().await;
    let cfg = build_config(vec![drift_url], 2, 1000);
    let pending = sample_pending();
    let tracker = broadcast_pending_quorum(&cfg, &pending).await.unwrap();
    let outcome = finalise_quorum(&tracker).unwrap_err();
    assert!(matches!(outcome, QuorumError::QuorumNotMet { .. }));
    assert_eq!(tracker.id_drift_count(), 1);
}
/// On the pending-decision path, an id-drifting peer is counted as drift, not
/// as an ack.
#[tokio::test]
async fn pending_decision_quorum_id_drift_peer_records_drift_not_ack() {
    let drift_url = spawn_id_drift_peer().await;
    let cfg = build_config(vec![drift_url], 2, 1000);
    let decision = sample_decision();
    let tracker = broadcast_pending_decision_quorum(&cfg, &decision)
        .await
        .unwrap();
    let outcome = finalise_quorum(&tracker).unwrap_err();
    assert!(matches!(outcome, QuorumError::QuorumNotMet { .. }));
    assert_eq!(tracker.id_drift_count(), 1);
}
/// On the namespace-metadata path, an id-drifting peer is counted as drift,
/// not as an ack.
#[tokio::test]
async fn namespace_meta_quorum_id_drift_peer_records_drift_not_ack() {
    let drift_url = spawn_id_drift_peer().await;
    let cfg = build_config(vec![drift_url], 2, 1000);
    let meta = sample_namespace_meta();
    let tracker = broadcast_namespace_meta_quorum(&cfg, &meta)
        .await
        .unwrap();
    let outcome = finalise_quorum(&tracker).unwrap_err();
    assert!(matches!(outcome, QuorumError::QuorumNotMet { .. }));
    assert_eq!(tracker.id_drift_count(), 1);
}
/// On the namespace-metadata clear path, an id-drifting peer is counted as
/// drift, not as an ack.
#[tokio::test]
async fn namespace_meta_clear_quorum_id_drift_peer_records_drift_not_ack() {
    let drift_url = spawn_id_drift_peer().await;
    let cfg = build_config(vec![drift_url], 2, 1000);
    let cleared = vec!["app/team".to_string()];
    let tracker = broadcast_namespace_meta_clear_quorum(&cfg, &cleared)
        .await
        .unwrap();
    let outcome = finalise_quorum(&tracker).unwrap_err();
    assert!(matches!(outcome, QuorumError::QuorumNotMet { .. }));
    assert_eq!(tracker.id_drift_count(), 1);
}
/// Once two acking peers satisfy a W=2 delete quorum, the third (failing)
/// peer must still be contacted by the fanout after the broadcast returns.
#[tokio::test]
async fn delete_quorum_post_quorum_detach_drains_remaining_peer() {
let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url3, count3) = spawn_mock_peer(MockBehaviour::Fail).await;
let cfg = build_config(vec![url1, url2, url3], 2, 2000);
let _tracker = broadcast_delete_quorum(&cfg, "mem-detach").await.unwrap();
// Poll for up to ~2s (100 × 20ms) until every peer has been reached.
for _ in 0..100 {
if count1.load(Ordering::Relaxed) >= 1
&& count2.load(Ordering::Relaxed) >= 1
&& count3.load(Ordering::Relaxed) >= 1
{
break;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
assert!(
count3.load(Ordering::Relaxed) >= 1,
"failing peer must be reached by the detached fanout"
);
}
/// Finalising well before the deadline with too few acks yields
/// `QuorumNotMet` with reason `InFlight`.
#[test]
fn ack_tracker_finalise_pre_deadline_returns_in_flight() {
// 60s timeout: finalising at the construction instant is far from the deadline.
let policy = QuorumPolicy::new(3, 2, Duration::from_secs(60), Duration::from_secs(30))
.expect("policy");
let now = Instant::now();
let mut t = AckTracker::new(policy, now);
t.record_local();
let err = t.finalise(now).unwrap_err();
match err {
QuorumError::QuorumNotMet {
got,
needed,
reason,
} => {
assert_eq!(got, 1);
assert_eq!(needed, 2);
assert_eq!(
reason,
QuorumFailureReason::InFlight,
"pre-deadline insufficient-ack must classify as InFlight"
);
}
other => panic!("expected QuorumNotMet, got {other:?}"),
}
}
/// On the delete path, a peer whose first push fails is retried once, so it
/// receives exactly two requests.
#[tokio::test]
async fn delete_quorum_transient_peer_failure_retried_once() {
let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
// NOTE(review): `fail_until: 1` presumably fails only the first request.
let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
let cfg = build_config(vec![url1, url2], 2, 2000);
let _tracker = broadcast_delete_quorum(&cfg, "mem-del-retry")
.await
.unwrap();
// Requests may still be in flight after the broadcast returns, so poll for
// up to ~2s (200 × 10ms) before asserting.
for _ in 0..200 {
if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(
count2.load(Ordering::Relaxed),
2,
"transient failure must retry"
);
}
/// On the archive path, a peer whose first push fails is retried once, so it
/// receives exactly two requests.
#[tokio::test]
async fn archive_quorum_transient_peer_failure_retried_once() {
let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
let cfg = build_config(vec![url1, url2], 2, 2000);
let _tracker = broadcast_archive_quorum(&cfg, "mem-arc-retry")
.await
.unwrap();
// Poll for up to ~2s (200 × 10ms) for both peers' requests to land.
for _ in 0..200 {
if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(count2.load(Ordering::Relaxed), 2);
}
/// On the restore path, a peer whose first push fails is retried once, so it
/// receives exactly two requests.
#[tokio::test]
async fn restore_quorum_transient_peer_failure_retried_once() {
let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
let cfg = build_config(vec![url1, url2], 2, 2000);
let _tracker = broadcast_restore_quorum(&cfg, "mem-res-retry")
.await
.unwrap();
// Poll for up to ~2s (200 × 10ms) for both peers' requests to land.
for _ in 0..200 {
if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(count2.load(Ordering::Relaxed), 2);
}
/// On the link path, a peer whose first push fails is retried once, so it
/// receives exactly two requests.
#[tokio::test]
async fn link_quorum_transient_peer_failure_retried_once() {
let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
let cfg = build_config(vec![url1, url2], 2, 2000);
let _tracker = broadcast_link_quorum(&cfg, &sample_link()).await.unwrap();
// Poll for up to ~2s (200 × 10ms) for both peers' requests to land.
for _ in 0..200 {
if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(count2.load(Ordering::Relaxed), 2);
}
#[tokio::test]
async fn consolidate_quorum_transient_peer_failure_retried_once() {
    // Peer 1 always acks. Peer 2 is a `FailThenAck { fail_until: 1 }` mock, so a
    // single retry shows up as exactly two requests recorded on `count2`.
    let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
    let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
    let cfg = build_config(vec![url1, url2], 2, 2000);
    let mem = sample_memory();
    let sources = vec!["src-1".to_string(), "src-2".to_string()];
    let _tracker = broadcast_consolidate_quorum(&cfg, &mem, &sources)
        .await
        .unwrap();
    // Poll for up to ~2s (200 x 10ms) until both peers have seen their traffic.
    for _ in 0..200 {
        if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
            break;
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    // The poll loop falls through silently on timeout, so assert each
    // condition it waited for. The healthy peer must have been reached...
    assert!(
        count1.load(Ordering::Relaxed) >= 1,
        "healthy peer must receive the push"
    );
    // ...and the flaky peer must have been retried exactly once.
    assert_eq!(
        count2.load(Ordering::Relaxed),
        2,
        "transient failure must retry"
    );
}
#[tokio::test]
async fn pending_quorum_transient_peer_failure_retried_once() {
    // Peer 1 always acks. Peer 2 is a `FailThenAck { fail_until: 1 }` mock, so a
    // single retry shows up as exactly two requests recorded on `count2`.
    let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
    let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
    let cfg = build_config(vec![url1, url2], 2, 2000);
    let _tracker = broadcast_pending_quorum(&cfg, &sample_pending())
        .await
        .unwrap();
    // Poll for up to ~2s (200 x 10ms) until both peers have seen their traffic.
    for _ in 0..200 {
        if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
            break;
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    // The poll loop falls through silently on timeout, so assert each
    // condition it waited for. The healthy peer must have been reached...
    assert!(
        count1.load(Ordering::Relaxed) >= 1,
        "healthy peer must receive the push"
    );
    // ...and the flaky peer must have been retried exactly once.
    assert_eq!(
        count2.load(Ordering::Relaxed),
        2,
        "transient failure must retry"
    );
}
#[tokio::test]
async fn pending_decision_quorum_transient_peer_failure_retried_once() {
    // Peer 1 always acks. Peer 2 is a `FailThenAck { fail_until: 1 }` mock, so a
    // single retry shows up as exactly two requests recorded on `count2`.
    let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
    let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
    let cfg = build_config(vec![url1, url2], 2, 2000);
    let _tracker = broadcast_pending_decision_quorum(&cfg, &sample_decision())
        .await
        .unwrap();
    // Poll for up to ~2s (200 x 10ms) until both peers have seen their traffic.
    for _ in 0..200 {
        if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
            break;
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    // The poll loop falls through silently on timeout, so assert each
    // condition it waited for. The healthy peer must have been reached...
    assert!(
        count1.load(Ordering::Relaxed) >= 1,
        "healthy peer must receive the push"
    );
    // ...and the flaky peer must have been retried exactly once.
    assert_eq!(
        count2.load(Ordering::Relaxed),
        2,
        "transient failure must retry"
    );
}
#[tokio::test]
async fn namespace_meta_quorum_transient_peer_failure_retried_once() {
    // Peer 1 always acks. Peer 2 is a `FailThenAck { fail_until: 1 }` mock, so a
    // single retry shows up as exactly two requests recorded on `count2`.
    let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
    let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
    let cfg = build_config(vec![url1, url2], 2, 2000);
    let _tracker = broadcast_namespace_meta_quorum(&cfg, &sample_namespace_meta())
        .await
        .unwrap();
    // Poll for up to ~2s (200 x 10ms) until both peers have seen their traffic.
    for _ in 0..200 {
        if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
            break;
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    // The poll loop falls through silently on timeout, so assert each
    // condition it waited for. The healthy peer must have been reached...
    assert!(
        count1.load(Ordering::Relaxed) >= 1,
        "healthy peer must receive the push"
    );
    // ...and the flaky peer must have been retried exactly once.
    assert_eq!(
        count2.load(Ordering::Relaxed),
        2,
        "transient failure must retry"
    );
}
#[tokio::test]
async fn namespace_meta_clear_quorum_transient_peer_failure_retried_once() {
    // Peer 1 always acks. Peer 2 is a `FailThenAck { fail_until: 1 }` mock, so a
    // single retry shows up as exactly two requests recorded on `count2`.
    let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
    let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
    let cfg = build_config(vec![url1, url2], 2, 2000);
    let namespaces = vec!["ns/x".to_string()];
    let _tracker = broadcast_namespace_meta_clear_quorum(&cfg, &namespaces)
        .await
        .unwrap();
    // Poll for up to ~2s (200 x 10ms) until both peers have seen their traffic.
    for _ in 0..200 {
        if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
            break;
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    // The poll loop falls through silently on timeout, so assert each
    // condition it waited for. The healthy peer must have been reached...
    assert!(
        count1.load(Ordering::Relaxed) >= 1,
        "healthy peer must receive the push"
    );
    // ...and the flaky peer must have been retried exactly once.
    assert_eq!(
        count2.load(Ordering::Relaxed),
        2,
        "transient failure must retry"
    );
}
#[tokio::test]
async fn delete_quorum_id_drift_does_not_count_as_ack() {
    // Both peers are `spawn_id_drift_peer` mocks (presumably acking a mismatched id).
    // Their replies must not be counted towards the quorum.
    let drift_peers = vec![spawn_id_drift_peer().await, spawn_id_drift_peer().await];
    let cfg = build_config(drift_peers, 2, 1000);
    let tracker = broadcast_delete_quorum(&cfg, "mem-del-drift")
        .await
        .unwrap();
    // Only a single ack is expected to be tallied when finalising.
    let acked = match finalise_quorum(&tracker).unwrap_err() {
        QuorumError::QuorumNotMet { got, .. } => got,
        other => panic!("expected QuorumNotMet, got {other:?}"),
    };
    assert_eq!(acked, 1);
}
#[tokio::test]
async fn archive_quorum_id_drift_does_not_count_as_ack() {
    // Both peers are `spawn_id_drift_peer` mocks, so neither reply should count
    // towards the quorum. This mirrors the delete id-drift test, which pins the
    // tallied ack count at exactly 1 under the same configuration.
    let url1 = spawn_id_drift_peer().await;
    let url2 = spawn_id_drift_peer().await;
    let cfg = build_config(vec![url1, url2], 2, 1000);
    let tracker = broadcast_archive_quorum(&cfg, "mem-arc-drift")
        .await
        .unwrap();
    let err = finalise_quorum(&tracker).unwrap_err();
    // Check the ack count, not just the variant: a drifted ack that is wrongly
    // counted would still fail quorum here but would bump `got` to 2.
    match err {
        QuorumError::QuorumNotMet { got, .. } => assert_eq!(got, 1),
        other => panic!("expected QuorumNotMet, got {other:?}"),
    }
}
#[tokio::test]
async fn link_quorum_id_drift_does_not_count_as_ack() {
    // Both peers are `spawn_id_drift_peer` mocks, so neither reply should count
    // towards the quorum. This mirrors the delete id-drift test, which pins the
    // tallied ack count at exactly 1 under the same configuration.
    let url1 = spawn_id_drift_peer().await;
    let url2 = spawn_id_drift_peer().await;
    let cfg = build_config(vec![url1, url2], 2, 1000);
    let tracker = broadcast_link_quorum(&cfg, &sample_link()).await.unwrap();
    let err = finalise_quorum(&tracker).unwrap_err();
    // Check the ack count, not just the variant: a drifted ack that is wrongly
    // counted would still fail quorum here but would bump `got` to 2.
    match err {
        QuorumError::QuorumNotMet { got, .. } => assert_eq!(got, 1),
        other => panic!("expected QuorumNotMet, got {other:?}"),
    }
}
#[cfg(feature = "sal")]
#[tokio::test]
async fn catchup_once_with_store_applies_via_sal_handle() {
    use super::receive::catchup_once_with_store;
    use crate::store::MemoryStore;

    // The peer serves exactly one memory; catch-up should land it in the SAL store.
    let remote = catchup_memory("sal-applied", "2026-04-26T10:00:00Z");
    let (peer_url, peer_hits, _, _) =
        spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![remote.clone()])).await;
    let cfg = build_catchup_cfg(&peer_url, 2000);
    let db = build_test_db();

    // `tmp` must outlive the store so its backing file is not deleted mid-test.
    let tmp = tempfile::tempdir().expect("tempdir");
    let sal_path = tmp.path().join("store.db");
    let sal: Arc<dyn MemoryStore> = Arc::new(
        crate::store::sqlite::SqliteStore::open(&sal_path).expect("open SqliteStore"),
    );

    catchup_once_with_store(&cfg, &db, Some(&sal)).await;
    assert_eq!(peer_hits.load(Ordering::Relaxed), 1, "peer must be hit once");

    // The memory must be readable back through the SAL handle.
    let caller = crate::store::CallerContext::for_agent("test");
    let fetched = sal
        .get(&caller, &remote.id)
        .await
        .expect("SAL store should have the catchup memory");
    assert_eq!(fetched.title, "sal-applied");

    // The sync watermark for the peer is checked on the legacy db handle.
    let guard = db.lock().await;
    let watermark = crate::db::sync_state_load(&guard.0, "ai:catchup-test").unwrap();
    assert_eq!(
        watermark.entries.get("peer-0").map(String::as_str),
        Some("2026-04-26T10:00:00Z"),
    );
}
#[cfg(feature = "sal")]
#[tokio::test]
async fn catchup_once_with_store_none_uses_legacy_rusqlite() {
    use super::receive::catchup_once_with_store;

    // With no SAL handle (`None`), the test expects the row in the rusqlite db.
    let offered = vec![catchup_memory("legacy-applied", "2026-04-26T10:00:00Z")];
    let (peer_url, peer_hits, _, _) =
        spawn_since_peer(SinceMockBehaviour::ReturnMemories(offered)).await;
    let cfg = build_catchup_cfg(&peer_url, 2000);
    let db = build_test_db();

    catchup_once_with_store(&cfg, &db, None).await;
    assert_eq!(peer_hits.load(Ordering::Relaxed), 1);

    // Count rows directly in the legacy `memories` table.
    let guard = db.lock().await;
    let stored_rows: i64 = guard
        .0
        .query_row("SELECT COUNT(*) FROM memories", [], |row| row.get(0))
        .unwrap();
    assert_eq!(stored_rows, 1, "legacy path must insert the row locally");
}
#[cfg(feature = "sal")]
#[tokio::test]
async fn catchup_once_with_store_skips_invalid_memory_via_sal_path() {
    use super::receive::catchup_once_with_store;
    let valid = catchup_memory("sal-valid", "2026-04-26T10:00:00Z");
    let mut bad = catchup_memory("sal-bad", "2026-04-26T10:00:01Z");
    // A source outside the allow-list should make this memory fail validation.
    bad.source = "not-in-allowlist".to_string();
    // Capture the id now, because `bad` is moved into the peer's batch below.
    let bad_id = bad.id.clone();
    let mems = vec![valid.clone(), bad];
    let (url, _, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(mems)).await;
    let cfg = build_catchup_cfg(&url, 2000);
    let db = build_test_db();
    // `dir` must outlive the store so its backing file is not deleted mid-test.
    let dir = tempfile::tempdir().expect("tempdir");
    let store: Arc<dyn crate::store::MemoryStore> = Arc::new(
        crate::store::sqlite::SqliteStore::open(dir.path().join("store.db"))
            .expect("open SqliteStore"),
    );
    catchup_once_with_store(&cfg, &db, Some(&store)).await;
    let ctx = crate::store::CallerContext::for_agent("test");
    // One bad memory must not abort the batch: the valid one still lands...
    assert!(
        store.get(&ctx, &valid.id).await.is_ok(),
        "valid memory must land via SAL store"
    );
    // ...and the invalid one must actually be skipped, as the test name claims.
    assert!(
        store.get(&ctx, &bad_id).await.is_err(),
        "invalid memory must be skipped, not written to the SAL store"
    );
}
}