ate_comms/protocol/
api.rs

1use std::io;
2use std::pin::Pin;
3use std::task::Context;
4use std::task::Poll;
5use tokio::io::ReadBuf;
6use tokio::io::AsyncRead;
7use tokio::io::AsyncWrite;
8use async_trait::async_trait;
9use ate_crypto::EncryptKey;
10
11use super::StreamRx;
12use super::StreamTx;
13
14#[async_trait]
15pub trait MessageProtocolApi
16where Self: std::fmt::Debug + Send + Sync,
17{
18    async fn write_with_fixed_16bit_header(
19        &mut self,
20        buf: &'_ [u8],
21        delay_flush: bool,
22    ) -> Result<u64, tokio::io::Error>;   
23
24    async fn write_with_fixed_32bit_header(
25        &mut self,
26        buf: &'_ [u8],
27        delay_flush: bool,
28    ) -> Result<u64, tokio::io::Error>;
29
30    async fn send(
31        &mut self,
32        wire_encryption: &Option<EncryptKey>,
33        data: &[u8],
34    ) -> Result<u64, tokio::io::Error>;
35
36    async fn read_with_fixed_16bit_header(
37        &mut self,
38    ) -> Result<Vec<u8>, tokio::io::Error>;
39
40    async fn read_with_fixed_32bit_header(
41        &mut self,
42    ) -> Result<Vec<u8>, tokio::io::Error>;
43
44    async fn read_buf_with_header(
45        &mut self,
46        wire_encryption: &Option<EncryptKey>,
47        total_read: &mut u64
48    ) -> std::io::Result<Vec<u8>>;
49
50    async fn send_close(
51        &mut self,
52    ) -> std::io::Result<()>;
53
54    async fn flush(
55        &mut self,
56    ) -> std::io::Result<()>;
57
58    fn split(&mut self, ek: Option<EncryptKey>) -> (StreamRx, StreamTx);
59
60    fn rx(&mut self) -> Option<&mut (dyn AsyncRead + Send + Sync + Unpin + 'static)>;
61
62    fn tx(&mut self) -> Option<&mut (dyn AsyncWrite + Send + Sync + Unpin + 'static)>;
63
64    fn take_rx(&mut self) -> Option<Box<dyn AsyncRead + Send + Sync + Unpin + 'static>>;
65
66    fn take_tx(&mut self) -> Option<Box<dyn AsyncWrite + Send + Sync + Unpin + 'static>>;
67}
68
69impl AsyncRead
70for dyn MessageProtocolApi + Unpin + Send + Sync
71{
72    fn poll_read(
73        mut self: Pin<&mut Self>,
74        cx: &mut Context<'_>,
75        buf: &mut ReadBuf<'_>,
76    ) -> Poll<io::Result<()>> {
77        let rx = match self.rx() {
78            Some(rx) => rx,
79            None => {
80                return Poll::Ready(Err(io::Error::new(io::ErrorKind::Unsupported, "this stream does not support reading")));
81            }
82        };
83        let rx = Pin::new(rx);
84        rx.poll_read(cx, buf)
85    }
86}
87
88impl AsyncWrite
89for dyn MessageProtocolApi + Unpin + Send + Sync
90{
91    fn poll_write(
92        mut self: Pin<&mut Self>,
93        cx: &mut Context<'_>,
94        buf: &[u8],
95    ) -> Poll<Result<usize, io::Error>> {
96        let tx = match self.tx() {
97            Some(tx) => tx,
98            None => {
99                return Poll::Ready(Err(io::Error::new(io::ErrorKind::Unsupported, "this stream does not support writing")));
100            }
101        };
102        let tx = Pin::new(tx);
103        tx.poll_write(cx, buf)
104    }
105
106    fn poll_flush(
107        mut self: Pin<&mut Self>,
108        cx: &mut Context<'_>
109    ) -> Poll<Result<(), io::Error>> {
110        let tx = match self.tx() {
111            Some(tx) => tx,
112            None => {
113                return Poll::Ready(Err(io::Error::new(io::ErrorKind::Unsupported, "this stream does not support writing")));
114            }
115        };
116        let tx = Pin::new(tx);
117        tx.poll_flush(cx)
118    }
119
120    fn poll_shutdown(
121        mut self: Pin<&mut Self>,
122        cx: &mut Context<'_>
123    ) -> Poll<Result<(), io::Error>> {
124        let tx = match self.tx() {
125            Some(tx) => tx,
126            None => {
127                return Poll::Ready(Err(io::Error::new(io::ErrorKind::Unsupported, "this stream does not support writing")));
128            }
129        };
130        let tx = Pin::new(tx);
131        tx.poll_shutdown(cx)
132    }
133}
134
135#[async_trait]
136pub trait StreamReadable {
137    async fn read(&mut self) -> io::Result<Vec<u8>>;
138}
139
140#[async_trait]
141pub trait StreamWritable {
142    async fn write(&mut self, data: &[u8]) -> io::Result<usize>;
143    
144    async fn flush(&mut self) -> io::Result<()>;
145
146    async fn close(&mut self) -> io::Result<()>;
147
148    fn wire_encryption(&self) -> Option<EncryptKey>;
149}
150
151pub trait AsyncStream: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + Sync {}
152
153impl<T> AsyncStream for T where T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + Sync
154{}
155
156impl std::fmt::Debug for dyn AsyncStream {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        f.write_str("async-stream")
159    }
160}