remotia_buffer_utils/
pool.rs1use std::{fmt::Debug, sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4
5use crate::BytesMut;
6use remotia_core::traits::{FrameProcessor, PullableFrameProperties};
7use tokio::sync::{
8 mpsc::{self, Receiver, Sender},
9 Mutex,
10};
11
12pub struct BuffersPool<K: Copy> {
13 slot_id: K,
14 buffers_sender: Sender<BytesMut>,
15 buffers_receiver: Arc<Mutex<Receiver<BytesMut>>>,
16}
17
18impl<K: Copy> BuffersPool<K> {
19 pub async fn new(slot_id: K, pool_size: usize, buffer_size: usize) -> Self {
20 let (sender, receiver) = mpsc::channel(pool_size);
21
22 for _ in 0..pool_size {
23 let buf = BytesMut::with_capacity(buffer_size);
24 sender.send(buf).await.unwrap();
25 }
26
27 Self {
28 slot_id,
29 buffers_sender: sender,
30 buffers_receiver: Arc::new(Mutex::new(receiver)),
31 }
32 }
33
34 pub fn borrower(&self) -> BufferBorrower<K> {
35 BufferBorrower {
36 slot_id: self.slot_id,
37 receiver: self.buffers_receiver.clone(),
38 soft: false,
39 }
40 }
41
42 pub fn redeemer(&self) -> BufferRedeemer<K> {
43 BufferRedeemer {
44 slot_id: self.slot_id.clone(),
45 sender: self.buffers_sender.clone(),
46 soft: false,
47 }
48 }
49}
50
51pub struct BufferBorrower<K> {
52 slot_id: K,
53 receiver: Arc<Mutex<Receiver<BytesMut>>>,
54 soft: bool,
55}
56
57impl<K> BufferBorrower<K> {
58 pub fn soft(mut self) -> Self {
59 self.soft = true;
60 self
61 }
62}
63
64#[async_trait]
65impl<F, K> FrameProcessor<F> for BufferBorrower<K>
66where
67 K: Copy + Debug + Send,
68 F: PullableFrameProperties<K, BytesMut> + Send + 'static,
69{
70 async fn process(&mut self, mut frame_data: F) -> Option<F> {
71 log::debug!("Borrowing '{:?}' buffer...", self.slot_id);
72
73 loop {
74 let mut receiver = self.receiver.lock().await;
75 match receiver.try_recv() {
76 Ok(buffer) => {
77 frame_data.push(self.slot_id, buffer);
78 break;
79 }
80 Err(err) => {
81 log::debug!("Unable to borrow '{:?}' buffer: {:?}", self.slot_id, err);
82 tokio::task::yield_now().await;
83 if self.soft {
84 break;
85 }
86 }
87 }
88 }
89
90 Some(frame_data)
91 }
92}
93
94pub struct BufferRedeemer<K> {
95 slot_id: K,
96 sender: Sender<BytesMut>,
97 soft: bool,
98}
99
100impl<K> BufferRedeemer<K> {
101 pub fn soft(mut self) -> Self {
102 self.soft = true;
103 self
104 }
105}
106
107#[async_trait]
108impl<F, K> FrameProcessor<F> for BufferRedeemer<K>
109where
110 K: Copy + Debug + Send,
111 F: PullableFrameProperties<K, BytesMut> + Send + 'static,
112{
113 async fn process(&mut self, mut frame_data: F) -> Option<F> {
114 log::debug!(
115 "Redeeming '{:?}' buffer (soft = {})...",
116 self.slot_id,
117 self.soft
118 );
119
120 let buffer = frame_data.pull(&self.slot_id);
121
122 match buffer {
123 Some(mut buffer) => {
124 buffer.clear();
125
126 self.sender
127 .send(buffer)
128 .await
129 .expect(&format!("Unable to redeem '{:?}' buffer", self.slot_id));
130
131 if self.soft {
132 log::debug!("Soft-redeemed a '{:?}' buffer", self.slot_id);
133 }
134 }
135 None => {
136 if !self.soft {
137 panic!("Missing '{:?}' buffer", self.slot_id);
138 }
139 }
140 }
141
142 log::debug!(
143 "Redeemed '{:?}' buffer (soft = {})",
144 self.slot_id,
145 self.soft
146 );
147
148 Some(frame_data)
149 }
150}