use crate::{application, stream};
use core::task::Poll;
/// A combined request covering the transmitting (`tx`) and receiving (`rx`) sides.
///
/// Both sides start out as `None` (via the derived `Default`); a side is only
/// populated once something is requested of it.
#[derive(Default, Debug)]
pub struct Request<'a> {
/// The request for the transmitting side, if any.
pub tx: Option<tx::Request<'a>>,
/// The request for the receiving side, if any.
pub rx: Option<rx::Request<'a>>,
}
impl<'a> Request<'a> {
    /// Requests that `chunks` be handed to the transmitting side.
    ///
    /// Creates the `tx` request if it does not exist yet.
    pub fn send(&mut self, chunks: &'a mut [bytes::Bytes]) -> &mut Self {
        self.tx_mut().chunks = Some(chunks);
        self
    }

    /// Requests a reset of the transmitting side with the given `error`.
    pub fn reset(&mut self, error: application::Error) -> &mut Self {
        self.tx_mut().reset = Some(error);
        self
    }

    /// Sets the `flush` flag on the transmitting side.
    pub fn flush(&mut self) -> &mut Self {
        self.tx_mut().flush = true;
        self
    }

    /// Sets the `finish` flag on the transmitting side.
    pub fn finish(&mut self) -> &mut Self {
        self.tx_mut().finish = true;
        self
    }

    /// Provides `chunks` as the slice used by the receiving side.
    ///
    /// Creates the `rx` request if it does not exist yet.
    pub fn receive(&mut self, chunks: &'a mut [bytes::Bytes]) -> &mut Self {
        self.rx_mut().chunks = Some(chunks);
        self
    }

    /// Requests `stop_sending` on the receiving side with the given `error`.
    pub fn stop_sending(&mut self, error: application::Error) -> &mut Self {
        self.rx_mut().stop_sending = Some(error);
        self
    }

    /// Sets both receive watermarks at once.
    ///
    /// The smaller argument always becomes the low watermark and the larger
    /// one the high watermark, so passing them in the wrong order swaps them.
    pub fn with_watermark(&mut self, low: usize, high: usize) -> &mut Self {
        let rx = self.rx_mut();
        rx.low_watermark = low.min(high);
        rx.high_watermark = high.max(low);
        self
    }

    /// Sets the low receive watermark.
    ///
    /// The high watermark is raised when needed so it never ends up below `low`.
    pub fn with_low_watermark(&mut self, low: usize) -> &mut Self {
        let rx = self.rx_mut();
        rx.low_watermark = low;
        rx.high_watermark = rx.high_watermark.max(low);
        self
    }

    /// Sets the high receive watermark.
    ///
    /// The low watermark is lowered when needed so it never ends up above `high`.
    pub fn with_high_watermark(&mut self, high: usize) -> &mut Self {
        let rx = self.rx_mut();
        rx.high_watermark = high;
        rx.low_watermark = rx.low_watermark.min(high);
        self
    }

    /// Marks the transmitting side as detached.
    pub fn detach_tx(&mut self) -> &mut Self {
        self.tx_mut().detached = true;
        self
    }

    /// Marks the receiving side as detached.
    pub fn detach_rx(&mut self) -> &mut Self {
        self.rx_mut().detached = true;
        self
    }

    /// Returns the `tx` request, creating it with default values on first use.
    fn tx_mut(&mut self) -> &mut tx::Request<'a> {
        // `get_or_insert_with` initializes and borrows in one step, so no
        // `expect` (and no panic path) is needed.
        self.tx.get_or_insert_with(Default::default)
    }

    /// Returns the `rx` request, creating it with default values on first use.
    fn rx_mut(&mut self) -> &mut rx::Request<'a> {
        self.rx.get_or_insert_with(Default::default)
    }
}
/// A combined response carrying the outcome for the transmitting (`tx`) and
/// receiving (`rx`) sides.
///
/// Either side may be absent; the derived `Default` has both set to `None`.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct Response {
/// The response for the transmitting side, if any.
pub tx: Option<tx::Response>,
/// The response for the receiving side, if any.
pub rx: Option<rx::Response>,
}
impl Response {
    /// Returns `true` when at least one present side reports itself as pending.
    ///
    /// A side that is `None` never counts as pending.
    pub fn is_pending(&self) -> bool {
        // The transmitting side is consulted first; the receiving side is
        // only checked when the transmitting side is not pending.
        let tx_pending = matches!(&self.tx, Some(side) if side.is_pending());
        tx_pending || matches!(&self.rx, Some(side) if side.is_pending())
    }

