1pub mod identity;
41pub mod peer;
42pub mod peer_attestation;
43#[cfg(feature = "sal")]
51pub mod push_dlq;
52pub mod quorum;
53pub mod receive;
54pub mod reflection_bookkeeping;
55pub mod signing;
60pub mod sync;
61pub mod vector_clock;
62
63pub use quorum::*;
64pub use receive::spawn_catchup_loop;
65#[cfg(feature = "sal")]
66pub use receive::spawn_catchup_loop_with_store;
67pub use receive::catchup_once_for_tests;
73pub use sync::*;
74#[cfg(feature = "sal")]
77pub use push_dlq::{
78 FederationDlqSink, FederationPushDlqRow, REPLAY_BATCH_SIZE, replay_once,
79 spawn_replay_federation_push_dlq,
80};
81
82use crate::replication::QuorumPolicy;
83
/// Crate-internal string constant naming the trace target for federation sync events.
pub(crate) const SYNC_TRACE_TARGET: &str = "ai_memory::federation::sync";
88
/// Crate-internal string constant naming the trace target for federation signing events.
///
/// NOTE(review): unlike `SYNC_TRACE_TARGET` this value has no `ai_memory::`
/// prefix — confirm the difference is intentional.
pub(crate) const SIGNING_TRACE_TARGET: &str = "federation::signing";
95
/// Settings bundle passed to the federation broadcast / catch-up functions:
/// quorum policy, peer list, HTTP client and the sender's identity material.
///
/// Only `Clone` is derived, so a value of this type cannot be `{:?}`-formatted.
#[derive(Clone)]
pub struct FederationConfig {
    /// Quorum policy for broadcasts (see [`QuorumPolicy`]).
    pub policy: QuorumPolicy,
    /// Remote peers this node pushes to.
    pub peers: Vec<PeerEndpoint>,
    /// HTTP client — NOTE(review): presumably shared by every peer request; confirm in `sync`.
    pub client: reqwest::Client,
    /// Agent id of this node in its role as sender.
    pub sender_agent_id: String,
    /// Optional API key — NOTE(review): presumably attached to outbound peer requests; confirm.
    pub api_key: Option<String>,
    /// Optional shared Ed25519 signing key — NOTE(review): presumably used to
    /// sign outbound payloads; confirm in `signing`.
    pub signing_key: Option<std::sync::Arc<ed25519_dalek::SigningKey>>,
    /// Optional shared [`push_dlq::FederationDlqSink`]; the field only exists
    /// when the `sal` feature is enabled.
    #[cfg(feature = "sal")]
    pub dlq_sink: Option<std::sync::Arc<dyn push_dlq::FederationDlqSink>>,
}
133
/// One remote federation peer: an identifier plus the URL pushes are sent to.
#[derive(Clone, Debug)]
pub struct PeerEndpoint {
    /// Identifier for the peer (the tests in this file expect ids such as `peer-0`).
    pub id: String,
    /// Full URL of the peer's push endpoint, e.g.
    /// `http://peer-a.example/api/v1/sync/push`.
    pub sync_push_url: String,
}
141
/// Serializable embedding record: a vector together with the memory id and
/// model name it belongs to.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct ShippedEmbedding {
    /// Id of the memory the embedding belongs to.
    pub memory_id: String,
    /// Name of the embedding model — presumably the model that produced `vector`; confirm.
    pub model: String,
    /// Number of components; [`ShippedEmbedding::new`] sets this to `vector.len()`.
    /// Deserialized values are not re-checked here.
    pub dim: usize,
    /// The embedding components.
    pub vector: Vec<f32>,
}
200
201impl ShippedEmbedding {
202 #[must_use]
206 pub fn new(memory_id: String, model: String, vector: Vec<f32>) -> Self {
207 Self {
208 memory_id,
209 model,
210 dim: vector.len(),
211 vector,
212 }
213 }
214}
215
/// Largest absolute deviation of a vector's L2 norm from `1.0` for which
/// [`sanitize_shipped_vector`] returns the input unchanged.
pub const SHIPPED_VECTOR_NORM_TOLERANCE: f32 = 1e-3;

/// Validates a shipped embedding vector and returns a unit-norm copy.
///
/// Returns `None` when the vector is empty, contains a NaN or infinite
/// component, or when its squared norm (accumulated in `f32`) is zero or not
/// finite. A vector whose norm is within [`SHIPPED_VECTOR_NORM_TOLERANCE`] of
/// `1.0` is returned as-is; any other vector is scaled by `1 / norm`.
#[must_use]
pub fn sanitize_shipped_vector(vector: &[f32]) -> Option<Vec<f32>> {
    let all_finite = vector.iter().all(|component| component.is_finite());
    if vector.is_empty() || !all_finite {
        return None;
    }

    // Finite components can still overflow the f32 accumulator, so the sum
    // itself is re-checked.
    let squared_norm = vector
        .iter()
        .map(|component| component * component)
        .sum::<f32>();
    if !(squared_norm.is_finite() && squared_norm > 0.0) {
        return None;
    }

    let length = squared_norm.sqrt();
    let already_unit = (length - 1.0).abs() <= SHIPPED_VECTOR_NORM_TOLERANCE;
    let sanitized = if already_unit {
        vector.to_vec()
    } else {
        let scale = 1.0 / length;
        vector.iter().map(|component| component * scale).collect()
    };
    Some(sanitized)
}
270
#[cfg(test)]
mod sanitize_shipped_vector_tests {
    use super::sanitize_shipped_vector;

    /// NaN and ±infinity components cause rejection.
    #[test]
    fn rejects_non_finite_components() {
        let poisoned: [&[f32]; 3] = [
            &[0.6, f32::NAN, 0.8],
            &[f32::INFINITY, 0.0],
            &[1.0, f32::NEG_INFINITY],
        ];
        for input in poisoned.iter().copied() {
            assert!(sanitize_shipped_vector(input).is_none());
        }
    }

    /// The empty vector and the all-zero vector are both rejected.
    #[test]
    fn rejects_zero_and_empty() {
        let empty: [f32; 0] = [];
        assert!(sanitize_shipped_vector(&empty).is_none());
        let zeros = [0.0_f32; 3];
        assert!(sanitize_shipped_vector(&zeros).is_none());
    }

    /// A vector that already has unit norm comes back unchanged.
    #[test]
    fn unit_norm_vector_passes_through() {
        let input = vec![0.6_f32, 0.8];
        let sanitized = sanitize_shipped_vector(&input).expect("unit-norm accepted");
        assert_eq!(sanitized, input);
    }

    /// A finite vector with norm 50 is scaled down to unit norm.
    #[test]
    fn high_magnitude_vector_is_normalized() {
        let sanitized =
            sanitize_shipped_vector(&[30.0_f32, 40.0]).expect("finite vector normalized");
        let norm = sanitized.iter().map(|c| c * c).sum::<f32>().sqrt();
        assert!(
            (norm - 1.0).abs() < 1e-6,
            "normalized to unit norm; got {norm}"
        );
        let (first, second) = (sanitized[0], sanitized[1]);
        assert!((first - 0.6).abs() < 1e-6 && (second - 0.8).abs() < 1e-6);
    }
}
317
318#[cfg(test)]
319mod tests {
320 use super::receive::{catchup_once, urlencoding_encode};
321 use super::sync::AckOutcome;
322 use super::*;
323 use crate::models::{Memory, MemoryLink, NamespaceMetaEntry, PendingAction, PendingDecision};
324 use crate::replication::{AckTracker, QuorumError, QuorumFailureReason, QuorumPolicy};
325 use axum::Router;
326 use axum::extract::Json as AxumJson;
327 use axum::http::StatusCode;
328 use axum::routing::post;
329 use std::sync::Arc;
330 use std::sync::atomic::{AtomicUsize, Ordering};
331 use std::time::{Duration, Instant};
332 use tokio::net::TcpListener;
333 use tokio::sync::Mutex;
334
335 fn sample_memory() -> Memory {
336 let now = chrono::Utc::now().to_rfc3339();
337 Memory {
338 id: "fed-test".to_string(),
339 tier: crate::models::Tier::Mid,
340 namespace: "app".to_string(),
341 title: "hello".to_string(),
342 content: "world for federation test".to_string(),
343 tags: vec!["t".to_string()],
344 priority: 5,
345 confidence: 1.0,
346 source: "test".to_string(),
347 access_count: 0,
348 created_at: now.clone(),
349 updated_at: now,
350 last_accessed_at: None,
351 expires_at: None,
352 metadata: serde_json::json!({"agent_id":"ai:test"}),
353 reflection_depth: 0,
354 memory_kind: crate::models::MemoryKind::Observation,
355 entity_id: None,
356 persona_version: None,
357 citations: Vec::new(),
358 source_uri: None,
359 source_span: None,
360 confidence_source: crate::models::ConfidenceSource::CallerProvided,
361 confidence_signals: None,
362 confidence_decayed_at: None,
363 version: 1,
364 }
365 }
366
    /// Scripted response mode for a mock peer; interpreted by `mock_handler`.
    #[derive(Clone, Copy)]
    enum MockBehaviour {
        /// Always answer 200 OK with an `applied: 1` body.
        Ack,
        /// Always answer 500 with an error body.
        Fail,
        /// Sleep for 10 seconds before answering 200 OK.
        Hang,
        /// Answer 500 while the 1-based call number is `<= fail_until`, then 200 OK.
        FailThenAck {
            fail_until: usize,
        },
    }
378
    /// Axum state shared with `mock_handler`.
    #[derive(Clone)]
    struct MockState {
        /// How the peer should answer each request.
        behaviour: MockBehaviour,
        /// Number of requests received so far; also handed back to the test
        /// by `spawn_mock_peer`.
        count: Arc<AtomicUsize>,
    }
384
385 async fn mock_handler(
386 axum::extract::State(state): axum::extract::State<MockState>,
387 AxumJson(_body): AxumJson<serde_json::Value>,
388 ) -> (StatusCode, AxumJson<serde_json::Value>) {
389 let call = state.count.fetch_add(1, Ordering::Relaxed) + 1;
390 match state.behaviour {
391 MockBehaviour::Ack => (
392 StatusCode::OK,
393 AxumJson(serde_json::json!({"applied":1,"noop":0,"skipped":0})),
394 ),
395 MockBehaviour::Fail => (
396 StatusCode::INTERNAL_SERVER_ERROR,
397 AxumJson(serde_json::json!({"error":"stub failure"})),
398 ),
399 MockBehaviour::Hang => {
400 tokio::time::sleep(Duration::from_secs(10)).await;
401 (StatusCode::OK, AxumJson(serde_json::json!({"applied":1})))
402 }
403 MockBehaviour::FailThenAck { fail_until } => {
404 if call <= fail_until {
405 (
406 StatusCode::INTERNAL_SERVER_ERROR,
407 AxumJson(serde_json::json!({"error":"stub transient failure"})),
408 )
409 } else {
410 (
411 StatusCode::OK,
412 AxumJson(serde_json::json!({"applied":1,"noop":0,"skipped":0})),
413 )
414 }
415 }
416 }
417 }
418
419 async fn spawn_mock_peer(behaviour: MockBehaviour) -> (String, Arc<AtomicUsize>) {
420 let call_count = Arc::new(AtomicUsize::new(0));
421 let state = MockState {
422 behaviour,
423 count: call_count.clone(),
424 };
425 let app = Router::new()
426 .route("/api/v1/sync/push", post(mock_handler))
427 .with_state(state);
428 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
429 let addr = listener.local_addr().unwrap();
430 tokio::spawn(async move {
431 axum::serve(listener, app).await.ok();
432 });
433 (format!("http://{addr}"), call_count)
434 }
435
436 fn build_config(peers: Vec<String>, w: usize, timeout_ms: u64) -> FederationConfig {
437 let client = reqwest::Client::builder()
438 .timeout(Duration::from_millis(timeout_ms))
439 .build()
440 .unwrap();
441 let n = 1 + peers.len();
442 FederationConfig {
443 policy: QuorumPolicy::new(
444 n,
445 w,
446 Duration::from_millis(timeout_ms),
447 Duration::from_secs(30),
448 )
449 .unwrap(),
450 peers: peers
451 .into_iter()
452 .enumerate()
453 .map(|(i, url)| PeerEndpoint {
454 id: format!("peer-{i}:{url}"),
455 sync_push_url: format!("{url}/api/v1/sync/push"),
456 })
457 .collect(),
458 client,
459 sender_agent_id: "ai:fed-test".to_string(),
460 api_key: None,
461 signing_key: None,
462 #[cfg(feature = "sal")]
463 dlq_sink: None,
464 }
465 }
466
467 #[tokio::test]
468 async fn happy_path_two_peers_quorum_met() {
469 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
470 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
471 let cfg = build_config(vec![url1, url2], 2, 2000);
472 let tracker = broadcast_store_quorum(&cfg, &sample_memory())
473 .await
474 .unwrap();
475 let result = finalise_quorum(&tracker);
476 assert!(result.is_ok(), "expected quorum met, got {result:?}");
477 let calls = count1.load(Ordering::Relaxed) + count2.load(Ordering::Relaxed);
483 assert!(calls >= 1);
484 }
485
    /// Checks that `broadcast_store_quorum` emits a tracing event whose
    /// message contains `federation::broadcast: store`, by installing a
    /// capturing subscriber for the duration of the call.
    #[tokio::test]
    async fn broadcast_emits_entry_line_log_for_track_d_grep() {
        use tracing_subscriber::Registry;
        use tracing_subscriber::layer::SubscriberExt;

        // Layer that appends every event's `message` field to a shared Vec.
        #[derive(Clone, Default)]
        struct CaptureLayer(Arc<std::sync::Mutex<Vec<String>>>);
        impl<S: tracing::Subscriber> tracing_subscriber::Layer<S> for CaptureLayer {
            fn on_event(
                &self,
                event: &tracing::Event<'_>,
                _ctx: tracing_subscriber::layer::Context<'_, S>,
            ) {
                // Field visitor that keeps only the `message` field, whether
                // it is recorded as a Debug value or as a plain string.
                struct Visit<'a>(&'a mut Vec<String>);
                impl tracing::field::Visit for Visit<'_> {
                    fn record_debug(
                        &mut self,
                        field: &tracing::field::Field,
                        value: &dyn std::fmt::Debug,
                    ) {
                        if field.name() == "message" {
                            self.0.push(format!("{value:?}"));
                        }
                    }
                    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
                        if field.name() == "message" {
                            self.0.push(value.to_string());
                        }
                    }
                }
                // Collect into a local buffer first so the mutex is only
                // taken once per event; a poisoned lock drops the messages.
                let mut local: Vec<String> = Vec::new();
                event.record(&mut Visit(&mut local));
                if let Ok(mut buf) = self.0.lock() {
                    buf.extend(local);
                }
            }
        }

        let (url1, _) = spawn_mock_peer(MockBehaviour::Ack).await;
        let cfg = build_config(vec![url1], 1, 1000);

        let layer = CaptureLayer::default();
        // Keep a handle to the shared buffer before the layer is moved into
        // the subscriber.
        let messages = layer.0.clone();
        let dispatch = tracing::Dispatch::new(Registry::default().with(layer));

        {
            // The capturing dispatcher is the default only while `_guard`
            // is alive, i.e. for this block.
            let _guard = tracing::dispatcher::set_default(&dispatch);
            let _ = broadcast_store_quorum(&cfg, &sample_memory())
                .await
                .expect("broadcast must succeed");
        }

        let captured = messages.lock().unwrap().clone();
        let joined = captured.join("\n");
        assert!(
            joined.contains("federation::broadcast: store"),
            "expected entry-line log `federation::broadcast: store ... -> 1 peer(s)`; got:\n{joined}"
        );
    }
