robotrt-buffer-core 0.1.0-beta.1

RobotRT modular robotics runtime and middleware components.
Documentation
use std::fs;

use core_types::{BufferId, ErrorCode, ErrorDomain, RtError};

use super::config::validate_cfg;
use super::header::{HEADER_SIZE, write_header};
use super::rate_limiter::TokenBucket;
use super::segment_io::{init_segment_files, io_to_err, map_mut, open_rw, segment_path, slot_for};
use super::{SharedMemoryBufferRef, SharedMemoryWriter, ShmTransportConfig};

impl SharedMemoryWriter {
    /// Creates a writer for the shared-memory transport described by `cfg`.
    ///
    /// The configuration is validated first; then the segment directory is
    /// created (including any missing parents) and the segment files are
    /// prepared through `init_segment_files`. When `cfg.rate_limit` is set, a
    /// [`TokenBucket`] is built from its `max_bytes_per_sec` and `burst_bytes`;
    /// otherwise the writer is not rate limited.
    ///
    /// # Errors
    ///
    /// Propagates the error from `validate_cfg`, from creating the directory,
    /// or from `init_segment_files`.
    pub fn new(cfg: ShmTransportConfig) -> Result<Self, RtError> {
        validate_cfg(&cfg)?;
        fs::create_dir_all(&cfg.directory)
            .map_err(|err| io_to_err("create shm directory failed", err))?;
        init_segment_files(&cfg)?;

        let rate_limiter = cfg
            .rate_limit
            .as_ref()
            .map(|limit| TokenBucket::new(limit.max_bytes_per_sec, limit.burst_bytes));

        Ok(Self {
            cfg,
            // The counter is incremented before use, so the first published
            // buffer gets id 1.
            next_buffer_id: 0,
            rate_limiter,
        })
    }

    /// Writes `payload` into the next segment and returns a reference to it.
    ///
    /// The segment is selected with `slot_for(buffer_id, segment_count)`. The
    /// segment file is opened and memory-mapped, a header carrying the buffer
    /// id and payload length is written, the payload bytes are copied directly
    /// after the `HEADER_SIZE`-byte header, and the mapping is flushed.
    ///
    /// The returned [`SharedMemoryBufferRef`] always has `offset` 0 and `len`
    /// equal to `payload.len()`.
    ///
    /// # Errors
    ///
    /// * `ErrorCode::Unsupported` when `payload` is longer than
    ///   `cfg.segment_payload_bytes`.
    /// * `ErrorCode::Backpressure` when a rate limiter is configured and it
    ///   refuses `payload.len()` bytes.
    /// * An I/O-derived error when the segment cannot be opened, mapped or
    ///   flushed, when the header cannot be written, or when the mapped
    ///   segment is too small to hold the header plus the payload.
    pub fn publish(&mut self, payload: &[u8]) -> Result<SharedMemoryBufferRef, RtError> {
        if payload.len() > self.cfg.segment_payload_bytes {
            return Err(RtError::new(
                ErrorCode::Unsupported,
                ErrorDomain::Core,
                false,
                format!(
                    "payload too large for shm segment: payload={} max={}",
                    payload.len(),
                    self.cfg.segment_payload_bytes
                ),
            ));
        }

        if let Some(bucket) = &mut self.rate_limiter
            && !bucket.try_consume(payload.len() as u64)
        {
            return Err(RtError::new(
                ErrorCode::Backpressure,
                ErrorDomain::Core,
                true,
                "shared memory writer rate limit exceeded",
            ));
        }

        // NOTE(review): the id is advanced before any I/O, so a failed publish
        // still consumes an id; once the counter saturates, ids repeat.
        self.next_buffer_id = self.next_buffer_id.saturating_add(1);
        let buffer_id = BufferId::new(self.next_buffer_id);
        let slot = slot_for(buffer_id, self.cfg.segment_count);
        let segment_path = segment_path(&self.cfg, slot);

        let file = open_rw(&segment_path)?;
        let mut mmap = map_mut(&file)?;

        let start = HEADER_SIZE;
        // A saturated `end` can never fit in a real mapping, so arithmetic
        // overflow is reported through the same size check below.
        let end = start.saturating_add(payload.len());
        let mapped_len = mmap.len();
        // The file on disk may be shorter than the configuration implies
        // (truncated or replaced externally). Check before touching the
        // segment so we neither panic on the slice below nor leave a header
        // without its payload.
        if mapped_len < end {
            return Err(io_to_err(
                "shm segment too small for payload",
                std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    format!("mapped={mapped_len} required={end}"),
                ),
            ));
        }

        write_header(&mut mmap, buffer_id, payload.len())?;
        mmap[start..end].copy_from_slice(payload);
        mmap.flush()
            .map_err(|err| io_to_err("flush shm segment failed", err))?;

        Ok(SharedMemoryBufferRef {
            buffer_id,
            offset: 0,
            len: payload.len(),
        })
    }
}