plabble-codec 0.1.0

Plabble Transport Protocol codec
Documentation
use crate::{
    abstractions::{
        Serializable,
        SerializationError::{self, InvalidFormat, MissingInfo},
        SerializationInfo, KEY_SIZE, TYPE_APPEND, TYPE_CONNECT, TYPE_CREATE, TYPE_PUT,
        TYPE_REQUEST, TYPE_SUBSCRIBE, TYPE_UNSUBSCRIBE, TYPE_WIPE,
    },
    codec::{
        common::{assert_len, dyn_int, SlotBody, SlotRange},
        ptp_packet::PtpBody,
    },
};

/// The body of a request packet
///
/// # Variants
///
/// * `CONNECT` - connect to a server
/// * `CREATE` - create a new bucket
/// * `PUT` - put one or multiple items in a bucket
/// * `APPEND` - append one or multiple items to a bucket
/// * `WIPE` - wipe or delete a bucket
/// * `REQUEST` - request one or multiple items from a bucket
/// * `SUBSCRIBE` - subscribe to a bucket
/// * `UNSUBSCRIBE` - unsubscribe from a bucket
#[derive(Debug)]
pub enum RequestBody {
    CONNECT {
        protocol_version: u8,
        pub_key: [u8; KEY_SIZE],
    },
    CREATE(SlotRange),
    PUT {
        slots: SlotBody,
    },
    APPEND {
        items: Vec<Vec<u8>>,
    },
    WIPE(SlotRange),
    REQUEST(SlotRange),
    SUBSCRIBE(SlotRange),
    UNSUBSCRIBE,
}

impl PtpBody for RequestBody {
    fn packet_type(&self) -> u8 {
        match self {
            RequestBody::CONNECT { .. } => TYPE_CONNECT,
            RequestBody::CREATE(_) => TYPE_CREATE,
            RequestBody::PUT { .. } => TYPE_PUT,
            RequestBody::APPEND { .. } => TYPE_APPEND,
            RequestBody::WIPE(_) => TYPE_WIPE,
            RequestBody::REQUEST(_) => TYPE_REQUEST,
            RequestBody::SUBSCRIBE(_) => TYPE_SUBSCRIBE,
            RequestBody::UNSUBSCRIBE => TYPE_UNSUBSCRIBE,
        }
    }
}

impl Serializable for RequestBody {
    fn size(&self) -> usize {
        match self {
            RequestBody::CONNECT { .. } => 1 + KEY_SIZE,
            RequestBody::PUT { slots } => slots.size(),
            RequestBody::APPEND { items } => items
                .iter()
                .map(|s| s.len() + dyn_int::encoded_size(s.len() as u128))
                .sum::<usize>(),
            RequestBody::CREATE(sr)
            | RequestBody::WIPE(sr)
            | RequestBody::REQUEST(sr)
            | RequestBody::SUBSCRIBE(sr) => sr.size(),
            RequestBody::UNSUBSCRIBE => 0,
        }
    }

    fn get_bytes(&self) -> Vec<u8> {
        let mut buff = Vec::new();
        match self {
            RequestBody::CONNECT {
                protocol_version,
                pub_key,
            } => {
                buff.push(*protocol_version);
                buff.extend_from_slice(pub_key);
            }
            RequestBody::PUT { slots } => {
                buff = slots.get_bytes();
            }
            RequestBody::APPEND { items } => {
                for data in items.iter() {
                    buff.append(&mut dyn_int::encode(data.len() as u128));
                    buff.extend_from_slice(data)
                }
            }
            RequestBody::CREATE(sr)
            | RequestBody::WIPE(sr)
            | RequestBody::REQUEST(sr)
            | RequestBody::SUBSCRIBE(sr) => {
                buff = sr.get_bytes(); // or buff.append(&mut sr.get_bytes())
            }
            _ => (),
        };

        buff
    }