563
564 #[tokio::test]
565 async fn post_quorum_fanout_reaches_all_peers() {
566 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
571 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
572 let cfg = build_config(vec![url1, url2], 2, 2000);
573 let _tracker = broadcast_store_quorum(&cfg, &sample_memory())
574 .await
575 .unwrap();
576 for _ in 0..20 {
579 if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
580 break;
581 }
582 tokio::time::sleep(Duration::from_millis(10)).await;
583 }
584 assert_eq!(
585 count1.load(Ordering::Relaxed),
586 1,
587 "peer-1 must receive the write post-quorum"
588 );
589 assert_eq!(
590 count2.load(Ordering::Relaxed),
591 1,
592 "peer-2 must receive the write post-quorum"
593 );
594 }
595
596 #[tokio::test]
597 async fn transient_peer_failure_is_retried_once() {
598 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
604 let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
605 let cfg = build_config(vec![url1, url2], 2, 2000);
606 let _tracker = broadcast_store_quorum(&cfg, &sample_memory())
607 .await
608 .unwrap();
609 for _ in 0..200 {
611 if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
612 break;
613 }
614 tokio::time::sleep(Duration::from_millis(10)).await;
615 }
616 assert_eq!(
617 count1.load(Ordering::Relaxed),
618 1,
619 "peer-1 acked first time, no retry"
620 );
621 assert_eq!(
622 count2.load(Ordering::Relaxed),
623 2,
624 "peer-2 must see exactly two attempts (first fail, retry ack)"
625 );
626 }
627
628 #[tokio::test]
629 async fn persistent_peer_failure_stops_after_one_retry() {
630 let (url1, _) = spawn_mock_peer(MockBehaviour::Ack).await;
634 let (url2, count2) = spawn_mock_peer(MockBehaviour::Fail).await;
635 let cfg = build_config(vec![url1, url2], 2, 2000);
636 let _tracker = broadcast_store_quorum(&cfg, &sample_memory())
637 .await
638 .unwrap();
639 tokio::time::sleep(Duration::from_millis(800)).await;
641 assert_eq!(
642 count2.load(Ordering::Relaxed),
643 2,
644 "persistently-failing peer must be called exactly twice (1 + 1 retry)"
645 );
646 }
647
648 #[tokio::test]
649 async fn bulk_catchup_push_hits_every_peer_once() {
650 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
653 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
654 let cfg = build_config(vec![url1, url2], 2, 2000);
655 let mems = vec![sample_memory(), sample_memory(), sample_memory()];
656 let errors = bulk_catchup_push(&cfg, &mems).await;
657 assert!(
658 errors.is_empty(),
659 "catchup must succeed on healthy peers, got {errors:?}"
660 );
661 assert_eq!(
662 count1.load(Ordering::Relaxed),
663 1,
664 "peer-1 must receive exactly one catchup batch"
665 );
666 assert_eq!(
667 count2.load(Ordering::Relaxed),
668 1,
669 "peer-2 must receive exactly one catchup batch"
670 );
671 }
672
673 #[tokio::test]
674 async fn bulk_catchup_push_reports_peer_failures() {
675 let (url1, _) = spawn_mock_peer(MockBehaviour::Ack).await;
679 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
680 let cfg = build_config(vec![url1, url2], 2, 2000);
681 let mems = vec![sample_memory()];
682 let errors = bulk_catchup_push(&cfg, &mems).await;
683 assert_eq!(errors.len(), 1, "exactly one peer failed the catchup");
684 assert!(
685 errors[0].1.contains("500") || errors[0].1.contains("http"),
686 "error must name the HTTP failure, got {:?}",
687 errors[0]
688 );
689 }
690
691 #[tokio::test]
692 async fn bulk_catchup_push_empty_inputs_are_noop() {
693 let cfg = build_config(vec![], 1, 500);
695 assert!(bulk_catchup_push(&cfg, &[]).await.is_empty());
696
697 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
698 let cfg = build_config(vec![url1], 1, 500);
699 assert!(bulk_catchup_push(&cfg, &[]).await.is_empty());
700 assert_eq!(
701 count1.load(Ordering::Relaxed),
702 0,
703 "no catchup POST must fire when the row set is empty"
704 );
705 }
706
707 #[tokio::test]
708 async fn partition_minority_fails_quorum() {
709 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
711 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
712 let cfg = build_config(vec![url1, url2], 3, 500);
713 let tracker = broadcast_store_quorum(&cfg, &sample_memory())
714 .await
715 .unwrap();
716 let err = finalise_quorum(&tracker).unwrap_err();
717 match err {
718 QuorumError::QuorumNotMet { got, needed, .. } => {
719 assert_eq!(got, 1, "local commit only");
720 assert_eq!(needed, 3);
721 }
722 other => panic!("expected QuorumNotMet, got {other:?}"),
723 }
724 }
725
726 #[tokio::test]
727 async fn timeout_on_hanging_peer_classified_timeout() {
728 let (url1, _) = spawn_mock_peer(MockBehaviour::Hang).await;
730 let cfg = build_config(vec![url1], 2, 200);
731 let tracker = broadcast_store_quorum(&cfg, &sample_memory())
732 .await
733 .unwrap();
734 tokio::time::sleep(Duration::from_millis(50)).await;
736 let err = finalise_quorum(&tracker).unwrap_err();
737 match err {
738 QuorumError::QuorumNotMet { reason, .. } => {
739 assert!(
740 matches!(
741 reason,
742 QuorumFailureReason::Timeout | QuorumFailureReason::Unreachable
743 ),
744 "unexpected reason {reason:?}"
745 );
746 }
747 other => panic!("expected QuorumNotMet, got {other:?}"),
748 }
749 }
750
751 #[tokio::test]
752 async fn majority_quorum_tolerates_one_peer_down() {
753 let (url_up, _) = spawn_mock_peer(MockBehaviour::Ack).await;
755 let (url_down, _) = spawn_mock_peer(MockBehaviour::Fail).await;
756 let cfg = build_config(vec![url_up, url_down], 2, 2000);
757 let tracker = broadcast_store_quorum(&cfg, &sample_memory())
758 .await
759 .unwrap();
760 let result = finalise_quorum(&tracker);
761 assert!(
762 result.is_ok(),
763 "majority should tolerate 1 peer down, got {result:?}"
764 );
765 }
766
767 #[test]
768 fn config_build_disabled_when_w_zero() {
769 let cfg = FederationConfig::build(
770 0,
771 &["http://example.com".to_string()],
772 Duration::from_millis(500),
773 None,
774 None,
775 None,
776 "ai:test".to_string(),
777 None,
778 )
779 .unwrap();
780 assert!(cfg.is_none());
781 }
782
783 #[test]
784 fn config_build_disabled_when_peers_empty() {
785 let cfg = FederationConfig::build(
786 2,
787 &[],
788 Duration::from_millis(500),
789 None,
790 None,
791 None,
792 "ai:test".to_string(),
793 None,
794 )
795 .unwrap();
796 assert!(cfg.is_none());
797 }
798
799 #[test]
800 fn quorum_not_met_payload_from_err() {
801 let err = QuorumError::QuorumNotMet {
802 got: 1,
803 needed: 3,
804 reason: QuorumFailureReason::Timeout,
805 };
806 let payload = QuorumNotMetPayload::from_err(&err);
807 assert_eq!(payload.error, "quorum_not_met");
808 assert_eq!(payload.got, 1);
809 assert_eq!(payload.needed, 3);
810 assert_eq!(payload.reason, "timeout");
811 }
812
813 #[tokio::test]
816 async fn archive_quorum_two_peers_ack_meets_quorum() {
817 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
818 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
819 let cfg = build_config(vec![url1, url2], 2, 2000);
820 let tracker = broadcast_archive_quorum(&cfg, "mem-s29").await.unwrap();
821 let result = finalise_quorum(&tracker);
822 assert!(result.is_ok(), "expected quorum met, got {result:?}");
823 for _ in 0..20 {
825 if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
826 break;
827 }
828 tokio::time::sleep(Duration::from_millis(10)).await;
829 }
830 assert_eq!(count1.load(Ordering::Relaxed), 1);
831 assert_eq!(count2.load(Ordering::Relaxed), 1);
832 }
833
834 #[tokio::test]
835 async fn archive_quorum_partition_minority_fails() {
836 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
838 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
839 let cfg = build_config(vec![url1, url2], 3, 500);
840 let tracker = broadcast_archive_quorum(&cfg, "mem-s29").await.unwrap();
841 let err = finalise_quorum(&tracker).unwrap_err();
842 match err {
843 QuorumError::QuorumNotMet { got, needed, .. } => {
844 assert_eq!(got, 1);
845 assert_eq!(needed, 3);
846 }
847 other => panic!("expected QuorumNotMet, got {other:?}"),
848 }
849 }
850
851 #[tokio::test]
859 async fn delete_quorum_two_peers_ack_meets_quorum() {
860 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
861 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
862 let cfg = build_config(vec![url1, url2], 2, 2000);
863 let tracker = broadcast_delete_quorum(&cfg, "mem-del").await.unwrap();
864 assert!(finalise_quorum(&tracker).is_ok());
865 for _ in 0..20 {
866 if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
867 break;
868 }
869 tokio::time::sleep(Duration::from_millis(10)).await;
870 }
871 assert_eq!(count1.load(Ordering::Relaxed), 1);
872 assert_eq!(count2.load(Ordering::Relaxed), 1);
873 }
874
875 #[tokio::test]
876 async fn delete_quorum_partition_minority_fails() {
877 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
878 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
879 let cfg = build_config(vec![url1, url2], 3, 500);
880 let tracker = broadcast_delete_quorum(&cfg, "mem-del").await.unwrap();
881 let err = finalise_quorum(&tracker).unwrap_err();
882 match err {
883 QuorumError::QuorumNotMet { got, needed, .. } => {
884 assert_eq!(got, 1);
885 assert_eq!(needed, 3);
886 }
887 other => panic!("expected QuorumNotMet, got {other:?}"),
888 }
889 }
890
891 #[tokio::test]
894 async fn restore_quorum_two_peers_ack_meets_quorum() {
895 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
896 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
897 let cfg = build_config(vec![url1, url2], 2, 2000);
898 let tracker = broadcast_restore_quorum(&cfg, "mem-restore").await.unwrap();
899 assert!(finalise_quorum(&tracker).is_ok());
900 for _ in 0..20 {
901 if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
902 break;
903 }
904 tokio::time::sleep(Duration::from_millis(10)).await;
905 }
906 assert_eq!(count1.load(Ordering::Relaxed), 1);
907 assert_eq!(count2.load(Ordering::Relaxed), 1);
908 }
909
910 #[tokio::test]
911 async fn restore_quorum_partition_minority_fails() {
912 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
913 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
914 let cfg = build_config(vec![url1, url2], 3, 500);
915 let tracker = broadcast_restore_quorum(&cfg, "mem-restore").await.unwrap();
916 let err = finalise_quorum(&tracker).unwrap_err();
917 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
918 }
919
920 fn sample_link() -> MemoryLink {
923 MemoryLink {
924 source_id: "mem-a".to_string(),
925 target_id: "mem-b".to_string(),
926 relation: crate::models::MemoryLinkRelation::RelatedTo,
927 created_at: chrono::Utc::now().to_rfc3339(),
928 signature: None,
929 observed_by: None,
930 valid_from: None,
931 valid_until: None,
932 attest_level: None,
933 }
934 }
935
936 #[tokio::test]
937 async fn link_quorum_two_peers_ack_meets_quorum() {
938 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
939 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
940 let cfg = build_config(vec![url1, url2], 2, 2000);
941 let tracker = broadcast_link_quorum(&cfg, &sample_link()).await.unwrap();
942 assert!(finalise_quorum(&tracker).is_ok());
943 for _ in 0..20 {
944 if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
945 break;
946 }
947 tokio::time::sleep(Duration::from_millis(10)).await;
948 }
949 assert_eq!(count1.load(Ordering::Relaxed), 1);
950 assert_eq!(count2.load(Ordering::Relaxed), 1);
951 }
952
953 #[tokio::test]
954 async fn link_quorum_partition_minority_fails() {
955 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
956 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
957 let cfg = build_config(vec![url1, url2], 3, 500);
958 let tracker = broadcast_link_quorum(&cfg, &sample_link()).await.unwrap();
959 let err = finalise_quorum(&tracker).unwrap_err();
960 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
961 }
962
963 #[tokio::test]
966 async fn consolidate_quorum_two_peers_ack_meets_quorum() {
967 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
968 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
969 let cfg = build_config(vec![url1, url2], 2, 2000);
970 let new_mem = sample_memory();
971 let sources = vec!["src-a".to_string(), "src-b".to_string()];
972 let tracker = broadcast_consolidate_quorum(&cfg, &new_mem, &sources)
973 .await
974 .unwrap();
975 assert!(finalise_quorum(&tracker).is_ok());
976 for _ in 0..20 {
977 if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
978 break;
979 }
980 tokio::time::sleep(Duration::from_millis(10)).await;
981 }
982 assert_eq!(count1.load(Ordering::Relaxed), 1);
983 assert_eq!(count2.load(Ordering::Relaxed), 1);
984 }
985
986 #[tokio::test]
987 async fn consolidate_quorum_partition_minority_fails() {
988 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
989 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
990 let cfg = build_config(vec![url1, url2], 3, 500);
991 let new_mem = sample_memory();
992 let tracker = broadcast_consolidate_quorum(&cfg, &new_mem, &[])
993 .await
994 .unwrap();
995 let err = finalise_quorum(&tracker).unwrap_err();
996 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
997 }
998
999 fn sample_pending() -> PendingAction {
1002 PendingAction {
1003 id: "pa-1".to_string(),
1004 action_type: "delete".to_string(),
1005 memory_id: Some("mem-x".to_string()),
1006 namespace: "app".to_string(),
1007 payload: serde_json::json!({}),
1008 requested_by: "ai:test".to_string(),
1009 requested_at: chrono::Utc::now().to_rfc3339(),
1010 status: "pending".to_string(),
1011 decided_by: None,
1012 decided_at: None,
1013 approvals: vec![],
1014 }
1015 }
1016
1017 #[tokio::test]
1018 async fn pending_quorum_two_peers_ack_meets_quorum() {
1019 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
1020 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
1021 let cfg = build_config(vec![url1, url2], 2, 2000);
1022 let tracker = broadcast_pending_quorum(&cfg, &sample_pending())
1023 .await
1024 .unwrap();
1025 assert!(finalise_quorum(&tracker).is_ok());
1026 for _ in 0..20 {
1027 if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
1028 break;
1029 }
1030 tokio::time::sleep(Duration::from_millis(10)).await;
1031 }
1032 assert_eq!(count1.load(Ordering::Relaxed), 1);
1033 assert_eq!(count2.load(Ordering::Relaxed), 1);
1034 }
1035
1036 #[tokio::test]
1037 async fn pending_quorum_partition_minority_fails() {
1038 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1039 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1040 let cfg = build_config(vec![url1, url2], 3, 500);
1041 let tracker = broadcast_pending_quorum(&cfg, &sample_pending())
1042 .await
1043 .unwrap();
1044 let err = finalise_quorum(&tracker).unwrap_err();
1045 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
1046 }
1047
1048 fn sample_decision() -> PendingDecision {
1051 PendingDecision {
1052 id: "pa-1".to_string(),
1053 approved: true,
1054 decider: "ai:approver".to_string(),
1055 }
1056 }
1057
1058 #[tokio::test]
1059 async fn pending_decision_quorum_two_peers_ack_meets_quorum() {
1060 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
1061 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
1062 let cfg = build_config(vec![url1, url2], 2, 2000);
1063 let tracker = broadcast_pending_decision_quorum(&cfg, &sample_decision())
1064 .await
1065 .unwrap();
1066 assert!(finalise_quorum(&tracker).is_ok());
1067 for _ in 0..20 {
1068 if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
1069 break;
1070 }
1071 tokio::time::sleep(Duration::from_millis(10)).await;
1072 }
1073 assert_eq!(count1.load(Ordering::Relaxed), 1);
1074 assert_eq!(count2.load(Ordering::Relaxed), 1);
1075 }
1076
1077 #[tokio::test]
1078 async fn pending_decision_quorum_partition_minority_fails() {
1079 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1080 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1081 let cfg = build_config(vec![url1, url2], 3, 500);
1082 let tracker = broadcast_pending_decision_quorum(&cfg, &sample_decision())
1083 .await
1084 .unwrap();
1085 let err = finalise_quorum(&tracker).unwrap_err();
1086 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
1087 }
1088
1089 fn sample_namespace_meta() -> NamespaceMetaEntry {
1092 NamespaceMetaEntry {
1093 namespace: "app/team".to_string(),
1094 standard_id: "mem-std-1".to_string(),
1095 parent_namespace: Some("app".to_string()),
1096 updated_at: chrono::Utc::now().to_rfc3339(),
1097 }
1098 }
1099
1100 #[tokio::test]
1101 async fn namespace_meta_quorum_two_peers_ack_meets_quorum() {
1102 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
1103 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
1104 let cfg = build_config(vec![url1, url2], 2, 2000);
1105 let tracker = broadcast_namespace_meta_quorum(&cfg, &sample_namespace_meta())
1106 .await
1107 .unwrap();
1108 assert!(finalise_quorum(&tracker).is_ok());
1109 for _ in 0..20 {
1110 if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
1111 break;
1112 }
1113 tokio::time::sleep(Duration::from_millis(10)).await;
1114 }
1115 assert_eq!(count1.load(Ordering::Relaxed), 1);
1116 assert_eq!(count2.load(Ordering::Relaxed), 1);
1117 }
1118
1119 #[tokio::test]
1120 async fn namespace_meta_quorum_partition_minority_fails() {
1121 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1122 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1123 let cfg = build_config(vec![url1, url2], 3, 500);
1124 let tracker = broadcast_namespace_meta_quorum(&cfg, &sample_namespace_meta())
1125 .await
1126 .unwrap();
1127 let err = finalise_quorum(&tracker).unwrap_err();
1128 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
1129 }
1130
1131 #[tokio::test]
1134 async fn namespace_meta_clear_quorum_two_peers_ack_meets_quorum() {
1135 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
1136 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
1137 let cfg = build_config(vec![url1, url2], 2, 2000);
1138 let namespaces = vec!["app/team".to_string(), "app/other".to_string()];
1139 let tracker = broadcast_namespace_meta_clear_quorum(&cfg, &namespaces)
1140 .await
1141 .unwrap();
1142 assert!(finalise_quorum(&tracker).is_ok());
1143 for _ in 0..20 {
1144 if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
1145 break;
1146 }
1147 tokio::time::sleep(Duration::from_millis(10)).await;
1148 }
1149 assert_eq!(count1.load(Ordering::Relaxed), 1);
1150 assert_eq!(count2.load(Ordering::Relaxed), 1);
1151 }
1152
1153 #[tokio::test]
1154 async fn namespace_meta_clear_quorum_partition_minority_fails() {
1155 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1156 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1157 let cfg = build_config(vec![url1, url2], 3, 500);
1158 let namespaces = vec!["app/team".to_string()];
1159 let tracker = broadcast_namespace_meta_clear_quorum(&cfg, &namespaces)
1160 .await
1161 .unwrap();
1162 let err = finalise_quorum(&tracker).unwrap_err();
1163 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
1164 }
1165
1166 #[test]
1173 fn quorum_not_met_payload_unreachable_reason() {
1174 let err = QuorumError::QuorumNotMet {
1175 got: 1,
1176 needed: 2,
1177 reason: QuorumFailureReason::Unreachable,
1178 };
1179 let payload = QuorumNotMetPayload::from_err(&err);
1180 assert_eq!(payload.reason, "unreachable");
1181 }
1182
1183 #[test]
1184 fn quorum_not_met_payload_id_drift_reason() {
1185 let err = QuorumError::QuorumNotMet {
1186 got: 1,
1187 needed: 2,
1188 reason: QuorumFailureReason::IdDrift,
1189 };
1190 let payload = QuorumNotMetPayload::from_err(&err);
1191 assert_eq!(payload.reason, "id_drift");
1192 }
1193
1194 #[test]
1195 fn quorum_not_met_payload_in_flight_reason_maps_to_timeout() {
1196 let err = QuorumError::QuorumNotMet {
1199 got: 1,
1200 needed: 2,
1201 reason: QuorumFailureReason::InFlight,
1202 };
1203 let payload = QuorumNotMetPayload::from_err(&err);
1204 assert_eq!(payload.reason, "timeout");
1205 }
1206
1207 #[test]
1208 fn quorum_not_met_payload_invalid_policy_branch() {
1209 let err = QuorumError::InvalidPolicy {
1210 detail: "bad-thing".to_string(),
1211 };
1212 let payload = QuorumNotMetPayload::from_err(&err);
1213 assert_eq!(payload.error, "quorum_not_met");
1214 assert_eq!(payload.got, 0);
1215 assert_eq!(payload.needed, 0);
1216 assert!(payload.reason.starts_with("invalid_policy:"));
1217 assert!(payload.reason.contains("bad-thing"));
1218 }
1219
1220 #[test]
1221 fn quorum_not_met_payload_local_write_failed_branch() {
1222 let err = QuorumError::LocalWriteFailed {
1223 detail: "disk-full".to_string(),
1224 };
1225 let payload = QuorumNotMetPayload::from_err(&err);
1226 assert_eq!(payload.error, "quorum_not_met");
1227 assert!(payload.reason.starts_with("local_write_failed:"));
1228 assert!(payload.reason.contains("disk-full"));
1229 }
1230
1231 #[test]
1234 fn config_build_constructs_when_w_and_peers_set() {
1235 let cfg = FederationConfig::build(
1236 2,
1237 &[
1238 "http://peer-a.example/".to_string(),
1239 "http://peer-b.example".to_string(),
1240 ],
1241 Duration::from_millis(500),
1242 None,
1243 None,
1244 None,
1245 "ai:builder".to_string(),
1246 None,
1247 )
1248 .unwrap()
1249 .expect("config should be Some when w>0 and peers nonempty");
1250 assert_eq!(cfg.peer_count(), 2);
1251 assert_eq!(cfg.peers[0].id, "peer-0");
1252 assert_eq!(cfg.peers[1].id, "peer-1");
1253 assert_eq!(
1255 cfg.peers[0].sync_push_url,
1256 "http://peer-a.example/api/v1/sync/push"
1257 );
1258 assert_eq!(
1259 cfg.peers[1].sync_push_url,
1260 "http://peer-b.example/api/v1/sync/push"
1261 );
1262 assert_eq!(cfg.sender_agent_id, "ai:builder");
1263 }
1264
1265 #[test]
1266 fn config_build_rejects_duplicate_peer_urls() {
1267 let result = FederationConfig::build(
1268 2,
1269 &[
1270 "http://peer.example".to_string(),
1271 "http://peer.example/".to_string(),
1272 ],
1273 Duration::from_millis(500),
1274 None,
1275 None,
1276 None,
1277 "ai:builder".to_string(),
1278 None,
1279 );
1280 let err = match result {
1281 Ok(_) => panic!("expected duplicate-URL rejection"),
1282 Err(e) => e,
1283 };
1284 let msg = format!("{err}");
1285 assert!(
1286 msg.contains("duplicate peer URL"),
1287 "expected duplicate-URL rejection, got {msg:?}"
1288 );
1289 }
1290
1291 #[test]
1292 fn config_build_rejects_missing_ca_cert_path() {
1293 let bogus = std::path::PathBuf::from("/definitely/does/not/exist/ca.pem");
1295 let result = FederationConfig::build(
1296 2,
1297 &["http://peer.example".to_string()],
1298 Duration::from_millis(500),
1299 None,
1300 None,
1301 Some(&bogus),
1302 "ai:builder".to_string(),
1303 None,
1304 );
1305 let err = match result {
1306 Ok(_) => panic!("expected ca-cert read error"),
1307 Err(e) => e,
1308 };
1309 let msg = format!("{err}");
1310 assert!(
1311 msg.contains("read --quorum-ca-cert"),
1312 "expected ca-cert read error, got {msg:?}"
1313 );
1314 }
1315
1316 #[test]
1317 fn config_build_rejects_invalid_ca_cert_pem() {
1318 let dir = tempfile::tempdir().unwrap();
1320 let bad = dir.path().join("not-a-cert.pem");
1321 std::fs::write(&bad, b"this is not a valid pem certificate").unwrap();
1322 let result = FederationConfig::build(
1323 2,
1324 &["http://peer.example".to_string()],
1325 Duration::from_millis(500),
1326 None,
1327 None,
1328 Some(&bad),
1329 "ai:builder".to_string(),
1330 None,
1331 );
1332 let err = match result {
1333 Ok(_) => panic!("expected ca-cert parse error"),
1334 Err(e) => e,
1335 };
1336 let msg = format!("{err}");
1337 assert!(
1338 msg.contains("parse --quorum-ca-cert") || msg.contains("--quorum-ca-cert"),
1339 "expected ca-cert parse error, got {msg:?}"
1340 );
1341 }
1342
1343 #[test]
1344 fn config_build_rejects_missing_client_cert_path() {
1345 let bogus_cert = std::path::PathBuf::from("/definitely/missing/cert.pem");
1346 let bogus_key = std::path::PathBuf::from("/definitely/missing/key.pem");
1347 let result = FederationConfig::build(
1348 2,
1349 &["http://peer.example".to_string()],
1350 Duration::from_millis(500),
1351 Some(&bogus_cert),
1352 Some(&bogus_key),
1353 None,
1354 "ai:builder".to_string(),
1355 None,
1356 );
1357 let err = match result {
1358 Ok(_) => panic!("expected client-cert read error"),
1359 Err(e) => e,
1360 };
1361 let msg = format!("{err}");
1362 assert!(
1363 msg.contains("read --client-cert"),
1364 "expected client-cert read error, got {msg:?}"
1365 );
1366 }
1367
1368 #[test]
1369 fn peer_count_matches_peer_list() {
1370 let cfg = build_config(
1371 vec![
1372 "http://a.example".to_string(),
1373 "http://b.example".to_string(),
1374 "http://c.example".to_string(),
1375 ],
1376 2,
1377 500,
1378 );
1379 assert_eq!(cfg.peer_count(), 3);
1380 }
1381
1382 #[test]
1385 fn urlencoding_encode_passthrough_safe_chars() {
1386 let encoded = urlencoding_encode("abcXYZ-09_.~");
1388 assert_eq!(encoded, "abcXYZ-09_.~");
1389 }
1390
1391 #[test]
1392 fn urlencoding_encode_percent_encodes_reserved_and_high_bits() {
1393 let encoded = urlencoding_encode("2026-04-26T12:00:00+00:00 / x");
1395 assert!(
1396 encoded.contains("%3A"),
1397 "expected colon to be percent-encoded: {encoded}"
1398 );
1399 assert!(
1400 encoded.contains("%2B"),
1401 "expected + to be percent-encoded: {encoded}"
1402 );
1403 assert!(
1404 encoded.contains("%2F"),
1405 "expected / to be percent-encoded: {encoded}"
1406 );
1407 assert!(
1408 encoded.contains("%20"),
1409 "expected space to be percent-encoded: {encoded}"
1410 );
1411 assert!(
1413 !encoded.contains("%2D"),
1414 "hyphen must pass through unencoded: {encoded}"
1415 );
1416 }
1417
1418 #[test]
1419 fn urlencoding_encode_empty_string() {
1420 assert_eq!(urlencoding_encode(""), "");
1421 }
1422
1423 async fn id_drift_handler(
1431 AxumJson(_body): AxumJson<serde_json::Value>,
1432 ) -> (StatusCode, AxumJson<serde_json::Value>) {
1433 (
1435 StatusCode::OK,
1436 AxumJson(serde_json::json!({"ids": ["some-other-id"], "applied": 1})),
1437 )
1438 }
1439
1440 async fn spawn_id_drift_peer() -> String {
1441 let app = Router::new().route("/api/v1/sync/push", post(id_drift_handler));
1442 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1443 let addr = listener.local_addr().unwrap();
1444 tokio::spawn(async move {
1445 axum::serve(listener, app).await.ok();
1446 });
1447 format!("http://{addr}")
1448 }
1449
1450 #[tokio::test]
1451 async fn id_drift_peer_does_not_count_as_ack() {
1452 let url1 = spawn_id_drift_peer().await;
1456 let url2 = spawn_id_drift_peer().await;
1457 let cfg = build_config(vec![url1, url2], 2, 1000);
1458 let tracker = broadcast_store_quorum(&cfg, &sample_memory())
1459 .await
1460 .unwrap();
1461 let result = finalise_quorum(&tracker);
1462 let err = result.unwrap_err();
1464 match err {
1465 QuorumError::QuorumNotMet {
1466 got,
1467 needed,
1468 reason,
1469 } => {
1470 assert_eq!(got, 1, "only local should count");
1471 assert_eq!(needed, 2);
1472 assert!(
1480 matches!(
1481 reason,
1482 QuorumFailureReason::IdDrift
1483 | QuorumFailureReason::Timeout
1484 | QuorumFailureReason::InFlight
1485 ),
1486 "expected IdDrift / Timeout / InFlight, got {reason:?}"
1487 );
1488 }
1489 other => panic!("expected QuorumNotMet, got {other:?}"),
1490 }
1491 }
1492
    /// Scripted behaviour for the mock `/api/v1/sync/since` endpoint served
    /// by `since_handler`.
    #[derive(Clone)]
    enum SinceMockBehaviour {
        /// Answer `200 OK` with `{"memories": [...]}` built from these memories.
        ReturnMemories(Vec<Memory>),
        /// Answer `500 Internal Server Error` with a JSON error body.
        Error500,
        /// Sleep for the given duration, then answer `200 OK` with an empty
        /// `memories` list.
        Hang(Duration),
        /// Answer with a JSON content type but a body that is not valid JSON.
        MalformedBody,
    }
