d_engine_core/watch/manager.rs
1//! Watch mechanism for monitoring key changes
2//!
3//! Architecture: Shared State + Background Dispatcher
4//!
5//! ```text
6//! StateMachine:
7//! apply_chunk() -> broadcast_watch_events() -> broadcast::send(WatchResponse) [fire-and-forget]
8//! ↓
9//! WatchDispatcher (spawned in Builder):
10//! broadcast::subscribe() ->
11//! exact: DashMap<Bytes, Vec<Watcher>> O(1) lookup per event
12//! prefix: DashMap<Bytes, Vec<Watcher>> O(depth) decomposition + O(1) lookup per segment
13//! ↓
14//! Watchers:
15//! Embedded: mpsc::Receiver<WatchEvent> (opaque Rust type, no proto dependency)
16//! Standalone: mpsc::Receiver -> gRPC stream (caller converts WatchEvent → WatchResponse)
17//! ```
18//!
19//! # Design Principles
20//!
21//! - **No hidden resource allocation**: All tokio::spawn calls are explicit in Builder
22//! - **Minimal abstraction**: Only essential data structures, no unnecessary wrappers
23//! - **Composable**: Registry and Dispatcher are independent, composed in Builder
24//! - **O(depth) prefix dispatch**: Decompose event key into slash-terminated path segments,
25//! O(1) DashMap lookup per segment — dispatch cost is independent of prefix watcher count
26//! - **Proto boundary**: WatchResponse (proto) lives only in the broadcast channel and handler;
27//! WatchEvent (opaque) is what callers see — no proto import required.
28
29use std::collections::hash_map::DefaultHasher;
30use std::hash::{Hash, Hasher};
31use std::sync::Arc;
32use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
33use std::time::Duration;
34
35use bytes::Bytes;
36use d_engine_proto::client::WatchResponse;
37use dashmap::DashMap;
38use tokio::sync::broadcast;
39use tokio::sync::mpsc;
40use tracing::debug;
41use tracing::trace;
42use tracing::warn;
43
44// ── Public opaque types ───────────────────────────────────────────────────────
45
/// Kind of change carried by a [`WatchEvent`].
///
/// This is the crate's own mirror of the proto event-type enum, so callers can
/// match on watch events without importing any proto types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatchEventType {
    /// A key was written.
    Put,
    /// A key was removed.
    Delete,
    /// The watch was canceled (also used for unrecognized proto event types).
    Canceled,
    /// Synthetic heartbeat event; carries no key or value.
    Progress,
}
54
55/// Opaque watch event delivered to callers.
56///
57/// All fields use standard Rust types — callers never need to import proto types.
58///
59/// `prev_value` semantics:
60/// - `None` — watcher was registered with `prev_kv = false`, or the event is
61/// `Progress` / `Canceled` (not a data mutation).
62/// - `Some(Bytes::new())` — watcher has `prev_kv = true` and the key did not exist
63/// before this write (fresh insert).
64/// - `Some(v)` — watcher has `prev_kv = true` and `v` is the previous value.
65#[derive(Debug, Clone)]
66pub struct WatchEvent {
67 pub event_type: WatchEventType,
68 pub key: Bytes,
69 pub value: Bytes,
70 pub prev_value: Option<Bytes>,
71 pub revision: u64,
72}
73
74// ── Proto ↔ opaque conversions ────────────────────────────────────────────────
75
76/// Convert a proto WatchResponse into an opaque WatchEvent.
77///
78/// Called by the dispatcher when delivering to a per-watcher mpsc channel.
79/// `prev_kv`: when false, prev_value is zeroed so callers that didn't opt in
80/// never receive stale memory from a watcher that did.
81fn proto_to_event(
82 proto: &WatchResponse,
83 prev_kv: bool,
84) -> WatchEvent {
85 use d_engine_proto::client::WatchEventType as ProtoType;
86
87 let event_type = match ProtoType::try_from(proto.event_type) {
88 Ok(ProtoType::Put) => WatchEventType::Put,
89 Ok(ProtoType::Delete) => WatchEventType::Delete,
90 Ok(ProtoType::Canceled) => WatchEventType::Canceled,
91 Ok(ProtoType::Progress) => WatchEventType::Progress,
92 Err(_) => WatchEventType::Canceled,
93 };
94
95 WatchEvent {
96 event_type,
97 key: proto.key.clone(),
98 value: proto.value.clone(),
99 // None when prev_kv = false so callers can distinguish "not requested" from
100 // "key didn't exist" (Some(empty)).
101 prev_value: if prev_kv {
102 Some(proto.prev_value.clone())
103 } else {
104 None
105 },
106 revision: proto.revision,
107 }
108}
109
110/// Convert an opaque WatchEvent back to a proto WatchResponse (for gRPC).
111impl From<&WatchEvent> for WatchResponse {
112 fn from(e: &WatchEvent) -> Self {
113 use d_engine_proto::client::WatchEventType as ProtoType;
114 WatchResponse {
115 key: e.key.clone(),
116 value: e.value.clone(),
117 // Proto transport uses bytes (no optional); None collapses to empty.
118 prev_value: e.prev_value.clone().unwrap_or_default(),
119 event_type: match e.event_type {
120 WatchEventType::Put => ProtoType::Put as i32,
121 WatchEventType::Delete => ProtoType::Delete as i32,
122 WatchEventType::Canceled => ProtoType::Canceled as i32,
123 WatchEventType::Progress => ProtoType::Progress as i32,
124 },
125 error: 0,
126 revision: e.revision,
127 }
128 }
129}
130
131// ── WatchError ────────────────────────────────────────────────────────────────
132
/// Failure modes of the `WatchRegistry` registration methods.
#[derive(Debug)]
pub enum WatchError {
    /// The number of active watchers already equals the configured
    /// `max_watcher_count`; the payload is that cap.
    LimitExceeded(usize),
    /// The supplied prefix does not both start and end with '/'.
    InvalidPrefix,
}

