1use crate::metrics::Metrics;
2#[cfg(feature = "store-log")]
3use crate::store_impl::describe_action_op;
4use crate::ActionOp;
5use std::collections::VecDeque;
6use std::fmt;
7use std::marker::PhantomData;
8use std::sync::{Arc, Condvar, Mutex};
9
/// Strategy the channel applies when a sender finds the queue at capacity.
///
/// The drop variants only ever remove `ActionOp::Action` entries that are
/// already queued; every other kind of queued operation is left in place.
/// When nothing qualifies for dropping, a blocking send waits and a
/// non-blocking send fails.
#[derive(Default)]
pub enum BackpressurePolicy<T>
where
    T: Send + Sync + Clone + 'static,
{
    /// Never drop anything: a blocking send waits for room, a non-blocking
    /// send is rejected.
    #[default]
    BlockOnFull,
    /// Make room by removing the most recently queued action.
    ///
    /// With `Some(predicate)` only actions for which the predicate returns
    /// `true` qualify; with `None` any queued action qualifies.
    #[allow(clippy::type_complexity)]
    DropLatestIf(Option<Box<dyn Fn(&T) -> bool + Send + Sync>>),
    /// Make room by removing the oldest queued action.
    ///
    /// With `Some(predicate)` only actions for which the predicate returns
    /// `true` qualify; with `None` any queued action qualifies.
    #[allow(clippy::type_complexity)]
    DropOldestIf(Option<Box<dyn Fn(&T) -> bool + Send + Sync>>),
}
30
/// Errors reported by the sending half of the channel.
#[derive(thiserror::Error)]
pub(crate) enum SenderError<T> {
    /// The item could not be enqueued; it is handed back to the caller.
    #[error("Failed to send item")]
    SendError(T),
    /// The channel has been closed, so nothing more can be enqueued.
    #[error("Channel is closed")]
    ChannelClosed,
}
38
39impl<T> fmt::Debug for SenderError<T> {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 match self {
42 SenderError::SendError(_) => f.write_str("SendError(..)"),
43 SenderError::ChannelClosed => f.write_str("ChannelClosed"),
44 }
45 }
46}
47
/// Shared state behind a sender/receiver pair: a bounded FIFO of operations
/// plus the synchronisation primitives used to block and wake its users.
struct MpscQueue<T>
where
    T: Send + Sync + Clone + 'static,
{
    /// Pending operations, oldest at the front.
    queue: Mutex<VecDeque<ActionOp<T>>>,
    /// Single condition variable, paired with the `queue` mutex, on which
    /// both blocked senders and the blocked receiver wait.
    condvar: Condvar,
    /// Number of queued operations at which the backpressure policy kicks in.
    capacity: usize,
    /// What to do when a send finds the queue at capacity.
    policy: BackpressurePolicy<T>,
    /// Optional sink that is told about queue sizes and dropped actions.
    metrics: Option<Arc<dyn Metrics + Send + Sync>>,
    /// Set to `true` once the channel is closed; guarded by its own mutex,
    /// separate from `queue`.
    closed: Mutex<bool>,
}
60
61impl<T> MpscQueue<T>
62where
63 T: Send + Sync + Clone + 'static,
64{
65 fn new(
66 capacity: usize,
67 policy: BackpressurePolicy<T>,
68 metrics: Option<Arc<dyn Metrics + Send + Sync>>,
69 ) -> Self {
70 Self {
71 queue: Mutex::new(VecDeque::new()),
72 condvar: Condvar::new(),
73 capacity,
74 policy,
75 metrics,
76 closed: Mutex::new(false),
77 }
78 }
79
80 fn send(&self, item: ActionOp<T>) -> Result<i64, SenderError<ActionOp<T>>> {
88 if *self.closed.lock().unwrap() {
90 return Err(SenderError::ChannelClosed);
91 }
92
93 let mut queue: std::sync::MutexGuard<'_, VecDeque<ActionOp<T>>> =
94 self.queue.lock().unwrap();
95
96 loop {
98 if queue.len() < self.capacity {
99 queue.push_back(item);
101 break;
102 }
103
104 #[allow(deprecated)]
106 match &self.policy {
107 BackpressurePolicy::BlockOnFull => {
108 while queue.len() >= self.capacity {
110 queue = self.condvar.wait(queue).unwrap();
111 if *self.closed.lock().unwrap() {
112 return Err(SenderError::ChannelClosed);
113 }
114 }
115 }
117 BackpressurePolicy::DropOldestIf(None) => {
118 let mut found_action_to_drop = false;
120 let mut i = 0;
121 while i < queue.len() {
122 if matches!(queue[i], ActionOp::Action(_)) {
123 if let Some(dropped_item) = queue.remove(i) {
124 found_action_to_drop = true;
125 if let Some(metrics) = &self.metrics {
126 if let ActionOp::Action(action) = &dropped_item {
127 metrics.action_dropped(Some(action as &dyn std::any::Any));
128 }
129 }
130 break;
131 }
132 }
133 i += 1;
134 }
135
136 if !found_action_to_drop {
138 queue = self.condvar.wait(queue).unwrap();
139 if *self.closed.lock().unwrap() {
140 return Err(SenderError::ChannelClosed);
141 }
142 continue; }
144 }
146 BackpressurePolicy::DropLatestIf(None) => {
147 let mut found_action_to_drop = false;
149 let mut i = 0;
150 while i < queue.len() {
151 let idx = queue.len() - i - 1;
152 if matches!(queue[idx], ActionOp::Action(_)) {
153 if let Some(dropped_item) = queue.remove(idx) {
154 found_action_to_drop = true;
155 if let Some(metrics) = &self.metrics {
156 if let ActionOp::Action(action) = &dropped_item {
157 metrics.action_dropped(Some(action as &dyn std::any::Any));
158 }
159 }
160 break;
161 }
162 }
163 i += 1;
164 }
165
166 if !found_action_to_drop {
168 queue = self.condvar.wait(queue).unwrap();
169 if *self.closed.lock().unwrap() {
170 return Err(SenderError::ChannelClosed);
171 }
172 continue; }
174 }
176 BackpressurePolicy::DropOldestIf(Some(predicate)) => {
177 let mut dropped_count = 0;
179 let mut i = 0;
180 while i < queue.len() {
181 #[cfg(feature = "store-log")]
182 eprintln!(
183 "store: check droppable {}/{}: {}",
184 i,
185 queue.len(),
186 describe_action_op(&queue[i])
187 );
188 let should_drop = if let ActionOp::Action(action) = &queue[i] {
190 predicate(action)
191 } else {
192 false };
194 if should_drop {
195 if let Some(dropped_item) = queue.remove(i) {
196 dropped_count += 1;
197 if let Some(metrics) = &self.metrics {
198 if let ActionOp::Action(action) = &dropped_item {
199 metrics.action_dropped(Some(action as &dyn std::any::Any));
200 }
201 }
202 break;
203 }
204 }
205 i += 1;
206 }
207
208 if dropped_count == 0 {
209 #[cfg(feature = "store-log")]
211 eprintln!(
212 "store: no droppable items found, blocking until space available: queue len={}",
213 queue.len()
214 );
215 queue = self.condvar.wait(queue).unwrap();
216 if *self.closed.lock().unwrap() {
217 return Err(SenderError::ChannelClosed);
218 }
219 }
220 }
222 BackpressurePolicy::DropLatestIf(Some(predicate)) => {
223 let mut dropped_count = 0;
225 let mut i = 0;
226 while i < queue.len() {
227 let index = queue.len() - i - 1;
228 #[cfg(feature = "store-log")]
229 eprintln!(
230 "store: check droppable {}/{}: {}",
231 index,
232 queue.len(),
233 describe_action_op(&queue[index])
234 );
235 let should_drop = if let ActionOp::Action(action) = &queue[index] {
237 predicate(action)
238 } else {
239 false };
241 if should_drop {
242 if let Some(dropped_item) = queue.remove(index) {
243 dropped_count += 1;
244 if let Some(metrics) = &self.metrics {
245 if let ActionOp::Action(action) = &dropped_item {
246 metrics.action_dropped(Some(action as &dyn std::any::Any));
247 }
248 }
249 break;
250 }
251 }
252 i += 1;
253 }
254
255 if dropped_count == 0 {
256 #[cfg(feature = "store-log")]
258 eprintln!(
259 "store: no droppable items found, blocking until space available: queue len={}",
260 queue.len()
261 );
262 queue = self.condvar.wait(queue).unwrap();
263 if *self.closed.lock().unwrap() {
264 return Err(SenderError::ChannelClosed);
265 }
266 }
267 }
269 }
270 }
271
272 if let Some(metrics) = &self.metrics {
274 metrics.queue_size(queue.len());
275 }
276
277 self.condvar.notify_one();
278 Ok(queue.len() as i64)
279 }
280
281 fn try_send(&self, item: ActionOp<T>) -> Result<i64, SenderError<ActionOp<T>>> {
282 if *self.closed.lock().unwrap() {
284 return Err(SenderError::ChannelClosed);
285 }
286
287 let mut queue = self.queue.lock().unwrap();
288
289 if queue.len() < self.capacity {
290 queue.push_back(item);
291 } else {
292 #[allow(deprecated)]
293 match &self.policy {
294 BackpressurePolicy::BlockOnFull => {
295 return Err(SenderError::SendError(item));
296 }
297 BackpressurePolicy::DropOldestIf(None) => {
298 let mut found_action_to_drop = false;
300 let mut i = 0;
301 while i < queue.len() {
302 if matches!(queue[i], ActionOp::Action(_)) {
303 if let Some(dropped_item) = queue.remove(i) {
304 found_action_to_drop = true;
305 if let Some(metrics) = &self.metrics {
306 if let ActionOp::Action(action) = &dropped_item {
307 metrics.action_dropped(Some(action as &dyn std::any::Any));
308 }
309 }
310 break;
311 }
312 }
313 i += 1;
314 }
315
316 if found_action_to_drop {
317 queue.push_back(item);
318 } else {
319 #[cfg(feature = "store-log")]
321 eprintln!(
322 "store: failed to drop oldest action while trying to send: queue len={}",
323 queue.len()
324 );
325 return Err(SenderError::SendError(item));
326 }
327 }
328 BackpressurePolicy::DropLatestIf(None) => {
329 let mut found_action_to_drop = false;
330 let mut i = 0;
331 while i < queue.len() {
332 let index = queue.len() - i - 1;
333 if matches!(queue[index], ActionOp::Action(_)) {
334 if let Some(dropped_item) = queue.remove(index) {
335 found_action_to_drop = true;
336 if let Some(metrics) = &self.metrics {
337 if let ActionOp::Action(action) = &dropped_item {
338 metrics.action_dropped(Some(action as &dyn std::any::Any));
339 }
340 }
341 break;
342 }
343 }
344 i += 1;
345 }
346
347 if found_action_to_drop {
348 queue.push_back(item);
349 } else {
350 #[cfg(feature = "store-log")]
352 eprintln!(
353 "store: failed to drop latest action while trying to send: queue len={}",
354 queue.len()
355 );
356 return Err(SenderError::SendError(item));
357 }
358 }
359 BackpressurePolicy::DropOldestIf(Some(predicate)) => {
360 let mut dropped_count = 0;
362 let mut i = 0;
363 while i < queue.len() {
364 #[cfg(feature = "store-log")]
365 eprintln!(
366 "store: check droppable {}/{}: {}",
367 i,
368 queue.len(),
369 describe_action_op(&queue[i])
370 );
371 let should_drop = if let ActionOp::Action(action) = &queue[i] {
373 predicate(action)
374 } else {
375 false };
377 if should_drop {
378 if let Some(dropped_item) = queue.remove(i) {
379 dropped_count += 1;
380 if let Some(metrics) = &self.metrics {
381 if let ActionOp::Action(action) = &dropped_item {
382 metrics.action_dropped(Some(action as &dyn std::any::Any));
383 }
384 }
385 break;
386 }
387 }
388 i += 1;
389 }
390
391 if dropped_count > 0 {
392 queue.push_back(item);
393 } else {
394 #[cfg(feature = "store-log")]
395 eprintln!(
396 "store: failed to drop the oldestif while trying to send: queue len={}",
397 queue.len()
398 );
399 return Err(SenderError::SendError(item));
400 }
401 }
402 BackpressurePolicy::DropLatestIf(Some(predicate)) => {
403 let mut dropped_count = 0;
405 let mut i = 0;
406 while i < queue.len() {
407 let index = queue.len() - i - 1;
408 #[cfg(feature = "store-log")]
409 eprintln!(
410 "store: check droppable {}/{}: {}",
411 index,
412 queue.len(),
413 describe_action_op(&queue[index])
414 );
415 let should_drop = if let ActionOp::Action(action) = &queue[index] {
417 predicate(action)
418 } else {
419 false };
421 if should_drop {
422 if let Some(dropped_item) = queue.remove(index) {
423 dropped_count += 1;
424 if let Some(metrics) = &self.metrics {
425 if let ActionOp::Action(action) = &dropped_item {
426 metrics.action_dropped(Some(action as &dyn std::any::Any));
427 }
428 }
429 break;
430 }
431 }
432 i += 1;
433 }
434
435 if dropped_count > 0 {
436 queue.push_back(item);
437 } else {
438 #[cfg(feature = "store-log")]
439 eprintln!(
440 "store: failed to drop the latestif while trying to send: queue len={}",
441 queue.len()
442 );
443 return Err(SenderError::SendError(item));
444 }
445 }
446 }
447 }
448
449 if let Some(metrics) = &self.metrics {
451 metrics.queue_size(queue.len());
452 }
453
454 self.condvar.notify_one();
455 Ok(queue.len() as i64)
456 }
457
458 fn recv(&self) -> Option<ActionOp<T>> {
459 let mut queue = self.queue.lock().unwrap();
460
461 while queue.is_empty() {
463 if *self.closed.lock().unwrap() {
464 return None;
465 }
466 queue = self.condvar.wait(queue).unwrap();
467 }
468
469 let item = queue.pop_front();
470 self.condvar.notify_one();
471 item
472 }
473
474 fn try_recv(&self) -> Option<ActionOp<T>> {
475 let mut queue = self.queue.lock().unwrap();
476 let item = queue.pop_front();
477 if item.is_some() {
478 self.condvar.notify_one();
479 }
480 item
481 }
482
483 fn len(&self) -> usize {
484 self.queue.lock().unwrap().len()
485 }
486
487 fn close(&self) {
488 *self.closed.lock().unwrap() = true;
489 self.condvar.notify_all();
490 }
491}
492
/// Sending half of the channel.
///
/// Cloning yields another handle onto the same underlying queue.
#[derive(Clone)]
pub(crate) struct SenderChannel<T>
where
    T: Send + Sync + Clone + 'static,
{
    /// Label used only in `store-log` diagnostics.
    _name: String,
    /// Queue shared with the receiver and with every cloned sender.
    queue: Arc<MpscQueue<T>>,
}
502
/// Dropping a sender only emits a diagnostic line (with the `store-log`
/// feature); it does not close the queue.
impl<Action> Drop for SenderChannel<Action>
where
    Action: Send + Sync + Clone + 'static,
{
    fn drop(&mut self) {
        #[cfg(feature = "store-log")]
        eprintln!("store: drop '{}' sender channel", self._name);
    }
}
512
#[allow(dead_code)]
impl<T> SenderChannel<T>
where
    T: Send + Sync + Clone + 'static,
{
    /// Enqueues `item` on the shared queue, delegating to the queue's
    /// blocking send. On success the queue length after the push is returned.
    pub fn send(&self, item: ActionOp<T>) -> Result<i64, SenderError<ActionOp<T>>> {
        self.queue.send(item)
    }

    /// Enqueues `item` on the shared queue, delegating to the queue's
    /// non-blocking send. On success the queue length after the push is
    /// returned.
    pub fn try_send(&self, item: ActionOp<T>) -> Result<i64, SenderError<ActionOp<T>>> {
        self.queue.try_send(item)
    }
}
529
/// Receiving half of the channel.
#[allow(dead_code)]
pub(crate) struct ReceiverChannel<T>
where
    T: Send + Sync + Clone + 'static,
{
    /// Label used in `store-log` diagnostics.
    name: String,
    /// Queue shared with the senders.
    queue: Arc<MpscQueue<T>>,
    /// Handle to the same metrics sink that was given to the queue.
    metrics: Option<Arc<dyn Metrics + Send + Sync>>,
}
539
/// Dropping the receiver closes the shared queue, so senders stop enqueueing
/// into a channel nobody reads from.
impl<Action> Drop for ReceiverChannel<Action>
where
    Action: Send + Sync + Clone + 'static,
{
    fn drop(&mut self) {
        #[cfg(feature = "store-log")]
        eprintln!("store: drop '{}' receiver channel", self.name);
        self.close();
    }
}
550
#[allow(dead_code)]
impl<T> ReceiverChannel<T>
where
    T: Send + Sync + Clone + 'static,
{
    /// Takes the oldest pending operation, delegating to the queue's blocking
    /// receive. `None` means the queue is closed and empty.
    pub fn recv(&self) -> Option<ActionOp<T>> {
        self.queue.recv()
    }

    /// Takes the oldest pending operation if there is one, without blocking.
    #[allow(dead_code)]
    pub fn try_recv(&self) -> Option<ActionOp<T>> {
        self.queue.try_recv()
    }

    /// Number of operations currently pending in the queue.
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    /// Closes the shared queue; also invoked automatically when the receiver
    /// is dropped.
    pub fn close(&self) {
        self.queue.close();
    }
}
573
/// Namespace type for the constructors that build a connected
/// sender/receiver pair; it carries no data of its own.
pub(crate) struct BackpressureChannel<MSG>
where
    MSG: Send + Sync + Clone + 'static,
{
    /// Ties the otherwise unused `MSG` parameter to the type.
    phantom_data: PhantomData<MSG>,
}
581
582impl<MSG> BackpressureChannel<MSG>
583where
584 MSG: Send + Sync + Clone + 'static,
585{
586 #[allow(dead_code)]
587 pub fn pair(
588 capacity: usize,
589 policy: BackpressurePolicy<MSG>,
590 ) -> (SenderChannel<MSG>, ReceiverChannel<MSG>) {
591 Self::pair_with("<anon>", capacity, policy, None)
592 }
593
594 #[allow(dead_code)]
595 pub fn pair_with_metrics(
596 capacity: usize,
597 policy: BackpressurePolicy<MSG>,
598 metrics: Option<Arc<dyn Metrics + Send + Sync>>,
599 ) -> (SenderChannel<MSG>, ReceiverChannel<MSG>) {
600 Self::pair_with("<anon>", capacity, policy, metrics)
601 }
602
603 #[allow(dead_code)]
604 pub fn pair_with(
605 name: &str,
606 capacity: usize,
607 policy: BackpressurePolicy<MSG>,
608 metrics: Option<Arc<dyn Metrics + Send + Sync>>,
609 ) -> (SenderChannel<MSG>, ReceiverChannel<MSG>) {
610 let queue = Arc::new(MpscQueue::new(capacity, policy, metrics.clone()));
611
612 (
613 SenderChannel {
614 _name: name.to_string(),
615 queue: queue.clone(),
616 },
617 ReceiverChannel {
618 name: name.to_string(),
619 queue,
620 metrics,
621 },
622 )
623 }
624}
625
#[cfg(test)]
mod tests {
    use super::*;

