1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
pub mod decode;
pub mod encode;
use std::{
error::Error,
fmt::Display,
io::{self, Read, Write},
};
use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use crate::command::codec::decode::decode_head;
use self::encode::encode_head;
use super::{Command, HEAD_SIZE};
#[derive(Debug)]
pub enum StreamError {
Bincode(bincode::Error),
Io(io::Error),
}
impl From<bincode::Error> for StreamError {
fn from(err: bincode::Error) -> Self {
Self::Bincode(err)
}
}
impl From<io::Error> for StreamError {
fn from(err: io::Error) -> Self {
Self::Io(err)
}
}
impl Display for StreamError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
StreamError::Bincode(err) => err.fmt(f),
StreamError::Io(err) => err.fmt(f),
}
}
}
impl Error for StreamError {}
#[derive(Debug)]
pub struct CommandCodec<S> {
stream: S,
}
impl<S> CommandCodec<S> {
pub const fn new(stream: S) -> Self {
Self { stream }
}
pub const fn stream(&self) -> &S {
&self.stream
}
pub fn stream_mut(&mut self) -> &mut S {
&mut self.stream
}
pub fn into_inner(self) -> S {
self.stream
}
}
impl<S: Write> CommandCodec<S> {
pub fn write(&mut self, command: &Command) -> Result<usize, StreamError> {
let head = encode_head(&command)?;
self.stream.write_all(&head)?;
self.stream.write_all(&command.data)?;
Ok(command.data.len() + HEAD_SIZE)
}
}
impl<S: Read> CommandCodec<S> {
pub fn read(&mut self) -> Result<(usize, Command), StreamError> {
let mut buf = [0u8; HEAD_SIZE];
self.stream.read_exact(&mut buf)?;
let mut command = decode_head(&buf)?;
self.stream.read_exact(&mut command.data)?;
Ok((HEAD_SIZE + command.data.len(), command))
}
}
impl<S: AsyncRead + Unpin> CommandCodec<S> {
pub async fn read_async(&mut self) -> Result<(usize, Command), StreamError> {
let mut buf = [0u8; HEAD_SIZE];
self.stream.read_exact(&mut buf).await?;
let mut command = decode_head(&buf)?;
self.stream.read_exact(&mut command.data).await?;
Ok((HEAD_SIZE + command.data.len(), command))
}
}
impl<S: AsyncWrite + Unpin> CommandCodec<S> {
pub async fn write_async(&mut self, command: &Command) -> Result<usize, StreamError> {
let head = encode_head(&command)?;
self.stream.write_all(&head).await?;
self.stream.write_all(&command.data).await?;
Ok(command.data.len() + HEAD_SIZE)
}
}