use std::sync::Arc;
use tracing::{debug, trace, warn};
use crate::types::{Lsn, TenantId};
#[derive(Debug, Clone)]
pub struct ChangeEvent {
pub lsn: Lsn,
pub tenant_id: TenantId,
pub collection: String,
pub document_id: String,
pub operation: ChangeOperation,
pub timestamp_ms: u64,
pub after: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeOperation {
Insert,
Update,
Delete,
}
impl ChangeOperation {
pub fn as_str(self) -> &'static str {
match self {
Self::Insert => "INSERT",
Self::Update => "UPDATE",
Self::Delete => "DELETE",
}
}
}
pub struct Subscription {
pub id: u64,
pub receiver: tokio::sync::broadcast::Receiver<ChangeEvent>,
pub collection_filter: Option<String>,
pub tenant_filter: Option<TenantId>,
pub field_filter: Vec<String>,
active_counter: std::sync::Arc<std::sync::atomic::AtomicU64>,
}
impl Drop for Subscription {
fn drop(&mut self) {
self.active_counter
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
}
}
impl Subscription {
pub async fn recv_filtered(
&mut self,
) -> Result<ChangeEvent, tokio::sync::broadcast::error::RecvError> {
loop {
let event = self.receiver.recv().await?;
if self
.collection_filter
.as_ref()
.is_some_and(|c| event.collection != *c)
{
continue;
}
if self
.tenant_filter
.as_ref()
.is_some_and(|t| event.tenant_id != *t)
{
continue;
}
return Ok(event);
}
}
}
pub struct ChangeStream {
sender: tokio::sync::broadcast::Sender<ChangeEvent>,
next_sub_id: std::sync::atomic::AtomicU64,
active_subscriptions: std::sync::Arc<std::sync::atomic::AtomicU64>,
events_published: std::sync::atomic::AtomicU64,
last_lsn: std::sync::atomic::AtomicU64,
recent_changes: std::sync::RwLock<std::collections::VecDeque<ChangeEvent>>,
recent_capacity: usize,
}
impl ChangeStream {
pub fn new(capacity: usize) -> Self {
let (sender, _) = tokio::sync::broadcast::channel(capacity);
Self {
sender,
next_sub_id: std::sync::atomic::AtomicU64::new(1),
active_subscriptions: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
events_published: std::sync::atomic::AtomicU64::new(0),
last_lsn: std::sync::atomic::AtomicU64::new(0),
recent_changes: std::sync::RwLock::new(std::collections::VecDeque::with_capacity(
capacity,
)),
recent_capacity: capacity,
}
}
pub fn subscribe(
&self,
collection_filter: Option<String>,
tenant_filter: Option<TenantId>,
) -> Subscription {
let id = self
.next_sub_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let receiver = self.sender.subscribe();
self.active_subscriptions
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
debug!(
id,
?collection_filter,
?tenant_filter,
"change stream: new subscription"
);
Subscription {
id,
receiver,
collection_filter,
tenant_filter,
field_filter: Vec::new(),
active_counter: std::sync::Arc::clone(&self.active_subscriptions),
}
}
pub fn publish(&self, event: ChangeEvent) {
self.last_lsn
.store(event.lsn.as_u64(), std::sync::atomic::Ordering::Relaxed);
self.events_published
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if let Ok(mut buf) = self.recent_changes.write() {
if buf.len() >= self.recent_capacity {
buf.pop_front();
}
buf.push_back(event.clone());
}
let _ = self.sender.send(event);
}
pub fn publish_batch(&self, events: &[ChangeEvent]) {
for event in events {
self.publish(event.clone());
}
}
pub fn subscriber_count(&self) -> u64 {
self.active_subscriptions
.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn events_published(&self) -> u64 {
self.events_published
.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn last_lsn(&self) -> u64 {
self.last_lsn.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn query_changes(
&self,
collection: Option<&str>,
since_ms: u64,
limit: usize,
) -> Vec<ChangeEvent> {
let buf = match self.recent_changes.read() {
Ok(b) => b,
Err(p) => p.into_inner(),
};
buf.iter()
.filter(|e| e.timestamp_ms >= since_ms && collection.is_none_or(|c| e.collection == c))
.take(limit)
.cloned()
.collect()
}
pub fn unsubscribe(&self) {
self.active_subscriptions
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
}
pub fn deliver_remote_notify(
&self,
msg: &crate::event::cross_shard::types::NotifyBroadcastMsg,
) {
let op = match msg.operation.as_str() {
"INSERT" => ChangeOperation::Insert,
"UPDATE" => ChangeOperation::Update,
"DELETE" => ChangeOperation::Delete,
_ => ChangeOperation::Insert,
};
let event = ChangeEvent {
lsn: Lsn::new(msg.lsn),
tenant_id: TenantId::new(msg.tenant_id),
collection: msg.collection.clone(),
document_id: msg.document_id.clone(),
operation: op,
timestamp_ms: msg.timestamp_ms,
after: None,
};
let _ = self.sender.send(event);
}
}
pub fn broadcast_notify_to_cluster(
event: &ChangeEvent,
node_id: u64,
sequence: u64,
transport: &Arc<nodedb_cluster::NexarTransport>,
topology: &Arc<std::sync::RwLock<nodedb_cluster::ClusterTopology>>,
) {
use crate::event::cross_shard::types::NotifyBroadcastMsg;
use nodedb_cluster::RaftRpc;
use nodedb_cluster::wire::{VShardEnvelope, VShardMessageType};
let msg = NotifyBroadcastMsg {
source_node: node_id,
sequence,
tenant_id: event.tenant_id.as_u64(),
collection: event.collection.clone(),
document_id: event.document_id.clone(),
operation: event.operation.as_str().to_string(),
timestamp_ms: event.timestamp_ms,
lsn: event.lsn.as_u64(),
};
let payload = match zerompk::to_msgpack_vec(&msg) {
Ok(p) => p,
Err(e) => {
warn!(error = %e, "failed to serialize NotifyBroadcast");
return;
}
};
let peer_ids: Vec<u64> = {
let topo = topology.read().unwrap_or_else(|p| p.into_inner());
topo.active_nodes()
.iter()
.map(|n| n.node_id)
.filter(|&id| id != node_id)
.collect()
};
if peer_ids.is_empty() {
return;
}
trace!(
peer_count = peer_ids.len(),
collection = %event.collection,
"broadcasting NOTIFY to cluster peers"
);
let transport = Arc::clone(transport);
for peer_id in peer_ids {
let envelope = VShardEnvelope::new(
VShardMessageType::NotifyBroadcast,
node_id,
peer_id,
0, payload.clone(),
);
let transport = Arc::clone(&transport);
tokio::spawn(async move {
let rpc = RaftRpc::VShardEnvelope(envelope.to_bytes());
if let Err(e) = transport.send_rpc(peer_id, rpc).await {
trace!(
peer = peer_id,
error = %e,
"NOTIFY broadcast to peer failed (best-effort)"
);
}
});
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn publish_and_receive() {
let stream = ChangeStream::new(64);
let mut sub = stream.subscribe(None, None);
let event = ChangeEvent {
lsn: Lsn::new(1),
tenant_id: TenantId::new(1),
collection: "users".into(),
document_id: "u1".into(),
operation: ChangeOperation::Insert,
timestamp_ms: 1000,
after: None,
};
stream.publish(event);
let received = sub.receiver.recv().await.unwrap();
assert_eq!(received.document_id, "u1");
assert_eq!(received.operation, ChangeOperation::Insert);
}
#[tokio::test]
async fn multiple_subscribers() {
let stream = ChangeStream::new(64);
let mut sub1 = stream.subscribe(None, None);
let mut sub2 = stream.subscribe(Some("orders".into()), None);
let event = ChangeEvent {
lsn: Lsn::new(2),
tenant_id: TenantId::new(1),
collection: "orders".into(),
document_id: "o1".into(),
operation: ChangeOperation::Update,
timestamp_ms: 2000,
after: None,
};
stream.publish(event);
let r1 = sub1.receiver.recv().await.unwrap();
let r2 = sub2.receiver.recv().await.unwrap();
assert_eq!(r1.document_id, "o1");
assert_eq!(r2.document_id, "o1");
}
#[test]
fn metrics_tracking() {
let stream = ChangeStream::new(64);
let _sub = stream.subscribe(None, None);
assert_eq!(stream.subscriber_count(), 1);
for i in 0..10 {
stream.publish(ChangeEvent {
lsn: Lsn::new(i),
tenant_id: TenantId::new(1),
collection: "test".into(),
document_id: format!("d{i}"),
operation: ChangeOperation::Insert,
timestamp_ms: 0,
after: None,
});
}
assert_eq!(stream.events_published(), 10);
assert_eq!(stream.last_lsn(), 9);
drop(_sub);
assert_eq!(stream.subscriber_count(), 0);
}
}