ate_comms/protocol/
api.rs1use std::io;
2use std::pin::Pin;
3use std::task::Context;
4use std::task::Poll;
5use tokio::io::ReadBuf;
6use tokio::io::AsyncRead;
7use tokio::io::AsyncWrite;
8use async_trait::async_trait;
9use ate_crypto::EncryptKey;
10
11use super::StreamRx;
12use super::StreamTx;
13
14#[async_trait]
15pub trait MessageProtocolApi
16where Self: std::fmt::Debug + Send + Sync,
17{
18 async fn write_with_fixed_16bit_header(
19 &mut self,
20 buf: &'_ [u8],
21 delay_flush: bool,
22 ) -> Result<u64, tokio::io::Error>;
23
24 async fn write_with_fixed_32bit_header(
25 &mut self,
26 buf: &'_ [u8],
27 delay_flush: bool,
28 ) -> Result<u64, tokio::io::Error>;
29
30 async fn send(
31 &mut self,
32 wire_encryption: &Option<EncryptKey>,
33 data: &[u8],
34 ) -> Result<u64, tokio::io::Error>;
35
36 async fn read_with_fixed_16bit_header(
37 &mut self,
38 ) -> Result<Vec<u8>, tokio::io::Error>;
39
40 async fn read_with_fixed_32bit_header(
41 &mut self,
42 ) -> Result<Vec<u8>, tokio::io::Error>;
43
44 async fn read_buf_with_header(
45 &mut self,
46 wire_encryption: &Option<EncryptKey>,
47 total_read: &mut u64
48 ) -> std::io::Result<Vec<u8>>;
49
50 async fn send_close(
51 &mut self,
52 ) -> std::io::Result<()>;
53
54 async fn flush(
55 &mut self,
56 ) -> std::io::Result<()>;
57
58 fn split(&mut self, ek: Option<EncryptKey>) -> (StreamRx, StreamTx);
59
60 fn rx(&mut self) -> Option<&mut (dyn AsyncRead + Send + Sync + Unpin + 'static)>;
61
62 fn tx(&mut self) -> Option<&mut (dyn AsyncWrite + Send + Sync + Unpin + 'static)>;
63
64 fn take_rx(&mut self) -> Option<Box<dyn AsyncRead + Send + Sync + Unpin + 'static>>;
65
66 fn take_tx(&mut self) -> Option<Box<dyn AsyncWrite + Send + Sync + Unpin + 'static>>;
67}
68
69impl AsyncRead
70for dyn MessageProtocolApi + Unpin + Send + Sync
71{
72 fn poll_read(
73 mut self: Pin<&mut Self>,
74 cx: &mut Context<'_>,
75 buf: &mut ReadBuf<'_>,
76 ) -> Poll<io::Result<()>> {
77 let rx = match self.rx() {
78 Some(rx) => rx,
79 None => {
80 return Poll::Ready(Err(io::Error::new(io::ErrorKind::Unsupported, "this stream does not support reading")));
81 }
82 };
83 let rx = Pin::new(rx);
84 rx.poll_read(cx, buf)
85 }
86}
87
88impl AsyncWrite
89for dyn MessageProtocolApi + Unpin + Send + Sync
90{
91 fn poll_write(
92 mut self: Pin<&mut Self>,
93 cx: &mut Context<'_>,
94 buf: &[u8],
95 ) -> Poll<Result<usize, io::Error>> {
96 let tx = match self.tx() {
97 Some(tx) => tx,
98 None => {
99 return Poll::Ready(Err(io::Error::new(io::ErrorKind::Unsupported, "this stream does not support writing")));
100 }
101 };
102 let tx = Pin::new(tx);
103 tx.poll_write(cx, buf)
104 }
105
106 fn poll_flush(
107 mut self: Pin<&mut Self>,
108 cx: &mut Context<'_>
109 ) -> Poll<Result<(), io::Error>> {
110 let tx = match self.tx() {
111 Some(tx) => tx,
112 None => {
113 return Poll::Ready(Err(io::Error::new(io::ErrorKind::Unsupported, "this stream does not support writing")));
114 }
115 };
116 let tx = Pin::new(tx);
117 tx.poll_flush(cx)
118 }
119
120 fn poll_shutdown(
121 mut self: Pin<&mut Self>,
122 cx: &mut Context<'_>
123 ) -> Poll<Result<(), io::Error>> {
124 let tx = match self.tx() {
125 Some(tx) => tx,
126 None => {
127 return Poll::Ready(Err(io::Error::new(io::ErrorKind::Unsupported, "this stream does not support writing")));
128 }
129 };
130 let tx = Pin::new(tx);
131 tx.poll_shutdown(cx)
132 }
133}
134
135#[async_trait]
136pub trait StreamReadable {
137 async fn read(&mut self) -> io::Result<Vec<u8>>;
138}
139
140#[async_trait]
141pub trait StreamWritable {
142 async fn write(&mut self, data: &[u8]) -> io::Result<usize>;
143
144 async fn flush(&mut self) -> io::Result<()>;
145
146 async fn close(&mut self) -> io::Result<()>;
147
148 fn wire_encryption(&self) -> Option<EncryptKey>;
149}
150
151pub trait AsyncStream: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + Sync {}
152
153impl<T> AsyncStream for T where T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + Sync
154{}
155
156impl std::fmt::Debug for dyn AsyncStream {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 f.write_str("async-stream")
159 }
160}