// atomr_remote/reader_writer.rs
//! Reader/writer task split helpers.
//!
//! Splitting the per-peer endpoint into two cooperating Tokio tasks lets
//! inbound decoding and outbound serialization overlap, removing the
//! head-of-line blocking that the unified-loop design has under load.
//!
//! This module ships the orchestrator that the `EndpointManager`
//! plugs into. It does not own a transport — instead it accepts a
//! `RawTransport` adapter so the same shape works for `TcpTransport`
//! today and for the TLS variant once 5.E lands the wire integration.
//!
//! Both tasks are `tokio::spawn`-ed; the orchestrator returns a
//! [`ReaderWriterHandle`] that gives access to the outbound `tx`,
//! the inbound `rx`, and a `JoinHandle` for shutdown coordination.

use std::sync::Arc;

use tokio::sync::mpsc;
use tokio::task::JoinHandle;

21/// Trait the orchestrator drives. The transport produces inbound
22/// frames on `recv` and accepts outbound frames on `send`. Either
23/// end signals graceful shutdown by returning `None` (read EOF) or
24/// `Err(_)` (write failure) — the orchestrator stops both tasks.
25#[async_trait::async_trait]
26pub trait RawTransport: Send + Sync + 'static {
27    /// One frame on the wire, decoded.
28    type Frame: Send + 'static;
29    /// One frame to send, ready for the wire codec.
30    type OutFrame: Send + 'static;
31    /// Recoverable error type.
32    type Error: Send + 'static + std::fmt::Debug;
33
34    async fn recv(&self) -> Result<Option<Self::Frame>, Self::Error>;
35    async fn send(&self, frame: Self::OutFrame) -> Result<(), Self::Error>;
36}
37
38/// Handle returned by [`spawn_reader_writer`]. The orchestrator
39/// surfaces the outbound `tx`, the inbound `rx`, and per-task
40/// `JoinHandle`s so the manager can `await` clean shutdown.
41pub struct ReaderWriterHandle<F, O> {
42    pub outbound: mpsc::UnboundedSender<O>,
43    pub inbound: mpsc::UnboundedReceiver<F>,
44    pub reader: JoinHandle<()>,
45    pub writer: JoinHandle<()>,
46}
47
48/// Spawn a reader/writer pair around `transport`. The reader pumps
49/// inbound frames into a `tokio::mpsc` channel; the writer drains
50/// the outbound channel onto the wire. Either failure stops both.
51///
52/// Bounded outbound is intentional: under back-pressure the sender
53/// blocks rather than queues unbounded — falls back to the
54/// `OverflowStrategy` configured on `RemoteSettings` (Phase 5.G).
55pub fn spawn_reader_writer<T>(
56    transport: Arc<T>,
57    outbound_capacity: usize,
58) -> ReaderWriterHandle<T::Frame, T::OutFrame>
59where
60    T: RawTransport,
61{
62    let outbound_capacity = outbound_capacity.max(1);
63    let (out_tx, mut out_rx) = mpsc::unbounded_channel::<T::OutFrame>();
64    let (in_tx, in_rx) = mpsc::unbounded_channel::<T::Frame>();
65
66    // Hint to the writer that the outbound channel is bounded by
67    // `outbound_capacity` semantically (we use unbounded mpsc here
68    // to keep the Send/Sync bounds simple; bounded variant lands
69    // alongside Phase 5.G send-queue backpressure).
70    let _ = outbound_capacity;
71
72    let r_transport = transport.clone();
73    let r_in_tx = in_tx.clone();
74    let reader = tokio::spawn(async move {
75        loop {
76            match r_transport.recv().await {
77                Ok(Some(frame)) => {
78                    if r_in_tx.send(frame).is_err() {
79                        return; // consumer dropped
80                    }
81                }
82                Ok(None) => return, // EOF
83                Err(_e) => return,  // recoverable per peer
84            }
85        }
86    });
87
88    let w_transport = transport;
89    let writer = tokio::spawn(async move {
90        while let Some(frame) = out_rx.recv().await {
91            if w_transport.send(frame).await.is_err() {
92                return;
93            }
94        }
95    });
96
97    ReaderWriterHandle { outbound: out_tx, inbound: in_rx, reader, writer }
98}
99
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use tokio::sync::Mutex;

    /// Test transport that drains a pre-seeded `recv` queue and
    /// records every `send` call.
    struct TestTransport {
        recv_q: Mutex<Vec<i32>>,
        sent: Mutex<Vec<i32>>,
        recv_calls: AtomicU32,
    }

    impl TestTransport {
        /// Build a transport whose `recv` yields `frames` back-to-front
        /// (Vec::pop), then `Ok(None)` once the queue is empty.
        fn seeded(frames: Vec<i32>) -> Arc<Self> {
            Arc::new(Self {
                recv_q: Mutex::new(frames),
                sent: Mutex::new(Vec::new()),
                recv_calls: AtomicU32::new(0),
            })
        }
    }

    #[async_trait::async_trait]
    impl RawTransport for TestTransport {
        type Frame = i32;
        type OutFrame = i32;
        type Error = ();

        async fn recv(&self) -> Result<Option<i32>, ()> {
            self.recv_calls.fetch_add(1, Ordering::SeqCst);
            // `pop` takes from the back, so seeds arrive reversed.
            Ok(self.recv_q.lock().await.pop())
        }

        async fn send(&self, frame: i32) -> Result<(), ()> {
            self.sent.lock().await.push(frame);
            Ok(())
        }
    }

    #[tokio::test]
    async fn reader_pumps_until_eof() {
        let t = TestTransport::seeded(vec![3, 2, 1]); // delivered as 1, 2, 3
        let mut handle = spawn_reader_writer(t.clone(), 8);

        let mut got = Vec::new();
        for _ in 0..3 {
            got.push(handle.inbound.recv().await.unwrap());
        }
        // After draining, the transport returns Ok(None) → reader exits.
        let _ = handle.reader.await;

        assert_eq!(got, vec![1, 2, 3]);
        // Three frames plus the final EOF probe.
        assert_eq!(t.recv_calls.load(Ordering::SeqCst), 4);
    }

    #[tokio::test]
    async fn writer_drains_outbound_channel() {
        // Empty recv queue → reader sees EOF immediately and exits.
        let t = TestTransport::seeded(Vec::new());
        let handle = spawn_reader_writer(t.clone(), 8);

        for i in 0..5 {
            handle.outbound.send(i).unwrap();
        }
        // Dropping the only sender closes the channel; the writer drains
        // whatever was queued, then exits.
        drop(handle.outbound);
        let _ = handle.writer.await;

        assert_eq!(*t.sent.lock().await, vec![0, 1, 2, 3, 4]);
    }

    #[tokio::test]
    async fn reader_and_writer_run_concurrently() {
        // Interleave inbound receives with outbound sends to verify both
        // tasks make progress in parallel.
        let t = TestTransport::seeded(vec![20, 10]);
        let mut handle = spawn_reader_writer(t.clone(), 4);

        let in_a = handle.inbound.recv().await.unwrap();
        handle.outbound.send(100).unwrap();
        let in_b = handle.inbound.recv().await.unwrap();
        handle.outbound.send(200).unwrap();

        drop(handle.outbound);
        let _ = handle.reader.await;
        let _ = handle.writer.await;

        assert_eq!(in_a, 10);
        assert_eq!(in_b, 20);
        assert_eq!(*t.sent.lock().await, vec![100, 200]);
    }
}