use tracing::debug;
use crate::types::{Lsn, TenantId};
/// One change notification carried through a `ChangeStream`.
///
/// Events are cloned into the stream's recent-changes buffer and sent to
/// every broadcast subscriber by `ChangeStream::publish`.
#[derive(Debug, Clone)]
pub struct ChangeEvent {
/// Sequence position of the change; `ChangeStream::publish` records its
/// `as_u64()` value as the stream's last LSN.
pub lsn: Lsn,
/// Tenant the change belongs to; compared against a subscription's
/// `tenant_filter` in `Subscription::recv_filtered`.
pub tenant_id: TenantId,
/// Collection name; compared against collection filters in
/// `Subscription::recv_filtered` and `ChangeStream::query_changes`.
pub collection: String,
/// Identifier of the affected document (presumably unique within the
/// collection — confirm with the producer of these events).
pub document_id: String,
/// Kind of change that occurred.
pub operation: ChangeOperation,
/// Event time used by `ChangeStream::query_changes` for its `since_ms`
/// cutoff. Presumably milliseconds; the epoch is not established in this file.
pub timestamp_ms: u64,
/// Optional JSON payload. NOTE(review): the name suggests the document state
/// after the change — confirm against the code that constructs events.
pub after: Option<serde_json::Value>,
}
/// The kind of mutation a `ChangeEvent` reports.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeOperation {
    /// Labelled `"INSERT"` by [`ChangeOperation::as_str`].
    Insert,
    /// Labelled `"UPDATE"` by [`ChangeOperation::as_str`].
    Update,
    /// Labelled `"DELETE"` by [`ChangeOperation::as_str`].
    Delete,
}

