use std::sync::Arc;
use std::time::Duration;
use super::FederationConfig;
use super::sync::{AckOutcome, post_once};
/// `tracing` target shared by the push-DLQ log lines in this module, so the
/// replay path can be filtered independently of other federation logging.
pub(crate) const PUSH_DLQ_TRACE_TARGET: &str = "ai_memory::federation::push_dlq";
/// One pending row of the `federation_push_dlq` table: a push of memory
/// `memory_id` to peer `peer_id` that failed and is waiting to be replayed.
#[derive(Debug, Clone)]
pub struct FederationPushDlqRow {
    /// Primary key of the DLQ row; passed back to `mark_dlq_row_replayed` /
    /// `bump_dlq_attempt` when the replay outcome is recorded.
    pub id: i64,
    /// Id of the memory whose push failed.
    pub memory_id: String,
    /// Id of the target peer; matched against the `id` of the peers in
    /// `FederationConfig` when the row is replayed.
    pub peer_id: String,
    /// Body that is re-POSTed to the peer's sync-push URL on replay.
    pub payload_json: serde_json::Value,
    /// Failed delivery attempts recorded so far (the SQL sinks insert new rows
    /// with 1).
    pub attempt_count: i32,
    /// Error text from the most recent failed attempt.
    pub last_error: String,
}
/// Storage backend for the federation push dead-letter queue (DLQ).
///
/// Failed pushes are recorded with `enqueue_push_failure`. The replay worker
/// (`replay_once`) reads pending rows and reports each outcome back through
/// `mark_dlq_row_replayed` / `bump_dlq_attempt`. Errors are surfaced as
/// human-readable strings.
#[async_trait::async_trait]
pub trait FederationDlqSink: Send + Sync {
    /// Records a failed push of `memory_id` to `peer_id`, with the payload
    /// that was sent and the error that was observed.
    ///
    /// The SQL sinks in this module upsert on the still-pending
    /// `(memory_id, peer_id)` pair instead of inserting a duplicate row.
    async fn enqueue_push_failure(
        &self,
        memory_id: &str,
        peer_id: &str,
        payload_json: &serde_json::Value,
        last_error: &str,
    ) -> Result<(), String>;
    /// Returns up to `limit` pending (not yet replayed) rows.
    ///
    /// The SQL sinks in this module exclude rows at or above
    /// `MAX_REPLAY_ATTEMPTS` and return the oldest `failed_at` first.
    /// Despite the name, they only read the rows; nothing is claimed or
    /// removed.
    async fn take_pending_dlq_rows(
        &self,
        limit: usize,
    ) -> Result<Vec<FederationPushDlqRow>, String>;
    /// Marks row `id` as successfully replayed (the SQL sinks set
    /// `replayed_at`).
    async fn mark_dlq_row_replayed(&self, id: i64) -> Result<(), String>;
    /// Increments row `id`'s attempt counter and stores `last_error`.
    ///
    /// In the SQL sinks this only affects rows that have not been replayed.
    async fn bump_dlq_attempt(&self, id: i64, last_error: &str) -> Result<(), String>;
    /// Counts rows not yet replayed.
    ///
    /// In the SQL sinks this includes quarantined rows, because no
    /// attempt-count filter is applied.
    async fn pending_dlq_count(&self) -> Result<i64, String>;
}
/// Spawns the background task that periodically drains the federation push DLQ.
///
/// The task first waits a fixed 5 s. It then runs [`replay_once`] and sleeps
/// for `interval`, repeating forever. It never finishes on its own; call
/// `abort()` on the returned handle to stop it.
#[must_use]
pub fn spawn_replay_federation_push_dlq(
    config: FederationConfig,
    sink: Arc<dyn FederationDlqSink>,
    interval: Duration,
) -> tokio::task::JoinHandle<()> {
    // Fixed delay before the first pass.
    // NOTE(review): presumably lets startup settle first — confirm.
    const STARTUP_DELAY: Duration = Duration::from_secs(5);

    let worker = async move {
        tokio::time::sleep(STARTUP_DELAY).await;
        loop {
            replay_once(&config, &*sink).await;
            tokio::time::sleep(interval).await;
        }
    };
    tokio::spawn(worker)
}
/// Smallest batch a replay pass loads; also the fallback batch when the
/// backlog cannot be counted.
pub const REPLAY_BATCH_SIZE: usize = 64;
/// Environment variable that overrides the largest batch a replay pass may load.
pub const ENV_FED_DLQ_REPLAY_MAX_BATCH: &str = "AI_MEMORY_FED_DLQ_REPLAY_MAX_BATCH";
/// Largest batch a replay pass loads when the override is unset or invalid.
pub const DEFAULT_REPLAY_MAX_BATCH: usize = 2048;