impl std::fmt::Display for WatchError {
    /// Renders a short human-readable description of the error.
    fn fmt(
        &self,
        f: &mut std::fmt::Formatter<'_>,
    ) -> std::fmt::Result {
        match self {
            Self::LimitExceeded(limit) => write!(f, "watcher limit ({limit}) exceeded"),
            Self::InvalidPrefix => f.write_str("prefix must start with '/' and end with '/'"),
        }
    }
}

impl std::error::Error for WatchError {}
157
158// ── Prefix helpers ────────────────────────────────────────────────────────────
159
160/// Decompose a key into all its slash-terminated path prefix candidates.
161///
162/// The dispatcher calls this on every event key to perform O(depth) prefix
163/// lookup via O(1) DashMap gets — one per path segment — instead of scanning
164/// all registered prefixes linearly.
165///
166/// Examples:
167/// "/config/db/host" → ["/", "/config/", "/config/db/"]
168/// "/" → ["/"]
169/// "/config" → ["/"]
170/// "/config/" → ["/", "/config/"]
171pub(crate) fn prefix_segments(key: &Bytes) -> Vec<Bytes> {
172 key.iter()
173 .enumerate()
174 .filter(|&(_, &b)| b == b'/')
175 .map(|(i, _)| key.slice(0..i + 1))
176 .collect()
177}
178
179// ── WatcherHandle ─────────────────────────────────────────────────────────────
180
181/// Handle for a registered watcher.
182///
183/// When dropped, the watcher is automatically unregistered (if unregister_tx is Some).
184pub struct WatcherHandle {
185 /// Unique identifier
186 id: u64,
187 /// Key being watched (exact key) or prefix being watched (prefix watcher)
188 key: Bytes,
189 /// True when registered via register_prefix()
190 is_prefix: bool,
191 /// Channel receiver for watch events
192 receiver: mpsc::Receiver<WatchEvent>,
193 /// Unregister channel (None if cleanup disabled via into_receiver)
194 unregister_tx: Option<mpsc::UnboundedSender<(u64, Bytes)>>,
195}
196
197impl WatcherHandle {
198 pub fn id(&self) -> u64 {
199 self.id
200 }
201
202 pub fn key(&self) -> &Bytes {
203 &self.key
204 }
205
206 /// True if this handle was registered via register_prefix().
207 pub fn is_prefix(&self) -> bool {
208 self.is_prefix
209 }
210
211 pub fn receiver_mut(&mut self) -> &mut mpsc::Receiver<WatchEvent> {
212 &mut self.receiver
213 }
214
215 /// Consume the handle and return the event receiver.
216 ///
217 /// Disables automatic unregistration. The watcher remains active until
218 /// the receiver is dropped (causing send failures that trigger cleanup).
219 ///
220 /// Use this for long-lived streams (e.g., gRPC) where the receiver lifetime
221 /// extends beyond the handle's scope.
222 pub fn into_receiver(mut self) -> (u64, Bytes, mpsc::Receiver<WatchEvent>) {
223 let id = self.id;
224 let key = self.key.clone();
225 self.unregister_tx = None;
226 let (dummy_tx, dummy_rx) = mpsc::channel(1);
227 drop(dummy_tx);
228 let receiver = std::mem::replace(&mut self.receiver, dummy_rx);
229 (id, key, receiver)
230 }
231}
232
233impl Drop for WatcherHandle {
234 fn drop(&mut self) {
235 if let Some(ref tx) = self.unregister_tx {
236 let _ = tx.send((self.id, self.key.clone()));
237 trace!(
238 watcher_id = self.id,
239 key = ?self.key,
240 is_prefix = self.is_prefix,
241 "Watcher unregistered"
242 );
243 }
244 }
245}
246
247// ── Internal watcher state ────────────────────────────────────────────────────
248
249/// Internal watcher state
250#[derive(Debug)]
251struct Watcher {
252 id: u64,
253 sender: mpsc::Sender<WatchEvent>,
254 /// When true, prev_value is populated before delivery.
255 prev_kv: bool,
256}
257
258// ── WatchRegistry ─────────────────────────────────────────────────────────────
259
260/// Watch registry — manages watcher registration (Arc-shareable).
261///
262/// Two independent DashMaps keep exact and prefix watchers separate so
263/// dispatch can use different lookup strategies for each without coupling.
264pub struct WatchRegistry {
265 /// Exact-match watchers: event_key → watchers
266 exact: DashMap<Bytes, Vec<Watcher>>,
267 /// Prefix watchers: prefix → watchers (prefix must start and end with '/')
268 prefix: DashMap<Bytes, Vec<Watcher>>,
269 /// Next watcher ID (monotonically increasing, globally unique)
270 next_id: AtomicU64,
271 /// Total active watchers across exact + prefix (for limit enforcement)
272 total_count: AtomicUsize,
273 /// Count of active watchers that requested prev_kv = true.
274 /// Shared Arc lets the state machine handler poll this without holding a registry ref.
275 prev_kv_watcher_count: Arc<AtomicUsize>,
276 /// Per-watcher channel buffer size
277 watcher_buffer_size: usize,
278 /// Hard cap on total active watchers; register() returns LimitExceeded when reached
279 max_watcher_count: usize,
280 /// Unregister channel sender (cloned for each WatcherHandle)
281 unregister_tx: mpsc::UnboundedSender<(u64, Bytes)>,
282}
283
284impl WatchRegistry {
285 /// Create a new registry with no watcher count limit.
286 pub fn new(
287 watcher_buffer_size: usize,
288 unregister_tx: mpsc::UnboundedSender<(u64, Bytes)>,
289 ) -> Self {
290 Self::new_with_limits(watcher_buffer_size, usize::MAX, unregister_tx)
291 }
292
293 /// Create a new registry with a hard watcher count cap.
294 ///
295 /// `register()` and `register_prefix()` return `WatchError::LimitExceeded`
296 /// once `max_watcher_count` active watchers are registered.
297 pub fn new_with_limits(
298 watcher_buffer_size: usize,
299 max_watcher_count: usize,
300 unregister_tx: mpsc::UnboundedSender<(u64, Bytes)>,
301 ) -> Self {
302 Self {
303 exact: DashMap::new(),
304 prefix: DashMap::new(),
305 next_id: AtomicU64::new(1),
306 total_count: AtomicUsize::new(0),
307 prev_kv_watcher_count: Arc::new(AtomicUsize::new(0)),
308 watcher_buffer_size,
309 max_watcher_count,
310 unregister_tx,
311 }
312 }
313
314 /// Register an exact-key watcher.
315 ///
316 /// `prev_kv`: when true the server reads the old value before each write
317 /// and populates `WatchEvent::prev_value`. Only pays the read cost when
318 /// at least one watcher has prev_kv = true.
319 ///
320 /// Returns `WatchError::LimitExceeded` if max_watcher_count is reached.
321 pub fn register(
322 &self,
323 key: Bytes,
324 prev_kv: bool,
325 ) -> Result<WatcherHandle, WatchError> {
326 self.do_register(key, false, prev_kv)
327 }
328
329 /// Register a prefix watcher.
330 ///
331 /// `prefix` must start with '/' and end with '/'.
332 /// E.g. "/config/" watches all keys under /config/.
333 ///
334 /// `prev_kv`: same semantics as `register()`.
335 ///
336 /// Returns `WatchError::InvalidPrefix` if format is wrong.
337 /// Returns `WatchError::LimitExceeded` if max_watcher_count is reached.
338 pub fn register_prefix(
339 &self,
340 prefix: Bytes,
341 prev_kv: bool,
342 ) -> Result<WatcherHandle, WatchError> {
343 if !prefix.starts_with(b"/") || !prefix.ends_with(b"/") {
344 return Err(WatchError::InvalidPrefix);
345 }
346 self.do_register(prefix, true, prev_kv)
347 }
348
349 fn do_register(
350 &self,
351 key: Bytes,
352 is_prefix: bool,
353 prev_kv: bool,
354 ) -> Result<WatcherHandle, WatchError> {
355 // Reserve the slot first, then check. This eliminates the TOCTOU window
356 // between a load-check and a separate fetch_add under concurrent registration.
357 let prev = self.total_count.fetch_add(1, Ordering::Relaxed);
358 if prev >= self.max_watcher_count {
359 self.total_count.fetch_sub(1, Ordering::Relaxed);
360 return Err(WatchError::LimitExceeded(self.max_watcher_count));
361 }
362
363 if prev_kv {
364 self.prev_kv_watcher_count.fetch_add(1, Ordering::Relaxed);
365 }
366
367 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
368 // +1 reserves one slot for the CANCELED sentinel (always deliverable even when full)
369 let (sender, receiver) = mpsc::channel(self.watcher_buffer_size + 1);
370 let watcher = Watcher {
371 id,
372 sender,
373 prev_kv,
374 };
375
376 if is_prefix {
377 self.prefix.entry(key.clone()).or_default().push(watcher);
378 } else {
379 self.exact.entry(key.clone()).or_default().push(watcher);
380 }
381 trace!(watcher_id = id, key = ?key, is_prefix, prev_kv, "Watcher registered");
382
383 Ok(WatcherHandle {
384 id,
385 key,
386 is_prefix,
387 receiver,
388 unregister_tx: Some(self.unregister_tx.clone()),
389 })
390 }
391
392 fn unregister(
393 &self,
394 id: u64,
395 key: &Bytes,
396 ) {
397 let mut found = false;
398 let mut had_prev_kv = false;
399
400 // Try exact map first
401 self.exact.remove_if_mut(key, |_, watchers| {
402 if let Some(pos) = watchers.iter().position(|w| w.id == id) {
403 had_prev_kv = watchers[pos].prev_kv;
404 watchers.remove(pos);
405 found = true;
406 }
407 watchers.is_empty()
408 });
409
410 // Fall back to prefix map
411 if !found {
412 self.prefix.remove_if_mut(key, |_, watchers| {
413 if let Some(pos) = watchers.iter().position(|w| w.id == id) {
414 had_prev_kv = watchers[pos].prev_kv;
415 watchers.remove(pos);
416 found = true;
417 }
418 watchers.is_empty()
419 });
420 }
421
422 if found {
423 self.total_count.fetch_sub(1, Ordering::Relaxed);
424 if had_prev_kv {
425 self.prev_kv_watcher_count.fetch_sub(1, Ordering::Relaxed);
426 }
427 }
428 }
429
430 /// Number of active watchers that opted in to prev_kv.
431 pub fn prev_kv_watcher_count(&self) -> usize {
432 self.prev_kv_watcher_count.load(Ordering::Relaxed)
433 }
434
435 /// Clone of the shared prev_kv counter Arc.
436 ///
437 /// Pass this to `DefaultStateMachineHandler` so it can poll the live value
438 /// without holding a reference to the registry.
439 pub fn prev_kv_watcher_count_arc(&self) -> Arc<AtomicUsize> {
440 Arc::clone(&self.prev_kv_watcher_count)
441 }
442
443 #[cfg(test)]
444 pub(crate) fn watcher_count(
445 &self,
446 key: &Bytes,
447 ) -> usize {
448 self.exact.get(key).map(|w| w.len()).unwrap_or(0)
449 }
450
451 #[cfg(test)]
452 pub(crate) fn watched_key_count(&self) -> usize {
453 self.exact.len()
454 }
455
456 #[cfg(test)]
457 pub(crate) fn prefix_watcher_count(
458 &self,
459 prefix: &Bytes,
460 ) -> usize {
461 self.prefix.get(prefix).map(|w| w.len()).unwrap_or(0)
462 }
463}
464
465// ── WatchDispatcher ───────────────────────────────────────────────────────────
466
467/// Watch dispatcher — distributes events to watchers (background task).
468///
469/// Spawned explicitly in NodeBuilder::build() to make resource allocation visible.
470/// Receives proto WatchResponse from the broadcast channel, converts to opaque
471/// WatchEvent per-watcher, and delivers via per-watcher mpsc channels.
472pub struct WatchDispatcher {
473 registry: Arc<WatchRegistry>,
474 broadcast_rx: broadcast::Receiver<WatchResponse>,
475 unregister_rx: mpsc::UnboundedReceiver<(u64, Bytes)>,
476 /// Shared applied index for Progress event revision field.
477 last_applied: Arc<std::sync::atomic::AtomicU64>,
478 /// Heartbeat interval. 0 = disabled.
479 heartbeat_interval_ms: u64,
480}
481
482impl WatchDispatcher {
483 pub fn new(
484 registry: Arc<WatchRegistry>,
485 broadcast_rx: broadcast::Receiver<WatchResponse>,
486 unregister_rx: mpsc::UnboundedReceiver<(u64, Bytes)>,
487 last_applied: Arc<std::sync::atomic::AtomicU64>,
488 heartbeat_interval_ms: u64,
489 ) -> Self {
490 Self {
491 registry,
492 broadcast_rx,
493 unregister_rx,
494 last_applied,
495 heartbeat_interval_ms,
496 }
497 }
498
499 pub async fn run(mut self) {
500 debug!("WatchDispatcher started");
501
502 // Build optional heartbeat future. When interval_ms == 0 the future
503 // is a pending sleep that never fires, adding zero overhead.
504 // Build the heartbeat interval only when enabled.
505 // Constructing `interval_at(now + Duration::from_millis(u64::MAX), ...)` panics on
506 // overflow, so we must skip interval creation entirely when heartbeat is off.
507 let mut heartbeat: Option<tokio::time::Interval> = if self.heartbeat_interval_ms > 0 {
508 let base_ms = self.heartbeat_interval_ms;
509 let jitter = (base_ms / 10).max(1);
510 // Mix thread ID and wall-clock nanoseconds into a single hash so that nodes
511 // started simultaneously (e.g. k8s rolling restart within the same millisecond)
512 // still get different offsets. No external crate needed.
513 let mut h = DefaultHasher::new();
514 std::thread::current().id().hash(&mut h);
515 std::time::SystemTime::now().hash(&mut h);
516 let seed = h.finish();
517 let offset = seed % (jitter * 2);
518 let first_tick_ms = base_ms.saturating_sub(jitter) + offset;
519 let mut interval = tokio::time::interval_at(
520 tokio::time::Instant::now() + Duration::from_millis(first_tick_ms),
521 Duration::from_millis(base_ms),
522 );
523 // Skip missed ticks: heartbeat is a liveness signal, not a counter.
524 // Bursting N progress events after a slow-watcher stall would mislead clients
525 // into thinking the stream was alive during the stall period.
526 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
527 Some(interval)
528 } else {
529 None
530 };
531
532 loop {
533 tokio::select! {
534 biased;
535 // Cleanup first so dead watchers don't receive the next event
536 Some((id, key)) = self.unregister_rx.recv() => {
537 self.registry.unregister(id, &key);
538 }
539 result = self.broadcast_rx.recv() => {
540 match result {
541 Ok(event) => self.dispatch_event(event).await,
542 Err(broadcast::error::RecvError::Lagged(n)) => {
543 warn!("WatchDispatcher lagged {} events (slow watchers)", n);
544 }
545 Err(broadcast::error::RecvError::Closed) => {
546 debug!("Broadcast channel closed, WatchDispatcher stopping");
547 break;
548 }
549 }
550 }
551 Some(t) = async { if let Some(ref mut hb) = heartbeat { Some(hb.tick().await) } else { std::future::pending().await } } => {
552 let _ = t;
553 self.broadcast_progress().await;
554 }
555 }
556 }
557 debug!("WatchDispatcher stopped");
558 }
559
560 /// Broadcast a synthetic Progress event to ALL active watchers regardless of key.
561 ///
562 /// Unlike data events (routed by key), Progress is a liveness signal — every watcher
563 /// must receive it so clients can detect silent stream death.
564 /// Keys are collected first to avoid holding DashMap shard locks across awaits.
565 async fn broadcast_progress(&self) {
566 let revision = self.last_applied.load(Ordering::Relaxed);
567 let progress = WatchResponse {
568 key: Bytes::new(),
569 value: Bytes::new(),
570 prev_value: Bytes::new(),
571 event_type: d_engine_proto::client::WatchEventType::Progress as i32,
572 error: 0,
573 revision,
574 };
575
576 let exact_keys: Vec<Bytes> = self.registry.exact.iter().map(|e| e.key().clone()).collect();
577 let prefix_keys: Vec<Bytes> =
578 self.registry.prefix.iter().map(|e| e.key().clone()).collect();
579
580 for key in exact_keys {
581 self.dispatch_to_map(&self.registry.exact, &key, &progress).await;
582 }
583 for key in prefix_keys {
584 self.dispatch_to_map(&self.registry.prefix, &key, &progress).await;
585 }
586 }
587
588 async fn dispatch_event(
589 &self,
590 event: WatchResponse,
591 ) {
592 // Step 1: exact match — O(1) DashMap lookup
593 self.dispatch_to_map(&self.registry.exact, &event.key, &event).await;
594
595 // Step 2: prefix match — O(depth) where depth = number of '/' in key
596 for prefix in prefix_segments(&event.key) {
597 self.dispatch_to_map(&self.registry.prefix, &prefix, &event).await;
598 }
599 }
600
601 /// Dispatch an event to all watchers under `lookup_key` in `map`.
602 ///
603 /// Converts proto WatchResponse → opaque WatchEvent per watcher, respecting
604 /// each watcher's prev_kv preference. Handles overflow detection and dead
605 /// watcher cleanup identically for both the exact and prefix maps.
606 async fn dispatch_to_map(
607 &self,
608 map: &DashMap<Bytes, Vec<Watcher>>,
609 lookup_key: &Bytes,
610 event: &WatchResponse,
611 ) {
612 if let Some(watchers) = map.get(lookup_key) {
613 let mut dead_watchers = Vec::new();
614
615 for watcher in watchers.iter() {
616 let available = watcher.sender.capacity();
617
618 if available <= 1 {
619 // capacity == 1: only the reserved cancel slot remains → overflow
620 // capacity == 0: defensive, shouldn't happen in normal flow
621 if available == 1 {
622 warn!(
623 watcher_id = watcher.id,
624 key = ?event.key,
625 buffer_capacity = watcher.sender.max_capacity(),
626 buffer_len = watcher.sender.max_capacity() - available,
627 "watcher buffer overflow, sending cancel"
628 );
629 let _ = watcher
630 .sender
631 .try_send(crate::watch::make_cancel_event(event.key.clone()));
632 }
633 dead_watchers.push(watcher.id);
634 continue;
635 }
636
637 // Convert proto → opaque, respecting per-watcher prev_kv flag
638 let watch_event = proto_to_event(event, watcher.prev_kv);
639
640 // Normal send: reserved slot untouched
641 if let Err(mpsc::error::TrySendError::Closed(_)) =
642 watcher.sender.try_send(watch_event)
643 {
644 // Receiver dropped: silent cleanup, no cancel needed
645 dead_watchers.push(watcher.id);
646 }
647 }
648
649 drop(watchers);
650 for id in dead_watchers {
651 self.registry.unregister(id, lookup_key);
652 }
653
654 trace!(
655 event_key = ?event.key,
656 lookup_key = ?lookup_key,
657 event_type = ?event.event_type,
658 "Event dispatched"
659 );
660 }
661 }
662}