1use crate::metrics::Metrics;
2use crate::ActionOp;
3use std::collections::VecDeque;
4use std::marker::PhantomData;
5use std::sync::{Arc, Condvar, Mutex};
6
7#[derive(Clone, Default)]
9pub enum BackpressurePolicy<T>
10where
11 T: Send + Sync + Clone + 'static,
12{
13 #[default]
15 BlockOnFull,
16 DropOldest,
18 DropLatest,
20 DropLatestIf {
22 predicate: Arc<dyn Fn(&ActionOp<T>) -> bool + Send + Sync>,
23 },
24 DropOldestIf {
26 predicate: Arc<dyn Fn(&ActionOp<T>) -> bool + Send + Sync>,
27 },
28}
29
30#[derive(thiserror::Error, Debug)]
31pub(crate) enum SenderError<T> {
32 #[error("Failed to send: {0}")]
33 SendError(T),
34 #[error("Channel is closed")]
35 ChannelClosed,
36}
37
38struct MpscQueue<T>
40where
41 T: Send + Sync + Clone + 'static,
42{
43 queue: Mutex<VecDeque<ActionOp<T>>>,
44 condvar: Condvar,
45 capacity: usize,
46 policy: BackpressurePolicy<T>,
47 metrics: Option<Arc<dyn Metrics + Send + Sync>>,
48 closed: Mutex<bool>,
49}
50
51impl<T> MpscQueue<T>
52where
53 T: Send + Sync + Clone + 'static,
54{
55 fn new(
56 capacity: usize,
57 policy: BackpressurePolicy<T>,
58 metrics: Option<Arc<dyn Metrics + Send + Sync>>,
59 ) -> Self {
60 Self {
61 queue: Mutex::new(VecDeque::new()),
62 condvar: Condvar::new(),
63 capacity,
64 policy,
65 metrics,
66 closed: Mutex::new(false),
67 }
68 }
69
70 fn send(&self, item: ActionOp<T>) -> Result<i64, SenderError<ActionOp<T>>> {
71 let mut queue = self.queue.lock().unwrap();
72
73 if *self.closed.lock().unwrap() {
75 return Err(SenderError::ChannelClosed);
76 }
77
78 if queue.len() >= self.capacity {
79 match &self.policy {
80 BackpressurePolicy::BlockOnFull => {
81 while queue.len() >= self.capacity {
83 queue = self.condvar.wait(queue).unwrap();
84 if *self.closed.lock().unwrap() {
85 return Err(SenderError::ChannelClosed);
86 }
87 }
88 queue.push_back(item);
89 }
90 BackpressurePolicy::DropOldest => {
91 if let Some(dropped_item) = queue.pop_front() {
93 if let Some(metrics) = &self.metrics {
94 if let ActionOp::Action(action) = &dropped_item {
95 metrics.action_dropped(Some(action as &dyn std::any::Any));
96 }
97 }
98 }
99 queue.push_back(item);
100 }
101 BackpressurePolicy::DropLatest => {
102 if let Some(metrics) = &self.metrics {
104 if let ActionOp::Action(action) = &item {
105 metrics.action_dropped(Some(action as &dyn std::any::Any));
106 }
107 }
108 return Ok(queue.len() as i64);
109 }
110 BackpressurePolicy::DropLatestIf { predicate } => {
111 let mut dropped_count = 0;
113 let mut i = 0;
114 while i < queue.len() {
115 if predicate(&queue[i]) {
116 if let Some(dropped_item) = queue.remove(i) {
117 dropped_count += 1;
118 if let Some(metrics) = &self.metrics {
119 if let ActionOp::Action(action) = &dropped_item {
120 metrics.action_dropped(Some(action as &dyn std::any::Any));
121 }
122 }
123 break;
124 }
125 }
126 i += 1;
127 }
128
129 if dropped_count > 0 {
130 queue.push_back(item);
131 } else {
132 return Err(SenderError::SendError(item));
133 }
134 }
135 BackpressurePolicy::DropOldestIf { predicate } => {
136 let mut dropped_count = 0;
138 let mut i = 0;
139 while i < queue.len() {
140 let index = queue.len() - i - 1;
141 if predicate(&queue[index]) {
142 if let Some(dropped_item) = queue.remove(index) {
143 dropped_count += 1;
144 if let Some(metrics) = &self.metrics {
145 if let ActionOp::Action(action) = &dropped_item {
146 metrics.action_dropped(Some(action as &dyn std::any::Any));
147 }
148 }
149 break;
150 }
151 }
152 i += 1;
153 }
154
155 if dropped_count > 0 {
156 queue.push_back(item);
157 } else {
158 return Err(SenderError::SendError(item));
159 }
160 }
161 }
162 } else {
163 queue.push_back(item);
164 }
165
166 if let Some(metrics) = &self.metrics {
168 metrics.queue_size(queue.len());
169 }
170
171 self.condvar.notify_one();
172 Ok(queue.len() as i64)
173 }
174
175 fn try_send(&self, item: ActionOp<T>) -> Result<i64, SenderError<ActionOp<T>>> {
176 if *self.closed.lock().unwrap() {
178 return Err(SenderError::ChannelClosed);
179 }
180
181 let mut queue = self.queue.lock().unwrap();
182
183 if queue.len() >= self.capacity {
184 match &self.policy {
185 BackpressurePolicy::BlockOnFull => {
186 return Err(SenderError::SendError(item));
187 }
188 BackpressurePolicy::DropOldest => {
189 if let Some(dropped_item) = queue.pop_front() {
191 if let Some(metrics) = &self.metrics {
192 if let ActionOp::Action(action) = &dropped_item {
193 metrics.action_dropped(Some(action as &dyn std::any::Any));
194 }
195 }
196 }
197 queue.push_back(item);
198 }
199 BackpressurePolicy::DropLatest => {
200 if let Some(metrics) = &self.metrics {
202 if let ActionOp::Action(action) = &item {
203 metrics.action_dropped(Some(action as &dyn std::any::Any));
204 }
205 }
206 return Ok(queue.len() as i64);
207 }
208 BackpressurePolicy::DropLatestIf { predicate } => {
209 let mut dropped_count = 0;
211 let mut i = 0;
212 while i < queue.len() {
213 if predicate(&queue[i]) {
214 if let Some(dropped_item) = queue.remove(i) {
215 dropped_count += 1;
216 if let Some(metrics) = &self.metrics {
217 if let ActionOp::Action(action) = &dropped_item {
218 metrics.action_dropped(Some(action as &dyn std::any::Any));
219 }
220 }
221 break;
222 }
223 }
224 i += 1;
225 }
226
227 if dropped_count > 0 {
228 queue.push_back(item);
229 } else {
230 return Err(SenderError::SendError(item));
231 }
232 }
233 BackpressurePolicy::DropOldestIf { predicate } => {
234 let mut dropped_count = 0;
236 let mut i = 0;
237 while i < queue.len() {
238 let index = queue.len() - i - 1;
239 if predicate(&queue[index]) {
240 if let Some(dropped_item) = queue.remove(index) {
241 dropped_count += 1;
242 if let Some(metrics) = &self.metrics {
243 if let ActionOp::Action(action) = &dropped_item {
244 metrics.action_dropped(Some(action as &dyn std::any::Any));
245 }
246 }
247 break;
248 }
249 }
250 i += 1;
251 }
252
253 if dropped_count > 0 {
254 queue.push_back(item);
255 } else {
256 return Err(SenderError::SendError(item));
257 }
258 }
259 }
260 } else {
261 queue.push_back(item);
262 }
263
264 if let Some(metrics) = &self.metrics {
266 metrics.queue_size(queue.len());
267 }
268
269 self.condvar.notify_one();
270 Ok(queue.len() as i64)
271 }
272
273 fn recv(&self) -> Option<ActionOp<T>> {
274 let mut queue = self.queue.lock().unwrap();
275
276 while queue.is_empty() {
278 if *self.closed.lock().unwrap() {
279 return None;
280 }
281 queue = self.condvar.wait(queue).unwrap();
282 }
283
284 let item = queue.pop_front();
285 self.condvar.notify_one();
286 item
287 }
288
289 fn try_recv(&self) -> Option<ActionOp<T>> {
290 let mut queue = self.queue.lock().unwrap();
291 let item = queue.pop_front();
292 if item.is_some() {
293 self.condvar.notify_one();
294 }
295 item
296 }
297
298 fn len(&self) -> usize {
299 self.queue.lock().unwrap().len()
300 }
301
302 fn close(&self) {
303 *self.closed.lock().unwrap() = true;
304 self.condvar.notify_all();
305 }
306}
307
308#[derive(Clone)]
310pub(crate) struct SenderChannel<T>
311where
312 T: Send + Sync + Clone + 'static,
313{
314 _name: String,
315 queue: Arc<MpscQueue<T>>,
316}
317
318impl<Action> Drop for SenderChannel<Action>
319where
320 Action: Send + Sync + Clone + 'static,
321{
322 fn drop(&mut self) {
323 #[cfg(feature = "store-log")]
324 eprintln!("store: drop '{}' sender channel", self._name);
325 }
326}
327
328#[allow(dead_code)]
329impl<T> SenderChannel<T>
330where
331 T: Send + Sync + Clone + 'static,
332{
333 pub fn send(&self, item: ActionOp<T>) -> Result<i64, SenderError<ActionOp<T>>> {
334 self.queue.send(item)
335 }
336
337 pub fn try_send(&self, item: ActionOp<T>) -> Result<i64, SenderError<ActionOp<T>>> {
338 self.queue.try_send(item)
339 }
340}
341
342#[allow(dead_code)]
343pub(crate) struct ReceiverChannel<T>
344where
345 T: Send + Sync + Clone + 'static,
346{
347 name: String,
348 queue: Arc<MpscQueue<T>>,
349 metrics: Option<Arc<dyn Metrics + Send + Sync>>,
350}
351
352impl<Action> Drop for ReceiverChannel<Action>
353where
354 Action: Send + Sync + Clone + 'static,
355{
356 fn drop(&mut self) {
357 #[cfg(feature = "store-log")]
358 eprintln!("store: drop '{}' receiver channel", self.name);
359 self.close();
360 }
361}
362
363#[allow(dead_code)]
364impl<T> ReceiverChannel<T>
365where
366 T: Send + Sync + Clone + 'static,
367{
368 pub fn recv(&self) -> Option<ActionOp<T>> {
369 self.queue.recv()
370 }
371
372 #[allow(dead_code)]
373 pub fn try_recv(&self) -> Option<ActionOp<T>> {
374 self.queue.try_recv()
375 }
376
377 pub fn len(&self) -> usize {
378 self.queue.len()
379 }
380
381 pub fn close(&self) {
382 self.queue.close();
383 }
384}
385
386pub(crate) struct BackpressureChannel<MSG>
388where
389 MSG: Send + Sync + Clone + 'static,
390{
391 phantom_data: PhantomData<MSG>,
392}
393
394impl<MSG> BackpressureChannel<MSG>
395where
396 MSG: Send + Sync + Clone + 'static,
397{
398 #[allow(dead_code)]
399 pub fn pair(
400 capacity: usize,
401 policy: BackpressurePolicy<MSG>,
402 ) -> (SenderChannel<MSG>, ReceiverChannel<MSG>) {
403 Self::pair_with("<anon>", capacity, policy, None)
404 }
405
406 #[allow(dead_code)]
407 pub fn pair_with_metrics(
408 capacity: usize,
409 policy: BackpressurePolicy<MSG>,
410 metrics: Option<Arc<dyn Metrics + Send + Sync>>,
411 ) -> (SenderChannel<MSG>, ReceiverChannel<MSG>) {
412 Self::pair_with("<anon>", capacity, policy, metrics)
413 }
414
415 #[allow(dead_code)]
416 pub fn pair_with(
417 name: &str,
418 capacity: usize,
419 policy: BackpressurePolicy<MSG>,
420 metrics: Option<Arc<dyn Metrics + Send + Sync>>,
421 ) -> (SenderChannel<MSG>, ReceiverChannel<MSG>) {
422 let queue = Arc::new(MpscQueue::new(capacity, policy, metrics.clone()));
423
424 (
425 SenderChannel {
426 _name: name.to_string(),
427 queue: queue.clone(),
428 },
429 ReceiverChannel {
430 name: name.to_string(),
431 queue,
432 metrics,
433 },
434 )
435 }
436}
437
438#[cfg(test)]
439mod tests {
440 use super::*;
441
442 #[test]
443 fn test_basic_send_recv() {
444 let (sender, receiver) =
445 BackpressureChannel::<i32>::pair(5, BackpressurePolicy::BlockOnFull);
446
447 sender.send(ActionOp::Action(1)).unwrap();
448 sender.send(ActionOp::Action(2)).unwrap();
449
450 assert_eq!(receiver.recv(), Some(ActionOp::Action(1)));
451 assert_eq!(receiver.recv(), Some(ActionOp::Action(2)));
452 assert_eq!(receiver.try_recv(), None);
453 }
454
455 #[test]
456 fn test_drop_oldest() {
457 let (sender, receiver) =
458 BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropOldest);
459
460 sender.send(ActionOp::Action(1)).unwrap();
461 sender.send(ActionOp::Action(2)).unwrap();
462 sender.send(ActionOp::Action(3)).unwrap(); assert_eq!(receiver.recv(), Some(ActionOp::Action(2)));
465 assert_eq!(receiver.recv(), Some(ActionOp::Action(3)));
466 assert_eq!(receiver.try_recv(), None);
467 }
468
469 #[test]
470 fn test_drop_latest() {
471 let (sender, receiver) =
472 BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropLatest);
473
474 sender.send(ActionOp::Action(1)).unwrap();
475 sender.send(ActionOp::Action(2)).unwrap();
476 sender.send(ActionOp::Action(3)).unwrap(); assert_eq!(receiver.recv(), Some(ActionOp::Action(1)));
479 assert_eq!(receiver.recv(), Some(ActionOp::Action(2)));
480 assert_eq!(receiver.try_recv(), None);
481 }
482
483 #[test]
484 fn test_predicate_dropping() {
485 let predicate = Arc::new(|action_op: &ActionOp<i32>| match action_op {
487 ActionOp::Action(value) => *value < 5,
488 ActionOp::Exit(_) => false,
489 ActionOp::AddSubscriber => false,
490 });
491
492 let (sender, receiver) =
493 BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropLatestIf { predicate });
494
495 sender.send(ActionOp::Action(1)).unwrap(); sender.send(ActionOp::Action(6)).unwrap(); let result = sender.send(ActionOp::Action(7)); assert!(
502 result.is_ok(),
503 "Should succeed because predicate should drop the first item"
504 );
505
506 let received_item = receiver.recv();
508 assert!(received_item.is_some());
509 if let Some(ActionOp::Action(value)) = received_item {
510 assert_eq!(value, 6, "Should receive 6, not 1");
512 }
513
514 let received_item = receiver.recv();
515 assert!(received_item.is_some());
516 if let Some(ActionOp::Action(value)) = received_item {
517 assert_eq!(value, 7, "Should receive 7");
518 }
519 }
520
521 #[test]
522 fn test_add_subscriber_action() {
523 let (sender, receiver) =
524 BackpressureChannel::<i32>::pair(5, BackpressurePolicy::BlockOnFull);
525
526 sender.send(ActionOp::AddSubscriber).unwrap();
528
529 let received = receiver.recv();
531 assert!(received.is_some());
532 match received.unwrap() {
533 ActionOp::AddSubscriber => {
534 }
536 _ => panic!("Expected AddSubscriber action"),
537 }
538 }
539
540 #[test]
541 fn test_add_subscriber_with_predicate() {
542 let predicate = Arc::new(|action_op: &ActionOp<i32>| match action_op {
544 ActionOp::Action(value) => *value < 5,
545 ActionOp::Exit(_) => false,
546 ActionOp::AddSubscriber => false, });
548
549 let (sender, receiver) =
550 BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropLatestIf { predicate });
551
552 sender.send(ActionOp::Action(1)).unwrap(); sender.send(ActionOp::Action(6)).unwrap(); let result = sender.send(ActionOp::AddSubscriber);
558 assert!(result.is_ok(), "AddSubscriber should be sent successfully");
559
560 let received_items: Vec<_> = std::iter::from_fn(|| receiver.try_recv()).collect();
562 assert_eq!(received_items.len(), 2);
563
564 let has_add_subscriber =
566 received_items.iter().any(|item| matches!(item, ActionOp::AddSubscriber));
567 assert!(has_add_subscriber, "AddSubscriber should be received");
568 }
569
570 #[test]
571 fn test_mixed_action_types() {
572 let (sender, receiver) =
573 BackpressureChannel::<i32>::pair(10, BackpressurePolicy::BlockOnFull);
574
575 sender.send(ActionOp::Action(1)).unwrap();
577 sender.send(ActionOp::AddSubscriber).unwrap();
578 sender.send(ActionOp::Action(2)).unwrap();
579 sender.send(ActionOp::AddSubscriber).unwrap();
580 sender.send(ActionOp::Action(3)).unwrap();
581
582 let received_items: Vec<_> = std::iter::from_fn(|| receiver.try_recv()).collect();
584 assert_eq!(received_items.len(), 5);
585
586 match &received_items[0] {
588 ActionOp::Action(value) => assert_eq!(*value, 1),
589 _ => panic!("Expected Action(1)"),
590 }
591 match &received_items[1] {
592 ActionOp::AddSubscriber => {
593 }
595 _ => panic!("Expected AddSubscriber"),
596 }
597 match &received_items[2] {
598 ActionOp::Action(value) => assert_eq!(*value, 2),
599 _ => panic!("Expected Action(2)"),
600 }
601 match &received_items[3] {
602 ActionOp::AddSubscriber => {
603 }
605 _ => panic!("Expected AddSubscriber"),
606 }
607 match &received_items[4] {
608 ActionOp::Action(value) => assert_eq!(*value, 3),
609 _ => panic!("Expected Action(3)"),
610 }
611 }
612
613 #[test]
614 fn test_block_on_full() {
615 let (sender, receiver) =
616 BackpressureChannel::<i32>::pair(1, BackpressurePolicy::BlockOnFull);
617
618 sender.send(ActionOp::Action(1)).unwrap();
619
620 let result = sender.try_send(ActionOp::Action(2));
622 assert!(result.is_err(), "Should fail because channel is full");
623
624 assert_eq!(receiver.recv(), Some(ActionOp::Action(1)));
626
627 sender.send(ActionOp::Action(2)).unwrap();
629 assert_eq!(receiver.recv(), Some(ActionOp::Action(2)));
630 }
631
632 #[test]
633 fn test_drop_oldest_if_predicate_always_false() {
634 let (sender, receiver) = BackpressureChannel::pair(
635 3,
636 BackpressurePolicy::DropOldestIf {
637 predicate: Arc::new(|_| false), },
639 );
640
641 assert!(sender.try_send(ActionOp::Action(1)).is_ok());
643 assert!(sender.try_send(ActionOp::Action(2)).is_ok());
644 assert!(sender.try_send(ActionOp::Action(3)).is_ok());
645 assert_eq!(receiver.len(), 3);
646
647 let result = sender.try_send(ActionOp::Action(4));
650 assert!(
651 result.is_err(),
652 "Should fail because no items match the predicate"
653 );
654
655 assert_eq!(receiver.len(), 3);
657 assert_eq!(receiver.recv(), Some(ActionOp::Action(1)));
658 assert_eq!(receiver.recv(), Some(ActionOp::Action(2)));
659 assert_eq!(receiver.recv(), Some(ActionOp::Action(3)));
660 }
661
662 #[test]
663 fn test_drop_oldest_if_predicate_sometimes_true() {
664 let (sender, receiver) = BackpressureChannel::pair(
665 3,
666 BackpressurePolicy::DropOldestIf {
667 predicate: Arc::new(|action_op: &ActionOp<i32>| {
668 if let ActionOp::Action(value) = action_op {
669 *value < 5 } else {
671 false
672 }
673 }),
674 },
675 );
676
677 assert!(sender.try_send(ActionOp::Action(6)).is_ok()); assert!(sender.try_send(ActionOp::Action(2)).is_ok()); assert!(sender.try_send(ActionOp::Action(8)).is_ok()); assert_eq!(receiver.len(), 3);
682
683 let result = sender.try_send(ActionOp::Action(9));
685 assert!(
686 result.is_ok(),
687 "Should fail because no items match the predicate"
688 );
689
690 let result = sender.try_send(ActionOp::Action(10)); assert!(
693 result.is_err(),
694 "Should fail because no items match the predicate"
695 );
696
697 assert_eq!(receiver.len(), 3);
699 assert_eq!(receiver.recv(), Some(ActionOp::Action(6)));
700 assert_eq!(receiver.recv(), Some(ActionOp::Action(8)));
701 assert_eq!(receiver.recv(), Some(ActionOp::Action(9)));
702 }
703}