rs_store/
channel.rs

1use crate::metrics::Metrics;
2#[cfg(feature = "store-log")]
3use crate::store_impl::describe_action_op;
4use crate::ActionOp;
5use std::collections::VecDeque;
6use std::fmt;
7use std::marker::PhantomData;
8use std::sync::{Arc, Condvar, Mutex};
9
/// Backpressure policy applied by the channel when its queue is at capacity.
///
/// Only `ActionOp::Action` items are ever eligible for dropping; every other
/// `ActionOp` variant stays in the queue regardless of the policy.
#[derive(Default)]
pub enum BackpressurePolicy<T>
where
    T: Send + Sync + Clone + 'static,
{
    /// Never drop anything when the queue is full:
    /// `send` blocks until space is available and `try_send` returns `Err`.
    #[default]
    BlockOnFull,
    /// When the queue is full, scan the queued items from the newest to the oldest and
    /// drop the first `ActionOp::Action` for which the predicate returns `true`
    /// (with `None`, the first `ActionOp::Action` found is dropped).
    /// If no queued item is droppable, `send` blocks until space is available and
    /// `try_send` returns `Err`.
    /// The predicate is only applied to `ActionOp::Action` variants; other variants are never dropped.
    #[allow(clippy::type_complexity)]
    DropLatestIf(Option<Box<dyn Fn(&T) -> bool + Send + Sync>>),
    /// When the queue is full, scan the queued items from the oldest to the newest and
    /// drop the first `ActionOp::Action` for which the predicate returns `true`
    /// (with `None`, the first `ActionOp::Action` found is dropped).
    /// If no queued item is droppable, `send` blocks until space is available and
    /// `try_send` returns `Err`.
    /// The predicate is only applied to `ActionOp::Action` variants; other variants are never dropped.
    #[allow(clippy::type_complexity)]
    DropOldestIf(Option<Box<dyn Fn(&T) -> bool + Send + Sync>>),
}
30
/// Errors returned by the sending half of the channel.
#[derive(thiserror::Error)]
pub(crate) enum SenderError<T> {
    /// The item could not be queued; the rejected item is handed back to the caller.
    #[error("Failed to send item")]
    SendError(T),
    /// The channel has been closed, so no further items are accepted.
    #[error("Channel is closed")]
    ChannelClosed,
}
38
39impl<T> fmt::Debug for SenderError<T> {
40    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41        match self {
42            SenderError::SendError(_) => f.write_str("SendError(..)"),
43            SenderError::ChannelClosed => f.write_str("ChannelClosed"),
44        }
45    }
46}
47
/// Internal MPSC Queue implementation
///
/// A bounded FIFO of `ActionOp<T>` shared (behind an `Arc`) by the sender and
/// receiver halves of the channel.
struct MpscQueue<T>
where
    T: Send + Sync + Clone + 'static,
{
    /// The queued items, oldest at the front.
    queue: Mutex<VecDeque<ActionOp<T>>>,
    /// Condition variable, paired with `queue`, on which both blocked senders
    /// (queue full) and the blocked receiver (queue empty) wait.
    condvar: Condvar,
    /// Number of queued items at which the queue counts as full.
    capacity: usize,
    /// What to do when an item is sent while the queue is full.
    policy: BackpressurePolicy<T>,
    /// Optional sink notified about queue size changes and dropped actions.
    metrics: Option<Arc<dyn Metrics + Send + Sync>>,
    /// Set to `true` by `close`; guarded by its own mutex, separate from `queue`.
    closed: Mutex<bool>,
}
60
61impl<T> MpscQueue<T>
62where
63    T: Send + Sync + Clone + 'static,
64{
65    fn new(
66        capacity: usize,
67        policy: BackpressurePolicy<T>,
68        metrics: Option<Arc<dyn Metrics + Send + Sync>>,
69    ) -> Self {
70        Self {
71            queue: Mutex::new(VecDeque::new()),
72            condvar: Condvar::new(),
73            capacity,
74            policy,
75            metrics,
76            closed: Mutex::new(false),
77        }
78    }
79
80    /// send put an item to the queue with backpressure policy
81    /// when it is full, the send will try to drop an item based on the policy:
82    /// - with BlockOnFull policy, the send will block until the space is available
83    /// - with DropOldestIf policy, the send will drop an item if the predicate is true from the oldest to the newest
84    /// - with DropLatestIf policy, the send will drop an item if the predicate is true from the latest to the oldest
85    ///
86    /// if nothing is dropped, the send will block until the space is available
87    fn send(&self, item: ActionOp<T>) -> Result<i64, SenderError<ActionOp<T>>> {
88        // Check if channel is closed
89        if *self.closed.lock().unwrap() {
90            return Err(SenderError::ChannelClosed);
91        }
92
93        let mut queue: std::sync::MutexGuard<'_, VecDeque<ActionOp<T>>> =
94            self.queue.lock().unwrap();
95
96        // Loop until we can successfully add the item to the queue
97        loop {
98            if queue.len() < self.capacity {
99                // Queue has space, add the item
100                queue.push_back(item);
101                break;
102            }
103
104            // Queue is full, try to handle based on policy
105            #[allow(deprecated)]
106            match &self.policy {
107                BackpressurePolicy::BlockOnFull => {
108                    // Wait until space is available
109                    while queue.len() >= self.capacity {
110                        queue = self.condvar.wait(queue).unwrap();
111                        if *self.closed.lock().unwrap() {
112                            return Err(SenderError::ChannelClosed);
113                        }
114                    }
115                    // Continue loop to add item
116                }
117                BackpressurePolicy::DropOldestIf(None) => {
118                    // Drop the oldest item, but only if it's an Action variant
119                    let mut found_action_to_drop = false;
120                    let mut i = 0;
121                    while i < queue.len() {
122                        if matches!(queue[i], ActionOp::Action(_)) {
123                            if let Some(dropped_item) = queue.remove(i) {
124                                found_action_to_drop = true;
125                                if let Some(metrics) = &self.metrics {
126                                    if let ActionOp::Action(action) = &dropped_item {
127                                        metrics.action_dropped(Some(action as &dyn std::any::Any));
128                                    }
129                                }
130                                break;
131                            }
132                        }
133                        i += 1;
134                    }
135
136                    // If no Action variant was found to drop, block until space is available
137                    if !found_action_to_drop {
138                        queue = self.condvar.wait(queue).unwrap();
139                        if *self.closed.lock().unwrap() {
140                            return Err(SenderError::ChannelClosed);
141                        }
142                        continue; // Continue the outer loop to try again
143                    }
144                    // Continue loop to add item
145                }
146                BackpressurePolicy::DropLatestIf(None) => {
147                    // Drop the new item only if it's an Action variant
148                    let mut found_action_to_drop = false;
149                    let mut i = 0;
150                    while i < queue.len() {
151                        let idx = queue.len() - i - 1;
152                        if matches!(queue[idx], ActionOp::Action(_)) {
153                            if let Some(dropped_item) = queue.remove(idx) {
154                                found_action_to_drop = true;
155                                if let Some(metrics) = &self.metrics {
156                                    if let ActionOp::Action(action) = &dropped_item {
157                                        metrics.action_dropped(Some(action as &dyn std::any::Any));
158                                    }
159                                }
160                                break;
161                            }
162                        }
163                        i += 1;
164                    }
165
166                    // If no Action variant was found to drop, block until space is available
167                    if !found_action_to_drop {
168                        queue = self.condvar.wait(queue).unwrap();
169                        if *self.closed.lock().unwrap() {
170                            return Err(SenderError::ChannelClosed);
171                        }
172                        continue; // Continue the outer loop to try again
173                    }
174                    // Continue loop to add item
175                }
176                BackpressurePolicy::DropOldestIf(Some(predicate)) => {
177                    // Find and drop items that match the predicate from oldest to newest
178                    let mut dropped_count = 0;
179                    let mut i = 0;
180                    while i < queue.len() {
181                        #[cfg(feature = "store-log")]
182                        eprintln!(
183                            "store: check droppable {}/{}: {}",
184                            i,
185                            queue.len(),
186                            describe_action_op(&queue[i])
187                        );
188                        // Only apply predicate to Action variants
189                        let should_drop = if let ActionOp::Action(action) = &queue[i] {
190                            predicate(action)
191                        } else {
192                            false // Never drop non-Action variants
193                        };
194                        if should_drop {
195                            if let Some(dropped_item) = queue.remove(i) {
196                                dropped_count += 1;
197                                if let Some(metrics) = &self.metrics {
198                                    if let ActionOp::Action(action) = &dropped_item {
199                                        metrics.action_dropped(Some(action as &dyn std::any::Any));
200                                    }
201                                }
202                                break;
203                            }
204                        }
205                        i += 1;
206                    }
207
208                    if dropped_count == 0 {
209                        // Nothing was dropped, block until space is available
210                        #[cfg(feature = "store-log")]
211                        eprintln!(
212                            "store: no droppable items found, blocking until space available: queue len={}",
213                            queue.len()
214                        );
215                        queue = self.condvar.wait(queue).unwrap();
216                        if *self.closed.lock().unwrap() {
217                            return Err(SenderError::ChannelClosed);
218                        }
219                    }
220                    // Continue loop to try again
221                }
222                BackpressurePolicy::DropLatestIf(Some(predicate)) => {
223                    // Find and drop items that match the predicate from latest to oldest
224                    let mut dropped_count = 0;
225                    let mut i = 0;
226                    while i < queue.len() {
227                        let index = queue.len() - i - 1;
228                        #[cfg(feature = "store-log")]
229                        eprintln!(
230                            "store: check droppable {}/{}: {}",
231                            index,
232                            queue.len(),
233                            describe_action_op(&queue[index])
234                        );
235                        // Only apply predicate to Action variants
236                        let should_drop = if let ActionOp::Action(action) = &queue[index] {
237                            predicate(action)
238                        } else {
239                            false // Never drop non-Action variants
240                        };
241                        if should_drop {
242                            if let Some(dropped_item) = queue.remove(index) {
243                                dropped_count += 1;
244                                if let Some(metrics) = &self.metrics {
245                                    if let ActionOp::Action(action) = &dropped_item {
246                                        metrics.action_dropped(Some(action as &dyn std::any::Any));
247                                    }
248                                }
249                                break;
250                            }
251                        }
252                        i += 1;
253                    }
254
255                    if dropped_count == 0 {
256                        // Nothing was dropped, block until space is available
257                        #[cfg(feature = "store-log")]
258                        eprintln!(
259                            "store: no droppable items found, blocking until space available: queue len={}",
260                            queue.len()
261                        );
262                        queue = self.condvar.wait(queue).unwrap();
263                        if *self.closed.lock().unwrap() {
264                            return Err(SenderError::ChannelClosed);
265                        }
266                    }
267                    // Continue loop to try again
268                }
269            }
270        }
271
272        // Update metrics
273        if let Some(metrics) = &self.metrics {
274            metrics.queue_size(queue.len());
275        }
276
277        self.condvar.notify_one();
278        Ok(queue.len() as i64)
279    }
280
281    fn try_send(&self, item: ActionOp<T>) -> Result<i64, SenderError<ActionOp<T>>> {
282        // Check if channel is closed
283        if *self.closed.lock().unwrap() {
284            return Err(SenderError::ChannelClosed);
285        }
286
287        let mut queue = self.queue.lock().unwrap();
288
289        if queue.len() < self.capacity {
290            queue.push_back(item);
291        } else {
292            #[allow(deprecated)]
293            match &self.policy {
294                BackpressurePolicy::BlockOnFull => {
295                    return Err(SenderError::SendError(item));
296                }
297                BackpressurePolicy::DropOldestIf(None) => {
298                    // Drop the oldest item, but only if it's an Action variant
299                    let mut found_action_to_drop = false;
300                    let mut i = 0;
301                    while i < queue.len() {
302                        if matches!(queue[i], ActionOp::Action(_)) {
303                            if let Some(dropped_item) = queue.remove(i) {
304                                found_action_to_drop = true;
305                                if let Some(metrics) = &self.metrics {
306                                    if let ActionOp::Action(action) = &dropped_item {
307                                        metrics.action_dropped(Some(action as &dyn std::any::Any));
308                                    }
309                                }
310                                break;
311                            }
312                        }
313                        i += 1;
314                    }
315
316                    if found_action_to_drop {
317                        queue.push_back(item);
318                    } else {
319                        // No Action variant found to drop, return error
320                        #[cfg(feature = "store-log")]
321                        eprintln!(
322                            "store: failed to drop oldest action while trying to send: queue len={}",
323                            queue.len()
324                        );
325                        return Err(SenderError::SendError(item));
326                    }
327                }
328                BackpressurePolicy::DropLatestIf(None) => {
329                    let mut found_action_to_drop = false;
330                    let mut i = 0;
331                    while i < queue.len() {
332                        let index = queue.len() - i - 1;
333                        if matches!(queue[index], ActionOp::Action(_)) {
334                            if let Some(dropped_item) = queue.remove(index) {
335                                found_action_to_drop = true;
336                                if let Some(metrics) = &self.metrics {
337                                    if let ActionOp::Action(action) = &dropped_item {
338                                        metrics.action_dropped(Some(action as &dyn std::any::Any));
339                                    }
340                                }
341                                break;
342                            }
343                        }
344                        i += 1;
345                    }
346
347                    if found_action_to_drop {
348                        queue.push_back(item);
349                    } else {
350                        // No Action variant found to drop, return error
351                        #[cfg(feature = "store-log")]
352                        eprintln!(
353                            "store: failed to drop latest action while trying to send: queue len={}",
354                            queue.len()
355                        );
356                        return Err(SenderError::SendError(item));
357                    }
358                }
359                BackpressurePolicy::DropOldestIf(Some(predicate)) => {
360                    // Find and drop items that match the predicate
361                    let mut dropped_count = 0;
362                    let mut i = 0;
363                    while i < queue.len() {
364                        #[cfg(feature = "store-log")]
365                        eprintln!(
366                            "store: check droppable {}/{}: {}",
367                            i,
368                            queue.len(),
369                            describe_action_op(&queue[i])
370                        );
371                        // Only apply predicate to Action variants
372                        let should_drop = if let ActionOp::Action(action) = &queue[i] {
373                            predicate(action)
374                        } else {
375                            false // Never drop non-Action variants
376                        };
377                        if should_drop {
378                            if let Some(dropped_item) = queue.remove(i) {
379                                dropped_count += 1;
380                                if let Some(metrics) = &self.metrics {
381                                    if let ActionOp::Action(action) = &dropped_item {
382                                        metrics.action_dropped(Some(action as &dyn std::any::Any));
383                                    }
384                                }
385                                break;
386                            }
387                        }
388                        i += 1;
389                    }
390
391                    if dropped_count > 0 {
392                        queue.push_back(item);
393                    } else {
394                        #[cfg(feature = "store-log")]
395                        eprintln!(
396                            "store: failed to drop the oldestif while trying to send: queue len={}",
397                            queue.len()
398                        );
399                        return Err(SenderError::SendError(item));
400                    }
401                }
402                BackpressurePolicy::DropLatestIf(Some(predicate)) => {
403                    // Find and drop items that match the predicate
404                    let mut dropped_count = 0;
405                    let mut i = 0;
406                    while i < queue.len() {
407                        let index = queue.len() - i - 1;
408                        #[cfg(feature = "store-log")]
409                        eprintln!(
410                            "store: check droppable {}/{}: {}",
411                            index,
412                            queue.len(),
413                            describe_action_op(&queue[index])
414                        );
415                        // Only apply predicate to Action variants
416                        let should_drop = if let ActionOp::Action(action) = &queue[index] {
417                            predicate(action)
418                        } else {
419                            false // Never drop non-Action variants
420                        };
421                        if should_drop {
422                            if let Some(dropped_item) = queue.remove(index) {
423                                dropped_count += 1;
424                                if let Some(metrics) = &self.metrics {
425                                    if let ActionOp::Action(action) = &dropped_item {
426                                        metrics.action_dropped(Some(action as &dyn std::any::Any));
427                                    }
428                                }
429                                break;
430                            }
431                        }
432                        i += 1;
433                    }
434
435                    if dropped_count > 0 {
436                        queue.push_back(item);
437                    } else {
438                        #[cfg(feature = "store-log")]
439                        eprintln!(
440                            "store: failed to drop the latestif while trying to send: queue len={}",
441                            queue.len()
442                        );
443                        return Err(SenderError::SendError(item));
444                    }
445                }
446            }
447        }
448
449        // Update metrics
450        if let Some(metrics) = &self.metrics {
451            metrics.queue_size(queue.len());
452        }
453
454        self.condvar.notify_one();
455        Ok(queue.len() as i64)
456    }
457
458    fn recv(&self) -> Option<ActionOp<T>> {
459        let mut queue = self.queue.lock().unwrap();
460
461        // Wait until there's an item or channel is closed
462        while queue.is_empty() {
463            if *self.closed.lock().unwrap() {
464                return None;
465            }
466            queue = self.condvar.wait(queue).unwrap();
467        }
468
469        let item = queue.pop_front();
470        self.condvar.notify_one();
471        item
472    }
473
474    fn try_recv(&self) -> Option<ActionOp<T>> {
475        let mut queue = self.queue.lock().unwrap();
476        let item = queue.pop_front();
477        if item.is_some() {
478            self.condvar.notify_one();
479        }
480        item
481    }
482
483    fn len(&self) -> usize {
484        self.queue.lock().unwrap().len()
485    }
486
487    fn close(&self) {
488        *self.closed.lock().unwrap() = true;
489        self.condvar.notify_all();
490    }
491}
492
/// Channel to hold the sender with backpressure policy
///
/// Cloning yields another handle onto the same underlying queue.
#[derive(Clone)]
pub(crate) struct SenderChannel<T>
where
    T: Send + Sync + Clone + 'static,
{
    /// Channel name; only read by the `store-log` diagnostics emitted on drop.
    _name: String,
    /// Queue shared with the receiver half.
    queue: Arc<MpscQueue<T>>,
}
502
503impl<Action> Drop for SenderChannel<Action>
504where
505    Action: Send + Sync + Clone + 'static,
506{
507    fn drop(&mut self) {
508        #[cfg(feature = "store-log")]
509        eprintln!("store: drop '{}' sender channel", self._name);
510    }
511}
512
#[allow(dead_code)]
impl<T> SenderChannel<T>
where
    T: Send + Sync + Clone + 'static,
{
    /// Queues `item`, delegating to the shared queue's `send`.
    ///
    /// when it is full, the send will try to drop an item based on the policy
    /// if nothing is dropped, the send will block until the space is available
    ///
    /// On success the returned value is the queue length after the item was added.
    pub fn send(&self, item: ActionOp<T>) -> Result<i64, SenderError<ActionOp<T>>> {
        self.queue.send(item)
    }

    /// Non-blocking send, delegating to the shared queue's `try_send`.
    ///
    /// when it is full and no queued item can be dropped by the policy, it will return Err
    pub fn try_send(&self, item: ActionOp<T>) -> Result<i64, SenderError<ActionOp<T>>> {
        self.queue.try_send(item)
    }
}
529
/// Receiving half of the backpressure channel.
///
/// Dropping the receiver closes the channel.
#[allow(dead_code)]
pub(crate) struct ReceiverChannel<T>
where
    T: Send + Sync + Clone + 'static,
{
    /// Channel name, used by the `store-log` diagnostics emitted on drop.
    name: String,
    /// Queue shared with the sender half.
    queue: Arc<MpscQueue<T>>,
    /// Metrics sink handed to `pair_with`; not read by any receiver method in this file.
    metrics: Option<Arc<dyn Metrics + Send + Sync>>,
}
539
540impl<Action> Drop for ReceiverChannel<Action>
541where
542    Action: Send + Sync + Clone + 'static,
543{
544    fn drop(&mut self) {
545        #[cfg(feature = "store-log")]
546        eprintln!("store: drop '{}' receiver channel", self.name);
547        self.close();
548    }
549}
550
#[allow(dead_code)]
impl<T> ReceiverChannel<T>
where
    T: Send + Sync + Clone + 'static,
{
    /// Blocks until an item is available and returns it; returns `None` once the
    /// queue is empty and the channel has been closed.
    pub fn recv(&self) -> Option<ActionOp<T>> {
        self.queue.recv()
    }

    /// Returns the oldest queued item without blocking, or `None` when the queue is empty.
    #[allow(dead_code)]
    pub fn try_recv(&self) -> Option<ActionOp<T>> {
        self.queue.try_recv()
    }

    /// Number of items currently queued.
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    /// Closes the channel and wakes blocked senders and receivers.
    pub fn close(&self) {
        self.queue.close();
    }
}
573
/// Channel with back pressure
///
/// This type carries no data of its own; it only provides the associated
/// `pair*` constructors that build a connected sender/receiver pair.
pub(crate) struct BackpressureChannel<MSG>
where
    MSG: Send + Sync + Clone + 'static,
{
    /// Ties the otherwise unused `MSG` parameter to the type.
    phantom_data: PhantomData<MSG>,
}
581
582impl<MSG> BackpressureChannel<MSG>
583where
584    MSG: Send + Sync + Clone + 'static,
585{
586    #[allow(dead_code)]
587    pub fn pair(
588        capacity: usize,
589        policy: BackpressurePolicy<MSG>,
590    ) -> (SenderChannel<MSG>, ReceiverChannel<MSG>) {
591        Self::pair_with("<anon>", capacity, policy, None)
592    }
593
594    #[allow(dead_code)]
595    pub fn pair_with_metrics(
596        capacity: usize,
597        policy: BackpressurePolicy<MSG>,
598        metrics: Option<Arc<dyn Metrics + Send + Sync>>,
599    ) -> (SenderChannel<MSG>, ReceiverChannel<MSG>) {
600        Self::pair_with("<anon>", capacity, policy, metrics)
601    }
602
603    #[allow(dead_code)]
604    pub fn pair_with(
605        name: &str,
606        capacity: usize,
607        policy: BackpressurePolicy<MSG>,
608        metrics: Option<Arc<dyn Metrics + Send + Sync>>,
609    ) -> (SenderChannel<MSG>, ReceiverChannel<MSG>) {
610        let queue = Arc::new(MpscQueue::new(capacity, policy, metrics.clone()));
611
612        (
613            SenderChannel {
614                _name: name.to_string(),
615                queue: queue.clone(),
616            },
617            ReceiverChannel {
618                name: name.to_string(),
619                queue,
620                metrics,
621            },
622        )
623    }
624}
625
626#[cfg(test)]
627mod tests {
628    use super::*;
629
    /// Items sent below capacity come back in FIFO order, after which the queue is empty.
    #[test]
    fn test_basic_send_recv() {
        let (sender, receiver) =
            BackpressureChannel::<i32>::pair(5, BackpressurePolicy::BlockOnFull);

        sender.send(ActionOp::Action(1)).unwrap();
        sender.send(ActionOp::Action(2)).unwrap();

        assert_eq!(receiver.recv(), Some(ActionOp::Action(1)));
        assert_eq!(receiver.recv(), Some(ActionOp::Action(2)));
        assert_eq!(receiver.try_recv(), None);
    }
642
    /// `DropOldestIf(None)` evicts the oldest queued action when a send hits a full queue.
    #[test]
    fn test_drop_oldest() {
        let (sender, receiver) =
            BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropOldestIf(None));

        sender.send(ActionOp::Action(1)).unwrap();
        sender.send(ActionOp::Action(2)).unwrap();
        sender.send(ActionOp::Action(3)).unwrap(); // Should drop 1

        assert_eq!(receiver.recv(), Some(ActionOp::Action(2)));
        assert_eq!(receiver.recv(), Some(ActionOp::Action(3)));
        assert_eq!(receiver.try_recv(), None);
    }
