nodedb 0.3.0

Local-first, real-time, edge-to-cloud hybrid database for multi-modal workloads
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
// SPDX-License-Identifier: BUSL-1.1

//! Event bus: per-core ring buffers from Data Plane to Event Plane.
//!
//! Same design pattern as the SPSC bridge (Control → Data), but in the
//! opposite direction for events. One ring buffer per Data Plane core —
//! no cross-core contention.
//!
//! ```text
//! Data Plane Core 0 ──► [Bounded Ring Buffer 0] ──► Event Plane Consumer 0
//! Data Plane Core 1 ──► [Bounded Ring Buffer 1] ──► Event Plane Consumer 1
//! ...
//! Data Plane Core N ──► [Bounded Ring Buffer N] ──► Event Plane Consumer N
//! ```

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use nodedb_bridge::backpressure::{BackpressureConfig, BackpressureController, PressureState};
use nodedb_bridge::buffer::{Consumer, Producer, RingBuffer};
use nodedb_bridge::error::BridgeError;

use super::types::WriteEvent;

/// Default ring buffer capacity per core (must be power of two).
const DEFAULT_EVENT_BUS_CAPACITY: usize = 65_536;

/// The producer half given to a Data Plane core.
///
/// `Send` at the trait level, but **logically owned by exactly one
/// thread once delivered to its core**. The `Send` bound is required
/// during setup: `create_event_bus` returns a `Vec<EventProducer>` that
/// the runtime distributes to per-core worker threads via `tokio::spawn`
/// / `LocalSet::spawn_local`, and the distribute step is the only
/// cross-thread move ever performed. After that move the producer never
/// leaves its core.
///
/// The SPSC invariant (single writer at any time) is enforced statically
/// by `&mut self` on [`Self::emit`]: there can be at most one mutable
/// borrow, and that borrow lives on the owning thread. A future refactor
/// that shares the producer across threads would have to either clone it
/// (no `Clone` impl exists) or wrap it in a lock (deliberately not
/// provided) — both surface in code review.
pub struct EventProducer {
    inner: Producer<WriteEvent>,
    core_id: usize,
    backpressure: Arc<BackpressureController>,
    /// Latched once the consumer half is dropped, so we log the
    /// disconnect exactly once per producer instead of spamming
    /// a warning for every dropped event.
    disconnect_logged: AtomicBool,
}

impl EventProducer {
    /// Try to emit a write event. Returns `true` if enqueued, `false` if dropped.
    ///
    /// The Data Plane NEVER blocks waiting for the Event Plane to process —
    /// fire-and-forget into the ring buffer. Dropped events are WAL-backed:
    /// the Event Plane detects gaps via sequence numbers and replays from WAL.
    ///
    /// Updates backpressure state after each emit. When Suspended (>95%),
    /// events are dropped more aggressively (the Event Plane will enter
    /// WAL Catchup Mode to recover).
    pub fn emit(&mut self, event: WriteEvent) -> bool {
        let util = self.inner.utilization();

        // Update backpressure state.
        if let Some(new_state) = self.backpressure.update(util) {
            match new_state {
                PressureState::Throttled => {
                    tracing::info!(
                        core = self.core_id,
                        utilization = util,
                        "event bus backpressure: THROTTLED (>85%)"
                    );
                }
                PressureState::Suspended => {
                    tracing::warn!(
                        core = self.core_id,
                        utilization = util,
                        "event bus backpressure: SUSPENDED (>95%) — events will be dropped, WAL catchup needed"
                    );
                }
                PressureState::Normal => {
                    tracing::info!(
                        core = self.core_id,
                        utilization = util,
                        "event bus backpressure: NORMAL"
                    );
                }
            }
        }

        match self.inner.try_push(event) {
            Ok(()) => true,
            Err(BridgeError::Full { .. }) => {
                tracing::warn!(
                    core = self.core_id,
                    utilization = util,
                    "event bus full — event dropped (WAL-backed, will replay on gap)"
                );
                false
            }
            Err(BridgeError::Disconnected { .. }) => {
                // Consumer dropped — Event Plane is gone (shutdown or lifecycle bug).
                // Log once per producer; further events would just be noise.
                if !self.disconnect_logged.swap(true, Ordering::Relaxed) {
                    tracing::warn!(
                        core = self.core_id,
                        "event bus consumer disconnected — Event Plane is not running; \
                         events will be silently dropped on this core until restart"
                    );
                }
                false
            }
            Err(e) => {
                tracing::warn!(
                    core = self.core_id,
                    error = %e,
                    "event bus push failed — event dropped"
                );
                false
            }
        }
    }

    pub fn core_id(&self) -> usize {
        self.core_id
    }

    /// Current ring buffer utilization as a percentage (0–100).
    pub fn utilization(&self) -> u8 {
        self.inner.utilization()
    }

    /// Current backpressure state.
    pub fn pressure_state(&self) -> PressureState {
        self.backpressure.state()
    }

