1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
use actix::prelude::*;
use std::io;
use flexbuffers;
use tokio_util::codec::{Encoder, Decoder};
use futures::io::Error;
use bytes::{BytesMut, BufMut};
use byteorder::{NetworkEndian, ByteOrder};
use serde::{Serialize, Deserialize};
use crate::remote::RemoteWrapper;
const PREFIX: &[u8] = b"ACTIX/1.0\r\n";
const ENDIAN_LENGTH: usize = 4;
#[derive(Message, Deserialize, Serialize)]
#[rtype(result = "()")]
pub enum ClusterMessage {
Request(u16, bool),
Response,
Message(RemoteWrapper),
Decline
}
pub struct ConnectCodec {
prefix: bool
}
impl ConnectCodec {
pub fn new() -> ConnectCodec {
ConnectCodec {prefix: false}
}
}
impl Decoder for ConnectCodec {
type Item = ClusterMessage;
type Error = Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if !self.prefix {
if src.len() < 11 {
return Ok(None)
}
if &src[..11] == PREFIX {
let _s = src.split_to(11);
self.prefix = true;
} else {
return Err(io::Error::new(io::ErrorKind::Other, "Prefix mismatch"))
}
}
let size = {
if src.len() < ENDIAN_LENGTH {
return Ok(None)
}
NetworkEndian::read_u32(src.as_ref()) as usize
};
if src.len() >= size + ENDIAN_LENGTH {
let _s = src.split_to(ENDIAN_LENGTH);
let buf = src.split_to(size);
Ok(Some(flexbuffers::from_slice::<ClusterMessage>(&buf).unwrap()))
} else {
Ok(None)
}
}
}
impl Encoder<ClusterMessage> for ConnectCodec {
type Error = Error;
fn encode(&mut self, item: ClusterMessage, dst: &mut BytesMut) -> Result<(), Self::Error> {
match item {
ClusterMessage::Request(_, _) => dst.extend_from_slice(PREFIX),
ClusterMessage::Response => dst.extend_from_slice(PREFIX),
_ => {}
}
let msg = flexbuffers::to_vec(&item).unwrap();
let msg_ref: &[u8] = msg.as_ref();
dst.reserve(msg_ref.len() + ENDIAN_LENGTH);
dst.put_u32(msg_ref.len() as u32);
dst.put(msg_ref);
Ok(())
}
}