// buffer_core/shm/writer.rs

1use std::fs;
2
3use core_types::{BufferId, ErrorCode, ErrorDomain, RtError};
4
5use super::config::validate_cfg;
6use super::header::{HEADER_SIZE, write_header};
7use super::rate_limiter::TokenBucket;
8use super::segment_io::{init_segment_files, io_to_err, map_mut, open_rw, segment_path, slot_for};
9use super::{SharedMemoryBufferRef, SharedMemoryWriter, ShmTransportConfig};
10
11impl SharedMemoryWriter {
12    pub fn new(cfg: ShmTransportConfig) -> Result<Self, RtError> {
13        validate_cfg(&cfg)?;
14        fs::create_dir_all(&cfg.directory)
15            .map_err(|err| io_to_err("create shm directory failed", err))?;
16        init_segment_files(&cfg)?;
17
18        let rate_limiter = cfg
19            .rate_limit
20            .as_ref()
21            .map(|limit| TokenBucket::new(limit.max_bytes_per_sec, limit.burst_bytes));
22
23        Ok(Self {
24            cfg,
25            next_buffer_id: 0,
26            rate_limiter,
27        })
28    }
29
30    pub fn publish(&mut self, payload: &[u8]) -> Result<SharedMemoryBufferRef, RtError> {
31        if payload.len() > self.cfg.segment_payload_bytes {
32            return Err(RtError::new(
33                ErrorCode::Unsupported,
34                ErrorDomain::Core,
35                false,
36                format!(
37                    "payload too large for shm segment: payload={} max={}",
38                    payload.len(),
39                    self.cfg.segment_payload_bytes
40                ),
41            ));
42        }
43
44        if let Some(bucket) = &mut self.rate_limiter
45            && !bucket.try_consume(payload.len() as u64)
46        {
47            return Err(RtError::new(
48                ErrorCode::Backpressure,
49                ErrorDomain::Core,
50                true,
51                "shared memory writer rate limit exceeded",
52            ));
53        }
54
55        self.next_buffer_id = self.next_buffer_id.saturating_add(1);
56        let buffer_id = BufferId::new(self.next_buffer_id);
57        let slot = slot_for(buffer_id, self.cfg.segment_count);
58        let segment_path = segment_path(&self.cfg, slot);
59
60        let file = open_rw(&segment_path)?;
61        let mut mmap = map_mut(&file)?;
62
63        write_header(&mut mmap, buffer_id, payload.len())?;
64        let start = HEADER_SIZE;
65        let end = HEADER_SIZE + payload.len();
66        mmap[start..end].copy_from_slice(payload);
67        mmap.flush()
68            .map_err(|err| io_to_err("flush shm segment failed", err))?;
69
70        Ok(SharedMemoryBufferRef {
71            buffer_id,
72            offset: 0,
73            len: payload.len(),
74        })
75    }
76}