1518
    /// Shared state for the mock `/api/v1/sync/since` handler: the scripted
    /// behaviour plus probes the tests read back after the request.
    #[derive(Clone)]
    struct SinceMockState {
        /// Which canned response the handler produces.
        behaviour: SinceMockBehaviour,
        /// Number of requests the handler has received.
        hits: Arc<AtomicUsize>,
        /// Value of the `since` query parameter on the most recent request
        /// (`None` when the parameter was absent).
        last_since: Arc<Mutex<Option<String>>>,
        /// Value of the `peer` query parameter on the most recent request
        /// (`None` when the parameter was absent).
        last_peer: Arc<Mutex<Option<String>>>,
    }
1526
1527 async fn since_handler(
1528 axum::extract::Query(q): axum::extract::Query<std::collections::HashMap<String, String>>,
1529 axum::extract::State(state): axum::extract::State<SinceMockState>,
1530 ) -> axum::response::Response {
1531 use axum::response::IntoResponse;
1532 state.hits.fetch_add(1, Ordering::Relaxed);
1533 {
1534 let mut s = state.last_since.lock().await;
1535 *s = q.get("since").cloned();
1536 }
1537 {
1538 let mut p = state.last_peer.lock().await;
1539 *p = q.get("peer").cloned();
1540 }
1541 match &state.behaviour {
1542 SinceMockBehaviour::ReturnMemories(mems) => {
1543 let body = serde_json::json!({"memories": mems});
1544 (StatusCode::OK, AxumJson(body)).into_response()
1545 }
1546 SinceMockBehaviour::Error500 => (
1547 StatusCode::INTERNAL_SERVER_ERROR,
1548 AxumJson(serde_json::json!({"error":"oops"})),
1549 )
1550 .into_response(),
1551 SinceMockBehaviour::Hang(d) => {
1552 tokio::time::sleep(*d).await;
1553 (
1554 StatusCode::OK,
1555 AxumJson(serde_json::json!({"memories": []})),
1556 )
1557 .into_response()
1558 }
1559 SinceMockBehaviour::MalformedBody => {
1560 (
1563 [(axum::http::header::CONTENT_TYPE, crate::MIME_JSON)],
1564 "this is not json {{{",
1565 )
1566 .into_response()
1567 }
1568 }
1569 }
1570
1571 async fn spawn_since_peer(
1574 behaviour: SinceMockBehaviour,
1575 ) -> (
1576 String,
1577 Arc<AtomicUsize>,
1578 Arc<Mutex<Option<String>>>,
1579 Arc<Mutex<Option<String>>>,
1580 ) {
1581 let hits = Arc::new(AtomicUsize::new(0));
1582 let last_since = Arc::new(Mutex::new(None));
1583 let last_peer = Arc::new(Mutex::new(None));
1584 let state = SinceMockState {
1585 behaviour,
1586 hits: hits.clone(),
1587 last_since: last_since.clone(),
1588 last_peer: last_peer.clone(),
1589 };
1590 let app = Router::new()
1591 .route("/api/v1/sync/since", axum::routing::get(since_handler))
1592 .with_state(state);
1593 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1594 let addr = listener.local_addr().unwrap();
1595 tokio::spawn(async move {
1596 axum::serve(listener, app).await.ok();
1597 });
1598 (format!("http://{addr}"), hits, last_since, last_peer)
1599 }
1600
1601 fn build_test_db() -> crate::handlers::Db {
1605 let conn = crate::db::open(std::path::Path::new(":memory:")).unwrap();
1606 let path = std::path::PathBuf::from(":memory:");
1607 Arc::new(Mutex::new((
1608 conn,
1609 path,
1610 crate::config::ResolvedTtl::default(),
1611 true,
1612 )))
1613 }
1614
1615 fn build_catchup_cfg(peer_url: &str, timeout_ms: u64) -> FederationConfig {
1623 let client = reqwest::Client::builder()
1624 .timeout(Duration::from_millis(timeout_ms))
1625 .build()
1626 .unwrap();
1627 FederationConfig {
1628 policy: QuorumPolicy::new(
1629 2,
1630 1,
1631 Duration::from_millis(timeout_ms),
1632 Duration::from_secs(30),
1633 )
1634 .unwrap(),
1635 peers: vec![PeerEndpoint {
1636 id: "peer-0".to_string(),
1637 sync_push_url: format!("{peer_url}/api/v1/sync/push"),
1638 }],
1639 client,
1640 sender_agent_id: "ai:catchup-test".to_string(),
1641 api_key: None,
1642 signing_key: None,
1643 #[cfg(feature = "sal")]
1644 dlq_sink: None,
1645 }
1646 }
1647
1648 fn catchup_memory(title: &str, updated_at: &str) -> Memory {
1653 Memory {
1654 id: format!("cat-{title}"),
1655 tier: crate::models::Tier::Mid,
1656 namespace: "catchup".to_string(),
1657 title: title.to_string(),
1658 content: format!("content for {title}"),
1659 tags: vec!["catchup".to_string()],
1660 priority: 5,
1661 confidence: 1.0,
1662 source: "system".to_string(),
1668 access_count: 0,
1669 created_at: updated_at.to_string(),
1670 updated_at: updated_at.to_string(),
1671 last_accessed_at: None,
1672 expires_at: None,
1673 metadata: serde_json::json!({
1680 "agent_id": "ai:peer-0",
1681 "scope": "collective",
1682 }),
1683 reflection_depth: 0,
1684 memory_kind: crate::models::MemoryKind::Observation,
1685 entity_id: None,
1686 persona_version: None,
1687 citations: Vec::new(),
1688 source_uri: None,
1689 source_span: None,
1690 confidence_source: crate::models::ConfidenceSource::CallerProvided,
1691 confidence_signals: None,
1692 confidence_decayed_at: None,
1693 version: 1,
1694 }
1695 }
1696
1697 #[tokio::test]
1700 async fn test_catchup_once_pulls_since_cursor_advances_state() {
1701 let mems = vec![
1705 catchup_memory("a", "2026-04-26T10:00:00Z"),
1706 catchup_memory("b", "2026-04-26T10:00:01Z"),
1707 catchup_memory("c", "2026-04-26T10:00:02Z"),
1708 catchup_memory("d", "2026-04-26T10:00:03Z"),
1709 catchup_memory("e", "2026-04-26T10:00:04Z"),
1710 ];
1711 let latest_ts = mems.last().unwrap().updated_at.clone();
1712 let (url, hits, last_since, last_peer) =
1713 spawn_since_peer(SinceMockBehaviour::ReturnMemories(mems.clone())).await;
1714 let cfg = build_catchup_cfg(&url, 2000);
1715 let db = build_test_db();
1716
1717 catchup_once(&cfg, &db).await;
1718
1719 assert_eq!(hits.load(Ordering::Relaxed), 1, "peer hit exactly once");
1720 assert!(
1722 last_since.lock().await.is_none(),
1723 "first catchup must omit since"
1724 );
1725 assert_eq!(last_peer.lock().await.as_deref(), Some("ai:catchup-test"));
1727 let lock = db.lock().await;
1729 let clock =
1730 crate::db::sync_state_load(&lock.0, "ai:catchup-test").expect("load sync state");
1731 assert_eq!(
1732 clock.entries.get("peer-0").map(String::as_str),
1733 Some(latest_ts.as_str()),
1734 "sync state advanced to latest pulled memory's updated_at"
1735 );
1736 let count: i64 = lock
1738 .0
1739 .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
1740 .unwrap();
1741 assert_eq!(count, 5, "all five memories inserted");
1742 }
1743
1744 #[tokio::test]
1747 async fn test_catchup_once_no_new_memories_no_op() {
1748 let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![])).await;
1749 let cfg = build_catchup_cfg(&url, 2000);
1750 let db = build_test_db();
1751
1752 catchup_once(&cfg, &db).await;
1753
1754 assert_eq!(hits.load(Ordering::Relaxed), 1);
1755 let lock = db.lock().await;
1756 let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
1757 assert!(
1758 clock.entries.get("peer-0").is_none(),
1759 "empty response must not advance sync_state"
1760 );
1761 let count: i64 = lock
1762 .0
1763 .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
1764 .unwrap();
1765 assert_eq!(count, 0);
1766 }
1767
1768 #[tokio::test]
1771 async fn test_catchup_once_peer_500_error_logged_no_panic() {
1772 let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::Error500).await;
1773 let cfg = build_catchup_cfg(&url, 2000);
1774 let db = build_test_db();
1775
1776 catchup_once(&cfg, &db).await;
1778
1779 assert_eq!(hits.load(Ordering::Relaxed), 1);
1780 let lock = db.lock().await;
1781 let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
1782 assert!(
1783 clock.entries.get("peer-0").is_none(),
1784 "500 must not advance sync state"
1785 );
1786 }
1787
1788 #[tokio::test]
1791 async fn test_catchup_once_peer_timeout_handled() {
1792 let (url, hits, _, _) =
1795 spawn_since_peer(SinceMockBehaviour::Hang(Duration::from_secs(2))).await;
1796 let cfg = build_catchup_cfg(&url, 200);
1797 let db = build_test_db();
1798
1799 let start = Instant::now();
1800 catchup_once(&cfg, &db).await;
1801 let elapsed = start.elapsed();
1802
1803 assert!(
1806 elapsed < Duration::from_millis(1500),
1807 "catchup_once should honour the client timeout, took {elapsed:?}"
1808 );
1809 assert_eq!(hits.load(Ordering::Relaxed), 1, "request was sent");
1810 let lock = db.lock().await;
1811 let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
1812 assert!(clock.entries.get("peer-0").is_none());
1813 }
1814
1815 #[tokio::test]
1818 async fn test_catchup_once_malformed_response_handled() {
1819 let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::MalformedBody).await;
1820 let cfg = build_catchup_cfg(&url, 2000);
1821 let db = build_test_db();
1822
1823 catchup_once(&cfg, &db).await;
1825
1826 assert_eq!(hits.load(Ordering::Relaxed), 1);
1827 let lock = db.lock().await;
1828 let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
1829 assert!(
1830 clock.entries.get("peer-0").is_none(),
1831 "malformed body must not advance sync state"
1832 );
1833 }
1834
1835 #[tokio::test]
1838 async fn test_catchup_once_inserts_only_newer_memories() {
1839 let db = build_test_db();
1844 {
1845 let lock = db.lock().await;
1846 let local = catchup_memory("shared", "2026-04-26T10:00:01Z");
1847 crate::db::insert_if_newer(&lock.0, &local).unwrap();
1850 let cnt: i64 = lock
1852 .0
1853 .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
1854 .unwrap();
1855 assert_eq!(cnt, 1, "pre-seeded shared row");
1856 }
1857
1858 let mut stale_shared = catchup_memory("shared", "2026-04-26T10:00:00Z");
1859 stale_shared.content = "stale-from-catchup-peer".to_string();
1862 stale_shared.id = "cat-shared-OLD".to_string();
1863 let stale_shared_content = stale_shared.content.clone();
1864 let new_fresh = catchup_memory("fresh", "2026-04-26T10:00:02Z");
1865 let (url, _, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![
1866 stale_shared,
1867 new_fresh,
1868 ]))
1869 .await;
1870 let cfg = build_catchup_cfg(&url, 2000);
1871
1872 catchup_once(&cfg, &db).await;
1873
1874 let lock = db.lock().await;
1875 let cnt: i64 = lock
1877 .0
1878 .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
1879 .unwrap();
1880 assert_eq!(cnt, 2, "fresh row inserted, shared kept");
1881 let shared_content: String = lock
1884 .0
1885 .query_row(
1886 "SELECT content FROM memories WHERE title = 'shared' AND namespace = 'catchup'",
1887 [],
1888 |r| r.get(0),
1889 )
1890 .unwrap();
1891 assert_ne!(
1892 shared_content, stale_shared_content,
1893 "older catchup memory must NOT overwrite newer local row"
1894 );
1895 let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
1899 assert_eq!(
1900 clock.entries.get("peer-0").map(String::as_str),
1901 Some("2026-04-26T10:00:02Z"),
1902 );
1903 }
1904
    /// Checks that a loop spawned with `spawn_catchup_loop` reaches the mock
    /// peer at least once.
    ///
    /// The test starts with tokio's clock paused and drives it manually via
    /// `tokio::time::advance`, so no real waiting on timers is required.
    #[tokio::test(start_paused = true)]
    async fn test_spawn_catchup_loop_runs_at_interval() {
        let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![])).await;
        let cfg = build_catchup_cfg(&url, 5000);
        let db = build_test_db();

        let handle = spawn_catchup_loop(cfg, db, Duration::from_secs(60));

        // Advance the paused clock by six seconds in one-second steps,
        // yielding after each step so the spawned loop gets a chance to run.
        // NOTE(review): presumably the loop's first tick fires well before a
        // full 60 s interval has elapsed — confirm against spawn_catchup_loop.
        for _ in 0..6 {
            tokio::time::advance(Duration::from_secs(1)).await;
            tokio::task::yield_now().await;
        }
        // Poll for the first hit, nudging the clock by 10 ms per attempt, for
        // at most 50 attempts.
        for _ in 0..50 {
            if hits.load(Ordering::Relaxed) >= 1 {
                break;
            }
            tokio::task::yield_now().await;
            tokio::time::advance(Duration::from_millis(10)).await;
        }

        assert!(
            hits.load(Ordering::Relaxed) >= 1,
            "first catchup tick must hit the mock peer (got {})",
            hits.load(Ordering::Relaxed),
        );

        // Stop the background loop so it does not outlive the test.
        handle.abort();
    }
