nodo_std 0.18.5

Standard codelets for NODO
Documentation
// Copyright 2023 David Weikersdorfer

use nodo::prelude::*;
use std::{
    collections::VecDeque,
    sync::{Arc, RwLock},
};

/// Stores all received messages in memory
pub struct Buffer<M> {
    archive: Arc<RwLock<VecDeque<M>>>,
}

impl<M> Default for Buffer<M> {
    fn default() -> Self {
        Self {
            archive: Arc::new(RwLock::new(VecDeque::default())),
        }
    }
}

impl<M> Buffer<M> {
    /// Get shared access to stored messages
    pub fn shared_access(&self) -> Arc<RwLock<VecDeque<M>>> {
        self.archive.clone()
    }
}

#[derive(Config, Default)]
pub struct BufferConfig {
    /// Maximum number of messages to keep
    pub max_count: Option<usize>,
}

impl<M: Clone + Send + Sync> Codelet for Buffer<M> {
    type Status = DefaultStatus;
    type Config = BufferConfig;
    type Rx = DoubleBufferRx<M>;
    type Tx = ();
    type Signals = ();

    fn build_bundles(_: &Self::Config) -> (Self::Rx, Self::Tx) {
        (DoubleBufferRx::new_auto_size(), ())
    }

    fn step(&mut self, cx: Context<Self>, rx: &mut Self::Rx, _: &mut Self::Tx) -> Outcome {
        let mut archive = self.archive.write().unwrap();

        while let Some(msg) = rx.try_pop() {
            archive.push_back(msg);
        }

        if let Some(max_count) = cx.config.max_count {
            if archive.len() > max_count {
                let drain_count = archive.len() - max_count;
                archive.drain(0..drain_count);
            }
        }

        SUCCESS
    }
}