656
    /// `DropLatestIf(None)` evicts the newest queued action when a send hits a full queue.
    #[test]
    fn test_drop_latest() {
        let (sender, receiver) =
            BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropLatestIf(None));

        sender.send(ActionOp::Action(1)).unwrap();
        sender.send(ActionOp::Action(2)).unwrap(); // 2 is the newest queued action, so the next send drops it
        sender.send(ActionOp::Action(3)).unwrap();

        assert_eq!(receiver.recv(), Some(ActionOp::Action(1)));
        assert_eq!(receiver.recv(), Some(ActionOp::Action(3)));
        assert_eq!(receiver.try_recv(), None);
    }
670
    /// With a predicate, only queued actions matching it are evicted when the queue is full.
    #[test]
    fn test_predicate_dropping() {
        // predicate: values smaller than 5 are droppable
        let predicate = Box::new(|value: &i32| *value < 5);

        let (sender, receiver) =
            BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropLatestIf(Some(predicate)));

        // Fill the channel to capacity
        sender.send(ActionOp::Action(1)).unwrap(); // first item (droppable)
        sender.send(ActionOp::Action(6)).unwrap(); // second item (must be kept)

        // Sending a third item hits the full queue, so the predicate is applied
        let result = sender.send(ActionOp::Action(7)); // third item (must be kept)
        assert!(
            result.is_ok(),
            "Should succeed because predicate should drop the first item"
        );

        // Check the items on the consumer side
        let received_item = receiver.recv();
        assert!(received_item.is_some());
        if let Some(ActionOp::Action(value)) = received_item {
            // 1 must have been dropped by the predicate while 6 was kept
            assert_eq!(value, 6, "Should receive 6, not 1");
        }

        let received_item = receiver.recv();
        assert!(received_item.is_some());
        if let Some(ActionOp::Action(value)) = received_item {
            assert_eq!(value, 7, "Should receive 7");
        }
    }
