rsocket_rust 0.6.0

rsocket-rust is an implementation of the RSocket protocol in Rust.
Documentation
use crate::frame::{self, Body, Frame};
use crate::payload::Payload;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::collections::LinkedList;

pub(crate) const MIN_MTU: usize = 64;

pub(crate) struct Joiner {
    inner: LinkedList<Frame>,
}

#[derive(Debug, Clone)]
pub(crate) struct Splitter {
    mtu: usize,
}

impl Splitter {
    pub(crate) fn new(mtu: usize) -> Splitter {
        assert!(mtu > frame::LEN_HEADER, "mtu is too small!");
        Splitter { mtu }
    }

    pub(crate) fn cut(&self, input: Payload, skip: usize) -> impl Iterator<Item = Payload> {
        let (data, meta) = input.split();
        SplitterIter {
            mtu: self.mtu,
            skip,
            data,
            meta,
        }
    }
}

struct SplitterIter {
    mtu: usize,
    skip: usize,
    data: Option<Bytes>,
    meta: Option<Bytes>,
}

impl Iterator for SplitterIter {
    type Item = Payload;

    fn next(&mut self) -> Option<Payload> {
        if self.meta.is_none() && self.data.is_none() {
            return None;
        }
        let mut m: Option<Bytes> = None;
        let mut d: Option<Bytes> = None;
        let mut left = self.mtu - frame::LEN_HEADER - self.skip;
        if let Some(it) = &mut self.meta {
            let msize = it.len();
            if left < msize {
                m = Some(it.split_to(left));
                left = 0;
            } else {
                m = self.meta.take();
                left -= msize;
            }
        }

        if left > 0 {
            if let Some(it) = &mut self.data {
                let dsize = it.len();
                if left < dsize {
                    d = Some(it.split_to(left));
                } else {
                    d = self.data.take();
                }
            }
        }
        self.skip = 0;
        Some(Payload::new(d, m))
    }
}

impl Into<Payload> for Joiner {
    fn into(self) -> Payload {
        let mut bf = BytesMut::new();
        let mut bf2 = BytesMut::new();
        self.inner.into_iter().for_each(|it: Frame| {
            let (d, m) = match it.body {
                Body::RequestResponse(body) => body.split(),
                Body::RequestStream(body) => body.split(),
                Body::RequestChannel(body) => body.split(),
                Body::RequestFNF(body) => body.split(),
                Body::Payload(body) => body.split(),
                _ => (None, None),
            };
            if let Some(raw) = d {
                bf.put(raw);
            }
            if let Some(raw) = m {
                bf2.put(raw);
            }
        });

        let data = if bf.is_empty() {
            None
        } else {
            Some(bf.freeze())
        };
        let metadata = if bf2.is_empty() {
            None
        } else {
            Some(bf2.freeze())
        };
        Payload::new(data, metadata)
    }
}

impl Joiner {
    pub(crate) fn new() -> Joiner {
        Joiner {
            inner: LinkedList::new(),
        }
    }

    pub(crate) fn get_stream_id(&self) -> u32 {
        self.first().get_stream_id()
    }

    pub(crate) fn get_flag(&self) -> u16 {
        self.first().get_flag() & !Frame::FLAG_FOLLOW
    }

    pub(crate) fn first(&self) -> &Frame {
        self.inner.front().expect("No frames pushed!")
    }

    pub(crate) fn push(&mut self, next: Frame) {
        self.inner.push_back(next);
    }
}

#[cfg(test)]
mod tests {

    use crate::frame::{self, Frame};
    use crate::payload::Payload;
    use crate::transport::{Joiner, Splitter};
    use bytes::{Buf, Bytes};

    #[test]
    fn test_joiner() {
        let first = frame::Payload::builder(1, Frame::FLAG_FOLLOW)
            .set_data(Bytes::from("(ROOT)"))
            .set_metadata(Bytes::from("(ROOT)"))
            .build();
        let mut joiner = Joiner::new();
        joiner.push(first);

        for i in 0..10 {
            let flag = if i == 9 { 0u16 } else { Frame::FLAG_FOLLOW };
            let next = frame::Payload::builder(1, flag)
                .set_data(Bytes::from(format!("(data{:04})", i)))
                .set_metadata(Bytes::from(format!("(data{:04})", i)))
                .build();
            joiner.push(next);
        }
        let pa: Payload = joiner.into();
        println!("payload: {:?}", pa);
    }

    #[test]
    fn test_splitter() {
        let input = Payload::builder()
            .set_data_utf8("helloworld")
            .set_metadata_utf8("foobar")
            .build();
        let mut sp = Splitter::new(13);
        for (i, it) in sp.cut(input.clone(), 0).enumerate() {
            println!("{}: {:?}", i, it);
        }
        println!("MODE 100");
        sp = Splitter::new(100);
        for (i, it) in sp.cut(input.clone(), 0).enumerate() {
            println!("{}: {:?}", i, it);
        }
    }
}