myelon 0.1.0-alpha.2

Ultra-low-latency and high-throughput typed transport for multiprocess SHM and mmap. Layers framing, codecs, typed zero-copy, and inference-fabric topology helpers on disruptor-mp's LMAX-Disruptor-style ring substrate.
Documentation
use myelon::{
    self,
    consumer::SharedConsumer,
    inference::{FixedTopology, WorkerCount},
    lock_free::{self, ConsumerBarrier},
    producer, shared_memory, transport,
};
use myelon::{attach_shared_consumer, build_shared_single_producer};
use myelon::{MultiProcessError, MultiProcessResult};
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Copy, Clone, Default, Debug, PartialEq)]
struct MpEvent {
    sequence: u64,
    payload: u64,
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
struct ApiFrame {
    len: usize,
    kind: u8,
    flags: u8,
    msg_id: u32,
    data: [u8; 8],
}

impl transport::FramedTransportFrame for ApiFrame {
    fn payload_capacity() -> usize {
        8
    }

    fn frame_meta(&self) -> transport::FrameMeta<'_> {
        transport::FrameMeta {
            len: self.len,
            kind: self.kind,
            flags: self.flags,
            msg_id: self.msg_id,
            timestamp_ns: None,
            data: &self.data[..self.len],
        }
    }

    fn write_frame(&mut self, payload: &[u8], kind: u8, msg_id: u32, flags: u8) {
        self.len = payload.len();
        self.kind = kind;
        self.flags = flags;
        self.msg_id = msg_id;
        self.data[..payload.len()].copy_from_slice(payload);
    }
}

fn unique_name(prefix: &str) -> String {
    static COUNTER: AtomicUsize = AtomicUsize::new(0);
    let prefix: String = prefix
        .chars()
        .filter(|ch| ch.is_ascii_alphanumeric())
        .take(4)
        .collect();
    let pid = std::process::id() % 10_000;
    let suffix = COUNTER.fetch_add(1, Ordering::Relaxed) % 100;
    let name = format!("{prefix}{pid:04}{suffix:02}");
    assert!(
        name.len() <= 14,
        "segment name exceeds macOS budget: {name}"
    );
    name
}

#[test]
fn api_boundary_smoke_test() {
    let _ = std::mem::size_of::<shared_memory::SharedMemoryConfig>();
    let _ = std::mem::size_of::<shared_memory::ShmRingBuffer<MpEvent>>();
    let _ = std::mem::size_of::<lock_free::SharedCursor>();
    let _ = std::mem::size_of::<lock_free::ConsumerBarrier>();
    let _ = std::mem::size_of::<lock_free::ProducerBarrier>();
    let _ = std::mem::size_of::<producer::CoordinationMode>();
    let _ = std::mem::size_of::<myelon::shared_memory::SharedRingBuffer<MpEvent>>();
    let _ = std::mem::size_of::<myelon::backend::shared_memory::ShmRingBuffer<MpEvent>>();
    let _ = std::mem::size_of::<myelon::prelude::SharedProducer<MpEvent>>();
    let _ = std::mem::size_of::<myelon::prelude::SharedConsumer<MpEvent>>();
    let _ = std::mem::size_of::<ConsumerBarrier>();
    let _ = std::mem::size_of::<FixedTopology>();
    let _ = std::mem::size_of::<WorkerCount>();
    let _ = std::mem::size_of::<MultiProcessError>();
    let _ = std::mem::size_of::<MultiProcessResult<MpEvent>>();
    let _ = std::mem::size_of::<transport::MyelonTransportConfig>();
    let _ = std::mem::size_of::<transport::MyelonTransportLayout>();
    let _ = std::mem::size_of::<transport::RunnerMyelonTransportConfig>();
    let _ = std::mem::size_of::<transport::MyelonWaitStrategy>();
    let _ = std::mem::size_of::<transport::FrameMeta<'static>>();
    let _ = std::mem::size_of::<transport::FixedFrame<8>>();
    let _ = std::mem::size_of::<transport::FramedTransportProducer<ApiFrame>>();
    let _ = std::mem::size_of::<transport::FramedTransportConsumer<ApiFrame>>();
    let _ = std::mem::size_of::<myelon::MmapProducer<MpEvent>>();
    let _ = std::mem::size_of::<myelon::MmapConsumer<MpEvent>>();
    let _ = std::mem::size_of::<myelon::MmapTransportLayout>();

    let segment = unique_name("boundary");
    let buffer_size = 8usize;
    let events_to_publish = 4u64;

    let mut producer = build_shared_single_producer::<MpEvent>(&segment, buffer_size)
        .enable_discovery(1)
        .build_producer(MpEvent::default)
        .expect("producer creation should succeed");

    let _consumer: SharedConsumer<MpEvent> =
        attach_shared_consumer::<MpEvent>(&segment, buffer_size)
            .enable_discovery(1)
            .with_coordination(producer::CoordinationMode::Immediate)
            .build_consumer()
            .expect("consumer creation should succeed");

    for idx in 0..events_to_publish {
        producer
            .try_publish(|event| {
                event.sequence = idx;
                event.payload = idx.saturating_mul(2);
            })
            .expect("publish should succeed");
    }

    assert_eq!(
        producer.last_published_sequence(),
        (events_to_publish as i64) - 1
    );

    let topology = FixedTopology::new(segment.clone(), buffer_size, WorkerCount::Two);
    assert_eq!(topology.worker_count().as_usize(), 2);
    assert_eq!(
        topology
            .worker_consumer_id(1)
            .expect("worker 1 id should be valid"),
        format!("{segment}_wk_1")
    );

    let transport_layout =
        transport::MyelonTransportLayout::for_session_with_prefix("vmy", &segment, 2, 16, 8)
            .expect("transport layout should be valid");
    let transport_config = transport::MyelonTransportConfig::resolve(None, None, None)
        .expect("transport config should be valid");
    assert_eq!(transport_layout.transport_prefix(), "vmy");
    assert_eq!(transport_layout.runner_count(), 2);
    assert_eq!(
        transport_config.rpc_depth,
        transport::DEFAULT_MYELON_RPC_DEPTH
    );
    assert_eq!(
        transport_config.response_depth,
        transport::DEFAULT_MYELON_RESPONSE_DEPTH
    );
    assert!(transport::is_single_frame(transport::frame_flags(
        true, true
    )));
    assert!(transport::is_last_frame(transport::frame_flags(
        false, true
    )));
    assert_eq!(
        transport::RunnerMyelonTransportConfig::for_rank(
            &transport_layout,
            1,
            transport::MyelonWaitStrategy::Block
        )
        .expect("ranked config should be valid")
        .response_ring_name,
        transport_layout
            .response_ring_name(1)
            .expect("response ring should exist")
    );
}