rusteron-rb 0.1.121

Provides ring buffer and broadcast functionalities via aeron c bindings, allowing efficient, low-latency message passing between different threads or processes. This module implements Single Producer, Single Consumer (SPSC) ring buffers, Multi-Producer, Single Consumer (MPSC) ring buffers, and broadcast channels.
Documentation
use rusteron_rb::*;
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::thread;
use std::time::{Duration, Instant};

const BURST_LENGTH: usize = 1_000_000;
const MESSAGE_LENGTH: usize = 32;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let running = Arc::new(AtomicBool::new(true));

    println!("message length {}", MESSAGE_LENGTH);

    let running_ctrl_c = Arc::clone(&running);
    ctrlc::set_handler(move || {
        running_ctrl_c.store(false, Ordering::SeqCst);
    })
    .expect("Error setting Ctrl-C handler");

    let running_publisher = Arc::clone(&running);
    let running_subscriber = Arc::clone(&running);

    let rb = AeronSpscRb::new_with_capacity(1024 * 1024, MESSAGE_LENGTH)?;

    let publisher_thread = {
        let rb = rb.clone();
        thread::spawn(move || {
            Publisher::new(running_publisher, rb).run();
            Ok::<_, AeronCError>(())
        })
    };

    let subscriber_thread = thread::spawn(move || {
        let mut image_rate_subscriber =
            ImageRateSubscriber::new(running_subscriber, rb, MESSAGE_LENGTH);
        image_rate_subscriber.run();
        Ok::<_, AeronCError>(())
    });

    publisher_thread
        .join()
        .expect("Publisher thread failed")
        .unwrap();
    subscriber_thread
        .join()
        .expect("Subscriber thread failed")
        .unwrap();

    Ok(())
}

struct Publisher {
    running: Arc<AtomicBool>,
    publication: AeronSpscRb,
}

impl Publisher {
    fn new(running: Arc<AtomicBool>, publication: AeronSpscRb) -> Self {
        Publisher {
            running,
            publication,
        }
    }

    fn run(&self) {
        let mut back_pressure_count = 0;
        let mut total_message_count = 0;
        while self.running.load(Ordering::Acquire) {
            if let Ok(mut msg) = self.publication.try_claim_slice(1, MESSAGE_LENGTH) {
                msg[0] = 1u8;
            } else {
                back_pressure_count += 1;
                if !self.running.load(Ordering::Acquire) {
                    let back_pressure_ratio =
                        back_pressure_count as f64 / total_message_count as f64;
                    println!("Publisher back pressure ratio: {:.6}", back_pressure_ratio);
                    println!("total_message_count: {total_message_count}");
                    return;
                }
            }
            total_message_count += 1;
        }

        let back_pressure_ratio = back_pressure_count as f64 / total_message_count as f64;
        println!("Publisher back pressure ratio: {:.6}", back_pressure_ratio);
        println!("total_message_count: {total_message_count}");
    }
}

struct ImageRateSubscriber {
    running: Arc<AtomicBool>,
    subscription: AeronSpscRb,
    handler: Handler<AeronRingBufferHandlerWrapper<MsgCount>>,
    message_length: usize,
    start_time: Instant,
}

struct MsgCount {
    message_count: usize,
}

impl AeronRingBufferHandlerCallback for MsgCount {
    fn handle_aeron_rb_handler(&mut self, msg_type_id: i32, buffer: &[u8]) {
        self.message_count += 1;
        assert_eq!(msg_type_id, 1);
        assert_eq!(buffer[0], 1);
    }
}

impl ImageRateSubscriber {
    fn new(running: Arc<AtomicBool>, subscription: AeronSpscRb, message_length: usize) -> Self {
        let poll_handler = AeronRingBufferHandlerWrapper::new(MsgCount { message_count: 0 });
        ImageRateSubscriber {
            running,
            subscription,
            handler: poll_handler,
            message_length,
            start_time: Instant::now(),
        }
    }

    fn run(&mut self) {
        let mut next_check = BURST_LENGTH;
        while self.running.load(Ordering::Acquire) {
            let _ = self.subscription.read_msgs(&self.handler, BURST_LENGTH);

            if self.handler.message_count >= next_check
                && self.start_time.elapsed() >= Duration::from_secs(1)
            {
                next_check += BURST_LENGTH;
                let elapsed = self.start_time.elapsed().as_secs_f64();
                let rate = self.handler.message_count as f64 / elapsed;
                let throughput = rate * self.message_length as f64;

                use num_format::{Locale, ToFormattedString};
                println!(
                    "Throughput: {} msgs/sec, {} bytes/sec",
                    (rate.round() as u64).to_formatted_string(&Locale::en),
                    (throughput.round() as u64).to_formatted_string(&Locale::en)
                );

                self.start_time = Instant::now();
                self.handler.message_count = 0;
            }
        }
    }
}