use tokio::io::{
copy, split, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf,
};
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::{select, spawn};
use tokio_util::sync::CancellationToken;
use ocapn_syrup::Value;
use crate::{Error, Result};
/// A bidirectional channel of [`Value`]s exchanged with a peer.
///
/// Values are handed to and received from background tasks through bounded
/// mpsc channels; two cancellation tokens coordinate shutdown (see `close`).
pub struct Connection {
/// Sending side of the outbound channel; `send` pushes values here.
to_peer: mpsc::Sender<Value>,
/// Receiving side of the inbound channel; `recv` pulls values from here.
from_peer: mpsc::Receiver<Value>,
/// Cancelled by `close` to ask the background tasks to stop.
cancel: CancellationToken,
/// Cancelled by a background task when it finishes; `close` waits on it.
done: CancellationToken,
}
impl Connection {
    /// Queues `value` on the outbound channel for delivery to the peer.
    ///
    /// Waits for capacity if the channel is currently full.
    ///
    /// # Errors
    ///
    /// Returns an error if the receiving side of the outbound channel has
    /// been closed or dropped.
    pub async fn send(&self, value: Value) -> Result<()> {
        self.to_peer.send(value).await?;
        Ok(())
    }

    /// Waits for the next value from the peer.
    ///
    /// # Errors
    ///
    /// Returns a "connection terminated" error once the inbound channel is
    /// closed and fully drained.
    pub async fn recv(&mut self) -> Result<Value> {
        self.from_peer
            .recv()
            .await
            // Build the error lazily: constructing it up front would allocate
            // on every successful receive as well.
            .ok_or_else(|| anyhow::format_err!("connection terminated"))
    }

