uni_db/api/
notifications.rs1use std::collections::HashSet;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use tokio::sync::broadcast;
14
15#[derive(Debug, Clone)]
17pub struct CommitNotification {
18 pub version: u64,
20 pub mutation_count: usize,
22 pub labels_affected: Vec<String>,
24 pub edge_types_affected: Vec<String>,
26 pub rules_promoted: usize,
28 pub timestamp: chrono::DateTime<chrono::Utc>,
30 pub tx_id: String,
32 pub session_id: String,
34 pub causal_version: u64,
36}
37
38pub struct CommitStream {
40 rx: broadcast::Receiver<Arc<CommitNotification>>,
41 label_filter: Option<HashSet<String>>,
42 edge_type_filter: Option<HashSet<String>>,
43 exclude_session: Option<String>,
44 debounce: Option<Duration>,
45 last_emitted: Option<Instant>,
46}
47
48impl CommitStream {
49 pub async fn next(&mut self) -> Option<CommitNotification> {
54 loop {
55 match self.rx.recv().await {
56 Ok(notif) => {
57 if self
59 .exclude_session
60 .as_ref()
61 .is_some_and(|excluded| notif.session_id == *excluded)
62 {
63 continue;
64 }
65
66 if self.label_filter.as_ref().is_some_and(|labels| {
68 !notif.labels_affected.iter().any(|l| labels.contains(l))
69 }) {
70 continue;
71 }
72
73 if self.edge_type_filter.as_ref().is_some_and(|types| {
75 !notif.edge_types_affected.iter().any(|t| types.contains(t))
76 }) {
77 continue;
78 }
79
80 if let Some(debounce) = self.debounce {
82 if self
83 .last_emitted
84 .is_some_and(|last| last.elapsed() < debounce)
85 {
86 continue;
87 }
88 self.last_emitted = Some(Instant::now());
89 }
90
91 return Some((*notif).clone());
92 }
93 Err(broadcast::error::RecvError::Lagged(n)) => {
94 tracing::warn!("CommitStream lagged by {} notifications", n);
95 continue;
97 }
98 Err(broadcast::error::RecvError::Closed) => {
99 return None;
100 }
101 }
102 }
103 }
104}
105
106pub struct WatchBuilder {
108 rx: broadcast::Receiver<Arc<CommitNotification>>,
109 label_filter: Option<HashSet<String>>,
110 edge_type_filter: Option<HashSet<String>>,
111 exclude_session: Option<String>,
112 debounce: Option<Duration>,
113}
114
115impl WatchBuilder {
116 pub(crate) fn new(rx: broadcast::Receiver<Arc<CommitNotification>>) -> Self {
117 Self {
118 rx,
119 label_filter: None,
120 edge_type_filter: None,
121 exclude_session: None,
122 debounce: None,
123 }
124 }
125
126 pub fn labels(mut self, labels: &[&str]) -> Self {
128 self.label_filter = Some(labels.iter().map(|s| s.to_string()).collect());
129 self
130 }
131
132 pub fn edge_types(mut self, types: &[&str]) -> Self {
134 self.edge_type_filter = Some(types.iter().map(|s| s.to_string()).collect());
135 self
136 }
137
138 pub fn debounce(mut self, interval: Duration) -> Self {
140 self.debounce = Some(interval);
141 self
142 }
143
144 pub fn exclude_session(mut self, session_id: &str) -> Self {
146 self.exclude_session = Some(session_id.to_string());
147 self
148 }
149
150 pub fn build(self) -> CommitStream {
152 CommitStream {
153 rx: self.rx,
154 label_filter: self.label_filter,
155 edge_type_filter: self.edge_type_filter,
156 exclude_session: self.exclude_session,
157 debounce: self.debounce,
158 last_emitted: None,
159 }
160 }
161}