use super::TransportError;
use crate::hub::event::IOSource;
use bytes::{BufMut, Bytes, BytesMut};
use std::future::Future;
use std::io;
use std::net::Shutdown;
use std::task::{ready, Context, Poll};
use tokio::io::ReadBuf;
pub trait NonBlockingStream: Send {
fn try_recv(&mut self) -> Result<Bytes, TransportError>;
fn try_send(&mut self, data: Option<Bytes>) -> Result<bool, TransportError>;
fn source(&mut self) -> IOSource;
fn shutdown(&mut self, how: Shutdown) -> io::Result<()>;
}
pub trait NonBlockingStreamRead: Send {
fn try_recv(&mut self) -> Result<Bytes, TransportError>;
fn source(&mut self) -> Box<dyn GenericSource>;
}
pub trait NonBlockingStreamWrite: Send {
fn try_send(&mut self, data: Option<Bytes>) -> Result<bool, TransportError>;
fn source(&mut self) -> Box<dyn GenericSource>;
}
pub trait AsyncStreamRead: Send + Unpin {
fn poll_recv(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Bytes, TransportError>>;
}
pub trait AsyncStreamWrite: Send + Unpin {
fn poll_send(self: Pin<&mut Self>, cx: &mut Context<'_>, data: Bytes) -> Poll<Result<(), TransportError>>;
}
pub struct AsyncStreamReader<'a, T: ?Sized + AsyncStreamRead>(&'a mut T);
pub struct AsyncStreamWriter<'a, T: ?Sized + AsyncStreamWrite>(&'a mut T, Bytes);
impl<'a, T: ?Sized + AsyncStreamRead + Unpin> Future for AsyncStreamReader<'a, T> {
type Output = Result<Bytes, TransportError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut (*Pin::into_inner(self).0)).poll_recv(cx)
}
}
impl<'a, T: ?Sized + AsyncStreamWrite + Unpin> Future for AsyncStreamWriter<'a, T> {
type Output = Result<(), TransportError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let data = self.1.clone();
Pin::new(&mut (*Pin::into_inner(self).0)).poll_send(cx, data)
}
}
pub trait AsyncStreamReadExt {
fn recv(&mut self) -> AsyncStreamReader<Self>
where
Self: AsyncStreamRead,
{
AsyncStreamReader(self)
}
}
pub trait AsyncStreamWriteExt {
fn send(&mut self, data: Bytes) -> AsyncStreamWriter<Self>
where
Self: AsyncStreamWrite,
{
AsyncStreamWriter(self, data)
}
}
impl<R: ?Sized + AsyncStreamRead> AsyncStreamReadExt for R {}
impl<W: ?Sized + AsyncStreamWrite> AsyncStreamWriteExt for W {}
use std::cell::UnsafeCell;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
struct StreamInner<T> {
locked: AtomicBool,
stream: UnsafeCell<T>,
}
pub struct StreamReadHalf<T>(Arc<StreamInner<T>>);
pub struct StreamWriteHalf<T>(Arc<StreamInner<T>>);
struct StreamPoll<'a, T>(&'a StreamInner<T>);
struct StreamGuard<'a, T>(&'a StreamInner<T>);
impl<'a, T: AsyncStreamRead + AsyncStreamWrite> StreamPoll<'a, T> {
fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<StreamGuard<T>> {
if self
.0
.locked
.compare_exchange(false, true, Ordering::Acquire, Ordering::Acquire)
.is_ok()
{
Poll::Ready(StreamGuard(&*self.0))
} else {
std::thread::yield_now();
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
impl<T> StreamGuard<'_, T> {
fn stream_pin(&mut self) -> Pin<&mut T> {
unsafe { Pin::new_unchecked(&mut *self.0.stream.get()) }
}
}
impl<T> Drop for StreamGuard<'_, T> {
fn drop(&mut self) {
self.0.locked.store(false, Ordering::Release);
}
}
unsafe impl<T: Sync> Sync for StreamInner<T> {}
impl<T: AsyncStreamRead + AsyncStreamWrite + Sync> AsyncStreamRead for StreamReadHalf<T> {
fn poll_recv(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Bytes, TransportError>> {
let lock = StreamPoll(self.0.as_ref());
let mut s = ready!(lock.poll_lock(cx));
s.stream_pin().poll_recv(cx)
}
}
impl<T: AsyncStreamRead + AsyncStreamWrite + Sync> AsyncStreamWrite for StreamWriteHalf<T> {
fn poll_send(self: Pin<&mut Self>, cx: &mut Context<'_>, data: Bytes) -> Poll<Result<(), TransportError>> {
let lock = StreamPoll(self.0.as_ref());
let mut s = ready!(lock.poll_lock(cx));
s.stream_pin().poll_send(cx, data)
}
}
pub fn split<T: AsyncStreamRead + AsyncStreamWrite>(stream: T) -> (StreamReadHalf<T>, StreamWriteHalf<T>) {
let inner = Arc::new(StreamInner {
locked: AtomicBool::new(false),
stream: UnsafeCell::new(stream),
});
(StreamReadHalf(inner.clone()), StreamWriteHalf(inner))
}
pub trait Tokenizer: Default {
fn add_bytes(&mut self, bytes: &[u8]);
fn try_parse(&mut self) -> Option<Bytes>;
fn new_token(payload: &[u8]) -> Bytes;
}
pub struct SimpleTokenizer {
buffered: BytesMut,
len: Option<usize>,
}
impl SimpleTokenizer {
const HEADER_LEN: usize = std::mem::size_of::<u32>();
fn check_token(&mut self, len: usize) -> Option<Bytes> {
if self.buffered.len() >= len {
self.len = None;
return Some(self.buffered.split_to(len).freeze())
}
None
}
}
impl Default for SimpleTokenizer {
fn default() -> Self {
Self {
buffered: BytesMut::new(),
len: None,
}
}
}
impl Tokenizer for SimpleTokenizer {
fn add_bytes(&mut self, bytes: &[u8]) {
self.buffered.extend_from_slice(bytes)
}
fn try_parse(&mut self) -> Option<Bytes> {
match self.len {
Some(len) => self.check_token(len),
None => {
if self.buffered.len() >= Self::HEADER_LEN {
let header = self.buffered.split_to(Self::HEADER_LEN).freeze();
let len = u32::from_le_bytes(header[..].try_into().unwrap()) as usize;
self.len = Some(len);
self.check_token(len)
} else {
None
}
}
}
}
fn new_token(payload: &[u8]) -> Bytes {
let mut token = bytes::BytesMut::with_capacity(Self::HEADER_LEN + payload.len());
token.put_u32_le(payload.len() as u32);
token.extend_from_slice(payload);
token.freeze()
}
}
struct MessageStreamInner<U: Tokenizer> {
tokenizer: U,
leftover: BytesMut,
}
impl<U: Tokenizer> MessageStreamInner<U> {
fn new() -> Self {
MessageStreamInner {
tokenizer: U::default(),
leftover: BytesMut::new(),
}
}
fn try_parse(&mut self) -> Option<Bytes> {
self.tokenizer.try_parse()
}
fn check_recv(&mut self, res: io::Result<usize>, bucket: &mut [u8]) -> Result<(), TransportError> {
match res {
Ok(nrecv) => {
if nrecv == 0 {
Err(TransportError::HalfTerminated)
} else {
self.tokenizer.add_bytes(&bucket[..nrecv]);
Ok(())
}
}
Err(e) => match e.kind() {
io::ErrorKind::WouldBlock => Err(TransportError::NotReady),
_ => Err(TransportError::BothTerminated),
},
}
}
fn buffer_send(&mut self, data: Bytes) {
self.leftover.put(U::new_token(&data))
}
fn prepare_send(&mut self) -> Bytes {
std::mem::replace(&mut self.leftover, BytesMut::new()).freeze()
}
fn check_send(&mut self, res: io::Result<usize>, data: Bytes) -> Result<bool, TransportError> {
match res {
Ok(nsent) => Ok(if nsent < data.len() {
self.leftover.put(&data[nsent..]);
true
} else {
false
}),
Err(e) => match e.kind() {
io::ErrorKind::BrokenPipe => Err(TransportError::HalfTerminated),
io::ErrorKind::WouldBlock => {
self.leftover.put(data);
return Ok(true)
}
_ => Err(TransportError::BothTerminated),
},
}
}
}
pub struct MessageStream<T: std::io::Read + std::io::Write, U: Tokenizer> {
inner: MessageStreamInner<U>,
bucket: Vec<u8>,
stream: T,
}
impl<T: std::io::Read + std::io::Write, U: Tokenizer> MessageStream<T, U> {
pub fn new(stream: T, bucket_size: usize) -> Self {
let mut bucket = Vec::new();
bucket.resize(bucket_size, 0);
Self {
inner: MessageStreamInner::new(),
bucket,
stream,
}
}
pub fn as_inner_mut(&mut self) -> &mut T {
&mut self.stream
}
pub fn as_inner(&self) -> &T {
&self.stream
}
pub fn try_recv(&mut self) -> Result<Bytes, TransportError> {
if let Some(token) = self.inner.try_parse() {
return Ok(token)
}
loop {
self.inner
.check_recv(self.stream.read(&mut self.bucket), &mut self.bucket)?;
if let Some(token) = self.inner.try_parse() {
return Ok(token)
}
}
}
pub fn try_send(&mut self, data: Option<Bytes>) -> Result<bool, TransportError> {
match data {
Some(data) => {
self.inner.buffer_send(data);
let data = self.inner.prepare_send();
self.inner.check_send(self.stream.write(&data), data)
}
None => {
let data = self.inner.prepare_send();
self.inner.check_send(self.stream.write(&data), data)
}
}
}
pub fn from_async<S: AsyncRead + AsyncWrite>(s: AsyncMessageStream<S, U>, converter: impl Fn(S) -> T) -> Self {
Self {
inner: s.inner,
bucket: s.bucket,
stream: converter(s.stream),
}
}
}
pub struct AsyncMessageStream<T: AsyncRead + AsyncWrite, U: Tokenizer> {
inner: MessageStreamInner<U>,
bucket: Vec<u8>,
stream: T,
}
impl<T: AsyncRead + AsyncWrite, U: Tokenizer> AsyncMessageStream<T, U> {
pub fn new(stream: T, bucket_size: usize) -> Self {
let mut bucket = Vec::new();
bucket.resize(bucket_size, 0);
Self {
inner: MessageStreamInner::new(),
bucket,
stream,
}
}
pub fn as_inner(&mut self) -> &mut T {
&mut self.stream
}
pub fn into_inner(self) -> T {
self.stream
}
}
use tokio::io::{AsyncRead, AsyncWrite};
impl<T: AsyncRead + AsyncWrite + Unpin + Send, U: Tokenizer + Unpin + Send> AsyncStreamRead
for AsyncMessageStream<T, U>
{
fn poll_recv(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Bytes, TransportError>> {
if let Some(token) = self.inner.try_parse() {
return Poll::Ready(Ok(token))
}
let s = Pin::into_inner(self);
loop {
let mut buff = ReadBuf::new(&mut s.bucket);
s.inner.check_recv(
ready!(Pin::new(&mut s.stream).poll_read(cx, &mut buff)).map(|_| buff.filled().len()),
&mut s.bucket,
)?;
if let Some(token) = s.inner.try_parse() {
return Poll::Ready(Ok(token))
}
}
}
}
impl<T: AsyncRead + AsyncWrite + Unpin + Send, U: Tokenizer + Unpin + Send> AsyncStreamWrite
for AsyncMessageStream<T, U>
{
fn poll_send(mut self: Pin<&mut Self>, cx: &mut Context<'_>, data: Bytes) -> Poll<Result<(), TransportError>> {
self.inner.buffer_send(data);
let s = Pin::into_inner(self);
while {
let data = s.inner.prepare_send();
s.inner
.check_send(ready!(Pin::new(&mut s.stream).poll_write(cx, &data)), data)?
} {}
Poll::Ready(Ok(()))
}
}
use crate::hub::event::{GenericSource, IOInterest, IONotifier};
use std::sync::{atomic, Arc};
use tokio::sync::{mpsc, oneshot};
pub enum AdaptedRead {
Async(Box<dyn AsyncStreamRead>),
NonBlocking(Box<dyn NonBlockingStreamRead>),
}
pub enum AdaptedWrite {
Async(Box<dyn AsyncStreamWrite>),
NonBlocking(Box<dyn NonBlockingStreamWrite>),
}
enum AdaptedReadHandle {
Async(
mpsc::Receiver<Bytes>,
mpsc::UnboundedSender<IONotifier>,
Option<oneshot::Sender<()>>,
Arc<atomic::AtomicBool>,
tokio::task::JoinHandle<()>,
),
NonBlocking(Option<Box<dyn NonBlockingStreamRead>>),
}
enum AdaptedWriteHandle {
Async(
mpsc::Sender<Bytes>,
mpsc::UnboundedSender<IONotifier>,
Option<oneshot::Sender<()>>,
Arc<atomic::AtomicBool>,
tokio::task::JoinHandle<()>,
),
NonBlocking(Option<Box<dyn NonBlockingStreamWrite>>),
}
impl Drop for AdaptedReadHandle {
fn drop(&mut self) {
if let Self::Async(_, _, _, _, handle) = self {
handle.abort();
}
}
}
impl Drop for AdaptedWriteHandle {
fn drop(&mut self) {
if let Self::Async(_, _, _, _, handle) = self {
handle.abort();
}
}
}
pub struct NonBlockingStreamAdapter {
broken: Arc<atomic::AtomicBool>,
source: NonBlockingStreamAdapterSource,
}
impl NonBlockingStreamAdapter {
fn create_async_read_handle(
r: Box<dyn AsyncStreamRead>, buffer: usize, broken: Arc<atomic::AtomicBool>,
) -> AdaptedReadHandle {
let notready = Arc::new(atomic::AtomicBool::new(false));
let notready_clone = notready.clone();
let (read_tx, read) = mpsc::channel(buffer);
let (notifier_tx, mut notifier_rx): (mpsc::UnboundedSender<IONotifier>, _) = mpsc::unbounded_channel();
let (closer_tx, mut closer_rx) = oneshot::channel();
let handle = tokio::spawn(async move {
let mut reader = Some((read_tx, r));
loop {
let notifier = match notifier_rx.recv().await {
Some(n) => n,
None => return,
};
if let Some((read_ref, r)) = &mut reader {
loop {
let closer = &mut closer_rx;
tokio::select! {
res = r.recv() => {
match res {
Ok(bytes) => match read_ref.send(bytes).await {
Err(mpsc::error::SendError(_)) => return,
_ => {
if notready_clone.swap(false, atomic::Ordering::AcqRel) {
notifier.notify(IOInterest::READABLE).await;
}
}
},
Err(e) => {
drop(reader.take());
if let TransportError::BothTerminated = e {
broken.fetch_or(true, atomic::Ordering::Release);
}
break
}
}
},
_ = closer => {
drop(reader.take());
break
}
}
}
}
}
});
AdaptedReadHandle::Async(read, notifier_tx, Some(closer_tx), notready, handle)
}
fn create_async_write_handle(
w: Box<dyn AsyncStreamWrite>, buffer: usize, broken: Arc<atomic::AtomicBool>,
) -> AdaptedWriteHandle {
let notready = Arc::new(atomic::AtomicBool::new(false));
let notready_clone = notready.clone();
let (write, write_rx): (_, mpsc::Receiver<Bytes>) = mpsc::channel(buffer);
let (notifier_tx, mut notifier_rx): (mpsc::UnboundedSender<IONotifier>, _) = mpsc::unbounded_channel();
let (closer_tx, mut closer_rx) = oneshot::channel();
let handle = tokio::spawn(async move {
let mut writer = Some((write_rx, w));
loop {
let notifier = match notifier_rx.recv().await {
Some(n) => n,
None => return,
};
if let Some((write_ref, w)) = &mut writer {
loop {
let closer = &mut closer_rx;
tokio::select! {
res = write_ref.recv() => match res {
Some(bytes) => {
match w.send(bytes.clone()).await {
Err(e) => {
drop(writer.take());
if let TransportError::BothTerminated = e {
broken.fetch_or(true, atomic::Ordering::Release);
}
break
}
_ => {
if notready_clone.swap(false, atomic::Ordering::AcqRel) {
notifier.notify(IOInterest::WRITABLE).await;
}
}
}
},
None => return,
},
_ = closer => {
drop(writer.take());
break
}
}
}
}
}
});
AdaptedWriteHandle::Async(write, notifier_tx, Some(closer_tx), notready, handle)
}
pub fn new(r: AdaptedRead, w: AdaptedWrite, buffer: usize) -> Self {
let broken = Arc::new(atomic::AtomicBool::new(false));
let read = match r {
AdaptedRead::Async(r) => Self::create_async_read_handle(r, buffer, broken.clone()),
AdaptedRead::NonBlocking(r) => AdaptedReadHandle::NonBlocking(Some(r)),
};
let write = match w {
AdaptedWrite::Async(r) => Self::create_async_write_handle(r, buffer, broken.clone()),
AdaptedWrite::NonBlocking(r) => AdaptedWriteHandle::NonBlocking(Some(r)),
};
Self {
broken,
source: NonBlockingStreamAdapterSource { read, write },
}
}
}
struct NonBlockingStreamAdapterSource {
read: AdaptedReadHandle,
write: AdaptedWriteHandle,
}
impl GenericSource for NonBlockingStreamAdapterSource {
fn register(&mut self, notifier: IONotifier) -> Result<(), io::Error> {
match &mut self.read {
AdaptedReadHandle::Async(_, tx, _, _, _) => {
tx.send(notifier.clone()).ok();
}
AdaptedReadHandle::NonBlocking(reader) => {
if let Some(r) = reader {
r.source().register(notifier.clone())?;
}
}
}
match &mut self.write {
AdaptedWriteHandle::Async(_, tx, _, _, _) => {
tx.send(notifier.clone()).ok();
}
AdaptedWriteHandle::NonBlocking(writer) => {
if let Some(w) = writer {
w.source().register(notifier.clone())?;
}
}
}
Ok(())
}
fn deregister(&mut self) -> Result<(), io::Error> {
Ok(())
}
}
impl NonBlockingStream for NonBlockingStreamAdapter {
fn try_recv(&mut self) -> Result<Bytes, TransportError> {
match &mut self.source.read {
AdaptedReadHandle::Async(reader, _, _, notready, _) => {
use mpsc::error::TryRecvError::*;
notready.swap(true, atomic::Ordering::AcqRel);
match reader.try_recv() {
Ok(bytes) => Ok(bytes),
Err(Empty) => Err(TransportError::NotReady),
Err(Disconnected) => Err(match self.broken.fetch_or(false, atomic::Ordering::Acquire) {
true => TransportError::BothTerminated,
false => TransportError::HalfTerminated,
}),
}
}
AdaptedReadHandle::NonBlocking(reader) => match reader {
Some(r) => r.try_recv(),
None => Err(TransportError::HalfTerminated),
},
}
}
fn try_send(&mut self, data: Option<Bytes>) -> Result<bool, TransportError> {
match &mut self.source.write {
AdaptedWriteHandle::Async(writer, _, _, notready, _) => {
let data = match data {
Some(d) => d,
None => unreachable!(),
};
use mpsc::error::TrySendError::*;
notready.swap(true, atomic::Ordering::AcqRel);
match writer.try_send(data) {
Ok(()) => Ok(false),
Err(Full(_)) => Err(TransportError::NotReady),
Err(Closed(_)) => Err(match self.broken.fetch_or(false, atomic::Ordering::Acquire) {
true => TransportError::BothTerminated,
false => TransportError::HalfTerminated,
}),
}
}
AdaptedWriteHandle::NonBlocking(writer) => match writer {
Some(w) => w.try_send(data),
None => Err(TransportError::HalfTerminated),
},
}
}
fn source(&mut self) -> IOSource {
IOSource::Generic(&mut self.source)
}
fn shutdown(&mut self, how: Shutdown) -> io::Result<()> {
let mut read = false;
let mut write = false;
match how {
Shutdown::Read => read = true,
Shutdown::Write => write = true,
Shutdown::Both => {
read = true;
write = true;
}
}
if read {
match &mut self.source.read {
AdaptedReadHandle::Async(_, _, tx, _, _) => {
if let Some(tx) = tx.take() {
tx.send(()).ok();
}
}
AdaptedReadHandle::NonBlocking(r) => drop(r.take()),
}
}
if write {
match &mut self.source.write {
AdaptedWriteHandle::Async(_, _, tx, _, _) => {
if let Some(tx) = tx.take() {
tx.send(()).ok();
}
}
AdaptedWriteHandle::NonBlocking(w) => drop(w.take()),
}
}
Ok(())
}
}
pub struct ChannelStream {
tx: mpsc::Sender<Bytes>,
rx: mpsc::Receiver<Bytes>,
rx_leftover: Bytes,
}
impl io::Read for ChannelStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if self.rx_leftover.is_empty() {
match self.rx.try_recv() {
Ok(chunk) => self.rx_leftover = chunk,
Err(e) => {
return Err(match e {
mpsc::error::TryRecvError::Disconnected => {
io::Error::new(io::ErrorKind::BrokenPipe, "channel disconnected")
}
mpsc::error::TryRecvError::Empty => {
io::Error::new(io::ErrorKind::WouldBlock, "channel not ready")
}
})
}
}
}
let len = if buf.len() > self.rx_leftover.len() {
self.rx_leftover.len()
} else {
buf.len()
};
buf[..len].copy_from_slice(&self.rx_leftover.split_to(len));
return Ok(len)
}
}
impl io::Write for ChannelStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
return match self.tx.try_send(Bytes::copy_from_slice(buf)) {
Ok(_) => Ok(buf.len()),
Err(e) => Err(match e {
mpsc::error::TrySendError::Closed(_) => io::Error::new(io::ErrorKind::BrokenPipe, "channel closed"),
mpsc::error::TrySendError::Full(_) => io::Error::new(io::ErrorKind::WouldBlock, "channel not ready"),
}),
}
}
fn flush(&mut self) -> io::Result<()> {
Err(io::Error::new(io::ErrorKind::Unsupported, "not supported by mpsc"))
}
}
pub fn new_channel_stream(buffer: usize) -> (ChannelStream, ChannelStream) {
let (tx1, rx1) = mpsc::channel(buffer);
let (tx2, rx2) = mpsc::channel(buffer);
(
ChannelStream {
tx: tx2,
rx: rx1,
rx_leftover: Bytes::new(),
},
ChannelStream {
tx: tx1,
rx: rx2,
rx_leftover: Bytes::new(),
},
)
}
pub struct AsyncChannelStream {
tx: tokio_util::sync::PollSender<Bytes>,
rx: mpsc::Receiver<Bytes>,
rx_leftover: BytesMut,
}
impl AsyncRead for AsyncChannelStream {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
if self.rx_leftover.is_empty() {
match ready!(self.rx.poll_recv(cx)) {
Some(chunk) => self.rx_leftover.put(chunk),
None => (),
}
if self.rx_leftover.is_empty() {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::BrokenPipe, "channel disconnected")))
}
}
let unfilled = buf.capacity() - buf.filled().len();
let len = if unfilled > self.rx_leftover.len() {
self.rx_leftover.len()
} else {
unfilled
};
buf.put_slice(&self.rx_leftover.split_to(len));
Poll::Ready(Ok(()))
}
}
impl AsyncWrite for AsyncChannelStream {
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
let err = |_| io::Error::new(io::ErrorKind::BrokenPipe, "channel closed");
ready!(self.tx.poll_reserve(cx)).map_err(err)?;
Poll::Ready(
self.tx
.send_item(Bytes::copy_from_slice(buf))
.map(|_| buf.len())
.map_err(err),
)
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}
pub fn new_async_channel_stream(buffer: usize) -> (AsyncChannelStream, AsyncChannelStream) {
let (tx1, rx1) = mpsc::channel(buffer);
let (tx2, rx2) = mpsc::channel(buffer);
(
AsyncChannelStream {
tx: tokio_util::sync::PollSender::new(tx2),
rx: rx1,
rx_leftover: BytesMut::new(),
},
AsyncChannelStream {
tx: tokio_util::sync::PollSender::new(tx1),
rx: rx2,
rx_leftover: BytesMut::new(),
},
)
}