rs_store/
channel.rs

1use crate::metrics::Metrics;
2use crate::ActionOp;
3use std::collections::VecDeque;
4use std::marker::PhantomData;
5use std::sync::{Arc, Condvar, Mutex};
6
7/// the Backpressure policy
8#[derive(Clone, Default)]
9pub enum BackpressurePolicy<T>
10where
11    T: Send + Sync + Clone + 'static,
12{
13    /// Block the sender when the queue is full
14    #[default]
15    BlockOnFull,
16    /// Drop the oldest item when the queue is full
17    DropOldest,
18    /// Drop the latest item when the queue is full
19    DropLatest,
20    /// Drop items based on predicate when the queue is full, it drops from the latest
21    DropLatestIf {
22        predicate: Arc<dyn Fn(&ActionOp<T>) -> bool + Send + Sync>,
23    },
24    /// Drop items based on predicate when the queue is full, it drops from the oldest
25    DropOldestIf {
26        predicate: Arc<dyn Fn(&ActionOp<T>) -> bool + Send + Sync>,
27    },
28}
29
30#[derive(thiserror::Error, Debug)]
31pub(crate) enum SenderError<T> {
32    #[error("Failed to send: {0}")]
33    SendError(T),
34    #[error("Channel is closed")]
35    ChannelClosed,
36}
37
38/// Internal MPSC Queue implementation
39struct MpscQueue<T>
40where
41    T: Send + Sync + Clone + 'static,
42{
43    queue: Mutex<VecDeque<ActionOp<T>>>,
44    condvar: Condvar,
45    capacity: usize,
46    policy: BackpressurePolicy<T>,
47    metrics: Option<Arc<dyn Metrics + Send + Sync>>,
48    closed: Mutex<bool>,
49}
50
51impl<T> MpscQueue<T>
52where
53    T: Send + Sync + Clone + 'static,
54{
55    fn new(
56        capacity: usize,
57        policy: BackpressurePolicy<T>,
58        metrics: Option<Arc<dyn Metrics + Send + Sync>>,
59    ) -> Self {
60        Self {
61            queue: Mutex::new(VecDeque::new()),
62            condvar: Condvar::new(),
63            capacity,
64            policy,
65            metrics,
66            closed: Mutex::new(false),
67        }
68    }
69
70    fn send(&self, item: ActionOp<T>) -> Result<i64, SenderError<ActionOp<T>>> {
71        let mut queue = self.queue.lock().unwrap();
72
73        // Check if channel is closed
74        if *self.closed.lock().unwrap() {
75            return Err(SenderError::ChannelClosed);
76        }
77
78        if queue.len() >= self.capacity {
79            match &self.policy {
80                BackpressurePolicy::BlockOnFull => {
81                    // Wait until space is available
82                    while queue.len() >= self.capacity {
83                        queue = self.condvar.wait(queue).unwrap();
84                        if *self.closed.lock().unwrap() {
85                            return Err(SenderError::ChannelClosed);
86                        }
87                    }
88                    queue.push_back(item);
89                }
90                BackpressurePolicy::DropOldest => {
91                    // Drop the oldest item
92                    if let Some(dropped_item) = queue.pop_front() {
93                        if let Some(metrics) = &self.metrics {
94                            if let ActionOp::Action(action) = &dropped_item {
95                                metrics.action_dropped(Some(action as &dyn std::any::Any));
96                            }
97                        }
98                    }
99                    queue.push_back(item);
100                }
101                BackpressurePolicy::DropLatest => {
102                    // Drop the new item
103                    if let Some(metrics) = &self.metrics {
104                        if let ActionOp::Action(action) = &item {
105                            metrics.action_dropped(Some(action as &dyn std::any::Any));
106                        }
107                    }
108                    return Ok(queue.len() as i64);
109                }
110                BackpressurePolicy::DropLatestIf { predicate } => {
111                    // Find and drop items that match the predicate
112                    let mut dropped_count = 0;
113                    let mut i = 0;
114                    while i < queue.len() {
115                        if predicate(&queue[i]) {
116                            if let Some(dropped_item) = queue.remove(i) {
117                                dropped_count += 1;
118                                if let Some(metrics) = &self.metrics {
119                                    if let ActionOp::Action(action) = &dropped_item {
120                                        metrics.action_dropped(Some(action as &dyn std::any::Any));
121                                    }
122                                }
123                                break;
124                            }
125                        }
126                        i += 1;
127                    }
128
129                    if dropped_count > 0 {
130                        queue.push_back(item);
131                    } else {
132                        return Err(SenderError::SendError(item));
133                    }
134                }
135                BackpressurePolicy::DropOldestIf { predicate } => {
136                    // Find and drop items that match the predicate
137                    let mut dropped_count = 0;
138                    let mut i = 0;
139                    while i < queue.len() {
140                        let index = queue.len() - i - 1;
141                        if predicate(&queue[index]) {
142                            if let Some(dropped_item) = queue.remove(index) {
143                                dropped_count += 1;
144                                if let Some(metrics) = &self.metrics {
145                                    if let ActionOp::Action(action) = &dropped_item {
146                                        metrics.action_dropped(Some(action as &dyn std::any::Any));
147                                    }
148                                }
149                                break;
150                            }
151                        }
152                        i += 1;
153                    }
154
155                    if dropped_count > 0 {
156                        queue.push_back(item);
157                    } else {
158                        return Err(SenderError::SendError(item));
159                    }
160                }
161            }
162        } else {
163            queue.push_back(item);
164        }
165
166        // Update metrics
167        if let Some(metrics) = &self.metrics {
168            metrics.queue_size(queue.len());
169        }
170
171        self.condvar.notify_one();
172        Ok(queue.len() as i64)
173    }
174
175    fn try_send(&self, item: ActionOp<T>) -> Result<i64, SenderError<ActionOp<T>>> {
176        // Check if channel is closed
177        if *self.closed.lock().unwrap() {
178            return Err(SenderError::ChannelClosed);
179        }
180
181        let mut queue = self.queue.lock().unwrap();
182
183        if queue.len() >= self.capacity {
184            match &self.policy {
185                BackpressurePolicy::BlockOnFull => {
186                    return Err(SenderError::SendError(item));
187                }
188                BackpressurePolicy::DropOldest => {
189                    // Drop the oldest item
190                    if let Some(dropped_item) = queue.pop_front() {
191                        if let Some(metrics) = &self.metrics {
192                            if let ActionOp::Action(action) = &dropped_item {
193                                metrics.action_dropped(Some(action as &dyn std::any::Any));
194                            }
195                        }
196                    }
197                    queue.push_back(item);
198                }
199                BackpressurePolicy::DropLatest => {
200                    // Drop the new item
201                    if let Some(metrics) = &self.metrics {
202                        if let ActionOp::Action(action) = &item {
203                            metrics.action_dropped(Some(action as &dyn std::any::Any));
204                        }
205                    }
206                    return Ok(queue.len() as i64);
207                }
208                BackpressurePolicy::DropLatestIf { predicate } => {
209                    // Find and drop items that match the predicate
210                    let mut dropped_count = 0;
211                    let mut i = 0;
212                    while i < queue.len() {
213                        if predicate(&queue[i]) {
214                            if let Some(dropped_item) = queue.remove(i) {
215                                dropped_count += 1;
216                                if let Some(metrics) = &self.metrics {
217                                    if let ActionOp::Action(action) = &dropped_item {
218                                        metrics.action_dropped(Some(action as &dyn std::any::Any));
219                                    }
220                                }
221                                break;
222                            }
223                        }
224                        i += 1;
225                    }
226
227                    if dropped_count > 0 {
228                        queue.push_back(item);
229                    } else {
230                        return Err(SenderError::SendError(item));
231                    }
232                }
233                BackpressurePolicy::DropOldestIf { predicate } => {
234                    // Find and drop items that match the predicate
235                    let mut dropped_count = 0;
236                    let mut i = 0;
237                    while i < queue.len() {
238                        let index = queue.len() - i - 1;
239                        if predicate(&queue[index]) {
240                            if let Some(dropped_item) = queue.remove(index) {
241                                dropped_count += 1;
242                                if let Some(metrics) = &self.metrics {
243                                    if let ActionOp::Action(action) = &dropped_item {
244                                        metrics.action_dropped(Some(action as &dyn std::any::Any));
245                                    }
246                                }
247                                break;
248                            }
249                        }
250                        i += 1;
251                    }
252
253                    if dropped_count > 0 {
254                        queue.push_back(item);
255                    } else {
256                        return Err(SenderError::SendError(item));
257                    }
258                }
259            }
260        } else {
261            queue.push_back(item);
262        }
263
264        // Update metrics
265        if let Some(metrics) = &self.metrics {
266            metrics.queue_size(queue.len());
267        }
268
269        self.condvar.notify_one();
270        Ok(queue.len() as i64)
271    }
272
273    fn recv(&self) -> Option<ActionOp<T>> {
274        let mut queue = self.queue.lock().unwrap();
275
276        // Wait until there's an item or channel is closed
277        while queue.is_empty() {
278            if *self.closed.lock().unwrap() {
279                return None;
280            }
281            queue = self.condvar.wait(queue).unwrap();
282        }
283
284        let item = queue.pop_front();
285        self.condvar.notify_one();
286        item
287    }
288
289    fn try_recv(&self) -> Option<ActionOp<T>> {
290        let mut queue = self.queue.lock().unwrap();
291        let item = queue.pop_front();
292        if item.is_some() {
293            self.condvar.notify_one();
294        }
295        item
296    }
297
298    fn len(&self) -> usize {
299        self.queue.lock().unwrap().len()
300    }
301
302    fn close(&self) {
303        *self.closed.lock().unwrap() = true;
304        self.condvar.notify_all();
305    }
306}
307
308/// Channel to hold the sender with backpressure policy
309#[derive(Clone)]
310pub(crate) struct SenderChannel<T>
311where
312    T: Send + Sync + Clone + 'static,
313{
314    _name: String,
315    queue: Arc<MpscQueue<T>>,
316}
317
318impl<Action> Drop for SenderChannel<Action>
319where
320    Action: Send + Sync + Clone + 'static,
321{
322    fn drop(&mut self) {
323        #[cfg(feature = "store-log")]
324        eprintln!("store: drop '{}' sender channel", self._name);
325    }
326}
327
328#[allow(dead_code)]
329impl<T> SenderChannel<T>
330where
331    T: Send + Sync + Clone + 'static,
332{
333    pub fn send(&self, item: ActionOp<T>) -> Result<i64, SenderError<ActionOp<T>>> {
334        self.queue.send(item)
335    }
336
337    pub fn try_send(&self, item: ActionOp<T>) -> Result<i64, SenderError<ActionOp<T>>> {
338        self.queue.try_send(item)
339    }
340}
341
342#[allow(dead_code)]
343pub(crate) struct ReceiverChannel<T>
344where
345    T: Send + Sync + Clone + 'static,
346{
347    name: String,
348    queue: Arc<MpscQueue<T>>,
349    metrics: Option<Arc<dyn Metrics + Send + Sync>>,
350}
351
352impl<Action> Drop for ReceiverChannel<Action>
353where
354    Action: Send + Sync + Clone + 'static,
355{
356    fn drop(&mut self) {
357        #[cfg(feature = "store-log")]
358        eprintln!("store: drop '{}' receiver channel", self.name);
359        self.close();
360    }
361}
362
363#[allow(dead_code)]
364impl<T> ReceiverChannel<T>
365where
366    T: Send + Sync + Clone + 'static,
367{
368    pub fn recv(&self) -> Option<ActionOp<T>> {
369        self.queue.recv()
370    }
371
372    #[allow(dead_code)]
373    pub fn try_recv(&self) -> Option<ActionOp<T>> {
374        self.queue.try_recv()
375    }
376
377    pub fn len(&self) -> usize {
378        self.queue.len()
379    }
380
381    pub fn close(&self) {
382        self.queue.close();
383    }
384}
385
386/// Channel with back pressure
387pub(crate) struct BackpressureChannel<MSG>
388where
389    MSG: Send + Sync + Clone + 'static,
390{
391    phantom_data: PhantomData<MSG>,
392}
393
394impl<MSG> BackpressureChannel<MSG>
395where
396    MSG: Send + Sync + Clone + 'static,
397{
398    #[allow(dead_code)]
399    pub fn pair(
400        capacity: usize,
401        policy: BackpressurePolicy<MSG>,
402    ) -> (SenderChannel<MSG>, ReceiverChannel<MSG>) {
403        Self::pair_with("<anon>", capacity, policy, None)
404    }
405
406    #[allow(dead_code)]
407    pub fn pair_with_metrics(
408        capacity: usize,
409        policy: BackpressurePolicy<MSG>,
410        metrics: Option<Arc<dyn Metrics + Send + Sync>>,
411    ) -> (SenderChannel<MSG>, ReceiverChannel<MSG>) {
412        Self::pair_with("<anon>", capacity, policy, metrics)
413    }
414
415    #[allow(dead_code)]
416    pub fn pair_with(
417        name: &str,
418        capacity: usize,
419        policy: BackpressurePolicy<MSG>,
420        metrics: Option<Arc<dyn Metrics + Send + Sync>>,
421    ) -> (SenderChannel<MSG>, ReceiverChannel<MSG>) {
422        let queue = Arc::new(MpscQueue::new(capacity, policy, metrics.clone()));
423
424        (
425            SenderChannel {
426                _name: name.to_string(),
427                queue: queue.clone(),
428            },
429            ReceiverChannel {
430                name: name.to_string(),
431                queue,
432                metrics,
433            },
434        )
435    }
436}
437
438#[cfg(test)]
439mod tests {
440    use super::*;
441
442    #[test]
443    fn test_basic_send_recv() {
444        let (sender, receiver) =
445            BackpressureChannel::<i32>::pair(5, BackpressurePolicy::BlockOnFull);
446
447        sender.send(ActionOp::Action(1)).unwrap();
448        sender.send(ActionOp::Action(2)).unwrap();
449
450        assert_eq!(receiver.recv(), Some(ActionOp::Action(1)));
451        assert_eq!(receiver.recv(), Some(ActionOp::Action(2)));
452        assert_eq!(receiver.try_recv(), None);
453    }
454
455    #[test]
456    fn test_drop_oldest() {
457        let (sender, receiver) =
458            BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropOldest);
459
460        sender.send(ActionOp::Action(1)).unwrap();
461        sender.send(ActionOp::Action(2)).unwrap();
462        sender.send(ActionOp::Action(3)).unwrap(); // Should drop 1
463
464        assert_eq!(receiver.recv(), Some(ActionOp::Action(2)));
465        assert_eq!(receiver.recv(), Some(ActionOp::Action(3)));
466        assert_eq!(receiver.try_recv(), None);
467    }
468
469    #[test]
470    fn test_drop_latest() {
471        let (sender, receiver) =
472            BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropLatest);
473
474        sender.send(ActionOp::Action(1)).unwrap();
475        sender.send(ActionOp::Action(2)).unwrap();
476        sender.send(ActionOp::Action(3)).unwrap(); // Should drop 3
477
478        assert_eq!(receiver.recv(), Some(ActionOp::Action(1)));
479        assert_eq!(receiver.recv(), Some(ActionOp::Action(2)));
480        assert_eq!(receiver.try_recv(), None);
481    }
482
483    #[test]
484    fn test_predicate_dropping() {
485        // predicate: 5보다 작은 값들은 drop
486        let predicate = Arc::new(|action_op: &ActionOp<i32>| match action_op {
487            ActionOp::Action(value) => *value < 5,
488            ActionOp::Exit(_) => false,
489            ActionOp::AddSubscriber => false,
490        });
491
492        let (sender, receiver) =
493            BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropLatestIf { predicate });
494
495        // 채널을 가득 채우기
496        sender.send(ActionOp::Action(1)).unwrap(); // 첫 번째 아이템 (drop 대상)
497        sender.send(ActionOp::Action(6)).unwrap(); // 두 번째 아이템 (유지 대상)
498
499        // 세 번째 아이템을 보내면 채널이 가득 차서 predicate가 적용됨
500        let result = sender.send(ActionOp::Action(7)); // 세 번째 아이템 (유지 대상)
501        assert!(
502            result.is_ok(),
503            "Should succeed because predicate should drop the first item"
504        );
505
506        // 소비자에서 아이템 확인
507        let received_item = receiver.recv();
508        assert!(received_item.is_some());
509        if let Some(ActionOp::Action(value)) = received_item {
510            // predicate에 의해 1이 drop되고 6이 유지되어야 함
511            assert_eq!(value, 6, "Should receive 6, not 1");
512        }
513
514        let received_item = receiver.recv();
515        assert!(received_item.is_some());
516        if let Some(ActionOp::Action(value)) = received_item {
517            assert_eq!(value, 7, "Should receive 7");
518        }
519    }
520
521    #[test]
522    fn test_add_subscriber_action() {
523        let (sender, receiver) =
524            BackpressureChannel::<i32>::pair(5, BackpressurePolicy::BlockOnFull);
525
526        // AddSubscriber 액션 전송
527        sender.send(ActionOp::AddSubscriber).unwrap();
528
529        // 수신 확인
530        let received = receiver.recv();
531        assert!(received.is_some());
532        match received.unwrap() {
533            ActionOp::AddSubscriber => {
534                // AddSubscriber 액션이 정상적으로 수신됨
535            }
536            _ => panic!("Expected AddSubscriber action"),
537        }
538    }
539
540    #[test]
541    fn test_add_subscriber_with_predicate() {
542        // AddSubscriber는 절대 drop되지 않도록 하는 predicate
543        let predicate = Arc::new(|action_op: &ActionOp<i32>| match action_op {
544            ActionOp::Action(value) => *value < 5,
545            ActionOp::Exit(_) => false,
546            ActionOp::AddSubscriber => false, // AddSubscriber는 절대 drop하지 않음
547        });
548
549        let (sender, receiver) =
550            BackpressureChannel::<i32>::pair(2, BackpressurePolicy::DropLatestIf { predicate });
551
552        // 채널을 가득 채우기
553        sender.send(ActionOp::Action(1)).unwrap(); // drop 대상
554        sender.send(ActionOp::Action(6)).unwrap(); // 유지 대상
555
556        // AddSubscriber 액션을 보내면 predicate에 의해 다른 액션이 drop되어야 함
557        let result = sender.send(ActionOp::AddSubscriber);
558        assert!(result.is_ok(), "AddSubscriber should be sent successfully");
559
560        // 수신 확인
561        let received_items: Vec<_> = std::iter::from_fn(|| receiver.try_recv()).collect();
562        assert_eq!(received_items.len(), 2);
563
564        // AddSubscriber가 포함되어 있는지 확인
565        let has_add_subscriber =
566            received_items.iter().any(|item| matches!(item, ActionOp::AddSubscriber));
567        assert!(has_add_subscriber, "AddSubscriber should be received");
568    }
569
570    #[test]
571    fn test_mixed_action_types() {
572        let (sender, receiver) =
573            BackpressureChannel::<i32>::pair(10, BackpressurePolicy::BlockOnFull);
574
575        // 다양한 타입의 액션들을 전송
576        sender.send(ActionOp::Action(1)).unwrap();
577        sender.send(ActionOp::AddSubscriber).unwrap();
578        sender.send(ActionOp::Action(2)).unwrap();
579        sender.send(ActionOp::AddSubscriber).unwrap();
580        sender.send(ActionOp::Action(3)).unwrap();
581
582        // 수신 확인
583        let received_items: Vec<_> = std::iter::from_fn(|| receiver.try_recv()).collect();
584        assert_eq!(received_items.len(), 5);
585
586        // 순서 확인
587        match &received_items[0] {
588            ActionOp::Action(value) => assert_eq!(*value, 1),
589            _ => panic!("Expected Action(1)"),
590        }
591        match &received_items[1] {
592            ActionOp::AddSubscriber => {
593                // AddSubscriber 액션
594            }
595            _ => panic!("Expected AddSubscriber"),
596        }
597        match &received_items[2] {
598            ActionOp::Action(value) => assert_eq!(*value, 2),
599            _ => panic!("Expected Action(2)"),
600        }
601        match &received_items[3] {
602            ActionOp::AddSubscriber => {
603                // AddSubscriber 액션
604            }
605            _ => panic!("Expected AddSubscriber"),
606        }
607        match &received_items[4] {
608            ActionOp::Action(value) => assert_eq!(*value, 3),
609            _ => panic!("Expected Action(3)"),
610        }
611    }
612
613    #[test]
614    fn test_block_on_full() {
615        let (sender, receiver) =
616            BackpressureChannel::<i32>::pair(1, BackpressurePolicy::BlockOnFull);
617
618        sender.send(ActionOp::Action(1)).unwrap();
619
620        // Try to send another item - should block or fail
621        let result = sender.try_send(ActionOp::Action(2));
622        assert!(result.is_err(), "Should fail because channel is full");
623
624        // Receive the first item
625        assert_eq!(receiver.recv(), Some(ActionOp::Action(1)));
626
627        // Now we can send again
628        sender.send(ActionOp::Action(2)).unwrap();
629        assert_eq!(receiver.recv(), Some(ActionOp::Action(2)));
630    }
631
632    #[test]
633    fn test_drop_oldest_if_predicate_always_false() {
634        let (sender, receiver) = BackpressureChannel::pair(
635            3,
636            BackpressurePolicy::DropOldestIf {
637                predicate: Arc::new(|_| false), // Predicate always returns false
638            },
639        );
640
641        // Fill the channel to capacity
642        assert!(sender.try_send(ActionOp::Action(1)).is_ok());
643        assert!(sender.try_send(ActionOp::Action(2)).is_ok());
644        assert!(sender.try_send(ActionOp::Action(3)).is_ok());
645        assert_eq!(receiver.len(), 3);
646
647        // Try to send one more item - should fail since predicate is false
648        // and no items can be dropped
649        let result = sender.try_send(ActionOp::Action(4));
650        assert!(
651            result.is_err(),
652            "Should fail because no items match the predicate"
653        );
654
655        // Verify the channel contents are unchanged
656        assert_eq!(receiver.len(), 3);
657        assert_eq!(receiver.recv(), Some(ActionOp::Action(1)));
658        assert_eq!(receiver.recv(), Some(ActionOp::Action(2)));
659        assert_eq!(receiver.recv(), Some(ActionOp::Action(3)));
660    }
661
662    #[test]
663    fn test_drop_oldest_if_predicate_sometimes_true() {
664        let (sender, receiver) = BackpressureChannel::pair(
665            3,
666            BackpressurePolicy::DropOldestIf {
667                predicate: Arc::new(|action_op: &ActionOp<i32>| {
668                    if let ActionOp::Action(value) = action_op {
669                        *value < 5 // Drop values less than 5
670                    } else {
671                        false
672                    }
673                }),
674            },
675        );
676
677        // Fill the channel to capacity with values that don't match predicate
678        assert!(sender.try_send(ActionOp::Action(6)).is_ok()); // >= 5, not droppable
679        assert!(sender.try_send(ActionOp::Action(2)).is_ok()); // < 5, droppable
680        assert!(sender.try_send(ActionOp::Action(8)).is_ok()); // >= 5, not droppable
681        assert_eq!(receiver.len(), 3);
682
683        // Try to send a value that doesn't match predicate - should fail
684        let result = sender.try_send(ActionOp::Action(9));
685        assert!(
686            result.is_ok(),
687            "Should fail because no items match the predicate"
688        );
689
690        // Now send a value that matches predicate - should fail because no items match predicate
691        let result = sender.try_send(ActionOp::Action(10)); // This should fail because no items < 5
692        assert!(
693            result.is_err(),
694            "Should fail because no items match the predicate"
695        );
696
697        // Verify the channel contents are unchanged
698        assert_eq!(receiver.len(), 3);
699        assert_eq!(receiver.recv(), Some(ActionOp::Action(6)));
700        assert_eq!(receiver.recv(), Some(ActionOp::Action(8)));
701        assert_eq!(receiver.recv(), Some(ActionOp::Action(9)));
702    }
703}