1use crate::remote::RemoteWrapper;
2use actix::prelude::*;
3use byteorder::{ByteOrder, NetworkEndian};
4use bytes::{Buf, BufMut, BytesMut};
5use futures::io::Error;
6use serde::{Deserialize, Serialize};
7use std::io;
8use tokio_util::codec::{Decoder, Encoder};
9
/// Protocol banner that opens a connection; the decoder requires it once, before
/// the first frame, and the encoder emits it ahead of `Request`/`Response`.
const PREFIX: &[u8] = b"ACTIX/1.0\r\n";
/// Width in bytes of one big-endian `u32` length field in the frame header.
const ENDIAN_LENGTH: usize = std::mem::size_of::<u32>();
12
/// Message exchanged between cluster nodes through `ConnectCodec`.
///
/// The type is an actix `Message` with a `()` result and is (de)serialised with
/// serde; this file uses flexbuffers as the wire encoding.
#[derive(Message, Deserialize, Serialize, Debug)]
#[rtype(result = "()")]
pub enum ClusterMessage {
    /// Encoded with the protocol prefix in front of the frame.
    // NOTE(review): the meaning of the `u16` and `bool` fields is not visible
    // in this file; confirm against the sender.
    Request(u16, bool),
    /// Encoded with the protocol prefix in front of the frame.
    Response,
    /// A wrapped remote message; its `message_buffer` travels as a separate
    /// payload section after the serialised header.
    Message(RemoteWrapper),
    /// Carries no data; framed like any other non-`Message` variant.
    Decline,
}
22
23impl ClusterMessage {
24 pub fn split(&self) -> (Vec<u8>, Vec<u8>) {
25 (
26 match &self {
27 Self::Message(wrapper) => wrapper.message_buffer.clone(),
28 _ => panic!("split should not be used if not ClusterMessage::Message"),
29 },
30 flexbuffers::to_vec(self).unwrap(),
31 )
32 }
33
34 pub fn set_buffer(&mut self, bytes: Vec<u8>) {
35 match self {
36 Self::Message(ref mut wrapper) => wrapper.message_buffer = bytes,
37 _ => panic!("set_buffer should not be used if not ClusterMessage::Message"),
38 }
39 }
40}
41
/// Length-prefixed codec for cluster messages.
///
/// The codec is stateful on the decoding side: it remembers whether the
/// protocol prefix has already been consumed from the inbound stream.
#[derive(Debug, Default)]
pub struct ConnectCodec {
    /// `true` once the protocol prefix has been read and stripped from the
    /// inbound byte stream; `false` for a fresh codec.
    prefix: bool,
}

impl ConnectCodec {
    /// Creates a codec that has not yet seen the protocol prefix.
    ///
    /// Equivalent to [`ConnectCodec::default`].
    pub fn new() -> ConnectCodec {
        ConnectCodec::default()
    }
}
51
52impl Decoder for ConnectCodec {
53 type Item = ClusterMessage;
54 type Error = Error;
55
56 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
57 if !self.prefix {
58 if src.len() < 11 {
59 return Ok(None);
60 }
61 if &src[..11] == PREFIX {
62 let _s = src.split_to(11);
63 self.prefix = true;
64 } else {
65 return Err(io::Error::new(io::ErrorKind::Other, "Prefix mismatch"));
66 }
67 }
68
69 let size = {
70 if src.len() < ENDIAN_LENGTH {
71 return Ok(None);
72 }
73 NetworkEndian::read_u32(src.as_ref()) as usize
74 };
75
76 if src.len() >= size + (ENDIAN_LENGTH * 2) {
77 src.advance(ENDIAN_LENGTH);
78 let header_size = NetworkEndian::read_u32(src.as_ref()) as usize;
79 src.advance(ENDIAN_LENGTH);
80
81 if size > header_size {
82 let header = src.split_to(header_size);
83 let buf = src.split_to(size - header_size);
84 let mut cluster_message =
85 flexbuffers::from_slice::<ClusterMessage>(&header).unwrap();
86 cluster_message.set_buffer(buf.to_vec());
87 Ok(Some(cluster_message))
88 } else {
89 let buf = src.split_to(size);
90 Ok(Some(
91 flexbuffers::from_slice::<ClusterMessage>(&buf).unwrap(),
92 ))
93 }
94 } else {
95 Ok(None)
96 }
97 }
98}
99
100impl Encoder<ClusterMessage> for ConnectCodec {
101 type Error = Error;
102
103 fn encode(&mut self, item: ClusterMessage, dst: &mut BytesMut) -> Result<(), Self::Error> {
104 match &item {
105 ClusterMessage::Request(_, _) => dst.extend_from_slice(PREFIX),
106 ClusterMessage::Response => dst.extend_from_slice(PREFIX),
107 ClusterMessage::Message(_) => {
108 let (buffer, header) = item.split();
109 let buffer_ref: &[u8] = buffer.as_ref();
110 let header_ref: &[u8] = header.as_ref();
111
112 dst.reserve(header_ref.len() + buffer_ref.len() + (ENDIAN_LENGTH * 2));
113 dst.put_u32((header_ref.len() + buffer_ref.len()) as u32);
114 dst.put_u32(header_ref.len() as u32);
115 dst.put(header_ref);
116 dst.put(buffer_ref);
117
118 return Ok(());
119 }
120 _ => {}
121 }
122
123 let msg = flexbuffers::to_vec(&item).unwrap();
124
125 let msg_ref: &[u8] = msg.as_ref();
126
127 dst.reserve(msg_ref.len() + (ENDIAN_LENGTH * 2));
128 dst.put_u32(msg_ref.len() as u32);
129 dst.put_u32(msg_ref.len() as u32);
130 dst.put(msg_ref);
131 Ok(())
132 }
133}