use crate::metrics::Metrics;
#[cfg(feature = "store-log")]
use crate::store_impl::describe_action_op;
use crate::ActionOp;
use std::collections::VecDeque;
use std::fmt;
use std::marker::PhantomData;
use std::sync::{Arc, Condvar, Mutex};
/// Strategy applied by the channel when a send finds the queue at capacity.
///
/// The `Option` payload of the two drop variants is an optional predicate over
/// the action value: with `None` any queued `ActionOp::Action` may be dropped,
/// with `Some(predicate)` only actions for which the predicate returns `true`.
/// Queue entries that are not `ActionOp::Action` are never dropped.
#[derive(Default)]
pub enum BackpressurePolicy<T>
where
T: Send + Sync + Clone + 'static,
{
/// Drop nothing: `send` waits for space and `try_send` returns an error.
#[default]
BlockOnFull,
/// Make room by dropping the most recently queued droppable action.
#[allow(clippy::type_complexity)]
DropLatestIf(Option<Box<dyn Fn(&T) -> bool + Send + Sync>>),
/// Make room by dropping the earliest queued droppable action.
#[allow(clippy::type_complexity)]
DropOldestIf(Option<Box<dyn Fn(&T) -> bool + Send + Sync>>),
}
/// Error returned by the channel's `send` and `try_send` operations.
#[derive(thiserror::Error)]
pub(crate) enum SenderError<T> {
/// The item was not enqueued; it is handed back to the caller.
#[error("Failed to send item")]
SendError(T),
/// The channel had already been closed when the send was attempted or
/// while the sender was waiting for space.
#[error("Channel is closed")]
ChannelClosed,
}
impl<T> fmt::Debug for SenderError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SenderError::SendError(_) => f.write_str("SendError(..)"),
SenderError::ChannelClosed => f.write_str("ChannelClosed"),
}
}
}
/// Bounded FIFO of `ActionOp<T>` shared by the sender and receiver halves of
/// a backpressure channel.
struct MpscQueue<T>
where
T: Send + Sync + Clone + 'static,
{
/// Pending operations; items are pushed at the back and popped from the front.
queue: Mutex<VecDeque<ActionOp<T>>>,
/// Single condition variable waited on both by senders (queue full) and by
/// the receiver (queue empty).
condvar: Condvar,
/// Queue length at which sends stop enqueuing directly and consult `policy`.
capacity: usize,
/// What a send does when it finds the queue at `capacity`.
policy: BackpressurePolicy<T>,
/// Optional sink told about dropped actions and about the queue length
/// after each successful send.
metrics: Option<Arc<dyn Metrics + Send + Sync>>,
/// Set to `true` by `close`; consulted by the send and receive paths.
closed: Mutex<bool>,
}
impl<T> MpscQueue<T>
where
T: Send + Sync + Clone + 'static,
{
fn new(
capacity: usize,
policy: BackpressurePolicy<T>,
metrics: Option<Arc<dyn Metrics + Send + Sync>>,
) -> Self {
Self {
queue: Mutex::new(VecDeque::new()),
condvar: Condvar::new(),
capacity,
policy,
metrics,
closed: Mutex::new(false),
}
}
fn send(&self, item: ActionOp<T>) -> Result<i64, SenderError<ActionOp<T>>> {
if *self.closed.lock().unwrap() {
return Err(SenderError::ChannelClosed);
}
let mut queue: std::sync::MutexGuard<'_, VecDeque<ActionOp<T>>> =
self.queue.lock().unwrap();
loop {
if queue.len() < self.capacity {
queue.push_back(item);
break;
}
#[allow(deprecated)]
match &self.policy {
BackpressurePolicy::BlockOnFull => {
while queue.len() >= self.capacity {
queue = self.condvar.wait(queue).unwrap();
if *self.closed.lock().unwrap() {
return Err(SenderError::ChannelClosed);
}
}
}
BackpressurePolicy::DropOldestIf(None) => {
let mut found_action_to_drop = false;
let mut i = 0;
while i < queue.len() {
if matches!(queue[i], ActionOp::Action(_)) {
if let Some(dropped_item) = queue.remove(i) {
found_action_to_drop = true;
if let Some(metrics) = &self.metrics {
if let ActionOp::Action(action) = &dropped_item {
metrics.action_dropped(Some(action as &dyn std::any::Any));
}
}
break;
}
}
i += 1;
}
if !found_action_to_drop {
queue = self.condvar.wait(queue).unwrap();
if *self.closed.lock().unwrap() {
return Err(SenderError::ChannelClosed);
}
continue; }
}
BackpressurePolicy::DropLatestIf(None) => {
let mut found_action_to_drop = false;
let mut i = 0;
while i < queue.len() {
let idx = queue.len() - i - 1;
if matches!(queue[idx], ActionOp::Action(_)) {
if let Some(dropped_item) = queue.remove(idx) {
found_action_to_drop = true;
if let Some(metrics) = &self.metrics {
if let ActionOp::Action(action) = &dropped_item {
metrics.action_dropped(Some(action as &dyn std::any::Any));
}
}
break;
}
}
i += 1;
}
if !found_action_to_drop {
queue = self.condvar.wait(queue).unwrap();
if *self.closed.lock().unwrap() {
return Err(SenderError::ChannelClosed);
}
continue; }
}
BackpressurePolicy::DropOldestIf(Some(predicate)) => {
let mut dropped_count = 0;
let mut i = 0;
while i < queue.len() {
#[cfg(feature = "store-log")]
eprintln!(
"store: check droppable {}/{}: {}",
i,
queue.len(),
describe_action_op(&queue[i])
);
let should_drop = if let ActionOp::Action(action) = &queue[i] {
predicate(action)
} else {
false };
if should_drop {
if let Some(dropped_item) = queue.remove(i) {
dropped_count += 1;
if let Some(metrics) = &self.metrics {
if let ActionOp::Action(action) = &dropped_item {
metrics.action_dropped(Some(action as &dyn std::any::Any));
}
}
break;
}
}
i += 1;
}
if dropped_count == 0 {
#[cfg(feature = "store-log")]
eprintln!(
"store: no droppable items found, blocking until space available: queue len={}",
queue.len()
);
queue = self.condvar.wait(queue).unwrap();
if *self.closed.lock().unwrap() {
return Err(SenderError::ChannelClosed);
}
}
}
BackpressurePolicy::DropLatestIf(Some(predicate)) => {
let mut dropped_count = 0;
let mut i = 0;
while i < queue.len() {
let index = queue.len() - i - 1;
#[cfg(feature = "store-log")]
eprintln!(
"store: check droppable {}/{}: {}",
index,
queue.len(),
describe_action_op(&queue[index])
);
let should_drop = if let ActionOp::Action(action) = &queue[index] {
predicate(action)
} else {
false };
if should_drop {
if let Some(dropped_item) = queue.remove(index) {
dropped_count += 1;
if let Some(metrics) = &self.metrics {
if let ActionOp::Action(action) = &dropped_item {
metrics.action_dropped(Some(action as &dyn std::any::Any));
}
}
break;
}
}
i += 1;
}
if dropped_count == 0 {
#[cfg(feature = "store-log")]
eprintln!(
"store: no droppable items found, blocking until space available: queue len={}",
queue.len()
);
queue = self.condvar.wait(queue).unwrap();
if *self.closed.lock().unwrap() {
return Err(SenderError::ChannelClosed);
}
}
}
}
}
if let Some(metrics) = &self.metrics {
metrics.queue_size(queue.len());
}
self.condvar.notify_one();
Ok(queue.len() as i64)
}
fn try_send(&self, item: ActionOp<T>) -> Result<i64, SenderError<ActionOp<T>>> {
if *self.closed.lock().unwrap() {
return Err(SenderError::ChannelClosed);
}
let mut queue = self.queue.lock().unwrap();
if queue.len() < self.capacity {
queue.push_back(item);
} else {
#[allow(deprecated)]
match &self.policy {
BackpressurePolicy::BlockOnFull => {
return Err(SenderError::SendError(item));
}
BackpressurePolicy::DropOldestIf(None) => {
let mut found_action_to_drop = false;
let mut i = 0;
while i < queue.len() {
if matches!(queue[i], ActionOp::Action(_)) {
if let Some(dropped_item) = queue.remove(i) {
found_action_to_drop = true;
if let Some(metrics) = &self.metrics {
if let ActionOp::Action(action) = &dropped_item {
metrics.action_dropped(Some(action as &dyn std::any::Any));
}
}
break;
}
}
i += 1;
}
if found_action_to_drop {
queue.push_back(item);
} else {
#[cfg(feature = "store-log")]
eprintln!(
"store: failed to drop oldest action while trying to send: queue len={}",
queue.len()
);
return Err(SenderError::SendError(item));
}
}
BackpressurePolicy::DropLatestIf(None) => {
let mut found_action_to_drop = false;
let mut i = 0;
while i < queue.len() {
let index = queue.len() - i - 1;
if matches!(queue[index], ActionOp::Action(_)) {
if let Some(dropped_item) = queue.remove(index) {
found_action_to_drop = true;
if let Some(metrics) = &self.metrics {
if let ActionOp::Action(action) = &dropped_item {
metrics.action_dropped(Some(action as &dyn std::any::Any));
}
}
break;
}
}
i += 1;
}
if found_action_to_drop {
queue.push_back(item);
} else {
#[cfg(feature = "store-log")]
eprintln!(
"store: failed to drop latest action while trying to send: queue len={}",
queue.len()
);
return Err(SenderError::SendError(item));
}
}
BackpressurePolicy::DropOldestIf(Some(predicate)) => {
let mut dropped_count = 0;
let mut i = 0;
while i < queue.len() {
#[cfg(feature = "store-log")]
eprintln!(
"store: check droppable {}/{}: {}",
i,
queue.len(),
describe_action_op(&queue[i])
);
let should_drop = if let ActionOp::Action(action) = &queue[i] {
predicate(action)
} else {
false };
if should_drop {
if let Some(dropped_item) = queue.remove(i) {
dropped_count += 1;
if let Some(metrics) = &self.metrics {
if let ActionOp::Action(action) = &dropped_item {
metrics.action_dropped(Some(action as &dyn std::any::Any));
}
}
break;
}
}
i += 1;
}
if dropped_count > 0 {
queue.push_back(item);
} else {
#[cfg(feature = "store-log")]
eprintln!(
"store: failed to drop the oldestif while trying to send: queue len={}",
queue.len()
);
return Err(SenderError::SendError(item));
}
}
BackpressurePolicy::DropLatestIf(Some(predicate)) => {
let mut dropped_count = 0;
let mut i = 0;
while i < queue.len() {
let index = queue.len() - i - 1;
#[cfg(feature = "store-log")]
eprintln!(
"store: check droppable {}/{}: {}",
index,
queue.len(),
describe_action_op(&queue[index])
);
let should_drop = if let ActionOp::Action(action) = &queue[index] {
predicate(action)
} else {
false };
if should_drop {
if let Some(dropped_item) = queue.remove(index) {
dropped_count += 1;
if let Some(metrics) = &self.metrics {
if let ActionOp::Action(action) = &dropped_item {
metrics.action_dropped(Some(action as &dyn std::any::Any));
}
}
break;
}
}
i += 1;
}
if dropped_count > 0 {
queue.push_back(item);
} else {
#[cfg(feature = "store-log")]
eprintln!(
"store: failed to drop the latestif while trying to send: queue len={}",
queue.len()
);
return Err(SenderError::SendError(item));
}
}
}
}
if let Some(metrics) = &self.metrics {
metrics.queue_size(queue.len());
}
self.condvar.notify_one();
Ok(queue.len() as i64)
}
fn recv(&self) -> Option<ActionOp<T>> {
let mut queue = self.queue.lock().unwrap();
while queue.is_empty() {
if *self.closed.lock().unwrap() {
return None;
}
queue = self.condvar.wait(queue).unwrap();
}
let item = queue.pop_front();
self.condvar.notify_one();
item
}
fn try_recv(&self) -> Option<ActionOp<T>> {
let mut queue = self.queue.lock().unwrap();
let item = queue.pop_front();
if item.is_some() {
self.condvar.notify_one();
}
item
}
fn len(&self) -> usize {
self.queue.lock().unwrap().len()
}
fn close(&self) {
*self.closed.lock().unwrap() = true;
self.condvar.notify_all();
}
}
/// Sending half of a backpressure channel. Clones share the same queue.
#[derive(Clone)]
pub(crate) struct SenderChannel<T>
where
T: Send + Sync + Clone + 'static,
{
/// Label used only in the `store-log` message printed on drop.
_name: String,
/// Queue shared with the receiver half.
queue: Arc<MpscQueue<T>>,
}
/// Dropping a sender only logs (when the `store-log` feature is enabled);
/// it does not close the channel.
impl<Action> Drop for SenderChannel<Action>
where
Action: Send + Sync + Clone + 'static,
{
fn drop(&mut self) {
#[cfg(feature = "store-log")]
eprintln!("store: drop '{}' sender channel", self._name);
}
}
#[allow(dead_code)]
impl<T> SenderChannel<T>
where
T: Send + Sync + Clone + 'static,
{
pub fn send(&self, item: ActionOp<T>) -> Result<i64, SenderError<ActionOp<T>>> {
self.queue.send(item)
}
pub fn try_send(&self, item: ActionOp<T>) -> Result<i64, SenderError<ActionOp<T>>> {
self.queue.try_send(item)
}
}
/// Receiving half of a backpressure channel.
#[allow(dead_code)]
pub(crate) struct ReceiverChannel<T>
where
T: Send + Sync + Clone + 'static,
{
/// Label used in the `store-log` message printed on drop.
name: String,
/// Queue shared with the sender half.
queue: Arc<MpscQueue<T>>,
/// Metrics handle given at construction; not read by any method visible
/// in this file.
metrics: Option<Arc<dyn Metrics + Send + Sync>>,
}
/// Dropping the receiver closes the channel, so subsequent or waiting sends
/// observe `SenderError::ChannelClosed`.
impl<Action> Drop for ReceiverChannel<Action>
where
Action: Send + Sync + Clone + 'static,
{
fn drop(&mut self) {
#[cfg(feature = "store-log")]
eprintln!("store: drop '{}' receiver channel", self.name);
self.close();
}
}
#[allow(dead_code)]
impl<T> ReceiverChannel<T>
where
T: Send + Sync + Clone + 'static,
{
pub fn recv(&self) -> Option<ActionOp<T>> {
self.queue.recv()
}
#[allow(dead_code)]
pub fn try_recv(&self) -> Option<ActionOp<T>> {
self.queue.try_recv()
}
pub fn len(&self) -> usize {
self.queue.len()
}
pub fn close(&self) {
self.queue.close();
}
}
/// Factory type for sender/receiver pairs; it carries no data beyond the
/// `PhantomData` marker that ties it to the message type.
pub(crate) struct BackpressureChannel<MSG>
where
MSG: Send + Sync + Clone + 'static,
{
/// Marker only; no `MSG` value is stored.
phantom_data: PhantomData<MSG>,
}
impl<MSG> BackpressureChannel<MSG>
where
    MSG: Send + Sync + Clone + 'static,
{
    /// Builds an unnamed channel pair without metrics.
    #[allow(dead_code)]
    pub fn pair(
        capacity: usize,
        policy: BackpressurePolicy<MSG>,
    ) -> (SenderChannel<MSG>, ReceiverChannel<MSG>) {
        Self::pair_with_metrics(capacity, policy, None)
    }

    /// Builds an unnamed channel pair that reports to `metrics` when given.
    #[allow(dead_code)]
    pub fn pair_with_metrics(
        capacity: usize,
        policy: BackpressurePolicy<MSG>,
        metrics: Option<Arc<dyn Metrics + Send + Sync>>,
    ) -> (SenderChannel<MSG>, ReceiverChannel<MSG>) {
        Self::pair_with("<anon>", capacity, policy, metrics)
    }

    /// Builds a channel pair labelled `name`, backed by one shared queue of
    /// the given `capacity` and `policy`.
    #[allow(dead_code)]
    pub fn pair_with(
        name: &str,
        capacity: usize,
        policy: BackpressurePolicy<MSG>,
        metrics: Option<Arc<dyn Metrics + Send + Sync>>,
    ) -> (SenderChannel<MSG>, ReceiverChannel<MSG>) {
        let label = name.to_string();
        // Both halves hold a handle to the same queue.
        let shared = Arc::new(MpscQueue::new(capacity, policy, metrics.clone()));
        let sender = SenderChannel {
            _name: label.clone(),
            queue: Arc::clone(&shared),
        };
        let receiver = ReceiverChannel {
            name: label,
            queue: shared,
            metrics,
        };
        (sender, receiver)
    }
}
#[cfg(test)]
mod tests {
use super::*;
/// Items come out in the order they were sent; an empty queue yields `None`.
#[test]
fn test_basic_send_recv() {
    let (tx, rx) = BackpressureChannel::<i32>::pair(5, BackpressurePolicy::BlockOnFull);
    for value in 1..=2 {
        tx.send(ActionOp::Action(value)).unwrap();
    }
    for expected in 1..=2 {
        assert_eq!(rx.recv(), Some(ActionOp::Action(expected)));
    }
    assert_eq!(rx.try_recv(), None);
}
/// With capacity 2, the third send evicts the oldest action.
#[test]
fn test_drop_oldest() {
    let (tx, rx) = BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropOldestIf(None));
    for value in 1..=3 {
        tx.send(ActionOp::Action(value)).unwrap();
    }
    assert_eq!(rx.recv(), Some(ActionOp::Action(2)));
    assert_eq!(rx.recv(), Some(ActionOp::Action(3)));
    assert_eq!(rx.try_recv(), None);
}
/// With capacity 2, the third send evicts the most recently queued action.
#[test]
fn test_drop_latest() {
    let (tx, rx) = BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropLatestIf(None));
    for value in 1..=3 {
        tx.send(ActionOp::Action(value)).unwrap();
    }
    assert_eq!(rx.recv(), Some(ActionOp::Action(1)));
    assert_eq!(rx.recv(), Some(ActionOp::Action(3)));
    assert_eq!(rx.try_recv(), None);
}
/// Only actions accepted by the predicate (`< 5`) may be dropped: sending 7
/// into a full `[1, 6]` queue must evict 1, leaving `[6, 7]`.
#[test]
fn test_predicate_dropping() {
    let predicate = Box::new(|value: &i32| *value < 5);
    let (sender, receiver) =
        BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropLatestIf(Some(predicate)));
    sender.send(ActionOp::Action(1)).unwrap();
    sender.send(ActionOp::Action(6)).unwrap();
    let result = sender.send(ActionOp::Action(7));
    assert!(
        result.is_ok(),
        "Should succeed because predicate should drop the first item"
    );
    // Compare whole items so that an unexpected non-`Action` entry fails the
    // test instead of being silently skipped.
    assert_eq!(
        receiver.recv(),
        Some(ActionOp::Action(6)),
        "Should receive 6, not 1"
    );
    assert_eq!(
        receiver.recv(),
        Some(ActionOp::Action(7)),
        "Should receive 7"
    );
    assert_eq!(receiver.try_recv(), None);
}
/// A non-action operation travels through the channel unchanged.
#[test]
fn test_add_subscriber_action() {
    let (tx, rx) = BackpressureChannel::<i32>::pair(5, BackpressurePolicy::BlockOnFull);
    tx.send(ActionOp::AddSubscriber).unwrap();
    let received = rx.recv();
    assert!(received.is_some());
    if !matches!(received, Some(ActionOp::AddSubscriber)) {
        panic!("Expected AddSubscriber action");
    }
}
/// Sending `AddSubscriber` into a full queue succeeds by evicting a
/// droppable action, and the subscriber op is delivered.
#[test]
fn test_add_subscriber_with_predicate() {
    let droppable = Box::new(|value: &i32| *value < 5);
    let (tx, rx) =
        BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropLatestIf(Some(droppable)));
    tx.send(ActionOp::Action(1)).unwrap();
    tx.send(ActionOp::Action(6)).unwrap();
    let outcome = tx.send(ActionOp::AddSubscriber);
    assert!(outcome.is_ok(), "AddSubscriber should be sent successfully");

    let mut drained = Vec::new();
    while let Some(op) = rx.try_recv() {
        drained.push(op);
    }
    assert_eq!(drained.len(), 2);
    let saw_subscriber = drained
        .iter()
        .any(|op| matches!(op, ActionOp::AddSubscriber));
    assert!(saw_subscriber, "AddSubscriber should be received");
}
/// Interleaved actions and subscriber ops keep their send order.
#[test]
fn test_mixed_action_types() {
    let (tx, rx) = BackpressureChannel::<i32>::pair(10, BackpressurePolicy::BlockOnFull);
    tx.send(ActionOp::Action(1)).unwrap();
    tx.send(ActionOp::AddSubscriber).unwrap();
    tx.send(ActionOp::Action(2)).unwrap();
    tx.send(ActionOp::AddSubscriber).unwrap();
    tx.send(ActionOp::Action(3)).unwrap();

    let mut drained = Vec::new();
    while let Some(op) = rx.try_recv() {
        drained.push(op);
    }
    assert_eq!(drained.len(), 5);

    // Slot-wise checks; each closure panics with the given message on mismatch.
    let expect_action = |slot: usize, wanted: i32, complaint: &str| match &drained[slot] {
        ActionOp::Action(value) => assert_eq!(*value, wanted),
        _ => panic!("{}", complaint),
    };
    let expect_subscriber = |slot: usize| {
        if !matches!(&drained[slot], ActionOp::AddSubscriber) {
            panic!("Expected AddSubscriber");
        }
    };

    expect_action(0, 1, "Expected Action(1)");
    expect_subscriber(1);
    expect_action(2, 2, "Expected Action(2)");
    expect_subscriber(3);
    expect_action(4, 3, "Expected Action(3)");
}
/// Under `BlockOnFull`, `try_send` on a full queue fails; once the item is
/// received there is room again.
#[test]
fn test_block_on_full() {
    let (tx, rx) = BackpressureChannel::<i32>::pair(1, BackpressurePolicy::BlockOnFull);
    tx.send(ActionOp::Action(1)).unwrap();

    let rejected = tx.try_send(ActionOp::Action(2));
    assert!(rejected.is_err(), "Should fail because channel is full");

    assert_eq!(rx.recv(), Some(ActionOp::Action(1)));
    tx.send(ActionOp::Action(2)).unwrap();
    assert_eq!(rx.recv(), Some(ActionOp::Action(2)));
}
/// A predicate that rejects everything means nothing can be evicted, so
/// `try_send` on a full queue fails and the contents stay untouched.
#[test]
fn test_drop_oldest_if_predicate_always_false() {
    let (tx, rx) = BackpressureChannel::pair(
        3,
        BackpressurePolicy::DropOldestIf(Some(Box::new(|_| false))),
    );
    for value in 1..=3 {
        assert!(tx.try_send(ActionOp::Action(value)).is_ok());
    }
    assert_eq!(rx.len(), 3);

    let overflow = tx.try_send(ActionOp::Action(4));
    assert!(
        overflow.is_err(),
        "Should fail because no items match the predicate"
    );
    assert_eq!(rx.len(), 3);
    for expected in 1..=3 {
        assert_eq!(rx.recv(), Some(ActionOp::Action(expected)));
    }
}
/// With predicate `< 5` and a full `[6, 2, 8]` queue: sending 9 evicts 2;
/// sending 10 afterwards fails because no remaining action matches.
#[test]
fn test_drop_oldest_if_predicate_sometimes_true() {
    let (sender, receiver) = BackpressureChannel::pair(
        3,
        BackpressurePolicy::DropOldestIf(Some(Box::new(|value: &i32| *value < 5))),
    );
    assert!(sender.try_send(ActionOp::Action(6)).is_ok());
    assert!(sender.try_send(ActionOp::Action(2)).is_ok());
    assert!(sender.try_send(ActionOp::Action(8)).is_ok());
    assert_eq!(receiver.len(), 3);

    let result = sender.try_send(ActionOp::Action(9));
    // The previous message here said "Should fail", contradicting `is_ok()`.
    assert!(
        result.is_ok(),
        "Should succeed because Action(2) matches the predicate and is dropped"
    );

    let result = sender.try_send(ActionOp::Action(10));
    assert!(
        result.is_err(),
        "Should fail because no items match the predicate"
    );
    assert_eq!(receiver.len(), 3);
    assert_eq!(receiver.recv(), Some(ActionOp::Action(6)));
    assert_eq!(receiver.recv(), Some(ActionOp::Action(8)));
    assert_eq!(receiver.recv(), Some(ActionOp::Action(9)));
}
/// A queue full of non-action ops offers nothing to evict, so `try_send`
/// fails and both ops remain.
#[test]
fn test_drop_oldest_only_actions() {
    let (tx, rx) = BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropOldestIf(None));
    tx.send(ActionOp::AddSubscriber).unwrap();
    tx.send(ActionOp::Exit(std::time::Instant::now())).unwrap();
    assert_eq!(rx.len(), 2);

    let attempt = tx.try_send(ActionOp::Action(1));
    assert!(
        attempt.is_err(),
        "Should fail because no Actions can be dropped"
    );
    assert_eq!(rx.len(), 2);

    assert_eq!(rx.recv(), Some(ActionOp::AddSubscriber));
    let second = rx.recv();
    assert!(matches!(second, Some(ActionOp::Exit(_))));
}
/// The oldest *action* is evicted even when a non-action op sits between
/// the queued actions.
#[test]
fn test_drop_oldest_with_mixed_types() {
    let (tx, rx) = BackpressureChannel::<i32>::pair(3, BackpressurePolicy::DropOldestIf(None));
    tx.send(ActionOp::Action(1)).unwrap();
    tx.send(ActionOp::AddSubscriber).unwrap();
    tx.send(ActionOp::Action(2)).unwrap();
    assert_eq!(rx.len(), 3);
    tx.send(ActionOp::Action(3)).unwrap();
    assert_eq!(rx.len(), 3);

    let mut drained = Vec::new();
    while let Some(op) = rx.try_recv() {
        drained.push(op);
    }
    assert_eq!(drained.len(), 3);

    let subscriber_kept = drained
        .iter()
        .any(|op| matches!(op, ActionOp::AddSubscriber));
    let two_kept = drained.iter().any(|op| matches!(op, ActionOp::Action(2)));
    let three_kept = drained.iter().any(|op| matches!(op, ActionOp::Action(3)));
    let one_kept = drained.iter().any(|op| matches!(op, ActionOp::Action(1)));

    assert!(subscriber_kept, "AddSubscriber should be preserved");
    assert!(two_kept, "Action(2) should be preserved");
    assert!(three_kept, "Action(3) should be added");
    assert!(!one_kept, "Action(1) should be dropped");
}
/// `DropLatestIf(None)` evicts queued actions to admit new items, and fails
/// once only non-action ops remain.
#[test]
fn test_drop_latest_only_actions() {
    let (tx, rx) = BackpressureChannel::<i32>::pair(3, BackpressurePolicy::DropLatestIf(None));
    tx.send(ActionOp::AddSubscriber).unwrap();
    tx.send(ActionOp::Action(1)).unwrap();
    tx.send(ActionOp::Exit(std::time::Instant::now())).unwrap();
    assert_eq!(rx.len(), 3);

    // Evicts Action(1).
    let first = tx.try_send(ActionOp::Action(2));
    assert!(first.is_ok(), "Action should be dropped successfully");
    // Evicts Action(2).
    let second = tx.try_send(ActionOp::StateFunction);
    assert!(second.is_ok(), "Action should be dropped successfully");
    // No action is left to evict.
    let third = tx.try_send(ActionOp::AddSubscriber);
    assert!(
        third.is_err(),
        "Should fail because channel is full and non-Actions can't be dropped"
    );
    assert_eq!(rx.len(), 3);

    assert_eq!(rx.recv(), Some(ActionOp::AddSubscriber));
    let middle = rx.recv();
    assert!(matches!(middle, Some(ActionOp::Exit(_))));
    assert_eq!(rx.recv(), Some(ActionOp::StateFunction));
}
/// Overflow under `DropOldestIf(None)` removes the oldest action and leaves
/// the `AddSubscriber` op in place.
#[test]
fn test_drop_policy_preserves_critical_operations() {
    let (tx, rx) = BackpressureChannel::<i32>::pair(3, BackpressurePolicy::DropOldestIf(None));
    tx.send(ActionOp::Action(1)).unwrap();
    tx.send(ActionOp::Action(2)).unwrap();
    tx.send(ActionOp::AddSubscriber).unwrap();
    assert_eq!(rx.len(), 3);
    tx.send(ActionOp::Action(3)).unwrap();
    assert_eq!(rx.len(), 3);

    let mut drained = Vec::new();
    while let Some(op) = rx.try_recv() {
        drained.push(op);
    }

    let subscriber_kept = drained
        .iter()
        .any(|op| matches!(op, ActionOp::AddSubscriber));
    assert!(
        subscriber_kept,
        "AddSubscriber should never be dropped by drop policy"
    );

    let surviving: Vec<i32> = drained
        .iter()
        .filter_map(|op| match op {
            ActionOp::Action(value) => Some(*value),
            _ => None,
        })
        .collect();
    assert_eq!(surviving.len(), 2, "Should have 2 Actions remaining");
    assert!(surviving.contains(&2), "Action(2) should be preserved");
    assert!(surviving.contains(&3), "Action(3) should be added");
    assert!(!surviving.contains(&1), "Action(1) should be dropped");
}
/// With `[Action(1), Exit]` queued at capacity 2, sending `Action(2)` evicts
/// the action, not the `Exit` op.
#[test]
fn test_drop_policy_with_exit_operations() {
    let (tx, rx) = BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropLatestIf(None));
    let exit_time = std::time::Instant::now();
    tx.send(ActionOp::Action(1)).unwrap();
    tx.send(ActionOp::Exit(exit_time)).unwrap();
    assert_eq!(rx.len(), 2);

    let outcome = tx.send(ActionOp::Action(2));
    assert!(outcome.is_ok(), "Action should be dropped, not Exit");

    let mut drained = Vec::new();
    while let Some(op) = rx.try_recv() {
        drained.push(op);
    }
    assert_eq!(drained.len(), 2);

    let front_is_action_1 = drained[0] == ActionOp::Action(1);
    assert!(!front_is_action_1, "Action(1) should be dropped");
    let front_is_exit = drained[0] == ActionOp::Exit(exit_time);
    assert!(front_is_exit, "Exit should never be dropped by drop policy");
    let back_is_action_2 = drained[1] == ActionOp::Action(2);
    assert!(back_is_action_2, "Action(2) added");
}
/// Among several queued actions, `DropOldestIf(None)` evicts the one sent
/// first.
#[test]
fn test_drop_oldest_action_ordering() {
    let (tx, rx) = BackpressureChannel::<i32>::pair(4, BackpressurePolicy::DropOldestIf(None));
    tx.send(ActionOp::Action(1)).unwrap();
    tx.send(ActionOp::AddSubscriber).unwrap();
    tx.send(ActionOp::Action(2)).unwrap();
    tx.send(ActionOp::Action(3)).unwrap();
    assert_eq!(rx.len(), 4);
    tx.send(ActionOp::Action(4)).unwrap();
    assert_eq!(rx.len(), 4);

    let mut drained = Vec::new();
    while let Some(op) = rx.try_recv() {
        drained.push(op);
    }

    let subscriber_kept = drained
        .iter()
        .any(|op| matches!(op, ActionOp::AddSubscriber));
    assert!(subscriber_kept, "AddSubscriber should be preserved");

    let surviving: Vec<i32> = drained
        .iter()
        .filter_map(|op| match op {
            ActionOp::Action(value) => Some(*value),
            _ => None,
        })
        .collect();
    assert_eq!(surviving.len(), 3, "Should have 3 Actions remaining");
    assert!(
        !surviving.contains(&1),
        "Action(1) should be dropped (oldest)"
    );
    assert!(surviving.contains(&2), "Action(2) should be preserved");
    assert!(surviving.contains(&3), "Action(3) should be preserved");
    assert!(surviving.contains(&4), "Action(4) should be added");
}
/// When the full queue holds no actions, `try_send` fails for action and
/// non-action items alike.
#[test]
fn test_drop_policy_blocking_behavior() {
    let (tx, rx) = BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropOldestIf(None));
    tx.send(ActionOp::AddSubscriber).unwrap();
    tx.send(ActionOp::Exit(std::time::Instant::now())).unwrap();
    assert_eq!(rx.len(), 2);

    let action_attempt = tx.try_send(ActionOp::Action(1));
    assert!(
        action_attempt.is_err(),
        "Should fail because no Actions available to drop"
    );
    let subscriber_attempt = tx.try_send(ActionOp::AddSubscriber);
    assert!(
        subscriber_attempt.is_err(),
        "Should fail because channel is full and no Actions to drop"
    );
    assert_eq!(rx.len(), 2);

    assert_eq!(rx.recv(), Some(ActionOp::AddSubscriber));
    let second = rx.recv();
    assert!(matches!(second, Some(ActionOp::Exit(_))));
}
/// An always-true predicate makes `DropOldestIf` evict the front action:
/// `[1, 2, 3]` plus 4 becomes `[2, 3, 4]`.
#[test]
fn test_drop_oldest_if_predicate_always_true() {
    let (tx, rx) = BackpressureChannel::<i32>::pair(
        3,
        BackpressurePolicy::DropOldestIf(Some(Box::new(|_| true))),
    );
    for value in 1..=3 {
        tx.send(ActionOp::Action(value)).unwrap();
    }
    let outcome = tx.send(ActionOp::Action(4));
    assert!(outcome.is_ok(), "Action should be dropped successfully");

    let mut drained = Vec::new();
    while let Some(op) = rx.try_recv() {
        drained.push(op);
    }
    assert_eq!(drained.len(), 3);

    let front_is_1 = drained[0] == ActionOp::Action(1);
    assert!(!front_is_1, "Action(1) should be dropped");
    let front_is_2 = drained[0] == ActionOp::Action(2);
    assert!(front_is_2, "Action(2) should be preserved");
    let middle_is_3 = drained[1] == ActionOp::Action(3);
    assert!(middle_is_3, "Action(3) should be preserved");
    let back_is_4 = drained[2] == ActionOp::Action(4);
    assert!(back_is_4, "Action(4) should be added");
}
/// An always-true predicate makes `DropLatestIf` evict the back action:
/// `[1, 2, 3]` plus 4 becomes `[1, 2, 4]`.
#[test]
fn test_drop_latest_if_predicate_always_true() {
    let (tx, rx) = BackpressureChannel::<i32>::pair(
        3,
        BackpressurePolicy::DropLatestIf(Some(Box::new(|_| true))),
    );
    for value in 1..=3 {
        tx.send(ActionOp::Action(value)).unwrap();
    }
    let outcome = tx.send(ActionOp::Action(4));
    assert!(outcome.is_ok(), "Action should be dropped successfully");

    let mut drained = Vec::new();
    while let Some(op) = rx.try_recv() {
        drained.push(op);
    }
    assert_eq!(drained.len(), 3);

    let front_is_1 = drained[0] == ActionOp::Action(1);
    assert!(front_is_1, "Action(1) should be preserved");
    let middle_is_2 = drained[1] == ActionOp::Action(2);
    assert!(middle_is_2, "Action(2) should be preserved");
    let back_is_3 = drained[2] == ActionOp::Action(3);
    assert!(!back_is_3, "Action(3) should be dropped");
    let back_is_4 = drained[2] == ActionOp::Action(4);
    assert!(back_is_4, "Action(4) should be added");
}
/// Runs the same overflow scenario under both drop policies and checks which
/// action each one evicts.
#[test]
fn test_drop_latest_vs_drop_oldest_action_selection() {
    // DropOldest: [10, AddSubscriber, 20] + 30.
    let (tx_oldest, rx_oldest) =
        BackpressureChannel::<i32>::pair(3, BackpressurePolicy::DropOldestIf(None));
    tx_oldest.send(ActionOp::Action(10)).unwrap();
    tx_oldest.send(ActionOp::AddSubscriber).unwrap();
    tx_oldest.send(ActionOp::Action(20)).unwrap();
    tx_oldest.send(ActionOp::Action(30)).unwrap();

    let mut oldest_drained = Vec::new();
    while let Some(op) = rx_oldest.try_recv() {
        oldest_drained.push(op);
    }
    let oldest_values: Vec<i32> = oldest_drained
        .iter()
        .filter_map(|op| match op {
            ActionOp::Action(value) => Some(*value),
            _ => None,
        })
        .collect();
    assert!(
        oldest_values.contains(&20),
        "DropOldest should preserve Action(20)"
    );
    assert!(
        oldest_values.contains(&30),
        "DropOldest should preserve Action(30)"
    );

    // DropLatest: [100, AddSubscriber, 200] + 300.
    let (tx_latest, rx_latest) =
        BackpressureChannel::<i32>::pair(3, BackpressurePolicy::DropLatestIf(None));
    tx_latest.send(ActionOp::Action(100)).unwrap();
    tx_latest.send(ActionOp::AddSubscriber).unwrap();
    tx_latest.send(ActionOp::Action(200)).unwrap();
    let outcome = tx_latest.send(ActionOp::Action(300));
    assert!(
        outcome.is_ok(),
        "send Action should be success, should drop the latest Action(200)"
    );

    let mut latest_drained = Vec::new();
    while let Some(op) = rx_latest.try_recv() {
        latest_drained.push(op);
    }
    assert_eq!(latest_drained.len(), 3);

    let front_is_100 = latest_drained[0] == ActionOp::Action(100);
    assert!(front_is_100, "DropLatest should preserve Action(100)");
    assert_eq!(
        &latest_drained[1],
        &ActionOp::AddSubscriber,
        "DropLatest should preserve AddSubscriber"
    );
    let back_is_200 = latest_drained[2] == ActionOp::Action(200);
    assert!(!back_is_200, "DropLatest should drop Action(200)");
    let back_is_300 = latest_drained[2] == ActionOp::Action(300);
    assert!(back_is_300, "DropLatest should add Action(300)");
}
/// End-to-end check with `String` actions: overflow evicts only the oldest
/// action while `AddSubscriber` and `Exit` stay queued.
#[test]
fn test_comprehensive_drop_policy_verification() {
    let (tx, rx) = BackpressureChannel::<String>::pair(5, BackpressurePolicy::DropOldestIf(None));
    tx.send(ActionOp::Action("action1".to_string())).unwrap();
    tx.send(ActionOp::AddSubscriber).unwrap();
    tx.send(ActionOp::Action("action2".to_string())).unwrap();
    tx.send(ActionOp::Exit(std::time::Instant::now())).unwrap();
    tx.send(ActionOp::Action("action3".to_string())).unwrap();
    assert_eq!(rx.len(), 5);
    tx.send(ActionOp::Action("action4".to_string())).unwrap();
    assert_eq!(rx.len(), 5);

    let mut drained = Vec::new();
    while let Some(op) = rx.try_recv() {
        drained.push(op);
    }

    let subscriber_kept = drained
        .iter()
        .any(|op| matches!(op, ActionOp::AddSubscriber));
    let exit_kept = drained.iter().any(|op| matches!(op, ActionOp::Exit(_)));
    assert!(subscriber_kept, "AddSubscriber must be preserved");
    assert!(exit_kept, "Exit must be preserved");

    let surviving: Vec<String> = drained
        .iter()
        .filter_map(|op| match op {
            ActionOp::Action(value) => Some(value.clone()),
            _ => None,
        })
        .collect();
    assert_eq!(surviving.len(), 3, "Should have 3 Actions remaining");
    assert!(
        !surviving.contains(&"action1".to_string()),
        "action1 should be dropped (oldest Action)"
    );
    assert!(
        surviving.contains(&"action2".to_string()),
        "action2 should be preserved"
    );
    assert!(
        surviving.contains(&"action3".to_string()),
        "action3 should be preserved"
    );
    assert!(
        surviving.contains(&"action4".to_string()),
        "action4 should be added"
    );
    assert_eq!(drained.len(), 5, "Total items should remain 5");
}
}