use std::collections::HashMap;
use tracing::{error, warn};
use nodedb_bridge::BridgeError;
use nodedb_bridge::backpressure::{BackpressureConfig, BackpressureController, PressureState};
use nodedb_bridge::buffer::{Consumer, Producer, RingBuffer};
use nodedb_bridge::wfq::WeightedFairQueue;
use nodedb_types::PriorityClass;
use crate::bridge::envelope;
use crate::control::router::vshard::VShardRouter;
use crate::data::eventfd::EventFdNotifier;
/// Item type carried on a per-core request ring; wraps a single [`envelope::Request`].
#[derive(Debug)]
pub struct BridgeRequest {
/// The request envelope being handed to the data plane.
pub inner: envelope::Request,
}
/// Item type carried on a per-core response ring; wraps a single [`envelope::Response`].
#[derive(Debug)]
pub struct BridgeResponse {
/// The response envelope coming back from the data plane.
pub inner: envelope::Response,
}
/// Maps a database ID to the [`PriorityClass`] its queued requests should use.
///
/// Implementations must be `Send + Sync`; the dispatcher stores the resolver as a
/// boxed trait object and consults it on every dispatch.
pub trait DatabasePriorityResolver: Send + Sync {
/// Returns the priority class to apply to requests that target `database_id`.
fn priority_for(&self, database_id: u64) -> PriorityClass;
}
/// Priority resolver that treats every database alike.
///
/// It is a stateless unit struct, so it derives the cheap standard traits
/// (`Debug`, `Clone`, `Copy`, `Default`) to be usable in logs, tests and
/// `Default`-based construction without extra boilerplate.
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultPriorityResolver;
impl DatabasePriorityResolver for DefaultPriorityResolver {
/// Ignores the database ID and always answers `PriorityClass::Standard`.
fn priority_for(&self, _database_id: u64) -> PriorityClass {
PriorityClass::Standard
}
}
/// Dispatcher-side state for one data-plane core: both ring endpoints plus the
/// queueing and pressure bookkeeping that sits in front of the request ring.
pub struct CoreChannel {
/// Producer end of the request ring; `flush_wfq` pushes requests into it.
pub request_tx: Producer<BridgeRequest>,
/// Consumer end of the response ring; drained by `Dispatcher::poll_responses`.
pub response_rx: Consumer<BridgeResponse>,
/// Controller fed with the request ring's utilization after every dispatch.
pub backpressure: BackpressureController,
/// Weighted fair queue, keyed by database ID, holding requests until they are
/// moved onto the request ring.
pub wfq: WeightedFairQueue<envelope::Request>,
/// Most recently recorded pressure state per database ID (see `update_db_pressure`).
pub db_pressure: HashMap<u64, PressureState>,
/// Optional notifier; when set, `notify()` is called after a successful dispatch.
pub wake_notifier: Option<EventFdNotifier>,
}
impl CoreChannel {
    /// Moves queued requests from the weighted fair queue onto the request ring.
    ///
    /// One request is popped at a time, for as long as the ring reports a
    /// utilization below 100 and the queue still yields a request. After each
    /// successful push the database's recorded pressure state is refreshed.
    /// A `Disconnected` push error is logged and ends the flush; the request
    /// that was being pushed is not re-queued.
    ///
    /// Returns the number of requests pushed onto the ring.
    ///
    /// # Panics
    ///
    /// Panics if the push reports `Full` right after the utilization check, or
    /// if it yields an error variant the producer side is not expected to return.
    fn flush_wfq(&mut self) -> usize {
        let mut moved = 0usize;
        loop {
            // Check for ring headroom *before* popping, so a request is never
            // taken out of the fair queue without a slot to put it in.
            if self.request_tx.utilization() >= 100 {
                break;
            }
            let request = match self.wfq.pop_next() {
                Some(request) => request,
                None => break,
            };

            // Capture the IDs up front: the request is moved into the push.
            let db = request.database_id.as_u64();
            let rid = request.request_id.as_u64();

            let outcome = self.request_tx.try_push(BridgeRequest { inner: request });
            match outcome {
                Ok(()) => {
                    moved += 1;
                    self.update_db_pressure(db);
                }
                Err(e @ BridgeError::Disconnected { .. }) => {
                    error!(
                        request_id = rid,
                        database_id = db,
                        "data plane core disconnected during WFQ flush — stopping; request was lost: {e}"
                    );
                    break;
                }
                Err(BridgeError::Full { capacity, pending }) => {
                    unreachable!(
                        "SPSC ring reported Full (capacity={capacity}, pending={pending}) \
                         despite utilization < 100 immediately before push — \
                         single-producer invariant violated"
                    );
                }
                Err(
                    e @ (BridgeError::Empty
                    | BridgeError::Backpressure { .. }
                    | BridgeError::DeadlineExceeded { .. }),
                ) => {
                    unreachable!("Producer::try_push returned non-producer BridgeError: {e}");
                }
            }
        }
        moved
    }

