use anyhow::{anyhow, Result};
use bytes::{BufMut, Bytes, BytesMut};
use quinn::{RecvStream, SendStream};
use std::sync::Arc;
use tokio::sync::Mutex;
use super::HEADER_LENGTH;
const MAX_FRAME_SIZE: usize = 16_777_215;
pub struct Frame {
pub length: usize,
pub payload: Bytes,
}
impl Frame {
pub fn new(payload: Bytes) -> Result<Self> {
let length = payload.len();
if length > MAX_FRAME_SIZE {
return Err(anyhow!("Payload too large: {} bytes", length));
}
Ok(Frame { length, payload })
}
pub async fn write(&self, send: &Arc<Mutex<SendStream>>) -> Result<()> {
let mut header = BytesMut::with_capacity(HEADER_LENGTH);
header.put_uint(self.length as u64, 3);
let mut send = send.lock().await;
send.write_all(&header).await?;
send.write_all(&self.payload).await?;
Ok(())
}
pub async fn read(recv: &Arc<Mutex<RecvStream>>) -> Result<Self> {
let mut recv = recv.lock().await;
let header = recv
.read_chunk(HEADER_LENGTH, true)
.await?
.ok_or_else(|| anyhow!("Incomplete message"))?;
let length =
u32::from_be_bytes([0, header.bytes[0], header.bytes[1], header.bytes[2]]) as usize;
if length > MAX_FRAME_SIZE {
return Err(anyhow!("Payload too large: {} bytes", length));
}
let payload = recv
.read_chunk(length, true)
.await?
.ok_or_else(|| anyhow!("Incomplete message"))?;
Ok(Frame {
length,
payload: payload.bytes,
})
}
}
#[derive(Debug, Clone)]
pub struct FrameReader {
recv: Arc<Mutex<RecvStream>>,
}
impl FrameReader {
pub fn new(recv: Arc<Mutex<RecvStream>>) -> Self {
FrameReader { recv }
}
pub async fn read_frame(&self) -> Result<Frame> {
Frame::read(&self.recv).await
}
}
#[derive(Debug, Clone)]
pub struct FrameWriter {
send: Arc<Mutex<SendStream>>,
}
impl FrameWriter {
pub fn new(send: Arc<Mutex<SendStream>>) -> Self {
FrameWriter { send }
}
pub async fn write_frame(&self, frame: &Frame) -> Result<()> {
frame.write(&self.send).await
}
}