704
    /// A non-`Action` variant (`AddSubscriber`) travels through the channel unchanged.
    #[test]
    fn test_add_subscriber_action() {
        let (sender, receiver) =
            BackpressureChannel::<i32>::pair(5, BackpressurePolicy::BlockOnFull);

        // Send an AddSubscriber op
        sender.send(ActionOp::AddSubscriber).unwrap();

        // Verify it is received
        let received = receiver.recv();
        assert!(received.is_some());
        match received.unwrap() {
            ActionOp::AddSubscriber => {
                // The AddSubscriber op was received as expected
            }
            _ => panic!("Expected AddSubscriber action"),
        }
    }
723
    /// Sending `AddSubscriber` into a full queue evicts a droppable action and keeps the op itself.
    #[test]
    fn test_add_subscriber_with_predicate() {
        // The predicate is only applied to Action items, so AddSubscriber is always preserved
        let predicate = Box::new(|value: &i32| *value < 5);

        let (sender, receiver) =
            BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropLatestIf(Some(predicate)));

        // Fill the channel to capacity
        sender.send(ActionOp::Action(1)).unwrap(); // droppable
        sender.send(ActionOp::Action(6)).unwrap(); // must be kept

        // Sending AddSubscriber must make the predicate drop one of the queued actions
        let result = sender.send(ActionOp::AddSubscriber);
        assert!(result.is_ok(), "AddSubscriber should be sent successfully");

        // Drain everything that is queued
        let received_items: Vec<_> = std::iter::from_fn(|| receiver.try_recv()).collect();
        assert_eq!(received_items.len(), 2);

        // Verify that AddSubscriber is among the received items
        let has_add_subscriber =
            received_items.iter().any(|item| matches!(item, ActionOp::AddSubscriber));
        assert!(has_add_subscriber, "AddSubscriber should be received");
    }