    /// Borrows the transmitting side's response, if there is one.
    pub fn tx(&self) -> Option<&tx::Response> {
        let side = &self.tx;
        side.as_ref()
    }

    /// Borrows the receiving side's response, if there is one.
    pub fn rx(&self) -> Option<&rx::Response> {
        let side = &self.rx;
        side.as_ref()
    }
}
/// Request and response types for the transmitting side.
pub mod tx {
    use super::*;

    /// What is being asked of the transmitting side.
    ///
    /// The derived `Default` asks for nothing: no chunks, no reset and every
    /// flag cleared.
    #[derive(Default, Debug)]
    pub struct Request<'a> {
        /// Chunks provided for sending, if any.
        pub chunks: Option<&'a mut [bytes::Bytes]>,
        /// The error to reset with, when a reset is requested.
        pub reset: Option<application::Error>,
        /// Set when a flush is requested.
        pub flush: bool,
        /// Set when finishing is requested.
        pub finish: bool,
        /// Set when the transmitting side is marked as detached.
        pub detached: bool,
    }

    /// The outcome reported for the transmitting side.
    #[derive(Debug, PartialEq, Eq)]
    pub struct Response {
        /// Byte counters for this response.
        pub bytes: Bytes,
        /// Chunk counters for this response.
        pub chunks: Chunks,
        /// Whether this response counts as pending (see `is_pending`).
        pub will_wake: bool,
        /// The status of the transmitting side.
        pub status: Status,
    }

    impl Default for Response {
        /// Builds a response with default counters, `will_wake` cleared and
        /// an `Open` status.
        fn default() -> Self {
            Self {
                status: Status::Open,
                will_wake: false,
                bytes: Default::default(),
                chunks: Default::default(),
            }
        }
    }

    impl Response {
        /// Returns `true` when `will_wake` is set.
        pub fn is_pending(&self) -> bool {
            let Self { will_wake, .. } = self;
            *will_wake
        }

        /// Always returns `Some(self)`; mirrors the accessor of the same
        /// name on the combined response.
        pub fn tx(&self) -> Option<&Self> {
            Some(self)
        }
    }
}
/// Request and response types for the receiving side.
pub mod rx {
    use super::*;