1945
1946 #[tokio::test]
1949 async fn test_spawn_catchup_loop_aborts_cleanly_on_handle_drop() {
1950 let (url, _, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![])).await;
1953 let cfg = build_catchup_cfg(&url, 2000);
1954 let db = build_test_db();
1955
1956 let handle = spawn_catchup_loop(cfg, db, Duration::from_secs(crate::SECS_PER_HOUR as u64));
1957 handle.abort();
1960 let result = tokio::time::timeout(Duration::from_millis(500), handle).await;
1961 let join = result.expect("aborted handle must resolve within 500ms");
1962 assert!(
1963 join.is_err() && join.unwrap_err().is_cancelled(),
1964 "handle.abort() must surface as is_cancelled() == true"
1965 );
1966 }
1967
1968 #[test]
1971 fn test_build_config_mtls_with_valid_files() {
1972 let cert = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
1978 .join("tests/fixtures/tls/valid_cert.pem");
1979 let key = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
1980 .join("tests/fixtures/tls/valid_key_pkcs8.pem");
1981 assert!(cert.exists(), "missing test fixture: {cert:?}");
1983 assert!(key.exists(), "missing test fixture: {key:?}");
1984
1985 let result = FederationConfig::build(
1986 2,
1987 &["http://peer.example".to_string()],
1988 Duration::from_millis(500),
1989 Some(&cert),
1990 Some(&key),
1991 None,
1992 "ai:builder".to_string(),
1993 None,
1994 );
1995 let cfg = match result {
1996 Ok(Some(c)) => c,
1997 Ok(None) => panic!("expected Some(FederationConfig), got None"),
1998 Err(e) => panic!("expected Ok, got Err: {e}"),
1999 };
2000 assert_eq!(cfg.peer_count(), 1);
2001 }
2002
2003 #[test]
2006 fn test_build_config_mtls_with_missing_files_returns_error() {
2007 let cert = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
2013 .join("tests/fixtures/tls/valid_cert.pem");
2014 let bogus_key = std::path::PathBuf::from("/definitely/missing/key.pem");
2015 assert!(cert.exists(), "missing test fixture: {cert:?}");
2016
2017 let result = FederationConfig::build(
2018 2,
2019 &["http://peer.example".to_string()],
2020 Duration::from_millis(500),
2021 Some(&cert),
2022 Some(&bogus_key),
2023 None,
2024 "ai:builder".to_string(),
2025 None,
2026 );
2027 let err = match result {
2028 Ok(_) => panic!("expected client-key read error"),
2029 Err(e) => e,
2030 };
2031 let msg = format!("{err}");
2032 assert!(
2033 msg.contains("read --client-key"),
2034 "expected client-key read error, got {msg:?}"
2035 );
2036 }
2037
2038 #[tokio::test]
2072 async fn post_and_classify_persistent_fail_concatenates_both_reasons() {
2073 let (url, count) = spawn_mock_peer(MockBehaviour::Fail).await;
2074 let client = reqwest::Client::builder()
2075 .timeout(Duration::from_millis(2000))
2076 .build()
2077 .unwrap();
2078 let body = serde_json::json!({"sender_agent_id":"ai:test","memories":[]});
2079 let target = format!("{url}/api/v1/sync/push");
2080
2081 let outcome =
2082 post_and_classify(&client, &target, &body, "mem-x", Some("mem-x"), None, None).await;
2083 match outcome {
2084 AckOutcome::Fail(reason) => {
2085 assert!(
2086 reason.contains("first:") && reason.contains("retry:"),
2087 "expected both attempts in reason, got {reason:?}"
2088 );
2089 assert!(
2091 reason.contains("http 500"),
2092 "expected 5xx in reason, got {reason:?}"
2093 );
2094 }
2095 other => panic!("expected AckOutcome::Fail, got {other:?}"),
2096 }
2097 assert_eq!(
2098 count.load(Ordering::Relaxed),
2099 2,
2100 "first attempt + one retry = exactly two POSTs"
2101 );
2102 }
2103
2104 #[tokio::test]
2110 async fn post_and_classify_id_drift_does_not_retry() {
2111 let count = Arc::new(AtomicUsize::new(0));
2113 let cnt_clone = count.clone();
2114 let app = Router::new().route(
2115 "/api/v1/sync/push",
2116 post(move |AxumJson(_b): AxumJson<serde_json::Value>| {
2117 let c = cnt_clone.clone();
2118 async move {
2119 c.fetch_add(1, Ordering::Relaxed);
2120 (
2121 StatusCode::OK,
2122 AxumJson(serde_json::json!({"ids":["other-id"],"applied":1})),
2123 )
2124 }
2125 }),
2126 );
2127 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2128 let addr = listener.local_addr().unwrap();
2129 tokio::spawn(async move {
2130 axum::serve(listener, app).await.ok();
2131 });
2132 let url = format!("http://{addr}/api/v1/sync/push");
2133
2134 let client = reqwest::Client::builder()
2135 .timeout(Duration::from_millis(2000))
2136 .build()
2137 .unwrap();
2138 let body = serde_json::json!({"sender_agent_id":"ai:test","memories":[]});
2139 let outcome =
2140 post_and_classify(&client, &url, &body, "mem-x", Some("mem-x"), None, None).await;
2141 assert!(
2142 matches!(outcome, AckOutcome::IdDrift),
2143 "expected IdDrift, got {outcome:?}"
2144 );
2145 assert_eq!(
2146 count.load(Ordering::Relaxed),
2147 1,
2148 "IdDrift must NOT trigger the retry path (only one POST)"
2149 );
2150 }
2151
2152 #[tokio::test]
2158 async fn bulk_catchup_push_no_peers_is_noop() {
2159 let client = reqwest::Client::builder()
2160 .timeout(Duration::from_millis(500))
2161 .build()
2162 .unwrap();
2163 let cfg = FederationConfig {
2164 policy: QuorumPolicy::new(1, 1, Duration::from_millis(500), Duration::from_secs(30))
2165 .unwrap(),
2166 peers: Vec::new(),
2167 client,
2168 sender_agent_id: "ai:no-peers".to_string(),
2169 api_key: None,
2170 signing_key: None,
2171 #[cfg(feature = "sal")]
2172 dlq_sink: None,
2173 };
2174 let mems = vec![sample_memory()];
2177 let errors = bulk_catchup_push(&cfg, &mems).await;
2178 assert!(
2179 errors.is_empty(),
2180 "no-peers catchup must return empty error vec immediately, got {errors:?}"
2181 );
2182 }
2183
2184 #[tokio::test]
2191 async fn bulk_catchup_push_mixed_outcomes_only_failing_peer_in_errors() {
2192 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2193 let (url2, count2) = spawn_mock_peer(MockBehaviour::Fail).await;
2194 let cfg = build_config(vec![url1, url2], 2, 2000);
2195 let mems = vec![sample_memory()];
2196 let errors = bulk_catchup_push(&cfg, &mems).await;
2197 assert_eq!(
2198 errors.len(),
2199 1,
2200 "exactly one failing peer should be in errors, got {errors:?}"
2201 );
2202 let (peer_id, reason) = &errors[0];
2203 assert!(
2206 peer_id.starts_with("peer-1"),
2207 "failing peer should be peer-1, got {peer_id}"
2208 );
2209 assert!(
2210 reason.contains("http 500"),
2211 "expected http 500 reason, got {reason}"
2212 );
2213 assert_eq!(count1.load(Ordering::Relaxed), 1);
2215 assert_eq!(count2.load(Ordering::Relaxed), 1);
2216 }
2217
2218 #[tokio::test]
2223 async fn quorum_w1_local_commit_alone_is_sufficient() {
2224 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2225 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2226 let cfg = build_config(vec![url1, url2], 1, 1000);
2228 let tracker = broadcast_store_quorum(&cfg, &sample_memory())
2229 .await
2230 .unwrap();
2231 let count = finalise_quorum(&tracker).expect("W=1 must succeed on local commit alone");
2232 assert_eq!(count, 1, "W=1 quorum returns local-only count");
2233 }
2234
2235 #[test]
2240 fn quorum_policy_majority_builds_with_ceil_n_plus_1_div_2() {
2241 let p3 = QuorumPolicy::majority(3).expect("N=3 majority builds");
2242 let mut t = AckTracker::new(p3, Instant::now());
2246 t.record_local();
2247 assert!(
2249 !t.is_quorum_met(Instant::now()),
2250 "majority-of-3 needs more than local"
2251 );
2252 t.record_peer_ack("peer-a");
2253 assert!(
2254 t.is_quorum_met(Instant::now()),
2255 "local + 1 peer ack = 2 = majority of 3"
2256 );
2257
2258 let p5 = QuorumPolicy::majority(5).expect("N=5 majority builds");
2259 let mut t5 = AckTracker::new(p5, Instant::now());
2260 t5.record_local();
2261 t5.record_peer_ack("a");
2262 assert!(
2263 !t5.is_quorum_met(Instant::now()),
2264 "majority-of-5 needs 3 acks"
2265 );
2266 t5.record_peer_ack("b");
2267 assert!(t5.is_quorum_met(Instant::now()), "local + 2 peers = 3");
2268 }
2269
2270 #[test]
2275 fn quorum_policy_majority_rejects_zero() {
2276 let err = QuorumPolicy::majority(0).expect_err("n=0 must be rejected");
2277 match err {
2278 QuorumError::InvalidPolicy { detail } => {
2279 assert!(
2280 detail.contains("n must be"),
2281 "expected n>=1 message, got {detail}"
2282 );
2283 }
2284 other => panic!("expected InvalidPolicy, got {other:?}"),
2285 }
2286 }
2287
2288 #[test]
2294 fn config_build_rejects_duplicate_peers_differing_only_in_trailing_slash() {
2295 let result = FederationConfig::build(
2296 2,
2297 &[
2298 "http://peer.example".to_string(),
2299 "http://peer.example/".to_string(),
2300 ],
2301 Duration::from_millis(500),
2302 None,
2303 None,
2304 None,
2305 "ai:dup-test".to_string(),
2306 None,
2307 );
2308 let err = match result {
2309 Ok(_) => panic!("trailing-slash dup must be rejected"),
2310 Err(e) => e,
2311 };
2312 let msg = format!("{err}");
2313 assert!(
2314 msg.contains("duplicate peer URL"),
2315 "expected duplicate-peer error, got {msg}"
2316 );
2317 }
2318
2319 #[test]
2323 fn config_build_rejects_duplicate_peers_differing_only_in_case() {
2324 let result = FederationConfig::build(
2325 2,
2326 &[
2327 "http://Peer.Example".to_string(),
2328 "http://peer.example".to_string(),
2329 ],
2330 Duration::from_millis(500),
2331 None,
2332 None,
2333 None,
2334 "ai:dup-case-test".to_string(),
2335 None,
2336 );
2337 let err = match result {
2338 Ok(_) => panic!("case-only dup must be rejected"),
2339 Err(e) => e,
2340 };
2341 let msg = format!("{err}");
2342 assert!(
2343 msg.contains("duplicate peer URL"),
2344 "expected duplicate-peer error, got {msg}"
2345 );
2346 }
2347
2348 #[tokio::test]
2355 async fn archive_quorum_hanging_peer_times_out_to_break_arm() {
2356 let (url1, _) = spawn_mock_peer(MockBehaviour::Hang).await;
2357 let (url2, _) = spawn_mock_peer(MockBehaviour::Hang).await;
2358 let cfg = build_config(vec![url1, url2], 2, 200);
2361 let start = Instant::now();
2362 let tracker = broadcast_archive_quorum(&cfg, "mem-arch-id").await.unwrap();
2363 let elapsed = start.elapsed();
2364 assert!(
2367 elapsed < Duration::from_secs(2),
2368 "archive_quorum must exit at deadline, took {elapsed:?}"
2369 );
2370 let err = finalise_quorum(&tracker).unwrap_err();
2371 assert!(
2372 matches!(err, QuorumError::QuorumNotMet { .. }),
2373 "expected QuorumNotMet, got {err:?}"
2374 );
2375 }
2376
2377 #[tokio::test]
2382 async fn quorum_not_met_payload_unreachable_round_trip_from_broadcast() {
2383 let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2387 let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2388 let cfg = build_config(vec![url1, url2], 2, 100);
2390 let tracker = broadcast_store_quorum(&cfg, &sample_memory())
2391 .await
2392 .unwrap();
2393 tokio::time::sleep(Duration::from_millis(150)).await;
2397 let err = finalise_quorum(&tracker).unwrap_err();
2398 let payload = QuorumNotMetPayload::from_err(&err);
2399 assert_eq!(payload.error, "quorum_not_met");
2400 assert_eq!(payload.got, 1, "only local commit");
2401 assert_eq!(payload.needed, 2);
2402 assert!(
2403 payload.reason == "unreachable" || payload.reason == "timeout",
2404 "expected unreachable/timeout, got {}",
2405 payload.reason
2406 );
2407 }
2408
2409 #[tokio::test]
2415 async fn catchup_once_peer_url_without_push_suffix_still_builds_since() {
2416 let (url, hits, _, last_peer) =
2417 spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![])).await;
2418 let client = reqwest::Client::builder()
2422 .timeout(Duration::from_millis(2000))
2423 .build()
2424 .unwrap();
2425 let cfg = FederationConfig {
2426 policy: QuorumPolicy::new(2, 1, Duration::from_millis(2000), Duration::from_secs(30))
2427 .unwrap(),
2428 peers: vec![PeerEndpoint {
2429 id: "peer-0".to_string(),
2430 sync_push_url: url.clone(),
2433 }],
2434 client,
2435 sender_agent_id: "ai:no-suffix".to_string(),
2436 api_key: None,
2437 signing_key: None,
2438 #[cfg(feature = "sal")]
2439 dlq_sink: None,
2440 };
2441 let db = build_test_db();
2442 catchup_once(&cfg, &db).await;
2443 assert_eq!(hits.load(Ordering::Relaxed), 1);
2445 assert_eq!(
2446 last_peer.lock().await.as_deref(),
2447 Some("ai:no-suffix"),
2448 "local agent id should be forwarded as ?peer="
2449 );
2450 }
2451
2452 #[tokio::test]
2458 async fn catchup_once_skips_invalid_memory_but_applies_valid_neighbour() {
2459 let valid = catchup_memory("ok-mem", "2026-04-26T10:00:00Z");
2461 let mut bad = catchup_memory("bad-source", "2026-04-26T10:00:01Z");
2463 bad.source = "made-up-source-not-in-allowlist".to_string();
2464 let mems = vec![valid.clone(), bad];
2465
2466 let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(mems)).await;
2467 let cfg = build_catchup_cfg(&url, 2000);
2468 let db = build_test_db();
2469 catchup_once(&cfg, &db).await;
2470
2471 assert_eq!(hits.load(Ordering::Relaxed), 1);
2472 let lock = db.lock().await;
2473 let count: i64 = lock
2475 .0
2476 .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
2477 .unwrap();
2478 assert_eq!(count, 1, "only the valid memory should land");
2479 let title: String = lock
2480 .0
2481 .query_row(
2482 "SELECT title FROM memories WHERE namespace='catchup' LIMIT 1",
2483 [],
2484 |r| r.get(0),
2485 )
2486 .unwrap();
2487 assert_eq!(title, "ok-mem");
2488 let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
2493 assert_eq!(
2494 clock.entries.get("peer-0").map(String::as_str),
2495 Some("2026-04-26T10:00:00Z"),
2496 "sync_state tracks latest_ts of validate-passing rows"
2497 );
2498 }
2499
2500 #[tokio::test]
2522 async fn l11_catchup_preserves_original_agent_id_through_replication() {
2523 let mut alice_mem = catchup_memory("alice-note", "2026-05-10T10:00:00Z");
2525 alice_mem.metadata = serde_json::json!({
2526 "agent_id": "ai:alice@plan-c",
2527 "shared": "alice wrote this"
2528 });
2529
2530 let (url, hits, _, _) =
2531 spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![alice_mem.clone()])).await;
2532 let cfg = build_catchup_cfg(&url, 2000);
2533 let db = build_test_db();
2534
2535 catchup_once(&cfg, &db).await;
2536
2537 assert_eq!(hits.load(Ordering::Relaxed), 1, "catchup should hit once");
2538
2539 let lock = db.lock().await;
2541 let count: i64 = lock
2542 .0
2543 .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
2544 .unwrap();
2545 assert_eq!(count, 1, "alice's row must land on the receiver");
2546
2547 let (raw_metadata,): (String,) = lock
2548 .0
2549 .query_row(
2550 "SELECT metadata FROM memories WHERE title='alice-note'",
2551 [],
2552 |r| Ok((r.get(0)?,)),
2553 )
2554 .unwrap();
2555 let stored: serde_json::Value = serde_json::from_str(&raw_metadata).unwrap();
2556 assert_eq!(
2557 stored.get("agent_id").and_then(serde_json::Value::as_str),
2558 Some("ai:alice@plan-c"),
2559 "agent_id must survive federation replication verbatim — \
2560 observed rewrite to receiver identity is the L11 NHI-D \
2561 regression"
2562 );
2563 assert_eq!(
2565 stored.get("shared").and_then(serde_json::Value::as_str),
2566 Some("alice wrote this"),
2567 "sibling metadata fields must round-trip alongside agent_id"
2568 );
2569 }
2570
/// Acking the same peer twice and then a different peer: the tracker must
/// report quorum met after the repeated ack and still report it met after
/// the second peer is recorded.
#[test]
fn ack_tracker_record_peer_ack_is_idempotent() {
    let policy = QuorumPolicy::new(3, 2, Duration::from_secs(1), Duration::from_secs(30))
        .expect("policy");
    let mut tracker = AckTracker::new(policy, Instant::now());
    tracker.record_local();

    // Record the same peer id twice in a row.
    for _ in 0..2 {
        tracker.record_peer_ack("peer-a");
    }
    assert!(tracker.is_quorum_met(Instant::now()));

    tracker.record_peer_ack("peer-b");
    assert!(tracker.is_quorum_met(Instant::now()));
}
2589
2590 #[tokio::test]
2596 async fn catchup_once_body_without_memories_key_is_skipped() {
2597 let app = Router::new().route(
2599 "/api/v1/sync/since",
2600 axum::routing::get(|| async {
2601 (
2602 StatusCode::OK,
2603 AxumJson(serde_json::json!({"applied":0,"note":"empty cluster"})),
2604 )
2605 }),
2606 );
2607 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2608 let addr = listener.local_addr().unwrap();
2609 tokio::spawn(async move {
2610 axum::serve(listener, app).await.ok();
2611 });
2612 let url = format!("http://{addr}");
2613 let cfg = build_catchup_cfg(&url, 2000);
2614 let db = build_test_db();
2615 catchup_once(&cfg, &db).await;
2616 let lock = db.lock().await;
2617 let count: i64 = lock
2618 .0
2619 .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
2620 .unwrap();
2621 assert_eq!(count, 0, "no memories key → no inserts");
2622 let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
2623 assert!(
2624 clock.entries.get("peer-0").is_none(),
2625 "no memories key → sync_state untouched"
2626 );
2627 }
2628
2629 #[tokio::test]
2634 async fn catchup_once_unparseable_individual_memory_is_skipped() {
2635 let valid_mem = serde_json::to_value(catchup_memory("ok", "2026-04-26T10:00:00Z")).unwrap();
2638 let bad_mem = serde_json::json!({"id":"oops","not_a_memory_field": true});
2639 let app = Router::new().route(
2640 "/api/v1/sync/since",
2641 axum::routing::get(move || {
2642 let valid = valid_mem.clone();
2643 let bad = bad_mem.clone();
2644 async move {
2645 (
2646 StatusCode::OK,
2647 AxumJson(serde_json::json!({"memories": [valid, bad]})),
2648 )
2649 }
2650 }),
2651 );
2652 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2653 let addr = listener.local_addr().unwrap();
2654 tokio::spawn(async move {
2655 axum::serve(listener, app).await.ok();
2656 });
2657 let url = format!("http://{addr}");
2658 let cfg = build_catchup_cfg(&url, 2000);
2659 let db = build_test_db();
2660 catchup_once(&cfg, &db).await;
2661 let lock = db.lock().await;
2662 let count: i64 = lock
2664 .0
2665 .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
2666 .unwrap();
2667 assert_eq!(count, 1, "only parseable memory inserted");
2668 }
2669
2670 #[tokio::test]
2674 async fn delete_quorum_id_drift_peer_records_drift_not_ack() {
2675 let url1 = spawn_id_drift_peer().await;
2676 let url2 = spawn_id_drift_peer().await;
2677 let cfg = build_config(vec![url1, url2], 2, 1000);
2678 let tracker = broadcast_delete_quorum(&cfg, "mem-del-x").await.unwrap();
2679 let err = finalise_quorum(&tracker).unwrap_err();
2681 assert!(
2682 matches!(err, QuorumError::QuorumNotMet { got: 1, .. }),
2683 "expected QuorumNotMet got=1, got {err:?}"
2684 );
2685 assert_eq!(
2687 tracker.id_drift_count(),
2688 2,
2689 "both peers should be recorded as drift"
2690 );
2691 }
2692
2693 #[tokio::test]
2696 async fn archive_quorum_id_drift_peer_records_drift_not_ack() {
2697 let url1 = spawn_id_drift_peer().await;
2698 let cfg = build_config(vec![url1], 2, 1000);
2699 let tracker = broadcast_archive_quorum(&cfg, "mem-arch-x").await.unwrap();
2700 let err = finalise_quorum(&tracker).unwrap_err();
2701 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2702 assert_eq!(tracker.id_drift_count(), 1);
2703 }
2704
2705 #[tokio::test]
2708 async fn restore_quorum_id_drift_peer_records_drift_not_ack() {
2709 let url1 = spawn_id_drift_peer().await;
2710 let cfg = build_config(vec![url1], 2, 1000);
2711 let tracker = broadcast_restore_quorum(&cfg, "mem-res-x").await.unwrap();
2712 let err = finalise_quorum(&tracker).unwrap_err();
2713 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2714 assert_eq!(tracker.id_drift_count(), 1);
2715 }
2716
2717 #[tokio::test]
2720 async fn link_quorum_id_drift_peer_records_drift_not_ack() {
2721 let url1 = spawn_id_drift_peer().await;
2722 let cfg = build_config(vec![url1], 2, 1000);
2723 let tracker = broadcast_link_quorum(&cfg, &sample_link()).await.unwrap();
2724 let err = finalise_quorum(&tracker).unwrap_err();
2725 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2726 assert_eq!(tracker.id_drift_count(), 1);
2727 }
2728
2729 #[tokio::test]
2732 async fn consolidate_quorum_id_drift_peer_records_drift_not_ack() {
2733 let url1 = spawn_id_drift_peer().await;
2734 let cfg = build_config(vec![url1], 2, 1000);
2735 let new_mem = sample_memory();
2736 let tracker = broadcast_consolidate_quorum(&cfg, &new_mem, &[])
2737 .await
2738 .unwrap();
2739 let err = finalise_quorum(&tracker).unwrap_err();
2740 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2741 assert_eq!(tracker.id_drift_count(), 1);
2742 }
2743
2744 #[tokio::test]
2747 async fn pending_quorum_id_drift_peer_records_drift_not_ack() {
2748 let url1 = spawn_id_drift_peer().await;
2749 let cfg = build_config(vec![url1], 2, 1000);
2750 let tracker = broadcast_pending_quorum(&cfg, &sample_pending())
2751 .await
2752 .unwrap();
2753 let err = finalise_quorum(&tracker).unwrap_err();
2754 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2755 assert_eq!(tracker.id_drift_count(), 1);
2756 }
2757
2758 #[tokio::test]
2761 async fn pending_decision_quorum_id_drift_peer_records_drift_not_ack() {
2762 let url1 = spawn_id_drift_peer().await;
2763 let cfg = build_config(vec![url1], 2, 1000);
2764 let tracker = broadcast_pending_decision_quorum(&cfg, &sample_decision())
2765 .await
2766 .unwrap();
2767 let err = finalise_quorum(&tracker).unwrap_err();
2768 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2769 assert_eq!(tracker.id_drift_count(), 1);
2770 }
2771
2772 #[tokio::test]
2775 async fn namespace_meta_quorum_id_drift_peer_records_drift_not_ack() {
2776 let url1 = spawn_id_drift_peer().await;
2777 let cfg = build_config(vec![url1], 2, 1000);
2778 let tracker = broadcast_namespace_meta_quorum(&cfg, &sample_namespace_meta())
2779 .await
2780 .unwrap();
2781 let err = finalise_quorum(&tracker).unwrap_err();
2782 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2783 assert_eq!(tracker.id_drift_count(), 1);
2784 }
2785
2786 #[tokio::test]
2789 async fn namespace_meta_clear_quorum_id_drift_peer_records_drift_not_ack() {
2790 let url1 = spawn_id_drift_peer().await;
2791 let cfg = build_config(vec![url1], 2, 1000);
2792 let namespaces = vec!["app/team".to_string()];
2793 let tracker = broadcast_namespace_meta_clear_quorum(&cfg, &namespaces)
2794 .await
2795 .unwrap();
2796 let err = finalise_quorum(&tracker).unwrap_err();
2797 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2798 assert_eq!(tracker.id_drift_count(), 1);
2799 }
2800
2801 #[tokio::test]
2807 async fn delete_quorum_post_quorum_detach_drains_remaining_peer() {
2808 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2809 let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
2810 let (url3, count3) = spawn_mock_peer(MockBehaviour::Fail).await;
2811 let cfg = build_config(vec![url1, url2, url3], 2, 2000);
2812 let _tracker = broadcast_delete_quorum(&cfg, "mem-detach").await.unwrap();
2813 for _ in 0..100 {
2816 if count1.load(Ordering::Relaxed) >= 1
2817 && count2.load(Ordering::Relaxed) >= 1
2818 && count3.load(Ordering::Relaxed) >= 1
2819 {
2820 break;
2821 }
2822 tokio::time::sleep(Duration::from_millis(20)).await;
2823 }
2824 assert!(
2827 count3.load(Ordering::Relaxed) >= 1,
2828 "failing peer must be reached by the detached fanout"
2829 );
2830 }
2831
/// Finalising a tracker that holds only the local ack, at the same instant
/// it was created, must yield `QuorumNotMet` with `got == 1`, `needed == 2`
/// and the `InFlight` failure reason.
#[test]
fn ack_tracker_finalise_pre_deadline_returns_in_flight() {
    let policy = QuorumPolicy::new(3, 2, Duration::from_secs(60), Duration::from_secs(30))
        .expect("policy");
    let started = Instant::now();
    let mut tracker = AckTracker::new(policy, started);
    tracker.record_local();

    // Pull the fields out of the expected variant; anything else is a failure.
    let (got, needed, reason) = match tracker.finalise(started).unwrap_err() {
        QuorumError::QuorumNotMet {
            got,
            needed,
            reason,
        } => (got, needed, reason),
        other => panic!("expected QuorumNotMet, got {other:?}"),
    };

    assert_eq!(got, 1);
    assert_eq!(needed, 2);
    assert_eq!(
        reason,
        QuorumFailureReason::InFlight,
        "pre-deadline insufficient-ack must classify as InFlight"
    );
}
2863
2864 #[tokio::test]
2876 async fn delete_quorum_transient_peer_failure_retried_once() {
2877 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2878 let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
2879 let cfg = build_config(vec![url1, url2], 2, 2000);
2880 let _tracker = broadcast_delete_quorum(&cfg, "mem-del-retry")
2881 .await
2882 .unwrap();
2883 for _ in 0..200 {
2884 if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
2885 break;
2886 }
2887 tokio::time::sleep(Duration::from_millis(10)).await;
2888 }
2889 assert_eq!(
2890 count2.load(Ordering::Relaxed),
2891 2,
2892 "transient failure must retry"
2893 );
2894 }
2895
2896 #[tokio::test]
2897 async fn archive_quorum_transient_peer_failure_retried_once() {
2898 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2899 let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
2900 let cfg = build_config(vec![url1, url2], 2, 2000);
2901 let _tracker = broadcast_archive_quorum(&cfg, "mem-arc-retry")
2902 .await
2903 .unwrap();
2904 for _ in 0..200 {
2905 if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
2906 break;
2907 }
2908 tokio::time::sleep(Duration::from_millis(10)).await;
2909 }
2910 assert_eq!(count2.load(Ordering::Relaxed), 2);
2911 }
2912
2913 #[tokio::test]
2914 async fn restore_quorum_transient_peer_failure_retried_once() {
2915 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2916 let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
2917 let cfg = build_config(vec![url1, url2], 2, 2000);
2918 let _tracker = broadcast_restore_quorum(&cfg, "mem-res-retry")
2919 .await
2920 .unwrap();
2921 for _ in 0..200 {
2922 if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
2923 break;
2924 }
2925 tokio::time::sleep(Duration::from_millis(10)).await;
2926 }
2927 assert_eq!(count2.load(Ordering::Relaxed), 2);
2928 }
2929
2930 #[tokio::test]
2931 async fn link_quorum_transient_peer_failure_retried_once() {
2932 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2933 let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
2934 let cfg = build_config(vec![url1, url2], 2, 2000);
2935 let _tracker = broadcast_link_quorum(&cfg, &sample_link()).await.unwrap();
2936 for _ in 0..200 {
2937 if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
2938 break;
2939 }
2940 tokio::time::sleep(Duration::from_millis(10)).await;
2941 }
2942 assert_eq!(count2.load(Ordering::Relaxed), 2);
2943 }
2944
2945 #[tokio::test]
2946 async fn consolidate_quorum_transient_peer_failure_retried_once() {
2947 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2948 let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
2949 let cfg = build_config(vec![url1, url2], 2, 2000);
2950 let mem = sample_memory();
2951 let sources = vec!["src-1".to_string(), "src-2".to_string()];
2952 let _tracker = broadcast_consolidate_quorum(&cfg, &mem, &sources)
2953 .await
2954 .unwrap();
2955 for _ in 0..200 {
2956 if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
2957 break;
2958 }
2959 tokio::time::sleep(Duration::from_millis(10)).await;
2960 }
2961 assert_eq!(count2.load(Ordering::Relaxed), 2);
2962 }
2963
2964 #[tokio::test]
2965 async fn pending_quorum_transient_peer_failure_retried_once() {
2966 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2967 let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
2968 let cfg = build_config(vec![url1, url2], 2, 2000);
2969 let _tracker = broadcast_pending_quorum(&cfg, &sample_pending())
2970 .await
2971 .unwrap();
2972 for _ in 0..200 {
2973 if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
2974 break;
2975 }
2976 tokio::time::sleep(Duration::from_millis(10)).await;
2977 }
2978 assert_eq!(count2.load(Ordering::Relaxed), 2);
2979 }
2980
2981 #[tokio::test]
2982 async fn pending_decision_quorum_transient_peer_failure_retried_once() {
2983 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2984 let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
2985 let cfg = build_config(vec![url1, url2], 2, 2000);
2986 let _tracker = broadcast_pending_decision_quorum(&cfg, &sample_decision())
2987 .await
2988 .unwrap();
2989 for _ in 0..200 {
2990 if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
2991 break;
2992 }
2993 tokio::time::sleep(Duration::from_millis(10)).await;
2994 }
2995 assert_eq!(count2.load(Ordering::Relaxed), 2);
2996 }
2997
2998 #[tokio::test]
2999 async fn namespace_meta_quorum_transient_peer_failure_retried_once() {
3000 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
3001 let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
3002 let cfg = build_config(vec![url1, url2], 2, 2000);
3003 let _tracker = broadcast_namespace_meta_quorum(&cfg, &sample_namespace_meta())
3004 .await
3005 .unwrap();
3006 for _ in 0..200 {
3007 if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
3008 break;
3009 }
3010 tokio::time::sleep(Duration::from_millis(10)).await;
3011 }
3012 assert_eq!(count2.load(Ordering::Relaxed), 2);
3013 }
3014
3015 #[tokio::test]
3016 async fn namespace_meta_clear_quorum_transient_peer_failure_retried_once() {
3017 let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
3018 let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
3019 let cfg = build_config(vec![url1, url2], 2, 2000);
3020 let namespaces = vec!["ns/x".to_string()];
3021 let _tracker = broadcast_namespace_meta_clear_quorum(&cfg, &namespaces)
3022 .await
3023 .unwrap();
3024 for _ in 0..200 {
3025 if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
3026 break;
3027 }
3028 tokio::time::sleep(Duration::from_millis(10)).await;
3029 }
3030 assert_eq!(count2.load(Ordering::Relaxed), 2);
3031 }
3032
3033 #[tokio::test]
3036 async fn delete_quorum_id_drift_does_not_count_as_ack() {
3037 let url1 = spawn_id_drift_peer().await;
3038 let url2 = spawn_id_drift_peer().await;
3039 let cfg = build_config(vec![url1, url2], 2, 1000);
3040 let tracker = broadcast_delete_quorum(&cfg, "mem-del-drift")
3041 .await
3042 .unwrap();
3043 let err = finalise_quorum(&tracker).unwrap_err();
3044 match err {
3045 QuorumError::QuorumNotMet { got, .. } => assert_eq!(got, 1),
3046 other => panic!("expected QuorumNotMet, got {other:?}"),
3047 }
3048 }
3049
3050 #[tokio::test]
3051 async fn archive_quorum_id_drift_does_not_count_as_ack() {
3052 let url1 = spawn_id_drift_peer().await;
3053 let url2 = spawn_id_drift_peer().await;
3054 let cfg = build_config(vec![url1, url2], 2, 1000);
3055 let tracker = broadcast_archive_quorum(&cfg, "mem-arc-drift")
3056 .await
3057 .unwrap();
3058 let err = finalise_quorum(&tracker).unwrap_err();
3059 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
3060 }
3061
3062 #[tokio::test]
3063 async fn link_quorum_id_drift_does_not_count_as_ack() {
3064 let url1 = spawn_id_drift_peer().await;
3065 let url2 = spawn_id_drift_peer().await;
3066 let cfg = build_config(vec![url1, url2], 2, 1000);
3067 let tracker = broadcast_link_quorum(&cfg, &sample_link()).await.unwrap();
3068 let err = finalise_quorum(&tracker).unwrap_err();
3069 assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
3070 }
3071
/// With a SAL store handle supplied, `catchup_once_with_store` must make the
/// served memory retrievable through that store and record the memory's
/// timestamp for `peer-0` in the sync state.
#[cfg(feature = "sal")]
#[tokio::test]
async fn catchup_once_with_store_applies_via_sal_handle() {
    use super::receive::catchup_once_with_store;
    use crate::store::MemoryStore;

    let mem = catchup_memory("sal-applied", "2026-04-26T10:00:00Z");
    let behaviour = SinceMockBehaviour::ReturnMemories(vec![mem.clone()]);
    let (url, hits, _, _) = spawn_since_peer(behaviour).await;
    let cfg = build_catchup_cfg(&url, 2000);
    let db = build_test_db();

    // `dir` stays bound for the whole test so the store file remains on disk.
    let dir = tempfile::tempdir().expect("tempdir");
    let store_path = dir.path().join("store.db");
    let sqlite = crate::store::sqlite::SqliteStore::open(&store_path).expect("open SqliteStore");
    let store: Arc<dyn MemoryStore> = Arc::new(sqlite);

    catchup_once_with_store(&cfg, &db, Some(&store)).await;

    assert_eq!(hits.load(Ordering::Relaxed), 1, "peer must be hit once");

    // The memory must be readable through the SAL store handle.
    let ctx = crate::store::CallerContext::for_agent("test");
    let fetched = store
        .get(&ctx, &mem.id)
        .await
        .expect("SAL store should have the catchup memory");
    assert_eq!(fetched.title, "sal-applied");

    // The sync state loaded from `db` must carry the served timestamp.
    let guard = db.lock().await;
    let clock = crate::db::sync_state_load(&guard.0, "ai:catchup-test").unwrap();
    let peer_ts = clock.entries.get("peer-0").map(String::as_str);
    assert_eq!(peer_ts, Some("2026-04-26T10:00:00Z"),);
}
3126
/// With no SAL store supplied (`None`), `catchup_once_with_store` must insert
/// the served memory into the local `memories` table.
#[cfg(feature = "sal")]
#[tokio::test]
async fn catchup_once_with_store_none_uses_legacy_rusqlite() {
    use super::receive::catchup_once_with_store;

    let served = vec![catchup_memory("legacy-applied", "2026-04-26T10:00:00Z")];
    let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(served)).await;
    let cfg = build_catchup_cfg(&url, 2000);
    let db = build_test_db();

    catchup_once_with_store(&cfg, &db, None).await;

    assert_eq!(hits.load(Ordering::Relaxed), 1);

    let guard = db.lock().await;
    let conn = &guard.0;
    let row_count: i64 = conn
        .query_row("SELECT COUNT(*) FROM memories", [], |row| row.get(0))
        .unwrap();
    assert_eq!(row_count, 1, "legacy path must insert the row locally");
}
3149
/// When the peer serves one valid memory and one whose `source` is set to
/// `"not-in-allowlist"`, the SAL-store catchup path must store the valid
/// memory and must not store the other one.
#[cfg(feature = "sal")]
#[tokio::test]
async fn catchup_once_with_store_skips_invalid_memory_via_sal_path() {
    use super::receive::catchup_once_with_store;

    let valid = catchup_memory("sal-valid", "2026-04-26T10:00:00Z");
    let mut bad = catchup_memory("sal-bad", "2026-04-26T10:00:01Z");
    bad.source = "not-in-allowlist".to_string();
    // Keep the id: `bad` is moved into the served list below, and the id is
    // needed afterwards to check that the row was not stored.
    let bad_id = bad.id.clone();
    let mems = vec![valid.clone(), bad];

    let (url, _, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(mems)).await;
    let cfg = build_catchup_cfg(&url, 2000);
    let db = build_test_db();

    // `dir` stays bound for the whole test so the store file remains on disk.
    let dir = tempfile::tempdir().expect("tempdir");
    let store: Arc<dyn crate::store::MemoryStore> = Arc::new(
        crate::store::sqlite::SqliteStore::open(dir.path().join("store.db"))
            .expect("open SqliteStore"),
    );

    catchup_once_with_store(&cfg, &db, Some(&store)).await;

    let ctx = crate::store::CallerContext::for_agent("test");
    assert!(
        store.get(&ctx, &valid.id).await.is_ok(),
        "valid memory must land via SAL store"
    );
    // The "skips" half of the test name: the invalid row must be absent.
    // NOTE(review): assumes `store.get` returns `Err` for an id that was never
    // stored — confirm against the `MemoryStore` contract.
    assert!(
        store.get(&ctx, &bad_id).await.is_err(),
        "memory with a disallowed source must not land via SAL store"
    );
}
3178}