nodedb_bridge/
async_bridge.rs1use std::marker::PhantomData;
26use std::sync::Arc;
27
28use crate::backpressure::{BackpressureController, PressureState};
29use crate::buffer::{Consumer, Producer, RingBuffer};
30use crate::error::{BridgeError, Result};
31use crate::eventfd::WakePair;
32
33pub struct BridgeChannel<Req, Rsp> {
37 pub control: ControlHandle<Req, Rsp>,
39 pub data: DataHandle<Req, Rsp>,
41}
42
43pub struct ControlHandle<Req, Rsp> {
45 pub producer: Producer<Req>,
47 pub consumer: Consumer<Rsp>,
49 pub backpressure: Arc<BackpressureController>,
51 pub req_wake: Arc<WakePair>,
53 pub rsp_wake: Arc<WakePair>,
55}
56
57pub struct DataHandle<Req, Rsp> {
63 pub consumer: Consumer<Req>,
65 pub producer: Producer<Rsp>,
67 pub backpressure: Arc<BackpressureController>,
69 pub req_wake: Arc<WakePair>,
71 pub rsp_wake: Arc<WakePair>,
73}
74
75pub struct PinnedDataHandle<Req, Rsp> {
82 inner: DataHandle<Req, Rsp>,
83 _not_send: PhantomData<*const ()>,
85}
86
87impl<Req, Rsp> BridgeChannel<Req, Rsp> {
88 pub fn new(req_capacity: usize, rsp_capacity: usize) -> std::io::Result<Self> {
93 let (req_producer, req_consumer) = RingBuffer::channel::<Req>(req_capacity);
94 let (rsp_producer, rsp_consumer) = RingBuffer::channel::<Rsp>(rsp_capacity);
95
96 let req_wake = Arc::new(WakePair::new()?);
97 let rsp_wake = Arc::new(WakePair::new()?);
98 let backpressure = Arc::new(BackpressureController::default());
99
100 Ok(Self {
101 control: ControlHandle {
102 producer: req_producer,
103 consumer: rsp_consumer,
104 backpressure: Arc::clone(&backpressure),
105 req_wake: Arc::clone(&req_wake),
106 rsp_wake: Arc::clone(&rsp_wake),
107 },
108 data: DataHandle {
109 consumer: req_consumer,
110 producer: rsp_producer,
111 backpressure,
112 req_wake,
113 rsp_wake,
114 },
115 })
116 }
117}
118
119impl<Req, Rsp> ControlHandle<Req, Rsp> {
120 pub fn try_send_request(&mut self, req: Req) -> Result<()> {
125 let result = self.producer.try_push(req);
126
127 let util = self.producer.utilization();
129 if let Some(new_state) = self.backpressure.update(util) {
130 tracing::info!(
131 utilization = util,
132 state = ?new_state,
133 "bridge backpressure transition"
134 );
135 }
136
137 match &result {
138 Ok(()) => {
139 let _ = self.req_wake.consumer_wake.notify();
141 }
142 Err(BridgeError::Full { .. }) => {
143 }
145 _ => {}
146 }
147
148 result
149 }
150
151 pub fn try_recv_response(&mut self) -> Result<Rsp> {
156 let result = self.consumer.try_pop();
157
158 if result.is_ok() {
159 let _ = self.rsp_wake.producer_wake.notify();
161 }
162
163 result
164 }
165
166 pub fn drain_responses(&mut self, buf: &mut Vec<Rsp>, max: usize) -> usize {
168 let count = self.consumer.drain_into(buf, max);
169 if count > 0 {
170 let _ = self.rsp_wake.producer_wake.notify();
171 }
172 count
173 }
174
175 pub fn pressure(&self) -> PressureState {
177 self.backpressure.state()
178 }
179
180 pub fn response_wake_fd(&self) -> std::os::unix::io::RawFd {
182 self.rsp_wake.consumer_wake.as_fd()
183 }
184
185 pub fn request_space_fd(&self) -> std::os::unix::io::RawFd {
187 self.req_wake.producer_wake.as_fd()
188 }
189}
190
191impl<Req, Rsp> DataHandle<Req, Rsp> {
192 pub fn pin(self) -> PinnedDataHandle<Req, Rsp> {
199 PinnedDataHandle {
200 inner: self,
201 _not_send: PhantomData,
202 }
203 }
204
205 pub fn try_recv_request(&mut self) -> Result<Req> {
207 let result = self.consumer.try_pop();
208 if result.is_ok() {
209 let _ = self.req_wake.producer_wake.notify();
210 }
211 result
212 }
213
214 pub fn drain_requests(&mut self, buf: &mut Vec<Req>, max: usize) -> usize {
216 let count = self.consumer.drain_into(buf, max);
217 if count > 0 {
218 let _ = self.req_wake.producer_wake.notify();
219 }
220 count
221 }
222
223 pub fn try_send_response(&mut self, rsp: Rsp) -> Result<()> {
225 let result = self.producer.try_push(rsp);
226 if result.is_ok() {
227 let _ = self.rsp_wake.consumer_wake.notify();
228 }
229 result
230 }
231
232 pub fn pressure(&self) -> PressureState {
234 self.backpressure.state()
235 }
236
237 pub fn should_throttle(&self) -> bool {
239 matches!(
240 self.pressure(),
241 PressureState::Throttled | PressureState::Suspended
242 )
243 }
244
245 pub fn should_suspend(&self) -> bool {
247 self.pressure() == PressureState::Suspended
248 }
249
250 pub fn request_wake_fd(&self) -> std::os::unix::io::RawFd {
252 self.req_wake.consumer_wake.as_fd()
253 }
254
255 pub fn response_space_fd(&self) -> std::os::unix::io::RawFd {
257 self.rsp_wake.producer_wake.as_fd()
258 }
259}
260
261impl<Req, Rsp> PinnedDataHandle<Req, Rsp> {
262 pub fn try_recv_request(&mut self) -> Result<Req> {
264 self.inner.try_recv_request()
265 }
266
267 pub fn drain_requests(&mut self, buf: &mut Vec<Req>, max: usize) -> usize {
269 self.inner.drain_requests(buf, max)
270 }
271
272 pub fn try_send_response(&mut self, rsp: Rsp) -> Result<()> {
274 self.inner.try_send_response(rsp)
275 }
276
277 pub fn pressure(&self) -> PressureState {
279 self.inner.pressure()
280 }
281
282 pub fn should_throttle(&self) -> bool {
284 self.inner.should_throttle()
285 }
286
287 pub fn should_suspend(&self) -> bool {
289 self.inner.should_suspend()
290 }
291
292 pub fn request_wake_fd(&self) -> std::os::unix::io::RawFd {
294 self.inner.request_wake_fd()
295 }
296
297 pub fn response_space_fd(&self) -> std::os::unix::io::RawFd {
299 self.inner.response_space_fd()
300 }
301}
302
#[cfg(test)]
mod tests {
    use super::*;