/// Returns the upper bound on a replay batch.
///
/// Reads [`ENV_FED_DLQ_REPLAY_MAX_BATCH`]. The result depends on the variable:
/// - Unset (or not valid Unicode): returns [`DEFAULT_REPLAY_MAX_BATCH`]
///   without logging.
/// - Not an integer, or below [`REPLAY_BATCH_SIZE`]: logs a warning and also
///   returns the default.
///
/// The result is therefore always `>= REPLAY_BATCH_SIZE`.
#[must_use]
pub fn replay_max_batch() -> usize {
    let Ok(raw) = std::env::var(ENV_FED_DLQ_REPLAY_MAX_BATCH) else {
        return DEFAULT_REPLAY_MAX_BATCH;
    };

    let accepted = raw
        .trim()
        .parse::<usize>()
        .ok()
        .filter(|&candidate| candidate >= REPLAY_BATCH_SIZE);
    if let Some(cap) = accepted {
        return cap;
    }

    tracing::warn!(
        target: PUSH_DLQ_TRACE_TARGET,
        raw = %raw,
        "ignoring {ENV_FED_DLQ_REPLAY_MAX_BATCH}={raw} (must be an integer >= \
         {REPLAY_BATCH_SIZE}); using default {DEFAULT_REPLAY_MAX_BATCH}"
    );
    DEFAULT_REPLAY_MAX_BATCH
}
pub const MAX_REPLAY_ATTEMPTS: i32 = 100;
pub async fn replay_once(config: &FederationConfig, sink: &dyn FederationDlqSink) {
let batch = match sink.pending_dlq_count().await {
Ok(backlog) => usize::try_from(backlog)
.unwrap_or(REPLAY_BATCH_SIZE)
.clamp(REPLAY_BATCH_SIZE, replay_max_batch()),
Err(e) => {
tracing::warn!(
target: PUSH_DLQ_TRACE_TARGET,
"replay_federation_push_dlq: pending count failed ({e}); \
using fixed batch {REPLAY_BATCH_SIZE}"
);
REPLAY_BATCH_SIZE
}
};
let rows = match sink.take_pending_dlq_rows(batch).await {
Ok(r) => r,
Err(e) => {
tracing::warn!(
target: PUSH_DLQ_TRACE_TARGET,
"replay_federation_push_dlq: failed to load pending rows: {e}"
);
return;
}
};
if rows.is_empty() {
refresh_depth_gauge(sink).await;
return;
}
tracing::info!(
target: PUSH_DLQ_TRACE_TARGET,
rows = rows.len(),
"federation: replay_federation_push_dlq draining {} row(s)",
rows.len(),
);
for row in rows {
if row.attempt_count >= MAX_REPLAY_ATTEMPTS {
crate::metrics::registry()
.federation_push_dlq_quarantined
.inc();
tracing::warn!(
target: PUSH_DLQ_TRACE_TARGET,
row_id = row.id,
peer_id = %row.peer_id,
memory_id = %row.memory_id,
attempt_count = row.attempt_count,
"replay: row {} quarantined after {} attempts (ceiling {MAX_REPLAY_ATTEMPTS}); \
no CLI drain surface ships at v0.7.0 — see docs/TROUBLESHOOTING.md \
§federation-push-DLQ for the data-layer drain procedure (#1578)",
row.id,
row.attempt_count,
);
continue;
}
let Some(peer) = config.peers.iter().find(|p| p.id == row.peer_id) else {
let _ = sink
.bump_dlq_attempt(row.id, "peer no longer in FederationConfig")
.await;
tracing::warn!(
target: PUSH_DLQ_TRACE_TARGET,
row_id = row.id,
peer_id = %row.peer_id,
"replay: peer {} not in FederationConfig — leaving row pending",
row.peer_id,
);
continue;
};
let outcome = post_once(
&config.client,
&peer.sync_push_url,
&row.payload_json,
&row.memory_id,
Some(&row.memory_id),
config.api_key.as_deref(),
config.signing_key.as_deref(),
)
.await;
match outcome {
AckOutcome::Ack => {
if let Err(e) = sink.mark_dlq_row_replayed(row.id).await {
tracing::warn!(
target: PUSH_DLQ_TRACE_TARGET,
row_id = row.id,
"replay: peer {} acked but mark_dlq_row_replayed failed: {e}",
row.peer_id,
);
} else {
tracing::info!(
target: PUSH_DLQ_TRACE_TARGET,
row_id = row.id,
memory_id = %row.memory_id,
peer_id = %row.peer_id,
"replay: peer {} acked for {} (DLQ row {} cleared)",
row.peer_id,
row.memory_id,
row.id,
);
}
}
AckOutcome::IdDrift => {
let _ = sink
.bump_dlq_attempt(row.id, "replay observed id_drift on peer ack")
.await;
tracing::warn!(
target: PUSH_DLQ_TRACE_TARGET,
row_id = row.id,
"replay: peer {} returned id_drift on row {} — leaving pending",
row.peer_id,
row.id,
);
}
AckOutcome::Fail(reason) => {
let _ = sink.bump_dlq_attempt(row.id, &reason).await;
tracing::debug!(
target: PUSH_DLQ_TRACE_TARGET,
row_id = row.id,
"replay: peer {} still failing on row {}: {reason}",
row.peer_id,
row.id,
);
}
}
}
refresh_depth_gauge(sink).await;
}
/// Re-reads the pending-row count from `sink` and publishes it on the
/// `federation_push_dlq_depth` gauge.
///
/// If the count fails, the failure is logged and the gauge keeps its previous
/// value.
async fn refresh_depth_gauge(sink: &dyn FederationDlqSink) {
    let pending = match sink.pending_dlq_count().await {
        Ok(count) => count,
        Err(e) => {
            tracing::warn!(
                target: PUSH_DLQ_TRACE_TARGET,
                "replay: failed to refresh federation_push_dlq_depth: {e}"
            );
            return;
        }
    };
    crate::metrics::registry()
        .federation_push_dlq_depth
        .set(pending);
}
/// [`FederationDlqSink`] backed by the SQLite connection behind
/// `crate::handlers::Db`.
pub struct SqliteDlqSink {
    // Shared connection handle. Every sink method locks it for the duration of
    // its call.
    db: crate::handlers::Db,
}
impl SqliteDlqSink {
    /// Wraps an existing database handle; performs no I/O.
    #[must_use]
    pub fn new(db: crate::handlers::Db) -> Self {
        Self { db }
    }
}
#[async_trait::async_trait]
impl FederationDlqSink for SqliteDlqSink {
async fn enqueue_push_failure(
&self,
memory_id: &str,
peer_id: &str,
payload_json: &serde_json::Value,
last_error: &str,
) -> Result<(), String> {
let now = chrono::Utc::now().to_rfc3339();
let payload_str = payload_json.to_string();
let conn = self.db.lock().await;
conn.0
.execute(
"INSERT INTO federation_push_dlq \
(memory_id, peer_id, payload_json, attempt_count, last_error, failed_at) \
VALUES (?1, ?2, ?3, 1, ?4, ?5) \
ON CONFLICT(memory_id, peer_id) WHERE replayed_at IS NULL \
DO UPDATE SET \
attempt_count = attempt_count + 1, \
last_error = excluded.last_error",
rusqlite::params![memory_id, peer_id, payload_str, last_error, now],
)
.map_err(|e| format!("sqlite enqueue_push_failure: {e}"))?;
Ok(())
}
async fn take_pending_dlq_rows(
&self,
limit: usize,
) -> Result<Vec<FederationPushDlqRow>, String> {
let conn = self.db.lock().await;
let mut stmt = conn
.0
.prepare(
"SELECT id, memory_id, peer_id, payload_json, attempt_count, last_error \
FROM federation_push_dlq \
WHERE replayed_at IS NULL AND attempt_count < ?2 \
ORDER BY failed_at ASC \
LIMIT ?1",
)
.map_err(|e| format!("sqlite take_pending_dlq_rows prepare: {e}"))?;
let rows = stmt
.query_map(
rusqlite::params![limit as i64, MAX_REPLAY_ATTEMPTS],
|row| {
let payload_str: String = row.get(3)?;
let payload_json =
serde_json::from_str(&payload_str).unwrap_or(serde_json::json!({}));
Ok(FederationPushDlqRow {
id: row.get(0)?,
memory_id: row.get(1)?,
peer_id: row.get(2)?,
payload_json,
attempt_count: row.get(4)?,
last_error: row.get(5)?,
})
},
)
.map_err(|e| format!("sqlite take_pending_dlq_rows query: {e}"))?
.collect::<Result<Vec<_>, _>>()
.map_err(|e| format!("sqlite take_pending_dlq_rows collect: {e}"))?;
Ok(rows)
}
async fn mark_dlq_row_replayed(&self, id: i64) -> Result<(), String> {
let now = chrono::Utc::now().to_rfc3339();
let conn = self.db.lock().await;
conn.0
.execute(
"UPDATE federation_push_dlq SET replayed_at = ?1 WHERE id = ?2",
rusqlite::params![now, id],
)
.map_err(|e| format!("sqlite mark_dlq_row_replayed: {e}"))?;
Ok(())
}
async fn bump_dlq_attempt(&self, id: i64, last_error: &str) -> Result<(), String> {
let conn = self.db.lock().await;
conn.0
.execute(
"UPDATE federation_push_dlq \
SET attempt_count = attempt_count + 1, last_error = ?1 \
WHERE id = ?2 AND replayed_at IS NULL",
rusqlite::params![last_error, id],
)
.map_err(|e| format!("sqlite bump_dlq_attempt: {e}"))?;
Ok(())
}
async fn pending_dlq_count(&self) -> Result<i64, String> {
let conn = self.db.lock().await;
conn.0
.query_row(
"SELECT COUNT(*) FROM federation_push_dlq WHERE replayed_at IS NULL",
[],
|r| r.get::<_, i64>(0),
)
.map_err(|e| format!("sqlite pending_dlq_count: {e}"))
}
}
/// [`FederationDlqSink`] backed by the Postgres store's connection pool.
#[cfg(feature = "sal-postgres")]
pub struct PostgresDlqSink {
    // Shared store; the sink methods use only its connection pool.
    store: std::sync::Arc<crate::store::postgres::PostgresStore>,
}
#[cfg(feature = "sal-postgres")]
impl PostgresDlqSink {
    /// Wraps a shared Postgres store; performs no I/O.
    #[must_use]
    pub fn new(store: std::sync::Arc<crate::store::postgres::PostgresStore>) -> Self {
        Self { store }
    }
}
#[cfg(feature = "sal-postgres")]
#[async_trait::async_trait]
impl FederationDlqSink for PostgresDlqSink {
    /// Records a failed push of `memory_id` to `peer_id`.
    ///
    /// Inserts a new pending row with `attempt_count = 1`. If a pending
    /// (`replayed_at IS NULL`) row already exists for the same
    /// `(memory_id, peer_id)`, that row is updated in place instead:
    /// - `attempt_count` is bumped;
    /// - `last_error` is overwritten;
    /// - `payload_json` is replaced with the newest payload, so replay
    ///   delivers the latest failed version of the memory rather than the
    ///   first (stale) one.
    ///
    /// `failed_at` is not written here; on insert it presumably comes from a
    /// column default (NOTE(review): confirm against the schema).
    async fn enqueue_push_failure(
        &self,
        memory_id: &str,
        peer_id: &str,
        payload_json: &serde_json::Value,
        last_error: &str,
    ) -> Result<(), String> {
        let pool = self.store.pool();
        sqlx::query(
            "INSERT INTO federation_push_dlq \
             (memory_id, peer_id, payload_json, attempt_count, last_error) \
             VALUES ($1, $2, $3::jsonb, 1, $4) \
             ON CONFLICT (memory_id, peer_id) WHERE replayed_at IS NULL \
             DO UPDATE SET \
             attempt_count = federation_push_dlq.attempt_count + 1, \
             last_error = EXCLUDED.last_error, \
             payload_json = EXCLUDED.payload_json",
        )
        .bind(memory_id)
        .bind(peer_id)
        .bind(payload_json.to_string())
        .bind(last_error)
        .execute(pool)
        .await
        .map_err(|e| format!("postgres enqueue_push_failure: {e}"))?;
        Ok(())
    }
    /// Loads up to `limit` pending rows below `MAX_REPLAY_ATTEMPTS`, oldest
    /// `failed_at` first. Rows are only read, not claimed or removed.
    async fn take_pending_dlq_rows(
        &self,
        limit: usize,
    ) -> Result<Vec<FederationPushDlqRow>, String> {
        let pool = self.store.pool();
        // Saturate rather than wrap if `limit` exceeds i64::MAX.
        let limit_i64 = i64::try_from(limit).unwrap_or(i64::MAX);
        let rows: Vec<(i64, String, String, serde_json::Value, i32, String)> = sqlx::query_as(
            "SELECT id, memory_id, peer_id, payload_json, attempt_count, last_error \
             FROM federation_push_dlq \
             WHERE replayed_at IS NULL AND attempt_count < $2 \
             ORDER BY failed_at ASC \
             LIMIT $1",
        )
        .bind(limit_i64)
        .bind(MAX_REPLAY_ATTEMPTS)
        .fetch_all(pool)
        .await
        .map_err(|e| format!("postgres take_pending_dlq_rows: {e}"))?;
        Ok(rows
            .into_iter()
            .map(
                |(id, memory_id, peer_id, payload_json, attempt_count, last_error)| {
                    FederationPushDlqRow {
                        id,
                        memory_id,
                        peer_id,
                        payload_json,
                        attempt_count,
                        last_error,
                    }
                },
            )
            .collect())
    }
    /// Stamps `replayed_at = now()` on row `id`.
    async fn mark_dlq_row_replayed(&self, id: i64) -> Result<(), String> {
        let pool = self.store.pool();
        sqlx::query("UPDATE federation_push_dlq SET replayed_at = now() WHERE id = $1")
            .bind(id)
            .execute(pool)
            .await
            .map_err(|e| format!("postgres mark_dlq_row_replayed: {e}"))?;
        Ok(())
    }
    /// Increments `attempt_count` and stores `last_error` on row `id`.
    ///
    /// Rows that have already been replayed are left untouched.
    async fn bump_dlq_attempt(&self, id: i64, last_error: &str) -> Result<(), String> {
        let pool = self.store.pool();
        sqlx::query(
            "UPDATE federation_push_dlq \
             SET attempt_count = attempt_count + 1, last_error = $1 \
             WHERE id = $2 AND replayed_at IS NULL",
        )
        .bind(last_error)
        .bind(id)
        .execute(pool)
        .await
        .map_err(|e| format!("postgres bump_dlq_attempt: {e}"))?;
        Ok(())
    }
    /// Counts rows with `replayed_at IS NULL`, including quarantined ones.
    async fn pending_dlq_count(&self) -> Result<i64, String> {
        let pool = self.store.pool();
        let row: (i64,) =
            sqlx::query_as("SELECT COUNT(*) FROM federation_push_dlq WHERE replayed_at IS NULL")
                .fetch_one(pool)
                .await
                .map_err(|e| format!("postgres pending_dlq_count: {e}"))?;
        Ok(row.0)
    }
}
#[cfg(test)]
mod replay_arm_tests {
    //! Unit tests for the replay loop's control-flow arms. They are driven
    //! through an in-memory `FederationDlqSink` mock and an unreachable peer
    //! URL (`127.0.0.1:1`).
    use super::{
        DEFAULT_REPLAY_MAX_BATCH, ENV_FED_DLQ_REPLAY_MAX_BATCH, FederationDlqSink,
        FederationPushDlqRow, MAX_REPLAY_ATTEMPTS, REPLAY_BATCH_SIZE, replay_max_batch,
        replay_once,
    };
    use crate::federation::{FederationConfig, PeerEndpoint};
    use crate::replication::QuorumPolicy;
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    /// Serializes tests in this module that mutate the process environment.
    ///
    /// A poisoned lock is recovered, so one failing test does not cascade.
    fn env_lock() -> std::sync::MutexGuard<'static, ()> {
        use std::sync::OnceLock;
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        LOCK.get_or_init(|| Mutex::new(()))
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// In-memory sink that records every call for later assertions.
    ///
    /// The `*_should_err` flags force the corresponding method to fail.
    #[derive(Default)]
    struct MockSink {
        rows: Mutex<Vec<FederationPushDlqRow>>,
        marked_replayed: Mutex<Vec<i64>>,
        bumped: Mutex<Vec<(i64, String)>>,
        count_should_err: bool,
        take_should_err: bool,
        take_calls: AtomicUsize,
    }

