kevy_client_async/
conn.rs1use std::io;
12
13use kevy_resp::Reply;
14
15use crate::codec::AsyncRespCodec;
16use crate::url::parse_url;
17
18#[cfg(feature = "tokio")]
24type DefaultTransport = tokio::net::TcpStream;
25#[cfg(feature = "smol")]
26type DefaultTransport = smol::net::TcpStream;
27#[cfg(feature = "async-std")]
28type DefaultTransport = async_std::net::TcpStream;
29
30#[cfg(feature = "tokio")]
31async fn connect_default(host: &str, port: u16) -> io::Result<DefaultTransport> {
32 crate::rt_tokio::connect(host, port).await
33}
34#[cfg(feature = "smol")]
35async fn connect_default(host: &str, port: u16) -> io::Result<DefaultTransport> {
36 crate::rt_smol::connect(host, port).await
37}
38#[cfg(feature = "async-std")]
39async fn connect_default(host: &str, port: u16) -> io::Result<DefaultTransport> {
40 crate::rt_async_std::connect(host, port).await
41}
42
43pub struct AsyncConnection {
49 codec: AsyncRespCodec<DefaultTransport>,
50}
51
52impl AsyncConnection {
53 pub async fn open(url: &str) -> io::Result<Self> {
59 let parsed = parse_url(url)?;
60 let transport = connect_default(&parsed.host, parsed.port).await?;
61 let mut codec = AsyncRespCodec::new(transport);
62 if let Some(db) = parsed.db {
63 let reply = codec
64 .request(&[b"SELECT".to_vec(), db.to_string().into_bytes()])
65 .await?;
66 if let Reply::Error(msg) = reply {
67 let text = String::from_utf8_lossy(&msg);
68 return Err(io::Error::other(format!("SELECT {db} rejected: {text}")));
69 }
70 }
71 Ok(Self { codec })
72 }
73
74 pub fn from_transport(transport: DefaultTransport) -> Self {
77 Self {
78 codec: AsyncRespCodec::new(transport),
79 }
80 }
81
82 pub async fn ping(&mut self) -> io::Result<()> {
84 let reply = self.codec.request(&[b"PING".to_vec()]).await?;
85 expect_pong(reply)
86 }
87
88 pub fn codec_mut(&mut self) -> &mut AsyncRespCodec<DefaultTransport> {
92 &mut self.codec
93 }
94}
95
96fn expect_pong(reply: Reply) -> io::Result<()> {
97 match reply {
98 Reply::Simple(s) if s == b"PONG" => Ok(()),
99 Reply::Bulk(s) if s == b"PONG" => Ok(()),
100 Reply::Error(msg) => Err(io::Error::other(format!(
101 "PING failed: {}",
102 String::from_utf8_lossy(&msg)
103 ))),
104 other => Err(io::Error::new(
105 io::ErrorKind::InvalidData,
106 format!("PING returned unexpected reply: {other:?}"),
107 )),
108 }
109}