corgi 0.0.3

🦀 Rust based RPC library on UDP
Documentation
use std::collections::HashMap;

use bytes::{Bytes, BytesMut};

use crate::protocol::{
    codec::{EnvelopeCodec, PackageChunkCodec},
    types::{CallId, PackageChunk, RpcCall, RpcError},
};

#[derive(Default)]
pub(crate) struct Parser {
    chunks: HashMap<CallId, Vec<PackageChunk>>,
    chunk_codec: PackageChunkCodec,
    envelope_codec: EnvelopeCodec,
}

impl Parser {
    pub(crate) fn apply(&mut self, data: &[u8]) -> Result<Option<RpcCall>, RpcError> {
        if let Some(call_id) = self.feed(data)? {
            let bytes = self.build_package(call_id);
            let envelope = self.envelope_codec.decode(&bytes)?;
            let call = RpcCall::new(call_id, envelope);
            return Ok(Some(call));
        }

        Ok(None)
    }

    fn feed(&mut self, data: &[u8]) -> Result<Option<CallId>, RpcError> {
        let chunk = self.chunk_codec.decode(data)?;
        let total = chunk.header().total() as usize;
        let call_id = chunk.header().call_id();
        let package_chunks = self
            .chunks
            .entry(chunk.header().call_id())
            .or_insert_with(|| {
                let mut chunks = Vec::with_capacity(chunk.header().total() as usize);
                chunks.push(chunk);
                chunks
            });

        if total == package_chunks.len() {
            package_chunks.sort();
            return Ok(Some(call_id));
        }

        Ok(None)
    }

    fn build_package(&mut self, call_id: CallId) -> Bytes {
        let package_chunks = self.chunks.remove(&call_id).unwrap();
        package_chunks
            .iter()
            .map(|p| p.payload())
            .fold(BytesMut::new(), |mut acc, value| {
                acc.extend_from_slice(value);
                acc
            })
            .freeze()
    }
}