1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team
//! Commit notifications — reactive awareness of database changes.
//!
//! Sessions can watch for commits via `session.watch()` or `session.watch_with()`
//! to receive filtered `CommitNotification` events.
use std::collections::HashSet;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::broadcast;
/// Describes a committed transaction's effects.
///
/// Instances are delivered to watchers by [`CommitStream::next`], which
/// inspects `session_id`, `labels_affected` and `edge_types_affected` when
/// deciding whether a notification passes the stream's filters.
#[derive(Debug, Clone)]
pub struct CommitNotification {
/// Database version after commit.
pub version: u64,
/// Number of mutations in the committed transaction.
pub mutation_count: usize,
/// Vertex labels that were affected by the commit.
///
/// Matched against a stream's label filter: the notification passes when
/// at least one entry is contained in the filter set.
pub labels_affected: Vec<String>,
/// Edge types that were affected by the commit.
///
/// Matched against a stream's edge-type filter: the notification passes
/// when at least one entry is contained in the filter set.
pub edge_types_affected: Vec<String>,
/// Number of Locy rules promoted from the transaction.
pub rules_promoted: usize,
/// Timestamp of the commit (UTC).
pub timestamp: chrono::DateTime<chrono::Utc>,
/// Transaction ID.
pub tx_id: String,
/// Session ID that committed the transaction.
///
/// Compared for equality against a stream's excluded session, if any.
pub session_id: String,
/// Database version when the transaction started (for causal ordering).
pub causal_version: u64,
}
/// An async stream of commit notifications with optional filtering.
///
/// Created by [`WatchBuilder::build`]. Every configured filter must accept a
/// notification for [`CommitStream::next`] to yield it; a filter left as
/// `None` accepts everything.
pub struct CommitStream {
/// Receiving half of the broadcast channel carrying shared notifications.
rx: broadcast::Receiver<Arc<CommitNotification>>,
/// When set, only notifications touching at least one of these labels pass.
label_filter: Option<HashSet<String>>,
/// When set, only notifications touching at least one of these edge types pass.
edge_type_filter: Option<HashSet<String>>,
/// When set, notifications whose `session_id` equals this value are skipped.
exclude_session: Option<String>,
/// When set, minimum time between two emitted notifications; matching
/// notifications that arrive sooner are skipped (dropped, not deferred).
debounce: Option<Duration>,
/// Instant of the most recent emission; only updated while `debounce` is set.
last_emitted: Option<Instant>,
}
impl CommitStream {
/// Wait for the next matching commit notification.
///
/// Returns `None` if the broadcast channel is closed (database dropped).
/// Skips notifications that don't match filters or are within the debounce window.
pub async fn next(&mut self) -> Option<CommitNotification> {
loop {
match self.rx.recv().await {
Ok(notif) => {
// Apply exclude_session filter
if self
.exclude_session
.as_ref()
.is_some_and(|excluded| notif.session_id == *excluded)
{
continue;
}
// Apply label filter
if self.label_filter.as_ref().is_some_and(|labels| {
!notif.labels_affected.iter().any(|l| labels.contains(l))
}) {
continue;
}
// Apply edge type filter
if self.edge_type_filter.as_ref().is_some_and(|types| {
!notif.edge_types_affected.iter().any(|t| types.contains(t))
}) {
continue;
}
// Apply debounce
if let Some(debounce) = self.debounce {
if self
.last_emitted
.is_some_and(|last| last.elapsed() < debounce)
{
continue;
}
self.last_emitted = Some(Instant::now());
}
return Some((*notif).clone());
}
Err(broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!("CommitStream lagged by {} notifications", n);
// Continue receiving — we just lost some older notifications
continue;
}
Err(broadcast::error::RecvError::Closed) => {
return None;
}
}
}
}
}
/// Builder for creating a filtered [`CommitStream`].
///
/// Every filter starts unset (`None`); each builder method sets one of them,
/// and [`WatchBuilder::build`] moves the configuration into the stream.
pub struct WatchBuilder {
/// Broadcast receiver handed over to the built stream.
rx: broadcast::Receiver<Arc<CommitNotification>>,
/// Labels to filter on, set by `labels`.
label_filter: Option<HashSet<String>>,
/// Edge types to filter on, set by `edge_types`.
edge_type_filter: Option<HashSet<String>>,
/// Session ID whose commits are skipped, set by `exclude_session`.
exclude_session: Option<String>,
/// Debounce interval, set by `debounce`.
debounce: Option<Duration>,
}
impl WatchBuilder {
pub(crate) fn new(rx: broadcast::Receiver<Arc<CommitNotification>>) -> Self {
Self {
rx,
label_filter: None,
edge_type_filter: None,
exclude_session: None,
debounce: None,
}
}
/// Only receive notifications that affect the given labels.
pub fn labels(mut self, labels: &[&str]) -> Self {
self.label_filter = Some(labels.iter().map(|s| s.to_string()).collect());
self
}
/// Only receive notifications that affect the given edge types.
pub fn edge_types(mut self, types: &[&str]) -> Self {
self.edge_type_filter = Some(types.iter().map(|s| s.to_string()).collect());
self
}
/// Collapse notifications within the given interval.
pub fn debounce(mut self, interval: Duration) -> Self {
self.debounce = Some(interval);
self
}
/// Exclude notifications from the given session ID.
pub fn exclude_session(mut self, session_id: &str) -> Self {
self.exclude_session = Some(session_id.to_string());
self
}
/// Build the commit stream with the configured filters.
pub fn build(self) -> CommitStream {
CommitStream {
rx: self.rx,
label_filter: self.label_filter,
edge_type_filter: self.edge_type_filter,
exclude_session: self.exclude_session,
debounce: self.debounce,
last_emitted: None,
}
}
}