Skip to main content

plexor_core/
backpressure.rs

1// Copyright 2025 Alecks Gates
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use std::collections::VecDeque;
8use std::future::Future;
9use std::sync::Arc;
10use parking_lot::Mutex;
11use tokio::sync::Notify;
12use crate::synapse::SynapseError;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
15pub enum BackpressureStrategy {
16    Block,
17    DropOldest,
18    DropNewest,
19    Reject,
20}
21
22#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
23pub struct BackpressureConfig {
24    pub queue_size: usize,
25    pub strategy: BackpressureStrategy,
26}
27
28impl Default for BackpressureConfig {
29    fn default() -> Self {
30        Self {
31            queue_size: 1000,
32            strategy: BackpressureStrategy::Block,
33        }
34    }
35}
36
37/// A helper to manage backpressure for a stream of items.
38pub struct BackpressureQueue<T: Send + 'static> {
39    inner: Arc<BackpressureQueueInner<T>>,
40}
41
42struct BackpressureQueueInner<T: Send + 'static> {
43    queue: Mutex<VecDeque<T>>,
44    config: BackpressureConfig,
45    neuron_name: String,
46    // Notify the worker when a new item is added
47    item_added: Notify,
48    // Notify the producers when an item is removed (for Block strategy)
49    item_removed: Notify,
50}
51
52impl<T: Send + 'static> BackpressureQueue<T> {
53    pub fn new<F, Fut>(neuron_name: String, config: BackpressureConfig, mut processor: F) -> Self
54    where
55        F: FnMut(T) -> Fut + Send + 'static,
56        Fut: Future<Output = ()> + Send + 'static,
57    {
58        let inner = Arc::new(BackpressureQueueInner {
59            queue: Mutex::new(VecDeque::with_capacity(config.queue_size)),
60            config,
61            neuron_name: neuron_name.clone(),
62            item_added: Notify::new(),
63            item_removed: Notify::new(),
64        });
65
66        let worker_inner = inner.clone();
67
68        tokio::spawn(async move {
69            loop {
70                let item = {
71                    let mut queue = worker_inner.queue.lock();
72                    if let Some(item) = queue.pop_front() {
73                        worker_inner.item_removed.notify_waiters();
74                        Some(item)
75                    } else {
76                        None
77                    }
78                };
79
80                if let Some(item) = item {
81                    processor(item).await;
82                } else {
83                    // Wait for a new item
84                    worker_inner.item_added.notified().await;
85                }
86            }
87        });
88
89        Self { inner }
90    }
91
92    pub async fn push(&self, item: T) -> Result<(), SynapseError> {
93        loop {
94            // Use a block to ensure the lock is dropped before any await
95            let should_wait = {
96                let mut queue = self.inner.queue.lock();
97
98                if queue.len() < self.inner.config.queue_size {
99                    queue.push_back(item);
100                    self.inner.item_added.notify_one();
101                    return Ok(());
102                }
103
104                match self.inner.config.strategy {
105                    BackpressureStrategy::Block => true,
106                    BackpressureStrategy::DropOldest => {
107                        queue.pop_front();
108                        queue.push_back(item);
109                        self.inner.item_added.notify_one();
110                        tracing::warn!(
111                            neuron = %self.inner.neuron_name,
112                            "Backpressure: Dropped oldest message (queue full)"
113                        );
114                        return Ok(());
115                    }
116                    BackpressureStrategy::DropNewest => {
117                        tracing::warn!(
118                            neuron = %self.inner.neuron_name,
119                            "Backpressure: Dropped newest message (queue full)"
120                        );
121                        return Ok(()); // Just drop it
122                    }
123                    BackpressureStrategy::Reject => {
124                        tracing::warn!(
125                            neuron = %self.inner.neuron_name,
126                            "Backpressure: Rejected message (queue full)"
127                        );
128                        return Err(SynapseError::QueueFull {
129                            neuron_name: self.inner.neuron_name.clone(),
130                        });
131                    }
132                }
133            };
134
135            if should_wait {
136                self.inner.item_removed.notified().await;
137            }
138        }
139    }
140}