    #[async_trait::async_trait]
    impl FederationDlqSink for MockSink {
        async fn enqueue_push_failure(
            &self,
            memory_id: &str,
            peer_id: &str,
            payload_json: &serde_json::Value,
            last_error: &str,
        ) -> Result<(), String> {
            // Take the lock once and reuse the guard. Calling `lock()` again
            // while the guard is alive would self-deadlock (or panic) on a
            // `std::sync::Mutex`.
            let mut rows = self.rows.lock().unwrap();
            let id = i64::try_from(rows.len() + 1).expect("mock row count fits in i64");
            rows.push(FederationPushDlqRow {
                id,
                memory_id: memory_id.to_string(),
                peer_id: peer_id.to_string(),
                payload_json: payload_json.clone(),
                attempt_count: 1,
                last_error: last_error.to_string(),
            });
            Ok(())
        }
        async fn take_pending_dlq_rows(
            &self,
            _limit: usize,
        ) -> Result<Vec<FederationPushDlqRow>, String> {
            self.take_calls.fetch_add(1, Ordering::SeqCst);
            if self.take_should_err {
                return Err("mock take error".to_string());
            }
            Ok(self.rows.lock().unwrap().clone())
        }
        async fn mark_dlq_row_replayed(&self, id: i64) -> Result<(), String> {
            self.marked_replayed.lock().unwrap().push(id);
            Ok(())
        }
        async fn bump_dlq_attempt(&self, id: i64, last_error: &str) -> Result<(), String> {
            self.bumped
                .lock()
                .unwrap()
                .push((id, last_error.to_string()));
            Ok(())
        }
        async fn pending_dlq_count(&self) -> Result<i64, String> {
            if self.count_should_err {
                return Err("mock count error".to_string());
            }
            Ok(self.rows.lock().unwrap().len() as i64)
        }
    }