    /// Records the current pressure state of `database_id` in `db_pressure`.
    ///
    /// The queue is asked first whether the database is suspended, and only if
    /// it is not, whether it is throttled; otherwise the state is `Normal`.
    fn update_db_pressure(&mut self, database_id: u64) {
        let observed = if self.wfq.is_suspended_for(database_id) {
            PressureState::Suspended
        } else if self.wfq.is_throttled_for(database_id) {
            PressureState::Throttled
        } else {
            PressureState::Normal
        };
        self.db_pressure.insert(database_id, observed);
    }
}
/// Data-plane half of a core's channel pair: the opposite endpoints of the two
/// rings whose other ends live in [`CoreChannel`].
pub struct CoreChannelDataSide {
/// Consumer end of the request ring (requests pushed by the dispatcher).
pub request_rx: Consumer<BridgeRequest>,
/// Producer end of the response ring (responses read back by the dispatcher).
pub response_tx: Producer<BridgeResponse>,
}
/// Routes requests to per-core channels and tracks per-tenant in-flight counts.
pub struct Dispatcher {
/// One channel per core, indexed by core ID.
cores: Vec<CoreChannel>,
/// Resolves a request's vshard ID to a core index.
router: VShardRouter,
/// Number of dispatched-but-unanswered requests per tenant ID.
tenant_inflight: HashMap<u64, u32>,
/// Request ID → tenant ID for dispatched requests whose response has not been
/// polled yet; lets `poll_responses` find the tenant to decrement.
request_tenant: HashMap<u64, u64>,
/// Per-tenant in-flight cap enforced by `dispatch`; a value of 0 disables the check.
max_per_tenant_inflight: u32,
/// Queue capacity of a single core, used when recalculating tenant limits.
per_core_capacity: u32,
/// Supplies the priority class applied to each database's queue on dispatch.
priority_resolver: Box<dyn DatabasePriorityResolver>,
}
impl Dispatcher {
    /// Creates a dispatcher with `num_cores` channels of `queue_capacity` slots
    /// each, using [`DefaultPriorityResolver`] for database priorities.
    ///
    /// Returns the dispatcher together with the data-plane halves of every
    /// channel, in core order.
    pub fn new(num_cores: usize, queue_capacity: usize) -> (Self, Vec<CoreChannelDataSide>) {
        Self::with_resolver(num_cores, queue_capacity, Box::new(DefaultPriorityResolver))
    }