    /// Whether the producer has observed the consumer half being dropped.
    ///
    /// Latched on the first disconnect detection — once `true`, stays `true`
    /// for the producer's lifetime. A `true` reading means the Event Plane
    /// is gone (lifecycle bug or shutdown) and every subsequent `emit` will
    /// be silently dropped at the bus layer; the WAL replay path is the
    /// only recovery.
    pub fn is_consumer_disconnected(&self) -> bool {
        self.disconnect_logged.load(Ordering::Relaxed)
    }
}

/// The consumer half given to an Event Plane consumer task.
pub struct EventConsumerRx {
    inner: Consumer<WriteEvent>,
    core_id: usize,
    backpressure: Arc<BackpressureController>,
}

impl EventConsumerRx {
    /// Try to dequeue the next event. Returns `None` if the buffer is empty.
    pub fn try_recv(&mut self) -> Option<WriteEvent> {
        self.inner.try_pop().ok()
    }

    pub fn core_id(&self) -> usize {
        self.core_id
    }

    /// Current backpressure state (read from the shared controller).
    pub fn pressure_state(&self) -> PressureState {
        self.backpressure.state()
    }
}

/// Creates the event bus: one ring buffer pair per Data Plane core.
///
/// Returns `(producers, consumers)` — producers go to Data Plane cores,
/// consumers go to Event Plane Tokio tasks.
pub fn create_event_bus(num_cores: usize) -> (Vec<EventProducer>, Vec<EventConsumerRx>) {
    create_event_bus_with_capacity(num_cores, DEFAULT_EVENT_BUS_CAPACITY)
}

