d_engine_core/watch/manager.rs
//! Watch mechanism for monitoring key changes.
//!
//! Architecture: shared state + background dispatcher.
//!
//! ```text
//! StateMachine:
//!     apply_chunk() -> broadcast_watch_events() -> broadcast::send(WatchEvent)  [fire-and-forget]
//!         ↓
//! WatchDispatcher (spawned in Builder):
//!     broadcast::subscribe() -> match key in DashMap -> mpsc::try_send (per watcher)
//!         ↓
//! Watchers:
//!     Embedded:   mpsc::Receiver<WatchEvent>
//!     Standalone: mpsc::Receiver -> gRPC stream (protobuf conversion)
//! ```
//!
//! # Design Principles
//!
//! - **No hidden resource allocation**: all `tokio::spawn` calls are explicit in the Builder.
//! - **Minimal abstraction**: only essential data structures, no unnecessary wrappers.
//! - **Composable**: the registry and the dispatcher are independent and are composed in the Builder.
22
23use std::sync::Arc;
24use std::sync::atomic::AtomicU64;
25use std::sync::atomic::Ordering;
26
27use bytes::Bytes;
28// Re-export protobuf types for watch events
29pub use d_engine_proto::client::{WatchEventType, WatchResponse as WatchEvent};
30use dashmap::DashMap;
31use tokio::sync::broadcast;
32use tokio::sync::mpsc;
33use tracing::debug;
34use tracing::trace;
35use tracing::warn;
36
37/// Handle for a registered watcher
38///
39/// When dropped, the watcher is automatically unregistered (if unregister_tx is Some).
40pub struct WatcherHandle {
41 /// Unique identifier
42 id: u64,
43 /// Key being watched
44 key: Bytes,
45 /// Channel receiver for watch events
46 receiver: mpsc::Receiver<WatchEvent>,
47 /// Unregister channel (None if cleanup disabled via into_receiver)
48 unregister_tx: Option<mpsc::UnboundedSender<(u64, Bytes)>>,
49}
50
51impl WatcherHandle {
52 /// Get the unique identifier for this watcher
53 pub fn id(&self) -> u64 {
54 self.id
55 }
56
57 /// Get the key being watched
58 pub fn key(&self) -> &Bytes {
59 &self.key
60 }
61
62 /// Get a mutable reference to the receiver
63 pub fn receiver_mut(&mut self) -> &mut mpsc::Receiver<WatchEvent> {
64 &mut self.receiver
65 }
66
67 /// Consume the handle and return the event receiver
68 ///
69 /// Disables automatic unregistration. The watcher will remain active until
70 /// the receiver is dropped (causing send failures that trigger cleanup).
71 ///
72 /// Use this for long-lived streams (e.g., gRPC) where the receiver lifetime
73 /// extends beyond the handle's scope.
74 pub fn into_receiver(mut self) -> (u64, Bytes, mpsc::Receiver<WatchEvent>) {
75 let id = self.id;
76 let key = self.key.clone();
77
78 // Clear unregister_tx to disable Drop cleanup
79 self.unregister_tx = None;
80
81 // Create dummy receiver to satisfy Rust's move checker
82 let (dummy_tx, dummy_rx) = mpsc::channel(1);
83 drop(dummy_tx); // Close immediately
84 let receiver = std::mem::replace(&mut self.receiver, dummy_rx);
85
86 (id, key, receiver)
87 }
88}
89
90impl Drop for WatcherHandle {
91 fn drop(&mut self) {
92 if let Some(ref tx) = self.unregister_tx {
93 // Send unregister request (ignore errors if dispatcher stopped)
94 let _ = tx.send((self.id, self.key.clone()));
95 trace!(watcher_id = self.id, key = ?self.key, "Watcher unregistered");
96 }
97 }
98}
99
100/// Internal watcher state
101#[derive(Debug)]
102struct Watcher {
103 /// Unique identifier
104 id: u64,
105 /// Channel sender for events
106 sender: mpsc::Sender<WatchEvent>,
107}
108
109/// Watch registry - manages watcher registration (Arc-shareable)
110///
111/// This is the shared state that both Builder (for registration) and
112/// WatchDispatcher (for event dispatch) can access concurrently.
113pub struct WatchRegistry {
114 /// Watchers grouped by key (lock-free concurrent HashMap)
115 watchers: DashMap<Bytes, Vec<Watcher>>,
116
117 /// Next watcher ID (monotonically increasing)
118 next_id: AtomicU64,
119
120 /// Per-watcher channel buffer size
121 watcher_buffer_size: usize,
122
123 /// Unregister channel sender (cloned for each WatcherHandle)
124 unregister_tx: mpsc::UnboundedSender<(u64, Bytes)>,
125}
126
127impl WatchRegistry {
128 /// Create a new watch registry
129 ///
130 /// # Arguments
131 /// * `watcher_buffer_size` - Buffer size for per-watcher mpsc channels
132 /// * `unregister_tx` - Channel for receiving unregister requests
133 pub fn new(
134 watcher_buffer_size: usize,
135 unregister_tx: mpsc::UnboundedSender<(u64, Bytes)>,
136 ) -> Self {
137 Self {
138 watchers: DashMap::new(),
139 next_id: AtomicU64::new(1),
140 watcher_buffer_size,
141 unregister_tx,
142 }
143 }
144
145 /// Register a new watcher for a specific key
146 ///
147 /// Returns a handle that receives watch events via an mpsc channel.
148 /// The watcher is automatically unregistered when the handle is dropped.
149 pub fn register(
150 &self,
151 key: Bytes,
152 ) -> WatcherHandle {
153 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
154 let (sender, receiver) = mpsc::channel(self.watcher_buffer_size);
155
156 let watcher = Watcher { id, sender };
157
158 // Insert into DashMap (lock-free)
159 self.watchers.entry(key.clone()).or_default().push(watcher);
160
161 trace!(watcher_id = id, key = ?key, "Watcher registered");
162
163 WatcherHandle {
164 id,
165 key,
166 receiver,
167 unregister_tx: Some(self.unregister_tx.clone()),
168 }
169 }
170
171 /// Unregister a watcher
172 fn unregister(
173 &self,
174 id: u64,
175 key: &Bytes,
176 ) {
177 self.watchers.remove_if_mut(key, |_key, watchers| {
178 watchers.retain(|w| w.id != id);
179 watchers.is_empty()
180 });
181 }
182
183 /// Get the number of active watchers for a specific key (for testing)
184 #[cfg(test)]
185 pub(crate) fn watcher_count(
186 &self,
187 key: &Bytes,
188 ) -> usize {
189 self.watchers.get(key).map(|w| w.len()).unwrap_or(0)
190 }
191
192 /// Get the total number of watched keys (for testing)
193 #[cfg(test)]
194 pub(crate) fn watched_key_count(&self) -> usize {
195 self.watchers.len()
196 }
197}
198
199/// Watch dispatcher - distributes events to watchers (background task)
200///
201/// This is spawned explicitly in NodeBuilder::build() to make resource
202/// allocation visible. It continuously:
203/// 1. Receives events from broadcast channel
204/// 2. Matches keys in the registry
205/// 3. Dispatches to matching watchers
206pub struct WatchDispatcher {
207 /// Shared registry (same instance held by Node)
208 registry: Arc<WatchRegistry>,
209
210 /// Broadcast receiver for global events
211 broadcast_rx: broadcast::Receiver<WatchEvent>,
212
213 /// Unregister channel receiver
214 unregister_rx: mpsc::UnboundedReceiver<(u64, Bytes)>,
215}
216
217impl WatchDispatcher {
218 /// Create a new watch dispatcher
219 ///
220 /// # Arguments
221 /// * `registry` - Shared registry for looking up watchers
222 /// * `broadcast_rx` - Receiver for watch events from StateMachine
223 /// * `unregister_rx` - Receiver for unregister requests from WatcherHandles
224 pub fn new(
225 registry: Arc<WatchRegistry>,
226 broadcast_rx: broadcast::Receiver<WatchEvent>,
227 unregister_rx: mpsc::UnboundedReceiver<(u64, Bytes)>,
228 ) -> Self {
229 Self {
230 registry,
231 broadcast_rx,
232 unregister_rx,
233 }
234 }
235
236 /// Run the dispatcher event loop
237 ///
238 /// This should be spawned as a tokio task in NodeBuilder::build().
239 /// It will run until the broadcast channel is closed.
240 pub async fn run(mut self) {
241 debug!("WatchDispatcher started");
242
243 loop {
244 tokio::select! {
245 biased;
246
247 // Handle unregister requests first (cleanup priority)
248 Some((id, key)) = self.unregister_rx.recv() => {
249 self.registry.unregister(id, &key);
250 }
251
252 // Receive broadcast event
253 result = self.broadcast_rx.recv() => {
254 match result {
255 Ok(event) => {
256 self.dispatch_event(event).await;
257 }
258 Err(broadcast::error::RecvError::Lagged(n)) => {
259 warn!("WatchDispatcher lagged {} events (slow watchers)", n);
260 }
261 Err(broadcast::error::RecvError::Closed) => {
262 debug!("Broadcast channel closed, WatchDispatcher stopping");
263 break;
264 }
265 }
266 }
267 }
268 }
269
270 debug!("WatchDispatcher stopped");
271 }
272
273 /// Dispatch an event to all watchers of a specific key
274 async fn dispatch_event(
275 &self,
276 event: WatchEvent,
277 ) {
278 if let Some(watchers) = self.registry.watchers.get(&event.key) {
279 let mut dead_watchers = Vec::new();
280
281 for watcher in watchers.iter() {
282 // Non-blocking send
283 if watcher.sender.try_send(event.clone()).is_err() {
284 // Receiver dropped or full, mark for cleanup
285 dead_watchers.push(watcher.id);
286 }
287 }
288
289 // Cleanup dead watchers
290 drop(watchers);
291 if !dead_watchers.is_empty() {
292 for id in dead_watchers {
293 self.registry.unregister(id, &event.key);
294 }
295 }
296
297 trace!(key = ?event.key, event_type = ?event.event_type, "Event dispatched");
298 }
299 }
300}