kompact/messaging/
framing.rs

1//! Message framing (serialization and deserialization into and from byte buffers)
2
3use crate::{
4    actors::{ActorPath, NamedPath, SystemField, SystemPath, Transport, UniquePath},
5    messaging::bitfields::{BitField, BitFieldExt},
6    serialisation::{serialisation_ids, Deserialiser, SerError, SerId, Serialisable},
7};
8use bytes::{Buf, BufMut};
9use std::{any::Any, convert::TryFrom, net::IpAddr};
10use uuid::Uuid;
11
12/// The type of address used
13#[derive(Copy, Clone, Debug, PartialEq, Eq)]
14#[repr(u8)]
15pub enum AddressType {
16    /// An IPv4 address
17    IPv4 = 0,
18    /// An IPv6 address
19    IPv6 = 1,
20    /// A domain name
21    Domain = 2,
22}
23
24/// The type of path used
25#[derive(Copy, Clone, Debug, PartialEq, Eq)]
26#[repr(u8)]
27pub enum PathType {
28    /// A [unique path](ActorPath::Unique)
29    Unique = 0,
30    /// A [named path](ActorPath::Named)
31    Named = 1,
32}
33
34impl BitField for AddressType {
35    const POS: usize = 2;
36    const WIDTH: usize = 2;
37}
38
39impl BitField for PathType {
40    const POS: usize = 7;
41    const WIDTH: usize = 1;
42}
43
44impl BitField for Transport {
45    const POS: usize = 0;
46    const WIDTH: usize = 5;
47}
48
49// other direction is try_from
50#[allow(clippy::from_over_into)]
51impl Into<u8> for AddressType {
52    fn into(self) -> u8 {
53        self as u8
54    }
55}
56
57// other direction is try_from
58#[allow(clippy::from_over_into)]
59impl Into<u8> for PathType {
60    fn into(self) -> u8 {
61        self as u8
62    }
63}
64
65// other direction is try_from
66#[allow(clippy::from_over_into)]
67impl Into<u8> for Transport {
68    fn into(self) -> u8 {
69        self as u8
70    }
71}
72
73impl TryFrom<u8> for AddressType {
74    type Error = SerError;
75
76    fn try_from(x: u8) -> Result<Self, Self::Error> {
77        match x {
78            x if x == AddressType::IPv4 as u8 => Ok(AddressType::IPv4),
79            x if x == AddressType::IPv6 as u8 => Ok(AddressType::IPv6),
80            _ => Err(SerError::InvalidType("Unsupported AddressType".into())),
81        }
82    }
83}
84
85impl TryFrom<u8> for PathType {
86    type Error = SerError;
87
88    fn try_from(x: u8) -> Result<Self, Self::Error> {
89        match x {
90            x if x == PathType::Unique as u8 => Ok(PathType::Unique),
91            x if x == PathType::Named as u8 => Ok(PathType::Named),
92            _ => Err(SerError::InvalidType("Unsupported PathType".into())),
93        }
94    }
95}
96
97impl TryFrom<u8> for Transport {
98    type Error = SerError;
99
100    fn try_from(x: u8) -> Result<Self, Self::Error> {
101        match x {
102            x if x == Transport::Local as u8 => Ok(Transport::Local),
103            x if x == Transport::Udp as u8 => Ok(Transport::Udp),
104            x if x == Transport::Tcp as u8 => Ok(Transport::Tcp),
105            _ => Err(SerError::InvalidType(
106                "Unsupported transport protocol".into(),
107            )),
108        }
109    }
110}
111
112impl<'a> From<&'a IpAddr> for AddressType {
113    fn from(addr: &'a IpAddr) -> Self {
114        match addr {
115            IpAddr::V4(_) => AddressType::IPv4,
116            IpAddr::V6(_) => AddressType::IPv6,
117        }
118    }
119}
120
121/// The header for a [system path](SystemPath)
122#[derive(Debug)]
123pub struct SystemPathHeader {
124    storage: [u8; 1],
125    pub(crate) path_type: PathType,
126    pub(crate) protocol: Transport,
127    pub(crate) address_type: AddressType,
128}
129
130impl SystemPathHeader {
131    /// Create header from an actor path
132    pub fn from_path(sys: &ActorPath) -> Self {
133        let path_type = match sys {
134            ActorPath::Unique(_) => PathType::Unique,
135            ActorPath::Named(_) => PathType::Named,
136        };
137        let address_type: AddressType = sys.address().into();
138
139        let mut storage = [0u8];
140        storage
141            .store(path_type)
142            .expect("path_type could not be stored");
143        storage
144            .store(sys.protocol())
145            .expect("protocol could not be stored");
146        storage
147            .store(address_type)
148            .expect("address could not be stored");
149
150        SystemPathHeader {
151            storage,
152            path_type,
153            protocol: sys.protocol(),
154            address_type: sys.address().into(),
155        }
156    }
157
158    /// Create header from a system path
159    pub fn from_system(sys: &SystemPath) -> Self {
160        let path_type = PathType::Unique; // doesn't matter, will be ignored anyway
161        let address_type: AddressType = sys.address().into();
162
163        let mut storage = [0u8];
164        storage
165            .store(path_type)
166            .expect("path_type could not be stored");
167        storage
168            .store(sys.protocol())
169            .expect("protocol could not be stored");
170        storage
171            .store(address_type)
172            .expect("address could not be stored");
173
174        SystemPathHeader {
175            storage,
176            path_type,
177            protocol: sys.protocol(),
178            address_type: sys.address().into(),
179        }
180    }
181
182    /// Put this header's data into the give buffer
183    pub fn put_into(&self, buf: &mut dyn BufMut) {
184        buf.put_u8(self.storage[0])
185    }
186}
187
188impl TryFrom<u8> for SystemPathHeader {
189    type Error = SerError;
190
191    fn try_from(value: u8) -> Result<Self, Self::Error> {
192        let storage = [value];
193        let path_type = storage
194            .get_as::<PathType>()
195            .map_err(|_| SerError::InvalidData("System Path could not be read.".to_owned()))?;
196        let protocol = storage.get_as::<Transport>().map_err(|_| {
197            SerError::InvalidData("System Path Transport could not be read.".to_owned())
198        })?;
199        let address_type = storage.get_as::<AddressType>().map_err(|_| {
200            SerError::InvalidData("System Path AddressType could not be read.".to_owned())
201        })?;
202
203        let header = SystemPathHeader {
204            storage,
205            path_type,
206            protocol,
207            address_type,
208        };
209        Ok(header)
210    }
211}
212
213/// # Actor Path Serialization
214/// An actor path is either Unique or Named and contains a [SystemPath].
215/// The SystemPath's header disambiguates the type (Path type).
216///
217/// # Unique Actor Paths
218///  ```text
219/// +---------------------------+
220/// | System path (*)         ...
221/// +---------------+-----------+
222/// |      UUID (16 bytes)      |
223/// +---------------------------+
224/// ```
225/// # Named Actor Paths
226/// ```text
227/// +---------------------------+
228/// | System path (*)         ...
229/// +---------------+-----------+-------------------------------+
230/// |      Named path (2 bytes prefix + variable length)      ...
231/// +-----------------------------------------------------------+
232/// ```
233///
234/// # System Paths
235/// ```text
236/// +-------------------+-------------------+-----------------------+
237/// | Path type (1 bit) | Protocol (5 bits) | Address Type (2 bits) |
238/// +-------------------+-------------------+-----------------------+----------------+
239/// |                   Address (4/16/ * bytes)                  ...| Port (2 bytes) |
240/// +---------------------------------------------------------------+----------------+
241/// ```
242impl Serialisable for SystemPath {
243    fn ser_id(&self) -> SerId {
244        serialisation_ids::SYSTEM_PATH
245    }
246
247    fn size_hint(&self) -> Option<usize> {
248        let mut size: usize = 0;
249        size += 1; // header
250        size += match self.address() {
251            IpAddr::V4(_) => 4,  // IPv4 uses 4 bytes
252            IpAddr::V6(_) => 16, // IPv4 uses 16 bytes
253        };
254        size += 2; // port # (0-65_535)
255        Some(size)
256    }
257
258    fn serialise(&self, buf: &mut dyn BufMut) -> Result<(), SerError> {
259        let header = SystemPathHeader::from_system(self);
260        header.put_into(buf);
261
262        system_path_put_into_buf(self, buf);
263
264        Ok(())
265    }
266
267    fn local(self: Box<Self>) -> Result<Box<dyn Any + Send>, Box<dyn Serialisable>> {
268        unimplemented!()
269    }
270}
271
272#[inline(always)]
273fn system_path_put_into_buf(path: &SystemPath, buf: &mut dyn BufMut) -> () {
274    match *path.address() {
275        IpAddr::V4(ref ip) => buf.put_slice(&ip.octets()),
276        IpAddr::V6(ref ip) => buf.put_slice(&ip.octets()),
277        // TODO support named Domain
278    }
279    buf.put_u16(path.port());
280}
281#[inline(always)]
282fn system_path_from_buf(buf: &mut dyn Buf) -> Result<(SystemPathHeader, SystemPath), SerError> {
283    // Deserialize system path
284    let fields: u8 = buf.get_u8();
285    let header = SystemPathHeader::try_from(fields)?;
286    let address: IpAddr = match header.address_type {
287        AddressType::IPv4 => {
288            if buf.remaining() < 4 {
289                return Err(SerError::InvalidData(
290                    "Could not parse 4 bytes for IPv4 address".into(),
291                ));
292            } else {
293                let mut ip_bytes = [0u8; 4];
294                buf.copy_to_slice(&mut ip_bytes);
295                IpAddr::from(ip_bytes)
296            }
297        }
298        AddressType::IPv6 => {
299            if buf.remaining() < 16 {
300                return Err(SerError::InvalidData(
301                    "Could not parse 16 bytes for IPv6 address".into(),
302                ));
303            } else {
304                let mut ip_bytes = [0u8; 16];
305                buf.copy_to_slice(&mut ip_bytes);
306                IpAddr::from(ip_bytes)
307            }
308        }
309        AddressType::Domain => {
310            unimplemented!();
311        }
312    };
313    let port = buf.get_u16();
314    let system_path = SystemPath::new(header.protocol, address, port);
315    Ok((header, system_path))
316}
317
318impl Deserialiser<SystemPath> for SystemPath {
319    const SER_ID: SerId = serialisation_ids::SYSTEM_PATH;
320
321    fn deserialise(buf: &mut dyn Buf) -> Result<SystemPath, SerError> {
322        system_path_from_buf(buf).map(|t| t.1)
323    }
324}
325
326impl Serialisable for ActorPath {
327    fn ser_id(&self) -> SerId {
328        serialisation_ids::ACTOR_PATH
329    }
330
331    // Returns the total size for this actor path, including system path information.
332    // Returns `None` if `ActorPath::Named` and the name overflows the designated 2 bytes.
333    fn size_hint(&self) -> Option<usize> {
334        let mut size: usize = 0;
335        size += self.system().size_hint()?; // def. returns Some
336
337        size += match *self {
338            ActorPath::Unique(_) => {
339                // UUIDs are 16 bytes long (see [UuidBytes])
340                16
341            }
342            ActorPath::Named(ref np) => {
343                // Named paths are length-prefixed (2 bytes)
344                // followed by variable-length name
345                let path_len: u16 = 2;
346                // Use 5 bytes per segment as base heuristic.
347                // This is much cheaper than calculating the actual length there.
348                let name_len = np.path_ref().len() * 5;
349                let name_len = u16::try_from(name_len).ok()?;
350                path_len.checked_add(name_len)? as usize
351            }
352        };
353        Some(size)
354    }
355
356    /// Serializes a Unique or Named actor path.
357    fn serialise(&self, buf: &mut dyn BufMut) -> Result<(), SerError> {
358        // System Path
359        let header = SystemPathHeader::from_path(self);
360        header.put_into(buf);
361        system_path_put_into_buf(self.system(), buf);
362
363        // Actor Path
364        match self {
365            ActorPath::Unique(up) => {
366                let uuid = up.id();
367                buf.put_slice(uuid.as_bytes())
368            }
369            ActorPath::Named(np) => {
370                let path = np.path_ref().join("/");
371                let data = path.as_bytes();
372                let name_len: u16 = u16::try_from(data.len()).map_err(|_| {
373                    SerError::InvalidData("Named path overflows designated 2 bytes length.".into())
374                })?;
375                buf.put_u16(name_len);
376                buf.put_slice(data);
377            }
378        }
379        Ok(())
380    }
381
382    fn local(self: Box<Self>) -> Result<Box<dyn Any + Send>, Box<dyn Serialisable>> {
383        Ok(self)
384    }
385}
386impl Deserialiser<ActorPath> for ActorPath {
387    const SER_ID: SerId = serialisation_ids::ACTOR_PATH;
388
389    fn deserialise(buf: &mut dyn Buf) -> Result<ActorPath, SerError> {
390        let (header, system_path) = system_path_from_buf(buf)?;
391
392        let path = match header.path_type {
393            PathType::Unique => {
394                if buf.remaining() < 16 {
395                    return Err(SerError::InvalidData(
396                        "Could not get 16 bytes for UUID".into(),
397                    ));
398                } else {
399                    let mut uuid_bytes = [0u8; 16];
400                    buf.copy_to_slice(&mut uuid_bytes);
401                    let uuid = Uuid::from_bytes(uuid_bytes);
402                    ActorPath::Unique(UniquePath::with_system(system_path, uuid))
403                }
404            }
405            PathType::Named => {
406                let name_len = buf.get_u16() as usize;
407                if buf.remaining() < name_len {
408                    return Err(SerError::InvalidData(format!(
409                        "Could not get {} bytes for path name",
410                        name_len
411                    )));
412                } else {
413                    let mut name_bytes = vec![0u8; name_len];
414                    buf.copy_to_slice(&mut name_bytes);
415                    let name = unsafe {
416                        // since we serialised it ourselves, this should be fine
417                        String::from_utf8_unchecked(name_bytes)
418                    };
419                    let parts: Vec<&str> = name.split('/').collect();
420                    if parts.is_empty() {
421                        return Err(SerError::InvalidData(
422                            "Could not determine name for Named path type".into(),
423                        ));
424                    } else {
425                        let path = parts.into_iter().map(|s| s.to_string()).collect();
426                        ActorPath::Named(NamedPath::with_system(system_path, path))
427                    }
428                }
429            }
430        };
431        Ok(path)
432    }
433}
434
435#[cfg(test)]
436mod serialisation_tests {
437    use super::*;
438    use crate::actors::SystemField;
439    use bytes::BytesMut; //IntoBuf
440
441    #[test]
442    fn system_path_serequiv() {
443        use super::{PathType, SystemPathHeader};
444        use crate::{
445            actors::{ActorPath, NamedPath, SystemPath, Transport},
446            messaging::framing::AddressType,
447        };
448
449        let system_path = SystemPath::new(Transport::Tcp, "127.0.0.1".parse().unwrap(), 8080u16);
450        let named_path = ActorPath::Named(NamedPath::with_system(
451            system_path.clone(),
452            vec!["actor-name".into()],
453        ));
454        let unique_path =
455            ActorPath::Unique(UniquePath::with_system(system_path.clone(), Uuid::new_v4()));
456        {
457            let header = SystemPathHeader::from_path(&named_path);
458            assert_eq!(header.path_type, PathType::Named);
459            assert_eq!(header.protocol, Transport::Tcp);
460            assert_eq!(header.address_type, AddressType::IPv4);
461        }
462        {
463            let header = SystemPathHeader::from_path(&unique_path);
464            assert_eq!(header.path_type, PathType::Unique);
465            assert_eq!(header.protocol, Transport::Tcp);
466            assert_eq!(header.address_type, AddressType::IPv4);
467        }
468
469        let mut buf = BytesMut::with_capacity(system_path.size_hint().unwrap());
470        system_path
471            .serialise(&mut buf)
472            .expect("SystemPath should serialise!");
473
474        //let mut buf = buf.into();
475        let deserialised =
476            SystemPath::deserialise(&mut buf).expect("SystemPath should deserialise!");
477
478        assert_eq!(system_path, deserialised);
479    }
480
481    #[test]
482    fn actor_path_serequiv() {
483        let expected_transport: Transport = Transport::Tcp;
484        let expected_addr: IpAddr = "12.0.0.1".parse().unwrap();
485        let unique_id: Uuid = Uuid::new_v4();
486        let port: u16 = 1234;
487
488        let unique_path = ActorPath::Unique(UniquePath::new(
489            expected_transport,
490            expected_addr,
491            port,
492            unique_id,
493        ));
494
495        let name: Vec<String> = vec!["test", "me", "please"]
496            .into_iter()
497            .map(|s| s.to_string())
498            .collect();
499        let named_path = ActorPath::Named(NamedPath::new(
500            expected_transport,
501            expected_addr,
502            port,
503            name.clone(),
504        ));
505
506        // unique paths
507        {
508            let size = Serialisable::size_hint(&unique_path).expect("Paths should have size hints");
509            let mut buf = BytesMut::with_capacity(size);
510            Serialisable::serialise(&unique_path, &mut buf)
511                .expect("UUID ActorPath Serialisation should succeed");
512
513            // Deserialise
514            //let mut buf: Buf = buf.into();
515            let deser_path = ActorPath::deserialise(&mut buf)
516                .expect("UUID ActorPath Deserialisation should succeed");
517            assert_eq!(buf.len(), 0);
518            let deser_sys: &SystemPath = SystemField::system(&deser_path);
519            assert_eq!(deser_sys.address(), &expected_addr);
520            match deser_path {
521                ActorPath::Unique(ref up) => {
522                    assert_eq!(up.id(), unique_id);
523                }
524                ActorPath::Named(_) => panic!("expected Unique path, got Named path"),
525            }
526        }
527
528        // named paths
529        {
530            let size = Serialisable::size_hint(&named_path).expect("Paths should have size hints");
531            let mut buf = BytesMut::with_capacity(size);
532            Serialisable::serialise(&named_path, &mut buf)
533                .expect("Named ActorPath Serialisation should succeed");
534
535            // Deserialise
536            let mut buf = buf.copy_to_bytes(buf.remaining());
537            let deser_path = ActorPath::deserialise(&mut buf)
538                .expect("Named ActorPath Deserialisation should succeed");
539            assert_eq!(buf.len(), 0);
540            let deser_sys: &SystemPath = SystemField::system(&deser_path);
541            assert_eq!(deser_sys.address(), &expected_addr);
542            match deser_path {
543                ActorPath::Unique(_) => panic!("expected Named path, got Unique path"),
544                ActorPath::Named(ref np) => {
545                    assert_eq!(np.path_ref(), name.as_slice());
546                }
547            }
548        }
549    }
550}