// d_engine_core/watch/manager.rs

//! Watch mechanism for monitoring key changes
//!
//! Architecture: Shared State + Background Dispatcher
//!
//! ```text
//! StateMachine:
//!   apply_chunk() -> broadcast_watch_events() -> broadcast::send(WatchEvent) [fire-and-forget]
//!                                                        ↓
//! WatchDispatcher (spawned in Builder):
//!   broadcast::subscribe() -> match key in DashMap -> mpsc::send(per-watcher)
//!                                                           ↓
//! Watchers:
//!   Embedded: mpsc::Receiver<WatchEvent>
//!   Standalone: mpsc::Receiver -> gRPC stream (protobuf conversion)
//! ```
//!
//! # Design Principles
//!
//! - **No hidden resource allocation**: All tokio::spawn calls are explicit in Builder
//! - **Minimal abstraction**: Only essential data structures, no unnecessary wrappers
//! - **Composable**: Registry and Dispatcher are independent, composed in Builder
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;

use bytes::Bytes;
// Re-export protobuf types so callers can use watch events without
// depending on d_engine_proto directly.
pub use d_engine_proto::client::{WatchEventType, WatchResponse as WatchEvent};
use dashmap::DashMap;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tracing::debug;
use tracing::trace;
use tracing::warn;
/// Handle for a registered watcher
///
/// When dropped, the watcher is automatically unregistered (if unregister_tx is Some).
/// Call `into_receiver` to detach the event channel and opt out of Drop cleanup.
pub struct WatcherHandle {
    /// Unique identifier (allocated from WatchRegistry::next_id)
    id: u64,
    /// Key being watched (exact-match key, also the registry map key)
    key: Bytes,
    /// Channel receiver for watch events (sender half lives in the registry)
    receiver: mpsc::Receiver<WatchEvent>,
    /// Unregister channel (None if cleanup disabled via into_receiver)
    unregister_tx: Option<mpsc::UnboundedSender<(u64, Bytes)>>,
}
51impl WatcherHandle {
52    /// Get the unique identifier for this watcher
53    pub fn id(&self) -> u64 {
54        self.id
55    }
56
57    /// Get the key being watched
58    pub fn key(&self) -> &Bytes {
59        &self.key
60    }
61
62    /// Get a mutable reference to the receiver
63    pub fn receiver_mut(&mut self) -> &mut mpsc::Receiver<WatchEvent> {
64        &mut self.receiver
65    }
66
67    /// Consume the handle and return the event receiver
68    ///
69    /// Disables automatic unregistration. The watcher will remain active until
70    /// the receiver is dropped (causing send failures that trigger cleanup).
71    ///
72    /// Use this for long-lived streams (e.g., gRPC) where the receiver lifetime
73    /// extends beyond the handle's scope.
74    pub fn into_receiver(mut self) -> (u64, Bytes, mpsc::Receiver<WatchEvent>) {
75        let id = self.id;
76        let key = self.key.clone();
77
78        // Clear unregister_tx to disable Drop cleanup
79        self.unregister_tx = None;
80
81        // Create dummy receiver to satisfy Rust's move checker
82        let (dummy_tx, dummy_rx) = mpsc::channel(1);
83        drop(dummy_tx); // Close immediately
84        let receiver = std::mem::replace(&mut self.receiver, dummy_rx);
85
86        (id, key, receiver)
87    }
88}
89
90impl Drop for WatcherHandle {
91    fn drop(&mut self) {
92        if let Some(ref tx) = self.unregister_tx {
93            // Send unregister request (ignore errors if dispatcher stopped)
94            let _ = tx.send((self.id, self.key.clone()));
95            trace!(watcher_id = self.id, key = ?self.key, "Watcher unregistered");
96        }
97    }
98}
99
/// Internal watcher state, stored in the registry's per-key Vec.
#[derive(Debug)]
struct Watcher {
    /// Unique identifier (matches the id of the corresponding WatcherHandle)
    id: u64,
    /// Channel sender for events (receiver half lives in the WatcherHandle)
    sender: mpsc::Sender<WatchEvent>,
}
/// Watch registry - manages watcher registration (Arc-shareable)
///
/// This is the shared state that both Builder (for registration) and
/// WatchDispatcher (for event dispatch) can access concurrently.
pub struct WatchRegistry {
    /// Watchers grouped by key (lock-free concurrent HashMap)
    watchers: DashMap<Bytes, Vec<Watcher>>,

    /// Next watcher ID (monotonically increasing, starts at 1)
    next_id: AtomicU64,

    /// Per-watcher channel buffer size (capacity of each watcher's mpsc channel)
    watcher_buffer_size: usize,

