ate_comms/protocol/
stream.rs

1use std::io;
2use ate_crypto::EncryptKey;
3use async_trait::async_trait;
4
5use super::MessageProtocolApi;
6use super::StreamReadable;
7use super::StreamWritable;
8
9#[derive(Debug)]
10pub struct StreamRx {
11    proto: Box<dyn MessageProtocolApi + Send + Sync + 'static>,
12    ek: Option<EncryptKey>,
13}
14
15impl StreamRx
16{
17    pub(crate) fn new(proto: Box<dyn MessageProtocolApi + Send + Sync + 'static>, ek: Option<EncryptKey>) -> Self {
18        Self {
19            proto,
20            ek
21        }
22    }
23    
24    pub async fn read(&mut self) -> io::Result<Vec<u8>>
25    {
26        let mut total_read = 0u64;
27        self.proto.read_buf_with_header(&self.ek, &mut total_read).await
28    }
29}
30
31#[async_trait]
32impl StreamReadable
33for StreamRx
34{
35    async fn read(&mut self) -> io::Result<Vec<u8>>
36    {
37        StreamRx::read(&mut self).await
38    }
39}
40
41#[derive(Debug)]
42pub struct StreamTx {
43    proto: Box<dyn MessageProtocolApi + Send + Sync + 'static>,
44    ek: Option<EncryptKey>,
45}
46
47impl StreamTx
48{
49    pub(crate) fn new(proto: Box<dyn MessageProtocolApi + Send + Sync + 'static>, ek: Option<EncryptKey>) -> Self {
50        Self {
51            proto,
52            ek
53        }
54    }
55
56    pub async fn write(&mut self, data: &[u8]) -> io::Result<usize>
57    {
58        self.proto.send(&self.ek, data).await
59            .map(|a| a as usize)
60    }
61
62    pub async fn flush(&mut self) -> io::Result<()> {
63        self.proto.flush().await
64    }
65
66    pub async fn close(&mut self) -> io::Result<()> {
67        self.proto.send_close().await
68    }
69
70    pub fn wire_encryption(&self) -> Option<EncryptKey> {
71        self.ek.clone()
72    }
73}
74
75#[async_trait]
76impl StreamWritable
77for StreamTx
78{
79    async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
80        StreamTx::write(&mut self, data).await
81    }
82    
83    async fn flush(&mut self) -> io::Result<()> {
84        StreamTx::flush(&mut self).await
85    }
86
87    async fn close(&mut self) -> io::Result<()> {
88        StreamTx::close(&mut self).await
89    }
90
91    fn wire_encryption(&self) -> Option<EncryptKey> {
92        StreamTx::wire_encryption(&self)
93    }
94}