use std::sync::Arc;
use bytes::Bytes;
use tokio::sync::{Mutex, RwLock};
use crate::common::frame::{Frame, FrameReader, FrameWriter};
use super::message::Message;
/// Sending half of a framed stream.
///
/// Cloning produces another handle whose `closed` flag is the same shared
/// `Arc` as the original's.
#[derive(Clone, Debug)]
pub struct SendStream {
    /// Identifier supplied at construction time.
    id: String,
    /// Frame-level writer built around the underlying `quinn::SendStream`.
    frame_writer: FrameWriter,
    /// Closed flag; starts as `false` and is shared between clones via `Arc`.
    closed: Arc<RwLock<bool>>,
}
impl SendStream {
    /// Creates a send stream identified by `id` on top of a QUIC send half.
    ///
    /// The `quinn::SendStream` is wrapped in an `Arc<Mutex<_>>` and handed to a
    /// `FrameWriter`; the closed flag starts out as `false`.
    pub fn new(id: String, send: quinn::SendStream) -> Self {
        SendStream {
            id,
            frame_writer: FrameWriter::new(Arc::new(Mutex::new(send))),
            closed: Arc::new(RwLock::new(false)),
        }
    }

    /// Returns the identifier this stream was created with.
    pub fn id(&self) -> &str {
        &self.id
    }

    /// Returns the current value of the closed flag.
    pub async fn is_closed(&self) -> bool {
        *self.closed.read().await
    }

    /// Sets the closed flag to `true`.
    ///
    /// Only the flag is updated; this method does not touch the frame writer.
    pub async fn close(&self) {
        let mut closed = self.closed.write().await;
        *closed = true;
    }

    /// Wraps `data` in a `Frame` and writes it through the frame writer.
    ///
    /// The closed flag is not consulted by this method.
    ///
    /// # Errors
    ///
    /// Returns an `std::io::Error` of kind `Other` if the frame cannot be
    /// constructed from `data` or if writing the frame fails.
    pub async fn write(&self, data: Bytes) -> Result<(), std::io::Error> {
        let frame =
            Frame::new(data).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
        self.frame_writer
            .write_frame(&frame)
            .await
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
    }

    /// Serializes `message` and writes its payload as a single frame.
    ///
    /// # Errors
    ///
    /// Returns an `std::io::Error` of kind `Other` if serialization fails
    /// (the message carries the `Debug` rendering of the serialization error),
    /// or any error produced by [`SendStream::write`].
    pub async fn write_message(&self, message: Message) -> Result<(), std::io::Error> {
        // A serialization failure is reported to the caller instead of panicking.
        // The error is rendered via `Debug` so no extra trait bound is required
        // of the serializer's error type.
        let serialized = message.serialize().map_err(|e| {
            std::io::Error::new(std::io::ErrorKind::Other, format!("{:?}", e))
        })?;
        self.write(serialized.payload).await
    }
}
/// Receiving half of a framed stream.
pub struct RecvStream {
    /// Identifier supplied at construction time.
    id: String,
    /// Frame-level reader built around the underlying `quinn::RecvStream`.
    frame_reader: FrameReader,
    /// Closed flag; starts as `false`.
    closed: Arc<RwLock<bool>>,
}
impl RecvStream {
    /// Creates a receive stream identified by `id` on top of a QUIC receive half.
    ///
    /// The `quinn::RecvStream` is wrapped in an `Arc<Mutex<_>>` and handed to a
    /// `FrameReader`; the closed flag starts out as `false`.
    pub fn new(id: String, recv: quinn::RecvStream) -> Self {
        let shared_recv = Arc::new(Mutex::new(recv));
        Self {
            id,
            frame_reader: FrameReader::new(shared_recv),
            closed: Arc::new(RwLock::new(false)),
        }
    }

    /// Returns the identifier this stream was created with.
    pub fn id(&self) -> &str {
        self.id.as_str()
    }

    /// Returns the current value of the closed flag.
    pub async fn is_closed(&self) -> bool {
        let flag = self.closed.read().await;
        *flag
    }

    /// Sets the closed flag to `true`.
    ///
    /// Only the flag is updated; this method does not touch the frame reader.
    pub async fn close(&self) {
        *self.closed.write().await = true;
    }

    /// Reads the next frame and returns its payload.
    ///
    /// # Errors
    ///
    /// Returns an `std::io::Error` of kind `Other` wrapping any error reported
    /// by the frame reader.
    pub async fn recv(&self) -> Result<Bytes, std::io::Error> {
        match self.frame_reader.read_frame().await {
            Ok(frame) => Ok(frame.payload),
            Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
        }
    }

    /// Reads the next frame and returns its payload.
    ///
    /// NOTE(review): the length argument is ignored; this behaves exactly like
    /// [`RecvStream::recv`] and returns whatever payload the next frame holds.
    ///
    /// # Errors
    ///
    /// Same as [`RecvStream::recv`].
    pub async fn recv_exact(&self, _: usize) -> Result<Bytes, std::io::Error> {
        self.recv().await
    }

    /// Reads the next frame's payload by calling `recv_exact(0)`.
    ///
    /// # Errors
    ///
    /// Same as [`RecvStream::recv_exact`].
    pub async fn recv_config_frame(&self) -> Result<Bytes, std::io::Error> {
        self.recv_exact(0).await
    }

    /// Reads the next frame's payload and decodes it with `Message::from_packet`.
    ///
    /// # Errors
    ///
    /// Returns the error from [`RecvStream::recv`] if reading fails, or an
    /// `std::io::Error` of kind `Other` wrapping the decode error.
    pub async fn recv_message(&self) -> Result<Message, std::io::Error> {
        let packet = self.recv().await?;
        match Message::from_packet(packet) {
            Ok(message) => Ok(message),
            Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
        }
    }
}