kevy_client_async/transport.rs
1//! Async IO traits — runtime-agnostic core.
2//!
3//! Per RFC F5 (locked) the core of `kevy-client-async` depends only on
4//! `core::future` / `core::task` / `std::io`. We do NOT pull `futures-io`,
5//! `tokio::io::AsyncRead`, nor any other crate's IO traits — the
6//! ecosystem's three runtimes each define their own near-identical
7//! `AsyncRead` / `AsyncWrite`, and binding to any one of them would
8//! bleed that runtime's dep through the core.
9//!
10//! Instead this module defines the traits ourselves in the
11//! `futures-io` shape (poll-based, `&mut [u8]` buffers, returns
12//! `Poll<io::Result<usize>>`). The runtime feature modules T4.5/T4.6/
13//! T4.7 each ship a tiny adapter that implements these traits on top
14//! of `<runtime>::net::TcpStream`.
15//!
16//! `AsyncTransport` is the bound the RESP3 codec (T4.4) and connection
17//! type (T4.9) actually require: a single `AsyncRead + AsyncWrite +
18//! Send + Unpin` thing. Blanket-impl'd for any qualifying type so
19//! callers can hand in any compatible transport.
20
21use core::future::Future;
22use core::pin::Pin;
23use core::task::{Context, Poll};
24use std::io;
25
/// Poll-based, non-blocking counterpart of [`std::io::Read`].
///
/// The destination buffer is borrowed from the caller for the duration
/// of each call. An implementor that cannot make progress returns
/// `Poll::Pending` after registering the waker carried by `cx`, and is
/// polled again once bytes become readable. As with the blocking
/// trait, `Poll::Ready(Ok(0))` signals a clean EOF.
pub trait AsyncRead {
    /// Try to read bytes from the transport into `buf`.
    ///
    /// Resolves to the number of bytes placed into `buf`, or yields
    /// `Poll::Pending` when the underlying transport has nothing
    /// available yet.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>>;
}
41
/// Poll-based, non-blocking counterpart of [`std::io::Write`].
///
/// The source buffer is borrowed from the caller for the duration of
/// each `poll_write` call.
pub trait AsyncWrite {
    /// Try to hand the bytes in `buf` to the transport.
    ///
    /// Resolves to the number of bytes the transport accepted, or
    /// yields `Poll::Pending`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>>;

    /// Try to push any buffered bytes through to the transport.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;

    /// Begin, or keep driving, a graceful shutdown of the write half.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
}
58
59/// Bound used everywhere downstream: codec (T4.4), `AsyncConnection`
60/// (T4.9), pipeline runner (T4.16). Blanket-impl'd so any
61/// `AsyncRead + AsyncWrite + Send + Unpin` value satisfies it.
62pub trait AsyncTransport: AsyncRead + AsyncWrite + Send + Unpin {}
63
64impl<T> AsyncTransport for T where T: AsyncRead + AsyncWrite + Send + Unpin + ?Sized {}
65
66// ─── Small read/write helpers built on the poll traits ────────────────
67//
68// The codec consumes bytes one chunk at a time. Rather than have every
69// codec call site write its own poll-loop, expose a couple of futures
70// that turn `poll_read` / `poll_write` into `.await`-able primitives.
71// Both are zero-allocation — they borrow the transport + the buffer.
72
/// Future returned by [`read`]: drives a single `poll_read` to
/// completion.
///
/// Both the transport and the destination buffer are borrowed for the
/// lifetime `'a`; the future owns no storage of its own.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Read<'a, T: ?Sized> {
    /// Transport the read is issued against.
    transport: &'a mut T,
    /// Caller-provided destination for the bytes that are read.
    buf: &'a mut [u8],
}

// Written by hand rather than derived so that it places no `Debug` bound
// on the transport and prints only the buffer length, never its bytes.
impl<T: ?Sized> core::fmt::Debug for Read<'_, T> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("Read")
            .field("buf_len", &self.buf.len())
            .finish_non_exhaustive()
    }
}
79
/// Future returned by [`write_all`]: drives `poll_write` to completion
/// for the whole buffer (loops on partial writes internally).
///
/// Both the transport and the source buffer are borrowed for the
/// lifetime `'a`; the only state the future owns is its progress
/// counter.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteAll<'a, T: ?Sized> {
    /// Transport the bytes are written to.
    transport: &'a mut T,
    /// Complete payload to send.
    buf: &'a [u8],
    /// Number of leading bytes of `buf` the transport has accepted so far.
    written: usize,
}

// Written by hand rather than derived so that it places no `Debug` bound
// on the transport and reports progress only, never the payload bytes.
impl<T: ?Sized> core::fmt::Debug for WriteAll<'_, T> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("WriteAll")
            .field("buf_len", &self.buf.len())
            .field("written", &self.written)
            .finish_non_exhaustive()
    }
}
87
88/// Single-chunk async read. Resolves to the number of bytes read; `0`
89/// = clean EOF.
90pub fn read<'a, T>(transport: &'a mut T, buf: &'a mut [u8]) -> Read<'a, T>
91where
92 T: AsyncRead + Unpin + ?Sized,
93{
94 Read { transport, buf }
95}
96
97/// Async equivalent of `Write::write_all` — succeeds only after every
98/// byte in `buf` is accepted by the transport.
99pub fn write_all<'a, T>(transport: &'a mut T, buf: &'a [u8]) -> WriteAll<'a, T>
100where
101 T: AsyncWrite + Unpin + ?Sized,
102{
103 WriteAll {
104 transport,
105 buf,
106 written: 0,
107 }
108}
109
110impl<T> Future for Read<'_, T>
111where
112 T: AsyncRead + Unpin + ?Sized,
113{
114 type Output = io::Result<usize>;
115 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
116 let me = self.get_mut();
117 Pin::new(&mut *me.transport).poll_read(cx, me.buf)
118 }
119}
120
121impl<T> Future for WriteAll<'_, T>
122where
123 T: AsyncWrite + Unpin + ?Sized,
124{
125 type Output = io::Result<()>;
126 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
127 let me = self.get_mut();
128 while me.written < me.buf.len() {
129 let rem = &me.buf[me.written..];
130 match Pin::new(&mut *me.transport).poll_write(cx, rem) {
131 Poll::Ready(Ok(0)) => {
132 return Poll::Ready(Err(io::Error::new(
133 io::ErrorKind::WriteZero,
134 "transport accepted zero bytes",
135 )));
136 }
137 Poll::Ready(Ok(n)) => me.written += n,
138 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
139 Poll::Pending => return Poll::Pending,
140 }
141 }
142 Poll::Ready(Ok(()))
143 }
144}