Skip to main content

d_engine_core/watch/
manager.rs

1//! Watch mechanism for monitoring key changes
2//!
3//! Architecture: Shared State + Background Dispatcher
4//!
5//! ```text
6//! StateMachine:
7//!   apply_chunk() -> broadcast_watch_events() -> broadcast::send(WatchResponse) [fire-and-forget]
8//!                                                        ↓
9//! WatchDispatcher (spawned in Builder):
10//!   broadcast::subscribe() ->
11//!     exact:  DashMap<Bytes, Vec<Watcher>>  O(1) lookup per event
12//!     prefix: DashMap<Bytes, Vec<Watcher>>  O(depth) decomposition + O(1) lookup per segment
13//!                                                           ↓
14//! Watchers:
15//!   Embedded:   mpsc::Receiver<WatchEvent>   (opaque Rust type, no proto dependency)
16//!   Standalone: mpsc::Receiver -> gRPC stream (caller converts WatchEvent → WatchResponse)
17//! ```
18//!
19//! # Design Principles
20//!
21//! - **No hidden resource allocation**: All tokio::spawn calls are explicit in Builder
22//! - **Minimal abstraction**: Only essential data structures, no unnecessary wrappers
23//! - **Composable**: Registry and Dispatcher are independent, composed in Builder
24//! - **O(depth) prefix dispatch**: Decompose event key into slash-terminated path segments,
25//!   O(1) DashMap lookup per segment — dispatch cost is independent of prefix watcher count
26//! - **Proto boundary**: WatchResponse (proto) lives only in the broadcast channel and handler;
27//!   WatchEvent (opaque) is what callers see — no proto import required.
28
29use std::collections::hash_map::DefaultHasher;
30use std::hash::{Hash, Hasher};
31use std::sync::Arc;
32use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
33use std::time::Duration;
34
35use bytes::Bytes;
36use d_engine_proto::client::WatchResponse;
37use dashmap::DashMap;
38use tokio::sync::broadcast;
39use tokio::sync::mpsc;
40use tracing::debug;
41use tracing::trace;
42use tracing::warn;
43
44// ── Public opaque types ───────────────────────────────────────────────────────
45
/// Kind of notification carried by a [`WatchEvent`].
///
/// This is the crate's own enum, so callers never have to import the proto
/// event type to inspect what happened.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatchEventType {
    /// A value was written for the key.
    Put,
    /// The key was removed.
    Delete,
    /// The watch has ended (e.g. the watcher's buffer overflowed). Also used
    /// as the fallback when a proto event type cannot be decoded.
    Canceled,
    /// Periodic liveness heartbeat; not a data mutation.
    Progress,
}
54
55/// Opaque watch event delivered to callers.
56///
57/// All fields use standard Rust types — callers never need to import proto types.
58///
59/// `prev_value` semantics:
60/// - `None` — watcher was registered with `prev_kv = false`, or the event is
61///   `Progress` / `Canceled` (not a data mutation).
62/// - `Some(Bytes::new())` — watcher has `prev_kv = true` and the key did not exist
63///   before this write (fresh insert).
64/// - `Some(v)` — watcher has `prev_kv = true` and `v` is the previous value.
65#[derive(Debug, Clone)]
66pub struct WatchEvent {
67    pub event_type: WatchEventType,
68    pub key: Bytes,
69    pub value: Bytes,
70    pub prev_value: Option<Bytes>,
71    pub revision: u64,
72}
73
74// ── Proto ↔ opaque conversions ────────────────────────────────────────────────
75
76/// Convert a proto WatchResponse into an opaque WatchEvent.
77///
78/// Called by the dispatcher when delivering to a per-watcher mpsc channel.
79/// `prev_kv`: when false, prev_value is zeroed so callers that didn't opt in
80/// never receive stale memory from a watcher that did.
81fn proto_to_event(
82    proto: &WatchResponse,
83    prev_kv: bool,
84) -> WatchEvent {
85    use d_engine_proto::client::WatchEventType as ProtoType;
86
87    let event_type = match ProtoType::try_from(proto.event_type) {
88        Ok(ProtoType::Put) => WatchEventType::Put,
89        Ok(ProtoType::Delete) => WatchEventType::Delete,
90        Ok(ProtoType::Canceled) => WatchEventType::Canceled,
91        Ok(ProtoType::Progress) => WatchEventType::Progress,
92        Err(_) => WatchEventType::Canceled,
93    };
94
95    WatchEvent {
96        event_type,
97        key: proto.key.clone(),
98        value: proto.value.clone(),
99        // None when prev_kv = false so callers can distinguish "not requested" from
100        // "key didn't exist" (Some(empty)).
101        prev_value: if prev_kv {
102            Some(proto.prev_value.clone())
103        } else {
104            None
105        },
106        revision: proto.revision,
107    }
108}
109
110/// Convert an opaque WatchEvent back to a proto WatchResponse (for gRPC).
111impl From<&WatchEvent> for WatchResponse {
112    fn from(e: &WatchEvent) -> Self {
113        use d_engine_proto::client::WatchEventType as ProtoType;
114        WatchResponse {
115            key: e.key.clone(),
116            value: e.value.clone(),
117            // Proto transport uses bytes (no optional); None collapses to empty.
118            prev_value: e.prev_value.clone().unwrap_or_default(),
119            event_type: match e.event_type {
120                WatchEventType::Put => ProtoType::Put as i32,
121                WatchEventType::Delete => ProtoType::Delete as i32,
122                WatchEventType::Canceled => ProtoType::Canceled as i32,
123                WatchEventType::Progress => ProtoType::Progress as i32,
124            },
125            error: 0,
126            revision: e.revision,
127        }
128    }
129}
130
131// ── WatchError ────────────────────────────────────────────────────────────────
132
/// Failure modes of the `WatchRegistry` registration methods.
#[derive(Debug)]
pub enum WatchError {
    /// The number of active watchers already equals the configured
    /// `max_watcher_count`; the payload is that limit.
    LimitExceeded(usize),
    /// The supplied prefix does not both start and end with `'/'`.
    InvalidPrefix,
}
141
142impl std::fmt::Display for WatchError {
143    fn fmt(
144        &self,
145        f: &mut std::fmt::Formatter<'_>,
146    ) -> std::fmt::Result {
147        match self {
148            WatchError::LimitExceeded(n) => write!(f, "watcher limit ({n}) exceeded"),
149            WatchError::InvalidPrefix => {
150                write!(f, "prefix must start with '/' and end with '/'")
151            }
152        }
153    }
154}
155
156impl std::error::Error for WatchError {}
157
158// ── Prefix helpers ────────────────────────────────────────────────────────────
159
160/// Decompose a key into all its slash-terminated path prefix candidates.
161///
162/// The dispatcher calls this on every event key to perform O(depth) prefix
163/// lookup via O(1) DashMap gets — one per path segment — instead of scanning
164/// all registered prefixes linearly.
165///
166/// Examples:
167///   "/config/db/host" → ["/", "/config/", "/config/db/"]
168///   "/"               → ["/"]
169///   "/config"         → ["/"]
170///   "/config/"        → ["/", "/config/"]
171pub(crate) fn prefix_segments(key: &Bytes) -> Vec<Bytes> {
172    key.iter()
173        .enumerate()
174        .filter(|&(_, &b)| b == b'/')
175        .map(|(i, _)| key.slice(0..i + 1))
176        .collect()
177}
178
179// ── WatcherHandle ─────────────────────────────────────────────────────────────
180
181/// Handle for a registered watcher.
182///
183/// When dropped, the watcher is automatically unregistered (if unregister_tx is Some).
184pub struct WatcherHandle {
185    /// Unique identifier
186    id: u64,
187    /// Key being watched (exact key) or prefix being watched (prefix watcher)
188    key: Bytes,
189    /// True when registered via register_prefix()
190    is_prefix: bool,
191    /// Channel receiver for watch events
192    receiver: mpsc::Receiver<WatchEvent>,
193    /// Unregister channel (None if cleanup disabled via into_receiver)
194    unregister_tx: Option<mpsc::UnboundedSender<(u64, Bytes)>>,
195}
196
197impl WatcherHandle {
198    pub fn id(&self) -> u64 {
199        self.id
200    }
201
202    pub fn key(&self) -> &Bytes {
203        &self.key
204    }
205
206    /// True if this handle was registered via register_prefix().
207    pub fn is_prefix(&self) -> bool {
208        self.is_prefix
209    }
210
211    pub fn receiver_mut(&mut self) -> &mut mpsc::Receiver<WatchEvent> {
212        &mut self.receiver
213    }
214
215    /// Consume the handle and return the event receiver.
216    ///
217    /// Disables automatic unregistration. The watcher remains active until
218    /// the receiver is dropped (causing send failures that trigger cleanup).
219    ///
220    /// Use this for long-lived streams (e.g., gRPC) where the receiver lifetime
221    /// extends beyond the handle's scope.
222    pub fn into_receiver(mut self) -> (u64, Bytes, mpsc::Receiver<WatchEvent>) {
223        let id = self.id;
224        let key = self.key.clone();
225        self.unregister_tx = None;
226        let (dummy_tx, dummy_rx) = mpsc::channel(1);
227        drop(dummy_tx);
228        let receiver = std::mem::replace(&mut self.receiver, dummy_rx);
229        (id, key, receiver)
230    }
231}
232
233impl Drop for WatcherHandle {
234    fn drop(&mut self) {
235        if let Some(ref tx) = self.unregister_tx {
236            let _ = tx.send((self.id, self.key.clone()));
237            trace!(
238                watcher_id = self.id,
239                key = ?self.key,
240                is_prefix = self.is_prefix,
241                "Watcher unregistered"
242            );
243        }
244    }
245}
246
247// ── Internal watcher state ────────────────────────────────────────────────────
248
249/// Internal watcher state
250#[derive(Debug)]
251struct Watcher {
252    id: u64,
253    sender: mpsc::Sender<WatchEvent>,
254    /// When true, prev_value is populated before delivery.
255    prev_kv: bool,
256}
257
258// ── WatchRegistry ─────────────────────────────────────────────────────────────
259
260/// Watch registry — manages watcher registration (Arc-shareable).
261///
262/// Two independent DashMaps keep exact and prefix watchers separate so
263/// dispatch can use different lookup strategies for each without coupling.
264pub struct WatchRegistry {
265    /// Exact-match watchers: event_key → watchers
266    exact: DashMap<Bytes, Vec<Watcher>>,
267    /// Prefix watchers: prefix → watchers (prefix must start and end with '/')
268    prefix: DashMap<Bytes, Vec<Watcher>>,
269    /// Next watcher ID (monotonically increasing, globally unique)
270    next_id: AtomicU64,
271    /// Total active watchers across exact + prefix (for limit enforcement)
272    total_count: AtomicUsize,
273    /// Count of active watchers that requested prev_kv = true.
274    /// Shared Arc lets the state machine handler poll this without holding a registry ref.
275    prev_kv_watcher_count: Arc<AtomicUsize>,
276    /// Per-watcher channel buffer size
277    watcher_buffer_size: usize,
278    /// Hard cap on total active watchers; register() returns LimitExceeded when reached
279    max_watcher_count: usize,
280    /// Unregister channel sender (cloned for each WatcherHandle)
281    unregister_tx: mpsc::UnboundedSender<(u64, Bytes)>,
282}
283
284impl WatchRegistry {
285    /// Create a new registry with no watcher count limit.
286    pub fn new(
287        watcher_buffer_size: usize,
288        unregister_tx: mpsc::UnboundedSender<(u64, Bytes)>,
289    ) -> Self {
290        Self::new_with_limits(watcher_buffer_size, usize::MAX, unregister_tx)
291    }
292
293    /// Create a new registry with a hard watcher count cap.
294    ///
295    /// `register()` and `register_prefix()` return `WatchError::LimitExceeded`
296    /// once `max_watcher_count` active watchers are registered.
297    pub fn new_with_limits(
298        watcher_buffer_size: usize,
299        max_watcher_count: usize,
300        unregister_tx: mpsc::UnboundedSender<(u64, Bytes)>,
301    ) -> Self {
302        Self {
303            exact: DashMap::new(),
304            prefix: DashMap::new(),
305            next_id: AtomicU64::new(1),
306            total_count: AtomicUsize::new(0),
307            prev_kv_watcher_count: Arc::new(AtomicUsize::new(0)),
308            watcher_buffer_size,
309            max_watcher_count,
310            unregister_tx,
311        }
312    }
313
314    /// Register an exact-key watcher.
315    ///
316    /// `prev_kv`: when true the server reads the old value before each write
317    /// and populates `WatchEvent::prev_value`.  Only pays the read cost when
318    /// at least one watcher has prev_kv = true.
319    ///
320    /// Returns `WatchError::LimitExceeded` if max_watcher_count is reached.
321    pub fn register(
322        &self,
323        key: Bytes,
324        prev_kv: bool,
325    ) -> Result<WatcherHandle, WatchError> {
326        self.do_register(key, false, prev_kv)
327    }
328
329    /// Register a prefix watcher.
330    ///
331    /// `prefix` must start with '/' and end with '/'.
332    /// E.g. "/config/" watches all keys under /config/.
333    ///
334    /// `prev_kv`: same semantics as `register()`.
335    ///
336    /// Returns `WatchError::InvalidPrefix` if format is wrong.
337    /// Returns `WatchError::LimitExceeded` if max_watcher_count is reached.
338    pub fn register_prefix(
339        &self,
340        prefix: Bytes,
341        prev_kv: bool,
342    ) -> Result<WatcherHandle, WatchError> {
343        if !prefix.starts_with(b"/") || !prefix.ends_with(b"/") {
344            return Err(WatchError::InvalidPrefix);
345        }
346        self.do_register(prefix, true, prev_kv)
347    }
348
349    fn do_register(
350        &self,
351        key: Bytes,
352        is_prefix: bool,
353        prev_kv: bool,
354    ) -> Result<WatcherHandle, WatchError> {
355        // Reserve the slot first, then check. This eliminates the TOCTOU window
356        // between a load-check and a separate fetch_add under concurrent registration.
357        let prev = self.total_count.fetch_add(1, Ordering::Relaxed);
358        if prev >= self.max_watcher_count {
359            self.total_count.fetch_sub(1, Ordering::Relaxed);
360            return Err(WatchError::LimitExceeded(self.max_watcher_count));
361        }
362
363        if prev_kv {
364            self.prev_kv_watcher_count.fetch_add(1, Ordering::Relaxed);
365        }
366
367        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
368        // +1 reserves one slot for the CANCELED sentinel (always deliverable even when full)
369        let (sender, receiver) = mpsc::channel(self.watcher_buffer_size + 1);
370        let watcher = Watcher {
371            id,
372            sender,
373            prev_kv,
374        };
375
376        if is_prefix {
377            self.prefix.entry(key.clone()).or_default().push(watcher);
378        } else {
379            self.exact.entry(key.clone()).or_default().push(watcher);
380        }
381        trace!(watcher_id = id, key = ?key, is_prefix, prev_kv, "Watcher registered");
382
383        Ok(WatcherHandle {
384            id,
385            key,
386            is_prefix,
387            receiver,
388            unregister_tx: Some(self.unregister_tx.clone()),
389        })
390    }
391
392    fn unregister(
393        &self,
394        id: u64,
395        key: &Bytes,
396    ) {
397        let mut found = false;
398        let mut had_prev_kv = false;
399
400        // Try exact map first
401        self.exact.remove_if_mut(key, |_, watchers| {
402            if let Some(pos) = watchers.iter().position(|w| w.id == id) {
403                had_prev_kv = watchers[pos].prev_kv;
404                watchers.remove(pos);
405                found = true;
406            }
407            watchers.is_empty()
408        });
409
410        // Fall back to prefix map
411        if !found {
412            self.prefix.remove_if_mut(key, |_, watchers| {
413                if let Some(pos) = watchers.iter().position(|w| w.id == id) {
414                    had_prev_kv = watchers[pos].prev_kv;
415                    watchers.remove(pos);
416                    found = true;
417                }
418                watchers.is_empty()
419            });
420        }
421
422        if found {
423            self.total_count.fetch_sub(1, Ordering::Relaxed);
424            if had_prev_kv {
425                self.prev_kv_watcher_count.fetch_sub(1, Ordering::Relaxed);
426            }
427        }
428    }
429
430    /// Number of active watchers that opted in to prev_kv.
431    pub fn prev_kv_watcher_count(&self) -> usize {
432        self.prev_kv_watcher_count.load(Ordering::Relaxed)
433    }
434
435    /// Clone of the shared prev_kv counter Arc.
436    ///
437    /// Pass this to `DefaultStateMachineHandler` so it can poll the live value
438    /// without holding a reference to the registry.
439    pub fn prev_kv_watcher_count_arc(&self) -> Arc<AtomicUsize> {
440        Arc::clone(&self.prev_kv_watcher_count)
441    }
442
443    #[cfg(test)]
444    pub(crate) fn watcher_count(
445        &self,
446        key: &Bytes,
447    ) -> usize {
448        self.exact.get(key).map(|w| w.len()).unwrap_or(0)
449    }
450
451    #[cfg(test)]
452    pub(crate) fn watched_key_count(&self) -> usize {
453        self.exact.len()
454    }
455
456    #[cfg(test)]
457    pub(crate) fn prefix_watcher_count(
458        &self,
459        prefix: &Bytes,
460    ) -> usize {
461        self.prefix.get(prefix).map(|w| w.len()).unwrap_or(0)
462    }
463}
464
465// ── WatchDispatcher ───────────────────────────────────────────────────────────
466
467/// Watch dispatcher — distributes events to watchers (background task).
468///
469/// Spawned explicitly in NodeBuilder::build() to make resource allocation visible.
470/// Receives proto WatchResponse from the broadcast channel, converts to opaque
471/// WatchEvent per-watcher, and delivers via per-watcher mpsc channels.
472pub struct WatchDispatcher {
473    registry: Arc<WatchRegistry>,
474    broadcast_rx: broadcast::Receiver<WatchResponse>,
475    unregister_rx: mpsc::UnboundedReceiver<(u64, Bytes)>,
476    /// Shared applied index for Progress event revision field.
477    last_applied: Arc<std::sync::atomic::AtomicU64>,
478    /// Heartbeat interval.  0 = disabled.
479    heartbeat_interval_ms: u64,
480}
481
482impl WatchDispatcher {
483    pub fn new(
484        registry: Arc<WatchRegistry>,
485        broadcast_rx: broadcast::Receiver<WatchResponse>,
486        unregister_rx: mpsc::UnboundedReceiver<(u64, Bytes)>,
487        last_applied: Arc<std::sync::atomic::AtomicU64>,
488        heartbeat_interval_ms: u64,
489    ) -> Self {
490        Self {
491            registry,
492            broadcast_rx,
493            unregister_rx,
494            last_applied,
495            heartbeat_interval_ms,
496        }
497    }
498
499    pub async fn run(mut self) {
500        debug!("WatchDispatcher started");
501
502        // Build optional heartbeat future.  When interval_ms == 0 the future
503        // is a pending sleep that never fires, adding zero overhead.
504        // Build the heartbeat interval only when enabled.
505        // Constructing `interval_at(now + Duration::from_millis(u64::MAX), ...)` panics on
506        // overflow, so we must skip interval creation entirely when heartbeat is off.
507        let mut heartbeat: Option<tokio::time::Interval> = if self.heartbeat_interval_ms > 0 {
508            let base_ms = self.heartbeat_interval_ms;
509            let jitter = (base_ms / 10).max(1);
510            // Mix thread ID and wall-clock nanoseconds into a single hash so that nodes
511            // started simultaneously (e.g. k8s rolling restart within the same millisecond)
512            // still get different offsets.  No external crate needed.
513            let mut h = DefaultHasher::new();
514            std::thread::current().id().hash(&mut h);
515            std::time::SystemTime::now().hash(&mut h);
516            let seed = h.finish();
517            let offset = seed % (jitter * 2);
518            let first_tick_ms = base_ms.saturating_sub(jitter) + offset;
519            let mut interval = tokio::time::interval_at(
520                tokio::time::Instant::now() + Duration::from_millis(first_tick_ms),
521                Duration::from_millis(base_ms),
522            );
523            // Skip missed ticks: heartbeat is a liveness signal, not a counter.
524            // Bursting N progress events after a slow-watcher stall would mislead clients
525            // into thinking the stream was alive during the stall period.
526            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
527            Some(interval)
528        } else {
529            None
530        };
531
532        loop {
533            tokio::select! {
534                biased;
535                // Cleanup first so dead watchers don't receive the next event
536                Some((id, key)) = self.unregister_rx.recv() => {
537                    self.registry.unregister(id, &key);
538                }
539                result = self.broadcast_rx.recv() => {
540                    match result {
541                        Ok(event) => self.dispatch_event(event).await,
542                        Err(broadcast::error::RecvError::Lagged(n)) => {
543                            warn!("WatchDispatcher lagged {} events (slow watchers)", n);
544                        }
545                        Err(broadcast::error::RecvError::Closed) => {
546                            debug!("Broadcast channel closed, WatchDispatcher stopping");
547                            break;
548                        }
549                    }
550                }
551                Some(t) = async { if let Some(ref mut hb) = heartbeat { Some(hb.tick().await) } else { std::future::pending().await } } => {
552                    let _ = t;
553                    self.broadcast_progress().await;
554                }
555            }
556        }
557        debug!("WatchDispatcher stopped");
558    }
559
560    /// Broadcast a synthetic Progress event to ALL active watchers regardless of key.
561    ///
562    /// Unlike data events (routed by key), Progress is a liveness signal — every watcher
563    /// must receive it so clients can detect silent stream death.
564    /// Keys are collected first to avoid holding DashMap shard locks across awaits.
565    async fn broadcast_progress(&self) {
566        let revision = self.last_applied.load(Ordering::Relaxed);
567        let progress = WatchResponse {
568            key: Bytes::new(),
569            value: Bytes::new(),
570            prev_value: Bytes::new(),
571            event_type: d_engine_proto::client::WatchEventType::Progress as i32,
572            error: 0,
573            revision,
574        };
575
576        let exact_keys: Vec<Bytes> = self.registry.exact.iter().map(|e| e.key().clone()).collect();
577        let prefix_keys: Vec<Bytes> =
578            self.registry.prefix.iter().map(|e| e.key().clone()).collect();
579
580        for key in exact_keys {
581            self.dispatch_to_map(&self.registry.exact, &key, &progress).await;
582        }
583        for key in prefix_keys {
584            self.dispatch_to_map(&self.registry.prefix, &key, &progress).await;
585        }
586    }
587
588    async fn dispatch_event(
589        &self,
590        event: WatchResponse,
591    ) {
592        // Step 1: exact match — O(1) DashMap lookup
593        self.dispatch_to_map(&self.registry.exact, &event.key, &event).await;
594
595        // Step 2: prefix match — O(depth) where depth = number of '/' in key
596        for prefix in prefix_segments(&event.key) {
597            self.dispatch_to_map(&self.registry.prefix, &prefix, &event).await;
598        }
599    }
600
601    /// Dispatch an event to all watchers under `lookup_key` in `map`.
602    ///
603    /// Converts proto WatchResponse → opaque WatchEvent per watcher, respecting
604    /// each watcher's prev_kv preference.  Handles overflow detection and dead
605    /// watcher cleanup identically for both the exact and prefix maps.
606    async fn dispatch_to_map(
607        &self,
608        map: &DashMap<Bytes, Vec<Watcher>>,
609        lookup_key: &Bytes,
610        event: &WatchResponse,
611    ) {
612        if let Some(watchers) = map.get(lookup_key) {
613            let mut dead_watchers = Vec::new();
614
615            for watcher in watchers.iter() {
616                let available = watcher.sender.capacity();
617
618                if available <= 1 {
619                    // capacity == 1: only the reserved cancel slot remains → overflow
620                    // capacity == 0: defensive, shouldn't happen in normal flow
621                    if available == 1 {
622                        warn!(
623                            watcher_id = watcher.id,
624                            key = ?event.key,
625                            buffer_capacity = watcher.sender.max_capacity(),
626                            buffer_len = watcher.sender.max_capacity() - available,
627                            "watcher buffer overflow, sending cancel"
628                        );
629                        let _ = watcher
630                            .sender
631                            .try_send(crate::watch::make_cancel_event(event.key.clone()));
632                    }
633                    dead_watchers.push(watcher.id);
634                    continue;
635                }
636
637                // Convert proto → opaque, respecting per-watcher prev_kv flag
638                let watch_event = proto_to_event(event, watcher.prev_kv);
639
640                // Normal send: reserved slot untouched
641                if let Err(mpsc::error::TrySendError::Closed(_)) =
642                    watcher.sender.try_send(watch_event)
643                {
644                    // Receiver dropped: silent cleanup, no cancel needed
645                    dead_watchers.push(watcher.id);
646                }
647            }
648
649            drop(watchers);
650            for id in dead_watchers {
651                self.registry.unregister(id, lookup_key);
652            }
653
654            trace!(
655                event_key = ?event.key,
656                lookup_key = ?lookup_key,
657                event_type = ?event.event_type,
658                "Event dispatched"
659            );
660        }
661    }
662}