    /// What is being asked of the receiving side.
    #[derive(Debug)]
    pub struct Request<'a> {
        /// The chunk slice provided for receiving, if any.
        pub chunks: Option<&'a mut [bytes::Bytes]>,
        /// The low watermark; defaults to `0`.
        pub low_watermark: usize,
        /// The high watermark; defaults to `usize::MAX`.
        pub high_watermark: usize,
        /// The error to use when `stop_sending` is requested.
        pub stop_sending: Option<application::Error>,
        /// Set when the receiving side is marked as detached.
        pub detached: bool,
    }

    impl Default for Request<'_> {
        /// Builds a request that asks for nothing and uses the widest
        /// watermark range (`0..=usize::MAX`).
        fn default() -> Self {
            Self {
                low_watermark: 0,
                high_watermark: usize::MAX,
                chunks: None,
                stop_sending: None,
                detached: false,
            }
        }
    }

    /// The outcome reported for the receiving side.
    #[derive(Debug, PartialEq, Eq)]
    pub struct Response {
        /// Byte counters for this response.
        pub bytes: Bytes,
        /// Chunk counters for this response.
        pub chunks: Chunks,
        /// Whether this response counts as pending (see `is_pending`).
        pub will_wake: bool,
        /// The status of the receiving side.
        pub status: Status,
    }

    impl Default for Response {
        /// Builds a response with default counters, `will_wake` cleared and
        /// an `Open` status.
        fn default() -> Self {
            Self {
                status: Status::Open,
                will_wake: false,
                bytes: Default::default(),
                chunks: Default::default(),
            }
        }
    }

    impl Response {
        /// Returns `true` when `will_wake` is set.
        pub fn is_pending(&self) -> bool {
            let Self { will_wake, .. } = self;
            *will_wake
        }

        /// Always returns `Some(self)`; mirrors the accessor of the same
        /// name on the combined response.
        pub fn rx(&self) -> Option<&Self> {
            Some(self)
        }
    }
}
/// A pair of byte counters carried by the `tx` and `rx` responses.
///
/// Both counters are zero in the derived `Default`.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct Bytes {
// NOTE(review): presumably the number of bytes the operation consumed — confirm.
pub consumed: usize,
// NOTE(review): presumably the number of bytes still available — confirm.
pub available: usize,
}
/// A pair of chunk counters carried by the `tx` and `rx` responses.
///
/// Both counters are zero in the derived `Default`.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct Chunks {
// NOTE(review): presumably the number of chunks the operation consumed — confirm.
pub consumed: usize,
// NOTE(review): presumably the number of chunks still available — confirm.
pub available: usize,
}
/// The status reported for one side in a response.
///
/// `Finishing` and `Resetting` are treated as "closing" by `is_closing`, and
/// `Finished` and `Reset` as "closed" by `is_closed`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Status {
/// The status used by the default `tx` and `rx` responses.
Open,
/// A closing state; counted by `is_closing`.
Finishing,
/// A closed state; counted by `is_closed`.
Finished,
/// A closing state; counted by `is_closing`.
Resetting,
/// A closed state carrying the associated `stream::StreamError`.
Reset(stream::StreamError),
}
// Generates the `Status` predicate methods inside an `impl` block.
//
// Invoke as `impl_status!(|self| <expr>)`, where `<expr>` evaluates to the
// `Status` of `self`.
macro_rules! impl_status {
    (| $self:ident | $value:expr) => {
        /// Evaluates the status expression supplied to the macro.
        const fn status(&$self) -> Status {
            $value
        }

        /// Returns `true` if the status is `Open`.
        pub fn is_open(&self) -> bool {
            matches!(self.status(), Status::Open)
        }

        /// Returns `true` if the status is `Finishing`.
        pub fn is_finishing(&self) -> bool {
            matches!(self.status(), Status::Finishing)
        }

        /// Returns `true` if the status is `Finished`.
        pub fn is_finished(&self) -> bool {
            matches!(self.status(), Status::Finished)
        }

        /// Returns `true` if the status is `Resetting`.
        pub fn is_resetting(&self) -> bool {
            matches!(self.status(), Status::Resetting)
        }

        /// Returns `true` if the status is `Reset`, whatever error it carries.
        pub fn is_reset(&self) -> bool {
            matches!(self.status(), Status::Reset(_))
        }

        /// Returns `true` if the status is `Finishing` or `Resetting`.
        pub fn is_closing(&self) -> bool {
            matches!(self.status(), Status::Finishing | Status::Resetting)
        }

        /// Returns `true` if the status is `Finished` or `Reset`.
        pub fn is_closed(&self) -> bool {
            matches!(self.status(), Status::Finished | Status::Reset(_))
        }
    };
}
// Status predicates (`is_open`, `is_closing`, ...) on the enum itself; the
// status expression is simply a copy of `self`.
impl Status {
impl_status!(|self| *self);
}
// Status predicates on the receiving-side response, driven by its `status` field.
impl rx::Response {
impl_status!(|self| self.status);
}
// Status predicates on the transmitting-side response, driven by its `status` field.
impl tx::Response {
impl_status!(|self| self.status);
}
// Generates poll-oriented conversions for a response type.
//
// `$name` must provide an `is_pending(&self) -> bool` method. The macro adds:
// - `into_poll`, mapping a pending response to `Poll::Pending`;
// - `From<$name> for ()`, which discards the response;
// - `From<$name> for Poll<Result<T, E>>` for every `T` that `$name` converts into.
macro_rules! conversions {
    ($name:path) => {
        impl $name {
            /// Converts the response into a `Poll`.
            ///
            /// Yields `Poll::Pending` when `is_pending()` is `true`;
            /// otherwise the response itself is returned inside `Poll::Ready`.
            pub fn into_poll(self) -> Poll<Self> {
                if self.is_pending() {
                    return Poll::Pending;
                }
                Poll::Ready(self)
            }
        }

        impl From<$name> for () {
            /// Discards the response.
            fn from(_: $name) {}
        }

        impl<T, E> From<$name> for Poll<Result<T, E>>
        where
            $name: Into<T>,
        {
            /// Yields `Poll::Pending` for a pending response; otherwise the
            /// response is converted into `T` and wrapped in `Ok`.
            fn from(v: $name) -> Poll<Result<T, E>> {
                v.into_poll().map(|ready| Ok(ready.into()))
            }
        }
    };
}
// Generate `into_poll` and the `From` conversions for the combined response
// and for each per-side response.
conversions!(Response);
conversions!(tx::Response);
conversions!(rx::Response);
#[cfg(test)]
mod tests {
    use super::*;