    /// Creates a dispatcher like [`Dispatcher::new`], but with a caller-supplied
    /// priority resolver.
    ///
    /// For every core a request ring and a response ring of `queue_capacity`
    /// are created, along with a weighted fair queue and a backpressure
    /// controller using the default configuration. The initial per-tenant
    /// in-flight cap is the total capacity across all cores.
    pub fn with_resolver(
        num_cores: usize,
        queue_capacity: usize,
        priority_resolver: Box<dyn DatabasePriorityResolver>,
    ) -> (Self, Vec<CoreChannelDataSide>) {
        let mut cores = Vec::with_capacity(num_cores);
        let mut data_sides = Vec::with_capacity(num_cores);
        for _ in 0..num_cores {
            let (req_tx, req_rx) = RingBuffer::channel::<BridgeRequest>(queue_capacity);
            let (resp_tx, resp_rx) = RingBuffer::channel::<BridgeResponse>(queue_capacity);
            cores.push(CoreChannel {
                request_tx: req_tx,
                response_rx: resp_rx,
                backpressure: BackpressureController::new(BackpressureConfig::default()),
                wfq: WeightedFairQueue::new(queue_capacity, queue_capacity),
                db_pressure: HashMap::new(),
                wake_notifier: None,
            });
            data_sides.push(CoreChannelDataSide {
                request_rx: req_rx,
                response_tx: resp_tx,
            });
        }
        let router = VShardRouter::round_robin(num_cores);

        // Saturate instead of wrapping/truncating: a truncated total could
        // collapse to 0, which `dispatch` interprets as "no tenant limit".
        let total_capacity = num_cores.saturating_mul(queue_capacity);
        (
            Self {
                cores,
                router,
                tenant_inflight: HashMap::new(),
                request_tenant: HashMap::new(),
                max_per_tenant_inflight: Self::saturating_u32(total_capacity),
                per_core_capacity: Self::saturating_u32(queue_capacity),
                priority_resolver,
            },
            data_sides,
        )
    }

    /// Routes `request` to the core owning its vshard and queues it there.
    ///
    /// # Errors
    ///
    /// Returns `Error::Dispatch` when
    /// - the tenant already has `max_per_tenant_inflight` requests in flight
    ///   (the check is skipped when that limit is 0),
    /// - the router has no core for the request's vshard,
    /// - the target database's virtual queue is suspended on that core, or
    /// - the core's weighted fair queue rejects the request.
    ///
    /// In-flight accounting is only touched once the request has been accepted.
    pub fn dispatch(&mut self, request: envelope::Request) -> crate::Result<()> {
        let tenant_id = request.tenant_id.as_u64();
        if self.max_per_tenant_inflight > 0 {
            let inflight = self.tenant_inflight.get(&tenant_id).copied().unwrap_or(0);
            if inflight >= self.max_per_tenant_inflight {
                return Err(crate::Error::Dispatch {
                    detail: format!(
                        "tenant {tenant_id}: queue full ({inflight}/{} in-flight)",
                        self.max_per_tenant_inflight
                    ),
                });
            }
        }
        let core_id =
            self.router
                .resolve(request.vshard_id)
                .ok_or_else(|| crate::Error::Dispatch {
                    detail: format!("no core for vshard {}", request.vshard_id),
                })?;
        self.enqueue_on_core(core_id, request, true)
    }

    /// Decrements the in-flight count of `tenant_id` by one (never below zero).
    ///
    /// Unknown tenants are ignored. This does not touch the request→tenant map;
    /// `poll_responses` performs its own decrement for the responses it drains.
    pub fn tenant_response_received(&mut self, tenant_id: u64) {
        if let Some(count) = self.tenant_inflight.get_mut(&tenant_id) {
            *count = count.saturating_sub(1);
        }
    }

    /// Recomputes the per-tenant in-flight cap as total capacity divided by the
    /// number of tracked tenants, with a floor of 2.
    ///
    /// Every tenant currently present in the tracking map counts towards the
    /// divisor, including those whose count has already dropped to zero; those
    /// zero-count entries are pruned afterwards.
    pub fn recalculate_tenant_limits(&mut self) {
        // `max(1)` guarantees a non-zero divisor when no tenant is tracked.
        let active = Self::saturating_u32(self.tenant_inflight.len().max(1));
        let total_capacity =
            Self::saturating_u32(self.cores.len()).saturating_mul(self.per_core_capacity);
        self.max_per_tenant_inflight = (total_capacity / active).max(2);
        self.tenant_inflight.retain(|_, count| *count > 0);
    }

