use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use nodedb_bridge::backpressure::{BackpressureConfig, BackpressureController, PressureState};
use nodedb_bridge::buffer::{Consumer, Producer, RingBuffer};
use nodedb_bridge::error::BridgeError;
use super::types::WriteEvent;
const DEFAULT_EVENT_BUS_CAPACITY: usize = 65_536;
pub struct EventProducer {
inner: Producer<WriteEvent>,
core_id: usize,
backpressure: Arc<BackpressureController>,
disconnect_logged: AtomicBool,
}
impl EventProducer {
pub fn emit(&mut self, event: WriteEvent) -> bool {
let util = self.inner.utilization();
if let Some(new_state) = self.backpressure.update(util) {
match new_state {
PressureState::Throttled => {
tracing::info!(
core = self.core_id,
utilization = util,
"event bus backpressure: THROTTLED (>85%)"
);
}
PressureState::Suspended => {
tracing::warn!(
core = self.core_id,
utilization = util,
"event bus backpressure: SUSPENDED (>95%) — events will be dropped, WAL catchup needed"
);
}
PressureState::Normal => {
tracing::info!(
core = self.core_id,
utilization = util,
"event bus backpressure: NORMAL"
);
}
}
}
match self.inner.try_push(event) {
Ok(()) => true,
Err(BridgeError::Full { .. }) => {
tracing::warn!(
core = self.core_id,
utilization = util,
"event bus full — event dropped (WAL-backed, will replay on gap)"
);
false
}
Err(BridgeError::Disconnected { .. }) => {
if !self.disconnect_logged.swap(true, Ordering::Relaxed) {
tracing::warn!(
core = self.core_id,
"event bus consumer disconnected — Event Plane is not running; \
events will be silently dropped on this core until restart"
);
}
false
}
Err(e) => {
tracing::warn!(
core = self.core_id,
error = %e,
"event bus push failed — event dropped"
);
false
}
}
}
pub fn core_id(&self) -> usize {
self.core_id
}
pub fn utilization(&self) -> u8 {
self.inner.utilization()
}
pub fn pressure_state(&self) -> PressureState {
self.backpressure.state()
}
pub fn is_consumer_disconnected(&self) -> bool {
self.disconnect_logged.load(Ordering::Relaxed)
}
}
pub struct EventConsumerRx {
inner: Consumer<WriteEvent>,
core_id: usize,
backpressure: Arc<BackpressureController>,
}
impl EventConsumerRx {
pub fn try_recv(&mut self) -> Option<WriteEvent> {
self.inner.try_pop().ok()
}
pub fn core_id(&self) -> usize {
self.core_id
}
pub fn pressure_state(&self) -> PressureState {
self.backpressure.state()
}
}
pub fn create_event_bus(num_cores: usize) -> (Vec<EventProducer>, Vec<EventConsumerRx>) {
create_event_bus_with_capacity(num_cores, DEFAULT_EVENT_BUS_CAPACITY)
}
pub fn create_event_bus_with_capacity(
num_cores: usize,
capacity: usize,
) -> (Vec<EventProducer>, Vec<EventConsumerRx>) {
let mut producers = Vec::with_capacity(num_cores);
let mut consumers = Vec::with_capacity(num_cores);
for core_id in 0..num_cores {
let (producer, consumer) = RingBuffer::channel::<WriteEvent>(capacity);
let backpressure = Arc::new(BackpressureController::new(BackpressureConfig::default()));
producers.push(EventProducer {
inner: producer,
core_id,
backpressure: Arc::clone(&backpressure),
disconnect_logged: AtomicBool::new(false),
});
consumers.push(EventConsumerRx {
inner: consumer,
core_id,
backpressure,
});
}
(producers, consumers)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::event::types::{EventSource, RowId, WriteOp};
use crate::types::{Lsn, TenantId, VShardId};
use std::sync::Arc;
fn make_event(seq: u64) -> WriteEvent {
WriteEvent {
sequence: seq,
collection: Arc::from("test"),
op: WriteOp::Insert,
row_id: RowId::new("row-1"),
lsn: Lsn::new(seq),
tenant_id: TenantId::new(1),
vshard_id: VShardId::new(0),
source: EventSource::User,
new_value: Some(Arc::from(b"data".as_slice())),
old_value: None,
system_time_ms: None,
valid_time_ms: None,
user_id: None,
statement_digest: None,
}
}
#[test]
fn single_core_roundtrip() {
let (mut producers, mut consumers) = create_event_bus_with_capacity(1, 16);
let producer = &mut producers[0];
let consumer = &mut consumers[0];
assert!(producer.emit(make_event(1)));
assert!(producer.emit(make_event(2)));
let e1 = consumer.try_recv().expect("should have event");
assert_eq!(e1.sequence, 1);
let e2 = consumer.try_recv().expect("should have event");
assert_eq!(e2.sequence, 2);
assert!(consumer.try_recv().is_none());
}
#[test]
fn multi_core_isolation() {
let (mut producers, mut consumers) = create_event_bus_with_capacity(4, 16);
for (i, p) in producers.iter_mut().enumerate() {
assert!(p.emit(make_event(i as u64)));
}
for (i, c) in consumers.iter_mut().enumerate() {
let event = c.try_recv().expect("should have event");
assert_eq!(event.sequence, i as u64);
assert!(c.try_recv().is_none());
}
}
#[test]
fn full_buffer_drops_event() {
let (mut producers, _consumers) = create_event_bus_with_capacity(1, 4);
let producer = &mut producers[0];
for i in 0..4 {
assert!(producer.emit(make_event(i)));
}
assert!(!producer.emit(make_event(99)));
}
#[test]
fn core_id_propagated() {
let (producers, consumers) = create_event_bus(2);
assert_eq!(producers[0].core_id(), 0);
assert_eq!(producers[1].core_id(), 1);
assert_eq!(consumers[0].core_id(), 0);
assert_eq!(consumers[1].core_id(), 1);
}
#[test]
fn emit_after_consumer_drop_is_classified_as_disconnect_not_full() {
let (mut producers, consumers) = create_event_bus_with_capacity(1, 16);
let producer = &mut producers[0];
assert!(
!producer.is_consumer_disconnected(),
"fresh producer must not report disconnect before consumer drop"
);
drop(consumers);
assert!(
!producer.emit(make_event(1)),
"emit must report failure when consumer is gone"
);
assert!(
producer.is_consumer_disconnected(),
"producer must classify a dropped-consumer failure as disconnect, \
not as a generic push failure or a full buffer"
);
assert_eq!(
producer.utilization(),
0,
"utilization must be 0 after consumer drop; a non-zero reading \
means the producer is mistaking disconnect for buffer-full"
);
}
#[test]
fn emit_after_consumer_drop_continues_to_fail_silently() {
let (mut producers, consumers) = create_event_bus_with_capacity(1, 16);
let producer = &mut producers[0];
drop(consumers);
for seq in 1..=10 {
assert!(
!producer.emit(make_event(seq)),
"emit #{seq} after consumer drop must fail"
);
}
assert!(producer.is_consumer_disconnected());
}
#[test]
fn full_buffer_does_not_set_disconnected_flag() {
let (mut producers, _consumers) = create_event_bus_with_capacity(1, 4);
let producer = &mut producers[0];
for i in 0..4 {
assert!(producer.emit(make_event(i)));
}
assert!(!producer.emit(make_event(99)));
assert!(
!producer.is_consumer_disconnected(),
"a full buffer must not be classified as a consumer disconnect; \
the consumer half is alive and will eventually drain"
);
}
#[test]
fn consumer_drop_is_isolated_to_its_own_core() {
let (mut producers, mut consumers) = create_event_bus_with_capacity(3, 16);
let core0_consumer = consumers.remove(0);
drop(core0_consumer);
assert!(!producers[0].emit(make_event(1)));
assert!(producers[0].is_consumer_disconnected());
assert!(producers[1].emit(make_event(1)));
assert!(!producers[1].is_consumer_disconnected());
assert!(producers[2].emit(make_event(1)));
assert!(!producers[2].is_consumer_disconnected());
assert_eq!(
consumers[0]
.try_recv()
.expect("core 1 should have event")
.sequence,
1
);
assert_eq!(
consumers[1]
.try_recv()
.expect("core 2 should have event")
.sequence,
1
);
}
}