nodedb_bridge/
async_bridge.rs1use std::marker::PhantomData;
24use std::sync::Arc;
25
26use crate::backpressure::{BackpressureController, PressureState};
27use crate::buffer::{Consumer, Producer, RingBuffer};
28use crate::error::{BridgeError, Result};
29use crate::eventfd::WakePair;
30
31pub struct BridgeChannel<Req, Rsp> {
35 pub control: ControlHandle<Req, Rsp>,
37 pub data: DataHandle<Req, Rsp>,
39}
40
41pub struct ControlHandle<Req, Rsp> {
43 pub producer: Producer<Req>,
45 pub consumer: Consumer<Rsp>,
47 pub backpressure: Arc<BackpressureController>,
49 pub req_wake: Arc<WakePair>,
51 pub rsp_wake: Arc<WakePair>,
53}
54
55pub struct DataHandle<Req, Rsp> {
61 pub consumer: Consumer<Req>,
63 pub producer: Producer<Rsp>,
65 pub backpressure: Arc<BackpressureController>,
67 pub req_wake: Arc<WakePair>,
69 pub rsp_wake: Arc<WakePair>,
71}
72
73pub struct PinnedDataHandle<Req, Rsp> {
80 inner: DataHandle<Req, Rsp>,
81 _not_send: PhantomData<*const ()>,
83}
84
85impl<Req, Rsp> BridgeChannel<Req, Rsp> {
86 pub fn new(req_capacity: usize, rsp_capacity: usize) -> std::io::Result<Self> {
91 let (req_producer, req_consumer) = RingBuffer::channel::<Req>(req_capacity);
92 let (rsp_producer, rsp_consumer) = RingBuffer::channel::<Rsp>(rsp_capacity);
93
94 let req_wake = Arc::new(WakePair::new()?);
95 let rsp_wake = Arc::new(WakePair::new()?);
96 let backpressure = Arc::new(BackpressureController::default());
97
98 Ok(Self {
99 control: ControlHandle {
100 producer: req_producer,
101 consumer: rsp_consumer,
102 backpressure: Arc::clone(&backpressure),
103 req_wake: Arc::clone(&req_wake),
104 rsp_wake: Arc::clone(&rsp_wake),
105 },
106 data: DataHandle {
107 consumer: req_consumer,
108 producer: rsp_producer,
109 backpressure,
110 req_wake,
111 rsp_wake,
112 },
113 })
114 }
115}
116
117impl<Req, Rsp> ControlHandle<Req, Rsp> {
118 pub fn try_send_request(&mut self, req: Req) -> Result<()> {
123 let result = self.producer.try_push(req);
124
125 let util = self.producer.utilization();
127 if let Some(new_state) = self.backpressure.update(util) {
128 tracing::info!(
129 utilization = util,
130 state = ?new_state,
131 "bridge backpressure transition"
132 );
133 }
134
135 match &result {
136 Ok(()) => {
137 let _ = self.req_wake.consumer_wake.notify();
139 }
140 Err(BridgeError::Full { .. }) => {
141 }
143 _ => {}
144 }
145
146 result
147 }
148
149 pub fn try_recv_response(&mut self) -> Result<Rsp> {
154 let result = self.consumer.try_pop();
155
156 if result.is_ok() {
157 let _ = self.rsp_wake.producer_wake.notify();
159 }
160
161 result
162 }
163
164 pub fn drain_responses(&mut self, buf: &mut Vec<Rsp>, max: usize) -> usize {
166 let count = self.consumer.drain_into(buf, max);
167 if count > 0 {
168 let _ = self.rsp_wake.producer_wake.notify();
169 }
170 count
171 }
172
173 pub fn pressure(&self) -> PressureState {
175 self.backpressure.state()
176 }
177
178 pub fn response_wake_fd(&self) -> std::os::unix::io::RawFd {
180 self.rsp_wake.consumer_wake.as_fd()
181 }
182
183 pub fn request_space_fd(&self) -> std::os::unix::io::RawFd {
185 self.req_wake.producer_wake.as_fd()
186 }
187}
188
189impl<Req, Rsp> DataHandle<Req, Rsp> {
190 pub fn pin(self) -> PinnedDataHandle<Req, Rsp> {
197 PinnedDataHandle {
198 inner: self,
199 _not_send: PhantomData,
200 }
201 }
202
203 pub fn try_recv_request(&mut self) -> Result<Req> {
205 let result = self.consumer.try_pop();
206 if result.is_ok() {
207 let _ = self.req_wake.producer_wake.notify();
208 }
209 result
210 }
211
212 pub fn drain_requests(&mut self, buf: &mut Vec<Req>, max: usize) -> usize {
214 let count = self.consumer.drain_into(buf, max);
215 if count > 0 {
216 let _ = self.req_wake.producer_wake.notify();
217 }
218 count
219 }
220
221 pub fn try_send_response(&mut self, rsp: Rsp) -> Result<()> {
223 let result = self.producer.try_push(rsp);
224 if result.is_ok() {
225 let _ = self.rsp_wake.consumer_wake.notify();
226 }
227 result
228 }
229
230 pub fn pressure(&self) -> PressureState {
232 self.backpressure.state()
233 }
234
235 pub fn should_throttle(&self) -> bool {
237 matches!(
238 self.pressure(),
239 PressureState::Throttled | PressureState::Suspended
240 )
241 }
242
243 pub fn should_suspend(&self) -> bool {
245 self.pressure() == PressureState::Suspended
246 }
247
248 pub fn request_wake_fd(&self) -> std::os::unix::io::RawFd {
250 self.req_wake.consumer_wake.as_fd()
251 }
252
253 pub fn response_space_fd(&self) -> std::os::unix::io::RawFd {
255 self.rsp_wake.producer_wake.as_fd()
256 }
257}
258
259impl<Req, Rsp> PinnedDataHandle<Req, Rsp> {
260 pub fn try_recv_request(&mut self) -> Result<Req> {
262 self.inner.try_recv_request()
263 }
264
265 pub fn drain_requests(&mut self, buf: &mut Vec<Req>, max: usize) -> usize {
267 self.inner.drain_requests(buf, max)
268 }
269
270 pub fn try_send_response(&mut self, rsp: Rsp) -> Result<()> {
272 self.inner.try_send_response(rsp)
273 }
274
275 pub fn pressure(&self) -> PressureState {
277 self.inner.pressure()
278 }
279
280 pub fn should_throttle(&self) -> bool {
282 self.inner.should_throttle()
283 }
284
285 pub fn should_suspend(&self) -> bool {
287 self.inner.should_suspend()
288 }
289
290 pub fn request_wake_fd(&self) -> std::os::unix::io::RawFd {
292 self.inner.request_wake_fd()
293 }
294
295 pub fn response_space_fd(&self) -> std::os::unix::io::RawFd {
297 self.inner.response_space_fd()
298 }
299}
300
#[cfg(test)]
mod tests {
    use super::*;