impl ChangeOperation {
    /// Returns the fixed upper-case label for this operation.
    ///
    /// The match is exhaustive on purpose: adding a variant forces a label to
    /// be chosen here at compile time.
    pub fn as_str(self) -> &'static str {
        let label: &'static str = match self {
            ChangeOperation::Insert => "INSERT",
            ChangeOperation::Update => "UPDATE",
            ChangeOperation::Delete => "DELETE",
        };
        label
    }
}
/// A handle onto a `ChangeStream`, created by `ChangeStream::subscribe`.
///
/// Dropping the handle decrements the stream's shared active-subscription
/// counter (see the `Drop` impl).
pub struct Subscription {
/// Identifier taken from the stream's `next_sub_id` counter, which starts at 1.
pub id: u64,
/// Raw broadcast receiver. Reading from it directly bypasses the filters
/// below; `recv_filtered` applies them.
pub receiver: tokio::sync::broadcast::Receiver<ChangeEvent>,
/// When `Some`, `recv_filtered` only yields events whose `collection` equals
/// this value.
pub collection_filter: Option<String>,
/// When `Some`, `recv_filtered` only yields events whose `tenant_id` equals
/// this value.
pub tenant_filter: Option<TenantId>,
/// Initialised empty by `ChangeStream::subscribe` and not consulted by
/// `recv_filtered` in this file. NOTE(review): intended semantics are not
/// visible here — confirm with callers before relying on it.
pub field_filter: Vec<String>,
/// Counter shared with the owning `ChangeStream` (`active_subscriptions`).
active_counter: std::sync::Arc<std::sync::atomic::AtomicU64>,
}
impl Drop for Subscription {
    /// Releases this subscription's slot in the shared active-subscription
    /// counter.
    fn drop(&mut self) {
        // `ChangeStream::unsubscribe` decrements the same counter, so it can
        // already be zero when the handle is dropped. A plain `fetch_sub`
        // would then wrap to `u64::MAX`; `checked_sub` makes the closure
        // return `None` at zero, which leaves the counter untouched.
        let _ = self.active_counter.fetch_update(
            std::sync::atomic::Ordering::Relaxed,
            std::sync::atomic::Ordering::Relaxed,
            |count| count.checked_sub(1),
        );
    }
}
impl Subscription {
    /// Waits for the next event that passes this subscription's filters.
    ///
    /// Events are pulled from `receiver` one at a time. An event is skipped
    /// when `collection_filter` is set and differs from the event's
    /// collection, or when `tenant_filter` is set and differs from the
    /// event's tenant. `field_filter` is not consulted.
    ///
    /// # Errors
    ///
    /// Any `RecvError` produced by the underlying broadcast receiver is
    /// returned to the caller unchanged.
    pub async fn recv_filtered(
        &mut self,
    ) -> Result<ChangeEvent, tokio::sync::broadcast::error::RecvError> {
        loop {
            let candidate = self.receiver.recv().await?;

            // An unset filter accepts everything.
            let collection_ok = match &self.collection_filter {
                Some(wanted) => candidate.collection == *wanted,
                None => true,
            };
            // The tenant is only compared once the collection has matched.
            let accepted = collection_ok
                && match &self.tenant_filter {
                    Some(wanted) => candidate.tenant_id == *wanted,
                    None => true,
                };

            if accepted {
                return Ok(candidate);
            }
        }
    }
}
/// Fan-out point for `ChangeEvent`s: broadcasts each published event to all
/// subscribers and keeps a bounded buffer of recent events for
/// `query_changes`.
pub struct ChangeStream {
/// Sending half of the broadcast channel; each `subscribe` call creates a
/// new receiver from it.
sender: tokio::sync::broadcast::Sender<ChangeEvent>,
/// Source of subscription ids; starts at 1 and is incremented per `subscribe`.
next_sub_id: std::sync::atomic::AtomicU64,
/// Number of live subscriptions; shared with every `Subscription`, which
/// decrements it on drop.
active_subscriptions: std::sync::Arc<std::sync::atomic::AtomicU64>,
/// Count of `publish` calls made so far.
events_published: std::sync::atomic::AtomicU64,
/// `lsn.as_u64()` of the event passed to the most recent `publish` call
/// (0 until the first publish).
last_lsn: std::sync::atomic::AtomicU64,
/// Recently published events in publish order, oldest at the front.
recent_changes: std::sync::RwLock<std::collections::VecDeque<ChangeEvent>>,
/// Length at which `publish` evicts the oldest entry from `recent_changes`
/// before appending a new one.
recent_capacity: usize,
}
impl ChangeStream {
    /// Creates a stream whose broadcast channel is created with `capacity`
    /// and whose recent-changes buffer evicts once it reaches `capacity`
    /// events.
    ///
    /// The receiver returned by the channel constructor is dropped
    /// immediately; consumers obtain their own through [`Self::subscribe`].
    ///
    /// # Panics
    ///
    /// Per tokio's documentation, `broadcast::channel` panics when `capacity`
    /// is `0` or greater than `usize::MAX / 2`.
    pub fn new(capacity: usize) -> Self {
        let (sender, _) = tokio::sync::broadcast::channel(capacity);
        Self {
            sender,
            next_sub_id: std::sync::atomic::AtomicU64::new(1),
            active_subscriptions: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
            events_published: std::sync::atomic::AtomicU64::new(0),
            last_lsn: std::sync::atomic::AtomicU64::new(0),
            recent_changes: std::sync::RwLock::new(std::collections::VecDeque::with_capacity(
                capacity,
            )),
            recent_capacity: capacity,
        }
    }

    /// Registers a new subscriber and returns its handle.
    ///
    /// `collection_filter` and `tenant_filter` are stored on the handle and
    /// applied by `Subscription::recv_filtered`; the handle's `field_filter`
    /// starts empty. The active-subscription counter is incremented here and
    /// decremented again when the returned handle is dropped.
    pub fn subscribe(
        &self,
        collection_filter: Option<String>,
        tenant_filter: Option<TenantId>,
    ) -> Subscription {
        let id = self
            .next_sub_id
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let receiver = self.sender.subscribe();
        self.active_subscriptions
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        debug!(
            id,
            ?collection_filter,
            ?tenant_filter,
            "change stream: new subscription"
        );
        Subscription {
            id,
            receiver,
            collection_filter,
            tenant_filter,
            field_filter: Vec::new(),
            active_counter: std::sync::Arc::clone(&self.active_subscriptions),
        }
    }

    /// Publishes one event: updates the metrics, appends the event to the
    /// recent-changes buffer (evicting the oldest entry once the buffer has
    /// reached its capacity) and broadcasts it to subscribers.
    pub fn publish(&self, event: ChangeEvent) {
        self.last_lsn
            .store(event.lsn.as_u64(), std::sync::atomic::Ordering::Relaxed);
        self.events_published
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        {
            // Recover from a poisoned lock the same way `query_changes` does.
            // Skipping the buffer on poison would silently drop every later
            // event from the history while still counting it as published.
            let mut buf = self
                .recent_changes
                .write()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            if buf.len() >= self.recent_capacity {
                buf.pop_front();
            }
            buf.push_back(event.clone());
            // The guard is released at the end of this scope, before sending.
        }
        // Deliberately best-effort: per tokio's documentation `send` fails
        // only when there are no active receivers, which is not an error for
        // a publisher.
        let _ = self.sender.send(event);
    }

