#![cfg(feature = "std")]
use crate::libp2p::read_write;
use core::{
fmt, future, mem, ops,
pin::{self, Pin},
task::Poll,
};
use futures_util::{AsyncRead, AsyncWrite};
use std::io;
/// Wraps a socket (or a future that resolves to a socket) together with a read buffer and a
/// list of write buffers.
///
/// The buffers are lent to the API user through [`WithBuffers::read_write_access`], and the
/// actual socket I/O is driven by [`WithBuffers::wait_read_write_again`].
#[pin_project::pin_project]
pub struct WithBuffers<TSocketFut, TSocket, TNow> {
/// Either the future that yields the socket, or the socket itself once that future resolved.
#[pin]
socket: Socket<TSocketFut, TSocket>,
/// I/O error reported by the socket or by the socket future, if any. Once set,
/// `read_write_access` returns it and `wait_read_write_again` no longer makes progress.
error: Option<io::Error>,
/// Buffer receiving the bytes read from the socket. Temporarily moved into the `ReadWrite`
/// while a `ReadWriteAccess` is alive.
read_buffer: Vec<u8>,
/// Number of bytes at the start of `read_buffer` that hold data actually read from the socket.
/// The rest of the buffer is zero-filled space made available to `poll_read`.
read_buffer_valid: usize,
/// Capacity that `read_buffer` is reset to when it is empty and the consumer expects fewer
/// bytes than this value.
read_buffer_reasonable_capacity: usize,
/// Set to `true` once `poll_read` has reported 0 bytes read.
read_closed: bool,
/// Buffers queued for writing, in the order in which they must be sent.
write_buffers: Vec<Vec<u8>>,
/// Set to `true` once the consumer has set `write_bytes_queueable` to `None`.
write_closed: bool,
/// `true` if `poll_close` still has to complete on the socket.
close_pending: bool,
/// `true` if data was written since the last successful `poll_flush`.
flush_pending: bool,
/// Value of `now` passed to the latest call to `read_write_access`, if any.
read_write_now: Option<TNow>,
/// Moment after which `wait_read_write_again` should return, as left by the latest
/// `ReadWriteAccess` when it was dropped.
read_write_wake_up_after: Option<TNow>,
}
/// State of the underlying socket: still being established, or ready for I/O.
#[pin_project::pin_project(project = SocketProj)]
enum Socket<TSocketFut, TSocket> {
/// The future that yields the socket hasn't resolved yet.
Pending(#[pin] TSocketFut),
/// The socket future resolved successfully to this socket.
Resolved(#[pin] TSocket),
}
impl<TSocketFut, TSocket, TNow> WithBuffers<TSocketFut, TSocket, TNow>
where
    TNow: Clone + Ord,
{
    /// Builds a new [`WithBuffers`] around a future that yields the socket.
    ///
    /// The read buffer is pre-allocated and the state starts with no error, nothing queued
    /// for writing, and both directions open.
    pub fn new(socket: TSocketFut) -> Self {
        const DEFAULT_READ_CAPACITY: usize = 65536;
        const INITIAL_WRITE_SLOTS: usize = 64;

        Self {
            socket: Socket::Pending(socket),
            error: None,
            read_buffer: Vec::with_capacity(DEFAULT_READ_CAPACITY),
            read_buffer_valid: 0,
            read_buffer_reasonable_capacity: DEFAULT_READ_CAPACITY,
            read_closed: false,
            write_buffers: Vec::with_capacity(INITIAL_WRITE_SLOTS),
            write_closed: false,
            close_pending: false,
            flush_pending: false,
            read_write_now: None,
            read_write_wake_up_after: None,
        }
    }

    /// Lends the read and write buffers to the caller in the form of a
    /// [`ReadWriteAccess`], which dereferences to a `ReadWrite`.
    ///
    /// `now` is recorded as the time of the latest access and must not be inferior to the
    /// value passed to the previous call (checked in debug builds only).
    ///
    /// # Errors
    ///
    /// Returns the stored I/O error if the socket, or the future yielding it, has failed.
    pub fn read_write_access(
        self: Pin<&'_ mut Self>,
        now: TNow,
    ) -> Result<ReadWriteAccess<'_, TNow>, &'_ io::Error> {
        // Upper bound to the number of bytes queued for writing at any given time.
        const MAX_QUEUED_WRITE_BYTES: usize = 128 * 1024;

        let this = self.project();

        debug_assert!(match this.read_write_now.as_ref() {
            Some(previous) => *previous <= now,
            None => true,
        });
        *this.read_write_wake_up_after = None;
        *this.read_write_now = Some(now.clone());

        if let Some(stored) = this.error.as_ref() {
            return Err(stored);
        }

        // Drop the zero-filled tail that was made available to the socket for reading.
        this.read_buffer.truncate(*this.read_buffer_valid);

        let socket_ready = matches!(*this.socket, Socket::Resolved(_));
        let queued: usize = this.write_buffers.iter().map(Vec::len).sum();

        // Nothing can be queued before the socket exists, and `None` signals that the
        // writing side is closed.
        let queueable = match (socket_ready, *this.write_closed) {
            (false, _) => Some(0),
            (true, false) => Some(MAX_QUEUED_WRITE_BYTES.saturating_sub(queued)),
            (true, true) => None,
        };
        let expected_incoming_bytes = if *this.read_closed { None } else { Some(0) };

        // Lengths are snapshotted before the buffers are moved out, so that `Drop` can
        // detect whether the consumer made progress.
        let read_buffer_len_before = this.read_buffer.len();
        let write_buffers_len_before = this.write_buffers.len();

        let read_write = read_write::ReadWrite {
            now,
            incoming_buffer: mem::take(this.read_buffer),
            expected_incoming_bytes,
            read_bytes: 0,
            write_bytes_queued: queued,
            write_buffers: mem::take(this.write_buffers),
            write_bytes_queueable: queueable,
            wake_up_after: this.read_write_wake_up_after.take(),
        };

        Ok(ReadWriteAccess {
            read_buffer_len_before,
            write_buffers_len_before,
            read_write,
            read_buffer: this.read_buffer,
            read_buffer_valid: this.read_buffer_valid,
            read_buffer_reasonable_capacity: *this.read_buffer_reasonable_capacity,
            write_buffers: this.write_buffers,
            write_closed: this.write_closed,
            close_pending: this.close_pending,
            read_write_wake_up_after: this.read_write_wake_up_after,
        })
    }
}
impl<TSocketFut, TSocket, TNow> WithBuffers<TSocketFut, TSocket, TNow>
where
    TSocket: AsyncRead + AsyncWrite,
    TSocketFut: Future<Output = Result<TSocket, io::Error>>,
    TNow: Clone + Ord,
{
    /// Waits until [`WithBuffers::read_write_access`] should be called again.
    ///
    /// Returns immediately if `read_write_access` has never been called, or if the wake-up
    /// time left by the latest access is inferior or equal to the `now` of that access.
    ///
    /// Otherwise, drives the socket: resolves the socket future, reads into the read buffer,
    /// writes the queued buffers, flushes, and closes the writing side when requested. The
    /// future completes when the timer built by `timer_builder` fires, when the socket
    /// becomes available, when data is read or the reading side is closed, when all the write
    /// buffers have been sent, when the writing side finishes closing, or when an I/O error
    /// happens.
    ///
    /// `timer_builder` is called at most once, with the moment when to wake up, and must
    /// return a future that completes at that moment.
    ///
    /// I/O errors are not returned by this function. They are stored and reported by the
    /// next call to `read_write_access`. If an error is already stored when the returned
    /// future is polled, that future never completes.
    pub async fn wait_read_write_again<F>(
        self: Pin<&mut Self>,
        timer_builder: impl FnOnce(TNow) -> F,
    ) where
        F: Future<Output = ()>,
    {
        let mut this = self.project();

        // Return immediately if `wake_up_after <= now`, or if no access ever happened.
        match (&*this.read_write_wake_up_after, &*this.read_write_now) {
            (_, None) => return,
            (Some(when_wake_up), Some(now)) if *when_wake_up <= *now => {
                return;
            }
            _ => {}
        }

        // Timer that fires at `read_write_wake_up_after`, or never if no wake-up was requested.
        let mut timer = pin::pin!({
            let fut = this
                .read_write_wake_up_after
                .as_ref()
                .map(|when| timer_builder(when.clone()));
            async {
                if let Some(fut) = fut {
                    fut.await;
                } else {
                    future::pending::<()>().await;
                }
            }
        });

        // Grow the read buffer to its full capacity so that the spare space can be handed to
        // `poll_read`. Only the first `read_buffer_valid` bytes hold actual data.
        this.read_buffer.resize(this.read_buffer.capacity(), 0);

        future::poll_fn(move |cx| {
            if this.error.is_some() {
                // Never complete once an error is stored.
                return Poll::Pending;
            }

            // If still `true` at the end of the closure, `Poll::Pending` is returned.
            let mut pending = true;

            match Future::poll(Pin::new(&mut timer), cx) {
                Poll::Pending => {}
                Poll::Ready(()) => {
                    pending = false;
                }
            }

            match this.socket.as_mut().project() {
                SocketProj::Pending(future) => match Future::poll(future, cx) {
                    Poll::Pending => {}
                    Poll::Ready(Ok(socket)) => {
                        this.socket.set(Socket::Resolved(socket));
                        pending = false;
                    }
                    Poll::Ready(Err(err)) => {
                        *this.error = Some(err);
                        return Poll::Ready(());
                    }
                },
                SocketProj::Resolved(mut socket) => {
                    // Reading side: only attempted if there is spare space in the buffer.
                    if !*this.read_closed && *this.read_buffer_valid < this.read_buffer.len() {
                        let read_result = AsyncRead::poll_read(
                            socket.as_mut(),
                            cx,
                            &mut this.read_buffer[*this.read_buffer_valid..],
                        );

                        match read_result {
                            Poll::Pending => {}
                            Poll::Ready(Ok(0)) => {
                                // 0 bytes read means that the reading side is closed.
                                *this.read_closed = true;
                                pending = false;
                            }
                            Poll::Ready(Ok(n)) => {
                                *this.read_buffer_valid += n;
                                pending = false;
                            }
                            Poll::Ready(Err(err)) => {
                                *this.error = Some(err);
                                return Poll::Ready(());
                            }
                        };
                    }

                    // Writing side: write everything queued, then flush, then close if
                    // requested. Each step is only attempted once the previous one is done.
                    loop {
                        if this.write_buffers.iter().any(|b| !b.is_empty()) {
                            let write_result = {
                                let buffers = this
                                    .write_buffers
                                    .iter()
                                    .map(|buf| io::IoSlice::new(buf))
                                    .collect::<Vec<_>>();
                                AsyncWrite::poll_write_vectored(socket.as_mut(), cx, &buffers)
                            };

                            match write_result {
                                Poll::Ready(Ok(0)) => {
                                    // At least one buffer is non-empty, so a zero-length
                                    // write means that the socket no longer accepts data.
                                    // Report it like any other I/O failure rather than
                                    // panicking on the behavior of the socket.
                                    *this.error = Some(io::Error::from(io::ErrorKind::WriteZero));
                                    return Poll::Ready(());
                                }
                                Poll::Ready(Ok(mut n)) => {
                                    *this.flush_pending = true;
                                    // Remove the `n` bytes that were written from the front
                                    // of the queue.
                                    while n > 0 {
                                        let first_buf = this.write_buffers.first_mut().unwrap();
                                        if first_buf.len() <= n {
                                            n -= first_buf.len();
                                            this.write_buffers.remove(0);
                                        } else {
                                            // Partially-written buffer: shift the remaining
                                            // bytes to the start.
                                            first_buf.copy_within(n.., 0);
                                            first_buf.truncate(first_buf.len() - n);
                                            break;
                                        }
                                    }
                                    // Wake up if the write buffers switch from non-empty to
                                    // empty.
                                    if this.write_buffers.is_empty() {
                                        pending = false;
                                    }
                                }
                                Poll::Ready(Err(err)) => {
                                    *this.error = Some(err);
                                    return Poll::Ready(());
                                }
                                Poll::Pending => break,
                            };
                        } else if *this.flush_pending {
                            match AsyncWrite::poll_flush(socket.as_mut(), cx) {
                                Poll::Ready(Ok(())) => {
                                    *this.flush_pending = false;
                                }
                                Poll::Ready(Err(err)) => {
                                    *this.error = Some(err);
                                    return Poll::Ready(());
                                }
                                Poll::Pending => break,
                            }
                        } else if *this.close_pending {
                            match AsyncWrite::poll_close(socket.as_mut(), cx) {
                                Poll::Ready(Ok(())) => {
                                    *this.close_pending = false;
                                    pending = false;
                                    break;
                                }
                                Poll::Ready(Err(err)) => {
                                    *this.error = Some(err);
                                    return Poll::Ready(());
                                }
                                Poll::Pending => break,
                            }
                        } else {
                            break;
                        }
                    }
                }
            };

            if !pending {
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        })
        .await;
    }
}
impl<TSocketFut, TSocket: fmt::Debug, TNow> fmt::Debug for WithBuffers<TSocketFut, TSocket, TNow> {
    /// Prints the socket if it is available, or a `"<pending>"` placeholder while the socket
    /// future hasn't resolved yet.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut tuple = f.debug_tuple("WithBuffers");
        match &self.socket {
            Socket::Resolved(socket) => tuple.field(socket),
            Socket::Pending(_) => tuple.field(&"<pending>"),
        };
        tuple.finish()
    }
}
/// Temporary access to the buffers of a [`WithBuffers`], as returned by
/// [`WithBuffers::read_write_access`].
///
/// Dereferences to a `ReadWrite`. When dropped, the buffers are given back to the
/// [`WithBuffers`] and its state is updated according to what the consumer did.
pub struct ReadWriteAccess<'a, TNow: Clone> {
/// Object exposed to the consumer. Holds the buffers while the access is alive.
read_write: read_write::ReadWrite<TNow>,
/// Length of the read buffer at the time the access was created.
read_buffer_len_before: usize,
/// Number of write buffers at the time the access was created.
write_buffers_len_before: usize,
/// Where to put the read buffer back on drop.
read_buffer: &'a mut Vec<u8>,
/// Updated on drop to the length of the read buffer given back.
read_buffer_valid: &'a mut usize,
/// Copy of the capacity that the read buffer is reset to when it is empty.
read_buffer_reasonable_capacity: usize,
/// Where to put the write buffers back on drop.
write_buffers: &'a mut Vec<Vec<u8>>,
/// Set to `true` on drop if the consumer has closed the writing side.
write_closed: &'a mut bool,
/// Set to `true` on drop at the same time as `write_closed`.
close_pending: &'a mut bool,
/// Updated on drop with the moment when the consumer wants to be woken up.
read_write_wake_up_after: &'a mut Option<TNow>,
}
impl<TNow: Clone> ops::Deref for ReadWriteAccess<'_, TNow> {
    type Target = read_write::ReadWrite<TNow>;

