Skip to main content

kevy_client_async/
conn.rs

1//! Async equivalent of [`kevy_client::Connection`] — TCP-only.
2//!
3//! Drop-in mirror: the migration path from blocking is grep-replace
4//! `Connection` → `AsyncConnection` plus `.await` on each call.
5//!
6//! The active transport type is picked at compile-time from whichever
7//! runtime feature is enabled (T4.8 ensures exactly one). The codec
8//! is generic over `AsyncTransport` so this just type-aliases the
9//! runtime-specific TcpStream.
10
11use std::io;
12
13use kevy_resp::Reply;
14
15use crate::codec::AsyncRespCodec;
16use crate::url::parse_url;
17
18// ─── Runtime-selected default transport ───────────────────────────────
19//
20// T4.8 guarantees exactly one of the three feature blocks below is
21// active, so `DefaultTransport` is unambiguously defined.
22
// Each runtime contributes a matched pair: the concrete stream type and
// the function that produces it. Only the pair whose feature is enabled
// survives `cfg` stripping.

// ── tokio ──

/// Stream type backing the connection when the `tokio` feature is enabled.
#[cfg(feature = "tokio")]
type DefaultTransport = tokio::net::TcpStream;

/// Connects to `host`/`port` by delegating to `crate::rt_tokio::connect`.
#[cfg(feature = "tokio")]
async fn connect_default(host: &str, port: u16) -> io::Result<DefaultTransport> {
    use crate::rt_tokio::connect;
    connect(host, port).await
}

// ── smol ──

/// Stream type backing the connection when the `smol` feature is enabled.
#[cfg(feature = "smol")]
type DefaultTransport = smol::net::TcpStream;

/// Connects to `host`/`port` by delegating to `crate::rt_smol::connect`.
#[cfg(feature = "smol")]
async fn connect_default(host: &str, port: u16) -> io::Result<DefaultTransport> {
    use crate::rt_smol::connect;
    connect(host, port).await
}

// ── async-std ──

/// Stream type backing the connection when the `async-std` feature is enabled.
#[cfg(feature = "async-std")]
type DefaultTransport = async_std::net::TcpStream;

/// Connects to `host`/`port` by delegating to `crate::rt_async_std::connect`.
#[cfg(feature = "async-std")]
async fn connect_default(host: &str, port: u16) -> io::Result<DefaultTransport> {
    use crate::rt_async_std::connect;
    connect(host, port).await
}
42
43// ─── AsyncConnection ──────────────────────────────────────────────────
44
45/// Async TCP-RESP connection. Mirrors [`kevy_client::Connection`] but
46/// drops the `mem://` / `file://` embedded backends — those are
47/// synchronous and have no async story.
48pub struct AsyncConnection {
49    codec: AsyncRespCodec<DefaultTransport>,
50}
51
52impl AsyncConnection {
53    /// Open a connection from a URL. Accepts `kevy://`, `redis://`,
54    /// `tcp://` — see [`crate::url::parse_url`] for the full grammar.
55    ///
56    /// If the URL carries a `/N` db index (only `kevy://` and `redis://`),
57    /// an initial `SELECT N` round-trip runs before returning.
58    pub async fn open(url: &str) -> io::Result<Self> {
59        let parsed = parse_url(url)?;
60        let transport = connect_default(&parsed.host, parsed.port).await?;
61        let mut codec = AsyncRespCodec::new(transport);
62        if let Some(db) = parsed.db {
63            let reply = codec
64                .request(&[b"SELECT".to_vec(), db.to_string().into_bytes()])
65                .await?;
66            if let Reply::Error(msg) = reply {
67                let text = String::from_utf8_lossy(&msg);
68                return Err(io::Error::other(format!("SELECT {db} rejected: {text}")));
69            }
70        }
71        Ok(Self { codec })
72    }
73
74    /// Direct constructor — useful when the caller wants to manage
75    /// transport setup itself (cluster client, custom socket opts).
76    pub fn from_transport(transport: DefaultTransport) -> Self {
77        Self {
78            codec: AsyncRespCodec::new(transport),
79        }
80    }
81
82    /// `PING`. Returns `Ok(())` on `+PONG`.
83    pub async fn ping(&mut self) -> io::Result<()> {
84        let reply = self.codec.request(&[b"PING".to_vec()]).await?;
85        expect_pong(reply)
86    }
87
88    /// Borrow the underlying codec — exposed so pipeline + subscriber
89    /// adapters built in later tasks (T4.11/T4.14) can share the
90    /// connection state machine.
91    pub fn codec_mut(&mut self) -> &mut AsyncRespCodec<DefaultTransport> {
92        &mut self.codec
93    }
94}
95
96fn expect_pong(reply: Reply) -> io::Result<()> {
97    match reply {
98        Reply::Simple(s) if s == b"PONG" => Ok(()),
99        Reply::Bulk(s) if s == b"PONG" => Ok(()),
100        Reply::Error(msg) => Err(io::Error::other(format!(
101            "PING failed: {}",
102            String::from_utf8_lossy(&msg)
103        ))),
104        other => Err(io::Error::new(
105            io::ErrorKind::InvalidData,
106            format!("PING returned unexpected reply: {other:?}"),
107        )),
108    }
109}