Watch mechanism for monitoring key changes
This module provides a high-performance, lock-free watch system that allows clients to monitor specific keys for changes with minimal overhead on the write path.
§Architecture Overview
The watch system is composed of two main components created explicitly in NodeBuilder:
- WatchRegistry (Shared State): Lock-free registration via DashMap
- WatchDispatcher (Background Task): Spawned explicitly in Builder, distributes events
┌─────────────┐
│ StateMachine│
│ apply() │
└──────┬──────┘
│ notify_change() [~10ns]
▼
┌─────────────────┐
│ Event Queue │ (crossbeam-channel, lock-free)
│ (bounded 1000) │
└──────┬──────────┘
│
▼
┌─────────────────┐
│ Dispatcher │ (background thread)
│ Thread │
└──────┬──────────┘
│ lookup in DashMap (lock-free)
▼
┌─────────────────┐
│ Per-Watcher │ (tokio mpsc, bounded 10)
│ Channels │
└──────┬──────────┘
│
▼
┌─────────────────┐
│ gRPC Stream │
│ to Client │
└─────────────────┘
§Performance Characteristics
- Write overhead: < 0.01% with 100+ watchers (target: <10ns per notify; see the sketch below)
- Event latency: Typically < 100μs end-to-end
- Memory per watcher: ~2.4KB with default buffer size (10)
- Scalability: Tested with 1000+ concurrent watchers
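The drop-on-full behavior behind these numbers can be pictured with a short sketch. This is a minimal illustration under assumptions: the ChangeEvent type and the free-standing notify_change function are made up for the example; only the bounded-1000 crossbeam queue and the non-blocking send come from the diagram above.

use bytes::Bytes;
use crossbeam_channel::{bounded, Sender, TrySendError};

// Illustrative event type; the real module defines its own WatchEvent.
struct ChangeEvent {
    key: Bytes,
}

// Write-path hook: sends without blocking. If the bounded queue is full,
// the event is dropped instead of stalling the state machine's apply().
fn notify_change(queue: &Sender<ChangeEvent>, key: Bytes) {
    if let Err(TrySendError::Full(_)) = queue.try_send(ChangeEvent { key }) {
        // Dropped under backpressure; clients re-sync via the Read API.
    }
}

fn main() {
    let (tx, _rx) = bounded(1000); // mirrors the bounded-1000 event queue
    notify_change(&tx, Bytes::from("mykey"));
}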
§Usage Example
use std::sync::Arc;

use bytes::Bytes;
use tokio::sync::mpsc;
use d_engine_core::watch::{WatchRegistry, WatchDispatcher};

// Create components (typically done in NodeBuilder)
let (unregister_tx, unregister_rx) = mpsc::unbounded_channel();
let registry = Arc::new(WatchRegistry::new(10, unregister_tx));

// `broadcast_rx` is the receiving end of the global event queue
// (the bounded crossbeam channel in the diagram above); NodeBuilder
// creates it, so its construction is elided here.
let dispatcher = WatchDispatcher::new(registry.clone(), broadcast_rx, unregister_rx);

// Explicitly spawn the dispatcher (visible resource allocation)
tokio::spawn(async move {
    dispatcher.run().await;
});

// Register a watcher
let key = Bytes::from("mykey");
let handle = registry.register(key);
§Configuration
The watch system is configurable via WatchConfig:
- event_queue_size: Global event queue buffer (default: 1000)
- watcher_buffer_size: Per-watcher channel buffer (default: 10)
- enable_metrics: Enable detailed logging and metrics (default: false)
use d_engine_core::watch::WatchConfig;

let config = WatchConfig {
    enabled: true,
    event_queue_size: 2000,
    watcher_buffer_size: 20,
    enable_metrics: true,
};
§Error Handling
The watch system prioritizes write performance over guaranteed event delivery:
- When the global event queue is full, new events are dropped
- When a per-watcher channel is full, events for that watcher are dropped
- Clients should use the Read API to re-sync if they detect missing events (a sketch of this pattern follows below)
This design ensures that a slow or stuck watcher never blocks the write path.
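A re-sync loop on the client might look like the following hypothetical sketch. The ObservedEvent type, its revision field, and the read_key helper are illustrative assumptions only; this module does not define a client API.

// Hypothetical gap detection: all names here are illustrative.
struct ObservedEvent {
    revision: u64,
}

async fn read_key(_key: &str) -> Vec<u8> {
    Vec::new() // stand-in for a real Read API call
}

async fn on_event(ev: ObservedEvent, last_seen: &mut u64) {
    if ev.revision > *last_seen + 1 {
        // A gap in revisions means events were dropped; fall back to
        // the Read API for the key's current value.
        let _current = read_key("mykey").await;
    }
    *last_seen = ev.revision;
}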
§Thread Safety
All types in this module are thread-safe and can be safely shared across threads (see the sketch after this list):
- WatchRegistry is shared via Arc between Node and Dispatcher
- All operations are lock-free (DashMap + AtomicU64)
- The dispatcher runs as an independent tokio task
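As a small illustration, a single Arc<WatchRegistry> can be cloned into concurrent tasks. This sketch assumes WatcherHandle is Send; register is the same call shown in the usage example above.

use std::sync::Arc;

use bytes::Bytes;
use d_engine_core::watch::WatchRegistry;

// Two tasks register watchers concurrently; the DashMap-backed registry
// is lock-free, so neither task blocks the other.
async fn register_concurrently(registry: Arc<WatchRegistry>) {
    let (r1, r2) = (registry.clone(), registry.clone());
    let t1 = tokio::spawn(async move { r1.register(Bytes::from("key-a")) });
    let t2 = tokio::spawn(async move { r2.register(Bytes::from("key-b")) });
    let (_h1, _h2) = (t1.await.unwrap(), t2.await.unwrap());
}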
§Design Principles
- No hidden resource allocation: All tokio::spawn calls are explicit in Builder
- Minimal abstraction: Only essential data structures, no unnecessary wrappers
- Composable: Registry and Dispatcher are independent, composed in Builder
- Uses DashMap for the watcher registry (lock-free concurrent HashMap)
- Uses tokio::sync::mpsc for per-watcher channels (async-friendly)
- Watchers are automatically cleaned up when dropped (RAII pattern; see the sketch after this list)
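A minimal sketch of that RAII cleanup, reusing register from the usage example above; dropping the handle is the only cleanup step required.

use bytes::Bytes;
use d_engine_core::watch::WatchRegistry;

fn watch_briefly(registry: &WatchRegistry) {
    let handle = registry.register(Bytes::from("mykey"));
    // ... consume events via the handle ...
    drop(handle); // the watcher unregisters itself here; no explicit call needed
}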
Re-exports§
pub use crate::config::WatchConfig;
Structs§
- WatchDispatcher - Watch dispatcher - distributes events to watchers (background task)
- WatchEvent - Response containing a watch event notification
- WatchRegistry - Watch registry - manages watcher registration (Arc-shareable)
- WatcherHandle - Handle for a registered watcher
Enums§
- WatchEventType - Watch event type indicating the type of change that occurred