#![cfg(feature = "tokio")]
use std::io;
use kevy_client_async::AsyncConnection;
use kevy_resp::Reply;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
async fn spawn_replier_seq(steps: Vec<(Vec<u8>, Vec<u8>)>) -> io::Result<u16> {
let listener = TcpListener::bind("127.0.0.1:0").await?;
let port = listener.local_addr()?.port();
tokio::spawn(async move {
let (mut sock, _) = listener.accept().await.expect("accept");
for (incoming, outgoing) in steps {
let mut buf = vec![0u8; incoming.len()];
sock.read_exact(&mut buf).await.expect("read");
assert_eq!(buf, incoming, "client wire mismatch");
sock.write_all(&outgoing).await.expect("write");
}
sock.shutdown().await.ok();
});
Ok(port)
}
async fn spawn_replier(
incoming_expected: Vec<u8>,
outgoing: Vec<u8>,
) -> io::Result<u16> {
spawn_replier_seq(vec![(incoming_expected, outgoing)]).await
}
#[tokio::test]
async fn ping_round_trip() {
let port = spawn_replier(b"*1\r\n$4\r\nPING\r\n".to_vec(), b"+PONG\r\n".to_vec())
.await
.unwrap();
let url = format!("tcp://127.0.0.1:{port}");
let mut conn = AsyncConnection::open(&url).await.unwrap();
conn.ping().await.unwrap();
}
#[tokio::test]
async fn set_then_get() {
let port = spawn_replier_seq(vec![
(
b"*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n".to_vec(),
b"+OK\r\n".to_vec(),
),
(
b"*2\r\n$3\r\nGET\r\n$1\r\nk\r\n".to_vec(),
b"$1\r\nv\r\n".to_vec(),
),
])
.await
.unwrap();
let url = format!("tcp://127.0.0.1:{port}");
let mut conn = AsyncConnection::open(&url).await.unwrap();
conn.set(b"k", b"v").await.unwrap();
let v = conn.get(b"k").await.unwrap();
assert_eq!(v.as_deref(), Some(&b"v"[..]));
}
#[tokio::test]
async fn pipeline_one_round_trip() {
let port = spawn_replier(
b"*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n\
*2\r\n$3\r\nGET\r\n$1\r\nk\r\n\
*2\r\n$4\r\nINCR\r\n$3\r\ncnt\r\n"
.to_vec(),
b"+OK\r\n$1\r\nv\r\n:1\r\n".to_vec(),
)
.await
.unwrap();
let url = format!("tcp://127.0.0.1:{port}");
let mut conn = AsyncConnection::open(&url).await.unwrap();
let replies = conn
.pipeline()
.set(b"k", b"v")
.get(b"k")
.incr(b"cnt")
.run(&mut conn)
.await
.unwrap();
assert_eq!(replies.len(), 3);
assert!(matches!(replies[0], Reply::Simple(ref s) if s == b"OK"));
assert!(matches!(replies[1], Reply::Bulk(ref v) if v == b"v"));
assert!(matches!(replies[2], Reply::Int(1)));
}
#[tokio::test]
async fn server_close_yields_unexpected_eof() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
tokio::spawn(async move {
let (mut sock, _) = listener.accept().await.unwrap();
let mut buf = [0u8; 32];
let _ = sock.read(&mut buf).await;
});
let url = format!("tcp://127.0.0.1:{port}");
let mut conn = AsyncConnection::open(&url).await.unwrap();
let err = conn.ping().await.unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
}
#[tokio::test]
async fn connect_works_with_real_tcpstream_typeshape() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
tokio::spawn(async move {
let (mut sock, _) = listener.accept().await.unwrap();
let mut buf = [0u8; 32];
let _ = sock.read(&mut buf).await;
sock.write_all(b"+PONG\r\n").await.unwrap();
});
let s = TcpStream::connect(("127.0.0.1", port)).await.unwrap();
let mut conn = AsyncConnection::from_transport(s);
conn.ping().await.unwrap();
}