use std::collections::HashSet;
use std::sync::Arc;
use std::time::{Duration, Instant};
use arrow_array::RecordBatch;
use tokio::sync::broadcast;
/// Summary of a single commit, delivered to watchers through a broadcast channel.
///
/// `CommitStream::next` inspects `session_id`, `labels_affected` and
/// `edge_types_affected` when filtering; the remaining fields are carried
/// through to the consumer untouched.
#[derive(Debug, Clone)]
pub struct CommitNotification {
    /// Version associated with the commit.
    // NOTE(review): presumably the database version produced by the commit — confirm with the publisher.
    pub version: u64,
    /// Number of mutations reported for the commit.
    pub mutation_count: usize,
    /// Labels touched by the commit; matched against a stream's label filter.
    pub labels_affected: Vec<String>,
    /// Edge types touched by the commit; matched against a stream's edge-type filter.
    pub edge_types_affected: Vec<String>,
    /// Count of promoted rules.
    // NOTE(review): the meaning of "promoted" is not visible in this file — confirm.
    pub rules_promoted: usize,
    /// UTC timestamp attached to the notification.
    // NOTE(review): presumably the commit time — confirm with the publisher.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Identifier of the transaction (inferred from the field name — confirm).
    pub tx_id: String,
    /// Identifier of the originating session; compared against a stream's
    /// `exclude_session` setting.
    pub session_id: String,
    /// Causal version value.
    // NOTE(review): relationship to `version` is not visible in this file — confirm.
    pub causal_version: u64,
    /// Optional shared Arrow record batch.
    // NOTE(review): presumably the mutation rows themselves — confirm schema with the publisher.
    pub mutations: Option<Arc<RecordBatch>>,
}
/// Filtered view over a broadcast channel of [`CommitNotification`]s.
///
/// Values are pulled with [`CommitStream::next`], which applies the optional
/// session exclusion, label filter, edge-type filter and debounce interval
/// stored here.
pub struct CommitStream {
    /// Receiving half of the broadcast channel carrying shared notifications.
    rx: broadcast::Receiver<Arc<CommitNotification>>,
    /// When set, a notification must list at least one of these labels.
    label_filter: Option<HashSet<String>>,
    /// When set, a notification must list at least one of these edge types.
    edge_type_filter: Option<HashSet<String>>,
    /// When set, notifications whose `session_id` equals this value are skipped.
    exclude_session: Option<String>,
    /// When set, minimum time between two notifications returned by `next`.
    debounce: Option<Duration>,
    /// Instant of the most recent emission; only maintained while `debounce` is set.
    last_emitted: Option<Instant>,
}
impl CommitStream {
pub async fn next(&mut self) -> Option<CommitNotification> {
loop {
match self.rx.recv().await {
Ok(notif) => {
if self
.exclude_session
.as_ref()
.is_some_and(|excluded| notif.session_id == *excluded)
{
continue;
}
if self.label_filter.as_ref().is_some_and(|labels| {
!notif.labels_affected.iter().any(|l| labels.contains(l))
}) {
continue;
}
if self.edge_type_filter.as_ref().is_some_and(|types| {
!notif.edge_types_affected.iter().any(|t| types.contains(t))
}) {
continue;
}
if let Some(debounce) = self.debounce {
if self
.last_emitted
.is_some_and(|last| last.elapsed() < debounce)
{
continue;
}
self.last_emitted = Some(Instant::now());
}
return Some((*notif).clone());
}
Err(broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!("CommitStream lagged by {} notifications", n);
continue;
}
Err(broadcast::error::RecvError::Closed) => {
return None;
}
}
}
}
}
/// Builder that collects filter settings and produces a [`CommitStream`].
///
/// Every option starts out unset; see the setter methods for what each one does.
pub struct WatchBuilder {
    /// Broadcast receiver handed over to the built stream.
    rx: broadcast::Receiver<Arc<CommitNotification>>,
    /// Optional set of labels the stream should be restricted to.
    label_filter: Option<HashSet<String>>,
    /// Optional set of edge types the stream should be restricted to.
    edge_type_filter: Option<HashSet<String>>,
    /// Optional session whose notifications the stream should skip.
    exclude_session: Option<String>,
    /// Optional minimum interval between notifications returned by the stream.
    debounce: Option<Duration>,
}
impl WatchBuilder {
pub fn new(rx: broadcast::Receiver<Arc<CommitNotification>>) -> Self {
Self {
rx,
label_filter: None,
edge_type_filter: None,
exclude_session: None,
debounce: None,
}
}
pub fn labels(mut self, labels: &[&str]) -> Self {
self.label_filter = Some(labels.iter().map(|s| s.to_string()).collect());
self
}
pub fn edge_types(mut self, types: &[&str]) -> Self {
self.edge_type_filter = Some(types.iter().map(|s| s.to_string()).collect());
self
}
pub fn debounce(mut self, interval: Duration) -> Self {
self.debounce = Some(interval);
self
}
pub fn exclude_session(mut self, session_id: &str) -> Self {
self.exclude_session = Some(session_id.to_string());
self
}
pub fn build(self) -> CommitStream {
CommitStream {
rx: self.rx,
label_filter: self.label_filter,
edge_type_filter: self.edge_type_filter,
exclude_session: self.exclude_session,
debounce: self.debounce,
last_emitted: None,
}
}
}