    /// Chains the builder calls and checks that every value lands on the
    /// expected side of the request.
    #[test]
    fn request_builder_test() {
        let mut send_chunks = [bytes::Bytes::from_static(&[1])];
        let mut receive_chunks = [
            bytes::Bytes::from_static(&[2]),
            bytes::Bytes::from_static(&[3]),
        ];
        let mut request = Request::default();

        request
            .send(&mut send_chunks)
            .finish()
            .flush()
            .reset(application::Error::new(1).unwrap())
            .receive(&mut receive_chunks)
            .with_watermark(5, 10)
            .stop_sending(application::Error::new(2).unwrap());

        // Inspect each side separately.
        let Request { tx, rx } = request;

        assert!(matches!(
            tx,
            Some(tx::Request {
                chunks: Some(tx_chunks),
                finish: true,
                flush: true,
                reset: Some(reset),
                detached: false,
            }) if reset == application::Error::new(1).unwrap()
                && tx_chunks.len() == 1
        ));

        assert!(matches!(
            rx,
            Some(rx::Request {
                chunks: Some(rx_chunks),
                low_watermark: 5,
                high_watermark: 10,
                stop_sending: Some(stop_sending),
                detached: false,
            }) if stop_sending == application::Error::new(2).unwrap()
                && rx_chunks.len() == 2
        ));
    }

    /// A combined response is pending exactly when at least one side is, and
    /// `into_poll` must agree with `is_pending`.
    #[test]
    fn response_pending_test() {
        let build = |tx_pending: bool, rx_pending: bool| Response {
            tx: Some(tx::Response {
                will_wake: tx_pending,
                ..Default::default()
            }),
            rx: Some(rx::Response {
                will_wake: rx_pending,
                ..Default::default()
            }),
        };

        for rx_pending in [false, true] {
            for tx_pending in [false, true] {
                let response = build(tx_pending, rx_pending);
                let expect_pending = rx_pending || tx_pending;
                assert_eq!(response.is_pending(), expect_pending);

                let expected = if expect_pending {
                    Poll::Pending
                } else {
                    // Neither side is pending, so both equal their defaults.
                    Poll::Ready(Response {
                        tx: Some(tx::Response::default()),
                        rx: Some(rx::Response::default()),
                    })
                };
                assert_eq!(response.into_poll(), expected);
            }
        }
    }
}