use nodedb_array::sync::hlc::Hlc;
use nodedb_types::sync::wire::SyncMessageType;
use nodedb_types::sync::wire::array::{ArrayRejectMsg, ArrayRejectReason};
use tracing::warn;
use super::delivery::{ArrayDeliveryRegistry, ArrayFrame};
/// Detects a subscriber whose cursor lies below the snapshot boundary and
/// tells that session to catch up.
///
/// * `session_id` – session the reject frame is enqueued for.
/// * `array_name` – array name copied into the reject message and the log.
/// * `last_pushed_hlc` – the subscriber's cursor; its bytes become the
///   message's `op_hlc_bytes`.
/// * `snapshot_hlc` – the boundary the cursor is compared against.
/// * `delivery` – registry the encoded frame is enqueued on.
///
/// Returns `false` without side effects when `last_pushed_hlc >= snapshot_hlc`.
/// Otherwise logs a warning, builds an [`ArrayRejectMsg`] with reason
/// [`ArrayRejectReason::RetentionFloor`], enqueues its encoded frame and
/// returns `true`.
///
/// The return value reports that the cursor was below the boundary, not that
/// a frame was actually enqueued: if encoding fails the frame is skipped (and
/// a warning is logged), but `true` is still returned.
pub fn check_and_trigger(
    session_id: &str,
    array_name: &str,
    last_pushed_hlc: Hlc,
    snapshot_hlc: Hlc,
    delivery: &ArrayDeliveryRegistry,
) -> bool {
    // Cursor at or past the boundary: nothing to do.
    if last_pushed_hlc >= snapshot_hlc {
        return false;
    }

    warn!(
        session = %session_id,
        array = %array_name,
        last_pushed = ?last_pushed_hlc,
        snapshot_boundary = ?snapshot_hlc,
        "array_outbound: subscriber cursor below snapshot boundary — triggering catch-up"
    );

    let reject = ArrayRejectMsg {
        array: array_name.to_string(),
        op_hlc_bytes: last_pushed_hlc.to_bytes(),
        reason: ArrayRejectReason::RetentionFloor,
        detail: format!(
            "subscriber cursor is below snapshot_hlc {:?}; issue CatchupRequest to resync",
            snapshot_hlc
        ),
    };

    if let Some(frame) = encode_reject(&reject) {
        delivery.enqueue(session_id, frame);
    } else {
        // Previously this path was silent, which hid the fact that the
        // subscriber never received its catch-up signal.
        warn!(
            session = %session_id,
            array = %array_name,
            "array_outbound: failed to encode ArrayReject frame — catch-up signal not enqueued"
        );
    }

    true
}
/// Encodes `msg` as a `SyncMessageType::ArrayReject` sync frame and returns
/// its bytes, or `None` when `SyncFrame::try_encode` yields nothing.
fn encode_reject(msg: &ArrayRejectMsg) -> Option<ArrayFrame> {
    let encoded =
        nodedb_types::sync::wire::SyncFrame::try_encode(SyncMessageType::ArrayReject, msg)?;
    Some(encoded.to_bytes())
}
#[cfg(test)]
mod tests {
    use super::*;
    use nodedb_array::sync::replica_id::ReplicaId;

    /// Builds an HLC with the given millisecond component, a zero second
    /// argument and replica id 1.
    fn hlc(ms: u64) -> Hlc {
        Hlc::new(ms, 0, ReplicaId::new(1)).unwrap()
    }

    /// A cursor equal to the boundary must not trigger and must not enqueue.
    #[tokio::test]
    async fn no_trigger_when_cursor_at_boundary() {
        let reg = ArrayDeliveryRegistry::new();
        let mut rx = reg.register("s1".into());
        let fired = check_and_trigger("s1", "arr", hlc(100), hlc(100), &reg);
        assert!(!fired);
        assert!(rx.try_recv().is_err());
    }

    /// A cursor below the boundary must trigger and enqueue a non-empty frame.
    #[tokio::test]
    async fn triggers_when_cursor_below_boundary() {
        let reg = ArrayDeliveryRegistry::new();
        let mut rx = reg.register("s1".into());
        let fired = check_and_trigger("s1", "arr", hlc(50), hlc(100), &reg);
        assert!(fired);
        let frame = rx.try_recv().expect("frame should be enqueued");
        assert!(!frame.is_empty());
    }
}