plexor-core 0.1.0-alpha.2

Core library for the rust implementation of the Plexo distributed system architecture, providing the fundamental Plexus, Neuron, Codec, and Axon abstractions.
Documentation
// Copyright 2025 Alecks Gates
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

use std::collections::VecDeque;
use std::future::Future;
use std::sync::Arc;
use parking_lot::Mutex;
use tokio::sync::Notify;
use crate::synapse::SynapseError;

#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum BackpressureStrategy {
    Block,
    DropOldest,
    DropNewest,
    Reject,
}

#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
pub struct BackpressureConfig {
    pub queue_size: usize,
    pub strategy: BackpressureStrategy,
}

impl Default for BackpressureConfig {
    fn default() -> Self {
        Self {
            queue_size: 1000,
            strategy: BackpressureStrategy::Block,
        }
    }
}

/// A helper to manage backpressure for a stream of items.
pub struct BackpressureQueue<T: Send + 'static> {
    inner: Arc<BackpressureQueueInner<T>>,
}

struct BackpressureQueueInner<T: Send + 'static> {
    queue: Mutex<VecDeque<T>>,
    config: BackpressureConfig,
    neuron_name: String,
    // Notify the worker when a new item is added
    item_added: Notify,
    // Notify the producers when an item is removed (for Block strategy)
    item_removed: Notify,
}

impl<T: Send + 'static> BackpressureQueue<T> {
    pub fn new<F, Fut>(neuron_name: String, config: BackpressureConfig, mut processor: F) -> Self
    where
        F: FnMut(T) -> Fut + Send + 'static,
        Fut: Future<Output = ()> + Send + 'static,
    {
        let inner = Arc::new(BackpressureQueueInner {
            queue: Mutex::new(VecDeque::with_capacity(config.queue_size)),
            config,
            neuron_name: neuron_name.clone(),
            item_added: Notify::new(),
            item_removed: Notify::new(),
        });

        let worker_inner = inner.clone();

        tokio::spawn(async move {
            loop {
                let item = {
                    let mut queue = worker_inner.queue.lock();
                    if let Some(item) = queue.pop_front() {
                        worker_inner.item_removed.notify_waiters();
                        Some(item)
                    } else {
                        None
                    }
                };

                if let Some(item) = item {
                    processor(item).await;
                } else {
                    // Wait for a new item
                    worker_inner.item_added.notified().await;
                }
            }
        });

        Self { inner }
    }

    pub async fn push(&self, item: T) -> Result<(), SynapseError> {
        loop {
            // Use a block to ensure the lock is dropped before any await
            let should_wait = {
                let mut queue = self.inner.queue.lock();

                if queue.len() < self.inner.config.queue_size {
                    queue.push_back(item);
                    self.inner.item_added.notify_one();
                    return Ok(());
                }

                match self.inner.config.strategy {
                    BackpressureStrategy::Block => true,
                    BackpressureStrategy::DropOldest => {
                        queue.pop_front();
                        queue.push_back(item);
                        self.inner.item_added.notify_one();
                        tracing::warn!(
                            neuron = %self.inner.neuron_name,
                            "Backpressure: Dropped oldest message (queue full)"
                        );
                        return Ok(());
                    }
                    BackpressureStrategy::DropNewest => {
                        tracing::warn!(
                            neuron = %self.inner.neuron_name,
                            "Backpressure: Dropped newest message (queue full)"
                        );
                        return Ok(()); // Just drop it
                    }
                    BackpressureStrategy::Reject => {
                        tracing::warn!(
                            neuron = %self.inner.neuron_name,
                            "Backpressure: Rejected message (queue full)"
                        );
                        return Err(SynapseError::QueueFull {
                            neuron_name: self.inner.neuron_name.clone(),
                        });
                    }
                }
            };

            if should_wait {
                self.inner.item_removed.notified().await;
            }
        }
    }
}