use crate::{
Encoding, Status,
encoding::DEFAULT_MAX_MESSAGE_SIZE,
frame::{
reader::{ReadState, poll_read_message},
writer::encode_payload,
},
};
use bytes::Bytes;
use futures_lite::{AsyncWriteExt, Stream};
use std::{
future::poll_fn,
pin::Pin,
task::{Context, Poll},
};
use trillium::Upgrade;
pub struct RequestStream<'a, T> {
upgrade: &'a mut Upgrade,
state: ReadState,
decode: fn(&[u8]) -> Result<T, Status>,
encoding: Encoding,
max_message_size: usize,
}
impl<'a, T> RequestStream<'a, T> {
pub(crate) fn new(
upgrade: &'a mut Upgrade,
decode: fn(&[u8]) -> Result<T, Status>,
encoding: Encoding,
) -> Self {
Self {
upgrade,
state: ReadState::new(),
decode,
encoding,
max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
}
}
}
impl<T: 'static> Stream for RequestStream<'_, T> {
type Item = Result<T, Status>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
poll_read_message(
Pin::new(&mut *this.upgrade),
&mut this.state,
cx,
this.decode,
this.encoding,
this.max_message_size,
)
}
}
pub struct ResponseSink<'a, T> {
upgrade: &'a mut Upgrade,
encode: fn(&T) -> Result<Bytes, Status>,
encoding: Encoding,
}
impl<'a, T> ResponseSink<'a, T> {
pub(crate) fn new(
upgrade: &'a mut Upgrade,
encode: fn(&T) -> Result<Bytes, Status>,
encoding: Encoding,
) -> Self {
Self {
upgrade,
encode,
encoding,
}
}
pub async fn send(&mut self, value: T) -> Result<(), Status> {
let payload = (self.encode)(&value)?;
let frame = encode_payload(&payload, self.encoding)?;
self.upgrade
.write_all(&frame)
.await
.map_err(|e| Status::unavailable(format!("write error: {e}")))
}
}
pub struct Channel<'a, Req, Resp> {
upgrade: &'a mut Upgrade,
state: ReadState,
decode: fn(&[u8]) -> Result<Req, Status>,
encode: fn(&Resp) -> Result<Bytes, Status>,
inbound_encoding: Encoding,
outbound_encoding: Encoding,
max_message_size: usize,
}
impl<'a, Req, Resp> Channel<'a, Req, Resp> {
pub(crate) fn new(
upgrade: &'a mut Upgrade,
decode: fn(&[u8]) -> Result<Req, Status>,
encode: fn(&Resp) -> Result<Bytes, Status>,
inbound_encoding: Encoding,
outbound_encoding: Encoding,
) -> Self {
Self {
upgrade,
state: ReadState::new(),
decode,
encode,
inbound_encoding,
outbound_encoding,
max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
}
}
pub async fn recv(&mut self) -> Option<Result<Req, Status>> {
let upgrade = &mut *self.upgrade;
let state = &mut self.state;
let decode = self.decode;
let encoding = self.inbound_encoding;
let max = self.max_message_size;
poll_fn(|cx| poll_read_message(Pin::new(&mut *upgrade), state, cx, decode, encoding, max))
.await
}
pub async fn send(&mut self, value: Resp) -> Result<(), Status> {
let payload = (self.encode)(&value)?;
let frame = encode_payload(&payload, self.outbound_encoding)?;
self.upgrade
.write_all(&frame)
.await
.map_err(|e| Status::unavailable(format!("write error: {e}")))
}
}