    /// A request sent by the control side arrives at the data side, and the
    /// response travels back the other way.
    #[test]
    fn bridge_channel_roundtrip() {
        let bridge: BridgeChannel<u64, String> = BridgeChannel::new(16, 16).unwrap();
        let mut control = bridge.control;
        let mut data = bridge.data;

        control.try_send_request(42).unwrap();

        let req = data.try_recv_request().unwrap();
        assert_eq!(req, 42);

        data.try_send_response("result".to_string()).unwrap();

        let rsp = control.try_recv_response().unwrap();
        assert_eq!(rsp, "result");
    }

    /// Sending 14 requests into a bridge created with request capacity 16
    /// moves the controller to `Throttled`.
    #[test]
    fn backpressure_updates_on_send() {
        let bridge: BridgeChannel<u64, u64> = BridgeChannel::new(16, 16).unwrap();
        let mut control = bridge.control;

        for i in 0..14 {
            control.try_send_request(i).unwrap();
        }

        assert_eq!(control.pressure(), PressureState::Throttled);
    }

    /// A successful push notifies the request queue's `consumer_wake`.
    #[test]
    fn eventfd_wake_on_push() {
        let bridge: BridgeChannel<u64, u64> = BridgeChannel::new(16, 16).unwrap();
        let mut control = bridge.control;
        let data = bridge.data;

        control.try_send_request(1).unwrap();

        let count = data.req_wake.consumer_wake.try_read().unwrap();
        assert!(count > 0);
    }

    /// Draining responses returns them in push order and notifies the
    /// response queue's `producer_wake`.
    #[test]
    fn drain_responses_signals_producer() {
        let bridge: BridgeChannel<u64, u64> = BridgeChannel::new(16, 16).unwrap();
        let mut control = bridge.control;
        let mut data = bridge.data;

        data.try_send_response(10).unwrap();
        data.try_send_response(20).unwrap();
        data.try_send_response(30).unwrap();

        let mut buf = Vec::new();
        let count = control.drain_responses(&mut buf, 10);
        assert_eq!(count, 3);
        assert_eq!(buf, vec![10, 20, 30]);

        // The behavior the test is named after: a non-empty drain must have
        // notified the response producer. Both handles share the same
        // `rsp_wake`, so the notification is observable from the data side.
        let signalled = data.rsp_wake.producer_wake.try_read().unwrap();
        assert!(signalled > 0);
    }

    /// The data side observes the state driven by control-side sends:
    /// nothing at first, `Throttled` after 14 of 16, `Suspended` after 16.
    #[test]
    fn data_handle_throttle_queries() {
        let bridge: BridgeChannel<u64, u64> = BridgeChannel::new(16, 16).unwrap();
        let mut control = bridge.control;
        let data = bridge.data;

        assert!(!data.should_throttle());
        assert!(!data.should_suspend());

        for i in 0..14 {
            control.try_send_request(i).unwrap();
        }

        assert!(data.should_throttle());
        assert!(!data.should_suspend());

        control.try_send_request(14).unwrap();
        control.try_send_request(15).unwrap();

        // `should_throttle` stays true while suspended: it covers both states.
        assert!(data.should_throttle());
        assert!(data.should_suspend());
    }
}