    fn from_bytes(data: &[u8], info: Option<SerializationInfo>) -> Result<Self, SerializationError>
    where
        Self: Sized,
    {
        if let Some(SerializationInfo::PacketType(packet_type)) = info {
            match packet_type {
                TYPE_CONNECT => {
                    assert_len(data, 1 + KEY_SIZE)?;
                    let protocol_version = data[0];
                    let pub_key: [u8; KEY_SIZE] = data[1..]
                        .try_into()
                        .expect("Failed to copy pub_key from slice");
                    Ok(RequestBody::CONNECT {
                        protocol_version,
                        pub_key,
                    })
                }
                TYPE_CREATE => Ok(RequestBody::CREATE(SlotRange::from_bytes(data, None)?)),
                TYPE_PUT => Ok(RequestBody::PUT {
                    slots: SlotBody::from_bytes(data, None)?,
                }),
                TYPE_APPEND => {
                    let mut items = Vec::new();
                    let mut bytes_read = 0;
                    while bytes_read != data.len() {
                        assert_len(data, bytes_read + 1)?;
                        let (data_len, read) = dyn_int::read_from_slice(&data[bytes_read..])?;
                        bytes_read += read;
                        assert_len(data, bytes_read + data_len as usize)?;
                        let data = data[bytes_read..(bytes_read + data_len as usize)].to_vec();
                        bytes_read += data_len as usize;
                        items.push(data);
                    }
                    Ok(RequestBody::APPEND { items })
                }
                TYPE_WIPE => Ok(RequestBody::WIPE(SlotRange::from_bytes(data, None)?)),
                TYPE_REQUEST => Ok(RequestBody::REQUEST(SlotRange::from_bytes(data, None)?)),
                TYPE_SUBSCRIBE => Ok(RequestBody::SUBSCRIBE(SlotRange::from_bytes(data, None)?)),
                TYPE_UNSUBSCRIBE => Ok(RequestBody::UNSUBSCRIBE),
                _ => Err(InvalidFormat(format!(
                    "Packet type {} is not supported",
                    packet_type
                ))),
            }
        } else {
            Err(MissingInfo(String::from(
                "Missing info paramter of PacketType",
            )))
        }
    }
}

#[cfg(test)]
mod parse_test {
    use std::vec;

    use super::*;
    use crate::abstractions::{Serializable, SerializationError::InvalidFormat};

    /*
        Parsing packets
    */

    #[test]
    fn can_parse_connect_req() {
        let data = &[
            0x77u8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22,
            23, 24, 25, 26, 27, 28, 29, 30, 31, 32,
        ];
        let connect = RequestBody::from_bytes(data, Some(SerializationInfo::PacketType(0)));

        match connect {
            Ok(RequestBody::CONNECT {
                protocol_version,
                pub_key,
            }) => {
                assert_eq!(0x77, protocol_version);
                assert_eq!(&data[1..33], pub_key);
                assert_eq!(data.len(), connect.unwrap().size());
            }
            Err(e) => panic!("Error: {:?}", e),
            _ => panic!("Not a connect"),
        }
    }

