Skip to main content

uni_db/api/
notifications.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

//! Commit notifications — reactive awareness of database changes.
//!
//! Sessions can watch for commits via `session.watch()` or `session.watch_with()`
//! to receive filtered `CommitNotification` events.
8
9use std::collections::HashSet;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use tokio::sync::broadcast;
14
15/// Describes a committed transaction's effects.
16#[derive(Debug, Clone)]
17pub struct CommitNotification {
18    /// Database version after commit.
19    pub version: u64,
20    /// Number of mutations in the committed transaction.
21    pub mutation_count: usize,
22    /// Vertex labels that were affected by the commit.
23    pub labels_affected: Vec<String>,
24    /// Edge types that were affected by the commit.
25    pub edge_types_affected: Vec<String>,
26    /// Number of Locy rules promoted from the transaction.
27    pub rules_promoted: usize,
28    /// Timestamp of the commit.
29    pub timestamp: chrono::DateTime<chrono::Utc>,
30    /// Transaction ID.
31    pub tx_id: String,
32    /// Session ID that committed the transaction.
33    pub session_id: String,
34    /// Database version when the transaction started (for causal ordering).
35    pub causal_version: u64,
36}
37
38/// An async stream of commit notifications with optional filtering.
39pub struct CommitStream {
40    rx: broadcast::Receiver<Arc<CommitNotification>>,
41    label_filter: Option<HashSet<String>>,
42    edge_type_filter: Option<HashSet<String>>,
43    exclude_session: Option<String>,
44    debounce: Option<Duration>,
45    last_emitted: Option<Instant>,
46}
47
48impl CommitStream {
49    /// Wait for the next matching commit notification.
50    ///
51    /// Returns `None` if the broadcast channel is closed (database dropped).
52    /// Skips notifications that don't match filters or are within the debounce window.
53    pub async fn next(&mut self) -> Option<CommitNotification> {
54        loop {
55            match self.rx.recv().await {
56                Ok(notif) => {
57                    // Apply exclude_session filter
58                    if self
59                        .exclude_session
60                        .as_ref()
61                        .is_some_and(|excluded| notif.session_id == *excluded)
62                    {
63                        continue;
64                    }
65
66                    // Apply label filter
67                    if self.label_filter.as_ref().is_some_and(|labels| {
68                        !notif.labels_affected.iter().any(|l| labels.contains(l))
69                    }) {
70                        continue;
71                    }
72
73                    // Apply edge type filter
74                    if self.edge_type_filter.as_ref().is_some_and(|types| {
75                        !notif.edge_types_affected.iter().any(|t| types.contains(t))
76                    }) {
77                        continue;
78                    }
79
80                    // Apply debounce
81                    if let Some(debounce) = self.debounce {
82                        if self
83                            .last_emitted
84                            .is_some_and(|last| last.elapsed() < debounce)
85                        {
86                            continue;
87                        }
88                        self.last_emitted = Some(Instant::now());
89                    }
90
91                    return Some((*notif).clone());
92                }
93                Err(broadcast::error::RecvError::Lagged(n)) => {
94                    tracing::warn!("CommitStream lagged by {} notifications", n);
95                    // Continue receiving — we just lost some older notifications
96                    continue;
97                }
98                Err(broadcast::error::RecvError::Closed) => {
99                    return None;
100                }
101            }
102        }
103    }
104}
105
106/// Builder for creating a filtered [`CommitStream`].
107pub struct WatchBuilder {
108    rx: broadcast::Receiver<Arc<CommitNotification>>,
109    label_filter: Option<HashSet<String>>,
110    edge_type_filter: Option<HashSet<String>>,
111    exclude_session: Option<String>,
112    debounce: Option<Duration>,
113}
114
115impl WatchBuilder {
116    pub(crate) fn new(rx: broadcast::Receiver<Arc<CommitNotification>>) -> Self {
117        Self {
118            rx,
119            label_filter: None,
120            edge_type_filter: None,
121            exclude_session: None,
122            debounce: None,
123        }
124    }
125
126    /// Only receive notifications that affect the given labels.
127    pub fn labels(mut self, labels: &[&str]) -> Self {
128        self.label_filter = Some(labels.iter().map(|s| s.to_string()).collect());
129        self
130    }
131
132    /// Only receive notifications that affect the given edge types.
133    pub fn edge_types(mut self, types: &[&str]) -> Self {
134        self.edge_type_filter = Some(types.iter().map(|s| s.to_string()).collect());
135        self
136    }
137
138    /// Collapse notifications within the given interval.
139    pub fn debounce(mut self, interval: Duration) -> Self {
140        self.debounce = Some(interval);
141        self
142    }
143
144    /// Exclude notifications from the given session ID.
145    pub fn exclude_session(mut self, session_id: &str) -> Self {
146        self.exclude_session = Some(session_id.to_string());
147        self
148    }
149
150    /// Build the commit stream with the configured filters.
151    pub fn build(self) -> CommitStream {
152        CommitStream {
153            rx: self.rx,
154            label_filter: self.label_filter,
155            edge_type_filter: self.edge_type_filter,
156            exclude_session: self.exclude_session,
157            debounce: self.debounce,
158            last_emitted: None,
159        }
160    }
161}