Skip to main content

nodedb_bridge/
async_bridge.rs

1//! Async wrappers for the SPSC bridge.
2//!
3//! These integrate the raw `Producer`/`Consumer` with Tokio's async runtime
4//! and the eventfd-based waker, providing `async fn push()` and `async fn pop()`.
5//!
6//! ## Architecture
7//!
8//! ```text
9//! Tokio (Control Plane)             TPC (Data Plane)
10//! ┌─────────────────────┐           ┌─────────────────────┐
11//! │  AsyncProducer      │           │  SyncConsumer        │
12//! │  .push(req).await   │──push──→  │  .poll_drain()       │
13//! │                     │           │                     │
14//! │  AsyncReceiver      │           │  SyncSender          │
15//! │  .recv(rsp).await   │←──pop──── │  .try_send(rsp)      │
16//! └─────────────────────┘           └─────────────────────┘
17//! ```
18//!
19//! The TPC side uses `SyncConsumer`/`SyncSender` (non-async, !Send-compatible).
20//! The Tokio side uses `AsyncProducer`/`AsyncReceiver` which wrap `try_push`/`try_pop`
21//! with eventfd-based waking.
22
23use std::marker::PhantomData;
24use std::sync::Arc;
25
26use crate::backpressure::{BackpressureController, PressureState};
27use crate::buffer::{Consumer, Producer, RingBuffer};
28use crate::error::{BridgeError, Result};
29use crate::eventfd::WakePair;
30
31/// A complete bridge channel: request path (Control→Data) + response path (Data→Control).
32///
33/// Created once, then split into Tokio-side and TPC-side handles.
34pub struct BridgeChannel<Req, Rsp> {
35    /// Tokio-side handle for sending requests and receiving responses.
36    pub control: ControlHandle<Req, Rsp>,
37    /// TPC-side handle for receiving requests and sending responses.
38    pub data: DataHandle<Req, Rsp>,
39}
40
41/// Handle held by the Control Plane (Tokio).
42pub struct ControlHandle<Req, Rsp> {
43    /// Send requests to the Data Plane.
44    pub producer: Producer<Req>,
45    /// Receive responses from the Data Plane.
46    pub consumer: Consumer<Rsp>,
47    /// Backpressure state for the request queue.
48    pub backpressure: Arc<BackpressureController>,
49    /// Wakers for the request channel.
50    pub req_wake: Arc<WakePair>,
51    /// Wakers for the response channel.
52    pub rsp_wake: Arc<WakePair>,
53}
54
55/// Handle held by the Data Plane (TPC core).
56///
57/// This type is `Send` so it can be transferred to a TPC core during setup.
58/// Call `.pin()` once on the target core to get a `PinnedDataHandle` which is
59/// `!Send` — enforcing at compile time that it stays on that core forever.
60pub struct DataHandle<Req, Rsp> {
61    /// Receive requests from the Control Plane.
62    pub consumer: Consumer<Req>,
63    /// Send responses back to the Control Plane.
64    pub producer: Producer<Rsp>,
65    /// Backpressure state for the request queue (read-only from Data Plane).
66    pub backpressure: Arc<BackpressureController>,
67    /// Wakers for the request channel.
68    pub req_wake: Arc<WakePair>,
69    /// Wakers for the response channel.
70    pub rsp_wake: Arc<WakePair>,
71}
72
73/// A pinned Data Plane handle that is `!Send`.
74///
75/// Created by calling `DataHandle::pin()` on the target TPC core.
76/// Once pinned, this handle cannot be moved to another thread — the compiler
77/// enforces this. Any attempt to `tokio::spawn` or `thread::spawn` with a
78/// `PinnedDataHandle` is a compile error.
79pub struct PinnedDataHandle<Req, Rsp> {
80    inner: DataHandle<Req, Rsp>,
81    /// Makes this type `!Send`.
82    _not_send: PhantomData<*const ()>,
83}
84
85impl<Req, Rsp> BridgeChannel<Req, Rsp> {
86    /// Create a new bridge channel pair.
87    ///
88    /// `req_capacity`: Size of the Control→Data request queue.
89    /// `rsp_capacity`: Size of the Data→Control response queue.
90    pub fn new(req_capacity: usize, rsp_capacity: usize) -> std::io::Result<Self> {
91        let (req_producer, req_consumer) = RingBuffer::channel::<Req>(req_capacity);
92        let (rsp_producer, rsp_consumer) = RingBuffer::channel::<Rsp>(rsp_capacity);
93
94        let req_wake = Arc::new(WakePair::new()?);
95        let rsp_wake = Arc::new(WakePair::new()?);
96        let backpressure = Arc::new(BackpressureController::default());
97
98        Ok(Self {
99            control: ControlHandle {
100                producer: req_producer,
101                consumer: rsp_consumer,
102                backpressure: Arc::clone(&backpressure),
103                req_wake: Arc::clone(&req_wake),
104                rsp_wake: Arc::clone(&rsp_wake),
105            },
106            data: DataHandle {
107                consumer: req_consumer,
108                producer: rsp_producer,
109                backpressure,
110                req_wake,
111                rsp_wake,
112            },
113        })
114    }
115}
116
117impl<Req, Rsp> ControlHandle<Req, Rsp> {
118    /// Try to send a request to the Data Plane.
119    ///
120    /// On success, signals the consumer wake eventfd so the TPC core wakes up.
121    /// Updates backpressure state based on queue utilization.
122    pub fn try_send_request(&mut self, req: Req) -> Result<()> {
123        let result = self.producer.try_push(req);
124
125        // Update backpressure state.
126        let util = self.producer.utilization();
127        if let Some(new_state) = self.backpressure.update(util) {
128            tracing::info!(
129                utilization = util,
130                state = ?new_state,
131                "bridge backpressure transition"
132            );
133        }
134
135        match &result {
136            Ok(()) => {
137                // Signal consumer that data is available.
138                let _ = self.req_wake.consumer_wake.notify();
139            }
140            Err(BridgeError::Full { .. }) => {
141                // Don't signal — queue is full, consumer already has plenty.
142            }
143            _ => {}
144        }
145
146        result
147    }
148
149    /// Try to receive a response from the Data Plane.
150    ///
151    /// On success, signals the producer wake eventfd so the TPC core knows
152    /// there's space for more responses.
153    pub fn try_recv_response(&mut self) -> Result<Rsp> {
154        let result = self.consumer.try_pop();
155
156        if result.is_ok() {
157            // Signal the Data Plane that response queue has space.
158            let _ = self.rsp_wake.producer_wake.notify();
159        }
160
161        result
162    }
163
164    /// Drain up to `max` responses into the buffer.
165    pub fn drain_responses(&mut self, buf: &mut Vec<Rsp>, max: usize) -> usize {
166        let count = self.consumer.drain_into(buf, max);
167        if count > 0 {
168            let _ = self.rsp_wake.producer_wake.notify();
169        }
170        count
171    }
172
173    /// Current backpressure state.
174    pub fn pressure(&self) -> PressureState {
175        self.backpressure.state()
176    }
177
178    /// Raw fd for the response-available signal (register with Tokio's AsyncFd).
179    pub fn response_wake_fd(&self) -> std::os::unix::io::RawFd {
180        self.rsp_wake.consumer_wake.as_fd()
181    }
182
183    /// Raw fd for the request-space-available signal.
184    pub fn request_space_fd(&self) -> std::os::unix::io::RawFd {
185        self.req_wake.producer_wake.as_fd()
186    }
187}
188
189impl<Req, Rsp> DataHandle<Req, Rsp> {
190    /// Pin this handle to the current TPC core.
191    ///
192    /// Returns a `PinnedDataHandle` which is `!Send` — the compiler will
193    /// reject any attempt to move it to another thread.
194    ///
195    /// Call this exactly once, on the target TPC core, during setup.
196    pub fn pin(self) -> PinnedDataHandle<Req, Rsp> {
197        PinnedDataHandle {
198            inner: self,
199            _not_send: PhantomData,
200        }
201    }
202
203    /// Try to receive a request from the Control Plane.
204    pub fn try_recv_request(&mut self) -> Result<Req> {
205        let result = self.consumer.try_pop();
206        if result.is_ok() {
207            let _ = self.req_wake.producer_wake.notify();
208        }
209        result
210    }
211
212    /// Drain up to `max` requests into the buffer.
213    pub fn drain_requests(&mut self, buf: &mut Vec<Req>, max: usize) -> usize {
214        let count = self.consumer.drain_into(buf, max);
215        if count > 0 {
216            let _ = self.req_wake.producer_wake.notify();
217        }
218        count
219    }
220
221    /// Try to send a response back to the Control Plane.
222    pub fn try_send_response(&mut self, rsp: Rsp) -> Result<()> {
223        let result = self.producer.try_push(rsp);
224        if result.is_ok() {
225            let _ = self.rsp_wake.consumer_wake.notify();
226        }
227        result
228    }
229
230    /// Current backpressure state (read by Data Plane to decide I/O depth).
231    pub fn pressure(&self) -> PressureState {
232        self.backpressure.state()
233    }
234
235    /// Whether the Data Plane should reduce read depth.
236    pub fn should_throttle(&self) -> bool {
237        matches!(
238            self.pressure(),
239            PressureState::Throttled | PressureState::Suspended
240        )
241    }
242
243    /// Whether the Data Plane should suspend new reads entirely.
244    pub fn should_suspend(&self) -> bool {
245        self.pressure() == PressureState::Suspended
246    }
247
248    /// Raw fd for the request-available signal (register with TPC event loop).
249    pub fn request_wake_fd(&self) -> std::os::unix::io::RawFd {
250        self.req_wake.consumer_wake.as_fd()
251    }
252
253    /// Raw fd for the response-space-available signal.
254    pub fn response_space_fd(&self) -> std::os::unix::io::RawFd {
255        self.rsp_wake.producer_wake.as_fd()
256    }
257}
258
259impl<Req, Rsp> PinnedDataHandle<Req, Rsp> {
260    /// Try to receive a request from the Control Plane.
261    pub fn try_recv_request(&mut self) -> Result<Req> {
262        self.inner.try_recv_request()
263    }
264
265    /// Drain up to `max` requests into the buffer.
266    pub fn drain_requests(&mut self, buf: &mut Vec<Req>, max: usize) -> usize {
267        self.inner.drain_requests(buf, max)
268    }
269
270    /// Try to send a response back to the Control Plane.
271    pub fn try_send_response(&mut self, rsp: Rsp) -> Result<()> {
272        self.inner.try_send_response(rsp)
273    }
274
275    /// Current backpressure state.
276    pub fn pressure(&self) -> PressureState {
277        self.inner.pressure()
278    }
279
280    /// Whether the Data Plane should reduce read depth.
281    pub fn should_throttle(&self) -> bool {
282        self.inner.should_throttle()
283    }
284
285    /// Whether the Data Plane should suspend new reads entirely.
286    pub fn should_suspend(&self) -> bool {
287        self.inner.should_suspend()
288    }
289
290    /// Raw fd for the request-available signal.
291    pub fn request_wake_fd(&self) -> std::os::unix::io::RawFd {
292        self.inner.request_wake_fd()
293    }
294
295    /// Raw fd for the response-space-available signal.
296    pub fn response_space_fd(&self) -> std::os::unix::io::RawFd {
297        self.inner.response_space_fd()
298    }
299}
300
301#[cfg(test)]
302mod tests {
303    use super::*;
304
305    #[test]
306    fn bridge_channel_roundtrip() {
307        let bridge: BridgeChannel<u64, String> = BridgeChannel::new(16, 16).unwrap();
308        let mut control = bridge.control;
309        let mut data = bridge.data;
310
311        // Control sends request.
312        control.try_send_request(42).unwrap();
313
314        // Data receives request.
315        let req = data.try_recv_request().unwrap();
316        assert_eq!(req, 42);
317
318        // Data sends response.
319        data.try_send_response("result".to_string()).unwrap();
320
321        // Control receives response.
322        let rsp = control.try_recv_response().unwrap();
323        assert_eq!(rsp, "result");
324    }
325
326    #[test]
327    fn backpressure_updates_on_send() {
328        let bridge: BridgeChannel<u64, u64> = BridgeChannel::new(16, 16).unwrap();
329        let mut control = bridge.control;
330
331        // Fill to 87.5% (14/16 slots).
332        for i in 0..14 {
333            control.try_send_request(i).unwrap();
334        }
335
336        // Should have transitioned to Throttled.
337        assert_eq!(control.pressure(), PressureState::Throttled);
338    }
339
340    #[test]
341    fn eventfd_wake_on_push() {
342        let bridge: BridgeChannel<u64, u64> = BridgeChannel::new(16, 16).unwrap();
343        let mut control = bridge.control;
344        let data = bridge.data;
345
346        control.try_send_request(1).unwrap();
347
348        // Consumer wake fd should be signaled.
349        let count = data.req_wake.consumer_wake.try_read().unwrap();
350        assert!(count > 0);
351    }
352
353    #[test]
354    fn drain_responses_signals_producer() {
355        let bridge: BridgeChannel<u64, u64> = BridgeChannel::new(16, 16).unwrap();
356        let mut control = bridge.control;
357        let mut data = bridge.data;
358
359        // Data sends multiple responses.
360        data.try_send_response(10).unwrap();
361        data.try_send_response(20).unwrap();
362        data.try_send_response(30).unwrap();
363
364        // Control drains them.
365        let mut buf = Vec::new();
366        let count = control.drain_responses(&mut buf, 10);
367        assert_eq!(count, 3);
368        assert_eq!(buf, vec![10, 20, 30]);
369    }
370
371    #[test]
372    fn data_handle_throttle_queries() {
373        let bridge: BridgeChannel<u64, u64> = BridgeChannel::new(16, 16).unwrap();
374        let mut control = bridge.control;
375        let data = bridge.data;
376
377        assert!(!data.should_throttle());
378        assert!(!data.should_suspend());
379
380        // Fill past 85%.
381        for i in 0..14 {
382            control.try_send_request(i).unwrap();
383        }
384
385        assert!(data.should_throttle());
386        assert!(!data.should_suspend());
387
388        // Fill past 95%.
389        control.try_send_request(14).unwrap();
390        control.try_send_request(15).unwrap();
391
392        assert!(data.should_throttle());
393        assert!(data.should_suspend());
394    }
395}