/// Creates the event bus with a custom ring buffer capacity.
pub fn create_event_bus_with_capacity(
    num_cores: usize,
    capacity: usize,
) -> (Vec<EventProducer>, Vec<EventConsumerRx>) {
    let mut producers = Vec::with_capacity(num_cores);
    let mut consumers = Vec::with_capacity(num_cores);

    for core_id in 0..num_cores {
        let (producer, consumer) = RingBuffer::channel::<WriteEvent>(capacity);
        let backpressure = Arc::new(BackpressureController::new(BackpressureConfig::default()));

        producers.push(EventProducer {
            inner: producer,
            core_id,
            backpressure: Arc::clone(&backpressure),
            disconnect_logged: AtomicBool::new(false),
        });

        consumers.push(EventConsumerRx {
            inner: consumer,
            core_id,
            backpressure,
        });
    }

    (producers, consumers)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::types::{EventSource, RowId, WriteOp};
    use crate::types::{Lsn, TenantId, VShardId};
    use std::sync::Arc;

    fn make_event(seq: u64) -> WriteEvent {
        WriteEvent {
            sequence: seq,
            collection: Arc::from("test"),
            op: WriteOp::Insert,
            row_id: RowId::new("row-1"),
            lsn: Lsn::new(seq),
            tenant_id: TenantId::new(1),
            vshard_id: VShardId::new(0),
            source: EventSource::User,
            new_value: Some(Arc::from(b"data".as_slice())),
            old_value: None,
            system_time_ms: None,
            valid_time_ms: None,
            user_id: None,
            statement_digest: None,
        }
    }

    #[test]
    fn single_core_roundtrip() {
        let (mut producers, mut consumers) = create_event_bus_with_capacity(1, 16);
        let producer = &mut producers[0];
        let consumer = &mut consumers[0];

        assert!(producer.emit(make_event(1)));
        assert!(producer.emit(make_event(2)));

        let e1 = consumer.try_recv().expect("should have event");
        assert_eq!(e1.sequence, 1);

        let e2 = consumer.try_recv().expect("should have event");
        assert_eq!(e2.sequence, 2);

        assert!(consumer.try_recv().is_none());
    }

    #[test]
    fn multi_core_isolation() {
        let (mut producers, mut consumers) = create_event_bus_with_capacity(4, 16);

        // Each core emits to its own buffer.
        for (i, p) in producers.iter_mut().enumerate() {
            assert!(p.emit(make_event(i as u64)));
        }

        // Each consumer sees only its core's event.
        for (i, c) in consumers.iter_mut().enumerate() {
            let event = c.try_recv().expect("should have event");
            assert_eq!(event.sequence, i as u64);
            assert!(c.try_recv().is_none());
        }
    }

    #[test]
    fn full_buffer_drops_event() {
        // Capacity 4 → rounded to 4 (already power of two).
        let (mut producers, _consumers) = create_event_bus_with_capacity(1, 4);
        let producer = &mut producers[0];

        // Fill the buffer.
        for i in 0..4 {
            assert!(producer.emit(make_event(i)));
        }

        // Next emit should fail (buffer full).
        assert!(!producer.emit(make_event(99)));
    }

    #[test]
    fn core_id_propagated() {
        let (producers, consumers) = create_event_bus(2);
        assert_eq!(producers[0].core_id(), 0);
        assert_eq!(producers[1].core_id(), 1);
        assert_eq!(consumers[0].core_id(), 0);
        assert_eq!(consumers[1].core_id(), 1);
    }

    // ─────────────────────────────────────────────────────────────────────
    // Consumer-disconnect classification.
    //
    // When the consumer half is dropped (e.g. the EventPlane handle that
    // owns the consumer tasks is itself dropped), the producer must
    // distinguish the resulting failure from a "buffer full" failure.
    // Conflating the two — by way of an `Err(_)` catch-all on `try_push` —
    // surfaces as a misleading "event bus full, utilization=0" warning at
    // boot and silently loses every subsequent emit on that core. The
    // post-fix code latches a single clear "consumer disconnected" warning
    // and exposes the latched state for runtime inspection.
    // ─────────────────────────────────────────────────────────────────────

    #[test]
    fn emit_after_consumer_drop_is_classified_as_disconnect_not_full() {
        let (mut producers, consumers) = create_event_bus_with_capacity(1, 16);
        let producer = &mut producers[0];

        // Sanity check: a fresh bus is not in disconnected state.
        assert!(
            !producer.is_consumer_disconnected(),
            "fresh producer must not report disconnect before consumer drop"
        );

        // Simulate the lifecycle bug: the EventPlane handle (which owns
        // the consumer side) is dropped while producers are still live.
        drop(consumers);

        // The first emit detects the disconnect.
        assert!(
            !producer.emit(make_event(1)),
            "emit must report failure when consumer is gone"
        );
        assert!(
            producer.is_consumer_disconnected(),
            "producer must classify a dropped-consumer failure as disconnect, \
             not as a generic push failure or a full buffer"
        );

        // Regression guard against the specific failure mode: utilization
        // must read 0 (the buffer is empty — the consumer was never there
        // to drain it). A non-zero utilization paired with a failing emit
        // would mean the producer is misclassifying disconnect as Full.
        assert_eq!(
            producer.utilization(),
            0,
            "utilization must be 0 after consumer drop; a non-zero reading \
             means the producer is mistaking disconnect for buffer-full"
        );
    }

    #[test]
    fn emit_after_consumer_drop_continues_to_fail_silently() {
        // Once the consumer is gone, every subsequent emit on that core
        // must return false. The producer must not silently start
        // succeeding (e.g. by re-creating an internal queue) — recovery
        // requires a full restart and WAL replay on the Event Plane side.
        let (mut producers, consumers) = create_event_bus_with_capacity(1, 16);
        let producer = &mut producers[0];
        drop(consumers);

        for seq in 1..=10 {
            assert!(
                !producer.emit(make_event(seq)),
                "emit #{seq} after consumer drop must fail"
            );
        }
        assert!(producer.is_consumer_disconnected());
    }

    #[test]
    fn full_buffer_does_not_set_disconnected_flag() {
        // Regression guard against the inverse misclassification: a
        // legitimately-full buffer (consumer is alive but slow) must NOT
        // be reported as a consumer disconnect. Conflating the two would
        // make every backpressure event look like a fatal lifecycle bug
        // and trigger the latched "disconnected" log + metric.
        let (mut producers, _consumers) = create_event_bus_with_capacity(1, 4);
        let producer = &mut producers[0];

        // Fill the buffer (consumer is held by `_consumers` and never drains).
        for i in 0..4 {
            assert!(producer.emit(make_event(i)));
        }
        // Next push fails because buffer is full, not because consumer is gone.
        assert!(!producer.emit(make_event(99)));
        assert!(
            !producer.is_consumer_disconnected(),
            "a full buffer must not be classified as a consumer disconnect; \
             the consumer half is alive and will eventually drain"
        );
    }

    #[test]
    fn consumer_drop_is_isolated_to_its_own_core() {
        // The bus is per-core. Dropping one core's consumer must not
        // affect any other core's producer. This protects against a
        // refactor that shares disconnect state across cores (e.g. via a
        // single global `AtomicBool`), which would surface as one slow
        // consumer poisoning every core's bus.
        let (mut producers, mut consumers) = create_event_bus_with_capacity(3, 16);

        // Drop only core 0's consumer. Cores 1 and 2 stay alive.
        let core0_consumer = consumers.remove(0);
        drop(core0_consumer);

        // Core 0 producer detects disconnect.
        assert!(!producers[0].emit(make_event(1)));
        assert!(producers[0].is_consumer_disconnected());

        // Core 1 and core 2 producers continue to operate normally.
        assert!(producers[1].emit(make_event(1)));
        assert!(!producers[1].is_consumer_disconnected());
        assert!(producers[2].emit(make_event(1)));
        assert!(!producers[2].is_consumer_disconnected());

        // Verify the events landed in the correct (still-alive) consumers.
        // (consumers[0] in the original index 1 is now consumers[0] after remove.)
        assert_eq!(
            consumers[0]
                .try_recv()
                .expect("core 1 should have event")
                .sequence,
            1
        );
        assert_eq!(
            consumers[1]
                .try_recv()
                .expect("core 2 should have event")
                .sequence,
            1
        );
    }
}