749
    /// Interleaved `Action` and `AddSubscriber` ops are delivered in the order they were sent.
    #[test]
    fn test_mixed_action_types() {
        let (sender, receiver) =
            BackpressureChannel::<i32>::pair(10, BackpressurePolicy::BlockOnFull);

        // Send ops of different kinds
        sender.send(ActionOp::Action(1)).unwrap();
        sender.send(ActionOp::AddSubscriber).unwrap();
        sender.send(ActionOp::Action(2)).unwrap();
        sender.send(ActionOp::AddSubscriber).unwrap();
        sender.send(ActionOp::Action(3)).unwrap();

        // Drain everything that is queued
        let received_items: Vec<_> = std::iter::from_fn(|| receiver.try_recv()).collect();
        assert_eq!(received_items.len(), 5);

        // Verify the order
        match &received_items[0] {
            ActionOp::Action(value) => assert_eq!(*value, 1),
            _ => panic!("Expected Action(1)"),
        }
        match &received_items[1] {
            ActionOp::AddSubscriber => {
                // AddSubscriber op
            }
            _ => panic!("Expected AddSubscriber"),
        }
        match &received_items[2] {
            ActionOp::Action(value) => assert_eq!(*value, 2),
            _ => panic!("Expected Action(2)"),
        }
        match &received_items[3] {
            ActionOp::AddSubscriber => {
                // AddSubscriber op
            }
            _ => panic!("Expected AddSubscriber"),
        }
        match &received_items[4] {
            ActionOp::Action(value) => assert_eq!(*value, 3),
            _ => panic!("Expected Action(3)"),
        }
    }
