use std::sync::atomic::{AtomicU64, Ordering};
use serde::{Deserialize, Serialize};
use tracing::debug;
use crate::control::change_stream::ChangeEvent;
use crate::types::TenantId;
#[derive(Debug, Clone, Deserialize)]
pub struct SubscriptionRequest {
pub collection: String,
#[serde(default)]
pub filter: Option<serde_json::Value>,
#[serde(default)]
pub tenant_id: Option<u32>,
}
#[derive(Debug, Clone, Serialize)]
pub struct ChangeNotification {
pub event: String,
pub doc_id: String,
pub collection: String,
pub lsn: u64,
pub timestamp_ms: u64,
}
impl From<&ChangeEvent> for ChangeNotification {
fn from(e: &ChangeEvent) -> Self {
Self {
event: e.operation.as_str().to_string(),
doc_id: e.document_id.clone(),
collection: e.collection.clone(),
lsn: e.lsn.as_u64(),
timestamp_ms: e.timestamp_ms,
}
}
}
pub struct SubscriptionManager {
next_id: AtomicU64,
active: AtomicU64,
events_delivered: AtomicU64,
events_dropped: AtomicU64,
}
impl Default for SubscriptionManager {
fn default() -> Self {
Self::new()
}
}
impl SubscriptionManager {
pub fn new() -> Self {
Self {
next_id: AtomicU64::new(1),
active: AtomicU64::new(0),
events_delivered: AtomicU64::new(0),
events_dropped: AtomicU64::new(0),
}
}
pub fn register(&self) -> u64 {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
self.active.fetch_add(1, Ordering::Relaxed);
debug!(id, "subscription registered");
id
}
pub fn unregister(&self, id: u64) {
self.active.fetch_sub(1, Ordering::Relaxed);
debug!(id, "subscription unregistered");
}
pub fn record_delivery(&self) {
self.events_delivered.fetch_add(1, Ordering::Relaxed);
}
pub fn record_drop(&self) {
self.events_dropped.fetch_add(1, Ordering::Relaxed);
}
pub fn active_count(&self) -> u64 {
self.active.load(Ordering::Relaxed)
}
pub fn events_delivered(&self) -> u64 {
self.events_delivered.load(Ordering::Relaxed)
}
pub fn events_dropped(&self) -> u64 {
self.events_dropped.load(Ordering::Relaxed)
}
}
pub fn matches_filter(
event: &ChangeEvent,
collection: &str,
tenant_id: TenantId,
filter: Option<&serde_json::Value>,
) -> bool {
if event.collection != collection {
return false;
}
if event.tenant_id != tenant_id {
return false;
}
if filter.is_none() {
return true;
}
true
}
#[cfg(test)]
mod tests {
use super::*;
use crate::control::change_stream::ChangeOperation;
use crate::types::Lsn;
#[test]
fn subscription_lifecycle() {
let mgr = SubscriptionManager::new();
let id1 = mgr.register();
let id2 = mgr.register();
assert_eq!(mgr.active_count(), 2);
assert_ne!(id1, id2);
mgr.unregister(id1);
assert_eq!(mgr.active_count(), 1);
mgr.record_delivery();
mgr.record_delivery();
mgr.record_drop();
assert_eq!(mgr.events_delivered(), 2);
assert_eq!(mgr.events_dropped(), 1);
}
#[test]
fn filter_matching() {
let event = ChangeEvent {
lsn: Lsn::new(1),
tenant_id: TenantId::new(1),
collection: "orders".into(),
document_id: "o1".into(),
operation: ChangeOperation::Insert,
timestamp_ms: 0,
after: None,
};
assert!(matches_filter(&event, "orders", TenantId::new(1), None));
assert!(!matches_filter(&event, "users", TenantId::new(1), None));
assert!(!matches_filter(&event, "orders", TenantId::new(99), None));
}
#[test]
fn notification_from_event() {
let event = ChangeEvent {
lsn: Lsn::new(42),
tenant_id: TenantId::new(1),
collection: "orders".into(),
document_id: "o1".into(),
operation: ChangeOperation::Update,
timestamp_ms: 12345,
after: None,
};
let notif = ChangeNotification::from(&event);
assert_eq!(notif.event, "UPDATE");
assert_eq!(notif.doc_id, "o1");
assert_eq!(notif.lsn, 42);
}
}