use std::collections::VecDeque;
use std::future::Future;
use std::sync::Arc;
use parking_lot::Mutex;
use tokio::sync::Notify;
use crate::synapse::SynapseError;
/// Policy applied by `BackpressureQueue::push` when the queue already holds
/// `queue_size` items.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum BackpressureStrategy {
/// The producer waits until the worker removes an item, then retries.
Block,
/// The item at the front of the queue is discarded to make room for the new
/// one; `push` still returns `Ok(())`.
DropOldest,
/// The incoming item is discarded and the queue is left untouched; `push`
/// still returns `Ok(())`.
DropNewest,
/// The incoming item is refused and `push` returns
/// `SynapseError::QueueFull`.
Reject,
}
/// Settings for a `BackpressureQueue`.
#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
pub struct BackpressureConfig {
/// Number of items the queue may hold before `strategy` is applied; also
/// used as the initial capacity of the underlying `VecDeque`.
pub queue_size: usize,
/// What `push` does when the queue is full.
pub strategy: BackpressureStrategy,
}
impl Default for BackpressureConfig {
    /// Returns a configuration holding up to 1000 items that makes producers
    /// wait (`BackpressureStrategy::Block`) once that limit is reached.
    fn default() -> Self {
        let queue_size = 1000;
        let strategy = BackpressureStrategy::Block;
        Self {
            queue_size,
            strategy,
        }
    }
}
/// Handle to a bounded queue of `T` whose items are consumed by a background
/// task spawned in `BackpressureQueue::new`.
pub struct BackpressureQueue<T: Send + 'static> {
// State shared between this handle and the worker task.
inner: Arc<BackpressureQueueInner<T>>,
}
/// State shared between the producer-facing handle and the worker task.
struct BackpressureQueueInner<T: Send + 'static> {
// Pending items; the worker pops from the front, producers push to the back.
queue: Mutex<VecDeque<T>>,
// Capacity limit and full-queue policy.
config: BackpressureConfig,
// Attached to log records and to `SynapseError::QueueFull`.
neuron_name: String,
// Signalled by `push` after enqueuing; awaited by the worker when the queue is empty.
item_added: Notify,
// Signalled by the worker after popping; awaited by producers blocked on a full queue.
item_removed: Notify,
}
impl<T: Send + 'static> BackpressureQueue<T> {
pub fn new<F, Fut>(neuron_name: String, config: BackpressureConfig, mut processor: F) -> Self
where
F: FnMut(T) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let inner = Arc::new(BackpressureQueueInner {
queue: Mutex::new(VecDeque::with_capacity(config.queue_size)),
config,
neuron_name: neuron_name.clone(),
item_added: Notify::new(),
item_removed: Notify::new(),
});
let worker_inner = inner.clone();
tokio::spawn(async move {
loop {
let item = {
let mut queue = worker_inner.queue.lock();
if let Some(item) = queue.pop_front() {
worker_inner.item_removed.notify_waiters();
Some(item)
} else {
None
}
};
if let Some(item) = item {
processor(item).await;
} else {
worker_inner.item_added.notified().await;
}
}
});
Self { inner }
}
pub async fn push(&self, item: T) -> Result<(), SynapseError> {
loop {
let should_wait = {
let mut queue = self.inner.queue.lock();
if queue.len() < self.inner.config.queue_size {
queue.push_back(item);
self.inner.item_added.notify_one();
return Ok(());
}
match self.inner.config.strategy {
BackpressureStrategy::Block => true,
BackpressureStrategy::DropOldest => {
queue.pop_front();
queue.push_back(item);
self.inner.item_added.notify_one();
tracing::warn!(
neuron = %self.inner.neuron_name,
"Backpressure: Dropped oldest message (queue full)"
);
return Ok(());
}
BackpressureStrategy::DropNewest => {
tracing::warn!(
neuron = %self.inner.neuron_name,
"Backpressure: Dropped newest message (queue full)"
);
return Ok(()); }
BackpressureStrategy::Reject => {
tracing::warn!(
neuron = %self.inner.neuron_name,
"Backpressure: Rejected message (queue full)"
);
return Err(SynapseError::QueueFull {
neuron_name: self.inner.neuron_name.clone(),
});
}
}
};
if should_wait {
self.inner.item_removed.notified().await;
}
}
}
}