    /// Requests shutdown and waits until a background task reports completion.
    ///
    /// Cancels the `cancel` token and then waits for the `done` token to be
    /// cancelled. Currently always returns `Ok(())`.
    pub async fn close(&mut self) -> Result<()> {
        self.cancel.cancel();
        self.done.cancelled().await;
        Ok(())
    }
}
impl From<TcpStream> for Connection {
    /// Builds a [`Connection`] over a TCP stream.
    ///
    /// The stream is split into its read and write halves, which are then
    /// converted through the `(ReadHalf, WriteHalf)` implementation.
    fn from(stream: TcpStream) -> Self {
        let halves = split(stream);
        Self::from(halves)
    }
}
impl<T: AsyncRead + AsyncWrite + Send + 'static> From<(ReadHalf<T>, WriteHalf<T>)> for Connection {
    /// Builds a [`Connection`] over the two halves of a split byte stream.
    ///
    /// Two tasks are spawned with `tokio::spawn`:
    ///
    /// * a reader task that reads bytes from the read half, parses them with
    ///   `ocapn_syrup::parse_value`, and forwards each parsed value to the
    ///   channel behind [`Connection::recv`];
    /// * a writer task that takes values queued through [`Connection::send`],
    ///   serializes them with `to_vec`, and writes them to the write half.
    ///
    /// Both tasks stop when the connection's `cancel` token is cancelled, and
    /// each cancels the shared `done` token when it finishes for any reason.
    fn from(split_stream: (ReadHalf<T>, WriteHalf<T>)) -> Self {
        /// Capacity of each of the two value channels.
        const CHANNEL_CAPACITY: usize = 32;
        /// Size of the scratch buffer handed to each `read` call.
        const READ_CHUNK_SIZE: usize = 32768;

        let (mut rd_from_stream, mut wr_to_stream) = split_stream;
        let (tx_to_stream, mut rx_to_stream) = mpsc::channel::<Value>(CHANNEL_CAPACITY);
        let (tx_from_stream, rx_from_stream) = mpsc::channel::<Value>(CHANNEL_CAPACITY);

        let cancel = CancellationToken::new();
        let done = CancellationToken::new();
        let cancel_read_task = cancel.clone();
        let done_read_task = done.clone();
        let cancel_write_task = cancel.clone();
        let done_write_task = done.clone();

        // Reader task: stream bytes -> parsed values -> `from_peer` channel.
        spawn(async move {
            let result: Result<()> = async {
                // Bytes received so far that do not yet form a complete value.
                let mut rd_pending_buf: Vec<u8> = vec![];
                // Scratch buffer, allocated once and reused for every read.
                let mut rd_buf = vec![0u8; READ_CHUNK_SIZE];
                loop {
                    select! {
                        _ = cancel_read_task.cancelled() => {
                            return Ok(())
                        }
                        res = rd_from_stream.read(&mut rd_buf) => {
                            let rd = res?;
                            if rd == 0 {
                                // A zero-length read into a non-empty buffer
                                // means end of stream. Without this exit the
                                // loop would spin on an endlessly-ready read.
                                return if rd_pending_buf.is_empty() {
                                    Ok(())
                                } else {
                                    Err(Error::msg("connection closed with incomplete value"))
                                };
                            }
                            rd_pending_buf.extend_from_slice(&rd_buf[..rd]);
                        }
                    }

                    // Parse as many complete values as the buffer holds,
                    // tracking how many leading bytes have been consumed.
                    let mut consumed = 0;
                    loop {
                        match ocapn_syrup::parse_value(&rd_pending_buf[consumed..]) {
                            Ok((rest, value)) => {
                                // `rest` is the unparsed tail, so everything
                                // before it has been consumed.
                                consumed = rd_pending_buf.len() - rest.len();
                                tx_from_stream.send(value).await?;
                            }
                            Err(nom::Err::Incomplete(_)) => break,
                            Err(e) => return Err(e.into()),
                        };
                    }
                    // Drop the consumed prefix once per read, keeping only
                    // the bytes of a still-incomplete value.
                    rd_pending_buf.drain(..consumed);
                }
            }
            .await;
            done_read_task.cancel();
            result
        });

        // Writer task: `to_peer` channel -> serialized bytes -> stream.
        spawn(async move {
            let result: Result<()> = async {
                loop {
                    select! {
                        _ = cancel_write_task.cancelled() => {
                            return Ok(())
                        }
                        value = rx_to_stream.recv() => {
                            let wr_buf = value
                                .ok_or_else(|| Error::msg("channel closed"))?
                                .to_vec();
                            // `copy` keeps reading until the slice is
                            // exhausted, so one call writes the whole buffer.
                            copy(&mut &wr_buf[..], &mut wr_to_stream).await?;
                            wr_to_stream.flush().await?;
                        }
                    }
                }
            }
            .await;
            done_write_task.cancel();
            result
        });

        Connection {
            cancel,
            done,
            from_peer: rx_from_stream,
            to_peer: tx_to_stream,
        }
    }
}
pub fn new_pipe_connection() -> Result<(Connection, Connection)> {
let (tx_to_peer_1, mut rx_to_peer_1) = mpsc::channel::<Value>(32);
let (tx_from_peer_1, rx_from_peer_1) = mpsc::channel::<Value>(32);
let (tx_to_peer_2, mut rx_to_peer_2) = mpsc::channel::<Value>(32);
let (tx_from_peer_2, rx_from_peer_2) = mpsc::channel::<Value>(32);
let cancel = CancellationToken::new(); let done_1 = CancellationToken::new(); let done_2 = CancellationToken::new();
let cancel_1_task = cancel.clone();
let cancel_2_task = cancel.clone();
let done_1_task = done_1.clone();
let done_2_task = done_2.clone();
spawn(async move {
let result: Result<()> = async {
loop {
select! {
_ = cancel_1_task.cancelled() => {
return Ok(());
}
sent_from_1_to_2 = rx_to_peer_1.recv() => {
match sent_from_1_to_2 {
Some(value) => tx_from_peer_2.send(value).await?,
None=> return Ok(()),
}
}
}
}
}
.await;
done_1_task.cancel();
result
});
spawn(async move {
let result: Result<()> = async {
loop {
select! {
_ = cancel_2_task.cancelled() => {
return Ok(());
}
sent_from_2_to_1 = rx_to_peer_2.recv() => {
match sent_from_2_to_1 {
Some(value) => tx_from_peer_1.send(value).await?,
None=> return Ok(()),
}
}
}
}
}
.await;
done_2_task.cancel();
result
});
let conn_1 = Connection {
to_peer: tx_to_peer_1,
from_peer: rx_from_peer_1,
cancel: cancel.clone(),
done: done_1,
};
let conn_2 = Connection {
to_peer: tx_to_peer_2,
from_peer: rx_from_peer_2,
cancel: cancel.clone(),
done: done_2,
};
Ok((conn_1, conn_2))
}
/// Round-trips one message in each direction over an in-memory pipe and then
/// closes both ends.
#[tokio::test]
async fn test_pipe() {
    let question = "anyone receiving?";
    let answer = "yep still here";

    let (mut alice, mut bob) = new_pipe_connection().expect("pipe");

    // alice -> bob
    let outgoing = Value::string(question);
    alice.send(outgoing).await.expect("send");
    let heard_by_bob = bob.recv().await.expect("recv");

    // bob -> alice
    let outgoing = Value::string(answer);
    bob.send(outgoing).await.expect("send");
    let heard_by_alice = alice.recv().await.expect("recv");

    assert_eq!(heard_by_bob, Value::string(question));
    assert_eq!(heard_by_alice, Value::string(answer));

    alice.close().await.expect("closed");
    bob.close().await.expect("closed");
}