rsub 0.1.0

A high-performance message broker with QUIC transport and pub/sub messaging patterns
Documentation
use std::sync::Arc;

use bytes::Bytes;
use tokio::sync::{Mutex, RwLock};

use crate::common::frame::{Frame, FrameReader, FrameWriter};

use super::message::Message;

/// A stream for sending data using QUIC
#[derive(Debug, Clone)]
pub struct SendStream {
    id: String,
    frame_writer: FrameWriter,
    closed: Arc<RwLock<bool>>,
}

impl SendStream {
    /// Creates a new SendStream with the given ID and QUIC send stream
    ///
    /// # Arguments
    /// * `id` - Unique identifier for this stream
    /// * `send` - The underlying QUIC send stream
    pub fn new(id: String, send: quinn::SendStream) -> Self {
        SendStream {
            id,
            frame_writer: FrameWriter::new(Arc::new(Mutex::new(send))),
            closed: Arc::new(RwLock::new(false)),
        }
    }

    /// Returns the ID of this stream
    pub fn id(&self) -> &str {
        &self.id
    }

    /// Checks if the stream is closed
    pub async fn is_closed(&self) -> bool {
        *self.closed.read().await
    }

    /// Marks the stream as closed
    pub async fn close(&self) {
        let mut closed = self.closed.write().await;
        *closed = true;
    }

    /// Writes data to the stream as a frame
    ///
    /// # Arguments
    /// * `data` - The bytes to write
    ///
    /// # Returns
    /// * `Ok(())` if write was successful
    /// * `Err` if there was an error writing
    pub async fn write(&self, data: Bytes) -> Result<(), std::io::Error> {
        let frame =
            Frame::new(data).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
        self.frame_writer
            .write_frame(&frame)
            .await
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
    }

    /// Writes a message to the stream
    ///
    /// # Arguments
    /// * `message` - The message to write
    ///
    /// # Returns
    /// * `Ok(())` if write was successful
    /// * `Err` if there was an error writing
    pub async fn write_message(&self, message: Message) -> Result<(), std::io::Error> {
        self.write(message.serialize().unwrap().payload).await
    }
}

/// A stream for receiving data using QUIC
pub struct RecvStream {
    id: String,
    frame_reader: FrameReader,
    closed: Arc<RwLock<bool>>,
}

impl RecvStream {
    /// Creates a new RecvStream with the given ID and QUIC receive stream
    ///
    /// # Arguments
    /// * `id` - Unique identifier for this stream
    /// * `recv` - The underlying QUIC receive stream
    pub fn new(id: String, recv: quinn::RecvStream) -> Self {
        RecvStream {
            id,
            frame_reader: FrameReader::new(Arc::new(Mutex::new(recv))),
            closed: Arc::new(RwLock::new(false)),
        }
    }

    /// Returns the ID of this stream
    pub fn id(&self) -> &str {
        &self.id
    }

    /// Checks if the stream is closed
    pub async fn is_closed(&self) -> bool {
        *self.closed.read().await
    }

    /// Marks the stream as closed
    pub async fn close(&self) {
        let mut closed = self.closed.write().await;
        *closed = true;
    }

    /// Receives the next frame from the stream
    ///
    /// # Returns
    /// * `Ok(Bytes)` containing the frame payload if successful
    /// * `Err` if there was an error reading
    pub async fn recv(&self) -> Result<Bytes, std::io::Error> {
        self.frame_reader
            .read_frame()
            .await
            .map(|frame| frame.payload)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
    }

    /// Receives a frame of exact size from the stream
    ///
    /// # Arguments
    /// * `_` - Expected size of frame (currently unused)
    ///
    /// # Returns
    /// * `Ok(Bytes)` containing the frame payload if successful
    /// * `Err` if there was an error reading
    pub async fn recv_exact(&self, _: usize) -> Result<Bytes, std::io::Error> {
        self.frame_reader
            .read_frame()
            .await
            .map(|frame| frame.payload)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
    }

    /// Receives a configuration frame from the stream
    ///
    /// # Returns
    /// * `Ok(Bytes)` containing the configuration data if successful
    /// * `Err` if there was an error reading
    pub async fn recv_config_frame(&self) -> Result<Bytes, std::io::Error> {
        self.recv_exact(0).await
    }

    /// Receives a message from the stream
    ///
    /// # Returns
    /// * `Ok(Message)` if successful
    /// * `Err` if there was an error reading   
    pub async fn recv_message(&self) -> Result<Message, std::io::Error> {
        let frame = self.recv().await?;
        Message::from_packet(frame).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
    }
}