actix_telepathy/
codec.rs

1use crate::remote::RemoteWrapper;
2use actix::prelude::*;
3use byteorder::{ByteOrder, NetworkEndian};
4use bytes::{Buf, BufMut, BytesMut};
5use futures::io::Error;
6use serde::{Deserialize, Serialize};
7use std::io;
8use tokio_util::codec::{Decoder, Encoder};
9
/// Protocol preamble (11 bytes). `ConnectCodec` expects it exactly once at the start of an
/// inbound stream and writes it in front of every `Request` and `Response` it encodes.
const PREFIX: &[u8] = b"ACTIX/1.0\r\n";
/// Width in bytes of each big-endian `u32` length field in a frame header.
const ENDIAN_LENGTH: usize = 4;
12
/// Messages that `ConnectCodec` encodes to and decodes from the wire.
#[derive(Message, Deserialize, Serialize, Debug)]
#[rtype(result = "()")]
pub enum ClusterMessage {
    /// The `bool` flags whether the sender is a seed node (`is_seed`).
    /// NOTE(review): the `u16` is presumably the sender's listening port — confirm with callers.
    Request(u16, bool),
    /// Encoded with the protocol prefix, like `Request`.
    /// NOTE(review): presumably the positive reply to a `Request` — confirm with callers.
    Response,
    /// A wrapped message. The wrapper's `message_buffer` travels on the wire as a separate
    /// payload section after the serialized header (see `split` / `set_buffer`).
    Message(RemoteWrapper),
    /// Encoded without the protocol prefix.
    /// NOTE(review): presumably the refusal of a `Request` — confirm with callers.
    Decline,
}
22
23impl ClusterMessage {
24    pub fn split(&self) -> (Vec<u8>, Vec<u8>) {
25        (
26            match &self {
27                Self::Message(wrapper) => wrapper.message_buffer.clone(),
28                _ => panic!("split should not be used if not ClusterMessage::Message"),
29            },
30            flexbuffers::to_vec(self).unwrap(),
31        )
32    }
33
34    pub fn set_buffer(&mut self, bytes: Vec<u8>) {
35        match self {
36            Self::Message(ref mut wrapper) => wrapper.message_buffer = bytes,
37            _ => panic!("set_buffer should not be used if not ClusterMessage::Message"),
38        }
39    }
40}
41
/// Length-prefixed codec for [`ClusterMessage`] frames.
///
/// A freshly created codec has not yet seen the protocol prefix on its inbound side.
#[derive(Debug, Default)]
pub struct ConnectCodec {
    /// `true` once the protocol prefix has been consumed from the inbound stream.
    prefix: bool,
}

impl ConnectCodec {
    /// Creates a codec that still expects the protocol prefix on its inbound stream.
    ///
    /// Equivalent to `ConnectCodec::default()`.
    pub fn new() -> ConnectCodec {
        ConnectCodec::default()
    }
}
51
52impl Decoder for ConnectCodec {
53    type Item = ClusterMessage;
54    type Error = Error;
55
56    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
57        if !self.prefix {
58            if src.len() < 11 {
59                return Ok(None);
60            }
61            if &src[..11] == PREFIX {
62                let _s = src.split_to(11);
63                self.prefix = true;
64            } else {
65                return Err(io::Error::new(io::ErrorKind::Other, "Prefix mismatch"));
66            }
67        }
68
69        let size = {
70            if src.len() < ENDIAN_LENGTH {
71                return Ok(None);
72            }
73            NetworkEndian::read_u32(src.as_ref()) as usize
74        };
75
76        if src.len() >= size + (ENDIAN_LENGTH * 2) {
77            src.advance(ENDIAN_LENGTH);
78            let header_size = NetworkEndian::read_u32(src.as_ref()) as usize;
79            src.advance(ENDIAN_LENGTH);
80
81            if size > header_size {
82                let header = src.split_to(header_size);
83                let buf = src.split_to(size - header_size);
84                let mut cluster_message =
85                    flexbuffers::from_slice::<ClusterMessage>(&header).unwrap();
86                cluster_message.set_buffer(buf.to_vec());
87                Ok(Some(cluster_message))
88            } else {
89                let buf = src.split_to(size);
90                Ok(Some(
91                    flexbuffers::from_slice::<ClusterMessage>(&buf).unwrap(),
92                ))
93            }
94        } else {
95            Ok(None)
96        }
97    }
98}
99
100impl Encoder<ClusterMessage> for ConnectCodec {
101    type Error = Error;
102
103    fn encode(&mut self, item: ClusterMessage, dst: &mut BytesMut) -> Result<(), Self::Error> {
104        match &item {
105            ClusterMessage::Request(_, _) => dst.extend_from_slice(PREFIX),
106            ClusterMessage::Response => dst.extend_from_slice(PREFIX),
107            ClusterMessage::Message(_) => {
108                let (buffer, header) = item.split();
109                let buffer_ref: &[u8] = buffer.as_ref();
110                let header_ref: &[u8] = header.as_ref();
111
112                dst.reserve(header_ref.len() + buffer_ref.len() + (ENDIAN_LENGTH * 2));
113                dst.put_u32((header_ref.len() + buffer_ref.len()) as u32);
114                dst.put_u32(header_ref.len() as u32);
115                dst.put(header_ref);
116                dst.put(buffer_ref);
117
118                return Ok(());
119            }
120            _ => {}
121        }
122
123        let msg = flexbuffers::to_vec(&item).unwrap();
124
125        let msg_ref: &[u8] = msg.as_ref();
126
127        dst.reserve(msg_ref.len() + (ENDIAN_LENGTH * 2));
128        dst.put_u32(msg_ref.len() as u32);
129        dst.put_u32(msg_ref.len() as u32);
130        dst.put(msg_ref);
131        Ok(())
132    }
133}