plexor_core/
backpressure.rs1use std::collections::VecDeque;
8use std::future::Future;
9use std::sync::Arc;
10use parking_lot::Mutex;
11use tokio::sync::Notify;
12use crate::synapse::SynapseError;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
15pub enum BackpressureStrategy {
16 Block,
17 DropOldest,
18 DropNewest,
19 Reject,
20}
21
22#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
23pub struct BackpressureConfig {
24 pub queue_size: usize,
25 pub strategy: BackpressureStrategy,
26}
27
28impl Default for BackpressureConfig {
29 fn default() -> Self {
30 Self {
31 queue_size: 1000,
32 strategy: BackpressureStrategy::Block,
33 }
34 }
35}
36
37pub struct BackpressureQueue<T: Send + 'static> {
39 inner: Arc<BackpressureQueueInner<T>>,
40}
41
42struct BackpressureQueueInner<T: Send + 'static> {
43 queue: Mutex<VecDeque<T>>,
44 config: BackpressureConfig,
45 neuron_name: String,
46 item_added: Notify,
48 item_removed: Notify,
50}
51
52impl<T: Send + 'static> BackpressureQueue<T> {
53 pub fn new<F, Fut>(neuron_name: String, config: BackpressureConfig, mut processor: F) -> Self
54 where
55 F: FnMut(T) -> Fut + Send + 'static,
56 Fut: Future<Output = ()> + Send + 'static,
57 {
58 let inner = Arc::new(BackpressureQueueInner {
59 queue: Mutex::new(VecDeque::with_capacity(config.queue_size)),
60 config,
61 neuron_name: neuron_name.clone(),
62 item_added: Notify::new(),
63 item_removed: Notify::new(),
64 });
65
66 let worker_inner = inner.clone();
67
68 tokio::spawn(async move {
69 loop {
70 let item = {
71 let mut queue = worker_inner.queue.lock();
72 if let Some(item) = queue.pop_front() {
73 worker_inner.item_removed.notify_waiters();
74 Some(item)
75 } else {
76 None
77 }
78 };
79
80 if let Some(item) = item {
81 processor(item).await;
82 } else {
83 worker_inner.item_added.notified().await;
85 }
86 }
87 });
88
89 Self { inner }
90 }
91
92 pub async fn push(&self, item: T) -> Result<(), SynapseError> {
93 loop {
94 let should_wait = {
96 let mut queue = self.inner.queue.lock();
97
98 if queue.len() < self.inner.config.queue_size {
99 queue.push_back(item);
100 self.inner.item_added.notify_one();
101 return Ok(());
102 }
103
104 match self.inner.config.strategy {
105 BackpressureStrategy::Block => true,
106 BackpressureStrategy::DropOldest => {
107 queue.pop_front();
108 queue.push_back(item);
109 self.inner.item_added.notify_one();
110 tracing::warn!(
111 neuron = %self.inner.neuron_name,
112 "Backpressure: Dropped oldest message (queue full)"
113 );
114 return Ok(());
115 }
116 BackpressureStrategy::DropNewest => {
117 tracing::warn!(
118 neuron = %self.inner.neuron_name,
119 "Backpressure: Dropped newest message (queue full)"
120 );
121 return Ok(()); }
123 BackpressureStrategy::Reject => {
124 tracing::warn!(
125 neuron = %self.inner.neuron_name,
126 "Backpressure: Rejected message (queue full)"
127 );
128 return Err(SynapseError::QueueFull {
129 neuron_name: self.inner.neuron_name.clone(),
130 });
131 }
132 }
133 };
134
135 if should_wait {
136 self.inner.item_removed.notified().await;
137 }
138 }
139 }
140}