    /// Publishes every event in `events`, in slice order, via [`Self::publish`].
    pub fn publish_batch(&self, events: &[ChangeEvent]) {
        for event in events {
            self.publish(event.clone());
        }
    }

    /// Returns the current value of the active-subscription counter.
    pub fn subscriber_count(&self) -> u64 {
        self.active_subscriptions
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Returns how many times [`Self::publish`] has been called.
    pub fn events_published(&self) -> u64 {
        self.events_published
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Returns the LSN (as `u64`) of the most recently published event, or
    /// `0` if nothing has been published.
    pub fn last_lsn(&self) -> u64 {
        self.last_lsn.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Returns clones of buffered events, oldest first, that have
    /// `timestamp_ms >= since_ms` and, when `collection` is `Some`, a matching
    /// collection name. At most `limit` events are returned.
    ///
    /// Only events still held in the recent-changes buffer are visible.
    pub fn query_changes(
        &self,
        collection: Option<&str>,
        since_ms: u64,
        limit: usize,
    ) -> Vec<ChangeEvent> {
        // A poisoned lock still guards a structurally valid deque, so read it.
        let buf = self
            .recent_changes
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        buf.iter()
            .filter(|e| e.timestamp_ms >= since_ms && collection.is_none_or(|c| e.collection == c))
            .take(limit)
            .cloned()
            .collect()
    }

    /// Manually decrements the active-subscription counter, stopping at zero.
    ///
    /// NOTE(review): dropping a `Subscription` already decrements this
    /// counter, so calling this for a handle that is later dropped counts the
    /// same subscription twice. The decrement saturates so that such a
    /// sequence cannot wrap the counter to `u64::MAX`.
    pub fn unsubscribe(&self) {
        let _ = self.active_subscriptions.fetch_update(
            std::sync::atomic::Ordering::Relaxed,
            std::sync::atomic::Ordering::Relaxed,
            |count| count.checked_sub(1),
        );
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an event for tenant 1 with no `after` payload.
    fn sample_event(
        lsn: Lsn,
        collection: &str,
        document_id: &str,
        operation: ChangeOperation,
        timestamp_ms: u64,
    ) -> ChangeEvent {
        ChangeEvent {
            lsn,
            tenant_id: TenantId::new(1),
            collection: collection.to_owned(),
            document_id: document_id.to_owned(),
            operation,
            timestamp_ms,
            after: None,
        }
    }

    /// A published event reaches a subscriber that was registered beforehand.
    #[tokio::test]
    async fn publish_and_receive() {
        let stream = ChangeStream::new(64);
        let mut sub = stream.subscribe(None, None);

        stream.publish(sample_event(
            Lsn::new(1),
            "users",
            "u1",
            ChangeOperation::Insert,
            1000,
        ));

        let received = sub.receiver.recv().await.unwrap();
        assert_eq!(received.document_id, "u1");
        assert_eq!(received.operation, ChangeOperation::Insert);
    }

    /// Every subscriber's raw receiver gets its own copy of a published event.
    #[tokio::test]
    async fn multiple_subscribers() {
        let stream = ChangeStream::new(64);
        let mut sub1 = stream.subscribe(None, None);
        let mut sub2 = stream.subscribe(Some("orders".into()), None);

        stream.publish(sample_event(
            Lsn::new(2),
            "orders",
            "o1",
            ChangeOperation::Update,
            2000,
        ));

        let r1 = sub1.receiver.recv().await.unwrap();
        let r2 = sub2.receiver.recv().await.unwrap();
        assert_eq!(r1.document_id, "o1");
        assert_eq!(r2.document_id, "o1");
    }

    /// Counters follow subscribe, publish and drop.
    #[test]
    fn metrics_tracking() {
        let stream = ChangeStream::new(64);
        let sub = stream.subscribe(None, None);
        assert_eq!(stream.subscriber_count(), 1);

        for i in 0..10 {
            stream.publish(sample_event(
                Lsn::new(i),
                "test",
                &format!("d{i}"),
                ChangeOperation::Insert,
                0,
            ));
        }
        assert_eq!(stream.events_published(), 10);
        assert_eq!(stream.last_lsn(), 9);

        drop(sub);
        assert_eq!(stream.subscriber_count(), 0);
    }
}