use std::fs;
use core_types::{BufferId, ErrorCode, ErrorDomain, RtError};
use super::config::validate_cfg;
use super::header::{HEADER_SIZE, write_header};
use super::rate_limiter::TokenBucket;
use super::segment_io::{init_segment_files, io_to_err, map_mut, open_rw, segment_path, slot_for};
use super::{SharedMemoryBufferRef, SharedMemoryWriter, ShmTransportConfig};
impl SharedMemoryWriter {
    /// Creates a writer backed by the segment files described by `cfg`.
    ///
    /// The configuration is validated with `validate_cfg`, the configured
    /// directory is created (including any missing parents), and the segment
    /// files are prepared by `init_segment_files`. When `cfg.rate_limit` is
    /// set, a [`TokenBucket`] is built from its `max_bytes_per_sec` and
    /// `burst_bytes`; otherwise the writer is not rate limited.
    ///
    /// The buffer id counter starts at 0, so the first published buffer is
    /// assigned id 1.
    ///
    /// # Errors
    ///
    /// Propagates the error from `validate_cfg`, from creating the directory
    /// (wrapped by `io_to_err`), or from `init_segment_files`.
    pub fn new(cfg: ShmTransportConfig) -> Result<Self, RtError> {
        validate_cfg(&cfg)?;
        fs::create_dir_all(&cfg.directory)
            .map_err(|err| io_to_err("create shm directory failed", err))?;
        init_segment_files(&cfg)?;

        let rate_limiter = cfg
            .rate_limit
            .as_ref()
            .map(|limit| TokenBucket::new(limit.max_bytes_per_sec, limit.burst_bytes));

        Ok(Self {
            cfg,
            next_buffer_id: 0,
            rate_limiter,
        })
    }

    /// Writes `payload` into a segment file and returns a reference to it.
    ///
    /// A new buffer id is allocated, the target segment is selected with
    /// `slot_for(buffer_id, segment_count)`, and the segment file is opened
    /// and memory-mapped. The header is written first, the payload is copied
    /// to the bytes starting at `HEADER_SIZE`, and the mapping is flushed
    /// before returning.
    ///
    /// The returned reference carries the new buffer id, `offset: 0` and
    /// `len: payload.len()`.
    ///
    /// # Errors
    ///
    /// * `ErrorCode::Unsupported` when `payload` is longer than
    ///   `cfg.segment_payload_bytes`.
    /// * `ErrorCode::Backpressure` when a rate limiter is configured and it
    ///   refuses `payload.len()` bytes.
    /// * An `io_to_err` error when the mapped segment is too small to hold
    ///   the header plus the payload, or when flushing the mapping fails.
    /// * Any error returned by `open_rw`, `map_mut` or `write_header`.
    pub fn publish(&mut self, payload: &[u8]) -> Result<SharedMemoryBufferRef, RtError> {
        let payload_len = payload.len();

        if payload_len > self.cfg.segment_payload_bytes {
            return Err(RtError::new(
                ErrorCode::Unsupported,
                ErrorDomain::Core,
                false,
                format!(
                    "payload too large for shm segment: payload={} max={}",
                    payload_len, self.cfg.segment_payload_bytes
                ),
            ));
        }

        if let Some(bucket) = &mut self.rate_limiter
            && !bucket.try_consume(payload_len as u64)
        {
            return Err(RtError::new(
                ErrorCode::Backpressure,
                ErrorDomain::Core,
                true,
                "shared memory writer rate limit exceeded",
            ));
        }

        // The counter saturates instead of wrapping, so ids never go back to 0.
        self.next_buffer_id = self.next_buffer_id.saturating_add(1);
        let buffer_id = BufferId::new(self.next_buffer_id);

        let slot = slot_for(buffer_id, self.cfg.segment_count);
        let segment_path = segment_path(&self.cfg, slot);
        let file = open_rw(&segment_path)?;
        let mut mmap = map_mut(&file)?;

        // The size check above is against the configuration, not the file on
        // disk. Verify the real mapping length before touching it so an
        // undersized segment yields an error instead of a slice-index panic,
        // and so no header is written for a payload that cannot be stored.
        let payload_end = HEADER_SIZE + payload_len;
        let mapped_len = mmap.len();
        if mapped_len < payload_end {
            return Err(io_to_err(
                "shm segment too small for header and payload",
                std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    format!("mapped={mapped_len} required={payload_end}"),
                ),
            ));
        }

        write_header(&mut mmap, buffer_id, payload_len)?;
        mmap[HEADER_SIZE..payload_end].copy_from_slice(payload);
        mmap.flush()
            .map_err(|err| io_to_err("flush shm segment failed", err))?;

        Ok(SharedMemoryBufferRef {
            buffer_id,
            offset: 0,
            len: payload_len,
        })
    }
}