    #[test]
    fn can_parse_put_req() {
        let data = &[
            0, 5u8, 0xA, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0, 2u8, 0x5, 1, 2, 3, 4, 5,
        ];
        let put = RequestBody::from_bytes(data, Some(SerializationInfo::PacketType(2))).unwrap();
        assert_eq!(21, put.size());
        if let RequestBody::PUT { slots } = put {
            assert_eq!(2, slots.len());
            assert!(slots.keys().any(|x| *x == 5));
            assert!(slots.keys().any(|x| *x == 2));
            assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10], slots[&5]);
            assert_eq!(vec![1, 2, 3, 4, 5], slots[&2]);
        } else {
            panic!("Not a put request");
        }
    }

    #[test]
    fn can_parse_append_req() {
        let data = &[10u8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 5u8, 1, 2, 3, 4, 5];
        let append = RequestBody::from_bytes(data, Some(SerializationInfo::PacketType(3))).unwrap();
        assert_eq!(17, append.size());
        if let RequestBody::APPEND { items } = append {
            assert_eq!(2, items.len());
            assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10], items[0]);
            assert_eq!(vec![1, 2, 3, 4, 5], items[1]);
        } else {
            panic!("Not an append request");
        }
    }

    #[test]
    fn can_parse_slot_range_0_slots_req() {
        let data: &[u8] = &[];
        if let RequestBody::REQUEST(sr) =
            RequestBody::from_bytes(data, Some(SerializationInfo::PacketType(5))).unwrap()
        {
            assert_eq!(None, sr.from);
            assert_eq!(None, sr.to);
        } else {
            panic!("Not a request");
        }
    }

    #[test]
    fn can_parse_slot_range_1_slot_req() {
        let data: &[u8] = &[0, 15];
        if let RequestBody::WIPE(sr) =
            RequestBody::from_bytes(data, Some(SerializationInfo::PacketType(4))).unwrap()
        {
            assert_eq!(Some(15), sr.from);
            assert_eq!(None, sr.to);
        } else {
            panic!("Not a wipe request");
        }
    }

    #[test]
    fn can_parse_slot_range_2_slots_req() {
        let data: &[u8] = &[0, 15, 255, 255];
        let body = RequestBody::from_bytes(data, Some(SerializationInfo::PacketType(6))).unwrap();
        assert_eq!(4, body.size());
        if let RequestBody::SUBSCRIBE(sr) = body {
            assert_eq!(Some(15), sr.from);
            assert_eq!(Some(u16::MAX), sr.to);
        } else {
            panic!("Not a subscribe request");
        }
    }

    #[test]
    fn cant_parse_unknown_packet_type() {
        let data = &[0u8, 0];
        let packet_type = 8;
        let error = RequestBody::from_bytes(data, Some(SerializationInfo::PacketType(packet_type)))
            .unwrap_err();
        assert_eq!(
            InvalidFormat(String::from("Packet type 8 is not supported")),
            error
        );
    }
}

#[cfg(test)]
mod serialize_test {
    use std::collections::HashMap;

    use crate::abstractions::Serializable;

    use super::*;
    /*
        Serializing packets
    */

    #[test]
    fn can_serialize_connect_req() {
        let connect = RequestBody::CONNECT {
            protocol_version: 5,
            pub_key: [
                1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22,
                23, 24, 25, 26, 27, 28, 29, 30, 31, 32,
            ],
        };
        let expected = vec![
            5, 1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
            24, 25, 26, 27, 28, 29, 30, 31, 32,
        ];
        let serialized = connect.get_bytes();
        assert_eq!(expected, serialized);
    }

    #[test]
    fn can_serialize_put_req() {
        let mut slots = HashMap::new();
        slots.insert(25, vec![1, 2, 3, 4, 5]);
        slots.insert(6, vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let put = RequestBody::PUT { slots }.get_bytes();
        assert_eq!(
            put,
            vec![0, 6, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0, 25, 5, 1, 2, 3, 4, 5],
        );
    }

    #[test]
    fn can_serialize_create_req() {
        let create = RequestBody::CREATE(SlotRange {
            from: Some(5),
            to: None,
        })
        .get_bytes();
        assert_eq!(vec![0, 5], create);
    }

    #[test]
    fn can_serialize_create_req_2() {
        let create = RequestBody::CREATE(SlotRange {
            from: Some(5),
            to: Some(u16::MAX - 256),
        })
        .get_bytes();
        assert_eq!(vec![0, 5, 254, 255], create);
    }

    #[test]
    fn can_serialize_append_req() {
        let items = vec![vec![1, 2, 3, 4, 5], vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]];
        let append = RequestBody::APPEND { items }.get_bytes();
        assert_eq!(
            vec![5, 1, 2, 3, 4, 5, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            append
        );
    }
}