    /// Builds a single-peer config with a short client timeout.
    fn cfg_with_peer(peer_id: &str, url: &str) -> FederationConfig {
        FederationConfig {
            policy: QuorumPolicy::new(1, 1, Duration::from_millis(200), Duration::from_secs(30))
                .unwrap(),
            peers: vec![PeerEndpoint {
                id: peer_id.to_string(),
                sync_push_url: url.to_string(),
            }],
            client: reqwest::Client::builder()
                .timeout(Duration::from_millis(200))
                .build()
                .unwrap(),
            sender_agent_id: "ai:cov3-dlq".to_string(),
            api_key: None,
            signing_key: None,
            dlq_sink: None,
        }
    }

    /// Builds a DLQ row for memory `mem-{id}` targeting `peer_id`.
    fn row(id: i64, peer_id: &str, attempt_count: i32) -> FederationPushDlqRow {
        FederationPushDlqRow {
            id,
            memory_id: format!("mem-{id}"),
            peer_id: peer_id.to_string(),
            payload_json: serde_json::json!({"id": format!("mem-{id}")}),
            attempt_count,
            last_error: String::new(),
        }
    }

    #[test]
    fn replay_max_batch_env_arms() {
        let _g = env_lock();
        // SAFETY: `_g` serializes env mutation between tests in this module.
        // NOTE(review): the async tests below read this variable (through
        // `replay_once`) without taking the lock. `std::env::set_var` is only
        // documented as sound when no other thread touches the environment —
        // confirm this is acceptable for the test binary.
        unsafe {
            std::env::remove_var(ENV_FED_DLQ_REPLAY_MAX_BATCH);
        }
        assert_eq!(
            replay_max_batch(),
            DEFAULT_REPLAY_MAX_BATCH,
            "unset → default"
        );
        // SAFETY: as above, under `_g`.
        unsafe {
            std::env::set_var(ENV_FED_DLQ_REPLAY_MAX_BATCH, "5000");
        }
        assert_eq!(replay_max_batch(), 5000, "valid override honoured");
        // SAFETY: as above, under `_g`.
        unsafe {
            std::env::set_var(ENV_FED_DLQ_REPLAY_MAX_BATCH, "10");
        }
        assert_eq!(
            replay_max_batch(),
            DEFAULT_REPLAY_MAX_BATCH,
            "below floor falls through"
        );
        // SAFETY: as above, under `_g`.
        unsafe {
            std::env::set_var(ENV_FED_DLQ_REPLAY_MAX_BATCH, "not-a-number");
        }
        assert_eq!(
            replay_max_batch(),
            DEFAULT_REPLAY_MAX_BATCH,
            "garbage → default"
        );
        // SAFETY: as above, under `_g`.
        unsafe {
            std::env::set_var(ENV_FED_DLQ_REPLAY_MAX_BATCH, &REPLAY_BATCH_SIZE.to_string());
        }
        assert_eq!(replay_max_batch(), REPLAY_BATCH_SIZE, "floor accepted");
        // SAFETY: as above, under `_g`.
        unsafe {
            std::env::remove_var(ENV_FED_DLQ_REPLAY_MAX_BATCH);
        }
    }

