ocapn_netlayer/
connection.rs1use tokio::io::{
2 copy, split, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf,
3};
4use tokio::net::TcpStream;
5use tokio::sync::mpsc;
6use tokio::{select, spawn};
7use tokio_util::sync::CancellationToken;
8
9use ocapn_syrup::Value;
10
11use crate::{Error, Result};
12
13pub struct Connection {
15 to_peer: mpsc::Sender<Value>,
17
18 from_peer: mpsc::Receiver<Value>,
20
21 cancel: CancellationToken,
24
25 done: CancellationToken,
28}
29
30impl Connection {
31 pub async fn send(&self, value: Value) -> Result<()> {
32 self.to_peer.send(value).await.map_err(|e| e.into())
35 }
36
37 pub async fn recv(&mut self) -> Result<Value> {
38 self.from_peer
39 .recv()
40 .await
41 .ok_or(anyhow::format_err!("connection terminated"))
42 }
43
44 pub async fn close(&mut self) -> Result<()> {
45 self.cancel.cancel();
46 self.done.cancelled().await;
48 Ok(())
49 }
50}
51
52impl From<TcpStream> for Connection {
53 fn from(stream: TcpStream) -> Self {
54 split(stream).into()
55 }
56}
57
58impl<T: AsyncRead + AsyncWrite + Send + 'static> From<(ReadHalf<T>, WriteHalf<T>)> for Connection {
59 fn from(split_stream: (ReadHalf<T>, WriteHalf<T>)) -> Self {
60 let (mut rd_from_stream, mut wr_to_stream) = split_stream;
61 let (tx_to_stream, mut rx_to_stream) = mpsc::channel::<Value>(32);
62 let (tx_from_stream, rx_from_stream) = mpsc::channel::<Value>(32);
63
64 let cancel = CancellationToken::new(); let done = CancellationToken::new(); let cancel_read_task = cancel.clone();
68 let done_read_task = done.clone();
69 let cancel_write_task = cancel.clone();
70 let done_write_task = done.clone();
71
72 spawn(async move {
74 let result: Result<()> = async {
75 let mut rd_pending_buf: Vec<u8> = vec![];
76 loop {
77 let mut rd_buf = vec![0; 32768];
78 select! {
79 _ = cancel_read_task.cancelled() => {
80 return Ok(())
81 }
82 res = rd_from_stream.read(&mut rd_buf) => {
83 let rd = res? as usize;
84 rd_pending_buf.extend_from_slice(&rd_buf[0..rd]);
85 }
86 }
87 loop {
88 match ocapn_syrup::parse_value(&rd_pending_buf[..]) {
89 Ok((rest, value)) => {
90 tx_from_stream.send(value).await?;
91 rd_pending_buf = rest;
92 }
93 Err(nom::Err::Incomplete(_)) => break,
94 Err(e) => return Err(e.into()),
95 };
96 }
97 }
98 }
99 .await;
100 done_read_task.cancel();
101 result
102 });
103
104 spawn(async move {
106 let result: Result<()> = async {
107 loop {
108 select! {
109 _ = cancel_write_task.cancelled() => {
110 return Ok(())
111 }
112 value = rx_to_stream.recv() => {
113 let mut offset: usize = 0;
114 let wr_buf = value.ok_or(Error::msg("channel closed"))?.to_vec();
115 loop {
116 if offset >= wr_buf.len() {
117 break
118 }
119 let wr = copy(&mut &wr_buf[offset..], &mut wr_to_stream).await?;
120 offset += wr as usize;
121 }
122 wr_to_stream.flush().await?;
123 }
124 }
125 }
126 }
127 .await;
128 done_write_task.cancel();
129 result
130 });
131
132 Connection {
133 cancel,
134 done,
135 from_peer: rx_from_stream,
136 to_peer: tx_to_stream,
137 }
138 }
139}
140
141pub fn new_pipe_connection() -> Result<(Connection, Connection)> {
144 let (tx_to_peer_1, mut rx_to_peer_1) = mpsc::channel::<Value>(32);
153 let (tx_from_peer_1, rx_from_peer_1) = mpsc::channel::<Value>(32);
154
155 let (tx_to_peer_2, mut rx_to_peer_2) = mpsc::channel::<Value>(32);
156 let (tx_from_peer_2, rx_from_peer_2) = mpsc::channel::<Value>(32);
157
158 let cancel = CancellationToken::new(); let done_1 = CancellationToken::new(); let done_2 = CancellationToken::new(); let cancel_1_task = cancel.clone();
166 let cancel_2_task = cancel.clone();
167 let done_1_task = done_1.clone();
168 let done_2_task = done_2.clone();
169
170 spawn(async move {
172 let result: Result<()> = async {
173 loop {
174 select! {
175 _ = cancel_1_task.cancelled() => {
176 return Ok(());
177 }
178 sent_from_1_to_2 = rx_to_peer_1.recv() => {
179 match sent_from_1_to_2 {
180 Some(value) => tx_from_peer_2.send(value).await?,
181 None=> return Ok(()),
182 }
183 }
184 }
185 }
186 }
187 .await;
188 done_1_task.cancel();
189 result
190 });
191
192 spawn(async move {
194 let result: Result<()> = async {
195 loop {
196 select! {
197 _ = cancel_2_task.cancelled() => {
198 return Ok(());
199 }
200 sent_from_2_to_1 = rx_to_peer_2.recv() => {
201 match sent_from_2_to_1 {
202 Some(value) => tx_from_peer_1.send(value).await?,
203 None=> return Ok(()),
204 }
205 }
206 }
207 }
208 }
209 .await;
210 done_2_task.cancel();
211 result
212 });
213
214 let conn_1 = Connection {
215 to_peer: tx_to_peer_1,
216 from_peer: rx_from_peer_1,
217 cancel: cancel.clone(),
218 done: done_1,
219 };
220
221 let conn_2 = Connection {
222 to_peer: tx_to_peer_2,
223 from_peer: rx_from_peer_2,
224 cancel: cancel.clone(),
225 done: done_2,
226 };
227
228 Ok((conn_1, conn_2))
229}
230
231#[tokio::test]
239async fn test_pipe() {
240 let (mut alice, mut bob) = new_pipe_connection().expect("pipe");
241 alice
242 .send(Value::string("anyone receiving?"))
243 .await
244 .expect("send");
245 let from_alice = bob.recv().await.expect("recv");
246 bob.send(Value::string("yep still here"))
247 .await
248 .expect("send");
249 let from_bob = alice.recv().await.expect("recv");
250 assert_eq!(from_alice, Value::string("anyone receiving?"));
251 assert_eq!(from_bob, Value::string("yep still here"));
252 alice.close().await.expect("closed");
253 bob.close().await.expect("closed");
254}