    /// Empties the channel without blocking and returns everything that was
    /// pending, oldest first.
    fn drain<T>(receiver: &ReceiverChannel<T>) -> Vec<ActionOp<T>>
    where
        T: Send + Sync + Clone + 'static,
    {
        std::iter::from_fn(|| receiver.try_recv()).collect()
    }

    /// Extracts the payloads of the `Action` entries, preserving order.
    fn action_values<T>(items: &[ActionOp<T>]) -> Vec<T>
    where
        T: Send + Sync + Clone + 'static,
    {
        items
            .iter()
            .filter_map(|item| match item {
                ActionOp::Action(value) => Some(value.clone()),
                _ => None,
            })
            .collect()
    }

    /// True when any drained item is an `AddSubscriber`.
    fn contains_add_subscriber<T>(items: &[ActionOp<T>]) -> bool
    where
        T: Send + Sync + Clone + 'static,
    {
        items.iter().any(|item| matches!(item, ActionOp::AddSubscriber))
    }

    #[test]
    fn test_basic_send_recv() {
        let (sender, receiver) =
            BackpressureChannel::<i32>::pair(5, BackpressurePolicy::BlockOnFull);

        sender.send(ActionOp::Action(1)).unwrap();
        sender.send(ActionOp::Action(2)).unwrap();

        assert_eq!(receiver.recv(), Some(ActionOp::Action(1)));
        assert_eq!(receiver.recv(), Some(ActionOp::Action(2)));
        assert_eq!(receiver.try_recv(), None);
    }

