use crate::core::{self, Stream, StreamFlags};
pub use crate::core::StreamErrorCode as ErrorCode;
pub mod buf;
pub trait Recv {
fn recv<R>(&mut self, capacity: usize, receptor: R) -> future::Recv<R>
where
R: Fn(&[u8], i32) -> usize + Unpin;
}
pub trait Write {
fn write<'a>(&'a mut self, data: &'a [u8]) -> future::Write;
fn write_note<'a>(&'a mut self, data: &'a [u8], note: i32) -> future::Write;
fn write_all<'a>(&'a mut self, data: &'a [u8]) -> future::WriteAll;
}
pub trait Close {
fn close(&mut self) -> future::Close;
}
pub mod future {
pub use crate::core::StreamCloseFuture as Close;
pub use crate::core::StreamRecvFuture as Recv;
pub use crate::core::StreamWriteAllFuture as WriteAll;
pub use crate::core::StreamWriteFuture as Write;
}
pub struct RecvWriteStream {
s: Option<Stream>,
}
impl RecvWriteStream {
pub(crate) fn new(s: Option<Stream>) -> Self {
Self { s }
}
pub fn split(mut self) -> (RecvStream, WriteStream) {
let s = self.s.take();
(RecvStream::new(s.clone()), WriteStream::new(s))
}
pub fn split3(mut self) -> (RecvOnlyStream, WriteOnlyStream, CloseStream) {
let s = self.s.take();
(
RecvOnlyStream::new(s.clone()),
WriteOnlyStream::new(s.clone()),
CloseStream::new(s, core::STREAM_SELF_FLOW | core::STREAM_SELF_DATA),
)
}
}
impl Default for RecvWriteStream {
fn default() -> Self {
Self::new(Default::default())
}
}
impl Recv for RecvWriteStream {
fn recv<R>(&mut self, capacity: usize, receptor: R) -> future::Recv<R>
where
R: Fn(&[u8], i32) -> usize + Unpin,
{
future::Recv::new(&mut self.s, capacity, receptor)
}
}
impl Write for RecvWriteStream {
fn write<'a>(&'a mut self, data: &'a [u8]) -> future::Write {
future::Write::new(&mut self.s, data, 0)
}
fn write_note<'a>(&'a mut self, data: &'a [u8], note: i32) -> future::Write {
future::Write::new(&mut self.s, data, note)
}
fn write_all<'a>(&'a mut self, data: &'a [u8]) -> future::WriteAll {
future::WriteAll::new(&mut self.s, data)
}
}
impl Close for RecvWriteStream {
fn close(&mut self) -> future::Close {
future::Close::new(
self.s.take(),
core::STREAM_SELF_FLOW | core::STREAM_SELF_DATA,
core::STREAM_PEER_DATA | core::STREAM_PEER_FLOW,
)
}
}
impl Drop for RecvWriteStream {
fn drop(&mut self) {
core::drop_stream(
self.s.take(),
core::STREAM_SELF_FLOW | core::STREAM_SELF_DATA,
)
}
}
pub struct RecvStream {
s: Option<Stream>,
}
impl RecvStream {
pub(crate) fn new(s: Option<Stream>) -> Self {
Self { s }
}
pub fn split(mut self) -> (RecvOnlyStream, CloseStream) {
let s = self.s.take();
(
RecvOnlyStream::new(s.clone()),
CloseStream::new(s, core::STREAM_SELF_FLOW),
)
}
}
impl Default for RecvStream {
fn default() -> Self {
Self::new(Default::default())
}
}
impl From<RecvWriteStream> for RecvStream {
fn from(stream: RecvWriteStream) -> Self {
let (r, _) = stream.split();
r
}
}
impl Recv for RecvStream {
fn recv<R>(&mut self, capacity: usize, receptor: R) -> future::Recv<R>
where
R: Fn(&[u8], i32) -> usize + Unpin,
{
future::Recv::new(&mut self.s, capacity, receptor)
}
}
impl Close for RecvStream {
fn close(&mut self) -> future::Close {
future::Close::new(
self.s.take(),
core::STREAM_SELF_FLOW,
core::STREAM_PEER_DATA,
)
}
}
impl Drop for RecvStream {
fn drop(&mut self) {
core::drop_stream(self.s.take(), core::STREAM_SELF_FLOW)
}
}
pub struct RecvOnlyStream {
s: Option<Stream>,
}
impl RecvOnlyStream {
fn new(s: Option<Stream>) -> Self {
Self { s }
}
}
impl Default for RecvOnlyStream {
fn default() -> Self {
Self::new(Default::default())
}
}
impl Recv for RecvOnlyStream {
fn recv<R>(&mut self, capacity: usize, receptor: R) -> future::Recv<R>
where
R: Fn(&[u8], i32) -> usize + Unpin,
{
future::Recv::new(&mut self.s, capacity, receptor)
}
}
impl Drop for RecvOnlyStream {
fn drop(&mut self) {
core::drop_stream(self.s.take(), core::STREAM_SELF_FLOW)
}
}
pub struct WriteStream {
s: Option<Stream>,
}
impl WriteStream {
pub(crate) fn new(s: Option<Stream>) -> Self {
Self { s }
}
pub fn split(mut self) -> (WriteOnlyStream, CloseStream) {
let s = self.s.take();
(
WriteOnlyStream::new(s.clone()),
CloseStream::new(s, core::STREAM_SELF_DATA),
)
}
}
impl Default for WriteStream {
fn default() -> Self {
Self::new(Default::default())
}
}
impl From<RecvWriteStream> for WriteStream {
fn from(stream: RecvWriteStream) -> Self {
let (_, w) = stream.split();
w
}
}
impl Write for WriteStream {
fn write<'a>(&'a mut self, data: &'a [u8]) -> future::Write {
future::Write::new(&mut self.s, data, 0)
}
fn write_note<'a>(&'a mut self, data: &'a [u8], note: i32) -> future::Write {
future::Write::new(&mut self.s, data, note)
}
fn write_all<'a>(&'a mut self, data: &'a [u8]) -> future::WriteAll {
future::WriteAll::new(&mut self.s, data)
}
}
impl Close for WriteStream {
fn close(&mut self) -> future::Close {
future::Close::new(
self.s.take(),
core::STREAM_SELF_DATA,
core::STREAM_PEER_FLOW,
)
}
}
impl Drop for WriteStream {
fn drop(&mut self) {
core::drop_stream(self.s.take(), core::STREAM_SELF_DATA)
}
}
pub struct WriteOnlyStream {
s: Option<Stream>,
}
impl WriteOnlyStream {
fn new(s: Option<Stream>) -> Self {
Self { s }
}
}
impl Default for WriteOnlyStream {
fn default() -> Self {
Self::new(Default::default())
}
}
impl Write for WriteOnlyStream {
fn write<'a>(&'a mut self, data: &'a [u8]) -> future::Write {
future::Write::new(&mut self.s, data, 0)
}
fn write_note<'a>(&'a mut self, data: &'a [u8], note: i32) -> future::Write {
future::Write::new(&mut self.s, data, note)
}
fn write_all<'a>(&'a mut self, data: &'a [u8]) -> future::WriteAll {
future::WriteAll::new(&mut self.s, data)
}
}
impl Drop for WriteOnlyStream {
fn drop(&mut self) {
core::drop_stream(self.s.take(), core::STREAM_SELF_DATA)
}
}
pub struct CloseStream {
s: Option<Stream>,
how: StreamFlags,
}
impl CloseStream {
fn new(s: Option<Stream>, how: StreamFlags) -> Self {
Self { s, how }
}
}
impl Default for CloseStream {
fn default() -> Self {
Self::new(Default::default(), 0)
}
}
impl Close for CloseStream {
fn close(&mut self) -> future::Close {
let wait = self.how << 2; future::Close::new(self.s.take(), self.how, wait)
}
}
impl Drop for CloseStream {
fn drop(&mut self) {
core::drop_stream(self.s.take(), self.how)
}
}