Skip to main content

uni_plugin_host/
notifications.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Commit notifications — reactive awareness of database changes.
5//!
6//! Sessions can watch for commits via `session.watch()` or `session.watch_with()`
7//! to receive filtered `CommitNotification` events.
8
9use std::collections::HashSet;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use arrow_array::RecordBatch;
14use tokio::sync::broadcast;
15
16/// Describes a committed transaction's effects.
17#[derive(Debug, Clone)]
18pub struct CommitNotification {
19    /// Database version after commit.
20    pub version: u64,
21    /// Number of mutations in the committed transaction.
22    pub mutation_count: usize,
23    /// Vertex labels that were affected by the commit.
24    pub labels_affected: Vec<String>,
25    /// Edge types that were affected by the commit.
26    pub edge_types_affected: Vec<String>,
27    /// Number of Locy rules promoted from the transaction.
28    pub rules_promoted: usize,
29    /// Timestamp of the commit.
30    pub timestamp: chrono::DateTime<chrono::Utc>,
31    /// Transaction ID.
32    pub tx_id: String,
33    /// Session ID that committed the transaction.
34    pub session_id: String,
35    /// Database version when the transaction started (for causal ordering).
36    pub causal_version: u64,
37    /// Per-row mutation events for this commit, in the canonical
38    /// `event_row_schema` shape (`event_kind`,
39    /// `vid_or_eid`, `label`, `property`, `old_value`, `new_value`,
40    /// `properties_new`, `properties_old`).
41    ///
42    /// `Some` only when at least one [`CdcOutputProvider`] is
43    /// registered at commit time — the empty-registry hot path
44    /// broadcasts `None` so the trigger / watch surface pays no
45    /// extraction cost when CDC is unused. The `CdcRuntime` consumes
46    /// this field directly; user-facing `session.watch()` consumers
47    /// ignore it.
48    ///
49    /// [`CdcOutputProvider`]: uni_plugin::traits::cdc::CdcOutputProvider
50    pub mutations: Option<Arc<RecordBatch>>,
51}
52
53/// An async stream of commit notifications with optional filtering.
54pub struct CommitStream {
55    rx: broadcast::Receiver<Arc<CommitNotification>>,
56    label_filter: Option<HashSet<String>>,
57    edge_type_filter: Option<HashSet<String>>,
58    exclude_session: Option<String>,
59    debounce: Option<Duration>,
60    last_emitted: Option<Instant>,
61}
62
63impl CommitStream {
64    /// Wait for the next matching commit notification.
65    ///
66    /// Returns `None` if the broadcast channel is closed (database dropped).
67    /// Skips notifications that don't match filters or are within the debounce window.
68    pub async fn next(&mut self) -> Option<CommitNotification> {
69        loop {
70            match self.rx.recv().await {
71                Ok(notif) => {
72                    // Apply exclude_session filter
73                    if self
74                        .exclude_session
75                        .as_ref()
76                        .is_some_and(|excluded| notif.session_id == *excluded)
77                    {
78                        continue;
79                    }
80
81                    // Apply label filter
82                    if self.label_filter.as_ref().is_some_and(|labels| {
83                        !notif.labels_affected.iter().any(|l| labels.contains(l))
84                    }) {
85                        continue;
86                    }
87
88                    // Apply edge type filter
89                    if self.edge_type_filter.as_ref().is_some_and(|types| {
90                        !notif.edge_types_affected.iter().any(|t| types.contains(t))
91                    }) {
92                        continue;
93                    }
94
95                    // Apply debounce
96                    if let Some(debounce) = self.debounce {
97                        if self
98                            .last_emitted
99                            .is_some_and(|last| last.elapsed() < debounce)
100                        {
101                            continue;
102                        }
103                        self.last_emitted = Some(Instant::now());
104                    }
105
106                    return Some((*notif).clone());
107                }
108                Err(broadcast::error::RecvError::Lagged(n)) => {
109                    tracing::warn!("CommitStream lagged by {} notifications", n);
110                    // Continue receiving — we just lost some older notifications
111                    continue;
112                }
113                Err(broadcast::error::RecvError::Closed) => {
114                    return None;
115                }
116            }
117        }
118    }
119}
120
121/// Builder for creating a filtered [`CommitStream`].
122pub struct WatchBuilder {
123    rx: broadcast::Receiver<Arc<CommitNotification>>,
124    label_filter: Option<HashSet<String>>,
125    edge_type_filter: Option<HashSet<String>>,
126    exclude_session: Option<String>,
127    debounce: Option<Duration>,
128}
129
130impl WatchBuilder {
131    pub fn new(rx: broadcast::Receiver<Arc<CommitNotification>>) -> Self {
132        Self {
133            rx,
134            label_filter: None,
135            edge_type_filter: None,
136            exclude_session: None,
137            debounce: None,
138        }
139    }
140
141    /// Only receive notifications that affect the given labels.
142    pub fn labels(mut self, labels: &[&str]) -> Self {
143        self.label_filter = Some(labels.iter().map(|s| s.to_string()).collect());
144        self
145    }
146
147    /// Only receive notifications that affect the given edge types.
148    pub fn edge_types(mut self, types: &[&str]) -> Self {
149        self.edge_type_filter = Some(types.iter().map(|s| s.to_string()).collect());
150        self
151    }
152
153    /// Collapse notifications within the given interval.
154    pub fn debounce(mut self, interval: Duration) -> Self {
155        self.debounce = Some(interval);
156        self
157    }
158
159    /// Exclude notifications from the given session ID.
160    pub fn exclude_session(mut self, session_id: &str) -> Self {
161        self.exclude_session = Some(session_id.to_string());
162        self
163    }
164
165    /// Build the commit stream with the configured filters.
166    pub fn build(self) -> CommitStream {
167        CommitStream {
168            rx: self.rx,
169            label_filter: self.label_filter,
170            edge_type_filter: self.edge_type_filter,
171            exclude_session: self.exclude_session,
172            debounce: self.debounce,
173            last_emitted: None,
174        }
175    }
176}