    /// Queues `request` on an explicitly chosen core, bypassing the router.
    ///
    /// Unlike [`Dispatcher::dispatch`], neither the per-tenant in-flight cap nor
    /// the database suspension state is checked; in-flight accounting is still
    /// updated.
    ///
    /// # Errors
    ///
    /// Returns `Error::Dispatch` when `core_id` is out of range or the core's
    /// weighted fair queue rejects the request.
    pub fn dispatch_to_core(
        &mut self,
        core_id: usize,
        request: envelope::Request,
    ) -> crate::Result<()> {
        if core_id >= self.cores.len() {
            return Err(crate::Error::Dispatch {
                detail: format!("core {core_id} out of range (have {})", self.cores.len()),
            });
        }
        self.enqueue_on_core(core_id, request, false)
    }

    /// Shared tail of `dispatch` and `dispatch_to_core`: applies the database's
    /// priority, enqueues the request on `core_id`, flushes the fair queue into
    /// the ring, reports backpressure transitions, records in-flight
    /// accounting and wakes the core.
    ///
    /// `reject_if_suspended` selects whether a suspended database queue turns
    /// into an error (routed dispatch) or is ignored (direct dispatch).
    ///
    /// `core_id` must be a valid index into `self.cores`; indexing panics
    /// otherwise. A request ID already present in the request→tenant map is
    /// overwritten.
    fn enqueue_on_core(
        &mut self,
        core_id: usize,
        request: envelope::Request,
        reject_if_suspended: bool,
    ) -> crate::Result<()> {
        // Read the IDs before the request is moved into the queue.
        let tenant_id = request.tenant_id.as_u64();
        let req_id = request.request_id.as_u64();
        let database_id = request.database_id.as_u64();

        let channel = &mut self.cores[core_id];
        let cls = self.priority_resolver.priority_for(database_id);
        channel.wfq.set_priority(database_id, cls);
        if reject_if_suspended && channel.wfq.is_suspended_for(database_id) {
            return Err(crate::Error::Dispatch {
                detail: format!(
                    "database {database_id}: virtual queue suspended (≥95% of fair share on core {core_id})"
                ),
            });
        }
        channel
            .wfq
            .try_enqueue(database_id, request)
            .map_err(|_| crate::Error::Dispatch {
                detail: format!("core {core_id}: total WFQ capacity exhausted"),
            })?;
        channel.update_db_pressure(database_id);
        channel.flush_wfq();

        let util = channel.request_tx.utilization();
        if let Some(new_state) = channel.backpressure.update(util) {
            warn!(
                core_id,
                utilization = util,
                state = ?new_state,
                "backpressure transition"
            );
        }

        // Account for the request only now that the queue has accepted it.
        *self.tenant_inflight.entry(tenant_id).or_insert(0) += 1;
        self.request_tenant.insert(req_id, tenant_id);

        if let Some(ref notifier) = channel.wake_notifier {
            notifier.notify();
        }
        Ok(())
    }

    /// Converts a `usize` to `u32`, clamping to `u32::MAX` instead of truncating.
    fn saturating_u32(value: usize) -> u32 {
        value.min(u32::MAX as usize) as u32
    }

    /// Returns the highest request-ring utilization across all cores, or 0 when
    /// there are no cores.
    pub fn max_utilization(&self) -> u8 {
        self.cores
            .iter()
            .map(|c| c.request_tx.utilization())
            .max()
            .unwrap_or(0)
    }

