1use std::sync::Arc;
38use std::time::{Duration, Instant};
39
40use serde::{Deserialize, Serialize};
41use tokio::sync::Mutex;
42use tokio::task::JoinSet;
43
44use crate::models::{Memory, MemoryLink, NamespaceMetaEntry, PendingAction, PendingDecision};
45use crate::replication::{AckTracker, QuorumError, QuorumFailureReason, QuorumPolicy};
46
47#[derive(Clone)]
50pub struct FederationConfig {
51 pub policy: QuorumPolicy,
52 pub peers: Vec<PeerEndpoint>,
53 pub client: reqwest::Client,
54 pub sender_agent_id: String,
55}
56
/// One remote federation peer, as addressed by the fan-out code.
#[derive(Debug, Clone)]
pub struct PeerEndpoint {
    /// Positional label (`peer-<index>`) used for ack tracking and log lines.
    pub id: String,
    /// Absolute URL of the peer's `/api/v1/sync/push` endpoint.
    pub sync_push_url: String,
}
64
65impl FederationConfig {
66 pub fn build(
75 quorum_writes: usize,
76 peer_urls: &[String],
77 timeout: Duration,
78 client_cert_path: Option<&std::path::Path>,
79 client_key_path: Option<&std::path::Path>,
80 ca_cert_path: Option<&std::path::Path>,
81 sender_agent_id: String,
82 ) -> anyhow::Result<Option<Self>> {
83 if quorum_writes == 0 || peer_urls.is_empty() {
84 return Ok(None);
85 }
86 let mut seen_urls: std::collections::HashSet<String> = std::collections::HashSet::new();
92 for raw in peer_urls {
93 let normalized = raw.trim_end_matches('/').to_ascii_lowercase();
94 if !seen_urls.insert(normalized.clone()) {
95 return Err(anyhow::anyhow!(
96 "duplicate peer URL in --quorum-peers: {raw} (normalized: {normalized}) \
97 — duplicates would let a single peer contribute to quorum more than once"
98 ));
99 }
100 }
101 let n = 1 + peer_urls.len(); let policy = QuorumPolicy::new(n, quorum_writes, timeout, Duration::from_secs(30))
103 .map_err(|e| anyhow::anyhow!("invalid quorum policy: {e}"))?;
104 let peers: Vec<PeerEndpoint> = peer_urls
105 .iter()
106 .enumerate()
107 .map(|(i, raw)| {
108 let trimmed = raw.trim_end_matches('/');
113 tracing::debug!(
114 target = "federation",
115 peer_index = i,
116 url = trimmed,
117 "registered peer"
118 );
119 PeerEndpoint {
120 id: format!("peer-{i}"),
121 sync_push_url: format!("{trimmed}/api/v1/sync/push"),
122 }
123 })
124 .collect();
125
126 let mut client_builder = reqwest::Client::builder()
144 .timeout(timeout)
145 .connect_timeout(Duration::from_secs(2))
146 .use_rustls_tls();
147 if let Some(ca_path) = ca_cert_path {
155 let ca_pem = std::fs::read(ca_path)
156 .map_err(|e| anyhow::anyhow!("read --quorum-ca-cert: {e}"))?;
157 let ca = reqwest::Certificate::from_pem(&ca_pem)
158 .map_err(|e| anyhow::anyhow!("parse --quorum-ca-cert: {e}"))?;
159 client_builder = client_builder.add_root_certificate(ca);
160 }
161 if let (Some(cert), Some(key)) = (client_cert_path, client_key_path) {
162 let cert_pem =
163 std::fs::read(cert).map_err(|e| anyhow::anyhow!("read --client-cert: {e}"))?;
164 let key_pem =
165 std::fs::read(key).map_err(|e| anyhow::anyhow!("read --client-key: {e}"))?;
166 let mut pem = cert_pem;
167 pem.extend_from_slice(b"\n");
168 pem.extend_from_slice(&key_pem);
169 let identity = reqwest::Identity::from_pem(&pem)
170 .map_err(|e| anyhow::anyhow!("build mTLS identity: {e}"))?;
171 client_builder = client_builder.identity(identity);
172 }
173 let client = client_builder
174 .build()
175 .map_err(|e| anyhow::anyhow!("build federation client: {e}"))?;
176
177 Ok(Some(Self {
178 policy,
179 peers,
180 client,
181 sender_agent_id,
182 }))
183 }
184
185 #[must_use]
188 pub fn peer_count(&self) -> usize {
189 self.peers.len()
190 }
191}
192
193pub async fn broadcast_store_quorum(
202 config: &FederationConfig,
203 mem: &Memory,
204) -> Result<AckTracker, QuorumError> {
205 let now = Instant::now();
206 let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
207 tracker.lock().await.record_local();
208
209 let body = serde_json::json!({
210 "sender_agent_id": config.sender_agent_id,
211 "memories": [mem],
212 "dry_run": false,
213 });
214
215 let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
216 for peer in &config.peers {
217 let client = config.client.clone();
218 let url = peer.sync_push_url.clone();
219 let id = peer.id.clone();
220 let mem_id = mem.id.clone();
221 let payload = body.clone();
222 joins.spawn(async move {
223 let outcome = post_and_classify(&client, &url, &payload, &mem_id, Some(&mem_id)).await;
224 (id, outcome)
225 });
226 }
227
228 let deadline = now + config.policy.ack_timeout;
234 loop {
235 let remaining = deadline.saturating_duration_since(Instant::now());
236 if remaining.is_zero() {
237 break;
238 }
239 match tokio::time::timeout(remaining, joins.join_next()).await {
240 Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
241 tracker.lock().await.record_peer_ack(peer_id);
242 }
243 Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
244 tracker.lock().await.record_id_drift(peer_id);
245 }
246 Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
247 tracing::warn!("federation: peer {peer_id} failed for {}: {reason}", mem.id);
248 }
249 Ok(Some(Err(e))) => {
250 tracing::warn!("federation: peer join error: {e}");
251 }
252 Ok(None) | Err(_) => break, }
254 if tracker.lock().await.is_quorum_met(Instant::now()) {
257 break;
258 }
259 }
260
261 if !joins.is_empty() {
278 let mem_id = mem.id.clone();
283 tokio::spawn(async move {
284 while let Some(res) = joins.join_next().await {
285 match res {
286 Ok((peer_id, AckOutcome::Ack)) => {
287 tracing::debug!("federation: post-quorum ack from {peer_id}");
288 }
289 Ok((peer_id, AckOutcome::IdDrift)) => {
290 tracing::warn!(
291 "federation: post-quorum id-drift from {peer_id} (peer rewrote id)"
292 );
293 crate::metrics::registry()
294 .federation_fanout_dropped_total
295 .with_label_values(&["id_drift"])
296 .inc();
297 }
298 Ok((peer_id, AckOutcome::Fail(reason))) => {
299 tracing::warn!(
300 "federation: post-quorum peer {peer_id} did not ack for {mem_id}: {reason}"
301 );
302 crate::metrics::registry()
303 .federation_fanout_dropped_total
304 .with_label_values(&["peer_fail"])
305 .inc();
306 }
307 Err(e) => {
308 tracing::warn!("federation: post-quorum join error for {mem_id}: {e}");
309 crate::metrics::registry()
310 .federation_fanout_dropped_total
311 .with_label_values(&["join_error"])
312 .inc();
313 }
314 }
315 }
316 });
317 }
318
319 let tracker = Arc::try_unwrap(tracker)
320 .map_err(|_| QuorumError::LocalWriteFailed {
321 detail: "tracker arc still referenced at finalise".to_string(),
322 })?
323 .into_inner();
324 Ok(tracker)
325}
326
/// How a single peer answered a fan-out POST.
#[derive(Debug)]
enum AckOutcome {
    /// The peer accepted the push.
    Ack,
    /// The peer accepted the push but reported ids that do not include the one sent.
    IdDrift,
    /// The push was not accepted; the string describes why (HTTP status or transport error).
    Fail(String),
}
333
334async fn post_once(
338 client: &reqwest::Client,
339 url: &str,
340 body: &serde_json::Value,
341 expected_id: &str,
342 idempotency_key: Option<&str>,
343) -> AckOutcome {
344 let mut req = client.post(url).json(body);
351 if let Some(key) = idempotency_key {
352 req = req.header("Idempotency-Key", key);
353 }
354 match req.send().await {
355 Ok(resp) if resp.status().is_success() => {
356 match resp.json::<serde_json::Value>().await {
357 Ok(v) => {
358 if let Some(ids) = v.get("ids").and_then(|v| v.as_array())
363 && !ids.is_empty()
364 && !ids.iter().any(|x| x.as_str() == Some(expected_id))
365 {
366 return AckOutcome::IdDrift;
367 }
368 AckOutcome::Ack
369 }
370 Err(_) => AckOutcome::Ack, }
372 }
373 Ok(resp) => AckOutcome::Fail(format!("http {}", resp.status())),
374 Err(e) => AckOutcome::Fail(format!("network: {e}")),
375 }
376}
377
/// Pause between a failed fan-out POST and its single retry (250 ms).
const FANOUT_RETRY_BACKOFF: Duration = Duration::from_micros(250_000);
383
384async fn post_and_classify(
406 client: &reqwest::Client,
407 url: &str,
408 body: &serde_json::Value,
409 expected_id: &str,
410 idempotency_key: Option<&str>,
411) -> AckOutcome {
412 match post_once(client, url, body, expected_id, idempotency_key).await {
413 AckOutcome::Ack => AckOutcome::Ack,
414 AckOutcome::IdDrift => AckOutcome::IdDrift,
415 AckOutcome::Fail(first_reason) => {
416 tokio::time::sleep(FANOUT_RETRY_BACKOFF).await;
417 match post_once(client, url, body, expected_id, idempotency_key).await {
418 AckOutcome::Ack => {
419 tracing::debug!(
420 "federation: peer POST retry succeeded for {expected_id} (first attempt: {first_reason})"
421 );
422 crate::metrics::registry()
423 .federation_fanout_retry_total
424 .with_label_values(&["ok"])
425 .inc();
426 AckOutcome::Ack
427 }
428 AckOutcome::IdDrift => {
429 crate::metrics::registry()
430 .federation_fanout_retry_total
431 .with_label_values(&["id_drift"])
432 .inc();
433 AckOutcome::IdDrift
434 }
435 AckOutcome::Fail(retry_reason) => {
436 crate::metrics::registry()
437 .federation_fanout_retry_total
438 .with_label_values(&["fail"])
439 .inc();
440 AckOutcome::Fail(format!("first: {first_reason}; retry: {retry_reason}"))
441 }
442 }
443 }
444 }
445}
446
447pub async fn bulk_catchup_push(
482 config: &FederationConfig,
483 memories: &[Memory],
484) -> Vec<(String, String)> {
485 if memories.is_empty() || config.peers.is_empty() {
486 return Vec::new();
487 }
488 let body = serde_json::json!({
489 "sender_agent_id": config.sender_agent_id,
490 "memories": memories,
491 "dry_run": false,
492 });
493 let mut joins: JoinSet<(String, Result<(), String>)> = JoinSet::new();
494 for peer in &config.peers {
495 let client = config.client.clone();
496 let url = peer.sync_push_url.clone();
497 let id = peer.id.clone();
498 let payload = body.clone();
499 joins.spawn(async move {
500 let mut req = client.post(&url).json(&payload);
501 req = req.header("X-Catchup", "bulk");
505 let outcome = match req.send().await {
506 Ok(resp) if resp.status().is_success() => Ok(()),
507 Ok(resp) => Err(format!("http {}", resp.status())),
508 Err(e) => Err(format!("network: {e}")),
509 };
510 (id, outcome)
511 });
512 }
513 let mut errors = Vec::new();
514 while let Some(res) = joins.join_next().await {
515 match res {
516 Ok((peer_id, Err(err))) => {
517 tracing::warn!("bulk_catchup_push: peer {peer_id} failed: {err}");
518 errors.push((peer_id, err));
519 }
520 Ok((_, Ok(()))) => {}
521 Err(e) => {
522 tracing::warn!("bulk_catchup_push: join error: {e:?}");
523 errors.push(("unknown".to_string(), e.to_string()));
524 }
525 }
526 }
527 errors
528}
529
530pub fn finalise_quorum(tracker: &AckTracker) -> Result<usize, QuorumError> {
539 tracker.finalise(Instant::now())
540}
541
542pub async fn broadcast_delete_quorum(
553 config: &FederationConfig,
554 id: &str,
555) -> Result<AckTracker, QuorumError> {
556 let now = Instant::now();
557 let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
558 tracker.lock().await.record_local();
559
560 let body = serde_json::json!({
561 "sender_agent_id": config.sender_agent_id,
562 "memories": [],
563 "deletions": [id],
564 "dry_run": false,
565 });
566
567 let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
568 for peer in &config.peers {
569 let client = config.client.clone();
570 let url = peer.sync_push_url.clone();
571 let peer_id = peer.id.clone();
572 let payload = body.clone();
573 let target_id = id.to_string();
574 joins.spawn(async move {
575 let outcome =
576 post_and_classify(&client, &url, &payload, &target_id, Some(&target_id)).await;
577 (peer_id, outcome)
578 });
579 }
580
581 let deadline = now + config.policy.ack_timeout;
582 loop {
583 let remaining = deadline.saturating_duration_since(Instant::now());
584 if remaining.is_zero() {
585 break;
586 }
587 match tokio::time::timeout(remaining, joins.join_next()).await {
588 Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
589 tracker.lock().await.record_peer_ack(peer_id);
590 }
591 Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
592 tracker.lock().await.record_id_drift(peer_id);
593 }
594 Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
595 tracing::warn!("federation: delete peer {peer_id} failed for {id}: {reason}");
596 }
597 Ok(Some(Err(e))) => {
598 tracing::warn!("federation: delete peer join error: {e}");
599 }
600 Ok(None) | Err(_) => break,
601 }
602 if tracker.lock().await.is_quorum_met(Instant::now()) {
603 break;
604 }
605 }
606
607 if !joins.is_empty() {
608 tokio::spawn(async move {
609 while let Some(res) = joins.join_next().await {
610 if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
611 tracing::debug!(
612 "federation: post-quorum delete peer {peer_id} did not ack: {reason}"
613 );
614 }
615 }
616 });
617 }
618
619 let tracker = Arc::try_unwrap(tracker)
620 .map_err(|_| QuorumError::LocalWriteFailed {
621 detail: "tracker arc still referenced at finalise".to_string(),
622 })?
623 .into_inner();
624 Ok(tracker)
625}
626
627pub async fn broadcast_archive_quorum(
641 config: &FederationConfig,
642 id: &str,
643) -> Result<AckTracker, QuorumError> {
644 let now = Instant::now();
645 let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
646 tracker.lock().await.record_local();
647
648 let body = serde_json::json!({
649 "sender_agent_id": config.sender_agent_id,
650 "memories": [],
651 "archives": [id],
652 "dry_run": false,
653 });
654
655 let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
656 for peer in &config.peers {
657 let client = config.client.clone();
658 let url = peer.sync_push_url.clone();
659 let peer_id = peer.id.clone();
660 let payload = body.clone();
661 let target_id = id.to_string();
662 joins.spawn(async move {
663 let outcome =
664 post_and_classify(&client, &url, &payload, &target_id, Some(&target_id)).await;
665 (peer_id, outcome)
666 });
667 }
668
669 let deadline = now + config.policy.ack_timeout;
670 loop {
671 let remaining = deadline.saturating_duration_since(Instant::now());
672 if remaining.is_zero() {
673 break;
674 }
675 match tokio::time::timeout(remaining, joins.join_next()).await {
676 Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
677 tracker.lock().await.record_peer_ack(peer_id);
678 }
679 Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
680 tracker.lock().await.record_id_drift(peer_id);
681 }
682 Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
683 tracing::warn!("federation: archive peer {peer_id} failed for {id}: {reason}");
684 }
685 Ok(Some(Err(e))) => {
686 tracing::warn!("federation: archive peer join error: {e}");
687 }
688 Ok(None) | Err(_) => break,
689 }
690 if tracker.lock().await.is_quorum_met(Instant::now()) {
691 break;
692 }
693 }
694
695 if !joins.is_empty() {
696 tokio::spawn(async move {
697 while let Some(res) = joins.join_next().await {
698 if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
699 tracing::debug!(
700 "federation: post-quorum archive peer {peer_id} did not ack: {reason}"
701 );
702 }
703 }
704 });
705 }
706
707 let tracker = Arc::try_unwrap(tracker)
708 .map_err(|_| QuorumError::LocalWriteFailed {
709 detail: "tracker arc still referenced at finalise".to_string(),
710 })?
711 .into_inner();
712 Ok(tracker)
713}
714
715pub async fn broadcast_restore_quorum(
730 config: &FederationConfig,
731 id: &str,
732) -> Result<AckTracker, QuorumError> {
733 let now = Instant::now();
734 let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
735 tracker.lock().await.record_local();
736
737 let body = serde_json::json!({
738 "sender_agent_id": config.sender_agent_id,
739 "memories": [],
740 "restores": [id],
741 "dry_run": false,
742 });
743
744 let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
745 for peer in &config.peers {
746 let client = config.client.clone();
747 let url = peer.sync_push_url.clone();
748 let peer_id = peer.id.clone();
749 let payload = body.clone();
750 let target_id = id.to_string();
751 joins.spawn(async move {
752 let outcome =
753 post_and_classify(&client, &url, &payload, &target_id, Some(&target_id)).await;
754 (peer_id, outcome)
755 });
756 }
757
758 let deadline = now + config.policy.ack_timeout;
759 loop {
760 let remaining = deadline.saturating_duration_since(Instant::now());
761 if remaining.is_zero() {
762 break;
763 }
764 match tokio::time::timeout(remaining, joins.join_next()).await {
765 Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
766 tracker.lock().await.record_peer_ack(peer_id);
767 }
768 Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
769 tracker.lock().await.record_id_drift(peer_id);
770 }
771 Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
772 tracing::warn!("federation: restore peer {peer_id} failed for {id}: {reason}");
773 }
774 Ok(Some(Err(e))) => {
775 tracing::warn!("federation: restore peer join error: {e}");
776 }
777 Ok(None) | Err(_) => break,
778 }
779 if tracker.lock().await.is_quorum_met(Instant::now()) {
780 break;
781 }
782 }
783
784 if !joins.is_empty() {
785 tokio::spawn(async move {
786 while let Some(res) = joins.join_next().await {
787 if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
788 tracing::debug!(
789 "federation: post-quorum restore peer {peer_id} did not ack: {reason}"
790 );
791 }
792 }
793 });
794 }
795
796 let tracker = Arc::try_unwrap(tracker)
797 .map_err(|_| QuorumError::LocalWriteFailed {
798 detail: "tracker arc still referenced at finalise".to_string(),
799 })?
800 .into_inner();
801 Ok(tracker)
802}
803
804pub async fn broadcast_link_quorum(
813 config: &FederationConfig,
814 link: &MemoryLink,
815) -> Result<AckTracker, QuorumError> {
816 let now = Instant::now();
817 let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
818 tracker.lock().await.record_local();
819
820 let body = serde_json::json!({
821 "sender_agent_id": config.sender_agent_id,
822 "memories": [],
823 "links": [link],
824 "dry_run": false,
825 });
826 let log_id = format!("{}→{}", link.source_id, link.target_id);
827
828 let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
829 for peer in &config.peers {
830 let client = config.client.clone();
831 let url = peer.sync_push_url.clone();
832 let peer_id = peer.id.clone();
833 let payload = body.clone();
834 let log_id = log_id.clone();
835 joins.spawn(async move {
836 let outcome = post_and_classify(&client, &url, &payload, &log_id, Some(&log_id)).await;
837 (peer_id, outcome)
838 });
839 }
840
841 let deadline = now + config.policy.ack_timeout;
842 loop {
843 let remaining = deadline.saturating_duration_since(Instant::now());
844 if remaining.is_zero() {
845 break;
846 }
847 match tokio::time::timeout(remaining, joins.join_next()).await {
848 Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
849 tracker.lock().await.record_peer_ack(peer_id);
850 }
851 Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
852 tracker.lock().await.record_id_drift(peer_id);
853 }
854 Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
855 tracing::warn!("federation: link peer {peer_id} failed for {log_id}: {reason}");
856 }
857 Ok(Some(Err(e))) => {
858 tracing::warn!("federation: link peer join error: {e}");
859 }
860 Ok(None) | Err(_) => break,
861 }
862 if tracker.lock().await.is_quorum_met(Instant::now()) {
863 break;
864 }
865 }
866
867 if !joins.is_empty() {
868 tokio::spawn(async move {
869 while let Some(res) = joins.join_next().await {
870 if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
871 tracing::debug!(
872 "federation: post-quorum link peer {peer_id} did not ack: {reason}"
873 );
874 }
875 }
876 });
877 }
878
879 let tracker = Arc::try_unwrap(tracker)
880 .map_err(|_| QuorumError::LocalWriteFailed {
881 detail: "tracker arc still referenced at finalise".to_string(),
882 })?
883 .into_inner();
884 Ok(tracker)
885}
886
887pub async fn broadcast_consolidate_quorum(
896 config: &FederationConfig,
897 new_mem: &Memory,
898 source_ids: &[String],
899) -> Result<AckTracker, QuorumError> {
900 let now = Instant::now();
901 let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
902 tracker.lock().await.record_local();
903
904 let body = serde_json::json!({
905 "sender_agent_id": config.sender_agent_id,
906 "memories": [new_mem],
907 "deletions": source_ids,
908 "dry_run": false,
909 });
910
911 let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
912 for peer in &config.peers {
913 let client = config.client.clone();
914 let url = peer.sync_push_url.clone();
915 let peer_id = peer.id.clone();
916 let payload = body.clone();
917 let target_id = new_mem.id.clone();
918 joins.spawn(async move {
919 let outcome =
920 post_and_classify(&client, &url, &payload, &target_id, Some(&target_id)).await;
921 (peer_id, outcome)
922 });
923 }
924
925 let deadline = now + config.policy.ack_timeout;
926 loop {
927 let remaining = deadline.saturating_duration_since(Instant::now());
928 if remaining.is_zero() {
929 break;
930 }
931 match tokio::time::timeout(remaining, joins.join_next()).await {
932 Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
933 tracker.lock().await.record_peer_ack(peer_id);
934 }
935 Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
936 tracker.lock().await.record_id_drift(peer_id);
937 }
938 Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
939 tracing::warn!(
940 "federation: consolidate peer {peer_id} failed for {}: {reason}",
941 new_mem.id
942 );
943 }
944 Ok(Some(Err(e))) => {
945 tracing::warn!("federation: consolidate peer join error: {e}");
946 }
947 Ok(None) | Err(_) => break,
948 }
949 if tracker.lock().await.is_quorum_met(Instant::now()) {
950 break;
951 }
952 }
953
954 if !joins.is_empty() {
955 tokio::spawn(async move {
956 while let Some(res) = joins.join_next().await {
957 if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
958 tracing::debug!(
959 "federation: post-quorum consolidate peer {peer_id} did not ack: {reason}"
960 );
961 }
962 }
963 });
964 }
965
966 let tracker = Arc::try_unwrap(tracker)
967 .map_err(|_| QuorumError::LocalWriteFailed {
968 detail: "tracker arc still referenced at finalise".to_string(),
969 })?
970 .into_inner();
971 Ok(tracker)
972}
973
974pub async fn broadcast_pending_quorum(
986 config: &FederationConfig,
987 pending: &PendingAction,
988) -> Result<AckTracker, QuorumError> {
989 let now = Instant::now();
990 let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
991 tracker.lock().await.record_local();
992
993 let body = serde_json::json!({
994 "sender_agent_id": config.sender_agent_id,
995 "memories": [],
996 "pendings": [pending],
997 "dry_run": false,
998 });
999
1000 let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
1001 for peer in &config.peers {
1002 let client = config.client.clone();
1003 let url = peer.sync_push_url.clone();
1004 let peer_id = peer.id.clone();
1005 let payload = body.clone();
1006 let target_id = pending.id.clone();
1007 joins.spawn(async move {
1008 let outcome =
1009 post_and_classify(&client, &url, &payload, &target_id, Some(&target_id)).await;
1010 (peer_id, outcome)
1011 });
1012 }
1013
1014 let deadline = now + config.policy.ack_timeout;
1015 loop {
1016 let remaining = deadline.saturating_duration_since(Instant::now());
1017 if remaining.is_zero() {
1018 break;
1019 }
1020 match tokio::time::timeout(remaining, joins.join_next()).await {
1021 Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
1022 tracker.lock().await.record_peer_ack(peer_id);
1023 }
1024 Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
1025 tracker.lock().await.record_id_drift(peer_id);
1026 }
1027 Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
1028 tracing::warn!(
1029 "federation: pending peer {peer_id} failed for {}: {reason}",
1030 pending.id
1031 );
1032 }
1033 Ok(Some(Err(e))) => {
1034 tracing::warn!("federation: pending peer join error: {e}");
1035 }
1036 Ok(None) | Err(_) => break,
1037 }
1038 if tracker.lock().await.is_quorum_met(Instant::now()) {
1039 break;
1040 }
1041 }
1042
1043 if !joins.is_empty() {
1044 tokio::spawn(async move {
1045 while let Some(res) = joins.join_next().await {
1046 if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
1047 tracing::debug!(
1048 "federation: post-quorum pending peer {peer_id} did not ack: {reason}"
1049 );
1050 }
1051 }
1052 });
1053 }
1054
1055 let tracker = Arc::try_unwrap(tracker)
1056 .map_err(|_| QuorumError::LocalWriteFailed {
1057 detail: "tracker arc still referenced at finalise".to_string(),
1058 })?
1059 .into_inner();
1060 Ok(tracker)
1061}
1062
1063pub async fn broadcast_pending_decision_quorum(
1074 config: &FederationConfig,
1075 decision: &PendingDecision,
1076) -> Result<AckTracker, QuorumError> {
1077 let now = Instant::now();
1078 let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
1079 tracker.lock().await.record_local();
1080
1081 let body = serde_json::json!({
1082 "sender_agent_id": config.sender_agent_id,
1083 "memories": [],
1084 "pending_decisions": [decision],
1085 "dry_run": false,
1086 });
1087
1088 let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
1089 for peer in &config.peers {
1090 let client = config.client.clone();
1091 let url = peer.sync_push_url.clone();
1092 let peer_id = peer.id.clone();
1093 let payload = body.clone();
1094 let target_id = decision.id.clone();
1095 joins.spawn(async move {
1096 let outcome =
1097 post_and_classify(&client, &url, &payload, &target_id, Some(&target_id)).await;
1098 (peer_id, outcome)
1099 });
1100 }
1101
1102 let deadline = now + config.policy.ack_timeout;
1103 loop {
1104 let remaining = deadline.saturating_duration_since(Instant::now());
1105 if remaining.is_zero() {
1106 break;
1107 }
1108 match tokio::time::timeout(remaining, joins.join_next()).await {
1109 Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
1110 tracker.lock().await.record_peer_ack(peer_id);
1111 }
1112 Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
1113 tracker.lock().await.record_id_drift(peer_id);
1114 }
1115 Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
1116 tracing::warn!(
1117 "federation: pending-decision peer {peer_id} failed for {}: {reason}",
1118 decision.id
1119 );
1120 }
1121 Ok(Some(Err(e))) => {
1122 tracing::warn!("federation: pending-decision peer join error: {e}");
1123 }
1124 Ok(None) | Err(_) => break,
1125 }
1126 if tracker.lock().await.is_quorum_met(Instant::now()) {
1127 break;
1128 }
1129 }
1130
1131 if !joins.is_empty() {
1132 tokio::spawn(async move {
1133 while let Some(res) = joins.join_next().await {
1134 if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
1135 tracing::debug!(
1136 "federation: post-quorum pending-decision peer {peer_id} did not ack: {reason}"
1137 );
1138 }
1139 }
1140 });
1141 }
1142
1143 let tracker = Arc::try_unwrap(tracker)
1144 .map_err(|_| QuorumError::LocalWriteFailed {
1145 detail: "tracker arc still referenced at finalise".to_string(),
1146 })?
1147 .into_inner();
1148 Ok(tracker)
1149}
1150
1151pub async fn broadcast_namespace_meta_quorum(
1163 config: &FederationConfig,
1164 entry: &NamespaceMetaEntry,
1165) -> Result<AckTracker, QuorumError> {
1166 let now = Instant::now();
1167 let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
1168 tracker.lock().await.record_local();
1169
1170 let body = serde_json::json!({
1171 "sender_agent_id": config.sender_agent_id,
1172 "memories": [],
1173 "namespace_meta": [entry],
1174 "dry_run": false,
1175 });
1176
1177 let target_id = entry.namespace.clone();
1178 let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
1179 for peer in &config.peers {
1180 let client = config.client.clone();
1181 let url = peer.sync_push_url.clone();
1182 let peer_id = peer.id.clone();
1183 let payload = body.clone();
1184 let target = target_id.clone();
1185 joins.spawn(async move {
1186 let outcome = post_and_classify(&client, &url, &payload, &target, Some(&target)).await;
1187 (peer_id, outcome)
1188 });
1189 }
1190
1191 let deadline = now + config.policy.ack_timeout;
1192 loop {
1193 let remaining = deadline.saturating_duration_since(Instant::now());
1194 if remaining.is_zero() {
1195 break;
1196 }
1197 match tokio::time::timeout(remaining, joins.join_next()).await {
1198 Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
1199 tracker.lock().await.record_peer_ack(peer_id);
1200 }
1201 Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
1202 tracker.lock().await.record_id_drift(peer_id);
1203 }
1204 Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
1205 tracing::warn!(
1206 "federation: namespace_meta peer {peer_id} failed for {}: {reason}",
1207 entry.namespace
1208 );
1209 }
1210 Ok(Some(Err(e))) => {
1211 tracing::warn!("federation: namespace_meta peer join error: {e}");
1212 }
1213 Ok(None) | Err(_) => break,
1214 }
1215 if tracker.lock().await.is_quorum_met(Instant::now()) {
1216 break;
1217 }
1218 }
1219
1220 if !joins.is_empty() {
1221 tokio::spawn(async move {
1222 while let Some(res) = joins.join_next().await {
1223 if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
1224 tracing::debug!(
1225 "federation: post-quorum namespace_meta peer {peer_id} did not ack: {reason}"
1226 );
1227 }
1228 }
1229 });
1230 }
1231
1232 let tracker = Arc::try_unwrap(tracker)
1233 .map_err(|_| QuorumError::LocalWriteFailed {
1234 detail: "tracker arc still referenced at finalise".to_string(),
1235 })?
1236 .into_inner();
1237 Ok(tracker)
1238}
1239
1240pub async fn broadcast_namespace_meta_clear_quorum(
1254 config: &FederationConfig,
1255 namespaces: &[String],
1256) -> Result<AckTracker, QuorumError> {
1257 let now = Instant::now();
1258 let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
1259 tracker.lock().await.record_local();
1260
1261 let body = serde_json::json!({
1262 "sender_agent_id": config.sender_agent_id,
1263 "memories": [],
1264 "namespace_meta_clears": namespaces,
1265 "dry_run": false,
1266 });
1267
1268 let target_id = namespaces.join(",");
1271 let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
1272 for peer in &config.peers {
1273 let client = config.client.clone();
1274 let url = peer.sync_push_url.clone();
1275 let peer_id = peer.id.clone();
1276 let payload = body.clone();
1277 let target = target_id.clone();
1278 joins.spawn(async move {
1279 let outcome = post_and_classify(&client, &url, &payload, &target, Some(&target)).await;
1280 (peer_id, outcome)
1281 });
1282 }
1283
1284 let deadline = now + config.policy.ack_timeout;
1285 loop {
1286 let remaining = deadline.saturating_duration_since(Instant::now());
1287 if remaining.is_zero() {
1288 break;
1289 }
1290 match tokio::time::timeout(remaining, joins.join_next()).await {
1291 Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
1292 tracker.lock().await.record_peer_ack(peer_id);
1293 }
1294 Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
1295 tracker.lock().await.record_id_drift(peer_id);
1296 }
1297 Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
1298 tracing::warn!(
1299 "federation: namespace_meta_clear peer {peer_id} failed for [{}]: {reason}",
1300 target_id
1301 );
1302 }
1303 Ok(Some(Err(e))) => {
1304 tracing::warn!("federation: namespace_meta_clear peer join error: {e}");
1305 }
1306 Ok(None) | Err(_) => break,
1307 }
1308 if tracker.lock().await.is_quorum_met(Instant::now()) {
1309 break;
1310 }
1311 }
1312
1313 if !joins.is_empty() {
1314 tokio::spawn(async move {
1315 while let Some(res) = joins.join_next().await {
1316 if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
1317 tracing::debug!(
1318 "federation: post-quorum namespace_meta_clear peer {peer_id} did not ack: {reason}"
1319 );
1320 }
1321 }
1322 });
1323 }
1324
1325 let tracker = Arc::try_unwrap(tracker)
1326 .map_err(|_| QuorumError::LocalWriteFailed {
1327 detail: "tracker arc still referenced at finalise".to_string(),
1328 })?
1329 .into_inner();
1330 Ok(tracker)
1331}
1332
1333#[derive(Debug, Serialize, Deserialize)]
1335pub struct QuorumNotMetPayload {
1336 pub error: &'static str,
1337 pub got: usize,
1338 pub needed: usize,
1339 pub reason: String,
1340}
1341
1342impl QuorumNotMetPayload {
1343 #[must_use]
1344 pub fn from_err(err: &QuorumError) -> Self {
1345 match err {
1346 QuorumError::QuorumNotMet {
1347 got,
1348 needed,
1349 reason,
1350 } => Self {
1351 error: "quorum_not_met",
1352 got: *got,
1353 needed: *needed,
1354 reason: match reason {
1361 QuorumFailureReason::Unreachable => "unreachable".to_string(),
1362 QuorumFailureReason::Timeout | QuorumFailureReason::InFlight => {
1363 "timeout".to_string()
1364 }
1365 QuorumFailureReason::IdDrift => "id_drift".to_string(),
1366 },
1367 },
1368 QuorumError::InvalidPolicy { detail } => Self {
1369 error: "quorum_not_met",
1370 got: 0,
1371 needed: 0,
1372 reason: format!("invalid_policy:{detail}"),
1373 },
1374 QuorumError::LocalWriteFailed { detail } => Self {
1375 error: "quorum_not_met",
1376 got: 0,
1377 needed: 0,
1378 reason: format!("local_write_failed:{detail}"),
1379 },
1380 }
1381 }
1382}
1383
1384pub fn spawn_catchup_loop(
1407 config: FederationConfig,
1408 db: crate::handlers::Db,
1409 interval: Duration,
1410) -> tokio::task::JoinHandle<()> {
1411 tokio::spawn(async move {
1412 tokio::time::sleep(Duration::from_secs(5)).await;
1416 loop {
1417 catchup_once(&config, &db).await;
1418 tokio::time::sleep(interval).await;
1419 }
1420 })
1421}
1422
1423async fn catchup_once(config: &FederationConfig, db: &crate::handlers::Db) {
1424 let local_id = config.sender_agent_id.clone();
1425 for peer in &config.peers {
1426 let base = peer
1429 .sync_push_url
1430 .trim_end_matches("/api/v1/sync/push")
1431 .to_string();
1432
1433 let since_opt: Option<String> = {
1437 let lock = db.lock().await;
1438 match crate::db::sync_state_load(&lock.0, &local_id) {
1439 Ok(clock) => clock.entries.get(&peer.id).cloned(),
1440 Err(_) => None,
1441 }
1442 };
1443
1444 let url = match since_opt.as_deref() {
1445 Some(s) => format!(
1446 "{base}/api/v1/sync/since?since={}&peer={local_id}",
1447 urlencoding_encode(s)
1448 ),
1449 None => format!("{base}/api/v1/sync/since?peer={local_id}"),
1450 };
1451
1452 let resp = match config.client.get(&url).send().await {
1453 Ok(r) if r.status().is_success() => r,
1454 Ok(r) => {
1455 tracing::debug!(
1456 "catchup: peer {} returned HTTP {} — skipping this tick",
1457 peer.id,
1458 r.status()
1459 );
1460 continue;
1461 }
1462 Err(e) => {
1463 tracing::debug!("catchup: peer {} unreachable: {e}", peer.id);
1464 continue;
1465 }
1466 };
1467
1468 let body: serde_json::Value = match resp.json().await {
1469 Ok(v) => v,
1470 Err(e) => {
1471 tracing::warn!("catchup: peer {} returned unparseable body: {e}", peer.id);
1472 continue;
1473 }
1474 };
1475
1476 let memories = match body.get("memories").and_then(|v| v.as_array()) {
1477 Some(arr) => arr.clone(),
1478 None => continue,
1479 };
1480
1481 if memories.is_empty() {
1482 continue;
1483 }
1484
1485 let mut applied = 0usize;
1486 let mut latest_ts: Option<String> = None;
1487 {
1488 let lock = db.lock().await;
1489 for raw in &memories {
1490 let mem: crate::models::Memory = match serde_json::from_value(raw.clone()) {
1491 Ok(m) => m,
1492 Err(e) => {
1493 tracing::warn!("catchup: unparseable memory from peer {}: {e}", peer.id);
1494 continue;
1495 }
1496 };
1497 if crate::validate::validate_memory(&mem).is_err() {
1498 continue;
1499 }
1500 if latest_ts
1501 .as_deref()
1502 .is_none_or(|cur| mem.updated_at.as_str() > cur)
1503 {
1504 latest_ts = Some(mem.updated_at.clone());
1505 }
1506 if crate::db::insert_if_newer(&lock.0, &mem).is_ok() {
1507 applied += 1;
1508 }
1509 }
1510 if let Some(ts) = latest_ts.as_deref()
1511 && let Err(e) = crate::db::sync_state_observe(&lock.0, &local_id, &peer.id, ts)
1512 {
1513 tracing::warn!("catchup: sync_state_observe failed for {}: {e}", peer.id);
1514 }
1515 }
1516
1517 if applied > 0 {
1518 tracing::info!(
1519 "catchup: applied {applied} memories from peer {} (since={})",
1520 peer.id,
1521 since_opt.as_deref().unwrap_or("<full-snapshot>"),
1522 );
1523 }
1524 }
1525}
1526
/// Percent-encodes `s` for use as a URL query value.
///
/// RFC 3986 unreserved bytes (`A-Z`, `a-z`, `0-9`, `-`, `_`, `.`, `~`) are
/// emitted as-is. Every other byte of the UTF-8 encoding becomes `%XX` with
/// uppercase hex digits.
fn urlencoding_encode(s: &str) -> String {
    use std::fmt::Write;

    s.bytes()
        .fold(String::with_capacity(s.len() + 6), |mut encoded, byte| {
            let unreserved =
                byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~');
            if unreserved {
                encoded.push(char::from(byte));
            } else {
                // Writing into a String cannot fail.
                let _ = write!(encoded, "%{byte:02X}");
            }
            encoded
        })
}
1545
1546#[cfg(test)]
1547mod tests {
1548 use super::*;
1549 use axum::Router;
1550 use axum::extract::Json as AxumJson;
1551 use axum::http::StatusCode;
1552 use axum::routing::post;
1553 use std::sync::atomic::{AtomicUsize, Ordering};
1554 use tokio::net::TcpListener;
1555
1556 fn sample_memory() -> Memory {
1557 let now = chrono::Utc::now().to_rfc3339();
1558 Memory {
1559 id: "fed-test".to_string(),
1560 tier: crate::models::Tier::Mid,
1561 namespace: "app".to_string(),
1562 title: "hello".to_string(),
1563 content: "world for federation test".to_string(),
1564 tags: vec!["t".to_string()],
1565 priority: 5,
1566 confidence: 1.0,
1567 source: "test".to_string(),
1568 access_count: 0,
1569 created_at: now.clone(),
1570 updated_at: now,
1571 last_accessed_at: None,
1572 expires_at: None,
1573 metadata: serde_json::json!({"agent_id":"ai:test"}),
1574 }
1575 }
1576
1577 #[derive(Clone, Copy)]
1578 enum MockBehaviour {
1579 Ack,
1580 Fail,
1581 Hang,
1582 FailThenAck {
1585 fail_until: usize,
1586 },
1587 }
1588
1589 #[derive(Clone)]
1590 struct MockState {
1591 behaviour: MockBehaviour,
1592 count: Arc<AtomicUsize>,
1593 }
1594
1595 async fn mock_handler(
1596 axum::extract::State(state): axum::extract::State<MockState>,
1597 AxumJson(_body): AxumJson<serde_json::Value>,
1598 ) -> (StatusCode, AxumJson<serde_json::Value>) {
1599 let call = state.count.fetch_add(1, Ordering::Relaxed) + 1;
1600 match state.behaviour {
1601 MockBehaviour::Ack => (
1602 StatusCode::OK,
1603 AxumJson(serde_json::json!({"applied":1,"noop":0,"skipped":0})),
1604 ),
1605 MockBehaviour::Fail => (
1606 StatusCode::INTERNAL_SERVER_ERROR,
1607 AxumJson(serde_json::json!({"error":"stub failure"})),
1608 ),
1609 MockBehaviour::Hang => {
1610 tokio::time::sleep(Duration::from_secs(10)).await;
1611 (StatusCode::OK, AxumJson(serde_json::json!({"applied":1})))
1612 }
1613 MockBehaviour::FailThenAck { fail_until } => {
1614 if call <= fail_until {
1615 (
1616 StatusCode::INTERNAL_SERVER_ERROR,
1617 AxumJson(serde_json::json!({"error":"stub transient failure"})),
1618 )
1619 } else {
1620 (
1621 StatusCode::OK,
1622 AxumJson(serde_json::json!({"applied":1,"noop":0,"skipped":0})),
1623 )
1624 }
1625 }
1626 }
1627 }
1628
1629 async fn spawn_mock_peer(behaviour: MockBehaviour) -> (String, Arc<AtomicUsize>) {
1630 let call_count = Arc::new(AtomicUsize::new(0));
1631 let state = MockState {
1632 behaviour,
1633 count: call_count.clone(),
1634 };
1635 let app = Router::new()
1636 .route("/api/v1/sync/push", post(mock_handler))
1637 .with_state(state);
1638 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1639 let addr = listener.local_addr().unwrap();
1640 tokio::spawn(async move {
1641 axum::serve(listener, app).await.ok();
1642 });
1643 (format!("http://{addr}"), call_count)
1644 }
1645
1646 fn build_config(peers: Vec<String>, w: usize, timeout_ms: u64) -> FederationConfig {
1647 let client = reqwest::Client::builder()
1648 .timeout(Duration::from_millis(timeout_ms))
1649 .build()
1650 .unwrap();
1651 let n = 1 + peers.len();
1652 FederationConfig {
1653 policy: QuorumPolicy::new(
1654 n,
1655 w,
1656 Duration::from_millis(timeout_ms),
1657 Duration::from_secs(30),
1658 )
1659 .unwrap(),
1660 peers: peers
1661 .into_iter()
1662 .enumerate()
1663 .map(|(i, url)| PeerEndpoint {
1664 id: format!("peer-{i}:{url}"),
1665 sync_push_url: format!("{url}/api/v1/sync/push"),
1666 })
1667 .collect(),
1668 client,
1669 sender_agent_id: "ai:fed-test".to_string(),
1670 }
1671 }
1672
1673 #[tokio::test]
1674 async fn happy_path_two_peers_quorum_met() {
1675 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
1676 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
1677 let cfg = build_config(vec![url1, url2], 2, 2000);
1678 let tracker = broadcast_store_quorum(&cfg, &sample_memory())
1679 .await
1680 .unwrap();
1681 let result = finalise_quorum(&tracker);
1682 assert!(result.is_ok(), "expected quorum met, got {result:?}");
1683 let calls = count1.load(Ordering::Relaxed) + count2.load(Ordering::Relaxed);
1689 assert!(calls >= 1);
1690 }
1691
1692 #[tokio::test]
1693 async fn post_quorum_fanout_reaches_all_peers() {
1694 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
1699 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
1700 let cfg = build_config(vec![url1, url2], 2, 2000);
1701 let _tracker = broadcast_store_quorum(&cfg, &sample_memory())
1702 .await
1703 .unwrap();
1704 for _ in 0..20 {
1707 if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
1708 break;
1709 }
1710 tokio::time::sleep(Duration::from_millis(10)).await;
1711 }
1712 assert_eq!(
1713 count1.load(Ordering::Relaxed),
1714 1,
1715 "peer-1 must receive the write post-quorum"
1716 );
1717 assert_eq!(
1718 count2.load(Ordering::Relaxed),
1719 1,
1720 "peer-2 must receive the write post-quorum"
1721 );
1722 }
1723
1724 #[tokio::test]
1725 async fn transient_peer_failure_is_retried_once() {
1726 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
1732 let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
1733 let cfg = build_config(vec![url1, url2], 2, 2000);
1734 let _tracker = broadcast_store_quorum(&cfg, &sample_memory())
1735 .await
1736 .unwrap();
1737 for _ in 0..200 {
1739 if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
1740 break;
1741 }
1742 tokio::time::sleep(Duration::from_millis(10)).await;
1743 }
1744 assert_eq!(
1745 count1.load(Ordering::Relaxed),
1746 1,
1747 "peer-1 acked first time, no retry"
1748 );
1749 assert_eq!(
1750 count2.load(Ordering::Relaxed),
1751 2,
1752 "peer-2 must see exactly two attempts (first fail, retry ack)"
1753 );
1754 }
1755
1756 #[tokio::test]
1757 async fn persistent_peer_failure_stops_after_one_retry() {
1758 let (url1, _) = spawn_mock_peer(MockBehaviour::Ack).await;
1762 let (url2, count2) = spawn_mock_peer(MockBehaviour::Fail).await;
1763 let cfg = build_config(vec![url1, url2], 2, 2000);
1764 let _tracker = broadcast_store_quorum(&cfg, &sample_memory())
1765 .await
1766 .unwrap();
1767 tokio::time::sleep(Duration::from_millis(800)).await;
1769 assert_eq!(
1770 count2.load(Ordering::Relaxed),
1771 2,
1772 "persistently-failing peer must be called exactly twice (1 + 1 retry)"
1773 );
1774 }
1775
1776 #[tokio::test]
1777 async fn bulk_catchup_push_hits_every_peer_once() {
1778 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
1781 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
1782 let cfg = build_config(vec![url1, url2], 2, 2000);
1783 let mems = vec![sample_memory(), sample_memory(), sample_memory()];
1784 let errors = bulk_catchup_push(&cfg, &mems).await;
1785 assert!(
1786 errors.is_empty(),
1787 "catchup must succeed on healthy peers, got {errors:?}"
1788 );
1789 assert_eq!(
1790 count1.load(Ordering::Relaxed),
1791 1,
1792 "peer-1 must receive exactly one catchup batch"
1793 );
1794 assert_eq!(
1795 count2.load(Ordering::Relaxed),
1796 1,
1797 "peer-2 must receive exactly one catchup batch"
1798 );
1799 }
1800
1801 #[tokio::test]
1802 async fn bulk_catchup_push_reports_peer_failures() {
1803 let (url1, _) = spawn_mock_peer(MockBehaviour::Ack).await;
1807 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1808 let cfg = build_config(vec![url1, url2], 2, 2000);
1809 let mems = vec![sample_memory()];
1810 let errors = bulk_catchup_push(&cfg, &mems).await;
1811 assert_eq!(errors.len(), 1, "exactly one peer failed the catchup");
1812 assert!(
1813 errors[0].1.contains("500") || errors[0].1.contains("http"),
1814 "error must name the HTTP failure, got {:?}",
1815 errors[0]
1816 );
1817 }
1818
1819 #[tokio::test]
1820 async fn bulk_catchup_push_empty_inputs_are_noop() {
1821 let cfg = build_config(vec![], 1, 500);
1823 assert!(bulk_catchup_push(&cfg, &[]).await.is_empty());
1824
1825 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
1826 let cfg = build_config(vec![url1], 1, 500);
1827 assert!(bulk_catchup_push(&cfg, &[]).await.is_empty());
1828 assert_eq!(
1829 count1.load(Ordering::Relaxed),
1830 0,
1831 "no catchup POST must fire when the row set is empty"
1832 );
1833 }
1834
1835 #[tokio::test]
1836 async fn partition_minority_fails_quorum() {
1837 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1839 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1840 let cfg = build_config(vec![url1, url2], 3, 500);
1841 let tracker = broadcast_store_quorum(&cfg, &sample_memory())
1842 .await
1843 .unwrap();
1844 let err = finalise_quorum(&tracker).unwrap_err();
1845 match err {
1846 QuorumError::QuorumNotMet { got, needed, .. } => {
1847 assert_eq!(got, 1, "local commit only");
1848 assert_eq!(needed, 3);
1849 }
1850 other => panic!("expected QuorumNotMet, got {other:?}"),
1851 }
1852 }
1853
1854 #[tokio::test]
1855 async fn timeout_on_hanging_peer_classified_timeout() {
1856 let (url1, _) = spawn_mock_peer(MockBehaviour::Hang).await;
1858 let cfg = build_config(vec![url1], 2, 200);
1859 let tracker = broadcast_store_quorum(&cfg, &sample_memory())
1860 .await
1861 .unwrap();
1862 tokio::time::sleep(Duration::from_millis(50)).await;
1864 let err = finalise_quorum(&tracker).unwrap_err();
1865 match err {
1866 QuorumError::QuorumNotMet { reason, .. } => {
1867 assert!(
1868 matches!(
1869 reason,
1870 QuorumFailureReason::Timeout | QuorumFailureReason::Unreachable
1871 ),
1872 "unexpected reason {reason:?}"
1873 );
1874 }
1875 other => panic!("expected QuorumNotMet, got {other:?}"),
1876 }
1877 }
1878
1879 #[tokio::test]
1880 async fn majority_quorum_tolerates_one_peer_down() {
1881 let (url_up, _) = spawn_mock_peer(MockBehaviour::Ack).await;
1883 let (url_down, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1884 let cfg = build_config(vec![url_up, url_down], 2, 2000);
1885 let tracker = broadcast_store_quorum(&cfg, &sample_memory())
1886 .await
1887 .unwrap();
1888 let result = finalise_quorum(&tracker);
1889 assert!(
1890 result.is_ok(),
1891 "majority should tolerate 1 peer down, got {result:?}"
1892 );
1893 }
1894
1895 #[test]
1896 fn config_build_disabled_when_w_zero() {
1897 let cfg = FederationConfig::build(
1898 0,
1899 &["http://example.com".to_string()],
1900 Duration::from_millis(500),
1901 None,
1902 None,
1903 None,
1904 "ai:test".to_string(),
1905 )
1906 .unwrap();
1907 assert!(cfg.is_none());
1908 }
1909
1910 #[test]
1911 fn config_build_disabled_when_peers_empty() {
1912 let cfg = FederationConfig::build(
1913 2,
1914 &[],
1915 Duration::from_millis(500),
1916 None,
1917 None,
1918 None,
1919 "ai:test".to_string(),
1920 )
1921 .unwrap();
1922 assert!(cfg.is_none());
1923 }
1924
1925 #[test]
1926 fn quorum_not_met_payload_from_err() {
1927 let err = QuorumError::QuorumNotMet {
1928 got: 1,
1929 needed: 3,
1930 reason: QuorumFailureReason::Timeout,
1931 };
1932 let payload = QuorumNotMetPayload::from_err(&err);
1933 assert_eq!(payload.error, "quorum_not_met");
1934 assert_eq!(payload.got, 1);
1935 assert_eq!(payload.needed, 3);
1936 assert_eq!(payload.reason, "timeout");
1937 }
1938
1939 #[tokio::test]
1942 async fn archive_quorum_two_peers_ack_meets_quorum() {
1943 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
1944 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
1945 let cfg = build_config(vec![url1, url2], 2, 2000);
1946 let tracker = broadcast_archive_quorum(&cfg, "mem-s29").await.unwrap();
1947 let result = finalise_quorum(&tracker);
1948 assert!(result.is_ok(), "expected quorum met, got {result:?}");
1949 for _ in 0..20 {
1951 if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
1952 break;
1953 }
1954 tokio::time::sleep(Duration::from_millis(10)).await;
1955 }
1956 assert_eq!(count1.load(Ordering::Relaxed), 1);
1957 assert_eq!(count2.load(Ordering::Relaxed), 1);
1958 }
1959
1960 #[tokio::test]
1961 async fn archive_quorum_partition_minority_fails() {
1962 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1964 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1965 let cfg = build_config(vec![url1, url2], 3, 500);
1966 let tracker = broadcast_archive_quorum(&cfg, "mem-s29").await.unwrap();
1967 let err = finalise_quorum(&tracker).unwrap_err();
1968 match err {
1969 QuorumError::QuorumNotMet { got, needed, .. } => {
1970 assert_eq!(got, 1);
1971 assert_eq!(needed, 3);
1972 }
1973 other => panic!("expected QuorumNotMet, got {other:?}"),
1974 }
1975 }
1976
1977 #[tokio::test]
1985 async fn delete_quorum_two_peers_ack_meets_quorum() {
1986 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
1987 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
1988 let cfg = build_config(vec![url1, url2], 2, 2000);
1989 let tracker = broadcast_delete_quorum(&cfg, "mem-del").await.unwrap();
1990 assert!(finalise_quorum(&tracker).is_ok());
1991 for _ in 0..20 {
1992 if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
1993 break;
1994 }
1995 tokio::time::sleep(Duration::from_millis(10)).await;
1996 }
1997 assert_eq!(count1.load(Ordering::Relaxed), 1);
1998 assert_eq!(count2.load(Ordering::Relaxed), 1);
1999 }
2000
2001 #[tokio::test]
2002 async fn delete_quorum_partition_minority_fails() {
2003 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2004 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2005 let cfg = build_config(vec![url1, url2], 3, 500);
2006 let tracker = broadcast_delete_quorum(&cfg, "mem-del").await.unwrap();
2007 let err = finalise_quorum(&tracker).unwrap_err();
2008 match err {
2009 QuorumError::QuorumNotMet { got, needed, .. } => {
2010 assert_eq!(got, 1);
2011 assert_eq!(needed, 3);
2012 }
2013 other => panic!("expected QuorumNotMet, got {other:?}"),
2014 }
2015 }
2016
2017 #[tokio::test]
2020 async fn restore_quorum_two_peers_ack_meets_quorum() {
2021 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2022 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
2023 let cfg = build_config(vec![url1, url2], 2, 2000);
2024 let tracker = broadcast_restore_quorum(&cfg, "mem-restore").await.unwrap();
2025 assert!(finalise_quorum(&tracker).is_ok());
2026 for _ in 0..20 {
2027 if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
2028 break;
2029 }
2030 tokio::time::sleep(Duration::from_millis(10)).await;
2031 }
2032 assert_eq!(count1.load(Ordering::Relaxed), 1);
2033 assert_eq!(count2.load(Ordering::Relaxed), 1);
2034 }
2035
2036 #[tokio::test]
2037 async fn restore_quorum_partition_minority_fails() {
2038 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2039 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2040 let cfg = build_config(vec![url1, url2], 3, 500);
2041 let tracker = broadcast_restore_quorum(&cfg, "mem-restore").await.unwrap();
2042 let err = finalise_quorum(&tracker).unwrap_err();
2043 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2044 }
2045
2046 fn sample_link() -> MemoryLink {
2049 MemoryLink {
2050 source_id: "mem-a".to_string(),
2051 target_id: "mem-b".to_string(),
2052 relation: "related_to".to_string(),
2053 created_at: chrono::Utc::now().to_rfc3339(),
2054 }
2055 }
2056
2057 #[tokio::test]
2058 async fn link_quorum_two_peers_ack_meets_quorum() {
2059 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2060 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
2061 let cfg = build_config(vec![url1, url2], 2, 2000);
2062 let tracker = broadcast_link_quorum(&cfg, &sample_link()).await.unwrap();
2063 assert!(finalise_quorum(&tracker).is_ok());
2064 for _ in 0..20 {
2065 if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
2066 break;
2067 }
2068 tokio::time::sleep(Duration::from_millis(10)).await;
2069 }
2070 assert_eq!(count1.load(Ordering::Relaxed), 1);
2071 assert_eq!(count2.load(Ordering::Relaxed), 1);
2072 }
2073
2074 #[tokio::test]
2075 async fn link_quorum_partition_minority_fails() {
2076 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2077 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2078 let cfg = build_config(vec![url1, url2], 3, 500);
2079 let tracker = broadcast_link_quorum(&cfg, &sample_link()).await.unwrap();
2080 let err = finalise_quorum(&tracker).unwrap_err();
2081 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2082 }
2083
2084 #[tokio::test]
2087 async fn consolidate_quorum_two_peers_ack_meets_quorum() {
2088 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2089 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
2090 let cfg = build_config(vec![url1, url2], 2, 2000);
2091 let new_mem = sample_memory();
2092 let sources = vec!["src-a".to_string(), "src-b".to_string()];
2093 let tracker = broadcast_consolidate_quorum(&cfg, &new_mem, &sources)
2094 .await
2095 .unwrap();
2096 assert!(finalise_quorum(&tracker).is_ok());
2097 for _ in 0..20 {
2098 if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
2099 break;
2100 }
2101 tokio::time::sleep(Duration::from_millis(10)).await;
2102 }
2103 assert_eq!(count1.load(Ordering::Relaxed), 1);
2104 assert_eq!(count2.load(Ordering::Relaxed), 1);
2105 }
2106
2107 #[tokio::test]
2108 async fn consolidate_quorum_partition_minority_fails() {
2109 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2110 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2111 let cfg = build_config(vec![url1, url2], 3, 500);
2112 let new_mem = sample_memory();
2113 let tracker = broadcast_consolidate_quorum(&cfg, &new_mem, &[])
2114 .await
2115 .unwrap();
2116 let err = finalise_quorum(&tracker).unwrap_err();
2117 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2118 }
2119
2120 fn sample_pending() -> PendingAction {
2123 PendingAction {
2124 id: "pa-1".to_string(),
2125 action_type: "delete".to_string(),
2126 memory_id: Some("mem-x".to_string()),
2127 namespace: "app".to_string(),
2128 payload: serde_json::json!({}),
2129 requested_by: "ai:test".to_string(),
2130 requested_at: chrono::Utc::now().to_rfc3339(),
2131 status: "pending".to_string(),
2132 decided_by: None,
2133 decided_at: None,
2134 approvals: vec![],
2135 }
2136 }
2137
2138 #[tokio::test]
2139 async fn pending_quorum_two_peers_ack_meets_quorum() {
2140 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2141 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
2142 let cfg = build_config(vec![url1, url2], 2, 2000);
2143 let tracker = broadcast_pending_quorum(&cfg, &sample_pending())
2144 .await
2145 .unwrap();
2146 assert!(finalise_quorum(&tracker).is_ok());
2147 for _ in 0..20 {
2148 if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
2149 break;
2150 }
2151 tokio::time::sleep(Duration::from_millis(10)).await;
2152 }
2153 assert_eq!(count1.load(Ordering::Relaxed), 1);
2154 assert_eq!(count2.load(Ordering::Relaxed), 1);
2155 }
2156
2157 #[tokio::test]
2158 async fn pending_quorum_partition_minority_fails() {
2159 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2160 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2161 let cfg = build_config(vec![url1, url2], 3, 500);
2162 let tracker = broadcast_pending_quorum(&cfg, &sample_pending())
2163 .await
2164 .unwrap();
2165 let err = finalise_quorum(&tracker).unwrap_err();
2166 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2167 }
2168
2169 fn sample_decision() -> PendingDecision {
2172 PendingDecision {
2173 id: "pa-1".to_string(),
2174 approved: true,
2175 decider: "ai:approver".to_string(),
2176 }
2177 }
2178
2179 #[tokio::test]
2180 async fn pending_decision_quorum_two_peers_ack_meets_quorum() {
2181 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2182 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
2183 let cfg = build_config(vec![url1, url2], 2, 2000);
2184 let tracker = broadcast_pending_decision_quorum(&cfg, &sample_decision())
2185 .await
2186 .unwrap();
2187 assert!(finalise_quorum(&tracker).is_ok());
2188 for _ in 0..20 {
2189 if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
2190 break;
2191 }
2192 tokio::time::sleep(Duration::from_millis(10)).await;
2193 }
2194 assert_eq!(count1.load(Ordering::Relaxed), 1);
2195 assert_eq!(count2.load(Ordering::Relaxed), 1);
2196 }
2197
2198 #[tokio::test]
2199 async fn pending_decision_quorum_partition_minority_fails() {
2200 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2201 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2202 let cfg = build_config(vec![url1, url2], 3, 500);
2203 let tracker = broadcast_pending_decision_quorum(&cfg, &sample_decision())
2204 .await
2205 .unwrap();
2206 let err = finalise_quorum(&tracker).unwrap_err();
2207 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2208 }
2209
2210 fn sample_namespace_meta() -> NamespaceMetaEntry {
2213 NamespaceMetaEntry {
2214 namespace: "app/team".to_string(),
2215 standard_id: "mem-std-1".to_string(),
2216 parent_namespace: Some("app".to_string()),
2217 updated_at: chrono::Utc::now().to_rfc3339(),
2218 }
2219 }
2220
2221 #[tokio::test]
2222 async fn namespace_meta_quorum_two_peers_ack_meets_quorum() {
2223 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2224 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
2225 let cfg = build_config(vec![url1, url2], 2, 2000);
2226 let tracker = broadcast_namespace_meta_quorum(&cfg, &sample_namespace_meta())
2227 .await
2228 .unwrap();
2229 assert!(finalise_quorum(&tracker).is_ok());
2230 for _ in 0..20 {
2231 if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
2232 break;
2233 }
2234 tokio::time::sleep(Duration::from_millis(10)).await;
2235 }
2236 assert_eq!(count1.load(Ordering::Relaxed), 1);
2237 assert_eq!(count2.load(Ordering::Relaxed), 1);
2238 }
2239
2240 #[tokio::test]
2241 async fn namespace_meta_quorum_partition_minority_fails() {
2242 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2243 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2244 let cfg = build_config(vec![url1, url2], 3, 500);
2245 let tracker = broadcast_namespace_meta_quorum(&cfg, &sample_namespace_meta())
2246 .await
2247 .unwrap();
2248 let err = finalise_quorum(&tracker).unwrap_err();
2249 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2250 }
2251
2252 #[tokio::test]
2255 async fn namespace_meta_clear_quorum_two_peers_ack_meets_quorum() {
2256 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2257 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
2258 let cfg = build_config(vec![url1, url2], 2, 2000);
2259 let namespaces = vec!["app/team".to_string(), "app/other".to_string()];
2260 let tracker = broadcast_namespace_meta_clear_quorum(&cfg, &namespaces)
2261 .await
2262 .unwrap();
2263 assert!(finalise_quorum(&tracker).is_ok());
2264 for _ in 0..20 {
2265 if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
2266 break;
2267 }
2268 tokio::time::sleep(Duration::from_millis(10)).await;
2269 }
2270 assert_eq!(count1.load(Ordering::Relaxed), 1);
2271 assert_eq!(count2.load(Ordering::Relaxed), 1);
2272 }
2273
2274 #[tokio::test]
2275 async fn namespace_meta_clear_quorum_partition_minority_fails() {
2276 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2277 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2278 let cfg = build_config(vec![url1, url2], 3, 500);
2279 let namespaces = vec!["app/team".to_string()];
2280 let tracker = broadcast_namespace_meta_clear_quorum(&cfg, &namespaces)
2281 .await
2282 .unwrap();
2283 let err = finalise_quorum(&tracker).unwrap_err();
2284 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2285 }
2286
2287 #[test]
2294 fn quorum_not_met_payload_unreachable_reason() {
2295 let err = QuorumError::QuorumNotMet {
2296 got: 1,
2297 needed: 2,
2298 reason: QuorumFailureReason::Unreachable,
2299 };
2300 let payload = QuorumNotMetPayload::from_err(&err);
2301 assert_eq!(payload.reason, "unreachable");
2302 }
2303
2304 #[test]
2305 fn quorum_not_met_payload_id_drift_reason() {
2306 let err = QuorumError::QuorumNotMet {
2307 got: 1,
2308 needed: 2,
2309 reason: QuorumFailureReason::IdDrift,
2310 };
2311 let payload = QuorumNotMetPayload::from_err(&err);
2312 assert_eq!(payload.reason, "id_drift");
2313 }
2314
2315 #[test]
2316 fn quorum_not_met_payload_in_flight_reason_maps_to_timeout() {
2317 let err = QuorumError::QuorumNotMet {
2320 got: 1,
2321 needed: 2,
2322 reason: QuorumFailureReason::InFlight,
2323 };
2324 let payload = QuorumNotMetPayload::from_err(&err);
2325 assert_eq!(payload.reason, "timeout");
2326 }
2327
2328 #[test]
2329 fn quorum_not_met_payload_invalid_policy_branch() {
2330 let err = QuorumError::InvalidPolicy {
2331 detail: "bad-thing".to_string(),
2332 };
2333 let payload = QuorumNotMetPayload::from_err(&err);
2334 assert_eq!(payload.error, "quorum_not_met");
2335 assert_eq!(payload.got, 0);
2336 assert_eq!(payload.needed, 0);
2337 assert!(payload.reason.starts_with("invalid_policy:"));
2338 assert!(payload.reason.contains("bad-thing"));
2339 }
2340
2341 #[test]
2342 fn quorum_not_met_payload_local_write_failed_branch() {
2343 let err = QuorumError::LocalWriteFailed {
2344 detail: "disk-full".to_string(),
2345 };
2346 let payload = QuorumNotMetPayload::from_err(&err);
2347 assert_eq!(payload.error, "quorum_not_met");
2348 assert!(payload.reason.starts_with("local_write_failed:"));
2349 assert!(payload.reason.contains("disk-full"));
2350 }
2351
2352 #[test]
2355 fn config_build_constructs_when_w_and_peers_set() {
2356 let cfg = FederationConfig::build(
2357 2,
2358 &[
2359 "http://peer-a.example/".to_string(),
2360 "http://peer-b.example".to_string(),
2361 ],
2362 Duration::from_millis(500),
2363 None,
2364 None,
2365 None,
2366 "ai:builder".to_string(),
2367 )
2368 .unwrap()
2369 .expect("config should be Some when w>0 and peers nonempty");
2370 assert_eq!(cfg.peer_count(), 2);
2371 assert_eq!(cfg.peers[0].id, "peer-0");
2372 assert_eq!(cfg.peers[1].id, "peer-1");
2373 assert_eq!(
2375 cfg.peers[0].sync_push_url,
2376 "http://peer-a.example/api/v1/sync/push"
2377 );
2378 assert_eq!(
2379 cfg.peers[1].sync_push_url,
2380 "http://peer-b.example/api/v1/sync/push"
2381 );
2382 assert_eq!(cfg.sender_agent_id, "ai:builder");
2383 }
2384
2385 #[test]
2386 fn config_build_rejects_duplicate_peer_urls() {
2387 let result = FederationConfig::build(
2388 2,
2389 &[
2390 "http://peer.example".to_string(),
2391 "http://peer.example/".to_string(),
2392 ],
2393 Duration::from_millis(500),
2394 None,
2395 None,
2396 None,
2397 "ai:builder".to_string(),
2398 );
2399 let err = match result {
2400 Ok(_) => panic!("expected duplicate-URL rejection"),
2401 Err(e) => e,
2402 };
2403 let msg = format!("{err}");
2404 assert!(
2405 msg.contains("duplicate peer URL"),
2406 "expected duplicate-URL rejection, got {msg:?}"
2407 );
2408 }
2409
2410 #[test]
2411 fn config_build_rejects_missing_ca_cert_path() {
2412 let bogus = std::path::PathBuf::from("/definitely/does/not/exist/ca.pem");
2414 let result = FederationConfig::build(
2415 2,
2416 &["http://peer.example".to_string()],
2417 Duration::from_millis(500),
2418 None,
2419 None,
2420 Some(&bogus),
2421 "ai:builder".to_string(),
2422 );
2423 let err = match result {
2424 Ok(_) => panic!("expected ca-cert read error"),
2425 Err(e) => e,
2426 };
2427 let msg = format!("{err}");
2428 assert!(
2429 msg.contains("read --quorum-ca-cert"),
2430 "expected ca-cert read error, got {msg:?}"
2431 );
2432 }
2433
/// A `--quorum-ca-cert` file that exists but does not hold a valid PEM
/// certificate must make `FederationConfig::build` fail.
///
/// The file is created beforehand, so a read error would mean the wrong
/// code path rejected it. The assertions therefore require a
/// `--quorum-ca-cert` error that is *not* the read failure. The old
/// `contains("parse …") || contains("--quorum-ca-cert")` check was
/// redundant and would also have accepted a read error.
#[test]
fn config_build_rejects_invalid_ca_cert_pem() {
    let dir = tempfile::tempdir().unwrap();
    let bad = dir.path().join("not-a-cert.pem");
    std::fs::write(&bad, b"this is not a valid pem certificate").unwrap();
    let result = FederationConfig::build(
        2,
        &["http://peer.example".to_string()],
        Duration::from_millis(500),
        None,
        None,
        Some(&bad),
        "ai:builder".to_string(),
    );
    let err = match result {
        Ok(_) => panic!("expected ca-cert parse error"),
        Err(e) => e,
    };
    let msg = format!("{err}");
    assert!(
        msg.contains("--quorum-ca-cert"),
        "expected ca-cert parse error, got {msg:?}"
    );
    assert!(
        !msg.contains("read --quorum-ca-cert"),
        "ca-cert file exists and is readable; expected a parse failure, not a read failure, got {msg:?}"
    );
}
2459
/// Nonexistent `--client-cert` / `--client-key` paths must make
/// `FederationConfig::build` fail.
///
/// The failure is expected to name the client-cert read.
#[test]
fn config_build_rejects_missing_client_cert_path() {
    let cert_path = std::path::PathBuf::from("/definitely/missing/cert.pem");
    let key_path = std::path::PathBuf::from("/definitely/missing/key.pem");
    let outcome = FederationConfig::build(
        2,
        &["http://peer.example".to_string()],
        Duration::from_millis(500),
        Some(&cert_path),
        Some(&key_path),
        None,
        "ai:builder".to_string(),
    );
    let Err(error) = outcome else {
        panic!("expected client-cert read error");
    };
    let rendered = error.to_string();
    assert!(
        rendered.contains("read --client-cert"),
        "expected client-cert read error, got {rendered:?}"
    );
}
2483
/// `peer_count()` reports one entry per configured peer URL.
///
/// The local node is not counted.
#[test]
fn peer_count_matches_peer_list() {
    let urls: Vec<String> = ["http://a.example", "http://b.example", "http://c.example"]
        .iter()
        .map(|u| u.to_string())
        .collect();
    let cfg = build_config(urls, 2, 500);
    assert_eq!(cfg.peer_count(), 3);
}
2497
/// Letters, digits and `-`, `_`, `.`, `~` must be emitted verbatim.
#[test]
fn urlencoding_encode_passthrough_safe_chars() {
    const UNRESERVED: &str = "abcXYZ-09_.~";
    assert_eq!(urlencoding_encode(UNRESERVED), UNRESERVED);
}
2506
/// Reserved ASCII characters must be percent-encoded with uppercase hex,
/// non-ASCII input must be escaped, and `-` must pass through untouched.
///
/// The reserved characters checked are `:`, `+`, `/` and space.
#[test]
fn urlencoding_encode_percent_encodes_reserved_and_high_bits() {
    let encoded = urlencoding_encode("2026-04-26T12:00:00+00:00 / x");
    assert!(
        encoded.contains("%3A"),
        "expected colon to be percent-encoded: {encoded}"
    );
    assert!(
        encoded.contains("%2B"),
        "expected + to be percent-encoded: {encoded}"
    );
    assert!(
        encoded.contains("%2F"),
        "expected / to be percent-encoded: {encoded}"
    );
    assert!(
        encoded.contains("%20"),
        "expected space to be percent-encoded: {encoded}"
    );
    assert!(
        !encoded.contains("%2D"),
        "hyphen must pass through unencoded: {encoded}"
    );

    // The timestamp above is pure ASCII, so it never exercised the
    // "high bits" half of this test's name. A non-ASCII character must not
    // survive raw: the output has to be ASCII and begin with an escape.
    let high = urlencoding_encode("é");
    assert!(
        high.is_ascii() && high.starts_with('%'),
        "expected non-ASCII input to be percent-encoded: {high}"
    );
}
2533
/// Encoding the empty string yields the empty string.
#[test]
fn urlencoding_encode_empty_string() {
    assert!(urlencoding_encode("").is_empty());
}
2538
2539 async fn id_drift_handler(
2547 AxumJson(_body): AxumJson<serde_json::Value>,
2548 ) -> (StatusCode, AxumJson<serde_json::Value>) {
2549 (
2551 StatusCode::OK,
2552 AxumJson(serde_json::json!({"ids": ["some-other-id"], "applied": 1})),
2553 )
2554 }
2555
2556 async fn spawn_id_drift_peer() -> String {
2557 let app = Router::new().route("/api/v1/sync/push", post(id_drift_handler));
2558 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2559 let addr = listener.local_addr().unwrap();
2560 tokio::spawn(async move {
2561 axum::serve(listener, app).await.ok();
2562 });
2563 format!("http://{addr}")
2564 }
2565
2566 #[tokio::test]
2567 async fn id_drift_peer_does_not_count_as_ack() {
2568 let url1 = spawn_id_drift_peer().await;
2572 let url2 = spawn_id_drift_peer().await;
2573 let cfg = build_config(vec![url1, url2], 2, 1000);
2574 let tracker = broadcast_store_quorum(&cfg, &sample_memory())
2575 .await
2576 .unwrap();
2577 let result = finalise_quorum(&tracker);
2578 let err = result.unwrap_err();
2580 match err {
2581 QuorumError::QuorumNotMet {
2582 got,
2583 needed,
2584 reason,
2585 } => {
2586 assert_eq!(got, 1, "only local should count");
2587 assert_eq!(needed, 2);
2588 assert!(
2596 matches!(
2597 reason,
2598 QuorumFailureReason::IdDrift
2599 | QuorumFailureReason::Timeout
2600 | QuorumFailureReason::InFlight
2601 ),
2602 "expected IdDrift / Timeout / InFlight, got {reason:?}"
2603 );
2604 }
2605 other => panic!("expected QuorumNotMet, got {other:?}"),
2606 }
2607 }
2608
2609 #[derive(Clone)]
2624 enum SinceMockBehaviour {
2625 ReturnMemories(Vec<Memory>),
2627 Error500,
2629 Hang(Duration),
2631 MalformedBody,
2633 }
2634
2635 #[derive(Clone)]
2636 struct SinceMockState {
2637 behaviour: SinceMockBehaviour,
2638 hits: Arc<AtomicUsize>,
2639 last_since: Arc<Mutex<Option<String>>>,
2640 last_peer: Arc<Mutex<Option<String>>>,
2641 }
2642
2643 async fn since_handler(
2644 axum::extract::Query(q): axum::extract::Query<std::collections::HashMap<String, String>>,
2645 axum::extract::State(state): axum::extract::State<SinceMockState>,
2646 ) -> axum::response::Response {
2647 use axum::response::IntoResponse;
2648 state.hits.fetch_add(1, Ordering::Relaxed);
2649 {
2650 let mut s = state.last_since.lock().await;
2651 *s = q.get("since").cloned();
2652 }
2653 {
2654 let mut p = state.last_peer.lock().await;
2655 *p = q.get("peer").cloned();
2656 }
2657 match &state.behaviour {
2658 SinceMockBehaviour::ReturnMemories(mems) => {
2659 let body = serde_json::json!({"memories": mems});
2660 (StatusCode::OK, AxumJson(body)).into_response()
2661 }
2662 SinceMockBehaviour::Error500 => (
2663 StatusCode::INTERNAL_SERVER_ERROR,
2664 AxumJson(serde_json::json!({"error":"oops"})),
2665 )
2666 .into_response(),
2667 SinceMockBehaviour::Hang(d) => {
2668 tokio::time::sleep(*d).await;
2669 (
2670 StatusCode::OK,
2671 AxumJson(serde_json::json!({"memories": []})),
2672 )
2673 .into_response()
2674 }
2675 SinceMockBehaviour::MalformedBody => {
2676 (
2679 [(axum::http::header::CONTENT_TYPE, "application/json")],
2680 "this is not json {{{",
2681 )
2682 .into_response()
2683 }
2684 }
2685 }
2686
2687 async fn spawn_since_peer(
2690 behaviour: SinceMockBehaviour,
2691 ) -> (
2692 String,
2693 Arc<AtomicUsize>,
2694 Arc<Mutex<Option<String>>>,
2695 Arc<Mutex<Option<String>>>,
2696 ) {
2697 let hits = Arc::new(AtomicUsize::new(0));
2698 let last_since = Arc::new(Mutex::new(None));
2699 let last_peer = Arc::new(Mutex::new(None));
2700 let state = SinceMockState {
2701 behaviour,
2702 hits: hits.clone(),
2703 last_since: last_since.clone(),
2704 last_peer: last_peer.clone(),
2705 };
2706 let app = Router::new()
2707 .route("/api/v1/sync/since", axum::routing::get(since_handler))
2708 .with_state(state);
2709 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2710 let addr = listener.local_addr().unwrap();
2711 tokio::spawn(async move {
2712 axum::serve(listener, app).await.ok();
2713 });
2714 (format!("http://{addr}"), hits, last_since, last_peer)
2715 }
2716
2717 fn build_test_db() -> crate::handlers::Db {
2721 let conn = crate::db::open(std::path::Path::new(":memory:")).unwrap();
2722 let path = std::path::PathBuf::from(":memory:");
2723 Arc::new(Mutex::new((
2724 conn,
2725 path,
2726 crate::config::ResolvedTtl::default(),
2727 true,
2728 )))
2729 }
2730
2731 fn build_catchup_cfg(peer_url: &str, timeout_ms: u64) -> FederationConfig {
2739 let client = reqwest::Client::builder()
2740 .timeout(Duration::from_millis(timeout_ms))
2741 .build()
2742 .unwrap();
2743 FederationConfig {
2744 policy: QuorumPolicy::new(
2745 2,
2746 1,
2747 Duration::from_millis(timeout_ms),
2748 Duration::from_secs(30),
2749 )
2750 .unwrap(),
2751 peers: vec![PeerEndpoint {
2752 id: "peer-0".to_string(),
2753 sync_push_url: format!("{peer_url}/api/v1/sync/push"),
2754 }],
2755 client,
2756 sender_agent_id: "ai:catchup-test".to_string(),
2757 }
2758 }
2759
2760 fn catchup_memory(title: &str, updated_at: &str) -> Memory {
2765 Memory {
2766 id: format!("cat-{title}"),
2767 tier: crate::models::Tier::Mid,
2768 namespace: "catchup".to_string(),
2769 title: title.to_string(),
2770 content: format!("content for {title}"),
2771 tags: vec!["catchup".to_string()],
2772 priority: 5,
2773 confidence: 1.0,
2774 source: "system".to_string(),
2780 access_count: 0,
2781 created_at: updated_at.to_string(),
2782 updated_at: updated_at.to_string(),
2783 last_accessed_at: None,
2784 expires_at: None,
2785 metadata: serde_json::json!({"agent_id":"ai:peer-0"}),
2786 }
2787 }
2788
2789 #[tokio::test]
2792 async fn test_catchup_once_pulls_since_cursor_advances_state() {
2793 let mems = vec![
2797 catchup_memory("a", "2026-04-26T10:00:00Z"),
2798 catchup_memory("b", "2026-04-26T10:00:01Z"),
2799 catchup_memory("c", "2026-04-26T10:00:02Z"),
2800 catchup_memory("d", "2026-04-26T10:00:03Z"),
2801 catchup_memory("e", "2026-04-26T10:00:04Z"),
2802 ];
2803 let latest_ts = mems.last().unwrap().updated_at.clone();
2804 let (url, hits, last_since, last_peer) =
2805 spawn_since_peer(SinceMockBehaviour::ReturnMemories(mems.clone())).await;
2806 let cfg = build_catchup_cfg(&url, 2000);
2807 let db = build_test_db();
2808
2809 catchup_once(&cfg, &db).await;
2810
2811 assert_eq!(hits.load(Ordering::Relaxed), 1, "peer hit exactly once");
2812 assert!(
2814 last_since.lock().await.is_none(),
2815 "first catchup must omit since"
2816 );
2817 assert_eq!(last_peer.lock().await.as_deref(), Some("ai:catchup-test"));
2819 let lock = db.lock().await;
2821 let clock =
2822 crate::db::sync_state_load(&lock.0, "ai:catchup-test").expect("load sync state");
2823 assert_eq!(
2824 clock.entries.get("peer-0").map(String::as_str),
2825 Some(latest_ts.as_str()),
2826 "sync state advanced to latest pulled memory's updated_at"
2827 );
2828 let count: i64 = lock
2830 .0
2831 .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
2832 .unwrap();
2833 assert_eq!(count, 5, "all five memories inserted");
2834 }
2835
2836 #[tokio::test]
2839 async fn test_catchup_once_no_new_memories_no_op() {
2840 let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![])).await;
2841 let cfg = build_catchup_cfg(&url, 2000);
2842 let db = build_test_db();
2843
2844 catchup_once(&cfg, &db).await;
2845
2846 assert_eq!(hits.load(Ordering::Relaxed), 1);
2847 let lock = db.lock().await;
2848 let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
2849 assert!(
2850 clock.entries.get("peer-0").is_none(),
2851 "empty response must not advance sync_state"
2852 );
2853 let count: i64 = lock
2854 .0
2855 .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
2856 .unwrap();
2857 assert_eq!(count, 0);
2858 }
2859
2860 #[tokio::test]
2863 async fn test_catchup_once_peer_500_error_logged_no_panic() {
2864 let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::Error500).await;
2865 let cfg = build_catchup_cfg(&url, 2000);
2866 let db = build_test_db();
2867
2868 catchup_once(&cfg, &db).await;
2870
2871 assert_eq!(hits.load(Ordering::Relaxed), 1);
2872 let lock = db.lock().await;
2873 let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
2874 assert!(
2875 clock.entries.get("peer-0").is_none(),
2876 "500 must not advance sync state"
2877 );
2878 }
2879
2880 #[tokio::test]
2883 async fn test_catchup_once_peer_timeout_handled() {
2884 let (url, hits, _, _) =
2887 spawn_since_peer(SinceMockBehaviour::Hang(Duration::from_secs(2))).await;
2888 let cfg = build_catchup_cfg(&url, 200);
2889 let db = build_test_db();
2890
2891 let start = Instant::now();
2892 catchup_once(&cfg, &db).await;
2893 let elapsed = start.elapsed();
2894
2895 assert!(
2898 elapsed < Duration::from_millis(1500),
2899 "catchup_once should honour the client timeout, took {elapsed:?}"
2900 );
2901 assert_eq!(hits.load(Ordering::Relaxed), 1, "request was sent");
2902 let lock = db.lock().await;
2903 let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
2904 assert!(clock.entries.get("peer-0").is_none());
2905 }
2906
2907 #[tokio::test]
2910 async fn test_catchup_once_malformed_response_handled() {
2911 let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::MalformedBody).await;
2912 let cfg = build_catchup_cfg(&url, 2000);
2913 let db = build_test_db();
2914
2915 catchup_once(&cfg, &db).await;
2917
2918 assert_eq!(hits.load(Ordering::Relaxed), 1);
2919 let lock = db.lock().await;
2920 let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
2921 assert!(
2922 clock.entries.get("peer-0").is_none(),
2923 "malformed body must not advance sync state"
2924 );
2925 }
2926
2927 #[tokio::test]
2930 async fn test_catchup_once_inserts_only_newer_memories() {
2931 let db = build_test_db();
2936 {
2937 let lock = db.lock().await;
2938 let local = catchup_memory("shared", "2026-04-26T10:00:01Z");
2939 crate::db::insert_if_newer(&lock.0, &local).unwrap();
2942 let cnt: i64 = lock
2944 .0
2945 .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
2946 .unwrap();
2947 assert_eq!(cnt, 1, "pre-seeded shared row");
2948 }
2949
2950 let mut stale_shared = catchup_memory("shared", "2026-04-26T10:00:00Z");
2951 stale_shared.content = "stale-from-catchup-peer".to_string();
2954 stale_shared.id = "cat-shared-OLD".to_string();
2955 let stale_shared_content = stale_shared.content.clone();
2956 let new_fresh = catchup_memory("fresh", "2026-04-26T10:00:02Z");
2957 let (url, _, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![
2958 stale_shared,
2959 new_fresh,
2960 ]))
2961 .await;
2962 let cfg = build_catchup_cfg(&url, 2000);
2963
2964 catchup_once(&cfg, &db).await;
2965
2966 let lock = db.lock().await;
2967 let cnt: i64 = lock
2969 .0
2970 .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
2971 .unwrap();
2972 assert_eq!(cnt, 2, "fresh row inserted, shared kept");
2973 let shared_content: String = lock
2976 .0
2977 .query_row(
2978 "SELECT content FROM memories WHERE title = 'shared' AND namespace = 'catchup'",
2979 [],
2980 |r| r.get(0),
2981 )
2982 .unwrap();
2983 assert_ne!(
2984 shared_content, stale_shared_content,
2985 "older catchup memory must NOT overwrite newer local row"
2986 );
2987 let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
2991 assert_eq!(
2992 clock.entries.get("peer-0").map(String::as_str),
2993 Some("2026-04-26T10:00:02Z"),
2994 );
2995 }
2996
2997 #[tokio::test(start_paused = true)]
3000 async fn test_spawn_catchup_loop_runs_at_interval() {
3001 let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![])).await;
3005 let cfg = build_catchup_cfg(&url, 5000);
3006 let db = build_test_db();
3007
3008 let handle = spawn_catchup_loop(cfg, db, Duration::from_secs(60));
3009
3010 for _ in 0..6 {
3015 tokio::time::advance(Duration::from_secs(1)).await;
3016 tokio::task::yield_now().await;
3017 }
3018 for _ in 0..50 {
3022 if hits.load(Ordering::Relaxed) >= 1 {
3023 break;
3024 }
3025 tokio::task::yield_now().await;
3026 tokio::time::advance(Duration::from_millis(10)).await;
3027 }
3028
3029 assert!(
3030 hits.load(Ordering::Relaxed) >= 1,
3031 "first catchup tick must hit the mock peer (got {})",
3032 hits.load(Ordering::Relaxed),
3033 );
3034
3035 handle.abort();
3036 }
3037
3038 #[tokio::test]
3041 async fn test_spawn_catchup_loop_aborts_cleanly_on_handle_drop() {
3042 let (url, _, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![])).await;
3045 let cfg = build_catchup_cfg(&url, 2000);
3046 let db = build_test_db();
3047
3048 let handle = spawn_catchup_loop(cfg, db, Duration::from_secs(3600));
3049 handle.abort();
3052 let result = tokio::time::timeout(Duration::from_millis(500), handle).await;
3053 let join = result.expect("aborted handle must resolve within 500ms");
3054 assert!(
3055 join.is_err() && join.unwrap_err().is_cancelled(),
3056 "handle.abort() must surface as is_cancelled() == true"
3057 );
3058 }
3059
/// Valid client cert and PKCS#8 key fixtures must produce a usable config.
///
/// The fixtures live under `tests/fixtures/tls`. The resulting config
/// should have a single peer.
#[test]
fn test_build_config_mtls_with_valid_files() {
    let manifest = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    let cert = manifest.join("tests/fixtures/tls/valid_cert.pem");
    let key = manifest.join("tests/fixtures/tls/valid_key_pkcs8.pem");
    for fixture in [&cert, &key] {
        assert!(fixture.exists(), "missing test fixture: {fixture:?}");
    }

    let built = FederationConfig::build(
        2,
        &["http://peer.example".to_string()],
        Duration::from_millis(500),
        Some(&cert),
        Some(&key),
        None,
        "ai:builder".to_string(),
    );
    let cfg = match built {
        Ok(Some(c)) => c,
        Ok(None) => panic!("expected Some(FederationConfig), got None"),
        Err(e) => panic!("expected Ok, got Err: {e}"),
    };
    assert_eq!(cfg.peer_count(), 1);
}
3093
/// A valid client cert paired with a nonexistent key path must fail with a
/// `--client-key` read error.
#[test]
fn test_build_config_mtls_with_missing_files_returns_error() {
    let cert = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
        .join("tests/fixtures/tls/valid_cert.pem");
    let missing_key = std::path::PathBuf::from("/definitely/missing/key.pem");
    assert!(cert.exists(), "missing test fixture: {cert:?}");

    let outcome = FederationConfig::build(
        2,
        &["http://peer.example".to_string()],
        Duration::from_millis(500),
        Some(&cert),
        Some(&missing_key),
        None,
        "ai:builder".to_string(),
    );
    let Err(error) = outcome else {
        panic!("expected client-key read error");
    };
    let rendered = error.to_string();
    assert!(
        rendered.contains("read --client-key"),
        "expected client-key read error, got {rendered:?}"
    );
}
3127
3128 #[tokio::test]
3162 async fn post_and_classify_persistent_fail_concatenates_both_reasons() {
3163 let (url, count) = spawn_mock_peer(MockBehaviour::Fail).await;
3164 let client = reqwest::Client::builder()
3165 .timeout(Duration::from_millis(2000))
3166 .build()
3167 .unwrap();
3168 let body = serde_json::json!({"sender_agent_id":"ai:test","memories":[]});
3169 let target = format!("{url}/api/v1/sync/push");
3170
3171 let outcome = post_and_classify(&client, &target, &body, "mem-x", Some("mem-x")).await;
3172 match outcome {
3173 AckOutcome::Fail(reason) => {
3174 assert!(
3175 reason.contains("first:") && reason.contains("retry:"),
3176 "expected both attempts in reason, got {reason:?}"
3177 );
3178 assert!(
3180 reason.contains("http 500"),
3181 "expected 5xx in reason, got {reason:?}"
3182 );
3183 }
3184 other => panic!("expected AckOutcome::Fail, got {other:?}"),
3185 }
3186 assert_eq!(
3187 count.load(Ordering::Relaxed),
3188 2,
3189 "first attempt + one retry = exactly two POSTs"
3190 );
3191 }
3192
3193 #[tokio::test]
3199 async fn post_and_classify_id_drift_does_not_retry() {
3200 let count = Arc::new(AtomicUsize::new(0));
3202 let cnt_clone = count.clone();
3203 let app = Router::new().route(
3204 "/api/v1/sync/push",
3205 post(move |AxumJson(_b): AxumJson<serde_json::Value>| {
3206 let c = cnt_clone.clone();
3207 async move {
3208 c.fetch_add(1, Ordering::Relaxed);
3209 (
3210 StatusCode::OK,
3211 AxumJson(serde_json::json!({"ids":["other-id"],"applied":1})),
3212 )
3213 }
3214 }),
3215 );
3216 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
3217 let addr = listener.local_addr().unwrap();
3218 tokio::spawn(async move {
3219 axum::serve(listener, app).await.ok();
3220 });
3221 let url = format!("http://{addr}/api/v1/sync/push");
3222
3223 let client = reqwest::Client::builder()
3224 .timeout(Duration::from_millis(2000))
3225 .build()
3226 .unwrap();
3227 let body = serde_json::json!({"sender_agent_id":"ai:test","memories":[]});
3228 let outcome = post_and_classify(&client, &url, &body, "mem-x", Some("mem-x")).await;
3229 assert!(
3230 matches!(outcome, AckOutcome::IdDrift),
3231 "expected IdDrift, got {outcome:?}"
3232 );
3233 assert_eq!(
3234 count.load(Ordering::Relaxed),
3235 1,
3236 "IdDrift must NOT trigger the retry path (only one POST)"
3237 );
3238 }
3239
3240 #[tokio::test]
3246 async fn bulk_catchup_push_no_peers_is_noop() {
3247 let client = reqwest::Client::builder()
3248 .timeout(Duration::from_millis(500))
3249 .build()
3250 .unwrap();
3251 let cfg = FederationConfig {
3252 policy: QuorumPolicy::new(1, 1, Duration::from_millis(500), Duration::from_secs(30))
3253 .unwrap(),
3254 peers: Vec::new(),
3255 client,
3256 sender_agent_id: "ai:no-peers".to_string(),
3257 };
3258 let mems = vec![sample_memory()];
3261 let errors = bulk_catchup_push(&cfg, &mems).await;
3262 assert!(
3263 errors.is_empty(),
3264 "no-peers catchup must return empty error vec immediately, got {errors:?}"
3265 );
3266 }
3267
3268 #[tokio::test]
3275 async fn bulk_catchup_push_mixed_outcomes_only_failing_peer_in_errors() {
3276 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
3277 let (url2, count2) = spawn_mock_peer(MockBehaviour::Fail).await;
3278 let cfg = build_config(vec![url1, url2], 2, 2000);
3279 let mems = vec![sample_memory()];
3280 let errors = bulk_catchup_push(&cfg, &mems).await;
3281 assert_eq!(
3282 errors.len(),
3283 1,
3284 "exactly one failing peer should be in errors, got {errors:?}"
3285 );
3286 let (peer_id, reason) = &errors[0];
3287 assert!(
3290 peer_id.starts_with("peer-1"),
3291 "failing peer should be peer-1, got {peer_id}"
3292 );
3293 assert!(
3294 reason.contains("http 500"),
3295 "expected http 500 reason, got {reason}"
3296 );
3297 assert_eq!(count1.load(Ordering::Relaxed), 1);
3299 assert_eq!(count2.load(Ordering::Relaxed), 1);
3300 }
3301
3302 #[tokio::test]
3307 async fn quorum_w1_local_commit_alone_is_sufficient() {
3308 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
3309 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
3310 let cfg = build_config(vec![url1, url2], 1, 1000);
3312 let tracker = broadcast_store_quorum(&cfg, &sample_memory())
3313 .await
3314 .unwrap();
3315 let count = finalise_quorum(&tracker).expect("W=1 must succeed on local commit alone");
3316 assert_eq!(count, 1, "W=1 quorum returns local-only count");
3317 }
3318
/// `QuorumPolicy::majority(n)` must require a strict majority of acks.
///
/// For N = 3 that is 2 acks (local + 1 peer); for N = 5 it is 3 acks
/// (local + 2 peers).
#[test]
fn quorum_policy_majority_builds_with_ceil_n_plus_1_div_2() {
    let mut three = AckTracker::new(
        QuorumPolicy::majority(3).expect("N=3 majority builds"),
        Instant::now(),
    );
    three.record_local();
    assert!(
        !three.is_quorum_met(Instant::now()),
        "majority-of-3 needs more than local"
    );
    three.record_peer_ack("peer-a");
    assert!(
        three.is_quorum_met(Instant::now()),
        "local + 1 peer ack = 2 = majority of 3"
    );

    let mut five = AckTracker::new(
        QuorumPolicy::majority(5).expect("N=5 majority builds"),
        Instant::now(),
    );
    five.record_local();
    five.record_peer_ack("a");
    assert!(
        !five.is_quorum_met(Instant::now()),
        "majority-of-5 needs 3 acks"
    );
    five.record_peer_ack("b");
    assert!(five.is_quorum_met(Instant::now()), "local + 2 peers = 3");
}
3353
/// `QuorumPolicy::majority(0)` must be rejected with `InvalidPolicy`.
///
/// The error detail should mention the `n must be …` constraint.
#[test]
fn quorum_policy_majority_rejects_zero() {
    match QuorumPolicy::majority(0).expect_err("n=0 must be rejected") {
        QuorumError::InvalidPolicy { detail } => assert!(
            detail.contains("n must be"),
            "expected n>=1 message, got {detail}"
        ),
        other => panic!("expected InvalidPolicy, got {other:?}"),
    }
}
3371
/// Peer URLs that differ only by a trailing `/` count as the same peer and
/// must be rejected as duplicates.
#[test]
fn config_build_rejects_duplicate_peers_differing_only_in_trailing_slash() {
    let peers = ["http://peer.example", "http://peer.example/"].map(String::from);
    let outcome = FederationConfig::build(
        2,
        &peers,
        Duration::from_millis(500),
        None,
        None,
        None,
        "ai:dup-test".to_string(),
    );
    let Err(error) = outcome else {
        panic!("trailing-slash dup must be rejected");
    };
    let rendered = error.to_string();
    assert!(
        rendered.contains("duplicate peer URL"),
        "expected duplicate-peer error, got {rendered}"
    );
}
3401
/// Peer URLs that differ only in ASCII case count as the same peer and must
/// be rejected as duplicates.
#[test]
fn config_build_rejects_duplicate_peers_differing_only_in_case() {
    let peers = ["http://Peer.Example", "http://peer.example"].map(String::from);
    let outcome = FederationConfig::build(
        2,
        &peers,
        Duration::from_millis(500),
        None,
        None,
        None,
        "ai:dup-case-test".to_string(),
    );
    let Err(error) = outcome else {
        panic!("case-only dup must be rejected");
    };
    let rendered = error.to_string();
    assert!(
        rendered.contains("duplicate peer URL"),
        "expected duplicate-peer error, got {rendered}"
    );
}
3429
3430 #[tokio::test]
3437 async fn archive_quorum_hanging_peer_times_out_to_break_arm() {
3438 let (url1, _) = spawn_mock_peer(MockBehaviour::Hang).await;
3439 let (url2, _) = spawn_mock_peer(MockBehaviour::Hang).await;
3440 let cfg = build_config(vec![url1, url2], 2, 200);
3443 let start = Instant::now();
3444 let tracker = broadcast_archive_quorum(&cfg, "mem-arch-id").await.unwrap();
3445 let elapsed = start.elapsed();
3446 assert!(
3449 elapsed < Duration::from_secs(2),
3450 "archive_quorum must exit at deadline, took {elapsed:?}"
3451 );
3452 let err = finalise_quorum(&tracker).unwrap_err();
3453 assert!(
3454 matches!(err, QuorumError::QuorumNotMet { .. }),
3455 "expected QuorumNotMet, got {err:?}"
3456 );
3457 }
3458
3459 #[tokio::test]
3464 async fn quorum_not_met_payload_unreachable_round_trip_from_broadcast() {
3465 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
3469 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
3470 let cfg = build_config(vec![url1, url2], 2, 100);
3472 let tracker = broadcast_store_quorum(&cfg, &sample_memory())
3473 .await
3474 .unwrap();
3475 tokio::time::sleep(Duration::from_millis(150)).await;
3479 let err = finalise_quorum(&tracker).unwrap_err();
3480 let payload = QuorumNotMetPayload::from_err(&err);
3481 assert_eq!(payload.error, "quorum_not_met");
3482 assert_eq!(payload.got, 1, "only local commit");
3483 assert_eq!(payload.needed, 2);
3484 assert!(
3485 payload.reason == "unreachable" || payload.reason == "timeout",
3486 "expected unreachable/timeout, got {}",
3487 payload.reason
3488 );
3489 }
3490
3491 #[tokio::test]
3497 async fn catchup_once_peer_url_without_push_suffix_still_builds_since() {
3498 let (url, hits, _, last_peer) =
3499 spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![])).await;
3500 let client = reqwest::Client::builder()
3504 .timeout(Duration::from_millis(2000))
3505 .build()
3506 .unwrap();
3507 let cfg = FederationConfig {
3508 policy: QuorumPolicy::new(2, 1, Duration::from_millis(2000), Duration::from_secs(30))
3509 .unwrap(),
3510 peers: vec![PeerEndpoint {
3511 id: "peer-0".to_string(),
3512 sync_push_url: url.clone(),
3515 }],
3516 client,
3517 sender_agent_id: "ai:no-suffix".to_string(),
3518 };
3519 let db = build_test_db();
3520 catchup_once(&cfg, &db).await;
3521 assert_eq!(hits.load(Ordering::Relaxed), 1);
3523 assert_eq!(
3524 last_peer.lock().await.as_deref(),
3525 Some("ai:no-suffix"),
3526 "local agent id should be forwarded as ?peer="
3527 );
3528 }
3529
3530 #[tokio::test]
3536 async fn catchup_once_skips_invalid_memory_but_applies_valid_neighbour() {
3537 let valid = catchup_memory("ok-mem", "2026-04-26T10:00:00Z");
3539 let mut bad = catchup_memory("bad-source", "2026-04-26T10:00:01Z");
3541 bad.source = "made-up-source-not-in-allowlist".to_string();
3542 let mems = vec![valid.clone(), bad];
3543
3544 let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(mems)).await;
3545 let cfg = build_catchup_cfg(&url, 2000);
3546 let db = build_test_db();
3547 catchup_once(&cfg, &db).await;
3548
3549 assert_eq!(hits.load(Ordering::Relaxed), 1);
3550 let lock = db.lock().await;
3551 let count: i64 = lock
3553 .0
3554 .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
3555 .unwrap();
3556 assert_eq!(count, 1, "only the valid memory should land");
3557 let title: String = lock
3558 .0
3559 .query_row(
3560 "SELECT title FROM memories WHERE namespace='catchup' LIMIT 1",
3561 [],
3562 |r| r.get(0),
3563 )
3564 .unwrap();
3565 assert_eq!(title, "ok-mem");
3566 let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
3571 assert_eq!(
3572 clock.entries.get("peer-0").map(String::as_str),
3573 Some("2026-04-26T10:00:00Z"),
3574 "sync_state tracks latest_ts of validate-passing rows"
3575 );
3576 }
3577
/// Recording the same peer's ack twice must count it only once.
///
/// The test uses W = N = 3, so local + one distinct peer (2 acks) is
/// *below* quorum. If the duplicate `peer-a` ack were double-counted, the
/// tracker would reach 3 and report quorum met, which the first assertion
/// catches. The previous W = 2 setup could not detect this, because local +
/// `peer-a` alone already met quorum. A second distinct peer must then
/// complete the quorum.
#[test]
fn ack_tracker_record_peer_ack_is_idempotent() {
    let policy = QuorumPolicy::new(3, 3, Duration::from_secs(1), Duration::from_secs(30))
        .expect("policy");
    let mut t = AckTracker::new(policy, Instant::now());
    t.record_local();
    t.record_peer_ack("peer-a");
    t.record_peer_ack("peer-a");
    assert!(
        !t.is_quorum_met(Instant::now()),
        "duplicate ack from peer-a must not be counted twice"
    );
    t.record_peer_ack("peer-b");
    assert!(
        t.is_quorum_met(Instant::now()),
        "local + peer-a + peer-b = 3 distinct acks must meet W=3"
    );
}
3596
3597 #[tokio::test]
3603 async fn catchup_once_body_without_memories_key_is_skipped() {
3604 let app = Router::new().route(
3606 "/api/v1/sync/since",
3607 axum::routing::get(|| async {
3608 (
3609 StatusCode::OK,
3610 AxumJson(serde_json::json!({"applied":0,"note":"empty cluster"})),
3611 )
3612 }),
3613 );
3614 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
3615 let addr = listener.local_addr().unwrap();
3616 tokio::spawn(async move {
3617 axum::serve(listener, app).await.ok();
3618 });
3619 let url = format!("http://{addr}");
3620 let cfg = build_catchup_cfg(&url, 2000);
3621 let db = build_test_db();
3622 catchup_once(&cfg, &db).await;
3623 let lock = db.lock().await;
3624 let count: i64 = lock
3625 .0
3626 .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
3627 .unwrap();
3628 assert_eq!(count, 0, "no memories key → no inserts");
3629 let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
3630 assert!(
3631 clock.entries.get("peer-0").is_none(),
3632 "no memories key → sync_state untouched"
3633 );
3634 }
3635
3636 #[tokio::test]
3641 async fn catchup_once_unparseable_individual_memory_is_skipped() {
3642 let valid_mem = serde_json::to_value(catchup_memory("ok", "2026-04-26T10:00:00Z")).unwrap();
3645 let bad_mem = serde_json::json!({"id":"oops","not_a_memory_field": true});
3646 let app = Router::new().route(
3647 "/api/v1/sync/since",
3648 axum::routing::get(move || {
3649 let valid = valid_mem.clone();
3650 let bad = bad_mem.clone();
3651 async move {
3652 (
3653 StatusCode::OK,
3654 AxumJson(serde_json::json!({"memories": [valid, bad]})),
3655 )
3656 }
3657 }),
3658 );
3659 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
3660 let addr = listener.local_addr().unwrap();
3661 tokio::spawn(async move {
3662 axum::serve(listener, app).await.ok();
3663 });
3664 let url = format!("http://{addr}");
3665 let cfg = build_catchup_cfg(&url, 2000);
3666 let db = build_test_db();
3667 catchup_once(&cfg, &db).await;
3668 let lock = db.lock().await;
3669 let count: i64 = lock
3671 .0
3672 .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
3673 .unwrap();
3674 assert_eq!(count, 1, "only parseable memory inserted");
3675 }
3676
3677 #[tokio::test]
3681 async fn delete_quorum_id_drift_peer_records_drift_not_ack() {
3682 let url1 = spawn_id_drift_peer().await;
3683 let url2 = spawn_id_drift_peer().await;
3684 let cfg = build_config(vec![url1, url2], 2, 1000);
3685 let tracker = broadcast_delete_quorum(&cfg, "mem-del-x").await.unwrap();
3686 let err = finalise_quorum(&tracker).unwrap_err();
3688 assert!(
3689 matches!(err, QuorumError::QuorumNotMet { got: 1, .. }),
3690 "expected QuorumNotMet got=1, got {err:?}"
3691 );
3692 assert_eq!(
3694 tracker.id_drift_count(),
3695 2,
3696 "both peers should be recorded as drift"
3697 );
3698 }
3699
3700 #[tokio::test]
3703 async fn archive_quorum_id_drift_peer_records_drift_not_ack() {
3704 let url1 = spawn_id_drift_peer().await;
3705 let cfg = build_config(vec![url1], 2, 1000);
3706 let tracker = broadcast_archive_quorum(&cfg, "mem-arch-x").await.unwrap();
3707 let err = finalise_quorum(&tracker).unwrap_err();
3708 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
3709 assert_eq!(tracker.id_drift_count(), 1);
3710 }
3711
3712 #[tokio::test]
3715 async fn restore_quorum_id_drift_peer_records_drift_not_ack() {
3716 let url1 = spawn_id_drift_peer().await;
3717 let cfg = build_config(vec![url1], 2, 1000);
3718 let tracker = broadcast_restore_quorum(&cfg, "mem-res-x").await.unwrap();
3719 let err = finalise_quorum(&tracker).unwrap_err();
3720 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
3721 assert_eq!(tracker.id_drift_count(), 1);
3722 }
3723
3724 #[tokio::test]
3727 async fn link_quorum_id_drift_peer_records_drift_not_ack() {
3728 let url1 = spawn_id_drift_peer().await;
3729 let cfg = build_config(vec![url1], 2, 1000);
3730 let tracker = broadcast_link_quorum(&cfg, &sample_link()).await.unwrap();
3731 let err = finalise_quorum(&tracker).unwrap_err();
3732 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
3733 assert_eq!(tracker.id_drift_count(), 1);
3734 }
3735
3736 #[tokio::test]
3739 async fn consolidate_quorum_id_drift_peer_records_drift_not_ack() {
3740 let url1 = spawn_id_drift_peer().await;
3741 let cfg = build_config(vec![url1], 2, 1000);
3742 let new_mem = sample_memory();
3743 let tracker = broadcast_consolidate_quorum(&cfg, &new_mem, &[])
3744 .await
3745 .unwrap();
3746 let err = finalise_quorum(&tracker).unwrap_err();
3747 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
3748 assert_eq!(tracker.id_drift_count(), 1);
3749 }
3750
3751 #[tokio::test]
3754 async fn pending_quorum_id_drift_peer_records_drift_not_ack() {
3755 let url1 = spawn_id_drift_peer().await;
3756 let cfg = build_config(vec![url1], 2, 1000);
3757 let tracker = broadcast_pending_quorum(&cfg, &sample_pending())
3758 .await
3759 .unwrap();
3760 let err = finalise_quorum(&tracker).unwrap_err();
3761 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
3762 assert_eq!(tracker.id_drift_count(), 1);
3763 }
3764
3765 #[tokio::test]
3768 async fn pending_decision_quorum_id_drift_peer_records_drift_not_ack() {
3769 let url1 = spawn_id_drift_peer().await;
3770 let cfg = build_config(vec![url1], 2, 1000);
3771 let tracker = broadcast_pending_decision_quorum(&cfg, &sample_decision())
3772 .await
3773 .unwrap();
3774 let err = finalise_quorum(&tracker).unwrap_err();
3775 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
3776 assert_eq!(tracker.id_drift_count(), 1);
3777 }
3778
3779 #[tokio::test]
3782 async fn namespace_meta_quorum_id_drift_peer_records_drift_not_ack() {
3783 let url1 = spawn_id_drift_peer().await;
3784 let cfg = build_config(vec![url1], 2, 1000);
3785 let tracker = broadcast_namespace_meta_quorum(&cfg, &sample_namespace_meta())
3786 .await
3787 .unwrap();
3788 let err = finalise_quorum(&tracker).unwrap_err();
3789 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
3790 assert_eq!(tracker.id_drift_count(), 1);
3791 }
3792
3793 #[tokio::test]
3796 async fn namespace_meta_clear_quorum_id_drift_peer_records_drift_not_ack() {
3797 let url1 = spawn_id_drift_peer().await;
3798 let cfg = build_config(vec![url1], 2, 1000);
3799 let namespaces = vec!["app/team".to_string()];
3800 let tracker = broadcast_namespace_meta_clear_quorum(&cfg, &namespaces)
3801 .await
3802 .unwrap();
3803 let err = finalise_quorum(&tracker).unwrap_err();
3804 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
3805 assert_eq!(tracker.id_drift_count(), 1);
3806 }
3807
3808 #[tokio::test]
3814 async fn delete_quorum_post_quorum_detach_drains_remaining_peer() {
3815 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
3816 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
3817 let (url3, count3) = spawn_mock_peer(MockBehaviour::Fail).await;
3818 let cfg = build_config(vec![url1, url2, url3], 2, 2000);
3819 let _tracker = broadcast_delete_quorum(&cfg, "mem-detach").await.unwrap();
3820 for _ in 0..100 {
3823 if count1.load(Ordering::Relaxed) >= 1
3824 && count2.load(Ordering::Relaxed) >= 1
3825 && count3.load(Ordering::Relaxed) >= 1
3826 {
3827 break;
3828 }
3829 tokio::time::sleep(Duration::from_millis(20)).await;
3830 }
3831 assert!(
3834 count3.load(Ordering::Relaxed) >= 1,
3835 "failing peer must be reached by the detached fanout"
3836 );
3837 }
3838
/// Finalising an `AckTracker` at its creation instant, with only the local ack
/// recorded, must fail as `QuorumNotMet { got: 1, needed: 2 }` and be
/// classified as `InFlight`, because the timeout has not yet elapsed.
///
/// The policy mirrors the argument order used by `FederationConfig::build`:
/// 3 nodes, quorum of 2 writes, 60 s timeout, 30 s as the fourth argument.
#[test]
fn ack_tracker_finalise_pre_deadline_returns_in_flight() {
    let start = Instant::now();
    let policy = QuorumPolicy::new(3, 2, Duration::from_secs(60), Duration::from_secs(30))
        .expect("policy");
    let mut tracker = AckTracker::new(policy, start);
    tracker.record_local();

    match tracker.finalise(start).unwrap_err() {
        QuorumError::QuorumNotMet {
            got: acked,
            needed: required,
            reason: why,
        } => {
            assert_eq!(acked, 1);
            assert_eq!(required, 2);
            assert_eq!(
                why,
                QuorumFailureReason::InFlight,
                "pre-deadline insufficient-ack must classify as InFlight"
            );
        }
        other => panic!("expected QuorumNotMet, got {other:?}"),
    }
}
3870}