    /// Regression: enqueueing used to re-lock `rows` while already holding it.
    #[tokio::test]
    async fn mock_enqueue_assigns_sequential_ids() {
        let sink = MockSink::default();
        let payload = serde_json::json!({"id": "mem-a"});
        sink.enqueue_push_failure("mem-a", "peer-0", &payload, "boom")
            .await
            .unwrap();
        sink.enqueue_push_failure("mem-b", "peer-0", &payload, "boom")
            .await
            .unwrap();
        let ids: Vec<i64> = sink.rows.lock().unwrap().iter().map(|r| r.id).collect();
        assert_eq!(ids, vec![1, 2]);
    }

    #[tokio::test]
    async fn empty_queue_only_refreshes_gauge() {
        let sink = MockSink::default();
        let cfg = cfg_with_peer("peer-0", "http://127.0.0.1:1/api/v1/sync/push");
        replay_once(&cfg, &sink).await;
        assert_eq!(sink.take_calls.load(Ordering::SeqCst), 1);
        assert!(sink.marked_replayed.lock().unwrap().is_empty());
        assert!(sink.bumped.lock().unwrap().is_empty());
    }

    #[tokio::test]
    async fn quarantined_row_is_skipped() {
        let sink = MockSink::default();
        sink.rows
            .lock()
            .unwrap()
            .push(row(1, "peer-0", MAX_REPLAY_ATTEMPTS));
        let cfg = cfg_with_peer("peer-0", "http://127.0.0.1:1/api/v1/sync/push");
        replay_once(&cfg, &sink).await;
        assert!(sink.marked_replayed.lock().unwrap().is_empty());
        assert!(sink.bumped.lock().unwrap().is_empty());
    }

