uni_plugin_host/notifications.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Commit notifications — reactive awareness of database changes.
5//!
6//! Sessions can watch for commits via `session.watch()` or `session.watch_with()`
7//! to receive filtered `CommitNotification` events.
8
9use std::collections::HashSet;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use arrow_array::RecordBatch;
14use tokio::sync::broadcast;
15
16/// Describes a committed transaction's effects.
17#[derive(Debug, Clone)]
18pub struct CommitNotification {
19 /// Database version after commit.
20 pub version: u64,
21 /// Number of mutations in the committed transaction.
22 pub mutation_count: usize,
23 /// Vertex labels that were affected by the commit.
24 pub labels_affected: Vec<String>,
25 /// Edge types that were affected by the commit.
26 pub edge_types_affected: Vec<String>,
27 /// Number of Locy rules promoted from the transaction.
28 pub rules_promoted: usize,
29 /// Timestamp of the commit.
30 pub timestamp: chrono::DateTime<chrono::Utc>,
31 /// Transaction ID.
32 pub tx_id: String,
33 /// Session ID that committed the transaction.
34 pub session_id: String,
35 /// Database version when the transaction started (for causal ordering).
36 pub causal_version: u64,
37 /// Per-row mutation events for this commit, in the canonical
38 /// `event_row_schema` shape (`event_kind`,
39 /// `vid_or_eid`, `label`, `property`, `old_value`, `new_value`,
40 /// `properties_new`, `properties_old`).
41 ///
42 /// `Some` only when at least one [`CdcOutputProvider`] is
43 /// registered at commit time — the empty-registry hot path
44 /// broadcasts `None` so the trigger / watch surface pays no
45 /// extraction cost when CDC is unused. The `CdcRuntime` consumes
46 /// this field directly; user-facing `session.watch()` consumers
47 /// ignore it.
48 ///
49 /// [`CdcOutputProvider`]: uni_plugin::traits::cdc::CdcOutputProvider
50 pub mutations: Option<Arc<RecordBatch>>,
51}
52
53/// An async stream of commit notifications with optional filtering.
54pub struct CommitStream {
55 rx: broadcast::Receiver<Arc<CommitNotification>>,
56 label_filter: Option<HashSet<String>>,
57 edge_type_filter: Option<HashSet<String>>,
58 exclude_session: Option<String>,
59 debounce: Option<Duration>,
60 last_emitted: Option<Instant>,
61}
62
63impl CommitStream {
64 /// Wait for the next matching commit notification.
65 ///
66 /// Returns `None` if the broadcast channel is closed (database dropped).
67 /// Skips notifications that don't match filters or are within the debounce window.
68 pub async fn next(&mut self) -> Option<CommitNotification> {
69 loop {
70 match self.rx.recv().await {
71 Ok(notif) => {
72 // Apply exclude_session filter
73 if self
74 .exclude_session
75 .as_ref()
76 .is_some_and(|excluded| notif.session_id == *excluded)
77 {
78 continue;
79 }
80
81 // Apply label filter
82 if self.label_filter.as_ref().is_some_and(|labels| {
83 !notif.labels_affected.iter().any(|l| labels.contains(l))
84 }) {
85 continue;
86 }
87
88 // Apply edge type filter
89 if self.edge_type_filter.as_ref().is_some_and(|types| {
90 !notif.edge_types_affected.iter().any(|t| types.contains(t))
91 }) {
92 continue;
93 }
94
95 // Apply debounce
96 if let Some(debounce) = self.debounce {
97 if self
98 .last_emitted
99 .is_some_and(|last| last.elapsed() < debounce)
100 {
101 continue;
102 }
103 self.last_emitted = Some(Instant::now());
104 }
105
106 return Some((*notif).clone());
107 }
108 Err(broadcast::error::RecvError::Lagged(n)) => {
109 tracing::warn!("CommitStream lagged by {} notifications", n);
110 // Continue receiving — we just lost some older notifications
111 continue;
112 }
113 Err(broadcast::error::RecvError::Closed) => {
114 return None;
115 }
116 }
117 }
118 }
119}
120
121/// Builder for creating a filtered [`CommitStream`].
122pub struct WatchBuilder {
123 rx: broadcast::Receiver<Arc<CommitNotification>>,
124 label_filter: Option<HashSet<String>>,
125 edge_type_filter: Option<HashSet<String>>,
126 exclude_session: Option<String>,
127 debounce: Option<Duration>,
128}
129
130impl WatchBuilder {
131 pub fn new(rx: broadcast::Receiver<Arc<CommitNotification>>) -> Self {
132 Self {
133 rx,
134 label_filter: None,
135 edge_type_filter: None,
136 exclude_session: None,
137 debounce: None,
138 }
139 }
140
141 /// Only receive notifications that affect the given labels.
142 pub fn labels(mut self, labels: &[&str]) -> Self {
143 self.label_filter = Some(labels.iter().map(|s| s.to_string()).collect());
144 self
145 }
146
147 /// Only receive notifications that affect the given edge types.
148 pub fn edge_types(mut self, types: &[&str]) -> Self {
149 self.edge_type_filter = Some(types.iter().map(|s| s.to_string()).collect());
150 self
151 }
152
153 /// Collapse notifications within the given interval.
154 pub fn debounce(mut self, interval: Duration) -> Self {
155 self.debounce = Some(interval);
156 self
157 }
158
159 /// Exclude notifications from the given session ID.
160 pub fn exclude_session(mut self, session_id: &str) -> Self {
161 self.exclude_session = Some(session_id.to_string());
162 self
163 }
164
165 /// Build the commit stream with the configured filters.
166 pub fn build(self) -> CommitStream {
167 CommitStream {
168 rx: self.rx,
169 label_filter: self.label_filter,
170 edge_type_filter: self.edge_type_filter,
171 exclude_session: self.exclude_session,
172 debounce: self.debounce,
173 last_emitted: None,
174 }
175 }
176}