Skip to main content

nodedb_bridge/
async_bridge.rs

1// SPDX-License-Identifier: BUSL-1.1
2
//! Async-ready wrappers for the SPSC bridge.
//!
//! These pair the raw `Producer`/`Consumer` with eventfd-based wakers so the
//! bridge can be driven from Tokio's async runtime on one side and a TPC event
//! loop on the other. The handles expose non-blocking `try_*` methods plus the
//! raw wake fds; waiting is done by registering those fds with the runtime
//! (e.g. Tokio's `AsyncFd`).
//!
//! ## Architecture
//!
//! ```text
//! Tokio (Control Plane)                 TPC (Data Plane)
//! ┌──────────────────────────┐          ┌──────────────────────────┐
//! │  ControlHandle           │          │  DataHandle              │
//! │  .try_send_request(req)  │──push──→ │  .drain_requests(..)     │
//! │                          │          │                          │
//! │  .try_recv_response()    │←──pop─── │  .try_send_response(rsp) │
//! └──────────────────────────┘          └──────────────────────────┘
//! ```
//!
//! The TPC side uses `DataHandle` (non-async; `pin()` turns it into the `!Send`
//! `PinnedDataHandle`).
//! The Tokio side uses `ControlHandle`, which wraps `try_push`/`try_pop`
//! with eventfd-based waking.
24
25use std::marker::PhantomData;
26use std::sync::Arc;
27
28use crate::backpressure::{BackpressureController, PressureState};
29use crate::buffer::{Consumer, Producer, RingBuffer};
30use crate::error::{BridgeError, Result};
31use crate::eventfd::WakePair;
32
33/// A complete bridge channel: request path (Control→Data) + response path (Data→Control).
34///
35/// Created once, then split into Tokio-side and TPC-side handles.
36pub struct BridgeChannel<Req, Rsp> {
37    /// Tokio-side handle for sending requests and receiving responses.
38    pub control: ControlHandle<Req, Rsp>,
39    /// TPC-side handle for receiving requests and sending responses.
40    pub data: DataHandle<Req, Rsp>,
41}
42
43/// Handle held by the Control Plane (Tokio).
44pub struct ControlHandle<Req, Rsp> {
45    /// Send requests to the Data Plane.
46    pub producer: Producer<Req>,
47    /// Receive responses from the Data Plane.
48    pub consumer: Consumer<Rsp>,
49    /// Backpressure state for the request queue.
50    pub backpressure: Arc<BackpressureController>,
51    /// Wakers for the request channel.
52    pub req_wake: Arc<WakePair>,
53    /// Wakers for the response channel.
54    pub rsp_wake: Arc<WakePair>,
55}
56
57/// Handle held by the Data Plane (TPC core).
58///
59/// This type is `Send` so it can be transferred to a TPC core during setup.
60/// Call `.pin()` once on the target core to get a `PinnedDataHandle` which is
61/// `!Send` — enforcing at compile time that it stays on that core forever.
62pub struct DataHandle<Req, Rsp> {
63    /// Receive requests from the Control Plane.
64    pub consumer: Consumer<Req>,
65    /// Send responses back to the Control Plane.
66    pub producer: Producer<Rsp>,
67    /// Backpressure state for the request queue (read-only from Data Plane).
68    pub backpressure: Arc<BackpressureController>,
69    /// Wakers for the request channel.
70    pub req_wake: Arc<WakePair>,
71    /// Wakers for the response channel.
72    pub rsp_wake: Arc<WakePair>,
73}
74
75/// A pinned Data Plane handle that is `!Send`.
76///
77/// Created by calling `DataHandle::pin()` on the target TPC core.
78/// Once pinned, this handle cannot be moved to another thread — the compiler
79/// enforces this. Any attempt to `tokio::spawn` or `thread::spawn` with a
80/// `PinnedDataHandle` is a compile error.
81pub struct PinnedDataHandle<Req, Rsp> {
82    inner: DataHandle<Req, Rsp>,
83    /// Makes this type `!Send`.
84    _not_send: PhantomData<*const ()>,
85}
86
87impl<Req, Rsp> BridgeChannel<Req, Rsp> {
88    /// Create a new bridge channel pair.
89    ///
90    /// `req_capacity`: Size of the Control→Data request queue.
91    /// `rsp_capacity`: Size of the Data→Control response queue.
92    pub fn new(req_capacity: usize, rsp_capacity: usize) -> std::io::Result<Self> {
93        let (req_producer, req_consumer) = RingBuffer::channel::<Req>(req_capacity);
94        let (rsp_producer, rsp_consumer) = RingBuffer::channel::<Rsp>(rsp_capacity);
95
96        let req_wake = Arc::new(WakePair::new()?);
97        let rsp_wake = Arc::new(WakePair::new()?);
98        let backpressure = Arc::new(BackpressureController::default());
99
100        Ok(Self {
101            control: ControlHandle {
102                producer: req_producer,
103                consumer: rsp_consumer,
104                backpressure: Arc::clone(&backpressure),
105                req_wake: Arc::clone(&req_wake),
106                rsp_wake: Arc::clone(&rsp_wake),
107            },
108            data: DataHandle {
109                consumer: req_consumer,
110                producer: rsp_producer,
111                backpressure,
112                req_wake,
113                rsp_wake,
114            },
115        })
116    }
117}
118
119impl<Req, Rsp> ControlHandle<Req, Rsp> {
120    /// Try to send a request to the Data Plane.
121    ///
122    /// On success, signals the consumer wake eventfd so the TPC core wakes up.
123    /// Updates backpressure state based on queue utilization.
124    pub fn try_send_request(&mut self, req: Req) -> Result<()> {
125        let result = self.producer.try_push(req);
126
127        // Update backpressure state.
128        let util = self.producer.utilization();
129        if let Some(new_state) = self.backpressure.update(util) {
130            tracing::info!(
131                utilization = util,
132                state = ?new_state,
133                "bridge backpressure transition"
134            );
135        }
136
137        match &result {
138            Ok(()) => {
139                // Signal consumer that data is available.
140                let _ = self.req_wake.consumer_wake.notify();
141            }
142            Err(BridgeError::Full { .. }) => {
143                // Don't signal — queue is full, consumer already has plenty.
144            }
145            _ => {}
146        }
147
148        result
149    }
150
151    /// Try to receive a response from the Data Plane.
152    ///
153    /// On success, signals the producer wake eventfd so the TPC core knows
154    /// there's space for more responses.
155    pub fn try_recv_response(&mut self) -> Result<Rsp> {
156        let result = self.consumer.try_pop();
157
158        if result.is_ok() {
159            // Signal the Data Plane that response queue has space.
160            let _ = self.rsp_wake.producer_wake.notify();
161        }
162
163        result
164    }
165
166    /// Drain up to `max` responses into the buffer.
167    pub fn drain_responses(&mut self, buf: &mut Vec<Rsp>, max: usize) -> usize {
168        let count = self.consumer.drain_into(buf, max);
169        if count > 0 {
170            let _ = self.rsp_wake.producer_wake.notify();
171        }
172        count
173    }
174
175    /// Current backpressure state.
176    pub fn pressure(&self) -> PressureState {
177        self.backpressure.state()
178    }
179
180    /// Raw fd for the response-available signal (register with Tokio's AsyncFd).
181    pub fn response_wake_fd(&self) -> std::os::unix::io::RawFd {
182        self.rsp_wake.consumer_wake.as_fd()
183    }
184
185    /// Raw fd for the request-space-available signal.
186    pub fn request_space_fd(&self) -> std::os::unix::io::RawFd {
187        self.req_wake.producer_wake.as_fd()
188    }
189}
190
191impl<Req, Rsp> DataHandle<Req, Rsp> {
192    /// Pin this handle to the current TPC core.
193    ///
194    /// Returns a `PinnedDataHandle` which is `!Send` — the compiler will
195    /// reject any attempt to move it to another thread.
196    ///
197    /// Call this exactly once, on the target TPC core, during setup.
198    pub fn pin(self) -> PinnedDataHandle<Req, Rsp> {
199        PinnedDataHandle {
200            inner: self,
201            _not_send: PhantomData,
202        }
203    }
204
205    /// Try to receive a request from the Control Plane.
206    pub fn try_recv_request(&mut self) -> Result<Req> {
207        let result = self.consumer.try_pop();
208        if result.is_ok() {
209            let _ = self.req_wake.producer_wake.notify();
210        }
211        result
212    }
213
214    /// Drain up to `max` requests into the buffer.
215    pub fn drain_requests(&mut self, buf: &mut Vec<Req>, max: usize) -> usize {
216        let count = self.consumer.drain_into(buf, max);
217        if count > 0 {
218            let _ = self.req_wake.producer_wake.notify();
219        }
220        count
221    }
222
223    /// Try to send a response back to the Control Plane.
224    pub fn try_send_response(&mut self, rsp: Rsp) -> Result<()> {
225        let result = self.producer.try_push(rsp);
226        if result.is_ok() {
227            let _ = self.rsp_wake.consumer_wake.notify();
228        }
229        result
230    }
231
232    /// Current backpressure state (read by Data Plane to decide I/O depth).
233    pub fn pressure(&self) -> PressureState {
234        self.backpressure.state()
235    }
236
237    /// Whether the Data Plane should reduce read depth.
238    pub fn should_throttle(&self) -> bool {
239        matches!(
240            self.pressure(),
241            PressureState::Throttled | PressureState::Suspended
242        )
243    }
244
245    /// Whether the Data Plane should suspend new reads entirely.
246    pub fn should_suspend(&self) -> bool {
247        self.pressure() == PressureState::Suspended
248    }
249
250    /// Raw fd for the request-available signal (register with TPC event loop).
251    pub fn request_wake_fd(&self) -> std::os::unix::io::RawFd {
252        self.req_wake.consumer_wake.as_fd()
253    }
254
255    /// Raw fd for the response-space-available signal.
256    pub fn response_space_fd(&self) -> std::os::unix::io::RawFd {
257        self.rsp_wake.producer_wake.as_fd()
258    }
259}
260
261impl<Req, Rsp> PinnedDataHandle<Req, Rsp> {
262    /// Try to receive a request from the Control Plane.
263    pub fn try_recv_request(&mut self) -> Result<Req> {
264        self.inner.try_recv_request()
265    }
266
267    /// Drain up to `max` requests into the buffer.
268    pub fn drain_requests(&mut self, buf: &mut Vec<Req>, max: usize) -> usize {
269        self.inner.drain_requests(buf, max)
270    }
271
272    /// Try to send a response back to the Control Plane.
273    pub fn try_send_response(&mut self, rsp: Rsp) -> Result<()> {
274        self.inner.try_send_response(rsp)
275    }
276
277    /// Current backpressure state.
278    pub fn pressure(&self) -> PressureState {
279        self.inner.pressure()
280    }
281
282    /// Whether the Data Plane should reduce read depth.
283    pub fn should_throttle(&self) -> bool {
284        self.inner.should_throttle()
285    }
286
287    /// Whether the Data Plane should suspend new reads entirely.
288    pub fn should_suspend(&self) -> bool {
289        self.inner.should_suspend()
290    }
291
292    /// Raw fd for the request-available signal.
293    pub fn request_wake_fd(&self) -> std::os::unix::io::RawFd {
294        self.inner.request_wake_fd()
295    }
296
297    /// Raw fd for the response-space-available signal.
298    pub fn response_space_fd(&self) -> std::os::unix::io::RawFd {
299        self.inner.response_space_fd()
300    }
301}
302
#[cfg(test)]
mod tests {
    use super::*;