    /// Gives shared access to the wrapped `ReadWrite`.
    fn deref(&self) -> &read_write::ReadWrite<TNow> {
        &self.read_write
    }
}
impl<'a, TNow: Clone> ops::DerefMut for ReadWriteAccess<'a, TNow> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.read_write
}
}
impl<TNow: Clone> Drop for ReadWriteAccess<'_, TNow> {
    /// Gives the buffers back to the `WithBuffers` and records what the consumer did:
    /// remaining incoming data, queued outgoing data, closing of the writing side, and the
    /// moment when to wake up next.
    fn drop(&mut self) {
        // Whatever is left in the incoming buffer is data that the consumer hasn't processed.
        *self.read_buffer = mem::take(&mut self.read_write.incoming_buffer);
        let buffered = self.read_buffer.len();
        *self.read_buffer_valid = buffered;

        // Adjust the capacity of the read buffer to what the consumer expects.
        if let Some(expected) = self.read_write.expected_incoming_bytes {
            let reasonable = self.read_buffer_reasonable_capacity;
            let reset_to_reasonable = self.read_buffer.is_empty() && expected < reasonable;

            if reset_to_reasonable {
                // The buffer is empty, so freeing then re-allocating avoids preserving any
                // content across the capacity change.
                self.read_buffer.shrink_to(0);
                self.read_buffer.reserve(reasonable);
            } else if expected > buffered {
                self.read_buffer.reserve(expected - buffered);
            }

            debug_assert!(self.read_buffer.capacity() >= expected);
        }

        *self.write_buffers = mem::take(&mut self.read_write.write_buffers);

        // `write_bytes_queueable` set to `None` means that the writing side must be closed.
        let consumer_closed_writing = self.read_write.write_bytes_queueable.is_none();
        if consumer_closed_writing && !*self.write_closed {
            *self.write_closed = true;
            *self.close_pending = true;
        }

        *self.read_write_wake_up_after = self.read_write.wake_up_after.take();

        // If the consumer has advanced its reading or writing side, override the wake-up
        // moment with `now`.
        let read_progressed = self.read_buffer_len_before != buffered
            && matches!(
                self.read_write.expected_incoming_bytes,
                Some(needed) if needed <= buffered
            );
        let write_progressed =
            self.write_buffers_len_before != self.write_buffers.len() && !*self.write_closed;

        if read_progressed || write_progressed {
            *self.read_write_wake_up_after = Some(self.read_write.now.clone());
        }
    }
}