use tracing::warn;
use nodedb_bridge::backpressure::{BackpressureConfig, BackpressureController};
use nodedb_bridge::buffer::{Consumer, Producer, RingBuffer};
use crate::bridge::envelope;
use crate::control::router::vshard::VShardRouter;
use crate::data::eventfd::EventFdNotifier;
#[derive(Debug)]
pub struct BridgeRequest {
pub inner: envelope::Request,
}
#[derive(Debug)]
pub struct BridgeResponse {
pub inner: envelope::Response,
}
pub struct CoreChannel {
pub request_tx: Producer<BridgeRequest>,
pub response_rx: Consumer<BridgeResponse>,
pub backpressure: BackpressureController,
pub wake_notifier: Option<EventFdNotifier>,
}
pub struct CoreChannelDataSide {
pub request_rx: Consumer<BridgeRequest>,
pub response_tx: Producer<BridgeResponse>,
}
pub struct Dispatcher {
cores: Vec<CoreChannel>,
router: VShardRouter,
}
impl Dispatcher {
pub fn new(num_cores: usize, queue_capacity: usize) -> (Self, Vec<CoreChannelDataSide>) {
let mut cores = Vec::with_capacity(num_cores);
let mut data_sides = Vec::with_capacity(num_cores);
for _ in 0..num_cores {
let (req_tx, req_rx) = RingBuffer::channel::<BridgeRequest>(queue_capacity);
let (resp_tx, resp_rx) = RingBuffer::channel::<BridgeResponse>(queue_capacity);
cores.push(CoreChannel {
request_tx: req_tx,
response_rx: resp_rx,
backpressure: BackpressureController::new(BackpressureConfig::default()),
wake_notifier: None,
});
data_sides.push(CoreChannelDataSide {
request_rx: req_rx,
response_tx: resp_tx,
});
}
let router = VShardRouter::round_robin(num_cores);
(Self { cores, router }, data_sides)
}
pub fn dispatch(&mut self, request: envelope::Request) -> crate::Result<()> {
let core_id =
self.router
.resolve(request.vshard_id)
.ok_or_else(|| crate::Error::Dispatch {
detail: format!("no core for vshard {}", request.vshard_id),
})?;
let channel = &mut self.cores[core_id];
let util = channel.request_tx.utilization();
if let Some(new_state) = channel.backpressure.update(util) {
warn!(
core_id,
utilization = util,
state = ?new_state,
"backpressure transition"
);
}
channel
.request_tx
.try_push(BridgeRequest { inner: request })
.map_err(|e| crate::Error::Dispatch {
detail: format!("core {core_id}: {e}"),
})?;
if let Some(ref notifier) = channel.wake_notifier {
notifier.notify();
}
Ok(())
}
pub fn dispatch_to_core(
&mut self,
core_id: usize,
request: envelope::Request,
) -> crate::Result<()> {
if core_id >= self.cores.len() {
return Err(crate::Error::Dispatch {
detail: format!("core {core_id} out of range (have {})", self.cores.len()),
});
}
let channel = &mut self.cores[core_id];
channel
.request_tx
.try_push(BridgeRequest { inner: request })
.map_err(|e| crate::Error::Dispatch {
detail: format!("core {core_id}: {e}"),
})?;
if let Some(ref notifier) = channel.wake_notifier {
notifier.notify();
}
Ok(())
}
pub fn poll_responses(&mut self) -> Vec<envelope::Response> {
let mut responses = Vec::new();
for channel in &mut self.cores {
let mut batch = Vec::new();
channel.response_rx.drain_into(&mut batch, 64);
for br in batch {
responses.push(br.inner);
}
}
responses
}
pub fn num_cores(&self) -> usize {
self.cores.len()
}
pub fn set_notifier(&mut self, core_id: usize, notifier: EventFdNotifier) {
if let Some(channel) = self.cores.get_mut(core_id) {
channel.wake_notifier = Some(notifier);
}
}
pub fn router(&self) -> &VShardRouter {
&self.router
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::bridge::envelope::*;
use crate::bridge::physical_plan::DocumentOp;
use crate::types::*;
use std::time::{Duration, Instant};
fn make_request(vshard: u16) -> envelope::Request {
envelope::Request {
request_id: RequestId::new(1),
tenant_id: TenantId::new(1),
vshard_id: VShardId::new(vshard),
plan: PhysicalPlan::Document(DocumentOp::PointGet {
collection: "users".into(),
document_id: "u1".into(),
rls_filters: Vec::new(),
}),
deadline: Instant::now() + Duration::from_secs(5),
priority: Priority::Normal,
trace_id: 0,
consistency: ReadConsistency::Strong,
idempotency_key: None,
}
}
#[test]
fn dispatch_routes_to_correct_core() {
let (mut dispatcher, data_sides) = Dispatcher::new(4, 64);
dispatcher.dispatch(make_request(0)).unwrap();
dispatcher.dispatch(make_request(1)).unwrap();
dispatcher.dispatch(make_request(4)).unwrap();
assert_eq!(data_sides[0].request_rx.len(), 2);
assert_eq!(data_sides[1].request_rx.len(), 1);
assert_eq!(data_sides[2].request_rx.len(), 0);
}
#[test]
fn response_roundtrip() {
let (mut dispatcher, mut data_sides) = Dispatcher::new(2, 64);
dispatcher.dispatch(make_request(0)).unwrap();
let _req = data_sides[0].request_rx.try_pop().unwrap();
data_sides[0]
.response_tx
.try_push(BridgeResponse {
inner: envelope::Response {
request_id: RequestId::new(1),
status: Status::Ok,
attempt: 1,
partial: false,
payload: Payload::from_arc(std::sync::Arc::from(b"result".as_slice())),
watermark_lsn: Lsn::new(42),
error_code: None,
},
})
.unwrap();
let responses = dispatcher.poll_responses();
assert_eq!(responses.len(), 1);
assert_eq!(responses[0].status, Status::Ok);
assert_eq!(&*responses[0].payload, b"result");
}
#[test]
fn full_queue_returns_error() {
let (mut dispatcher, _data_sides) = Dispatcher::new(1, 4);
for _ in 0..4 {
dispatcher.dispatch(make_request(0)).unwrap();
}
let result = dispatcher.dispatch(make_request(0));
assert!(result.is_err());
}
}