    /// One request travels Control→Data and one response travels back.
    #[test]
    fn bridge_channel_roundtrip() {
        let bridge: BridgeChannel<u64, String> = BridgeChannel::new(16, 16).unwrap();
        let mut control = bridge.control;
        let mut data = bridge.data;

        // Control sends request.
        control.try_send_request(42).unwrap();

        // Data receives request.
        let req = data.try_recv_request().unwrap();
        assert_eq!(req, 42);

        // Data sends response.
        data.try_send_response("result".to_string()).unwrap();

        // Control receives response.
        let rsp = control.try_recv_response().unwrap();
        assert_eq!(rsp, "result");
    }

    /// Sending requests updates the shared backpressure state.
    #[test]
    fn backpressure_updates_on_send() {
        let bridge: BridgeChannel<u64, u64> = BridgeChannel::new(16, 16).unwrap();
        let mut control = bridge.control;

        // Fill to 87.5% (14/16 slots).
        for i in 0..14 {
            control.try_send_request(i).unwrap();
        }

        // Should have transitioned to Throttled.
        assert_eq!(control.pressure(), PressureState::Throttled);
    }

    /// A successful request push signals the request queue's consumer waker.
    #[test]
    fn eventfd_wake_on_push() {
        let bridge: BridgeChannel<u64, u64> = BridgeChannel::new(16, 16).unwrap();
        let mut control = bridge.control;
        let data = bridge.data;

        control.try_send_request(1).unwrap();

        // Consumer wake fd should be signaled.
        let count = data.req_wake.consumer_wake.try_read().unwrap();
        assert!(count > 0);
    }

