buffer_core/shm/
writer.rs1use std::fs;
2
3use core_types::{BufferId, ErrorCode, ErrorDomain, RtError};
4
5use super::config::validate_cfg;
6use super::header::{HEADER_SIZE, write_header};
7use super::rate_limiter::TokenBucket;
8use super::segment_io::{init_segment_files, io_to_err, map_mut, open_rw, segment_path, slot_for};
9use super::{SharedMemoryBufferRef, SharedMemoryWriter, ShmTransportConfig};
10
11impl SharedMemoryWriter {
12 pub fn new(cfg: ShmTransportConfig) -> Result<Self, RtError> {
13 validate_cfg(&cfg)?;
14 fs::create_dir_all(&cfg.directory)
15 .map_err(|err| io_to_err("create shm directory failed", err))?;
16 init_segment_files(&cfg)?;
17
18 let rate_limiter = cfg
19 .rate_limit
20 .as_ref()
21 .map(|limit| TokenBucket::new(limit.max_bytes_per_sec, limit.burst_bytes));
22
23 Ok(Self {
24 cfg,
25 next_buffer_id: 0,
26 rate_limiter,
27 })
28 }
29
30 pub fn publish(&mut self, payload: &[u8]) -> Result<SharedMemoryBufferRef, RtError> {
31 if payload.len() > self.cfg.segment_payload_bytes {
32 return Err(RtError::new(
33 ErrorCode::Unsupported,
34 ErrorDomain::Core,
35 false,
36 format!(
37 "payload too large for shm segment: payload={} max={}",
38 payload.len(),
39 self.cfg.segment_payload_bytes
40 ),
41 ));
42 }
43
44 if let Some(bucket) = &mut self.rate_limiter
45 && !bucket.try_consume(payload.len() as u64)
46 {
47 return Err(RtError::new(
48 ErrorCode::Backpressure,
49 ErrorDomain::Core,
50 true,
51 "shared memory writer rate limit exceeded",
52 ));
53 }
54
55 self.next_buffer_id = self.next_buffer_id.saturating_add(1);
56 let buffer_id = BufferId::new(self.next_buffer_id);
57 let slot = slot_for(buffer_id, self.cfg.segment_count);
58 let segment_path = segment_path(&self.cfg, slot);
59
60 let file = open_rw(&segment_path)?;
61 let mut mmap = map_mut(&file)?;
62
63 write_header(&mut mmap, buffer_id, payload.len())?;
64 let start = HEADER_SIZE;
65 let end = HEADER_SIZE + payload.len();
66 mmap[start..end].copy_from_slice(payload);
67 mmap.flush()
68 .map_err(|err| io_to_err("flush shm segment failed", err))?;
69
70 Ok(SharedMemoryBufferRef {
71 buffer_id,
72 offset: 0,
73 len: payload.len(),
74 })
75 }
76}