use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::oneshot;
use tracing::{debug, trace, warn};
use crate::config::SlotBusConfig;
use crate::error::SlotBusError;
use crate::events::NamedEvent;
use crate::region::{self, ShmRegion};
use crate::types::*;
/// A request read out of a slot, as handed to the handler passed to
/// `SlotWorker::start_receive_loop`.
///
/// Derives `Clone`/`PartialEq`/`Eq` in addition to `Debug` so requests can be
/// copied and compared (for example in tests) without hand-written helpers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Request {
    /// Correlation id; the bus keys its pending-response map by this value.
    pub req_id: String,
    /// Method name, produced from the slot's numeric method code via `u8_to_method`.
    pub method: String,
    /// Request path taken from the request metadata.
    pub path: String,
    /// Route pattern taken from the request metadata.
    pub route_pattern: String,
    /// Path parameters collected from the request metadata.
    pub path_params: HashMap<String, String>,
    /// Query taken from the request metadata, if any.
    pub query: Option<String>,
    /// Raw request body bytes.
    pub body: Vec<u8>,
    /// Headers collected from the request metadata into a map.
    pub headers: HashMap<String, String>,
}
/// A response delivered to the caller of `SlotBus::dispatch` through its oneshot receiver.
///
/// Derives `Clone`/`PartialEq`/`Eq` in addition to `Debug` so responses can be
/// copied and compared (for example in tests) without hand-written helpers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Response {
    /// Status code written by the worker.
    pub status: u16,
    /// Raw response body bytes.
    pub body: Vec<u8>,
    /// Content type taken from the response metadata.
    pub content_type: String,
    /// Header name/value pairs taken from the response metadata, in the order given.
    pub headers: Vec<(String, String)>,
}
/// Bookkeeping for one in-flight request: the time it was dispatched, a
/// `"{method} {path}"` label used for logging, and the sender on which the
/// response is delivered to the caller of `SlotBus::dispatch`.
type PendingEntry = (Instant, String, oneshot::Sender<Response>);
/// Dispatching side of the slot bus: writes requests into slots of a shared
/// region and hands responses back to callers through oneshot channels.
pub struct SlotBus {
/// Configuration this bus was created with.
config: SlotBusConfig,
/// Region holding the control block and the request/response slots.
region: Arc<ShmRegion>,
/// Signalled by `dispatch` after a request has been written.
req_event: Arc<NamedEvent>,
/// Waited on by the response watcher thread; also signalled by `stop`.
rsp_event: Arc<NamedEvent>,
/// In-flight requests keyed by request id.
pending: Arc<std::sync::Mutex<HashMap<String, PendingEntry>>>,
/// Extra regions returned by `region::write_request`, keyed by slot index and
/// kept here until the slot's response is consumed or the slot is reused.
overflow_regions: Arc<std::sync::Mutex<HashMap<usize, ShmRegion>>>,
/// Cleared by `stop` and on drop to make the response watcher thread exit.
running: Arc<AtomicBool>,
}
impl SlotBus {
/// Creates (or opens) the region named by `config`, initialises its control
/// block and creates the request/response events.
///
/// The response watcher is not started here; call `start_response_watcher`.
///
/// # Errors
/// Propagates failures from `ShmRegion::create_or_open` and `NamedEvent::create`.
pub fn create(config: SlotBusConfig) -> Result<Self, SlotBusError> {
    let region_name = config.region_name();
    // The extracted source had `&region_name` mangled into `®ion_name`; restored here.
    let mut region = ShmRegion::create_or_open(&region_name, config.region_size)?;
    region.init_control(&config);
    let req_event = NamedEvent::create(&config.request_event_name())?;
    let rsp_event = NamedEvent::create(&config.response_event_name())?;
    debug!(
        name = config.name,
        region = region_name,
        slots = config.num_slots,
        "created slotbus region + events"
    );
    Ok(Self {
        config,
        region: Arc::new(region),
        req_event: Arc::new(req_event),
        rsp_event: Arc::new(rsp_event),
        pending: Arc::new(std::sync::Mutex::new(HashMap::new())),
        overflow_regions: Arc::new(std::sync::Mutex::new(HashMap::new())),
        running: Arc::new(AtomicBool::new(true)),
    })
}
/// Returns the configuration this bus was created with.
pub fn config(&self) -> &SlotBusConfig {
&self.config
}
/// Returns the region name derived from the configuration (the same name
/// `create` passes to `ShmRegion::create_or_open`).
pub fn region_name(&self) -> String {
self.config.region_name()
}
pub fn dispatch(
&self,
req_id: &str,
method: &str,
meta: &RequestMeta,
body: &[u8],
) -> Result<oneshot::Receiver<Response>, SlotBusError> {
self.region.try_reset_heap();
let slot_index = region::claim_free_slot(&self.region)
.ok_or(SlotBusError::NoFreeSlots(self.config.num_slots))?;
let meta_bytes = postcard::to_allocvec(meta)?;
let method_u8 = method_to_u8(method);
self.overflow_regions.lock().unwrap().remove(&slot_index);
let overflow = region::write_request(
&self.region,
slot_index,
req_id,
method_u8,
&meta_bytes,
body,
&self.config,
)?;
if let Some(ovf) = overflow {
self.overflow_regions
.lock()
.unwrap()
.insert(slot_index, ovf);
}
let (tx, rx) = oneshot::channel();
let label = format!("{method} {}", meta.path);
self.pending
.lock()
.unwrap()
.insert(req_id.to_string(), (Instant::now(), label, tx));
self.req_event.signal();
trace!(slot = slot_index, req_id, method, path = %meta.path, "dispatched request");
Ok(rx)
}
pub fn start_response_watcher(&self) -> std::thread::JoinHandle<()> {
let region = Arc::clone(&self.region);
let rsp_event = Arc::clone(&self.rsp_event);
let pending = Arc::clone(&self.pending);
let overflow_regions = Arc::clone(&self.overflow_regions);
let config = self.config.clone();
let running = Arc::clone(&self.running);
std::thread::Builder::new()
.name(format!("{}-slotbus-rsp", config.name))
.spawn(move || {
loop {
if !running.load(Ordering::Relaxed) {
break;
}
rsp_event.wait_timeout(config.wait_timeout_ms);
let mut freed_any = false;
for i in 0..region.num_slots() {
let slot = unsafe { region.slot(i) };
if slot.status.load(Ordering::Acquire) != SLOT_DONE {
continue;
}
let req_id = {
let raw = &slot.req_id;
let end = raw.iter().position(|&b| b == 0).unwrap_or(36);
String::from_utf8_lossy(&raw[..end]).to_string()
};
let resp_result = region::read_response(®ion, i, &config);
if slot
.status
.compare_exchange(
SLOT_DONE,
SLOT_FREE,
Ordering::AcqRel,
Ordering::Acquire,
)
.is_err()
{
continue;
}
freed_any = true;
overflow_regions.lock().unwrap().remove(&i);
match resp_result {
Ok((status, meta, body)) => {
if let Some((t0, label, tx)) =
pending.lock().unwrap().remove(&req_id)
{
if config.instrumentation {
let rtt_ms = t0.elapsed().as_secs_f64() * 1000.0;
debug!("{label} → {status} ({rtt_ms:.1}ms)");
}
let _ = tx.send(Response {
status,
body,
content_type: meta.content_type,
headers: meta.headers,
});
}
}
Err(e) => {
warn!(slot = i, error = %e, "failed to read response");
}
}
}
if freed_any {
region.try_reset_heap();
}
}
})
.expect("failed to spawn slotbus response watcher thread")
}
pub fn slot_diagnostics(&self) -> Vec<(usize, u32)> {
(0..self.region.num_slots())
.map(|i| {
let slot = unsafe { self.region.slot(i) };
(i, slot.status.load(std::sync::atomic::Ordering::Relaxed))
})
.collect()
}
/// Asks the response watcher thread to exit: clears the running flag, then
/// signals the response event so a thread blocked in `wait_timeout` wakes up.
pub fn stop(&self) {
self.running.store(false, Ordering::Relaxed);
self.rsp_event.signal();
}
}
impl Drop for SlotBus {
fn drop(&mut self) {
self.running.store(false, Ordering::Relaxed);
self.rsp_event.signal();
}
}
/// Compact debug output: only the configured name and slot count are shown.
impl std::fmt::Debug for SlotBus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let cfg = &self.config;
        let mut out = f.debug_struct("SlotBus");
        out.field("name", &cfg.name);
        out.field("num_slots", &cfg.num_slots);
        out.finish()
    }
}
/// Receiving side of the slot bus: claims ready requests from the region's
/// slots, hands them to a handler, and writes responses back.
pub struct SlotWorker {
/// Configuration this worker was opened with.
config: SlotBusConfig,
/// Region holding the control block and the request/response slots.
region: Arc<ShmRegion>,
/// Waited on by the receive loop; also signalled by `stop`.
req_event: Arc<NamedEvent>,
/// Signalled by `send_response` after a response has been written.
rsp_event: Arc<NamedEvent>,
/// Extra regions returned by `region::write_response`, keyed by slot index and
/// kept here until the next response is sent for the same slot.
overflow_regions: Arc<std::sync::Mutex<HashMap<usize, ShmRegion>>>,
/// Cleared by `stop` and on drop to make the receive loop exit.
running: Arc<AtomicBool>,
}
impl SlotWorker {
/// Opens the existing region named by `config`, validates its control block
/// and opens the request/response events.
///
/// The receive loop is not started here; call `start_receive_loop`.
///
/// # Errors
/// Propagates failures from `ShmRegion::open`, `validate_control` and
/// `NamedEvent::open`.
pub fn open(config: SlotBusConfig) -> Result<Self, SlotBusError> {
    let region_name = config.region_name();
    // The extracted source had `&region_name` mangled into `®ion_name`; restored here.
    let mut region = ShmRegion::open(&region_name)?;
    region.validate_control()?;
    let req_event = NamedEvent::open(&config.request_event_name())?;
    let rsp_event = NamedEvent::open(&config.response_event_name())?;
    debug!(
        name = config.name,
        region = region_name,
        "opened slotbus region + events"
    );
    Ok(Self {
        config,
        region: Arc::new(region),
        req_event: Arc::new(req_event),
        rsp_event: Arc::new(rsp_event),
        overflow_regions: Arc::new(std::sync::Mutex::new(HashMap::new())),
        running: Arc::new(AtomicBool::new(true)),
    })
}
/// Returns the configuration this worker was opened with.
pub fn config(&self) -> &SlotBusConfig {
&self.config
}
pub fn send_response(
&self,
slot_index: usize,
status: u16,
body: Vec<u8>,
content_type: &str,
headers: Vec<(String, String)>,
) -> Result<(), SlotBusError> {
let resp_meta = ResponseMeta {
content_type: content_type.to_string(),
headers,
};
let meta_bytes = postcard::to_allocvec(&resp_meta)?;
self.overflow_regions.lock().unwrap().remove(&slot_index);
let result = region::write_response(
&self.region,
slot_index,
status,
&meta_bytes,
&body,
&self.config,
);
match result {
Ok(overflow) => {
if let Some(ovf) = overflow {
self.overflow_regions
.lock()
.unwrap()
.insert(slot_index, ovf);
}
self.rsp_event.signal();
Ok(())
}
Err(e) => {
let slot = unsafe { self.region.slot(slot_index) };
slot.status
.store(SLOT_FREE, Ordering::Release);
Err(e)
}
}
}
pub fn start_receive_loop<F>(self: Arc<Self>, handler: F) -> std::thread::JoinHandle<()>
where
F: Fn(Arc<Self>, usize, Request) + Send + Sync + 'static,
{
let handler = Arc::new(handler);
let running = Arc::clone(&self.running);
std::thread::Builder::new()
.name(format!("{}-slotbus-recv", self.config.name))
.spawn(move || loop {
if !running.load(Ordering::Relaxed) {
break;
}
self.req_event.wait_timeout(self.config.wait_timeout_ms);
let wake_time = Instant::now();
for i in 0..self.region.num_slots() {
let slot = unsafe { self.region.slot(i) };
if slot
.status
.compare_exchange(
SLOT_READY,
SLOT_CLAIMED,
Ordering::AcqRel,
Ordering::Acquire,
)
.is_ok()
{
match region::read_request(&self.region, i, &self.config) {
Ok((req_id, method_u8, meta, body)) => {
let method_str = u8_to_method(method_u8);
if self.config.instrumentation {
let claim_us = wake_time.elapsed().as_micros();
debug!(
slot = i,
claim_us,
method = method_str,
path = %meta.path,
"claimed request"
);
}
let request = Request {
req_id,
method: method_str.to_string(),
path: meta.path,
route_pattern: meta.route_pattern,
path_params: meta.path_params.into_iter().collect(),
query: meta.query,
body,
headers: meta.headers.into_iter().collect(),
};
let transport = Arc::clone(&self);
let handler = Arc::clone(&handler);
handler(transport, i, request);
}
Err(e) => {
warn!(slot = i, error = %e, "failed to read request");
slot.status.store(SLOT_FREE, Ordering::Release);
}
}
}
}
})
.expect("failed to spawn slotbus receive thread")
}
/// Asks the receive loop to exit: clears the running flag, then signals the
/// request event so a thread blocked in `wait_timeout` wakes up.
pub fn stop(&self) {
self.running.store(false, Ordering::Relaxed);
self.req_event.signal();
}
}
impl Drop for SlotWorker {
fn drop(&mut self) {
self.running.store(false, Ordering::Relaxed);
self.req_event.signal();
}
}
/// Compact debug output: only the configured name is shown.
impl std::fmt::Debug for SlotWorker {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("SlotWorker");
        out.field("name", &self.config.name);
        out.finish()
    }
}