1use crate::models::field_names;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use tokio::sync::Mutex;
12use tokio::task::JoinSet;
13
14use crate::federation::identity::chain::CHAIN_HEADER;
15use crate::federation::identity::credential::CREDENTIAL_HEADER;
16use crate::federation::identity::outbound;
17use crate::models::{Memory, MemoryLink, NamespaceMetaEntry, PendingAction, PendingDecision};
18use crate::replication::{AckTracker, QuorumError};
19
20use super::FederationConfig;
21
/// Detail text carried by `QuorumError::LocalWriteFailed` when a broadcast cannot reclaim
/// sole ownership of its shared ack tracker (`Arc::try_unwrap` failed) at the end of fan-out.
const TRACKER_ARC_STILL_REFERENCED: &str =
    "tracker arc still referenced at finalise";
26
27#[derive(Debug)]
28pub(super) enum AckOutcome {
29 Ack,
30 IdDrift,
31 Fail(String),
32}
33
34pub(super) async fn post_once(
45 client: &reqwest::Client,
46 url: &str,
47 body: &serde_json::Value,
48 expected_id: &str,
49 idempotency_key: Option<&str>,
50 api_key: Option<&str>,
51 signing_key: Option<&ed25519_dalek::SigningKey>,
52) -> AckOutcome {
53 let host = reqwest::Url::parse(url)
69 .ok()
70 .and_then(|u| u.host_str().map(str::to_string))
71 .unwrap_or_else(|| url.to_string());
72 let scheme = reqwest::Url::parse(url)
73 .ok()
74 .map(|u| u.scheme().to_string())
75 .unwrap_or_default();
76 let net_action = crate::governance::agent_action::AgentAction::NetworkRequest {
77 host: host.clone(),
78 scheme,
79 };
80 if let Err(refusal) = crate::governance::wire_check::check(&net_action) {
81 return AckOutcome::Fail(format!(
82 "governance refused outbound to {host}: {}",
83 refusal.reason
84 ));
85 }
86 let body_bytes = match serde_json::to_vec(body) {
92 Ok(b) => b,
93 Err(e) => {
94 return AckOutcome::Fail(format!("serialise body: {e}"));
95 }
96 };
97 let mut req = client
98 .post(url)
99 .header(crate::HEADER_CONTENT_TYPE, crate::MIME_JSON)
100 .body(body_bytes.clone());
101 if let Some(key) = idempotency_key {
102 req = req.header("Idempotency-Key", key);
103 }
104 if let Some(key) = api_key {
110 req = req.header(crate::HEADER_API_KEY, key);
111 }
112 if let Some(sk) = signing_key {
115 let nonce = uuid::Uuid::new_v4().to_string();
116 let sig_header =
117 crate::federation::signing::sign_body_with_nonce_header(sk, &body_bytes, &nonce);
118 req = req
119 .header(crate::federation::signing::SIGNATURE_HEADER, sig_header)
120 .header(crate::federation::signing::NONCE_HEADER, nonce);
121 }
122 if let Some(peer_id) = body
132 .get(field_names::SENDER_AGENT_ID)
133 .and_then(|v| v.as_str())
134 .filter(|s| !s.is_empty())
135 {
136 req = req.header(crate::federation::peer_attestation::PEER_ID_HEADER, peer_id);
137 }
138 if let Some(cred) = outbound::current() {
146 match cred.to_header_value() {
147 Ok(value) => req = req.header(CREDENTIAL_HEADER, value),
148 Err(e) => {
149 tracing::warn!(target: super::SIGNING_TRACE_TARGET, error = %e,
150 "failed to encode outbound federation credential header; omitting");
151 }
152 }
153 let intermediates = outbound::current_intermediates();
159 match crate::federation::identity::chain::intermediates_to_header_value(&intermediates) {
160 Ok(Some(value)) => req = req.header(CHAIN_HEADER, value),
161 Ok(None) => {}
162 Err(e) => {
163 tracing::warn!(target: super::SIGNING_TRACE_TARGET, error = %e,
164 "failed to encode outbound federation chain header; omitting");
165 }
166 }
167 }
168 match req.send().await {
169 Ok(resp) if resp.status().is_success() => {
170 match resp.json::<serde_json::Value>().await {
171 Ok(v) => {
172 if let Some(ids) = v.get("ids").and_then(|v| v.as_array())
177 && !ids.is_empty()
178 && !ids.iter().any(|x| x.as_str() == Some(expected_id))
179 {
180 return AckOutcome::IdDrift;
181 }
182 AckOutcome::Ack
183 }
184 Err(_) => AckOutcome::Ack, }
186 }
187 Ok(resp) => {
188 let status = resp.status();
198 let _ = resp.bytes().await;
199 AckOutcome::Fail(format!("http {status}"))
200 }
201 Err(e) => AckOutcome::Fail(crate::errors::msg::network(e)),
202 }
203}
204
205pub(super) const FANOUT_RETRY_BACKOFF: Duration = Duration::from_millis(250);
210
211pub(super) async fn post_and_classify(
233 client: &reqwest::Client,
234 url: &str,
235 body: &serde_json::Value,
236 expected_id: &str,
237 idempotency_key: Option<&str>,
238 api_key: Option<&str>,
239 signing_key: Option<&ed25519_dalek::SigningKey>,
240) -> AckOutcome {
241 match post_once(
242 client,
243 url,
244 body,
245 expected_id,
246 idempotency_key,
247 api_key,
248 signing_key,
249 )
250 .await
251 {
252 AckOutcome::Ack => AckOutcome::Ack,
253 AckOutcome::IdDrift => AckOutcome::IdDrift,
254 AckOutcome::Fail(first_reason) => {
255 tokio::time::sleep(FANOUT_RETRY_BACKOFF).await;
256 match post_once(
257 client,
258 url,
259 body,
260 expected_id,
261 idempotency_key,
262 api_key,
263 signing_key,
264 )
265 .await
266 {
267 AckOutcome::Ack => {
268 tracing::debug!(
269 "federation: peer POST retry succeeded for {expected_id} (first attempt: {first_reason})"
270 );
271 crate::metrics::registry()
272 .federation_fanout_retry_total
273 .with_label_values(&["ok"])
274 .inc();
275 AckOutcome::Ack
276 }
277 AckOutcome::IdDrift => {
278 crate::metrics::registry()
279 .federation_fanout_retry_total
280 .with_label_values(&["id_drift"])
281 .inc();
282 AckOutcome::IdDrift
283 }
284 AckOutcome::Fail(retry_reason) => {
285 crate::metrics::registry()
286 .federation_fanout_retry_total
287 .with_label_values(&["fail"])
288 .inc();
289 AckOutcome::Fail(format!("first: {first_reason}; retry: {retry_reason}"))
290 }
291 }
292 }
293 }
294}
295
296pub async fn broadcast_store_quorum(
305 config: &FederationConfig,
306 mem: &Memory,
307) -> Result<AckTracker, QuorumError> {
308 broadcast_store_quorum_with_embedding(config, mem, None).await
309}
310
311pub async fn broadcast_store_quorum_with_embedding(
326 config: &FederationConfig,
327 mem: &Memory,
328 shipped: Option<&super::ShippedEmbedding>,
329) -> Result<AckTracker, QuorumError> {
330 tracing::info!(
344 target: super::SYNC_TRACE_TARGET,
345 memory_id = %mem.id,
346 namespace = %mem.namespace,
347 peer_count = config.peers.len(),
348 quorum_w = config.policy.w,
349 "federation::broadcast: store {} -> {} peer(s) (quorum W={})",
350 mem.id,
351 config.peers.len(),
352 config.policy.w,
353 );
354 let now = Instant::now();
355 let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
356 tracker.lock().await.record_local();
357
358 let mut body = serde_json::json!({
359 (field_names::SENDER_AGENT_ID): config.sender_agent_id,
360 "memories": [mem],
361 "dry_run": false,
362 });
363 if let Some(se) = shipped {
367 body[field_names::EMBEDDINGS] = serde_json::json!([se]);
368 }
369
370 let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
371 #[cfg(feature = "sal")]
378 let dispatched_peer_ids: Vec<String> = config.peers.iter().map(|p| p.id.clone()).collect();
379 for peer in &config.peers {
380 let client = config.client.clone();
381 let url = peer.sync_push_url.clone();
382 let id = peer.id.clone();
383 let mem_id = mem.id.clone();
384 let payload = body.clone();
385 let api_key = config.api_key.clone();
386 let signing_key = config.signing_key.clone();
387 joins.spawn(async move {
388 let outcome = post_and_classify(
389 &client,
390 &url,
391 &payload,
392 &mem_id,
393 Some(&mem_id),
394 api_key.as_deref(),
395 signing_key.as_deref(),
396 )
397 .await;
398 (id, outcome)
399 });
400 }
401
402 #[cfg(feature = "sal")]
409 let mut explicit_failures: Vec<(String, String)> = Vec::new();
410
411 let deadline = now + config.policy.ack_timeout;
417 loop {
418 let remaining = deadline.saturating_duration_since(Instant::now());
419 if remaining.is_zero() {
420 break;
421 }
422 match tokio::time::timeout(remaining, joins.join_next()).await {
423 Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
424 tracker.lock().await.record_peer_ack(peer_id);
425 }
426 Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
427 tracker.lock().await.record_id_drift(peer_id);
428 }
429 Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
430 tracing::warn!("federation: peer {peer_id} failed for {}: {reason}", mem.id);
431 #[cfg(feature = "sal")]
432 explicit_failures.push((peer_id.clone(), reason.clone()));
433 #[cfg(not(feature = "sal"))]
434 {
435 let _ = (peer_id, reason);
436 }
437 }
438 Ok(Some(Err(e))) => {
439 tracing::warn!("federation: peer join error: {e}");
440 }
441 Ok(None) | Err(_) => break, }
443 if tracker.lock().await.is_quorum_met(Instant::now()) {
446 break;
447 }
448 }
449
450 if !joins.is_empty() {
467 let mem_id = mem.id.clone();
472 tokio::spawn(async move {
473 while let Some(res) = joins.join_next().await {
474 match res {
475 Ok((peer_id, AckOutcome::Ack)) => {
476 tracing::debug!("federation: post-quorum ack from {peer_id}");
477 }
478 Ok((peer_id, AckOutcome::IdDrift)) => {
479 tracing::warn!(
480 "federation: post-quorum id-drift from {peer_id} (peer rewrote id)"
481 );
482 crate::metrics::registry()
483 .federation_fanout_dropped_total
484 .with_label_values(&["id_drift"])
485 .inc();
486 }
487 Ok((peer_id, AckOutcome::Fail(reason))) => {
488 tracing::warn!(
489 "federation: post-quorum peer {peer_id} did not ack for {mem_id}: {reason}"
490 );
491 crate::metrics::registry()
492 .federation_fanout_dropped_total
493 .with_label_values(&["peer_fail"])
494 .inc();
495 }
496 Err(e) => {
497 tracing::warn!("federation: post-quorum join error for {mem_id}: {e}");
498 crate::metrics::registry()
499 .federation_fanout_dropped_total
500 .with_label_values(&["join_error"])
501 .inc();
502 }
503 }
504 }
505 });
506 }
507
508 let tracker = Arc::try_unwrap(tracker)
509 .map_err(|_| QuorumError::LocalWriteFailed {
510 detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
511 })?
512 .into_inner();
513 if tracker.finalise(Instant::now()).is_ok() {
521 let acked = tracker.acked_peer_ids();
522 let mut missing: Vec<String> = config
523 .peers
524 .iter()
525 .filter(|p| !acked.contains(&p.id))
526 .map(|p| p.sync_push_url.clone())
527 .collect();
528 if !missing.is_empty() {
529 missing.sort();
530 tracing::warn!(
531 memory_id = %mem.id,
532 n_missing = missing.len(),
533 peer_urls = ?missing,
534 "federation: quorum met but {} peer(s) did not ack: {:?}",
535 missing.len(),
536 missing,
537 );
538 crate::metrics::registry()
539 .federation_partial_quorum_total
540 .inc();
541 }
542 }
543
544 #[cfg(feature = "sal")]
561 if let Some(sink) = config.dlq_sink.as_ref() {
562 let acked = tracker.acked_peer_ids();
563 let explicit_map: std::collections::HashMap<String, String> =
564 explicit_failures.into_iter().collect();
565 for peer_id in &dispatched_peer_ids {
566 if acked.contains(peer_id) {
567 continue;
568 }
569 let reason = explicit_map
570 .get(peer_id)
571 .cloned()
572 .unwrap_or_else(|| "deadline_exceeded".to_string());
573 if let Err(e) = sink
574 .enqueue_push_failure(&mem.id, peer_id, &body, &reason)
575 .await
576 {
577 tracing::warn!(
578 target: super::push_dlq::PUSH_DLQ_TRACE_TARGET,
579 memory_id = %mem.id,
580 peer_id = %peer_id,
581 "federation: failed to enqueue push-failure DLQ row \
582 for peer {peer_id} on memory {}: {e}",
583 mem.id,
584 );
585 } else {
586 tracing::info!(
587 target: super::push_dlq::PUSH_DLQ_TRACE_TARGET,
588 memory_id = %mem.id,
589 peer_id = %peer_id,
590 reason = %reason,
591 "federation: enqueued push-failure DLQ row for peer {peer_id} \
592 on memory {} (reason: {reason})",
593 mem.id,
594 );
595 }
596 }
597 }
598 Ok(tracker)
599}
600
601pub async fn broadcast_delete_quorum(
612 config: &FederationConfig,
613 id: &str,
614) -> Result<AckTracker, QuorumError> {
615 let now = Instant::now();
616 let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
617 tracker.lock().await.record_local();
618
619 let body = serde_json::json!({
620 (field_names::SENDER_AGENT_ID): config.sender_agent_id,
621 "memories": [],
622 "deletions": [id],
623 "dry_run": false,
624 });
625
626 let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
627 for peer in &config.peers {
628 let client = config.client.clone();
629 let url = peer.sync_push_url.clone();
630 let peer_id = peer.id.clone();
631 let payload = body.clone();
632 let target_id = id.to_string();
633 let api_key = config.api_key.clone();
634 let signing_key = config.signing_key.clone();
635 joins.spawn(async move {
636 let outcome = post_and_classify(
637 &client,
638 &url,
639 &payload,
640 &target_id,
641 Some(&target_id),
642 api_key.as_deref(),
643 signing_key.as_deref(),
644 )
645 .await;
646 (peer_id, outcome)
647 });
648 }
649
650 let deadline = now + config.policy.ack_timeout;
651 loop {
652 let remaining = deadline.saturating_duration_since(Instant::now());
653 if remaining.is_zero() {
654 break;
655 }
656 match tokio::time::timeout(remaining, joins.join_next()).await {
657 Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
658 tracker.lock().await.record_peer_ack(peer_id);
659 }
660 Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
661 tracker.lock().await.record_id_drift(peer_id);
662 }
663 Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
664 tracing::warn!("federation: delete peer {peer_id} failed for {id}: {reason}");
665 }
666 Ok(Some(Err(e))) => {
667 tracing::warn!("federation: delete peer join error: {e}");
668 }
669 Ok(None) | Err(_) => break,
670 }
671 if tracker.lock().await.is_quorum_met(Instant::now()) {
672 break;
673 }
674 }
675
676 if !joins.is_empty() {
677 tokio::spawn(async move {
678 while let Some(res) = joins.join_next().await {
679 if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
680 tracing::debug!(
681 "federation: post-quorum delete peer {peer_id} did not ack: {reason}"
682 );
683 }
684 }
685 });
686 }
687
688 let tracker = Arc::try_unwrap(tracker)
689 .map_err(|_| QuorumError::LocalWriteFailed {
690 detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
691 })?
692 .into_inner();
693 Ok(tracker)
694}
695
696pub async fn broadcast_archive_quorum(
710 config: &FederationConfig,
711 id: &str,
712) -> Result<AckTracker, QuorumError> {
713 let now = Instant::now();
714 let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
715 tracker.lock().await.record_local();
716
717 let body = serde_json::json!({
718 (field_names::SENDER_AGENT_ID): config.sender_agent_id,
719 "memories": [],
720 "archives": [id],
721 "dry_run": false,
722 });
723
724 let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
725 for peer in &config.peers {
726 let client = config.client.clone();
727 let url = peer.sync_push_url.clone();
728 let peer_id = peer.id.clone();
729 let payload = body.clone();
730 let target_id = id.to_string();
731 let api_key = config.api_key.clone();
732 let signing_key = config.signing_key.clone();
733 joins.spawn(async move {
734 let outcome = post_and_classify(
735 &client,
736 &url,
737 &payload,
738 &target_id,
739 Some(&target_id),
740 api_key.as_deref(),
741 signing_key.as_deref(),
742 )
743 .await;
744 (peer_id, outcome)
745 });
746 }
747
748 let deadline = now + config.policy.ack_timeout;
749 loop {
750 let remaining = deadline.saturating_duration_since(Instant::now());
751 if remaining.is_zero() {
752 break;
753 }
754 match tokio::time::timeout(remaining, joins.join_next()).await {
755 Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
756 tracker.lock().await.record_peer_ack(peer_id);
757 }
758 Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
759 tracker.lock().await.record_id_drift(peer_id);
760 }
761 Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
762 tracing::warn!("federation: archive peer {peer_id} failed for {id}: {reason}");
763 }
764 Ok(Some(Err(e))) => {
765 tracing::warn!("federation: archive peer join error: {e}");
766 }
767 Ok(None) | Err(_) => break,
768 }
769 if tracker.lock().await.is_quorum_met(Instant::now()) {
770 break;
771 }
772 }
773
774 if !joins.is_empty() {
775 tokio::spawn(async move {
776 while let Some(res) = joins.join_next().await {
777 if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
778 tracing::debug!(
779 "federation: post-quorum archive peer {peer_id} did not ack: {reason}"
780 );
781 }
782 }
783 });
784 }
785
786 let tracker = Arc::try_unwrap(tracker)
787 .map_err(|_| QuorumError::LocalWriteFailed {
788 detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
789 })?
790 .into_inner();
791 Ok(tracker)
792}
793
794pub async fn broadcast_restore_quorum(
809 config: &FederationConfig,
810 id: &str,
811) -> Result<AckTracker, QuorumError> {
812 let now = Instant::now();
813 let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
814 tracker.lock().await.record_local();
815
816 let body = serde_json::json!({
817 (field_names::SENDER_AGENT_ID): config.sender_agent_id,
818 "memories": [],
819 "restores": [id],
820 "dry_run": false,
821 });
822
823 let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
824 for peer in &config.peers {
825 let client = config.client.clone();
826 let url = peer.sync_push_url.clone();
827 let peer_id = peer.id.clone();
828 let payload = body.clone();
829 let target_id = id.to_string();
830 let api_key = config.api_key.clone();
831 let signing_key = config.signing_key.clone();
832 joins.spawn(async move {
833 let outcome = post_and_classify(
834 &client,
835 &url,
836 &payload,
837 &target_id,
838 Some(&target_id),
839 api_key.as_deref(),
840 signing_key.as_deref(),
841 )
842 .await;
843 (peer_id, outcome)
844 });
845 }
846
847 let deadline = now + config.policy.ack_timeout;
848 loop {
849 let remaining = deadline.saturating_duration_since(Instant::now());
850 if remaining.is_zero() {
851 break;
852 }
853 match tokio::time::timeout(remaining, joins.join_next()).await {
854 Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
855 tracker.lock().await.record_peer_ack(peer_id);
856 }
857 Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
858 tracker.lock().await.record_id_drift(peer_id);
859 }
860 Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
861 tracing::warn!("federation: restore peer {peer_id} failed for {id}: {reason}");
862 }
863 Ok(Some(Err(e))) => {
864 tracing::warn!("federation: restore peer join error: {e}");
865 }
866 Ok(None) | Err(_) => break,
867 }
868 if tracker.lock().await.is_quorum_met(Instant::now()) {
869 break;
870 }
871 }
872
873 if !joins.is_empty() {
874 tokio::spawn(async move {
875 while let Some(res) = joins.join_next().await {
876 if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
877 tracing::debug!(
878 "federation: post-quorum restore peer {peer_id} did not ack: {reason}"
879 );
880 }
881 }
882 });
883 }
884
885 let tracker = Arc::try_unwrap(tracker)
886 .map_err(|_| QuorumError::LocalWriteFailed {
887 detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
888 })?
889 .into_inner();
890 Ok(tracker)
891}
892
893pub async fn broadcast_link_quorum(
902 config: &FederationConfig,
903 link: &MemoryLink,
904) -> Result<AckTracker, QuorumError> {
905 let now = Instant::now();
906 let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
907 tracker.lock().await.record_local();
908
909 let body = serde_json::json!({
910 (field_names::SENDER_AGENT_ID): config.sender_agent_id,
911 "memories": [],
912 "links": [link],
913 "dry_run": false,
914 });
915 let log_id = format!("{}→{}", link.source_id, link.target_id);
916
917 let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
918 for peer in &config.peers {
919 let client = config.client.clone();
920 let url = peer.sync_push_url.clone();
921 let peer_id = peer.id.clone();
922 let payload = body.clone();
923 let log_id = log_id.clone();
924 let api_key = config.api_key.clone();
925 let signing_key = config.signing_key.clone();
926 joins.spawn(async move {
927 let outcome = post_and_classify(
928 &client,
929 &url,
930 &payload,
931 &log_id,
932 Some(&log_id),
933 api_key.as_deref(),
934 signing_key.as_deref(),
935 )
936 .await;
937 (peer_id, outcome)
938 });
939 }
940
941 let deadline = now + config.policy.ack_timeout;
942 loop {
943 let remaining = deadline.saturating_duration_since(Instant::now());
944 if remaining.is_zero() {
945 break;
946 }
947 match tokio::time::timeout(remaining, joins.join_next()).await {
948 Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
949 tracker.lock().await.record_peer_ack(peer_id);
950 }
951 Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
952 tracker.lock().await.record_id_drift(peer_id);
953 }
954 Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
955 tracing::warn!("federation: link peer {peer_id} failed for {log_id}: {reason}");
956 }
957 Ok(Some(Err(e))) => {
958 tracing::warn!("federation: link peer join error: {e}");
959 }
960 Ok(None) | Err(_) => break,
961 }
962 if tracker.lock().await.is_quorum_met(Instant::now()) {
963 break;
964 }
965 }
966
967 if !joins.is_empty() {
968 tokio::spawn(async move {
969 while let Some(res) = joins.join_next().await {
970 if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
971 tracing::debug!(
972 "federation: post-quorum link peer {peer_id} did not ack: {reason}"
973 );
974 }
975 }
976 });
977 }
978
979 let tracker = Arc::try_unwrap(tracker)
980 .map_err(|_| QuorumError::LocalWriteFailed {
981 detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
982 })?
983 .into_inner();
984 Ok(tracker)
985}
986
987pub async fn broadcast_consolidate_quorum(
996 config: &FederationConfig,
997 new_mem: &Memory,
998 source_ids: &[String],
999) -> Result<AckTracker, QuorumError> {
1000 let now = Instant::now();
1001 let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
1002 tracker.lock().await.record_local();
1003
1004 let body = serde_json::json!({
1005 (field_names::SENDER_AGENT_ID): config.sender_agent_id,
1006 "memories": [new_mem],
1007 "deletions": source_ids,
1008 "dry_run": false,
1009 });
1010
1011 let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
1012 for peer in &config.peers {
1013 let client = config.client.clone();
1014 let url = peer.sync_push_url.clone();
1015 let peer_id = peer.id.clone();
1016 let payload = body.clone();
1017 let target_id = new_mem.id.clone();
1018 let api_key = config.api_key.clone();
1019 let signing_key = config.signing_key.clone();
1020 joins.spawn(async move {
1021 let outcome = post_and_classify(
1022 &client,
1023 &url,
1024 &payload,
1025 &target_id,
1026 Some(&target_id),
1027 api_key.as_deref(),
1028 signing_key.as_deref(),
1029 )
1030 .await;
1031 (peer_id, outcome)
1032 });
1033 }
1034
1035 let deadline = now + config.policy.ack_timeout;
1036 loop {
1037 let remaining = deadline.saturating_duration_since(Instant::now());
1038 if remaining.is_zero() {
1039 break;
1040 }
1041 match tokio::time::timeout(remaining, joins.join_next()).await {
1042 Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
1043 tracker.lock().await.record_peer_ack(peer_id);
1044 }
1045 Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
1046 tracker.lock().await.record_id_drift(peer_id);
1047 }
1048 Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
1049 tracing::warn!(
1050 "federation: consolidate peer {peer_id} failed for {}: {reason}",
1051 new_mem.id
1052 );
1053 }
1054 Ok(Some(Err(e))) => {
1055 tracing::warn!("federation: consolidate peer join error: {e}");
1056 }
1057 Ok(None) | Err(_) => break,
1058 }
1059 if tracker.lock().await.is_quorum_met(Instant::now()) {
1060 break;
1061 }
1062 }
1063
1064 if !joins.is_empty() {
1065 tokio::spawn(async move {
1066 while let Some(res) = joins.join_next().await {
1067 if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
1068 tracing::debug!(
1069 "federation: post-quorum consolidate peer {peer_id} did not ack: {reason}"
1070 );
1071 }
1072 }
1073 });
1074 }
1075
1076 let tracker = Arc::try_unwrap(tracker)
1077 .map_err(|_| QuorumError::LocalWriteFailed {
1078 detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
1079 })?
1080 .into_inner();
1081 Ok(tracker)
1082}
1083
1084pub async fn broadcast_pending_quorum(
1096 config: &FederationConfig,
1097 pending: &PendingAction,
1098) -> Result<AckTracker, QuorumError> {
1099 let now = Instant::now();
1100 let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
1101 tracker.lock().await.record_local();
1102
1103 let body = serde_json::json!({
1104 (field_names::SENDER_AGENT_ID): config.sender_agent_id,
1105 "memories": [],
1106 "pendings": [pending],
1107 "dry_run": false,
1108 });
1109
1110 let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
1111 for peer in &config.peers {
1112 let client = config.client.clone();
1113 let url = peer.sync_push_url.clone();
1114 let peer_id = peer.id.clone();
1115 let payload = body.clone();
1116 let target_id = pending.id.clone();
1117 let api_key = config.api_key.clone();
1118 let signing_key = config.signing_key.clone();
1119 joins.spawn(async move {
1120 let outcome = post_and_classify(
1121 &client,
1122 &url,
1123 &payload,
1124 &target_id,
1125 Some(&target_id),
1126 api_key.as_deref(),
1127 signing_key.as_deref(),
1128 )
1129 .await;
1130 (peer_id, outcome)
1131 });
1132 }
1133
1134 let deadline = now + config.policy.ack_timeout;
1135 loop {
1136 let remaining = deadline.saturating_duration_since(Instant::now());
1137 if remaining.is_zero() {
1138 break;
1139 }
1140 match tokio::time::timeout(remaining, joins.join_next()).await {
1141 Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
1142 tracker.lock().await.record_peer_ack(peer_id);
1143 }
1144 Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
1145 tracker.lock().await.record_id_drift(peer_id);
1146 }
1147 Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
1148 tracing::warn!(
1149 "federation: pending peer {peer_id} failed for {}: {reason}",
1150 pending.id
1151 );
1152 }
1153 Ok(Some(Err(e))) => {
1154 tracing::warn!("federation: pending peer join error: {e}");
1155 }
1156 Ok(None) | Err(_) => break,
1157 }
1158 if tracker.lock().await.is_quorum_met(Instant::now()) {
1159 break;
1160 }
1161 }
1162
1163 if !joins.is_empty() {
1164 tokio::spawn(async move {
1165 while let Some(res) = joins.join_next().await {
1166 if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
1167 tracing::debug!(
1168 "federation: post-quorum pending peer {peer_id} did not ack: {reason}"
1169 );
1170 }
1171 }
1172 });
1173 }
1174
1175 let tracker = Arc::try_unwrap(tracker)
1176 .map_err(|_| QuorumError::LocalWriteFailed {
1177 detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
1178 })?
1179 .into_inner();
1180 Ok(tracker)
1181}
1182
1183pub async fn broadcast_pending_decision_quorum(
1194 config: &FederationConfig,
1195 decision: &PendingDecision,
1196) -> Result<AckTracker, QuorumError> {
1197 let now = Instant::now();
1198 let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
1199 tracker.lock().await.record_local();
1200
1201 let body = serde_json::json!({
1202 (field_names::SENDER_AGENT_ID): config.sender_agent_id,
1203 "memories": [],
1204 "pending_decisions": [decision],
1205 "dry_run": false,
1206 });
1207
1208 let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
1209 for peer in &config.peers {
1210 let client = config.client.clone();
1211 let url = peer.sync_push_url.clone();
1212 let peer_id = peer.id.clone();
1213 let payload = body.clone();
1214 let target_id = decision.id.clone();
1215 let api_key = config.api_key.clone();
1216 let signing_key = config.signing_key.clone();
1217 joins.spawn(async move {
1218 let outcome = post_and_classify(
1219 &client,
1220 &url,
1221 &payload,
1222 &target_id,
1223 Some(&target_id),
1224 api_key.as_deref(),
1225 signing_key.as_deref(),
1226 )
1227 .await;
1228 (peer_id, outcome)
1229 });
1230 }
1231
1232 let deadline = now + config.policy.ack_timeout;
1233 loop {
1234 let remaining = deadline.saturating_duration_since(Instant::now());
1235 if remaining.is_zero() {
1236 break;
1237 }
1238 match tokio::time::timeout(remaining, joins.join_next()).await {
1239 Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
1240 tracker.lock().await.record_peer_ack(peer_id);
1241 }
1242 Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
1243 tracker.lock().await.record_id_drift(peer_id);
1244 }
1245 Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
1246 tracing::warn!(
1247 "federation: pending-decision peer {peer_id} failed for {}: {reason}",
1248 decision.id
1249 );
1250 }
1251 Ok(Some(Err(e))) => {
1252 tracing::warn!("federation: pending-decision peer join error: {e}");
1253 }
1254 Ok(None) | Err(_) => break,
1255 }
1256 if tracker.lock().await.is_quorum_met(Instant::now()) {
1257 break;
1258 }
1259 }
1260
1261 if !joins.is_empty() {
1262 tokio::spawn(async move {
1263 while let Some(res) = joins.join_next().await {
1264 if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
1265 tracing::debug!(
1266 "federation: post-quorum pending-decision peer {peer_id} did not ack: {reason}"
1267 );
1268 }
1269 }
1270 });
1271 }
1272
1273 let tracker = Arc::try_unwrap(tracker)
1274 .map_err(|_| QuorumError::LocalWriteFailed {
1275 detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
1276 })?
1277 .into_inner();
1278 Ok(tracker)
1279}
1280
1281pub async fn broadcast_namespace_meta_quorum(
1293 config: &FederationConfig,
1294 entry: &NamespaceMetaEntry,
1295) -> Result<AckTracker, QuorumError> {
1296 let now = Instant::now();
1297 let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
1298 tracker.lock().await.record_local();
1299
1300 let body = serde_json::json!({
1301 (field_names::SENDER_AGENT_ID): config.sender_agent_id,
1302 "memories": [],
1303 "namespace_meta": [entry],
1304 "dry_run": false,
1305 });
1306
1307 let target_id = entry.namespace.clone();
1308 let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
1309 for peer in &config.peers {
1310 let client = config.client.clone();
1311 let url = peer.sync_push_url.clone();
1312 let peer_id = peer.id.clone();
1313 let payload = body.clone();
1314 let target = target_id.clone();
1315 let api_key = config.api_key.clone();
1316 let signing_key = config.signing_key.clone();
1317 joins.spawn(async move {
1318 let outcome = post_and_classify(
1319 &client,
1320 &url,
1321 &payload,
1322 &target,
1323 Some(&target),
1324 api_key.as_deref(),
1325 signing_key.as_deref(),
1326 )
1327 .await;
1328 (peer_id, outcome)
1329 });
1330 }
1331
1332 let deadline = now + config.policy.ack_timeout;
1333 loop {
1334 let remaining = deadline.saturating_duration_since(Instant::now());
1335 if remaining.is_zero() {
1336 break;
1337 }
1338 match tokio::time::timeout(remaining, joins.join_next()).await {
1339 Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
1340 tracker.lock().await.record_peer_ack(peer_id);
1341 }
1342 Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
1343 tracker.lock().await.record_id_drift(peer_id);
1344 }
1345 Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
1346 tracing::warn!(
1347 "federation: namespace_meta peer {peer_id} failed for {}: {reason}",
1348 entry.namespace
1349 );
1350 }
1351 Ok(Some(Err(e))) => {
1352 tracing::warn!("federation: namespace_meta peer join error: {e}");
1353 }
1354 Ok(None) | Err(_) => break,
1355 }
1356 if tracker.lock().await.is_quorum_met(Instant::now()) {
1357 break;
1358 }
1359 }
1360
1361 if !joins.is_empty() {
1362 tokio::spawn(async move {
1363 while let Some(res) = joins.join_next().await {
1364 if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
1365 tracing::debug!(
1366 "federation: post-quorum namespace_meta peer {peer_id} did not ack: {reason}"
1367 );
1368 }
1369 }
1370 });
1371 }
1372
1373 let tracker = Arc::try_unwrap(tracker)
1374 .map_err(|_| QuorumError::LocalWriteFailed {
1375 detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
1376 })?
1377 .into_inner();
1378 Ok(tracker)
1379}
1380
1381pub async fn broadcast_namespace_meta_clear_quorum(
1395 config: &FederationConfig,
1396 namespaces: &[String],
1397) -> Result<AckTracker, QuorumError> {
1398 let now = Instant::now();
1399 let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
1400 tracker.lock().await.record_local();
1401
1402 let body = serde_json::json!({
1403 (field_names::SENDER_AGENT_ID): config.sender_agent_id,
1404 "memories": [],
1405 "namespace_meta_clears": namespaces,
1406 "dry_run": false,
1407 });
1408
1409 let target_id = namespaces.join(",");
1412 let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
1413 for peer in &config.peers {
1414 let client = config.client.clone();
1415 let url = peer.sync_push_url.clone();
1416 let peer_id = peer.id.clone();
1417 let payload = body.clone();
1418 let target = target_id.clone();
1419 let api_key = config.api_key.clone();
1420 let signing_key = config.signing_key.clone();
1421 joins.spawn(async move {
1422 let outcome = post_and_classify(
1423 &client,
1424 &url,
1425 &payload,
1426 &target,
1427 Some(&target),
1428 api_key.as_deref(),
1429 signing_key.as_deref(),
1430 )
1431 .await;
1432 (peer_id, outcome)
1433 });
1434 }
1435
1436 let deadline = now + config.policy.ack_timeout;
1437 loop {
1438 let remaining = deadline.saturating_duration_since(Instant::now());
1439 if remaining.is_zero() {
1440 break;
1441 }
1442 match tokio::time::timeout(remaining, joins.join_next()).await {
1443 Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
1444 tracker.lock().await.record_peer_ack(peer_id);
1445 }
1446 Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
1447 tracker.lock().await.record_id_drift(peer_id);
1448 }
1449 Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
1450 tracing::warn!(
1451 "federation: namespace_meta_clear peer {peer_id} failed for [{}]: {reason}",
1452 target_id
1453 );
1454 }
1455 Ok(Some(Err(e))) => {
1456 tracing::warn!("federation: namespace_meta_clear peer join error: {e}");
1457 }
1458 Ok(None) | Err(_) => break,
1459 }
1460 if tracker.lock().await.is_quorum_met(Instant::now()) {
1461 break;
1462 }
1463 }
1464
1465 if !joins.is_empty() {
1466 tokio::spawn(async move {
1467 while let Some(res) = joins.join_next().await {
1468 if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
1469 tracing::debug!(
1470 "federation: post-quorum namespace_meta_clear peer {peer_id} did not ack: {reason}"
1471 );
1472 }
1473 }
1474 });
1475 }
1476
1477 let tracker = Arc::try_unwrap(tracker)
1478 .map_err(|_| QuorumError::LocalWriteFailed {
1479 detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
1480 })?
1481 .into_inner();
1482 Ok(tracker)
1483}
1484
1485pub async fn bulk_catchup_push(
1520 config: &FederationConfig,
1521 memories: &[Memory],
1522) -> Vec<(String, String)> {
1523 if memories.is_empty() || config.peers.is_empty() {
1524 return Vec::new();
1525 }
1526 let body = serde_json::json!({
1527 (field_names::SENDER_AGENT_ID): config.sender_agent_id,
1528 "memories": memories,
1529 "dry_run": false,
1530 });
1531 let mut joins: JoinSet<(String, Result<(), String>)> = JoinSet::new();
1532 for peer in &config.peers {
1533 let client = config.client.clone();
1534 let url = peer.sync_push_url.clone();
1535 let id = peer.id.clone();
1536 let payload = body.clone();
1537 let api_key = config.api_key.clone();
1538 let signing_key = config.signing_key.clone();
1539 joins.spawn(async move {
1540 let body_bytes = match serde_json::to_vec(&payload) {
1543 Ok(b) => b,
1544 Err(e) => {
1545 return (id, Err(format!("serialise body: {e}")));
1546 }
1547 };
1548 let mut req = client
1549 .post(&url)
1550 .header(crate::HEADER_CONTENT_TYPE, crate::MIME_JSON)
1551 .body(body_bytes.clone());
1552 req = req.header("X-Catchup", "bulk");
1556 if let Some(sk) = signing_key.as_deref() {
1558 let nonce = uuid::Uuid::new_v4().to_string();
1559 let sig_header = crate::federation::signing::sign_body_with_nonce_header(
1560 sk,
1561 &body_bytes,
1562 &nonce,
1563 );
1564 req = req
1565 .header(crate::federation::signing::SIGNATURE_HEADER, sig_header)
1566 .header(crate::federation::signing::NONCE_HEADER, nonce);
1567 }
1568 if let Some(key) = api_key.as_deref() {
1573 req = req.header(crate::HEADER_API_KEY, key);
1574 }
1575 if let Some(peer_id) = payload
1579 .get(field_names::SENDER_AGENT_ID)
1580 .and_then(|v| v.as_str())
1581 .filter(|s| !s.is_empty())
1582 {
1583 req = req.header(crate::federation::peer_attestation::PEER_ID_HEADER, peer_id);
1584 }
1585 let outcome = match req.send().await {
1586 Ok(resp) if resp.status().is_success() => {
1587 let _ = resp.bytes().await;
1590 Ok(())
1591 }
1592 Ok(resp) => {
1593 let status = resp.status();
1596 let _ = resp.bytes().await;
1597 Err(format!("http {status}"))
1598 }
1599 Err(e) => Err(crate::errors::msg::network(e)),
1600 };
1601 (id, outcome)
1602 });
1603 }
1604 let mut errors = Vec::new();
1605 while let Some(res) = joins.join_next().await {
1606 match res {
1607 Ok((peer_id, Err(err))) => {
1608 tracing::warn!("bulk_catchup_push: peer {peer_id} failed: {err}");
1609 errors.push((peer_id, err));
1610 }
1611 Ok((_, Ok(()))) => {}
1612 Err(e) => {
1613 tracing::warn!("bulk_catchup_push: join error: {e:?}");
1614 errors.push(("unknown".to_string(), e.to_string()));
1615 }
1616 }
1617 }
1618 errors
1619}