1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
use crate::error::RatsioError;
use crate::protocol::parser::operation;
use crate::ops::*;
use tokio::codec::{Decoder, Encoder};
use bytes::{BytesMut, BufMut};
use nom::{Err as NomErr};
/// Stateless codec type: it carries no fields, so every instance is
/// interchangeable and all values compare equal.
///
/// The `Decoder` and `Encoder` implementations for this type translate
/// between raw bytes and `Op` values.
#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct OpCodec {}
impl Decoder for OpCodec {
type Item = Op;
type Error = RatsioError;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let len = buf.len();
if len == 0{
return Ok(None);
}
match operation(&buf[..]) {
Err(NomErr::Incomplete(_)) => Ok(None),
Ok((remaining, item)) => {
debug!(target: "ratsio", " Op::Item => {:?}", item);
buf.split_to(len - remaining.len());
Ok(Some(item))
},
Err(NomErr::Error(err)) => {
let txt = String::from(&(*String::from_utf8_lossy(&buf[..])));
error!(target: "ratsio", " Error parsing => {:?}\n{}", err, txt);
if let Some(offset) = buf[..].windows(2).position(|w| w == b"\r\n") {
buf.split_to(offset);
self.decode(buf)
}else{
buf.split_to(len);
Ok(None)
}
},
Err(NomErr::Failure(err)) => {
let txt = String::from(&(*String::from_utf8_lossy(&buf[..])));
error!(target: "ratsio", " Failure parsing => {:?}\n{}", err, txt);
if let Some(offset) = buf[..].windows(2).position(|w| w == b"\r\n") {
buf.split_to(offset);
self.decode(buf)
}else{
buf.split_to(len);
Ok(None)
}
},
}
}
}
impl Encoder for OpCodec {
    type Item = Op;
    type Error = RatsioError;

    /// Serialises `item` and appends the resulting bytes to `dst`.
    ///
    /// # Errors
    /// Propagates whatever error `item.into_bytes()` returns; `dst` is left
    /// untouched in that case.
    fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
        let payload = item.into_bytes()?;
        let needed = payload.len();

        // Grow the destination before writing when its spare capacity is
        // smaller than the payload.
        if dst.remaining_mut() < needed {
            dst.reserve(needed);
        }

        debug!(" Sending --->\n{}", String::from_utf8_lossy(&payload[..]));
        dst.put(&payload[..]);
        Ok(())
    }
}