    /// Draining responses returns them in order and signals the response
    /// queue's producer waker.
    #[test]
    fn drain_responses_signals_producer() {
        let bridge: BridgeChannel<u64, u64> = BridgeChannel::new(16, 16).unwrap();
        let mut control = bridge.control;
        let mut data = bridge.data;

        // Data sends multiple responses.
        data.try_send_response(10).unwrap();
        data.try_send_response(20).unwrap();
        data.try_send_response(30).unwrap();

        // Control drains them.
        let mut buf = Vec::new();
        let count = control.drain_responses(&mut buf, 10);
        assert_eq!(count, 3);
        assert_eq!(buf, vec![10, 20, 30]);

        // The drain must have signaled the producer-side waker. Nothing else in
        // this test notifies `rsp_wake.producer_wake`, so a non-zero count can
        // only come from `drain_responses`.
        let signaled = data.rsp_wake.producer_wake.try_read().unwrap();
        assert!(signaled > 0);
    }

    /// The Data Plane's throttle/suspend queries follow the request queue's
    /// fill level.
    #[test]
    fn data_handle_throttle_queries() {
        let bridge: BridgeChannel<u64, u64> = BridgeChannel::new(16, 16).unwrap();
        let mut control = bridge.control;
        let data = bridge.data;

        assert!(!data.should_throttle());
        assert!(!data.should_suspend());

        // Fill past 85%.
        for i in 0..14 {
            control.try_send_request(i).unwrap();
        }

        assert!(data.should_throttle());
        assert!(!data.should_suspend());

        // Fill past 95%.
        control.try_send_request(14).unwrap();
        control.try_send_request(15).unwrap();

        assert!(data.should_throttle());
        assert!(data.should_suspend());
    }
}