remotia_buffer_utils/
pool.rs

1use std::{fmt::Debug, sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4
5use crate::BytesMut;
6use remotia_core::traits::{FrameProcessor, PullableFrameProperties};
7use tokio::sync::{
8    mpsc::{self, Receiver, Sender},
9    Mutex,
10};
11
12pub struct BuffersPool<K: Copy> {
13    slot_id: K,
14    buffers_sender: Sender<BytesMut>,
15    buffers_receiver: Arc<Mutex<Receiver<BytesMut>>>,
16}
17
18impl<K: Copy> BuffersPool<K> {
19    pub async fn new(slot_id: K, pool_size: usize, buffer_size: usize) -> Self {
20        let (sender, receiver) = mpsc::channel(pool_size);
21
22        for _ in 0..pool_size {
23            let buf = BytesMut::with_capacity(buffer_size);
24            sender.send(buf).await.unwrap();
25        }
26
27        Self {
28            slot_id,
29            buffers_sender: sender,
30            buffers_receiver: Arc::new(Mutex::new(receiver)),
31        }
32    }
33
34    pub fn borrower(&self) -> BufferBorrower<K> {
35        BufferBorrower {
36            slot_id: self.slot_id,
37            receiver: self.buffers_receiver.clone(),
38            soft: false,
39        }
40    }
41
42    pub fn redeemer(&self) -> BufferRedeemer<K> {
43        BufferRedeemer {
44            slot_id: self.slot_id.clone(),
45            sender: self.buffers_sender.clone(),
46            soft: false,
47        }
48    }
49}
50
51pub struct BufferBorrower<K> {
52    slot_id: K,
53    receiver: Arc<Mutex<Receiver<BytesMut>>>,
54    soft: bool,
55}
56
57impl<K> BufferBorrower<K> {
58    pub fn soft(mut self) -> Self {
59        self.soft = true;
60        self
61    }
62}
63
64#[async_trait]
65impl<F, K> FrameProcessor<F> for BufferBorrower<K>
66where
67    K: Copy + Debug + Send,
68    F: PullableFrameProperties<K, BytesMut> + Send + 'static,
69{
70    async fn process(&mut self, mut frame_data: F) -> Option<F> {
71        log::debug!("Borrowing '{:?}' buffer...", self.slot_id);
72
73        loop {
74            let mut receiver = self.receiver.lock().await;
75            match receiver.try_recv() {
76                Ok(buffer) => {
77                    frame_data.push(self.slot_id, buffer);
78                    break;
79                }
80                Err(err) => {
81                    log::debug!("Unable to borrow '{:?}' buffer: {:?}", self.slot_id, err);
82                    tokio::task::yield_now().await;
83                    if self.soft {
84                        break;
85                    }
86                }
87            }
88        }
89
90        Some(frame_data)
91    }
92}
93
94pub struct BufferRedeemer<K> {
95    slot_id: K,
96    sender: Sender<BytesMut>,
97    soft: bool,
98}
99
100impl<K> BufferRedeemer<K> {
101    pub fn soft(mut self) -> Self {
102        self.soft = true;
103        self
104    }
105}
106
107#[async_trait]
108impl<F, K> FrameProcessor<F> for BufferRedeemer<K>
109where
110    K: Copy + Debug + Send,
111    F: PullableFrameProperties<K, BytesMut> + Send + 'static,
112{
113    async fn process(&mut self, mut frame_data: F) -> Option<F> {
114        log::debug!(
115            "Redeeming '{:?}' buffer (soft = {})...",
116            self.slot_id,
117            self.soft
118        );
119
120        let buffer = frame_data.pull(&self.slot_id);
121
122        match buffer {
123            Some(mut buffer) => {
124                buffer.clear();
125
126                self.sender
127                    .send(buffer)
128                    .await
129                    .expect(&format!("Unable to redeem '{:?}' buffer", self.slot_id));
130
131                if self.soft {
132                    log::debug!("Soft-redeemed a '{:?}' buffer", self.slot_id);
133                }
134            }
135            None => {
136                if !self.soft {
137                    panic!("Missing '{:?}' buffer", self.slot_id);
138                }
139            }
140        }
141
142        log::debug!(
143            "Redeemed '{:?}' buffer (soft = {})",
144            self.slot_id,
145            self.soft
146        );
147
148        Some(frame_data)
149    }
150}