    #[tokio::test]
    async fn peer_no_longer_in_config_bumps_and_leaves() {
        let sink = MockSink::default();
        sink.rows.lock().unwrap().push(row(7, "peer-gone", 1));
        let cfg = cfg_with_peer("peer-0", "http://127.0.0.1:1/api/v1/sync/push");
        replay_once(&cfg, &sink).await;
        let bumped = sink.bumped.lock().unwrap();
        assert_eq!(bumped.len(), 1);
        assert_eq!(bumped[0].0, 7);
        assert!(bumped[0].1.contains("no longer in FederationConfig"));
    }

    #[tokio::test]
    async fn unreachable_peer_yields_fail_and_bumps() {
        let sink = MockSink::default();
        sink.rows.lock().unwrap().push(row(3, "peer-0", 1));
        let cfg = cfg_with_peer("peer-0", "http://127.0.0.1:1/api/v1/sync/push");
        replay_once(&cfg, &sink).await;
        assert!(
            !sink.bumped.lock().unwrap().is_empty(),
            "a failed POST must bump attempt_count"
        );
        assert!(sink.marked_replayed.lock().unwrap().is_empty());
    }

    #[tokio::test]
    async fn pending_count_error_degrades_to_fixed_batch() {
        let mut sink = MockSink::default();
        sink.count_should_err = true;
        sink.rows.lock().unwrap().push(row(1, "peer-gone", 1));
        let cfg = cfg_with_peer("peer-0", "http://127.0.0.1:1/api/v1/sync/push");
        replay_once(&cfg, &sink).await;
        assert_eq!(sink.take_calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn take_error_returns_early() {
        let mut sink = MockSink::default();
        sink.take_should_err = true;
        let cfg = cfg_with_peer("peer-0", "http://127.0.0.1:1/api/v1/sync/push");
        replay_once(&cfg, &sink).await;
        assert!(sink.marked_replayed.lock().unwrap().is_empty());
        assert!(sink.bumped.lock().unwrap().is_empty());
    }
}