rsocket_rust 0.7.5

rsocket-rust is an implementation of the RSocket protocol in Rust.
Documentation
use bytes::{Buf, BufMut, Bytes, BytesMut};

use super::{Body, Frame, Version};
use crate::error::RSocketError;
use crate::utils::Writeable;

#[derive(Debug, Eq, PartialEq)]
pub struct Resume {
    version: Version,
    token: Option<Bytes>,
    last_received_server_position: u64,
    first_available_client_position: u64,
}

pub struct ResumeBuilder {
    stream_id: u32,
    flag: u16,
    inner: Resume,
}

impl Resume {
    fn new() -> Resume {
        Resume {
            version: Version::default(),
            token: None,
            last_received_server_position: 0,
            first_available_client_position: 0,
        }
    }

    pub(crate) fn decode(flag: u16, b: &mut BytesMut) -> crate::Result<Resume> {
        if b.len() < 6 {
            return Err(RSocketError::InCompleteFrame.into());
        }
        let major = b.get_u16();
        let minor = b.get_u16();
        let token_size = b.get_u16() as usize;
        let token = if token_size > 0 {
            if b.len() < token_size {
                return Err(RSocketError::InCompleteFrame.into());
            }
            Some(b.split_to(token_size).freeze())
        } else {
            None
        };
        if b.len() < 16 {
            return Err(RSocketError::InCompleteFrame.into());
        }
        let last_received_server_position = b.get_u64();
        let first_available_client_position = b.get_u64();
        Ok(Resume {
            version: Version::new(major, minor),
            token,
            last_received_server_position,
            first_available_client_position,
        })
    }

    pub fn builder(stream_id: u32, flag: u16) -> ResumeBuilder {
        ResumeBuilder::new(stream_id, flag)
    }

    pub fn get_version(&self) -> Version {
        self.version
    }

    pub fn get_token(&self) -> &Option<Bytes> {
        &self.token
    }

    pub fn get_last_received_server_position(&self) -> u64 {
        self.last_received_server_position
    }

    pub fn get_first_available_client_position(&self) -> u64 {
        self.first_available_client_position
    }
}

impl ResumeBuilder {
    fn new(stream_id: u32, flag: u16) -> ResumeBuilder {
        ResumeBuilder {
            stream_id,
            flag,
            inner: Resume::new(),
        }
    }

    pub fn set_token(mut self, token: Bytes) -> Self {
        self.inner.token = Some(token);
        self
    }

    pub fn set_last_received_server_position(mut self, position: u64) -> Self {
        self.inner.last_received_server_position = position;
        self
    }

    pub fn set_first_available_client_position(mut self, position: u64) -> Self {
        self.inner.first_available_client_position = position;
        self
    }

    pub fn build(self) -> Frame {
        Frame {
            stream_id: self.stream_id,
            flag: self.flag,
            body: Body::Resume(self.inner),
        }
    }
}

impl Writeable for Resume {
    fn write_to(&self, bf: &mut BytesMut) {
        self.version.write_to(bf);
        if let Some(b) = self.get_token() {
            bf.put_u16(b.len() as u16);
            bf.extend_from_slice(b);
        }
        bf.put_u64(self.get_last_received_server_position());
        bf.put_u64(self.get_first_available_client_position());
    }

    fn len(&self) -> usize {
        let mut size: usize = 22;
        if let Some(b) = self.get_token() {
            size += b.len();
        }
        size
    }
}