792
793    #[test]
794    fn test_block_on_full() {
795        let (sender, receiver) =
796            BackpressureChannel::<i32>::pair(1, BackpressurePolicy::BlockOnFull);
797
798        sender.send(ActionOp::Action(1)).unwrap();
799
800        // Try to send another item - should block or fail
801        let result = sender.try_send(ActionOp::Action(2));
802        assert!(result.is_err(), "Should fail because channel is full");
803
804        // Receive the first item
805        assert_eq!(receiver.recv(), Some(ActionOp::Action(1)));
806
807        // Now we can send again
808        sender.send(ActionOp::Action(2)).unwrap();
809        assert_eq!(receiver.recv(), Some(ActionOp::Action(2)));
810    }
811
812    #[test]
813    fn test_drop_oldest_if_predicate_always_false() {
814        let (sender, receiver) = BackpressureChannel::pair(
815            3,
816            BackpressurePolicy::DropOldestIf(Some(Box::new(|_| false))), // Predicate always returns false
817        );
818
819        // Fill the channel to capacity
820        assert!(sender.try_send(ActionOp::Action(1)).is_ok());
821        assert!(sender.try_send(ActionOp::Action(2)).is_ok());
822        assert!(sender.try_send(ActionOp::Action(3)).is_ok());
823        assert_eq!(receiver.len(), 3);
824
825        // Try to send one more item - should fail since predicate is false
826        // and no items can be dropped
827        let result = sender.try_send(ActionOp::Action(4));
828        assert!(
829            result.is_err(),
830            "Should fail because no items match the predicate"
831        );
832
833        // Verify the channel contents are unchanged
834        assert_eq!(receiver.len(), 3);
835        assert_eq!(receiver.recv(), Some(ActionOp::Action(1)));
836        assert_eq!(receiver.recv(), Some(ActionOp::Action(2)));
837        assert_eq!(receiver.recv(), Some(ActionOp::Action(3)));
838    }
839
840    #[test]
841    fn test_drop_oldest_if_predicate_sometimes_true() {
842        let (sender, receiver) = BackpressureChannel::pair(
843            3,
844            BackpressurePolicy::DropOldestIf(Some(Box::new(|value: &i32| *value < 5))), // Drop values less than 5
845        );
846
847        // Fill the channel to capacity with values that don't match predicate
848        assert!(sender.try_send(ActionOp::Action(6)).is_ok()); // >= 5, not droppable
849        assert!(sender.try_send(ActionOp::Action(2)).is_ok()); // < 5, droppable
850        assert!(sender.try_send(ActionOp::Action(8)).is_ok()); // >= 5, not droppable
851        assert_eq!(receiver.len(), 3);
852
853        // Try to send a value that doesn't match predicate - should fail
854        let result = sender.try_send(ActionOp::Action(9));
855        assert!(
856            result.is_ok(),
857            "Should fail because no items match the predicate"
858        );
859
860        // Now send a value that matches predicate - should fail because no items match predicate
861        let result = sender.try_send(ActionOp::Action(10)); // This should fail because no items < 5
862        assert!(
863            result.is_err(),
864            "Should fail because no items match the predicate"
865        );
866
867        // Verify the channel contents are unchanged
868        assert_eq!(receiver.len(), 3);
869        assert_eq!(receiver.recv(), Some(ActionOp::Action(6)));
870        assert_eq!(receiver.recv(), Some(ActionOp::Action(8)));
871        assert_eq!(receiver.recv(), Some(ActionOp::Action(9)));
872    }
873
874    #[test]
875    fn test_drop_oldest_only_actions() {
876        let (sender, receiver) =
877            BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropOldestIf(None));
878
879        // Fill the channel with non-Action items
880        sender.send(ActionOp::AddSubscriber).unwrap();
881        sender.send(ActionOp::Exit(std::time::Instant::now())).unwrap();
882        assert_eq!(receiver.len(), 2);
883
884        // Try to send an Action - should block because no Actions to drop
885        let result = sender.try_send(ActionOp::Action(1));
886        assert!(
887            result.is_err(),
888            "Should fail because no Actions can be dropped"
889        );
890
891        // Channel should still contain the original non-Action items
892        assert_eq!(receiver.len(), 2);
893        assert_eq!(receiver.recv(), Some(ActionOp::AddSubscriber));
894        assert!(matches!(receiver.recv(), Some(ActionOp::Exit(_))));
895    }
896
897    #[test]
898    fn test_drop_oldest_with_mixed_types() {
899        let (sender, receiver) =
900            BackpressureChannel::<i32>::pair(3, BackpressurePolicy::DropOldestIf(None));
901
902        // Fill the channel with mixed types
903        sender.send(ActionOp::Action(1)).unwrap(); // This should be droppable
904        sender.send(ActionOp::AddSubscriber).unwrap(); // This should NOT be droppable
905        sender.send(ActionOp::Action(2)).unwrap(); // This should be droppable
906        assert_eq!(receiver.len(), 3);
907
908        // Send another Action - should drop the first Action(1)
909        sender.send(ActionOp::Action(3)).unwrap();
910        assert_eq!(receiver.len(), 3);
911
912        // Verify contents: AddSubscriber should still be there, Action(1) should be dropped
913        let received_items: Vec<_> = std::iter::from_fn(|| receiver.try_recv()).collect();
914        assert_eq!(received_items.len(), 3);
915
916        // Should contain AddSubscriber, Action(2), and Action(3)
917        let has_add_subscriber =
918            received_items.iter().any(|item| matches!(item, ActionOp::AddSubscriber));
919        let has_action_2 = received_items.iter().any(|item| matches!(item, ActionOp::Action(2)));
920        let has_action_3 = received_items.iter().any(|item| matches!(item, ActionOp::Action(3)));
921        let has_action_1 = received_items.iter().any(|item| matches!(item, ActionOp::Action(1)));
922
923        assert!(has_add_subscriber, "AddSubscriber should be preserved");
924        assert!(has_action_2, "Action(2) should be preserved");
925        assert!(has_action_3, "Action(3) should be added");
926        assert!(!has_action_1, "Action(1) should be dropped");
927    }
928
929    #[test]
930    fn test_drop_latest_only_actions() {
931        let (sender, receiver) =
932            BackpressureChannel::<i32>::pair(3, BackpressurePolicy::DropLatestIf(None));
933
934        // Fill the channel with non-Action items
935        sender.send(ActionOp::AddSubscriber).unwrap();
936        sender.send(ActionOp::Action(1)).unwrap();
937        sender.send(ActionOp::Exit(std::time::Instant::now())).unwrap();
938        assert_eq!(receiver.len(), 3);
939
940        // Try to send an Action - should be dropped
941        let result = sender.try_send(ActionOp::Action(2));
942        assert!(result.is_ok(), "Action should be dropped successfully");
943
944        // Try to send an Action - should be dropped
945        let result = sender.try_send(ActionOp::StateFunction);
946        assert!(result.is_ok(), "Action should be dropped successfully");
947
948        // Try to send a non-Action - should fail because channel is full
949        let result = sender.try_send(ActionOp::AddSubscriber);
950        assert!(
951            result.is_err(),
952            "Should fail because channel is full and non-Actions can't be dropped"
953        );
954
955        // Channel should still contain the original non-Action items
956        assert_eq!(receiver.len(), 3);
957        assert_eq!(receiver.recv(), Some(ActionOp::AddSubscriber));
958        assert!(matches!(receiver.recv(), Some(ActionOp::Exit(_))));
959        assert_eq!(receiver.recv(), Some(ActionOp::StateFunction));
960    }
961
962    #[test]
963    fn test_drop_policy_preserves_critical_operations() {
964        // 이 테스트는 중요한 작업(AddSubscriber, Exit)이 drop되지 않는지 확인합니다
965        let (sender, receiver) =
966            BackpressureChannel::<i32>::pair(3, BackpressurePolicy::DropOldestIf(None));
967
968        // 채널을 가득 채우기: 2개의 Action과 1개의 AddSubscriber
969        sender.send(ActionOp::Action(1)).unwrap();
970        sender.send(ActionOp::Action(2)).unwrap();
971        sender.send(ActionOp::AddSubscriber).unwrap();
972        assert_eq!(receiver.len(), 3);
973
974        // 새로운 Action을 보내면 기존 Action 중 하나가 drop되어야 함
975        sender.send(ActionOp::Action(3)).unwrap();
976        assert_eq!(receiver.len(), 3);
977
978        // AddSubscriber는 여전히 존재해야 함
979        let received_items: Vec<_> = std::iter::from_fn(|| receiver.try_recv()).collect();
980        let has_add_subscriber =
981            received_items.iter().any(|item| matches!(item, ActionOp::AddSubscriber));
982        assert!(
983            has_add_subscriber,
984            "AddSubscriber should never be dropped by drop policy"
985        );
986
987        // Action(1)이 drop되고 Action(2), Action(3)이 남아있어야 함
988        let action_values: Vec<i32> = received_items
989            .iter()
990            .filter_map(|item| {
991                if let ActionOp::Action(val) = item {
992                    Some(*val)
993                } else {
994                    None
995                }
996            })
997            .collect();
998        assert_eq!(action_values.len(), 2, "Should have 2 Actions remaining");
999        assert!(action_values.contains(&2), "Action(2) should be preserved");
1000        assert!(action_values.contains(&3), "Action(3) should be added");
1001        assert!(!action_values.contains(&1), "Action(1) should be dropped");
1002    }
1003
1004    #[test]
1005    fn test_drop_policy_with_exit_operations() {
1006        // Exit 작업이 drop되지 않는지 확인하는 테스트
1007        let (sender, receiver) =
1008            BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropLatestIf(None));
1009
1010        let exit_time = std::time::Instant::now();
1011
1012        // 채널을 가득 채우기: 1개의 Action과 1개의 Exit
1013        sender.send(ActionOp::Action(1)).unwrap();
1014        sender.send(ActionOp::Exit(exit_time)).unwrap();
1015        assert_eq!(receiver.len(), 2);
1016
1017        // 새로운 Action을 보내면 최근 아이템이 drop되어야 함 (Exit은 보존)
1018        let result = sender.send(ActionOp::Action(2));
1019        assert!(result.is_ok(), "Action should be dropped, not Exit");
1020
1021        // Exit은 여전히 존재해야 함
1022        let received_items: Vec<_> = std::iter::from_fn(|| receiver.try_recv()).collect();
1023        assert_eq!(received_items.len(), 2);
1024
1025        let has_action_1 = received_items.get(0).unwrap() == &ActionOp::Action(1);
1026        assert!(!has_action_1, "Action(1) should be dropped");
1027
1028        let has_exit = received_items.get(0).unwrap() == &ActionOp::Exit(exit_time);
1029        assert!(has_exit, "Exit should never be dropped by drop policy");
1030
1031        let has_action_2 = received_items.get(1).unwrap() == &ActionOp::Action(2);
1032        assert!(has_action_2, "Action(2) added");
1033    }
1034
1035    #[test]
1036    fn test_drop_oldest_action_ordering() {
1037        // DropOldest가 Action들 중에서 가장 오래된 것을 drop하는지 확인
1038        let (sender, receiver) =
1039            BackpressureChannel::<i32>::pair(4, BackpressurePolicy::DropOldestIf(None));
1040
1041        // 채널에 순서대로 추가: Action(1), AddSubscriber, Action(2), Action(3)
1042        sender.send(ActionOp::Action(1)).unwrap(); // 가장 오래된 Action
1043        sender.send(ActionOp::AddSubscriber).unwrap();
1044        sender.send(ActionOp::Action(2)).unwrap();
1045        sender.send(ActionOp::Action(3)).unwrap();
1046        assert_eq!(receiver.len(), 4);
1047
1048        // 새로운 Action을 보내면 Action(1)이 drop되어야 함
1049        sender.send(ActionOp::Action(4)).unwrap();
1050        assert_eq!(receiver.len(), 4);
1051
1052        let received_items: Vec<_> = std::iter::from_fn(|| receiver.try_recv()).collect();
1053
1054        // AddSubscriber는 보존되어야 함
1055        let has_add_subscriber =
1056            received_items.iter().any(|item| matches!(item, ActionOp::AddSubscriber));
1057        assert!(has_add_subscriber, "AddSubscriber should be preserved");
1058
1059        // Action 값들 확인
1060        let action_values: Vec<i32> = received_items
1061            .iter()
1062            .filter_map(|item| {
1063                if let ActionOp::Action(val) = item {
1064                    Some(*val)
1065                } else {
1066                    None
1067                }
1068            })
1069            .collect();
1070
1071        assert_eq!(action_values.len(), 3, "Should have 3 Actions remaining");
1072        assert!(
1073            !action_values.contains(&1),
1074            "Action(1) should be dropped (oldest)"
1075        );
1076        assert!(action_values.contains(&2), "Action(2) should be preserved");
1077        assert!(action_values.contains(&3), "Action(3) should be preserved");
1078        assert!(action_values.contains(&4), "Action(4) should be added");
1079    }
1080
1081    #[test]
1082    fn test_drop_policy_blocking_behavior() {
1083        // Action이 없을 때 blocking 동작을 확인하는 테스트
1084        let (sender, receiver) =
1085            BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropOldestIf(None));
1086
1087        // 채널을 non-Action items로 가득 채우기
1088        sender.send(ActionOp::AddSubscriber).unwrap();
1089        sender.send(ActionOp::Exit(std::time::Instant::now())).unwrap();
1090        assert_eq!(receiver.len(), 2);
1091
1092        // try_send로 새로운 Action을 보내려 하면 실패해야 함 (drop할 Action이 없음)
1093        let result = sender.try_send(ActionOp::Action(1));
1094        assert!(
1095            result.is_err(),
1096            "Should fail because no Actions available to drop"
1097        );
1098
1099        // try_send로 새로운 non-Action을 보내려 해도 실패해야 함
1100        let result = sender.try_send(ActionOp::AddSubscriber);
1101        assert!(
1102            result.is_err(),
1103            "Should fail because channel is full and no Actions to drop"
1104        );
1105
1106        // 채널 내용이 변경되지 않았는지 확인
1107        assert_eq!(receiver.len(), 2);
1108        assert_eq!(receiver.recv(), Some(ActionOp::AddSubscriber));
1109        assert!(matches!(receiver.recv(), Some(ActionOp::Exit(_))));
1110    }
1111
1112    #[test]
1113    fn test_drop_oldest_if_predicate_always_true() {
1114        let (sender, receiver) = BackpressureChannel::<i32>::pair(
1115            3,
1116            BackpressurePolicy::DropOldestIf(Some(Box::new(|_| true))),
1117        );
1118
1119        sender.send(ActionOp::Action(1)).unwrap();
1120        sender.send(ActionOp::Action(2)).unwrap();
1121        sender.send(ActionOp::Action(3)).unwrap();
1122
1123        let result = sender.send(ActionOp::Action(4));
1124        assert!(result.is_ok(), "Action should be dropped successfully");
1125
1126        let received_items: Vec<_> = std::iter::from_fn(|| receiver.try_recv()).collect();
1127        assert_eq!(received_items.len(), 3);
1128
1129        let has_action_1 = received_items.get(0).unwrap() == &ActionOp::Action(1);
1130        assert!(!has_action_1, "Action(1) should be dropped");
1131
1132        let has_action_2 = received_items.get(0).unwrap() == &ActionOp::Action(2);
1133        assert!(has_action_2, "Action(2) should be preserved");
1134
1135        let has_action_3 = received_items.get(1).unwrap() == &ActionOp::Action(3);
1136        assert!(has_action_3, "Action(3) should be preserved");
1137
1138        let has_action_4 = received_items.get(2).unwrap() == &ActionOp::Action(4);
1139        assert!(has_action_4, "Action(4) should be added");
1140    }
1141
1142    #[test]
1143    fn test_drop_latest_if_predicate_always_true() {
1144        let (sender, receiver) = BackpressureChannel::<i32>::pair(
1145            3,
1146            BackpressurePolicy::DropLatestIf(Some(Box::new(|_| true))),
1147        );
1148
1149        sender.send(ActionOp::Action(1)).unwrap();
1150        sender.send(ActionOp::Action(2)).unwrap();
1151        sender.send(ActionOp::Action(3)).unwrap(); // should be dropped
1152
1153        let result = sender.send(ActionOp::Action(4));
1154        assert!(result.is_ok(), "Action should be dropped successfully");
1155
1156        let received_items: Vec<_> = std::iter::from_fn(|| receiver.try_recv()).collect();
1157        assert_eq!(received_items.len(), 3);
1158
1159        let has_action_1 = received_items.get(0).unwrap() == &ActionOp::Action(1);
1160        assert!(has_action_1, "Action(1) should be preserved");
1161
1162        let has_action_2 = received_items.get(1).unwrap() == &ActionOp::Action(2);
1163        assert!(has_action_2, "Action(2) should be preserved");
1164
1165        let has_action_3 = received_items.get(2).unwrap() == &ActionOp::Action(3);
1166        assert!(!has_action_3, "Action(3) should be dropped");
1167
1168        let has_action_4 = received_items.get(2).unwrap() == &ActionOp::Action(4);
1169        assert!(has_action_4, "Action(4) should be added");
1170    }
1171
1172    #[test]
1173    fn test_drop_latest_vs_drop_oldest_action_selection() {
1174        // DropLatest와 DropOldest가 서로 같은 Action을 선택하는지 확인
1175
1176        // DropOldest 테스트
1177        let (sender_oldest, receiver_oldest) =
1178            BackpressureChannel::<i32>::pair(3, BackpressurePolicy::DropOldestIf(None));
1179
1180        sender_oldest.send(ActionOp::Action(10)).unwrap(); // 가장 오래된 Action
1181        sender_oldest.send(ActionOp::AddSubscriber).unwrap();
1182        sender_oldest.send(ActionOp::Action(20)).unwrap(); // 가장 새로운 Action
1183
1184        sender_oldest.send(ActionOp::Action(30)).unwrap(); // Action(10)이 drop되어야 함
1185
1186        let oldest_items: Vec<_> = std::iter::from_fn(|| receiver_oldest.try_recv()).collect();
1187        let oldest_actions: Vec<i32> = oldest_items
1188            .iter()
1189            .filter_map(|item| {
1190                if let ActionOp::Action(val) = item {
1191                    Some(*val)
1192                } else {
1193                    None
1194                }
1195            })
1196            .collect();
1197
1198        assert!(
1199            oldest_actions.contains(&20),
1200            "DropOldest should preserve Action(20)"
1201        );
1202        assert!(
1203            oldest_actions.contains(&30),
1204            "DropOldest should preserve Action(30)"
1205        );
1206
1207        // DropLatest 테스트
1208        let (sender_latest, receiver_latest) =
1209            BackpressureChannel::<i32>::pair(3, BackpressurePolicy::DropLatestIf(None));
1210
1211        sender_latest.send(ActionOp::Action(100)).unwrap(); // 가장 오래된 Action
1212        sender_latest.send(ActionOp::AddSubscriber).unwrap();
1213        sender_latest.send(ActionOp::Action(200)).unwrap(); // should be dropped
1214
1215        // 새로운 Action을 보내면 마지막 drop되어야 함
1216        let result = sender_latest.send(ActionOp::Action(300));
1217        assert!(
1218            result.is_ok(),
1219            "send Action should be success, should drop the latest Action(200)"
1220        );
1221
1222        let latest_items: Vec<_> = std::iter::from_fn(|| receiver_latest.try_recv()).collect();
1223        assert_eq!(latest_items.len(), 3);
1224
1225        let has_action_100 = latest_items.get(0).unwrap() == &ActionOp::Action(100);
1226        assert!(has_action_100, "DropLatest should preserve Action(100)");
1227
1228        assert_eq!(
1229            latest_items.get(1).unwrap(),
1230            &ActionOp::AddSubscriber,
1231            "DropLatest should preserve AddSubscriber"
1232        );
1233
1234        let has_action_200 = latest_items.get(2).unwrap() == &ActionOp::Action(200);
1235        assert!(!has_action_200, "DropLatest should drop Action(200)");
1236
1237        let has_action_300 = latest_items.get(2).unwrap() == &ActionOp::Action(300);
1238        assert!(has_action_300, "DropLatest should add Action(300)");
1239    }
1240
1241    #[test]
1242    fn test_comprehensive_drop_policy_verification() {
1243        // 종합적인 drop policy 검증 테스트
1244        let (sender, receiver) =
1245            BackpressureChannel::<String>::pair(5, BackpressurePolicy::DropOldestIf(None));
1246
1247        // 다양한 타입의 ActionOp를 순서대로 추가
1248        sender.send(ActionOp::Action("action1".to_string())).unwrap();
1249        sender.send(ActionOp::AddSubscriber).unwrap();
1250        sender.send(ActionOp::Action("action2".to_string())).unwrap();
1251        sender.send(ActionOp::Exit(std::time::Instant::now())).unwrap();
1252        sender.send(ActionOp::Action("action3".to_string())).unwrap();
1253        assert_eq!(receiver.len(), 5);
1254
1255        // 채널이 가득 찬 상태에서 새로운 Action 추가
1256        // action1이 drop되어야 함 (가장 오래된 Action)
1257        sender.send(ActionOp::Action("action4".to_string())).unwrap();
1258        assert_eq!(receiver.len(), 5);
1259
1260        let received_items: Vec<_> = std::iter::from_fn(|| receiver.try_recv()).collect();
1261
1262        // 모든 non-Action items는 보존되어야 함
1263        let has_add_subscriber =
1264            received_items.iter().any(|item| matches!(item, ActionOp::AddSubscriber));
1265        let has_exit = received_items.iter().any(|item| matches!(item, ActionOp::Exit(_)));
1266        assert!(has_add_subscriber, "AddSubscriber must be preserved");
1267        assert!(has_exit, "Exit must be preserved");
1268
1269        // Action items 검증
1270        let action_values: Vec<String> = received_items
1271            .iter()
1272            .filter_map(|item| {
1273                if let ActionOp::Action(val) = item {
1274                    Some(val.clone())
1275                } else {
1276                    None
1277                }
1278            })
1279            .collect();
1280
1281        assert_eq!(action_values.len(), 3, "Should have 3 Actions remaining");
1282        assert!(
1283            !action_values.contains(&"action1".to_string()),
1284            "action1 should be dropped (oldest Action)"
1285        );
1286        assert!(
1287            action_values.contains(&"action2".to_string()),
1288            "action2 should be preserved"
1289        );
1290        assert!(
1291            action_values.contains(&"action3".to_string()),
1292            "action3 should be preserved"
1293        );
1294        assert!(
1295            action_values.contains(&"action4".to_string()),
1296            "action4 should be added"
1297        );
1298
1299        // 전체 아이템 개수 확인
1300        assert_eq!(received_items.len(), 5, "Total items should remain 5");
1301    }
1302}