ate_comms/protocol/
stream.rs1use std::io;
2use ate_crypto::EncryptKey;
3use async_trait::async_trait;
4
5use super::MessageProtocolApi;
6use super::StreamReadable;
7use super::StreamWritable;
8
9#[derive(Debug)]
10pub struct StreamRx {
11 proto: Box<dyn MessageProtocolApi + Send + Sync + 'static>,
12 ek: Option<EncryptKey>,
13}
14
15impl StreamRx
16{
17 pub(crate) fn new(proto: Box<dyn MessageProtocolApi + Send + Sync + 'static>, ek: Option<EncryptKey>) -> Self {
18 Self {
19 proto,
20 ek
21 }
22 }
23
24 pub async fn read(&mut self) -> io::Result<Vec<u8>>
25 {
26 let mut total_read = 0u64;
27 self.proto.read_buf_with_header(&self.ek, &mut total_read).await
28 }
29}
30
31#[async_trait]
32impl StreamReadable
33for StreamRx
34{
35 async fn read(&mut self) -> io::Result<Vec<u8>>
36 {
37 StreamRx::read(&mut self).await
38 }
39}
40
41#[derive(Debug)]
42pub struct StreamTx {
43 proto: Box<dyn MessageProtocolApi + Send + Sync + 'static>,
44 ek: Option<EncryptKey>,
45}
46
47impl StreamTx
48{
49 pub(crate) fn new(proto: Box<dyn MessageProtocolApi + Send + Sync + 'static>, ek: Option<EncryptKey>) -> Self {
50 Self {
51 proto,
52 ek
53 }
54 }
55
56 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize>
57 {
58 self.proto.send(&self.ek, data).await
59 .map(|a| a as usize)
60 }
61
62 pub async fn flush(&mut self) -> io::Result<()> {
63 self.proto.flush().await
64 }
65
66 pub async fn close(&mut self) -> io::Result<()> {
67 self.proto.send_close().await
68 }
69
70 pub fn wire_encryption(&self) -> Option<EncryptKey> {
71 self.ek.clone()
72 }
73}
74
75#[async_trait]
76impl StreamWritable
77for StreamTx
78{
79 async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
80 StreamTx::write(&mut self, data).await
81 }
82
83 async fn flush(&mut self) -> io::Result<()> {
84 StreamTx::flush(&mut self).await
85 }
86
87 async fn close(&mut self) -> io::Result<()> {
88 StreamTx::close(&mut self).await
89 }
90
91 fn wire_encryption(&self) -> Option<EncryptKey> {
92 StreamTx::wire_encryption(&self)
93 }
94}