// rsub 0.1.0
//
// A high-performance message broker with QUIC transport and pub/sub messaging patterns.
use anyhow::{anyhow, Result};
use bytes::{BufMut, Bytes, BytesMut};
use quinn::{RecvStream, SendStream};
use std::sync::Arc;
use tokio::sync::Mutex;

use super::HEADER_LENGTH;

/// Largest payload size, in bytes, that fits in the 3-byte length prefix
/// (2^24 - 1).
const MAX_FRAME_SIZE: usize = (1 << 24) - 1;

/// A single length-prefixed message exchanged over a QUIC stream.
///
/// Cloning is cheap: `Bytes` is reference-counted, so the payload buffer is
/// shared rather than copied.
#[derive(Debug, Clone)]
pub struct Frame {
    /// Payload size in bytes.
    pub length: usize,
    /// The message body carried by this frame.
    pub payload: Bytes,
}

impl Frame {
    pub fn new(payload: Bytes) -> Result<Self> {
        let length = payload.len();
        if length > MAX_FRAME_SIZE {
            return Err(anyhow!("Payload too large: {} bytes", length));
        }
        Ok(Frame { length, payload })
    }

    pub async fn write(&self, send: &Arc<Mutex<SendStream>>) -> Result<()> {
        let mut header = BytesMut::with_capacity(HEADER_LENGTH);
        header.put_uint(self.length as u64, 3);

        let mut send = send.lock().await;
        send.write_all(&header).await?;
        send.write_all(&self.payload).await?;
        Ok(())
    }

    pub async fn read(recv: &Arc<Mutex<RecvStream>>) -> Result<Self> {
        let mut recv = recv.lock().await;

        let header = recv
            .read_chunk(HEADER_LENGTH, true)
            .await?
            .ok_or_else(|| anyhow!("Incomplete message"))?;

        let length =
            u32::from_be_bytes([0, header.bytes[0], header.bytes[1], header.bytes[2]]) as usize;

        if length > MAX_FRAME_SIZE {
            return Err(anyhow!("Payload too large: {} bytes", length));
        }

        let payload = recv
            .read_chunk(length, true)
            .await?
            .ok_or_else(|| anyhow!("Incomplete message"))?;

        Ok(Frame {
            length,
            payload: payload.bytes,
        })
    }
}

/// Cloneable handle for reading [`Frame`]s from a shared receive stream.
///
/// Clones share the same underlying stream through the `Arc<Mutex<_>>`.
#[derive(Debug, Clone)]
pub struct FrameReader {
    recv: Arc<Mutex<RecvStream>>,
}

impl FrameReader {
    /// Creates a reader over the given shared receive stream.
    pub fn new(recv: Arc<Mutex<RecvStream>>) -> Self {
        Self { recv }
    }

    /// Reads the next frame from the stream by delegating to [`Frame::read`].
    ///
    /// # Errors
    ///
    /// Returns whatever error [`Frame::read`] reports.
    pub async fn read_frame(&self) -> Result<Frame> {
        let stream = &self.recv;
        Frame::read(stream).await
    }
}

/// Cloneable handle for writing [`Frame`]s to a shared send stream.
///
/// Clones share the same underlying stream through the `Arc<Mutex<_>>`.
#[derive(Debug, Clone)]
pub struct FrameWriter {
    send: Arc<Mutex<SendStream>>,
}

impl FrameWriter {
    /// Creates a writer over the given shared send stream.
    pub fn new(send: Arc<Mutex<SendStream>>) -> Self {
        Self { send }
    }

    /// Writes `frame` to the stream by delegating to [`Frame::write`].
    ///
    /// # Errors
    ///
    /// Returns whatever error [`Frame::write`] reports.
    pub async fn write_frame(&self, frame: &Frame) -> Result<()> {
        let stream = &self.send;
        frame.write(stream).await
    }
}