    /// Returns the last recorded pressure state of `database_id` on `core_id`.
    ///
    /// Falls back to `PressureState::Normal` when the core index is out of
    /// range or nothing has been recorded for that database.
    pub fn db_pressure_on_core(&self, core_id: usize, database_id: u64) -> PressureState {
        self.cores
            .get(core_id)
            .and_then(|ch| ch.db_pressure.get(&database_id).copied())
            .unwrap_or(PressureState::Normal)
    }

    /// Collects pending responses from every core.
    ///
    /// For each core, `drain_into` is called with a limit of 64; every drained
    /// response releases its tenant's in-flight slot (if the request ID is
    /// known), and afterwards the core's fair queue is flushed into the ring.
    pub fn poll_responses(&mut self) -> Vec<envelope::Response> {
        let mut responses = Vec::new();
        // One scratch buffer reused for all cores; it is emptied by `drain(..)`
        // before the next core fills it.
        let mut batch: Vec<BridgeResponse> = Vec::new();
        for channel in &mut self.cores {
            channel.response_rx.drain_into(&mut batch, 64);
            for br in batch.drain(..) {
                let rid = br.inner.request_id.as_u64();
                if let Some(tid) = self.request_tenant.remove(&rid)
                    && let Some(count) = self.tenant_inflight.get_mut(&tid)
                {
                    *count = count.saturating_sub(1);
                }
                responses.push(br.inner);
            }
            channel.flush_wfq();
        }
        responses
    }

    /// Returns the number of core channels managed by this dispatcher.
    pub fn num_cores(&self) -> usize {
        self.cores.len()
    }

    /// Installs `notifier` as the wake notifier of `core_id`.
    ///
    /// An out-of-range `core_id` is silently ignored.
    pub fn set_notifier(&mut self, core_id: usize, notifier: EventFdNotifier) {
        if let Some(channel) = self.cores.get_mut(core_id) {
            channel.wake_notifier = Some(notifier);
        }
    }

