Skip to main content

kevy_client_async/
transport.rs

1//! Async IO traits — runtime-agnostic core.
2//!
3//! Per RFC F5 (locked) the core of `kevy-client-async` depends only on
4//! `core::future` / `core::task` / `std::io`. We do NOT pull `futures-io`,
5//! `tokio::io::AsyncRead`, nor any other crate's IO traits — the
6//! ecosystem's three runtimes each define their own near-identical
7//! `AsyncRead` / `AsyncWrite`, and binding to any one of them would
8//! bleed that runtime's dep through the core.
9//!
10//! Instead this module defines the traits ourselves in the
11//! `futures-io` shape (poll-based, `&mut [u8]` buffers, returns
12//! `Poll<io::Result<usize>>`). The runtime feature modules T4.5/T4.6/
13//! T4.7 each ship a tiny adapter that implements these traits on top
14//! of `<runtime>::net::TcpStream`.
15//!
16//! `AsyncTransport` is the bound the RESP3 codec (T4.4) and connection
17//! type (T4.9) actually require: a single `AsyncRead + AsyncWrite +
18//! Send + Unpin` thing. Blanket-impl'd for any qualifying type so
19//! callers can hand in any compatible transport.
20
21use core::future::Future;
22use core::pin::Pin;
23use core::task::{Context, Poll};
24use std::io;
25
/// Non-blocking counterpart of [`std::io::Read`], driven by polling.
///
/// The caller supplies the destination slice on every call. An
/// implementor with no bytes available returns `Poll::Pending` after
/// registering the waker carried by `cx`, so the task is resumed once
/// the transport becomes readable. `Poll::Ready(Ok(0))` reports a clean
/// end-of-stream, matching what a zero-length read means for blocking IO.
pub trait AsyncRead {
    /// Try to copy available bytes into `buf`.
    ///
    /// Yields `Poll::Ready(Ok(n))` with the number of bytes placed in
    /// `buf`, `Poll::Ready(Err(_))` on an IO failure, or `Poll::Pending`
    /// when the underlying transport has nothing to hand over yet.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>>;
}
41
/// Non-blocking counterpart of [`std::io::Write`], driven by polling.
///
/// The caller supplies the source slice on every `poll_write` call; all
/// three methods report `Poll::Pending` when they cannot make progress
/// yet.
pub trait AsyncWrite {
    /// Try to hand bytes from `buf` to the transport.
    ///
    /// Yields `Poll::Ready(Ok(n))` with the number of bytes accepted, or
    /// `Poll::Pending`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>>;

    /// Try to push any buffered bytes through to the transport.
    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>>;

    /// Start, or keep driving, a graceful shutdown of the write half.
    fn poll_close(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>>;
}
58
59/// Bound used everywhere downstream: codec (T4.4), `AsyncConnection`
60/// (T4.9), pipeline runner (T4.16). Blanket-impl'd so any
61/// `AsyncRead + AsyncWrite + Send + Unpin` value satisfies it.
62pub trait AsyncTransport: AsyncRead + AsyncWrite + Send + Unpin {}
63
64impl<T> AsyncTransport for T where T: AsyncRead + AsyncWrite + Send + Unpin + ?Sized {}
65
66// ─── Small read/write helpers built on the poll traits ────────────────
67//
68// The codec consumes bytes one chunk at a time. Rather than have every
69// codec call site write its own poll-loop, expose a couple of futures
70// that turn `poll_read` / `poll_write` into `.await`-able primitives.
71// Both are zero-allocation — they borrow the transport + the buffer.
72
/// Future returned by [`read`]: drives a single `poll_read` to
/// completion.
///
/// Holds exclusive borrows of the transport and the destination buffer
/// for `'a`. Nothing is read until the future is polled, so it is marked
/// `#[must_use]` to make a forgotten `.await` a compiler warning instead
/// of a silent no-op.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Read<'a, T: ?Sized> {
    /// Transport the bytes are pulled from.
    transport: &'a mut T,
    /// Destination slice handed to `poll_read`.
    buf: &'a mut [u8],
}
79
/// Future returned by [`write_all`]: drives `poll_write` to completion
/// for the whole buffer (loops on partial writes internally).
///
/// Holds an exclusive borrow of the transport and a shared borrow of the
/// source bytes for `'a`. Nothing is written until the future is polled,
/// so it is marked `#[must_use]` to make a forgotten `.await` a compiler
/// warning instead of a silently dropped write.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteAll<'a, T: ?Sized> {
    /// Transport the bytes are pushed into.
    transport: &'a mut T,
    /// Complete payload to send.
    buf: &'a [u8],
    /// Count of leading bytes of `buf` the transport has accepted so far;
    /// kept across polls so a `Pending` resumes where it left off.
    written: usize,
}
87
88/// Single-chunk async read. Resolves to the number of bytes read; `0`
89/// = clean EOF.
90pub fn read<'a, T>(transport: &'a mut T, buf: &'a mut [u8]) -> Read<'a, T>
91where
92    T: AsyncRead + Unpin + ?Sized,
93{
94    Read { transport, buf }
95}
96
97/// Async equivalent of `Write::write_all` — succeeds only after every
98/// byte in `buf` is accepted by the transport.
99pub fn write_all<'a, T>(transport: &'a mut T, buf: &'a [u8]) -> WriteAll<'a, T>
100where
101    T: AsyncWrite + Unpin + ?Sized,
102{
103    WriteAll {
104        transport,
105        buf,
106        written: 0,
107    }
108}
109
110impl<T> Future for Read<'_, T>
111where
112    T: AsyncRead + Unpin + ?Sized,
113{
114    type Output = io::Result<usize>;
115    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
116        let me = self.get_mut();
117        Pin::new(&mut *me.transport).poll_read(cx, me.buf)
118    }
119}
120
121impl<T> Future for WriteAll<'_, T>
122where
123    T: AsyncWrite + Unpin + ?Sized,
124{
125    type Output = io::Result<()>;
126    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
127        let me = self.get_mut();
128        while me.written < me.buf.len() {
129            let rem = &me.buf[me.written..];
130            match Pin::new(&mut *me.transport).poll_write(cx, rem) {
131                Poll::Ready(Ok(0)) => {
132                    return Poll::Ready(Err(io::Error::new(
133                        io::ErrorKind::WriteZero,
134                        "transport accepted zero bytes",
135                    )));
136                }
137                Poll::Ready(Ok(n)) => me.written += n,
138                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
139                Poll::Pending => return Poll::Pending,
140            }
141        }
142        Poll::Ready(Ok(()))
143    }
144}