    #[test]
    fn test_drop_oldest() {
        let (sender, receiver) =
            BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropOldestIf(None));

        sender.send(ActionOp::Action(1)).unwrap();
        sender.send(ActionOp::Action(2)).unwrap();
        // Queue is full: the oldest action (1) is dropped to admit 3.
        sender.send(ActionOp::Action(3)).unwrap();

        assert_eq!(receiver.recv(), Some(ActionOp::Action(2)));
        assert_eq!(receiver.recv(), Some(ActionOp::Action(3)));
        assert_eq!(receiver.try_recv(), None);
    }

    #[test]
    fn test_drop_latest() {
        let (sender, receiver) =
            BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropLatestIf(None));

        sender.send(ActionOp::Action(1)).unwrap();
        sender.send(ActionOp::Action(2)).unwrap();
        // Queue is full: the most recently queued action (2) is dropped to admit 3.
        sender.send(ActionOp::Action(3)).unwrap();

        assert_eq!(receiver.recv(), Some(ActionOp::Action(1)));
        assert_eq!(receiver.recv(), Some(ActionOp::Action(3)));
        assert_eq!(receiver.try_recv(), None);
    }

    #[test]
    fn test_predicate_dropping() {
        let predicate = Box::new(|value: &i32| *value < 5);

        let (sender, receiver) =
            BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropLatestIf(Some(predicate)));

        sender.send(ActionOp::Action(1)).unwrap();
        sender.send(ActionOp::Action(6)).unwrap();
        let result = sender.send(ActionOp::Action(7));
        assert!(
            result.is_ok(),
            "Should succeed because predicate should drop the first item"
        );

        assert_eq!(
            receiver.recv(),
            Some(ActionOp::Action(6)),
            "Should receive 6, not 1"
        );
        assert_eq!(
            receiver.recv(),
            Some(ActionOp::Action(7)),
            "Should receive 7"
        );
    }

    #[test]
    fn test_add_subscriber_action() {
        let (sender, receiver) =
            BackpressureChannel::<i32>::pair(5, BackpressurePolicy::BlockOnFull);

        sender.send(ActionOp::AddSubscriber).unwrap();

        assert!(
            matches!(receiver.recv(), Some(ActionOp::AddSubscriber)),
            "Expected AddSubscriber action"
        );
    }

    #[test]
    fn test_add_subscriber_with_predicate() {
        let predicate = Box::new(|value: &i32| *value < 5);

        let (sender, receiver) =
            BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropLatestIf(Some(predicate)));

        sender.send(ActionOp::Action(1)).unwrap();
        sender.send(ActionOp::Action(6)).unwrap();
        let result = sender.send(ActionOp::AddSubscriber);
        assert!(result.is_ok(), "AddSubscriber should be sent successfully");

        let received_items = drain(&receiver);
        assert_eq!(received_items.len(), 2);
        assert!(
            contains_add_subscriber(&received_items),
            "AddSubscriber should be received"
        );
    }

    #[test]
    fn test_mixed_action_types() {
        let (sender, receiver) =
            BackpressureChannel::<i32>::pair(10, BackpressurePolicy::BlockOnFull);

        sender.send(ActionOp::Action(1)).unwrap();
        sender.send(ActionOp::AddSubscriber).unwrap();
        sender.send(ActionOp::Action(2)).unwrap();
        sender.send(ActionOp::AddSubscriber).unwrap();
        sender.send(ActionOp::Action(3)).unwrap();

        // Everything must come back in exactly the order it was sent.
        assert_eq!(
            drain(&receiver),
            vec![
                ActionOp::Action(1),
                ActionOp::AddSubscriber,
                ActionOp::Action(2),
                ActionOp::AddSubscriber,
                ActionOp::Action(3),
            ]
        );
    }

    #[test]
    fn test_block_on_full() {
        let (sender, receiver) =
            BackpressureChannel::<i32>::pair(1, BackpressurePolicy::BlockOnFull);

        sender.send(ActionOp::Action(1)).unwrap();

        let result = sender.try_send(ActionOp::Action(2));
        assert!(result.is_err(), "Should fail because channel is full");

        assert_eq!(receiver.recv(), Some(ActionOp::Action(1)));

        sender.send(ActionOp::Action(2)).unwrap();
        assert_eq!(receiver.recv(), Some(ActionOp::Action(2)));
    }

    #[test]
    fn test_drop_oldest_if_predicate_always_false() {
        let (sender, receiver) = BackpressureChannel::pair(
            3,
            BackpressurePolicy::DropOldestIf(Some(Box::new(|_| false))),
        );

        assert!(sender.try_send(ActionOp::Action(1)).is_ok());
        assert!(sender.try_send(ActionOp::Action(2)).is_ok());
        assert!(sender.try_send(ActionOp::Action(3)).is_ok());
        assert_eq!(receiver.len(), 3);

        let result = sender.try_send(ActionOp::Action(4));
        assert!(
            result.is_err(),
            "Should fail because no items match the predicate"
        );

        assert_eq!(receiver.len(), 3);
        assert_eq!(receiver.recv(), Some(ActionOp::Action(1)));
        assert_eq!(receiver.recv(), Some(ActionOp::Action(2)));
        assert_eq!(receiver.recv(), Some(ActionOp::Action(3)));
    }

    #[test]
    fn test_drop_oldest_if_predicate_sometimes_true() {
        let (sender, receiver) = BackpressureChannel::pair(
            3,
            BackpressurePolicy::DropOldestIf(Some(Box::new(|value: &i32| *value < 5))),
        );

        assert!(sender.try_send(ActionOp::Action(6)).is_ok());
        assert!(sender.try_send(ActionOp::Action(2)).is_ok());
        assert!(sender.try_send(ActionOp::Action(8)).is_ok());
        assert_eq!(receiver.len(), 3);

        let result = sender.try_send(ActionOp::Action(9));
        assert!(
            result.is_ok(),
            "Should succeed because Action(2) matches the predicate and is dropped"
        );

        let result = sender.try_send(ActionOp::Action(10));
        assert!(
            result.is_err(),
            "Should fail because no items match the predicate"
        );

        assert_eq!(receiver.len(), 3);
        assert_eq!(receiver.recv(), Some(ActionOp::Action(6)));
        assert_eq!(receiver.recv(), Some(ActionOp::Action(8)));
        assert_eq!(receiver.recv(), Some(ActionOp::Action(9)));
    }

    #[test]
    fn test_drop_oldest_only_actions() {
        let (sender, receiver) =
            BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropOldestIf(None));

        sender.send(ActionOp::AddSubscriber).unwrap();
        sender.send(ActionOp::Exit(std::time::Instant::now())).unwrap();
        assert_eq!(receiver.len(), 2);

        let result = sender.try_send(ActionOp::Action(1));
        assert!(
            result.is_err(),
            "Should fail because no Actions can be dropped"
        );

        assert_eq!(receiver.len(), 2);
        assert_eq!(receiver.recv(), Some(ActionOp::AddSubscriber));
        assert!(matches!(receiver.recv(), Some(ActionOp::Exit(_))));
    }

    #[test]
    fn test_drop_oldest_with_mixed_types() {
        let (sender, receiver) =
            BackpressureChannel::<i32>::pair(3, BackpressurePolicy::DropOldestIf(None));

        sender.send(ActionOp::Action(1)).unwrap();
        sender.send(ActionOp::AddSubscriber).unwrap();
        sender.send(ActionOp::Action(2)).unwrap();
        assert_eq!(receiver.len(), 3);

        sender.send(ActionOp::Action(3)).unwrap();
        assert_eq!(receiver.len(), 3);

        let received_items = drain(&receiver);
        assert_eq!(received_items.len(), 3);

        let actions = action_values(&received_items);
        assert!(
            contains_add_subscriber(&received_items),
            "AddSubscriber should be preserved"
        );
        assert!(actions.contains(&2), "Action(2) should be preserved");
        assert!(actions.contains(&3), "Action(3) should be added");
        assert!(!actions.contains(&1), "Action(1) should be dropped");
    }

    #[test]
    fn test_drop_latest_only_actions() {
        let (sender, receiver) =
            BackpressureChannel::<i32>::pair(3, BackpressurePolicy::DropLatestIf(None));

        sender.send(ActionOp::AddSubscriber).unwrap();
        sender.send(ActionOp::Action(1)).unwrap();
        sender.send(ActionOp::Exit(std::time::Instant::now())).unwrap();
        assert_eq!(receiver.len(), 3);

        let result = sender.try_send(ActionOp::Action(2));
        assert!(result.is_ok(), "Action(1) should be dropped to admit Action(2)");

        let result = sender.try_send(ActionOp::StateFunction);
        assert!(
            result.is_ok(),
            "Action(2) should be dropped to admit StateFunction"
        );

        let result = sender.try_send(ActionOp::AddSubscriber);
        assert!(
            result.is_err(),
            "Should fail because channel is full and non-Actions can't be dropped"
        );

        assert_eq!(receiver.len(), 3);
        assert_eq!(receiver.recv(), Some(ActionOp::AddSubscriber));
        assert!(matches!(receiver.recv(), Some(ActionOp::Exit(_))));
        assert_eq!(receiver.recv(), Some(ActionOp::StateFunction));
    }

    #[test]
    fn test_drop_policy_preserves_critical_operations() {
        let (sender, receiver) =
            BackpressureChannel::<i32>::pair(3, BackpressurePolicy::DropOldestIf(None));

        sender.send(ActionOp::Action(1)).unwrap();
        sender.send(ActionOp::Action(2)).unwrap();
        sender.send(ActionOp::AddSubscriber).unwrap();
        assert_eq!(receiver.len(), 3);

        sender.send(ActionOp::Action(3)).unwrap();
        assert_eq!(receiver.len(), 3);

        let received_items = drain(&receiver);
        assert!(
            contains_add_subscriber(&received_items),
            "AddSubscriber should never be dropped by drop policy"
        );

        let actions = action_values(&received_items);
        assert_eq!(actions.len(), 2, "Should have 2 Actions remaining");
        assert!(actions.contains(&2), "Action(2) should be preserved");
        assert!(actions.contains(&3), "Action(3) should be added");
        assert!(!actions.contains(&1), "Action(1) should be dropped");
    }

    #[test]
    fn test_drop_policy_with_exit_operations() {
        let (sender, receiver) =
            BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropLatestIf(None));

        let exit_time = std::time::Instant::now();

        sender.send(ActionOp::Action(1)).unwrap();
        sender.send(ActionOp::Exit(exit_time)).unwrap();
        assert_eq!(receiver.len(), 2);

        let result = sender.send(ActionOp::Action(2));
        assert!(result.is_ok(), "Action should be dropped, not Exit");

        // Action(1) is dropped, Exit survives, Action(2) is appended.
        assert_eq!(
            drain(&receiver),
            vec![ActionOp::Exit(exit_time), ActionOp::Action(2)],
            "Exit should never be dropped by drop policy"
        );
    }

    #[test]
    fn test_drop_oldest_action_ordering() {
        let (sender, receiver) =
            BackpressureChannel::<i32>::pair(4, BackpressurePolicy::DropOldestIf(None));

        sender.send(ActionOp::Action(1)).unwrap();
        sender.send(ActionOp::AddSubscriber).unwrap();
        sender.send(ActionOp::Action(2)).unwrap();
        sender.send(ActionOp::Action(3)).unwrap();
        assert_eq!(receiver.len(), 4);

        sender.send(ActionOp::Action(4)).unwrap();
        assert_eq!(receiver.len(), 4);

        let received_items = drain(&receiver);
        assert!(
            contains_add_subscriber(&received_items),
            "AddSubscriber should be preserved"
        );

        let actions = action_values(&received_items);
        assert_eq!(actions.len(), 3, "Should have 3 Actions remaining");
        assert!(
            !actions.contains(&1),
            "Action(1) should be dropped (oldest)"
        );
        assert!(actions.contains(&2), "Action(2) should be preserved");
        assert!(actions.contains(&3), "Action(3) should be preserved");
        assert!(actions.contains(&4), "Action(4) should be added");
    }

    #[test]
    fn test_drop_policy_blocking_behavior() {
        let (sender, receiver) =
            BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropOldestIf(None));

        sender.send(ActionOp::AddSubscriber).unwrap();
        sender.send(ActionOp::Exit(std::time::Instant::now())).unwrap();
        assert_eq!(receiver.len(), 2);

        let result = sender.try_send(ActionOp::Action(1));
        assert!(
            result.is_err(),
            "Should fail because no Actions available to drop"
        );

        let result = sender.try_send(ActionOp::AddSubscriber);
        assert!(
            result.is_err(),
            "Should fail because channel is full and no Actions to drop"
        );

        assert_eq!(receiver.len(), 2);
        assert_eq!(receiver.recv(), Some(ActionOp::AddSubscriber));
        assert!(matches!(receiver.recv(), Some(ActionOp::Exit(_))));
    }

    #[test]
    fn test_drop_oldest_if_predicate_always_true() {
        let (sender, receiver) = BackpressureChannel::<i32>::pair(
            3,
            BackpressurePolicy::DropOldestIf(Some(Box::new(|_| true))),
        );

        sender.send(ActionOp::Action(1)).unwrap();
        sender.send(ActionOp::Action(2)).unwrap();
        sender.send(ActionOp::Action(3)).unwrap();

        let result = sender.send(ActionOp::Action(4));
        assert!(result.is_ok(), "Action should be dropped successfully");

        // Action(1) is dropped; 2 and 3 are preserved; 4 is appended.
        assert_eq!(
            drain(&receiver),
            vec![
                ActionOp::Action(2),
                ActionOp::Action(3),
                ActionOp::Action(4),
            ]
        );
    }

    #[test]
    fn test_drop_latest_if_predicate_always_true() {
        let (sender, receiver) = BackpressureChannel::<i32>::pair(
            3,
            BackpressurePolicy::DropLatestIf(Some(Box::new(|_| true))),
        );

        sender.send(ActionOp::Action(1)).unwrap();
        sender.send(ActionOp::Action(2)).unwrap();
        sender.send(ActionOp::Action(3)).unwrap();
        let result = sender.send(ActionOp::Action(4));
        assert!(result.is_ok(), "Action should be dropped successfully");

        // Action(3) is dropped; 1 and 2 are preserved; 4 is appended.
        assert_eq!(
            drain(&receiver),
            vec![
                ActionOp::Action(1),
                ActionOp::Action(2),
                ActionOp::Action(4),
            ]
        );
    }

    #[test]
    fn test_drop_latest_vs_drop_oldest_action_selection() {
        let (sender_oldest, receiver_oldest) =
            BackpressureChannel::<i32>::pair(3, BackpressurePolicy::DropOldestIf(None));

        sender_oldest.send(ActionOp::Action(10)).unwrap();
        sender_oldest.send(ActionOp::AddSubscriber).unwrap();
        sender_oldest.send(ActionOp::Action(20)).unwrap();
        sender_oldest.send(ActionOp::Action(30)).unwrap();

        let oldest_actions = action_values(&drain(&receiver_oldest));
        assert!(
            oldest_actions.contains(&20),
            "DropOldest should preserve Action(20)"
        );
        assert!(
            oldest_actions.contains(&30),
            "DropOldest should preserve Action(30)"
        );

        let (sender_latest, receiver_latest) =
            BackpressureChannel::<i32>::pair(3, BackpressurePolicy::DropLatestIf(None));

        sender_latest.send(ActionOp::Action(100)).unwrap();
        sender_latest.send(ActionOp::AddSubscriber).unwrap();
        sender_latest.send(ActionOp::Action(200)).unwrap();
        let result = sender_latest.send(ActionOp::Action(300));
        assert!(
            result.is_ok(),
            "send Action should be success, should drop the latest Action(200)"
        );

        // Action(100) and AddSubscriber survive, Action(200) is replaced by Action(300).
        assert_eq!(
            drain(&receiver_latest),
            vec![
                ActionOp::Action(100),
                ActionOp::AddSubscriber,
                ActionOp::Action(300),
            ]
        );
    }

    #[test]
    fn test_comprehensive_drop_policy_verification() {
        let (sender, receiver) =
            BackpressureChannel::<String>::pair(5, BackpressurePolicy::DropOldestIf(None));

        sender.send(ActionOp::Action("action1".to_string())).unwrap();
        sender.send(ActionOp::AddSubscriber).unwrap();
        sender.send(ActionOp::Action("action2".to_string())).unwrap();
        sender.send(ActionOp::Exit(std::time::Instant::now())).unwrap();
        sender.send(ActionOp::Action("action3".to_string())).unwrap();
        assert_eq!(receiver.len(), 5);

        sender.send(ActionOp::Action("action4".to_string())).unwrap();
        assert_eq!(receiver.len(), 5);

        let received_items = drain(&receiver);

        let has_exit = received_items.iter().any(|item| matches!(item, ActionOp::Exit(_)));
        assert!(
            contains_add_subscriber(&received_items),
            "AddSubscriber must be preserved"
        );
        assert!(has_exit, "Exit must be preserved");

        let actions = action_values(&received_items);
        assert_eq!(actions.len(), 3, "Should have 3 Actions remaining");
        assert!(
            !actions.contains(&"action1".to_string()),
            "action1 should be dropped (oldest Action)"
        );
        assert!(
            actions.contains(&"action2".to_string()),
            "action2 should be preserved"
        );
        assert!(
            actions.contains(&"action3".to_string()),
            "action3 should be preserved"
        );
        assert!(
            actions.contains(&"action4".to_string()),
            "action4 should be added"
        );

        assert_eq!(received_items.len(), 5, "Total items should remain 5");
    }
}