ocapn_netlayer/
connection.rs

1use tokio::io::{
2    copy, split, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf,
3};
4use tokio::net::TcpStream;
5use tokio::sync::mpsc;
6use tokio::{select, spawn};
7use tokio_util::sync::CancellationToken;
8
9use ocapn_syrup::Value;
10
11use crate::{Error, Result};
12
13/// A bidirectional FIFO channel between two peers on the network.
14pub struct Connection {
15    /// Outgoing messages sent to the peer.
16    to_peer: mpsc::Sender<Value>,
17
18    /// Incoming messages received from the peer.
19    from_peer: mpsc::Receiver<Value>,
20
21    /// Cancellation token used to initiate termination of the connection and
22    /// releasing of resources.
23    cancel: CancellationToken,
24
25    /// Cancellation token used to indicate confirmation that resources have
26    /// been released.
27    done: CancellationToken,
28}
29
30impl Connection {
31    pub async fn send(&self, value: Value) -> Result<()> {
32        // TODO: check if connection OK, return error otherwise
33        // TODO: is an acknowledged send necessary?
34        self.to_peer.send(value).await.map_err(|e| e.into())
35    }
36
37    pub async fn recv(&mut self) -> Result<Value> {
38        self.from_peer
39            .recv()
40            .await
41            .ok_or(anyhow::format_err!("connection terminated"))
42    }
43
44    pub async fn close(&mut self) -> Result<()> {
45        self.cancel.cancel();
46        // TODO: timeout waiting for shutdown?
47        self.done.cancelled().await;
48        Ok(())
49    }
50}
51
52impl From<TcpStream> for Connection {
53    fn from(stream: TcpStream) -> Self {
54        split(stream).into()
55    }
56}
57
58impl<T: AsyncRead + AsyncWrite + Send + 'static> From<(ReadHalf<T>, WriteHalf<T>)> for Connection {
59    fn from(split_stream: (ReadHalf<T>, WriteHalf<T>)) -> Self {
60        let (mut rd_from_stream, mut wr_to_stream) = split_stream;
61        let (tx_to_stream, mut rx_to_stream) = mpsc::channel::<Value>(32);
62        let (tx_from_stream, rx_from_stream) = mpsc::channel::<Value>(32);
63
64        let cancel = CancellationToken::new(); // Used to initiate shutdown
65        let done = CancellationToken::new(); // Used to acknowledge shutdown complete
66
67        let cancel_read_task = cancel.clone();
68        let done_read_task = done.clone();
69        let cancel_write_task = cancel.clone();
70        let done_write_task = done.clone();
71
72        // Read from the stream, deserializing onto the from_stream channel.
73        spawn(async move {
74            let result: Result<()> = async {
75                let mut rd_pending_buf: Vec<u8> = vec![];
76                loop {
77                    let mut rd_buf = vec![0; 32768];
78                    select! {
79                        _ = cancel_read_task.cancelled() => {
80                            return Ok(())
81                        }
82                        res = rd_from_stream.read(&mut rd_buf) => {
83                            let rd = res? as usize;
84                            rd_pending_buf.extend_from_slice(&rd_buf[0..rd]);
85                        }
86                    }
87                    loop {
88                        match ocapn_syrup::parse_value(&rd_pending_buf[..]) {
89                            Ok((rest, value)) => {
90                                tx_from_stream.send(value).await?;
91                                rd_pending_buf = rest;
92                            }
93                            Err(nom::Err::Incomplete(_)) => break,
94                            Err(e) => return Err(e.into()),
95                        };
96                    }
97                }
98            }
99            .await;
100            done_read_task.cancel();
101            result
102        });
103
104        // Write to the stream, serializing values from the to_stream channel.
105        spawn(async move {
106            let result: Result<()> = async {
107                loop {
108                    select! {
109                        _ = cancel_write_task.cancelled() => {
110                            return Ok(())
111                        }
112                        value = rx_to_stream.recv() => {
113                            let mut offset: usize = 0;
114                            let wr_buf = value.ok_or(Error::msg("channel closed"))?.to_vec();
115                            loop {
116                                if offset >= wr_buf.len() {
117                                    break
118                                }
119                                let wr = copy(&mut &wr_buf[offset..], &mut wr_to_stream).await?;
120                                offset += wr as usize;
121                            }
122                            wr_to_stream.flush().await?;
123                        }
124                    }
125                }
126            }
127            .await;
128            done_write_task.cancel();
129            result
130        });
131
132        Connection {
133            cancel,
134            done,
135            from_peer: rx_from_stream,
136            to_peer: tx_to_stream,
137        }
138    }
139}
140
141/// Create a pair of connections, directly connected together by buffered mpsc
142/// channels.
143pub fn new_pipe_connection() -> Result<(Connection, Connection)> {
144    // Naming bidirectional channels can be tricky, when you're simultaneously
145    // wiring things up on both sides, from the perspectives of both sides of the
146    // communication. So these variable names are worth a little explanation,
147    // they follow a terse convention that tries to encode the purpose of each
148    // sender and receiver.
149    //
150    // tx_to_peer_1 means "the write side of the channel (tx_) to the other peer (to_), from peer 1's perspective" (peer_1)
151    // rx_from_peer_2 means "the read side of the channel (rx_) from the other peer (from_), from peer 2's perspecitve" (peer_2)
152    let (tx_to_peer_1, mut rx_to_peer_1) = mpsc::channel::<Value>(32);
153    let (tx_from_peer_1, rx_from_peer_1) = mpsc::channel::<Value>(32);
154
155    let (tx_to_peer_2, mut rx_to_peer_2) = mpsc::channel::<Value>(32);
156    let (tx_from_peer_2, rx_from_peer_2) = mpsc::channel::<Value>(32);
157
158    // Spawn two concurrent channel receiver-senders, for full duplex
159    // communication without a chance of deadlock.
160
161    let cancel = CancellationToken::new(); // Used to initiate shutdown
162    let done_1 = CancellationToken::new(); // Used to ack shutdown complete to conn_1
163    let done_2 = CancellationToken::new(); // Used to ack shutdown complete to conn_2
164
165    let cancel_1_task = cancel.clone();
166    let cancel_2_task = cancel.clone();
167    let done_1_task = done_1.clone();
168    let done_2_task = done_2.clone();
169
170    // Ferry messages from peer 1 to peer 2.
171    spawn(async move {
172        let result: Result<()> = async {
173            loop {
174                select! {
175                    _ = cancel_1_task.cancelled() => {
176                        return Ok(());
177                    }
178                    sent_from_1_to_2 = rx_to_peer_1.recv() => {
179                        match sent_from_1_to_2 {
180                           Some(value) => tx_from_peer_2.send(value).await?,
181                          None=> return Ok(()),
182                        }
183                    }
184                }
185            }
186        }
187        .await;
188        done_1_task.cancel();
189        result
190    });
191
192    // Ferry messages from peer 2 to peer 1.
193    spawn(async move {
194        let result: Result<()> = async {
195            loop {
196                select! {
197                    _ = cancel_2_task.cancelled() => {
198                        return Ok(());
199                    }
200                    sent_from_2_to_1 = rx_to_peer_2.recv() => {
201                        match sent_from_2_to_1 {
202                           Some(value) => tx_from_peer_1.send(value).await?,
203                          None=> return Ok(()),
204                        }
205                    }
206                }
207            }
208        }
209        .await;
210        done_2_task.cancel();
211        result
212    });
213
214    let conn_1 = Connection {
215        to_peer: tx_to_peer_1,
216        from_peer: rx_from_peer_1,
217        cancel: cancel.clone(),
218        done: done_1,
219    };
220
221    let conn_2 = Connection {
222        to_peer: tx_to_peer_2,
223        from_peer: rx_from_peer_2,
224        cancel: cancel.clone(),
225        done: done_2,
226    };
227
228    Ok((conn_1, conn_2))
229}
230
231// Under consideration:
232// loopback netlayer connector -- will be great for testing!
233// tor netlayer connector -- let's use Arti if we can!
234// veilid netlayer connector -- rise and reverberate!
235// clearnet tcp netlayer connector -- let a reverse proxy terminate tls
236// meshtastic netlayer connector -- probably serial because BLE on rust doesn't work yet
237
238#[tokio::test]
239async fn test_pipe() {
240    let (mut alice, mut bob) = new_pipe_connection().expect("pipe");
241    alice
242        .send(Value::string("anyone receiving?"))
243        .await
244        .expect("send");
245    let from_alice = bob.recv().await.expect("recv");
246    bob.send(Value::string("yep still here"))
247        .await
248        .expect("send");
249    let from_bob = alice.recv().await.expect("recv");
250    assert_eq!(from_alice, Value::string("anyone receiving?"));
251    assert_eq!(from_bob, Value::string("yep still here"));
252    alice.close().await.expect("closed");
253    bob.close().await.expect("closed");
254}