#[cfg(feature = "sal")]
use std::sync::Arc;
use std::time::Duration;
use super::FederationConfig;
fn log_catchup_http_skip(peer_id: &str, status: impl std::fmt::Display) {
tracing::debug!("catchup: peer {peer_id} returned HTTP {status} — skipping this tick");
}
fn log_catchup_unreachable(peer_id: &str, e: impl std::fmt::Display) {
tracing::debug!("catchup: peer {peer_id} unreachable: {e}");
}
fn log_catchup_unparseable_body(peer_id: &str, e: impl std::fmt::Display) {
tracing::warn!("catchup: peer {peer_id} returned unparseable body: {e}");
}
fn log_catchup_pull_ok(peer_id: &str, rows: usize) {
tracing::info!("catchup: pull: {peer_id} ok ({rows} row(s) returned)");
}
fn log_catchup_unparseable_memory(peer_id: &str, e: impl std::fmt::Display) {
tracing::warn!("catchup: unparseable memory from peer {peer_id}: {e}");
}
fn log_catchup_sync_state_observe_failed(peer_id: &str, e: impl std::fmt::Display) {
tracing::warn!("catchup: sync_state_observe failed for {peer_id}: {e}");
}
pub fn spawn_catchup_loop(
config: FederationConfig,
db: crate::handlers::Db,
interval: Duration,
) -> tokio::task::JoinHandle<()> {
#[cfg(feature = "sal")]
{
spawn_catchup_loop_with_store(config, db, None, interval)
}
#[cfg(not(feature = "sal"))]
{
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(5)).await;
loop {
catchup_once(&config, &db).await;
tokio::time::sleep(interval).await;
}
})
}
}
#[cfg(feature = "sal")]
pub fn spawn_catchup_loop_with_store(
config: FederationConfig,
db: crate::handlers::Db,
store: Option<Arc<dyn crate::store::MemoryStore>>,
interval: Duration,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(5)).await;
loop {
catchup_once_with_store(&config, &db, store.as_ref()).await;
tokio::time::sleep(interval).await;
}
})
}
#[cfg_attr(not(test), allow(dead_code))]
pub(super) async fn catchup_once(config: &FederationConfig, db: &crate::handlers::Db) {
#[cfg(feature = "sal")]
{
catchup_once_with_store(config, db, None).await;
}
#[cfg(not(feature = "sal"))]
{
catchup_once_legacy(config, db).await;
}
}
#[cfg(feature = "sal")]
pub(super) async fn catchup_once_with_store(
config: &FederationConfig,
db: &crate::handlers::Db,
store: Option<&Arc<dyn crate::store::MemoryStore>>,
) {
let local_id = config.sender_agent_id.clone();
for peer in &config.peers {
let base = peer
.sync_push_url
.trim_end_matches(crate::handlers::routes::SYNC_PUSH)
.to_string();
let since_opt: Option<String> = {
let lock = db.lock().await;
match crate::db::sync_state_load(&lock.0, &local_id) {
Ok(clock) => clock.entries.get(&peer.id).cloned(),
Err(_) => None,
}
};
let url = sync_since_url(&base, &local_id, since_opt.as_deref());
let mut req = config
.client
.get(&url)
.header(crate::HEADER_AGENT_ID, local_id.as_str())
.header(
crate::federation::peer_attestation::PEER_ID_HEADER,
local_id.as_str(),
);
if let Some(ref key) = config.api_key {
req = req.header(crate::HEADER_API_KEY, key);
}
let resp = match req.send().await {
Ok(r) if r.status().is_success() => r,
Ok(r) => {
log_catchup_http_skip(&peer.id, r.status());
continue;
}
Err(e) => {
log_catchup_unreachable(&peer.id, e);
continue;
}
};
let body: serde_json::Value = match resp.json().await {
Ok(v) => v,
Err(e) => {
log_catchup_unparseable_body(&peer.id, e);
continue;
}
};
let memories = match body.get("memories").and_then(|v| v.as_array()) {
Some(arr) => arr.clone(),
None => continue,
};
log_catchup_pull_ok(&peer.id, memories.len());
if memories.is_empty() {
continue;
}
let mut applied = 0usize;
let mut latest_ts: Option<String> = None;
if let Some(store) = store {
let ctx = crate::store::CallerContext::for_admin(
crate::identity::sentinels::FEDERATION_CATCHUP,
);
for raw in &memories {
let mem: crate::models::Memory = match serde_json::from_value(raw.clone()) {
Ok(m) => m,
Err(e) => {
log_catchup_unparseable_memory(&peer.id, e);
continue;
}
};
if crate::validate::validate_memory(&mem).is_err() {
continue;
}
if latest_ts
.as_deref()
.is_none_or(|cur| mem.updated_at.as_str() > cur)
{
latest_ts = Some(mem.updated_at.clone());
}
match store.apply_remote_memory(&ctx, &mem).await {
Ok(_) => applied += 1,
Err(e) => {
tracing::warn!(
"catchup: apply_remote_memory failed for peer {}: {e}",
peer.id
);
}
}
}
if let Some(ts) = latest_ts.as_deref() {
let lock = db.lock().await;
if let Err(e) = crate::db::sync_state_observe(&lock.0, &local_id, &peer.id, ts) {
log_catchup_sync_state_observe_failed(&peer.id, e);
}
}
} else {
let lock = db.lock().await;
for raw in &memories {
let mem: crate::models::Memory = match serde_json::from_value(raw.clone()) {
Ok(m) => m,
Err(e) => {
log_catchup_unparseable_memory(&peer.id, e);
continue;
}
};
if crate::validate::validate_memory(&mem).is_err() {
continue;
}
if latest_ts
.as_deref()
.is_none_or(|cur| mem.updated_at.as_str() > cur)
{
latest_ts = Some(mem.updated_at.clone());
}
if crate::db::insert_if_newer(&lock.0, &mem).is_ok() {
applied += 1;
}
}
if let Some(ts) = latest_ts.as_deref()
&& let Err(e) = crate::db::sync_state_observe(&lock.0, &local_id, &peer.id, ts)
{
log_catchup_sync_state_observe_failed(&peer.id, e);
}
}
if applied > 0 {
tracing::info!(
"catchup: applied {applied} memories from peer {} (since={})",
peer.id,
since_opt.as_deref().unwrap_or("<full-snapshot>"),
);
}
}
}
#[cfg(not(feature = "sal"))]
async fn catchup_once_legacy(config: &FederationConfig, db: &crate::handlers::Db) {
let local_id = config.sender_agent_id.clone();
for peer in &config.peers {
let base = peer
.sync_push_url
.trim_end_matches(crate::handlers::routes::SYNC_PUSH)
.to_string();
let since_opt: Option<String> = {
let lock = db.lock().await;
match crate::db::sync_state_load(&lock.0, &local_id) {
Ok(clock) => clock.entries.get(&peer.id).cloned(),
Err(_) => None,
}
};
let url = sync_since_url(&base, &local_id, since_opt.as_deref());
let mut req = config
.client
.get(&url)
.header(crate::HEADER_AGENT_ID, local_id.as_str())
.header(
crate::federation::peer_attestation::PEER_ID_HEADER,
local_id.as_str(),
);
if let Some(ref key) = config.api_key {
req = req.header(crate::HEADER_API_KEY, key);
}
let resp = match req.send().await {
Ok(r) if r.status().is_success() => r,
Ok(r) => {
log_catchup_http_skip(&peer.id, r.status());
continue;
}
Err(e) => {
log_catchup_unreachable(&peer.id, e);
continue;
}
};
let body: serde_json::Value = match resp.json().await {
Ok(v) => v,
Err(e) => {
log_catchup_unparseable_body(&peer.id, e);
continue;
}
};
let memories = match body.get("memories").and_then(|v| v.as_array()) {
Some(arr) => arr.clone(),
None => continue,
};
log_catchup_pull_ok(&peer.id, memories.len());
if memories.is_empty() {
continue;
}
let mut applied = 0usize;
let mut latest_ts: Option<String> = None;
{
let lock = db.lock().await;
for raw in &memories {
let mem: crate::models::Memory = match serde_json::from_value(raw.clone()) {
Ok(m) => m,
Err(e) => {
log_catchup_unparseable_memory(&peer.id, e);
continue;
}
};
if crate::validate::validate_memory(&mem).is_err() {
continue;
}
if latest_ts
.as_deref()
.is_none_or(|cur| mem.updated_at.as_str() > cur)
{
latest_ts = Some(mem.updated_at.clone());
}
if crate::db::insert_if_newer(&lock.0, &mem).is_ok() {
applied += 1;
}
}
if let Some(ts) = latest_ts.as_deref()
&& let Err(e) = crate::db::sync_state_observe(&lock.0, &local_id, &peer.id, ts)
{
log_catchup_sync_state_observe_failed(&peer.id, e);
}
}
if applied > 0 {
tracing::info!(
"catchup: applied {applied} memories from peer {} (since={})",
peer.id,
since_opt.as_deref().unwrap_or("<full-snapshot>"),
);
}
}
}
#[doc(hidden)]
pub async fn catchup_once_for_tests(config: &FederationConfig) {
let local_id = config.sender_agent_id.clone();
for peer in &config.peers {
let base = peer
.sync_push_url
.trim_end_matches(crate::handlers::routes::SYNC_PUSH)
.to_string();
let url = sync_since_url(&base, &local_id, None);
let mut req = config
.client
.get(&url)
.header(crate::HEADER_AGENT_ID, local_id.as_str())
.header(
crate::federation::peer_attestation::PEER_ID_HEADER,
local_id.as_str(),
);
if let Some(ref key) = config.api_key {
req = req.header(crate::HEADER_API_KEY, key);
}
let resp = match req.send().await {
Ok(r) if r.status().is_success() => r,
Ok(r) => {
log_catchup_http_skip(&peer.id, r.status());
continue;
}
Err(e) => {
log_catchup_unreachable(&peer.id, e);
continue;
}
};
let body: serde_json::Value = match resp.json().await {
Ok(v) => v,
Err(e) => {
log_catchup_unparseable_body(&peer.id, e);
continue;
}
};
let memories = body
.get("memories")
.and_then(|v| v.as_array())
.map(Vec::as_slice)
.unwrap_or(&[]);
log_catchup_pull_ok(&peer.id, memories.len());
}
}
fn sync_since_url(base: &str, local_id: &str, since: Option<&str>) -> String {
match since {
Some(s) => format!(
"{base}{}?since={}&peer={local_id}",
crate::handlers::routes::SYNC_SINCE,
urlencoding_encode(s)
),
None => format!(
"{base}{}?peer={local_id}",
crate::handlers::routes::SYNC_SINCE
),
}
}
pub(super) fn urlencoding_encode(s: &str) -> String {
let mut out = String::with_capacity(s.len() + 6);
for b in s.bytes() {
match b {
b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
out.push(b as char);
}
_ => {
use std::fmt::Write;
let _ = write!(out, "%{b:02X}");
}
}
}
out
}