    /// Returns the vshard router used by [`Dispatcher::dispatch`].
    pub fn router(&self) -> &VShardRouter {
        &self.router
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bridge::envelope::*;
    use crate::bridge::physical_plan::DocumentOp;
    use crate::types::*;
    use std::time::{Duration, Instant};

    /// Builds a tenant-1 point-get request with a five-second deadline.
    fn point_get(
        vshard: u32,
        request_id: u64,
        database_id: DatabaseId,
        collection: &'static str,
        document_id: &'static str,
    ) -> envelope::Request {
        envelope::Request {
            request_id: RequestId::new(request_id),
            tenant_id: TenantId::new(1),
            database_id,
            vshard_id: VShardId::new(vshard),
            plan: PhysicalPlan::Document(DocumentOp::PointGet {
                collection: collection.into(),
                document_id: document_id.into(),
                surrogate: nodedb_types::Surrogate::ZERO,
                pk_bytes: Vec::new(),
                rls_filters: Vec::new(),
                system_as_of_ms: None,
                valid_at_ms: None,
            }),
            deadline: Instant::now() + Duration::from_secs(5),
            priority: Priority::Normal,
            trace_id: TraceId::ZERO,
            consistency: ReadConsistency::Strong,
            idempotency_key: None,
            event_source: crate::event::EventSource::User,
            user_roles: Vec::new(),
            user_id: None,
            statement_digest: None,
        }
    }

    /// Request with ID 1 against the default database.
    fn make_request(vshard: u32) -> envelope::Request {
        point_get(vshard, 1, DatabaseId::DEFAULT, "users", "u1")
    }

    /// Request with an explicit database and request ID.
    fn make_request_for_db(vshard: u32, db: u64, req_id: u64) -> envelope::Request {
        point_get(vshard, req_id, DatabaseId::new(db), "c", "d")
    }

    /// Builds a successful, non-partial, first-attempt response.
    fn ok_response(request_id: u64, payload: Payload, watermark_lsn: Lsn) -> BridgeResponse {
        BridgeResponse {
            inner: envelope::Response {
                request_id: RequestId::new(request_id),
                status: Status::Ok,
                attempt: 1,
                partial: false,
                payload,
                watermark_lsn,
                error_code: None,
            },
        }
    }

    #[test]
    fn dispatch_routes_to_correct_core() {
        let (mut dispatcher, data_sides) = Dispatcher::new(4, 64);
        // With four cores, vshards 0 and 4 are expected on core 0, vshard 1 on core 1.
        for vshard in [0u32, 1, 4] {
            dispatcher.dispatch(make_request(vshard)).unwrap();
        }
        let queued = |core: usize| data_sides[core].request_rx.len();
        assert_eq!(queued(0), 2);
        assert_eq!(queued(1), 1);
        assert_eq!(queued(2), 0);
    }

    #[test]
    fn response_roundtrip() {
        let (mut dispatcher, mut data_sides) = Dispatcher::new(2, 64);
        dispatcher.dispatch(make_request(0)).unwrap();

        // Play the data plane: take the request, answer it.
        let core0 = &mut data_sides[0];
        let _req = core0.request_rx.try_pop().unwrap();
        core0
            .response_tx
            .try_push(ok_response(
                1,
                Payload::from_vec(b"result".to_vec()),
                Lsn::new(42),
            ))
            .unwrap();

        let responses = dispatcher.poll_responses();
        assert_eq!(responses.len(), 1);
        let first = &responses[0];
        assert_eq!(first.status, Status::Ok);
        assert_eq!(&*first.payload, b"result");
    }

    #[test]
    fn full_queue_returns_error() {
        let (mut dispatcher, _data_sides) = Dispatcher::new(1, 4);
        // Fill the single core with one request per database.
        for id in 1..=4u64 {
            dispatcher
                .dispatch(make_request_for_db(0, id, id))
                .unwrap();
        }
        let overflow = dispatcher.dispatch(make_request_for_db(0, 99, 99));
        assert!(overflow.is_err());
    }

    #[test]
    fn dispatch_to_core_tracks_request_lifecycle() {
        let (mut dispatcher, mut data_sides) = Dispatcher::new(2, 64);
        let request = make_request(0);
        let tenant_id = request.tenant_id.as_u64();
        let request_id = request.request_id.as_u64();

        dispatcher.dispatch_to_core(1, request).unwrap();
        assert_eq!(dispatcher.tenant_inflight.get(&tenant_id), Some(&1));
        assert_eq!(dispatcher.request_tenant.get(&request_id), Some(&tenant_id));
        assert_eq!(data_sides[1].request_rx.len(), 1);

        // Play the data plane on core 1: take the request, answer it.
        let core1 = &mut data_sides[1];
        let _req = core1.request_rx.try_pop().unwrap();
        core1
            .response_tx
            .try_push(ok_response(request_id, Payload::empty(), Lsn::ZERO))
            .unwrap();

        let responses = dispatcher.poll_responses();
        assert_eq!(responses.len(), 1);
        assert_eq!(dispatcher.tenant_inflight.get(&tenant_id), Some(&0));
        assert!(!dispatcher.request_tenant.contains_key(&request_id));
    }

    #[test]
    fn per_db_pressure_reported() {
        let (mut dispatcher, _) = Dispatcher::new(1, 8);
        // Four requests for database 1 (IDs 10..=13) ...
        for request_id in 10..14u64 {
            dispatcher
                .dispatch(make_request_for_db(0, 1, request_id))
                .unwrap();
        }
        // ... followed by four for database 2 (IDs 20..=23).
        for request_id in 20..24u64 {
            dispatcher
                .dispatch(make_request_for_db(0, 2, request_id))
                .unwrap();
        }
        // Only checks that the lookups succeed; no particular state is asserted.
        let _ = dispatcher.db_pressure_on_core(0, 1);
        let _ = dispatcher.db_pressure_on_core(0, 2);
    }
}