    /// A request sent by the control side arrives at the data side, and the
    /// response travels back the other way.
    #[test]
    fn bridge_channel_roundtrip() {
        let bridge: BridgeChannel<u64, String> = BridgeChannel::new(16, 16).unwrap();
        let mut control = bridge.control;
        let mut data = bridge.data;

        // Control -> data.
        control.try_send_request(42).unwrap();

        let req = data.try_recv_request().unwrap();
        assert_eq!(req, 42);

        // Data -> control.
        data.try_send_response("result".to_string()).unwrap();

        let rsp = control.try_recv_response().unwrap();
        assert_eq!(rsp, "result");
    }

    /// Filling 14 of 16 request slots moves the shared state to `Throttled`.
    #[test]
    fn backpressure_updates_on_send() {
        let bridge: BridgeChannel<u64, u64> = BridgeChannel::new(16, 16).unwrap();
        let mut control = bridge.control;

        for i in 0..14 {
            control.try_send_request(i).unwrap();
        }

        assert_eq!(control.pressure(), PressureState::Throttled);
    }

    /// A successful request push leaves a readable count on the request
    /// queue's consumer wake handle.
    #[test]
    fn eventfd_wake_on_push() {
        let bridge: BridgeChannel<u64, u64> = BridgeChannel::new(16, 16).unwrap();
        let mut control = bridge.control;
        let data = bridge.data;

        control.try_send_request(1).unwrap();

        let count = data.req_wake.consumer_wake.try_read().unwrap();
        assert!(count > 0);
    }

    /// Draining responses returns them in order and notifies the response
    /// queue's producer wake handle.
    #[test]
    fn drain_responses_signals_producer() {
        let bridge: BridgeChannel<u64, u64> = BridgeChannel::new(16, 16).unwrap();
        let mut control = bridge.control;
        let mut data = bridge.data;

        data.try_send_response(10).unwrap();
        data.try_send_response(20).unwrap();
        data.try_send_response(30).unwrap();

        let mut buf = Vec::new();
        let count = control.drain_responses(&mut buf, 10);
        assert_eq!(count, 3);
        assert_eq!(buf, vec![10, 20, 30]);

        // The behavior this test is named for: the drain must have notified
        // the producer-side wake handle. Pushes only notify `consumer_wake`,
        // so a non-zero count here can only come from `drain_responses`.
        let signalled = data.rsp_wake.producer_wake.try_read().unwrap();
        assert!(signalled > 0);
    }

    /// The data side observes the pressure state that the control side's
    /// sends produce: normal, then throttled at 14/16, then suspended at 16/16.
    #[test]
    fn data_handle_throttle_queries() {
        let bridge: BridgeChannel<u64, u64> = BridgeChannel::new(16, 16).unwrap();
        let mut control = bridge.control;
        let data = bridge.data;

        assert!(!data.should_throttle());
        assert!(!data.should_suspend());

        for i in 0..14 {
            control.try_send_request(i).unwrap();
        }

        assert!(data.should_throttle());
        assert!(!data.should_suspend());

        // Fill the remaining two slots.
        control.try_send_request(14).unwrap();
        control.try_send_request(15).unwrap();

        assert!(data.should_throttle());
        assert!(data.should_suspend());
    }
}