use crate::models::field_names;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tokio::task::JoinSet;
use crate::federation::identity::chain::CHAIN_HEADER;
use crate::federation::identity::credential::CREDENTIAL_HEADER;
use crate::federation::identity::outbound;
use crate::models::{Memory, MemoryLink, NamespaceMetaEntry, PendingAction, PendingDecision};
use crate::replication::{AckTracker, QuorumError};
use super::FederationConfig;
/// Detail text for the `QuorumError::LocalWriteFailed` returned when the
/// quorum tracker's `Arc` cannot be unwrapped after the fan-out loop.
const TRACKER_ARC_STILL_REFERENCED: &str = "tracker arc still referenced at finalise";
/// Classification of one push to one peer, as produced by `post_once`.
#[derive(Debug)]
pub(super) enum AckOutcome {
    /// 2xx response whose `ids` list was absent, empty, unparsable, or
    /// contained the expected id.
    Ack,
    /// 2xx response whose non-empty `ids` list did not contain the expected id.
    IdDrift,
    /// The push did not succeed; carries a human-readable reason.
    Fail(String),
}
/// Sends one POST of `body` to a peer's sync-push `url` and classifies the
/// response as an [`AckOutcome`].
///
/// The request is first screened by the governance wire check as an
/// `AgentAction::NetworkRequest` for the URL's host and scheme. Besides the
/// JSON content type it then carries:
/// - `Idempotency-Key` when `idempotency_key` is `Some`;
/// - the API-key header when `api_key` is `Some`;
/// - a body signature and a fresh UUIDv4 nonce when `signing_key` is `Some`;
/// - the peer-id header when `body[SENDER_AGENT_ID]` is a non-empty string;
/// - the federation credential and intermediate-chain headers when an
///   outbound credential is installed (encoding failures are logged and the
///   header is omitted).
///
/// # Returns
/// - `Ack` for a 2xx whose JSON `ids` array is absent, empty, or contains
///   `expected_id`, and also for a 2xx whose body is not valid JSON;
/// - `IdDrift` for a 2xx whose non-empty `ids` array lacks `expected_id`;
/// - `Fail(reason)` on governance refusal, body serialisation failure, a
///   non-2xx status, or a transport error.
pub(super) async fn post_once(
    client: &reqwest::Client,
    url: &str,
    body: &serde_json::Value,
    expected_id: &str,
    idempotency_key: Option<&str>,
    api_key: Option<&str>,
    signing_key: Option<&ed25519_dalek::SigningKey>,
) -> AckOutcome {
    // Parse the URL once. If it is unparsable, the raw string stands in for
    // the host and the scheme is empty, so the governance check still runs.
    let parsed = reqwest::Url::parse(url).ok();
    let host = parsed
        .as_ref()
        .and_then(|u| u.host_str().map(str::to_string))
        .unwrap_or_else(|| url.to_string());
    let scheme = parsed
        .as_ref()
        .map(|u| u.scheme().to_string())
        .unwrap_or_default();
    let net_action = crate::governance::agent_action::AgentAction::NetworkRequest {
        host: host.clone(),
        scheme,
    };
    if let Err(refusal) = crate::governance::wire_check::check(&net_action) {
        return AckOutcome::Fail(format!(
            "governance refused outbound to {host}: {}",
            refusal.reason
        ));
    }
    let body_bytes = match serde_json::to_vec(body) {
        Ok(b) => b,
        Err(e) => return AckOutcome::Fail(format!("serialise body: {e}")),
    };
    let mut req = client
        .post(url)
        .header(crate::HEADER_CONTENT_TYPE, crate::MIME_JSON);
    if let Some(key) = idempotency_key {
        req = req.header("Idempotency-Key", key);
    }
    if let Some(key) = api_key {
        req = req.header(crate::HEADER_API_KEY, key);
    }
    if let Some(sk) = signing_key {
        // A fresh nonce for every attempt, so a retry is signed independently.
        let nonce = uuid::Uuid::new_v4().to_string();
        let sig_header =
            crate::federation::signing::sign_body_with_nonce_header(sk, &body_bytes, &nonce);
        req = req
            .header(crate::federation::signing::SIGNATURE_HEADER, sig_header)
            .header(crate::federation::signing::NONCE_HEADER, nonce);
    }
    if let Some(peer_id) = body
        .get(field_names::SENDER_AGENT_ID)
        .and_then(|v| v.as_str())
        .filter(|s| !s.is_empty())
    {
        req = req.header(crate::federation::peer_attestation::PEER_ID_HEADER, peer_id);
    }
    if let Some(cred) = outbound::current() {
        match cred.to_header_value() {
            Ok(value) => req = req.header(CREDENTIAL_HEADER, value),
            Err(e) => {
                tracing::warn!(target: super::SIGNING_TRACE_TARGET, error = %e,
                    "failed to encode outbound federation credential header; omitting");
            }
        }
        let intermediates = outbound::current_intermediates();
        match crate::federation::identity::chain::intermediates_to_header_value(&intermediates) {
            Ok(Some(value)) => req = req.header(CHAIN_HEADER, value),
            Ok(None) => {}
            Err(e) => {
                tracing::warn!(target: super::SIGNING_TRACE_TARGET, error = %e,
                    "failed to encode outbound federation chain header; omitting");
            }
        }
    }
    // Attach the body last. The signature has already been computed from
    // these bytes, so they can be moved in rather than cloned.
    req = req.body(body_bytes);
    match req.send().await {
        Ok(resp) if resp.status().is_success() => match resp.json::<serde_json::Value>().await {
            Ok(v) => {
                // Only a non-empty `ids` list that omits our id counts as drift.
                if let Some(ids) = v.get("ids").and_then(|v| v.as_array())
                    && !ids.is_empty()
                    && !ids.iter().any(|x| x.as_str() == Some(expected_id))
                {
                    return AckOutcome::IdDrift;
                }
                AckOutcome::Ack
            }
            // A 2xx whose body is not JSON is still treated as accepted.
            Err(_) => AckOutcome::Ack,
        },
        Ok(resp) => {
            let status = resp.status();
            // Body is read and discarded; presumably so the connection can be
            // reused (NOTE(review): confirm intent).
            let _ = resp.bytes().await;
            AckOutcome::Fail(format!("http {status}"))
        }
        Err(e) => AckOutcome::Fail(crate::errors::msg::network(e)),
    }
}
/// Delay between the first failed peer POST and the single retry that
/// `post_and_classify` performs.
pub(super) const FANOUT_RETRY_BACKOFF: Duration = Duration::from_millis(250);
/// Calls `post_once` and, if it fails, retries exactly once after
/// [`FANOUT_RETRY_BACKOFF`].
///
/// `Ack` and `IdDrift` on the first attempt are returned as-is, with no retry
/// and no metric. When a retry happens, `federation_fanout_retry_total` is
/// incremented with label `ok`, `id_drift` or `fail`, according to the retry's
/// outcome. A failed retry returns a reason that combines both attempts:
/// `"first: …; retry: …"`.
pub(super) async fn post_and_classify(
    client: &reqwest::Client,
    url: &str,
    body: &serde_json::Value,
    expected_id: &str,
    idempotency_key: Option<&str>,
    api_key: Option<&str>,
    signing_key: Option<&ed25519_dalek::SigningKey>,
) -> AckOutcome {
    // Every capture is a `Copy` reference or option, so the closure can be
    // invoked once per attempt with an identical argument list.
    let attempt = move || {
        post_once(
            client,
            url,
            body,
            expected_id,
            idempotency_key,
            api_key,
            signing_key,
        )
    };

    let first_reason = match attempt().await {
        AckOutcome::Fail(reason) => reason,
        settled @ (AckOutcome::Ack | AckOutcome::IdDrift) => return settled,
    };

    tokio::time::sleep(FANOUT_RETRY_BACKOFF).await;

    let (label, outcome) = match attempt().await {
        AckOutcome::Ack => {
            tracing::debug!(
                "federation: peer POST retry succeeded for {expected_id} (first attempt: {first_reason})"
            );
            ("ok", AckOutcome::Ack)
        }
        AckOutcome::IdDrift => ("id_drift", AckOutcome::IdDrift),
        AckOutcome::Fail(retry_reason) => (
            "fail",
            AckOutcome::Fail(format!("first: {first_reason}; retry: {retry_reason}")),
        ),
    };
    crate::metrics::registry()
        .federation_fanout_retry_total
        .with_label_values(&[label])
        .inc();
    outcome
}
pub async fn broadcast_store_quorum(
config: &FederationConfig,
mem: &Memory,
) -> Result<AckTracker, QuorumError> {
broadcast_store_quorum_with_embedding(config, mem, None).await
}
/// Broadcasts a store of `mem` (optionally with a precomputed embedding) to
/// every configured peer and waits for the write quorum in `config.policy`.
///
/// Flow:
/// 1. Record the local write in a fresh [`AckTracker`].
/// 2. Spawn one `post_and_classify` task per peer. The payload is
///    `{"memories": [mem], "dry_run": false}`, plus `embeddings` when
///    `shipped` is `Some`. `mem.id` serves as both the expected id and the
///    `Idempotency-Key`.
/// 3. Fold peer outcomes into the tracker until quorum is met, every peer has
///    answered, or `policy.ack_timeout` elapses.
/// 4. Hand any still-running peer tasks to a detached drain task. It logs
///    their outcomes and bumps `federation_fanout_dropped_total` for id drift,
///    peer failures and join errors.
/// 5. If `finalise` succeeds but some peers did not ack, log their URLs and
///    bump `federation_partial_quorum_total`.
/// 6. With the `sal` feature and a configured DLQ sink, enqueue a push-failure
///    row for every dispatched peer that is not in the acked set.
///
/// The result of `finalise` only drives the logging in step 5. The tracker
/// is returned as `Ok` regardless.
///
/// # Errors
/// `QuorumError::LocalWriteFailed` only if the tracker `Arc` is still shared
/// after the loop. The `Arc` is never cloned in this function, so that is not
/// expected.
///
/// NOTE(review): under `sal`, some peers get DLQ rows with reason
/// `"deadline_exceeded"` even though the deadline may not have elapsed:
/// - peers still in flight when quorum was met;
/// - peers that reported id drift, unless `record_id_drift` puts them in the
///   acked set.
/// Confirm DLQ replay handles that reason correctly.
pub async fn broadcast_store_quorum_with_embedding(
config: &FederationConfig,
mem: &Memory,
shipped: Option<&super::ShippedEmbedding>,
) -> Result<AckTracker, QuorumError> {
tracing::info!(
target: super::SYNC_TRACE_TARGET,
memory_id = %mem.id,
namespace = %mem.namespace,
peer_count = config.peers.len(),
quorum_w = config.policy.w,
"federation::broadcast: store {} -> {} peer(s) (quorum W={})",
mem.id,
config.peers.len(),
config.policy.w,
);
let now = Instant::now();
let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
// Record the local write before fanning out to peers.
tracker.lock().await.record_local();
let mut body = serde_json::json!({
(field_names::SENDER_AGENT_ID): config.sender_agent_id,
"memories": [mem],
"dry_run": false,
});
// Ship the precomputed embedding alongside the memory when one is supplied.
if let Some(se) = shipped {
body[field_names::EMBEDDINGS] = serde_json::json!([se]);
}
let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
// Every peer we dispatch to; later compared with the acked set to pick DLQ rows.
#[cfg(feature = "sal")]
let dispatched_peer_ids: Vec<String> = config.peers.iter().map(|p| p.id.clone()).collect();
for peer in &config.peers {
let client = config.client.clone();
let url = peer.sync_push_url.clone();
let id = peer.id.clone();
let mem_id = mem.id.clone();
let payload = body.clone();
let api_key = config.api_key.clone();
let signing_key = config.signing_key.clone();
joins.spawn(async move {
let outcome = post_and_classify(
&client,
&url,
&payload,
&mem_id,
Some(&mem_id),
api_key.as_deref(),
signing_key.as_deref(),
)
.await;
(id, outcome)
});
}
// Peers that returned an explicit `Fail` inside the quorum window, with reason.
#[cfg(feature = "sal")]
let mut explicit_failures: Vec<(String, String)> = Vec::new();
let deadline = now + config.policy.ack_timeout;
// Collect outcomes until the deadline passes, every task finishes, or quorum
// is met. In the final arm, `Ok(None)` means all tasks are done and `Err(_)`
// means the timeout fired.
loop {
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
break;
}
match tokio::time::timeout(remaining, joins.join_next()).await {
Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
tracker.lock().await.record_peer_ack(peer_id);
}
Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
tracker.lock().await.record_id_drift(peer_id);
}
Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
tracing::warn!("federation: peer {peer_id} failed for {}: {reason}", mem.id);
#[cfg(feature = "sal")]
explicit_failures.push((peer_id.clone(), reason.clone()));
#[cfg(not(feature = "sal"))]
{
let _ = (peer_id, reason);
}
}
Ok(Some(Err(e))) => {
tracing::warn!("federation: peer join error: {e}");
}
Ok(None) | Err(_) => break, }
if tracker.lock().await.is_quorum_met(Instant::now()) {
break;
}
}
// Peers still running here are drained by a detached task, so the caller is
// not blocked. Their outcomes are only logged and metered; they do not
// affect the returned tracker.
if !joins.is_empty() {
let mem_id = mem.id.clone();
tokio::spawn(async move {
while let Some(res) = joins.join_next().await {
match res {
Ok((peer_id, AckOutcome::Ack)) => {
tracing::debug!("federation: post-quorum ack from {peer_id}");
}
Ok((peer_id, AckOutcome::IdDrift)) => {
tracing::warn!(
"federation: post-quorum id-drift from {peer_id} (peer rewrote id)"
);
crate::metrics::registry()
.federation_fanout_dropped_total
.with_label_values(&["id_drift"])
.inc();
}
Ok((peer_id, AckOutcome::Fail(reason))) => {
tracing::warn!(
"federation: post-quorum peer {peer_id} did not ack for {mem_id}: {reason}"
);
crate::metrics::registry()
.federation_fanout_dropped_total
.with_label_values(&["peer_fail"])
.inc();
}
Err(e) => {
tracing::warn!("federation: post-quorum join error for {mem_id}: {e}");
crate::metrics::registry()
.federation_fanout_dropped_total
.with_label_values(&["join_error"])
.inc();
}
}
}
});
}
// The Arc was never cloned above, so unwrapping is expected to succeed.
let tracker = Arc::try_unwrap(tracker)
.map_err(|_| QuorumError::LocalWriteFailed {
detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
})?
.into_inner();
// Quorum was met, but report any peer (by push URL) that did not ack in time.
if tracker.finalise(Instant::now()).is_ok() {
let acked = tracker.acked_peer_ids();
let mut missing: Vec<String> = config
.peers
.iter()
.filter(|p| !acked.contains(&p.id))
.map(|p| p.sync_push_url.clone())
.collect();
if !missing.is_empty() {
missing.sort();
tracing::warn!(
memory_id = %mem.id,
n_missing = missing.len(),
peer_urls = ?missing,
"federation: quorum met but {} peer(s) did not ack: {:?}",
missing.len(),
missing,
);
crate::metrics::registry()
.federation_partial_quorum_total
.inc();
}
}
// Enqueue one DLQ row per dispatched peer that is not in the acked set.
// Peers without an explicit failure reason fall back to "deadline_exceeded".
#[cfg(feature = "sal")]
if let Some(sink) = config.dlq_sink.as_ref() {
let acked = tracker.acked_peer_ids();
let explicit_map: std::collections::HashMap<String, String> =
explicit_failures.into_iter().collect();
for peer_id in &dispatched_peer_ids {
if acked.contains(peer_id) {
continue;
}
let reason = explicit_map
.get(peer_id)
.cloned()
.unwrap_or_else(|| "deadline_exceeded".to_string());
// Enqueue failures are logged only; they do not fail the broadcast.
if let Err(e) = sink
.enqueue_push_failure(&mem.id, peer_id, &body, &reason)
.await
{
tracing::warn!(
target: super::push_dlq::PUSH_DLQ_TRACE_TARGET,
memory_id = %mem.id,
peer_id = %peer_id,
"federation: failed to enqueue push-failure DLQ row \
for peer {peer_id} on memory {}: {e}",
mem.id,
);
} else {
tracing::info!(
target: super::push_dlq::PUSH_DLQ_TRACE_TARGET,
memory_id = %mem.id,
peer_id = %peer_id,
reason = %reason,
"federation: enqueued push-failure DLQ row for peer {peer_id} \
on memory {} (reason: {reason})",
mem.id,
);
}
}
}
Ok(tracker)
}
/// Replicates a delete of memory `id` to every configured peer and waits for
/// the write quorum in `config.policy`.
///
/// Each peer's sync-push URL receives
/// `{"memories": [], "deletions": [id], "dry_run": false}` via
/// `post_and_classify`, with `id` as both the expected id and the
/// `Idempotency-Key`. Outcomes are folded into the tracker until one of these
/// happens:
/// - quorum is met;
/// - every peer has answered;
/// - `policy.ack_timeout` elapses.
///
/// Peers still in flight at that point are drained by a detached task that
/// logs every outcome, including id drift and join errors.
///
/// The tracker is returned as-is; this function does not call `finalise`.
///
/// # Errors
/// `QuorumError::LocalWriteFailed` only if the tracker `Arc` is still shared
/// after the loop. The `Arc` is never cloned here, so this is not expected.
///
/// NOTE(review): the `Idempotency-Key` is the bare memory id, the same key
/// the store/archive/restore broadcasts use for that id. If peers dedupe on
/// the key alone, distinct operations could collide. Confirm peer semantics.
pub async fn broadcast_delete_quorum(
    config: &FederationConfig,
    id: &str,
) -> Result<AckTracker, QuorumError> {
    let now = Instant::now();
    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
    tracker.lock().await.record_local();
    let body = serde_json::json!({
        (field_names::SENDER_AGENT_ID): config.sender_agent_id,
        "memories": [],
        "deletions": [id],
        "dry_run": false,
    });
    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
    for peer in &config.peers {
        let client = config.client.clone();
        let url = peer.sync_push_url.clone();
        let peer_id = peer.id.clone();
        let payload = body.clone();
        let target_id = id.to_string();
        let api_key = config.api_key.clone();
        let signing_key = config.signing_key.clone();
        joins.spawn(async move {
            let outcome = post_and_classify(
                &client,
                &url,
                &payload,
                &target_id,
                Some(&target_id),
                api_key.as_deref(),
                signing_key.as_deref(),
            )
            .await;
            (peer_id, outcome)
        });
    }
    let deadline = now + config.policy.ack_timeout;
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            break;
        }
        match tokio::time::timeout(remaining, joins.join_next()).await {
            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
                tracker.lock().await.record_peer_ack(peer_id);
            }
            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
                tracker.lock().await.record_id_drift(peer_id);
            }
            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
                tracing::warn!("federation: delete peer {peer_id} failed for {id}: {reason}");
            }
            Ok(Some(Err(e))) => {
                tracing::warn!("federation: delete peer join error: {e}");
            }
            // `Ok(None)`: every peer task finished; `Err(_)`: deadline elapsed.
            Ok(None) | Err(_) => break,
        }
        if tracker.lock().await.is_quorum_met(Instant::now()) {
            break;
        }
    }
    if !joins.is_empty() {
        // Drain stragglers without blocking the caller. Id drift and join
        // errors (e.g. a panicking peer task) are logged, not dropped.
        let target = id.to_string();
        tokio::spawn(async move {
            while let Some(res) = joins.join_next().await {
                match res {
                    Ok((peer_id, AckOutcome::Ack)) => {
                        tracing::debug!("federation: post-quorum delete ack from {peer_id}");
                    }
                    Ok((peer_id, AckOutcome::IdDrift)) => {
                        tracing::warn!(
                            "federation: post-quorum delete id-drift from {peer_id} for {target} (peer rewrote id)"
                        );
                    }
                    Ok((peer_id, AckOutcome::Fail(reason))) => {
                        tracing::debug!(
                            "federation: post-quorum delete peer {peer_id} did not ack: {reason}"
                        );
                    }
                    Err(e) => {
                        tracing::warn!(
                            "federation: post-quorum delete join error for {target}: {e}"
                        );
                    }
                }
            }
        });
    }
    let tracker = Arc::try_unwrap(tracker)
        .map_err(|_| QuorumError::LocalWriteFailed {
            detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
        })?
        .into_inner();
    Ok(tracker)
}
/// Replicates an archive of memory `id` to every configured peer and waits
/// for the write quorum in `config.policy`.
///
/// Each peer's sync-push URL receives
/// `{"memories": [], "archives": [id], "dry_run": false}` via
/// `post_and_classify`, with `id` as both the expected id and the
/// `Idempotency-Key`. Outcomes are folded into the tracker until one of these
/// happens:
/// - quorum is met;
/// - every peer has answered;
/// - `policy.ack_timeout` elapses.
///
/// Peers still in flight at that point are drained by a detached task that
/// logs every outcome, including id drift and join errors.
///
/// The tracker is returned as-is; this function does not call `finalise`.
///
/// # Errors
/// `QuorumError::LocalWriteFailed` only if the tracker `Arc` is still shared
/// after the loop. The `Arc` is never cloned here, so this is not expected.
///
/// NOTE(review): the `Idempotency-Key` is the bare memory id, the same key
/// the store/delete/restore broadcasts use for that id. If peers dedupe on
/// the key alone, an archive followed by a restore could collide. Confirm.
pub async fn broadcast_archive_quorum(
    config: &FederationConfig,
    id: &str,
) -> Result<AckTracker, QuorumError> {
    let now = Instant::now();
    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
    tracker.lock().await.record_local();
    let body = serde_json::json!({
        (field_names::SENDER_AGENT_ID): config.sender_agent_id,
        "memories": [],
        "archives": [id],
        "dry_run": false,
    });
    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
    for peer in &config.peers {
        let client = config.client.clone();
        let url = peer.sync_push_url.clone();
        let peer_id = peer.id.clone();
        let payload = body.clone();
        let target_id = id.to_string();
        let api_key = config.api_key.clone();
        let signing_key = config.signing_key.clone();
        joins.spawn(async move {
            let outcome = post_and_classify(
                &client,
                &url,
                &payload,
                &target_id,
                Some(&target_id),
                api_key.as_deref(),
                signing_key.as_deref(),
            )
            .await;
            (peer_id, outcome)
        });
    }
    let deadline = now + config.policy.ack_timeout;
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            break;
        }
        match tokio::time::timeout(remaining, joins.join_next()).await {
            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
                tracker.lock().await.record_peer_ack(peer_id);
            }
            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
                tracker.lock().await.record_id_drift(peer_id);
            }
            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
                tracing::warn!("federation: archive peer {peer_id} failed for {id}: {reason}");
            }
            Ok(Some(Err(e))) => {
                tracing::warn!("federation: archive peer join error: {e}");
            }
            // `Ok(None)`: every peer task finished; `Err(_)`: deadline elapsed.
            Ok(None) | Err(_) => break,
        }
        if tracker.lock().await.is_quorum_met(Instant::now()) {
            break;
        }
    }
    if !joins.is_empty() {
        // Drain stragglers without blocking the caller. Id drift and join
        // errors (e.g. a panicking peer task) are logged, not dropped.
        let target = id.to_string();
        tokio::spawn(async move {
            while let Some(res) = joins.join_next().await {
                match res {
                    Ok((peer_id, AckOutcome::Ack)) => {
                        tracing::debug!("federation: post-quorum archive ack from {peer_id}");
                    }
                    Ok((peer_id, AckOutcome::IdDrift)) => {
                        tracing::warn!(
                            "federation: post-quorum archive id-drift from {peer_id} for {target} (peer rewrote id)"
                        );
                    }
                    Ok((peer_id, AckOutcome::Fail(reason))) => {
                        tracing::debug!(
                            "federation: post-quorum archive peer {peer_id} did not ack: {reason}"
                        );
                    }
                    Err(e) => {
                        tracing::warn!(
                            "federation: post-quorum archive join error for {target}: {e}"
                        );
                    }
                }
            }
        });
    }
    let tracker = Arc::try_unwrap(tracker)
        .map_err(|_| QuorumError::LocalWriteFailed {
            detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
        })?
        .into_inner();
    Ok(tracker)
}
/// Replicates a restore of memory `id` to every configured peer and waits
/// for the write quorum in `config.policy`.
///
/// Each peer's sync-push URL receives
/// `{"memories": [], "restores": [id], "dry_run": false}` via
/// `post_and_classify`, with `id` as both the expected id and the
/// `Idempotency-Key`. Outcomes are folded into the tracker until one of these
/// happens:
/// - quorum is met;
/// - every peer has answered;
/// - `policy.ack_timeout` elapses.
///
/// Peers still in flight at that point are drained by a detached task that
/// logs every outcome, including id drift and join errors.
///
/// The tracker is returned as-is; this function does not call `finalise`.
///
/// # Errors
/// `QuorumError::LocalWriteFailed` only if the tracker `Arc` is still shared
/// after the loop. The `Arc` is never cloned here, so this is not expected.
///
/// NOTE(review): the `Idempotency-Key` is the bare memory id, the same key
/// the store/delete/archive broadcasts use for that id. If peers dedupe on
/// the key alone, a restore after an archive could be collapsed. Confirm.
pub async fn broadcast_restore_quorum(
    config: &FederationConfig,
    id: &str,
) -> Result<AckTracker, QuorumError> {
    let now = Instant::now();
    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
    tracker.lock().await.record_local();
    let body = serde_json::json!({
        (field_names::SENDER_AGENT_ID): config.sender_agent_id,
        "memories": [],
        "restores": [id],
        "dry_run": false,
    });
    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
    for peer in &config.peers {
        let client = config.client.clone();
        let url = peer.sync_push_url.clone();
        let peer_id = peer.id.clone();
        let payload = body.clone();
        let target_id = id.to_string();
        let api_key = config.api_key.clone();
        let signing_key = config.signing_key.clone();
        joins.spawn(async move {
            let outcome = post_and_classify(
                &client,
                &url,
                &payload,
                &target_id,
                Some(&target_id),
                api_key.as_deref(),
                signing_key.as_deref(),
            )
            .await;
            (peer_id, outcome)
        });
    }
    let deadline = now + config.policy.ack_timeout;
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            break;
        }
        match tokio::time::timeout(remaining, joins.join_next()).await {
            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
                tracker.lock().await.record_peer_ack(peer_id);
            }
            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
                tracker.lock().await.record_id_drift(peer_id);
            }
            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
                tracing::warn!("federation: restore peer {peer_id} failed for {id}: {reason}");
            }
            Ok(Some(Err(e))) => {
                tracing::warn!("federation: restore peer join error: {e}");
            }
            // `Ok(None)`: every peer task finished; `Err(_)`: deadline elapsed.
            Ok(None) | Err(_) => break,
        }
        if tracker.lock().await.is_quorum_met(Instant::now()) {
            break;
        }
    }
    if !joins.is_empty() {
        // Drain stragglers without blocking the caller. Id drift and join
        // errors (e.g. a panicking peer task) are logged, not dropped.
        let target = id.to_string();
        tokio::spawn(async move {
            while let Some(res) = joins.join_next().await {
                match res {
                    Ok((peer_id, AckOutcome::Ack)) => {
                        tracing::debug!("federation: post-quorum restore ack from {peer_id}");
                    }
                    Ok((peer_id, AckOutcome::IdDrift)) => {
                        tracing::warn!(
                            "federation: post-quorum restore id-drift from {peer_id} for {target} (peer rewrote id)"
                        );
                    }
                    Ok((peer_id, AckOutcome::Fail(reason))) => {
                        tracing::debug!(
                            "federation: post-quorum restore peer {peer_id} did not ack: {reason}"
                        );
                    }
                    Err(e) => {
                        tracing::warn!(
                            "federation: post-quorum restore join error for {target}: {e}"
                        );
                    }
                }
            }
        });
    }
    let tracker = Arc::try_unwrap(tracker)
        .map_err(|_| QuorumError::LocalWriteFailed {
            detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
        })?
        .into_inner();
    Ok(tracker)
}
pub async fn broadcast_link_quorum(
config: &FederationConfig,
link: &MemoryLink,
) -> Result<AckTracker, QuorumError> {
let now = Instant::now();
let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
tracker.lock().await.record_local();
let body = serde_json::json!({
(field_names::SENDER_AGENT_ID): config.sender_agent_id,
"memories": [],
"links": [link],
"dry_run": false,
});
let log_id = format!("{}→{}", link.source_id, link.target_id);
let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
for peer in &config.peers {
let client = config.client.clone();
let url = peer.sync_push_url.clone();
let peer_id = peer.id.clone();
let payload = body.clone();
let log_id = log_id.clone();
let api_key = config.api_key.clone();
let signing_key = config.signing_key.clone();
joins.spawn(async move {
let outcome = post_and_classify(
&client,
&url,
&payload,
&log_id,
Some(&log_id),
api_key.as_deref(),
signing_key.as_deref(),
)
.await;
(peer_id, outcome)
});
}
let deadline = now + config.policy.ack_timeout;
loop {
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
break;
}
match tokio::time::timeout(remaining, joins.join_next()).await {
Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
tracker.lock().await.record_peer_ack(peer_id);
}
Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
tracker.lock().await.record_id_drift(peer_id);
}
Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
tracing::warn!("federation: link peer {peer_id} failed for {log_id}: {reason}");
}
Ok(Some(Err(e))) => {
tracing::warn!("federation: link peer join error: {e}");
}
Ok(None) | Err(_) => break,
}
if tracker.lock().await.is_quorum_met(Instant::now()) {
break;
}
}
if !joins.is_empty() {
tokio::spawn(async move {
while let Some(res) = joins.join_next().await {
if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
tracing::debug!(
"federation: post-quorum link peer {peer_id} did not ack: {reason}"
);
}
}
});
}
let tracker = Arc::try_unwrap(tracker)
.map_err(|_| QuorumError::LocalWriteFailed {
detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
})?
.into_inner();
Ok(tracker)
}
/// Replicates a consolidation to every configured peer and waits for the
/// write quorum in `config.policy`. The consolidated `new_mem` is stored and
/// `source_ids` are deleted.
///
/// Each peer's sync-push URL receives
/// `{"memories": [new_mem], "deletions": source_ids, "dry_run": false}` via
/// `post_and_classify`, with `new_mem.id` as both the expected id and the
/// `Idempotency-Key`. Outcomes are folded into the tracker until one of these
/// happens:
/// - quorum is met;
/// - every peer has answered;
/// - `policy.ack_timeout` elapses.
///
/// Peers still in flight at that point are drained by a detached task that
/// logs every outcome, including id drift and join errors.
///
/// The tracker is returned as-is; this function does not call `finalise`.
///
/// # Errors
/// `QuorumError::LocalWriteFailed` only if the tracker `Arc` is still shared
/// after the loop. The `Arc` is never cloned here, so this is not expected.
///
/// NOTE(review): the `Idempotency-Key` is `new_mem.id`, the same key a plain
/// store of that memory would use. If peers dedupe on the key alone, confirm
/// this cannot suppress the accompanying deletions.
pub async fn broadcast_consolidate_quorum(
    config: &FederationConfig,
    new_mem: &Memory,
    source_ids: &[String],
) -> Result<AckTracker, QuorumError> {
    let now = Instant::now();
    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
    tracker.lock().await.record_local();
    let body = serde_json::json!({
        (field_names::SENDER_AGENT_ID): config.sender_agent_id,
        "memories": [new_mem],
        "deletions": source_ids,
        "dry_run": false,
    });
    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
    for peer in &config.peers {
        let client = config.client.clone();
        let url = peer.sync_push_url.clone();
        let peer_id = peer.id.clone();
        let payload = body.clone();
        let target_id = new_mem.id.clone();
        let api_key = config.api_key.clone();
        let signing_key = config.signing_key.clone();
        joins.spawn(async move {
            let outcome = post_and_classify(
                &client,
                &url,
                &payload,
                &target_id,
                Some(&target_id),
                api_key.as_deref(),
                signing_key.as_deref(),
            )
            .await;
            (peer_id, outcome)
        });
    }
    let deadline = now + config.policy.ack_timeout;
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            break;
        }
        match tokio::time::timeout(remaining, joins.join_next()).await {
            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
                tracker.lock().await.record_peer_ack(peer_id);
            }
            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
                tracker.lock().await.record_id_drift(peer_id);
            }
            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
                tracing::warn!(
                    "federation: consolidate peer {peer_id} failed for {}: {reason}",
                    new_mem.id
                );
            }
            Ok(Some(Err(e))) => {
                tracing::warn!("federation: consolidate peer join error: {e}");
            }
            // `Ok(None)`: every peer task finished; `Err(_)`: deadline elapsed.
            Ok(None) | Err(_) => break,
        }
        if tracker.lock().await.is_quorum_met(Instant::now()) {
            break;
        }
    }
    if !joins.is_empty() {
        // Drain stragglers without blocking the caller. Id drift and join
        // errors (e.g. a panicking peer task) are logged, not dropped.
        let target = new_mem.id.clone();
        tokio::spawn(async move {
            while let Some(res) = joins.join_next().await {
                match res {
                    Ok((peer_id, AckOutcome::Ack)) => {
                        tracing::debug!("federation: post-quorum consolidate ack from {peer_id}");
                    }
                    Ok((peer_id, AckOutcome::IdDrift)) => {
                        tracing::warn!(
                            "federation: post-quorum consolidate id-drift from {peer_id} for {target} (peer rewrote id)"
                        );
                    }
                    Ok((peer_id, AckOutcome::Fail(reason))) => {
                        tracing::debug!(
                            "federation: post-quorum consolidate peer {peer_id} did not ack: {reason}"
                        );
                    }
                    Err(e) => {
                        tracing::warn!(
                            "federation: post-quorum consolidate join error for {target}: {e}"
                        );
                    }
                }
            }
        });
    }
    let tracker = Arc::try_unwrap(tracker)
        .map_err(|_| QuorumError::LocalWriteFailed {
            detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
        })?
        .into_inner();
    Ok(tracker)
}
/// Replicates a pending action to every configured peer and waits for the
/// write quorum in `config.policy`.
///
/// Each peer's sync-push URL receives
/// `{"memories": [], "pendings": [pending], "dry_run": false}` via
/// `post_and_classify`, with `pending.id` as both the expected id and the
/// `Idempotency-Key`. Outcomes are folded into the tracker until one of these
/// happens:
/// - quorum is met;
/// - every peer has answered;
/// - `policy.ack_timeout` elapses.
///
/// Peers still in flight at that point are drained by a detached task that
/// logs every outcome, including id drift and join errors.
///
/// The tracker is returned as-is; this function does not call `finalise`.
///
/// # Errors
/// `QuorumError::LocalWriteFailed` only if the tracker `Arc` is still shared
/// after the loop. The `Arc` is never cloned here, so this is not expected.
pub async fn broadcast_pending_quorum(
    config: &FederationConfig,
    pending: &PendingAction,
) -> Result<AckTracker, QuorumError> {
    let now = Instant::now();
    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
    tracker.lock().await.record_local();
    let body = serde_json::json!({
        (field_names::SENDER_AGENT_ID): config.sender_agent_id,
        "memories": [],
        "pendings": [pending],
        "dry_run": false,
    });
    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
    for peer in &config.peers {
        let client = config.client.clone();
        let url = peer.sync_push_url.clone();
        let peer_id = peer.id.clone();
        let payload = body.clone();
        let target_id = pending.id.clone();
        let api_key = config.api_key.clone();
        let signing_key = config.signing_key.clone();
        joins.spawn(async move {
            let outcome = post_and_classify(
                &client,
                &url,
                &payload,
                &target_id,
                Some(&target_id),
                api_key.as_deref(),
                signing_key.as_deref(),
            )
            .await;
            (peer_id, outcome)
        });
    }
    let deadline = now + config.policy.ack_timeout;
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            break;
        }
        match tokio::time::timeout(remaining, joins.join_next()).await {
            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
                tracker.lock().await.record_peer_ack(peer_id);
            }
            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
                tracker.lock().await.record_id_drift(peer_id);
            }
            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
                tracing::warn!(
                    "federation: pending peer {peer_id} failed for {}: {reason}",
                    pending.id
                );
            }
            Ok(Some(Err(e))) => {
                tracing::warn!("federation: pending peer join error: {e}");
            }
            // `Ok(None)`: every peer task finished; `Err(_)`: deadline elapsed.
            Ok(None) | Err(_) => break,
        }
        if tracker.lock().await.is_quorum_met(Instant::now()) {
            break;
        }
    }
    if !joins.is_empty() {
        // Drain stragglers without blocking the caller. Id drift and join
        // errors (e.g. a panicking peer task) are logged, not dropped.
        let target = pending.id.clone();
        tokio::spawn(async move {
            while let Some(res) = joins.join_next().await {
                match res {
                    Ok((peer_id, AckOutcome::Ack)) => {
                        tracing::debug!("federation: post-quorum pending ack from {peer_id}");
                    }
                    Ok((peer_id, AckOutcome::IdDrift)) => {
                        tracing::warn!(
                            "federation: post-quorum pending id-drift from {peer_id} for {target} (peer rewrote id)"
                        );
                    }
                    Ok((peer_id, AckOutcome::Fail(reason))) => {
                        tracing::debug!(
                            "federation: post-quorum pending peer {peer_id} did not ack: {reason}"
                        );
                    }
                    Err(e) => {
                        tracing::warn!(
                            "federation: post-quorum pending join error for {target}: {e}"
                        );
                    }
                }
            }
        });
    }
    let tracker = Arc::try_unwrap(tracker)
        .map_err(|_| QuorumError::LocalWriteFailed {
            detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
        })?
        .into_inner();
    Ok(tracker)
}
/// Replicates a decision on a pending action to every configured peer and
/// waits for the write quorum in `config.policy`.
///
/// Each peer's sync-push URL receives
/// `{"memories": [], "pending_decisions": [decision], "dry_run": false}` via
/// `post_and_classify`, with `decision.id` as both the expected id and the
/// `Idempotency-Key`. Outcomes are folded into the tracker until one of these
/// happens:
/// - quorum is met;
/// - every peer has answered;
/// - `policy.ack_timeout` elapses.
///
/// Peers still in flight at that point are drained by a detached task that
/// logs every outcome, including id drift and join errors.
///
/// The tracker is returned as-is; this function does not call `finalise`.
///
/// # Errors
/// `QuorumError::LocalWriteFailed` only if the tracker `Arc` is still shared
/// after the loop. The `Arc` is never cloned here, so this is not expected.
pub async fn broadcast_pending_decision_quorum(
    config: &FederationConfig,
    decision: &PendingDecision,
) -> Result<AckTracker, QuorumError> {
    let now = Instant::now();
    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
    tracker.lock().await.record_local();
    let body = serde_json::json!({
        (field_names::SENDER_AGENT_ID): config.sender_agent_id,
        "memories": [],
        "pending_decisions": [decision],
        "dry_run": false,
    });
    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
    for peer in &config.peers {
        let client = config.client.clone();
        let url = peer.sync_push_url.clone();
        let peer_id = peer.id.clone();
        let payload = body.clone();
        let target_id = decision.id.clone();
        let api_key = config.api_key.clone();
        let signing_key = config.signing_key.clone();
        joins.spawn(async move {
            let outcome = post_and_classify(
                &client,
                &url,
                &payload,
                &target_id,
                Some(&target_id),
                api_key.as_deref(),
                signing_key.as_deref(),
            )
            .await;
            (peer_id, outcome)
        });
    }
    let deadline = now + config.policy.ack_timeout;
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            break;
        }
        match tokio::time::timeout(remaining, joins.join_next()).await {
            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
                tracker.lock().await.record_peer_ack(peer_id);
            }
            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
                tracker.lock().await.record_id_drift(peer_id);
            }
            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
                tracing::warn!(
                    "federation: pending-decision peer {peer_id} failed for {}: {reason}",
                    decision.id
                );
            }
            Ok(Some(Err(e))) => {
                tracing::warn!("federation: pending-decision peer join error: {e}");
            }
            // `Ok(None)`: every peer task finished; `Err(_)`: deadline elapsed.
            Ok(None) | Err(_) => break,
        }
        if tracker.lock().await.is_quorum_met(Instant::now()) {
            break;
        }
    }
    if !joins.is_empty() {
        // Drain stragglers without blocking the caller. Id drift and join
        // errors (e.g. a panicking peer task) are logged, not dropped.
        let target = decision.id.clone();
        tokio::spawn(async move {
            while let Some(res) = joins.join_next().await {
                match res {
                    Ok((peer_id, AckOutcome::Ack)) => {
                        tracing::debug!(
                            "federation: post-quorum pending-decision ack from {peer_id}"
                        );
                    }
                    Ok((peer_id, AckOutcome::IdDrift)) => {
                        tracing::warn!(
                            "federation: post-quorum pending-decision id-drift from {peer_id} for {target} (peer rewrote id)"
                        );
                    }
                    Ok((peer_id, AckOutcome::Fail(reason))) => {
                        tracing::debug!(
                            "federation: post-quorum pending-decision peer {peer_id} did not ack: {reason}"
                        );
                    }
                    Err(e) => {
                        tracing::warn!(
                            "federation: post-quorum pending-decision join error for {target}: {e}"
                        );
                    }
                }
            }
        });
    }
    let tracker = Arc::try_unwrap(tracker)
        .map_err(|_| QuorumError::LocalWriteFailed {
            detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
        })?
        .into_inner();
    Ok(tracker)
}
/// Replicates a namespace-metadata entry to every configured peer and waits
/// for the write quorum in `config.policy`.
///
/// Each peer's sync-push URL receives
/// `{"memories": [], "namespace_meta": [entry], "dry_run": false}` via
/// `post_and_classify`, with `entry.namespace` as both the expected id and
/// the `Idempotency-Key`. Outcomes are folded into the tracker until one of
/// these happens:
/// - quorum is met;
/// - every peer has answered;
/// - `policy.ack_timeout` elapses.
///
/// Peers still in flight at that point are drained by a detached task that
/// logs every outcome, including id drift and join errors.
///
/// The tracker is returned as-is; this function does not call `finalise`.
///
/// # Errors
/// `QuorumError::LocalWriteFailed` only if the tracker `Arc` is still shared
/// after the loop. The `Arc` is never cloned here, so this is not expected.
pub async fn broadcast_namespace_meta_quorum(
    config: &FederationConfig,
    entry: &NamespaceMetaEntry,
) -> Result<AckTracker, QuorumError> {
    let now = Instant::now();
    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
    tracker.lock().await.record_local();
    let body = serde_json::json!({
        (field_names::SENDER_AGENT_ID): config.sender_agent_id,
        "memories": [],
        "namespace_meta": [entry],
        "dry_run": false,
    });
    let target_id = entry.namespace.clone();
    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
    for peer in &config.peers {
        let client = config.client.clone();
        let url = peer.sync_push_url.clone();
        let peer_id = peer.id.clone();
        let payload = body.clone();
        let target = target_id.clone();
        let api_key = config.api_key.clone();
        let signing_key = config.signing_key.clone();
        joins.spawn(async move {
            let outcome = post_and_classify(
                &client,
                &url,
                &payload,
                &target,
                Some(&target),
                api_key.as_deref(),
                signing_key.as_deref(),
            )
            .await;
            (peer_id, outcome)
        });
    }
    let deadline = now + config.policy.ack_timeout;
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            break;
        }
        match tokio::time::timeout(remaining, joins.join_next()).await {
            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
                tracker.lock().await.record_peer_ack(peer_id);
            }
            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
                tracker.lock().await.record_id_drift(peer_id);
            }
            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
                tracing::warn!(
                    "federation: namespace_meta peer {peer_id} failed for {}: {reason}",
                    entry.namespace
                );
            }
            Ok(Some(Err(e))) => {
                tracing::warn!("federation: namespace_meta peer join error: {e}");
            }
            // `Ok(None)`: every peer task finished; `Err(_)`: deadline elapsed.
            Ok(None) | Err(_) => break,
        }
        if tracker.lock().await.is_quorum_met(Instant::now()) {
            break;
        }
    }
    if !joins.is_empty() {
        // Drain stragglers without blocking the caller. Id drift and join
        // errors (e.g. a panicking peer task) are logged, not dropped.
        let target = target_id.clone();
        tokio::spawn(async move {
            while let Some(res) = joins.join_next().await {
                match res {
                    Ok((peer_id, AckOutcome::Ack)) => {
                        tracing::debug!(
                            "federation: post-quorum namespace_meta ack from {peer_id}"
                        );
                    }
                    Ok((peer_id, AckOutcome::IdDrift)) => {
                        tracing::warn!(
                            "federation: post-quorum namespace_meta id-drift from {peer_id} for {target} (peer rewrote id)"
                        );
                    }
                    Ok((peer_id, AckOutcome::Fail(reason))) => {
                        tracing::debug!(
                            "federation: post-quorum namespace_meta peer {peer_id} did not ack: {reason}"
                        );
                    }
                    Err(e) => {
                        tracing::warn!(
                            "federation: post-quorum namespace_meta join error for {target}: {e}"
                        );
                    }
                }
            }
        });
    }
    let tracker = Arc::try_unwrap(tracker)
        .map_err(|_| QuorumError::LocalWriteFailed {
            detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
        })?
        .into_inner();
    Ok(tracker)
}
/// Replicates clearing the metadata of `namespaces` to every configured peer
/// and waits for the write quorum in `config.policy`.
///
/// Each peer's sync-push URL receives
/// `{"memories": [], "namespace_meta_clears": namespaces, "dry_run": false}`
/// via `post_and_classify`. The comma-joined namespace list is both the
/// expected id and the `Idempotency-Key`. Outcomes are folded into the
/// tracker until one of these happens:
/// - quorum is met;
/// - every peer has answered;
/// - `policy.ack_timeout` elapses.
///
/// Peers still in flight at that point are drained by a detached task that
/// logs every outcome, including id drift and join errors.
///
/// The tracker is returned as-is; this function does not call `finalise`.
///
/// # Errors
/// `QuorumError::LocalWriteFailed` only if the tracker `Arc` is still shared
/// after the loop. The `Arc` is never cloned here, so this is not expected.
///
/// NOTE(review): the expected id is the synthetic comma-joined list. If peers
/// ever echo a non-empty `ids` list for clear-only pushes, `post_once` would
/// classify every ack as drift. Confirm.
pub async fn broadcast_namespace_meta_clear_quorum(
    config: &FederationConfig,
    namespaces: &[String],
) -> Result<AckTracker, QuorumError> {
    let now = Instant::now();
    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
    tracker.lock().await.record_local();
    let body = serde_json::json!({
        (field_names::SENDER_AGENT_ID): config.sender_agent_id,
        "memories": [],
        "namespace_meta_clears": namespaces,
        "dry_run": false,
    });
    let target_id = namespaces.join(",");
    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
    for peer in &config.peers {
        let client = config.client.clone();
        let url = peer.sync_push_url.clone();
        let peer_id = peer.id.clone();
        let payload = body.clone();
        let target = target_id.clone();
        let api_key = config.api_key.clone();
        let signing_key = config.signing_key.clone();
        joins.spawn(async move {
            let outcome = post_and_classify(
                &client,
                &url,
                &payload,
                &target,
                Some(&target),
                api_key.as_deref(),
                signing_key.as_deref(),
            )
            .await;
            (peer_id, outcome)
        });
    }
    let deadline = now + config.policy.ack_timeout;
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            break;
        }
        match tokio::time::timeout(remaining, joins.join_next()).await {
            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
                tracker.lock().await.record_peer_ack(peer_id);
            }
            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
                tracker.lock().await.record_id_drift(peer_id);
            }
            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
                tracing::warn!(
                    "federation: namespace_meta_clear peer {peer_id} failed for [{}]: {reason}",
                    target_id
                );
            }
            Ok(Some(Err(e))) => {
                tracing::warn!("federation: namespace_meta_clear peer join error: {e}");
            }
            // `Ok(None)`: every peer task finished; `Err(_)`: deadline elapsed.
            Ok(None) | Err(_) => break,
        }
        if tracker.lock().await.is_quorum_met(Instant::now()) {
            break;
        }
    }
    if !joins.is_empty() {
        // Drain stragglers without blocking the caller. Id drift and join
        // errors (e.g. a panicking peer task) are logged, not dropped.
        let target = target_id.clone();
        tokio::spawn(async move {
            while let Some(res) = joins.join_next().await {
                match res {
                    Ok((peer_id, AckOutcome::Ack)) => {
                        tracing::debug!(
                            "federation: post-quorum namespace_meta_clear ack from {peer_id}"
                        );
                    }
                    Ok((peer_id, AckOutcome::IdDrift)) => {
                        tracing::warn!(
                            "federation: post-quorum namespace_meta_clear id-drift from {peer_id} for [{target}] (peer rewrote id)"
                        );
                    }
                    Ok((peer_id, AckOutcome::Fail(reason))) => {
                        tracing::debug!(
                            "federation: post-quorum namespace_meta_clear peer {peer_id} did not ack: {reason}"
                        );
                    }
                    Err(e) => {
                        tracing::warn!(
                            "federation: post-quorum namespace_meta_clear join error for [{target}]: {e}"
                        );
                    }
                }
            }
        });
    }
    let tracker = Arc::try_unwrap(tracker)
        .map_err(|_| QuorumError::LocalWriteFailed {
            detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
        })?
        .into_inner();
    Ok(tracker)
}
pub async fn bulk_catchup_push(
config: &FederationConfig,
memories: &[Memory],
) -> Vec<(String, String)> {
if memories.is_empty() || config.peers.is_empty() {
return Vec::new();
}
let body = serde_json::json!({
(field_names::SENDER_AGENT_ID): config.sender_agent_id,
"memories": memories,
"dry_run": false,
});
let mut joins: JoinSet<(String, Result<(), String>)> = JoinSet::new();
for peer in &config.peers {
let client = config.client.clone();
let url = peer.sync_push_url.clone();
let id = peer.id.clone();
let payload = body.clone();
let api_key = config.api_key.clone();
let signing_key = config.signing_key.clone();
joins.spawn(async move {
let body_bytes = match serde_json::to_vec(&payload) {
Ok(b) => b,
Err(e) => {
return (id, Err(format!("serialise body: {e}")));
}
};
let mut req = client
.post(&url)
.header(crate::HEADER_CONTENT_TYPE, crate::MIME_JSON)
.body(body_bytes.clone());
req = req.header("X-Catchup", "bulk");
if let Some(sk) = signing_key.as_deref() {
let nonce = uuid::Uuid::new_v4().to_string();
let sig_header = crate::federation::signing::sign_body_with_nonce_header(
sk,
&body_bytes,
&nonce,
);
req = req
.header(crate::federation::signing::SIGNATURE_HEADER, sig_header)
.header(crate::federation::signing::NONCE_HEADER, nonce);
}
if let Some(key) = api_key.as_deref() {
req = req.header(crate::HEADER_API_KEY, key);
}
if let Some(peer_id) = payload
.get(field_names::SENDER_AGENT_ID)
.and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
{
req = req.header(crate::federation::peer_attestation::PEER_ID_HEADER, peer_id);
}
let outcome = match req.send().await {
Ok(resp) if resp.status().is_success() => {
let _ = resp.bytes().await;
Ok(())
}
Ok(resp) => {
let status = resp.status();
let _ = resp.bytes().await;
Err(format!("http {status}"))
}
Err(e) => Err(crate::errors::msg::network(e)),
};
(id, outcome)
});
}
let mut errors = Vec::new();
while let Some(res) = joins.join_next().await {
match res {
Ok((peer_id, Err(err))) => {
tracing::warn!("bulk_catchup_push: peer {peer_id} failed: {err}");
errors.push((peer_id, err));
}
Ok((_, Ok(()))) => {}
Err(e) => {
tracing::warn!("bulk_catchup_push: join error: {e:?}");
errors.push(("unknown".to_string(), e.to_string()));
}
}
}
errors
}