1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
use crate::error::RatsioError;
use crate::protocol::parser::operation;
use crate::ops::*;

use tokio::codec::{Decoder, Encoder};

use bytes::{BytesMut, BufMut};
use nom::{Err as NomErr};

#[derive(Debug, Default, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct OpCodec {
}

impl Decoder for OpCodec {
    type Item = Op;
    type Error = RatsioError;

    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        let len = buf.len();
        if len == 0{
            return Ok(None);
        }
        match operation(&buf[..]) {
            Err(NomErr::Incomplete(_)) => Ok(None),
            Ok((remaining, item)) => {
                debug!(target: "ratsio", " Op::Item => {:?}", item);
                buf.split_to(len - remaining.len());
                Ok(Some(item))
            },
            Err(NomErr::Error(err)) => {
                //scan for \r\n and recover there.
                let txt = String::from(&(*String::from_utf8_lossy(&buf[..])));
                error!(target: "ratsio", " Error parsing => {:?}\n{}", err, txt);
                if let Some(offset) = buf[..].windows(2).position(|w| w == b"\r\n") {
                    buf.split_to(offset);
                    self.decode(buf)
                }else{
                    buf.split_to(len);
                    Ok(None)
                }
            },
            Err(NomErr::Failure(err)) => {
                //scan for \r\n and recover there.
                let txt = String::from(&(*String::from_utf8_lossy(&buf[..])));
                error!(target: "ratsio", " Failure parsing => {:?}\n{}", err, txt);
                if let Some(offset) = buf[..].windows(2).position(|w| w == b"\r\n") {
                    buf.split_to(offset);
                    self.decode(buf)
                }else{
                    buf.split_to(len);
                    Ok(None)
                }
            },
        }
    }
}

impl Encoder for OpCodec {
    type Item = Op;
    type Error = RatsioError;

    fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
        let buf = item.into_bytes()?;
        let buf_len = buf.len();
        let remaining_bytes = dst.remaining_mut();
        if remaining_bytes < buf_len {
            dst.reserve(buf_len);
        }
        debug!(" Sending --->\n{}",  String::from(&(*String::from_utf8_lossy(&buf[..]))));
        dst.put(&buf[..]);
        Ok(())
    }
}