    /// Unregister channel sender (cloned for each WatcherHandle so Drop can
    /// request cleanup through the dispatcher)
    unregister_tx: mpsc::UnboundedSender<(u64, Bytes)>,
}
127impl WatchRegistry {
128    /// Create a new watch registry
129    ///
130    /// # Arguments
131    /// * `watcher_buffer_size` - Buffer size for per-watcher mpsc channels
132    /// * `unregister_tx` - Channel for receiving unregister requests
133    pub fn new(
134        watcher_buffer_size: usize,
135        unregister_tx: mpsc::UnboundedSender<(u64, Bytes)>,
136    ) -> Self {
137        Self {
138            watchers: DashMap::new(),
139            next_id: AtomicU64::new(1),
140            watcher_buffer_size,
141            unregister_tx,
142        }
143    }
144
145    /// Register a new watcher for a specific key
146    ///
147    /// Returns a handle that receives watch events via an mpsc channel.
148    /// The watcher is automatically unregistered when the handle is dropped.
149    pub fn register(
150        &self,
151        key: Bytes,
152    ) -> WatcherHandle {
153        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
154        let (sender, receiver) = mpsc::channel(self.watcher_buffer_size);
155
156        let watcher = Watcher { id, sender };
157
158        // Insert into DashMap (lock-free)
159        self.watchers.entry(key.clone()).or_default().push(watcher);
160
161        trace!(watcher_id = id, key = ?key, "Watcher registered");
162
163        WatcherHandle {
164            id,
165            key,
166            receiver,
167            unregister_tx: Some(self.unregister_tx.clone()),
168        }
169    }
170
171    /// Unregister a watcher
172    fn unregister(
173        &self,
174        id: u64,
175        key: &Bytes,
176    ) {
177        self.watchers.remove_if_mut(key, |_key, watchers| {
178            watchers.retain(|w| w.id != id);
179            watchers.is_empty()
180        });
181    }
182
183    /// Get the number of active watchers for a specific key (for testing)
184    #[cfg(test)]
185    pub(crate) fn watcher_count(
186        &self,
187        key: &Bytes,
188    ) -> usize {
189        self.watchers.get(key).map(|w| w.len()).unwrap_or(0)
190    }
191
192    /// Get the total number of watched keys (for testing)
193    #[cfg(test)]
194    pub(crate) fn watched_key_count(&self) -> usize {
195        self.watchers.len()
196    }
197}
198
/// Watch dispatcher - distributes events to watchers (background task)
///
/// This is spawned explicitly in NodeBuilder::build() to make resource
/// allocation visible. It continuously:
/// 1. Receives events from broadcast channel
/// 2. Matches keys in the registry
/// 3. Dispatches to matching watchers
pub struct WatchDispatcher {
    /// Shared registry (same instance held by Node)
    registry: Arc<WatchRegistry>,

    /// Broadcast receiver for global events (fed by StateMachine)
    broadcast_rx: broadcast::Receiver<WatchEvent>,

    /// Unregister channel receiver (fed by WatcherHandle::drop)
    unregister_rx: mpsc::UnboundedReceiver<(u64, Bytes)>,
}
217impl WatchDispatcher {
218    /// Create a new watch dispatcher
219    ///
220    /// # Arguments
221    /// * `registry` - Shared registry for looking up watchers
222    /// * `broadcast_rx` - Receiver for watch events from StateMachine
223    /// * `unregister_rx` - Receiver for unregister requests from WatcherHandles
224    pub fn new(
225        registry: Arc<WatchRegistry>,
226        broadcast_rx: broadcast::Receiver<WatchEvent>,
227        unregister_rx: mpsc::UnboundedReceiver<(u64, Bytes)>,
228    ) -> Self {
229        Self {
230            registry,
231            broadcast_rx,
232            unregister_rx,
233        }
234    }
235
236    /// Run the dispatcher event loop
237    ///
238    /// This should be spawned as a tokio task in NodeBuilder::build().
239    /// It will run until the broadcast channel is closed.
240    pub async fn run(mut self) {
241        debug!("WatchDispatcher started");
242
243        loop {
244            tokio::select! {
245                biased;
246
247                // Handle unregister requests first (cleanup priority)
248                Some((id, key)) = self.unregister_rx.recv() => {
249                    self.registry.unregister(id, &key);
250                }
251
252                // Receive broadcast event
253                result = self.broadcast_rx.recv() => {
254                    match result {
255                        Ok(event) => {
256                            self.dispatch_event(event).await;
257                        }
258                        Err(broadcast::error::RecvError::Lagged(n)) => {
259                            warn!("WatchDispatcher lagged {} events (slow watchers)", n);
260                        }
261                        Err(broadcast::error::RecvError::Closed) => {
262                            debug!("Broadcast channel closed, WatchDispatcher stopping");
263                            break;
264                        }
265                    }
266                }
267            }
268        }
269
270        debug!("WatchDispatcher stopped");
271    }
272
273    /// Dispatch an event to all watchers of a specific key
274    async fn dispatch_event(
275        &self,
276        event: WatchEvent,
277    ) {
278        if let Some(watchers) = self.registry.watchers.get(&event.key) {
279            let mut dead_watchers = Vec::new();
280
281            for watcher in watchers.iter() {
282                // Non-blocking send
283                if watcher.sender.try_send(event.clone()).is_err() {
284                    // Receiver dropped or full, mark for cleanup
285                    dead_watchers.push(watcher.id);
286                }
287            }
288
289            // Cleanup dead watchers
290            drop(watchers);
291            if !dead_watchers.is_empty() {
292                for id in dead_watchers {
293                    self.registry.unregister(id, &event.key);
294                }
295            }
296
297            trace!(key = ?event.key, event_type = ?event.event_type, "Event dispatched");
298        }
299    }
300}