mod common;
use beyond_resp::{RespCodec, Value, Version};
use futures_util::{SinkExt, StreamExt};
use std::net::SocketAddr;
use tokio::net::{TcpListener, TcpStream};
use tokio_util::codec::Framed;
async fn scripted_server(script: Vec<Value>) -> SocketAddr {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
let (socket, _) = listener.accept().await.unwrap();
let mut server = Framed::new(socket, RespCodec::resp2());
for response in script {
let _ = server.next().await;
server.send(response).await.unwrap();
}
});
addr
}
async fn client(addr: SocketAddr) -> Framed<TcpStream, RespCodec> {
let stream = TcpStream::connect(addr).await.unwrap();
Framed::new(stream, RespCodec::resp2())
}
#[tokio::test]
async fn single_request_response() {
let addr = scripted_server(vec![Value::SimpleString("PONG".into())]).await;
let mut c = client(addr).await;
c.send(Value::Array(vec![Value::BulkString("PING".into())])).await.unwrap();
let resp = c.next().await.unwrap().unwrap();
assert_eq!(resp, Value::SimpleString("PONG".into()));
}
#[tokio::test]
async fn pipeline_three_commands() {
let addr = scripted_server(vec![
Value::SimpleString("PONG".into()),
Value::SimpleString("PONG".into()),
Value::SimpleString("PONG".into()),
])
.await;
let mut c = client(addr).await;
let ping = Value::Array(vec![Value::BulkString("PING".into())]);
for _ in 0..3 {
c.feed(ping.clone()).await.unwrap();
}
SinkExt::<Value>::flush(&mut c).await.unwrap();
for i in 0..3 {
let resp = c.next().await.unwrap().unwrap();
assert_eq!(resp, Value::SimpleString("PONG".into()), "response {i}");
}
}
#[tokio::test]
async fn resp3_upgrade_mid_stream() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
let (socket, _) = listener.accept().await.unwrap();
let mut server = Framed::new(socket, RespCodec::resp2());
let _ = server.next().await;
server.codec_mut().set_version(Version::Resp3);
server
.send(Value::Map(vec![(
Value::BulkString("proto".into()),
Value::Integer(3),
)]))
.await
.unwrap();
let _ = server.next().await;
server.send(Value::Boolean(true)).await.unwrap();
});
let stream = TcpStream::connect(addr).await.unwrap();
let mut c = Framed::new(stream, RespCodec::resp2());
c.send(Value::Array(vec![
Value::BulkString("HELLO".into()),
Value::BulkString("3".into()),
]))
.await
.unwrap();
c.codec_mut().set_version(Version::Resp3);
let hello_resp = c.next().await.unwrap().unwrap();
assert!(
matches!(hello_resp, Value::Map(_)),
"expected RESP3 Map from HELLO response, got {hello_resp:?}"
);
c.send(Value::Array(vec![Value::BulkString("PING".into())])).await.unwrap();
let resp = c.next().await.unwrap().unwrap();
assert_eq!(resp, Value::Boolean(true));
}
#[tokio::test]
async fn large_bulk_string() {
let payload: Vec<u8> = (0u8..=255).cycle().take(1 << 16).collect(); let expected = Value::BulkString(payload.clone().into());
let addr = scripted_server(vec![expected.clone()]).await;
let mut c = client(addr).await;
c.send(Value::Array(vec![Value::BulkString("GET".into())])).await.unwrap();
let resp = c.next().await.unwrap().unwrap();
assert_eq!(resp, expected);
}
#[tokio::test]
async fn interleaved_push_messages() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
let (socket, _) = listener.accept().await.unwrap();
let mut server = Framed::new(socket, RespCodec::resp3());
let _ = server.next().await; server
.send(Value::Push(vec![
Value::SimpleString("message".into()),
Value::SimpleString("chan".into()),
Value::BulkString("hello".into()),
]))
.await
.unwrap();
server.send(Value::SimpleString("OK".into())).await.unwrap();
});
let stream = TcpStream::connect(addr).await.unwrap();
let mut c = Framed::new(stream, RespCodec::resp3());
c.send(Value::Array(vec![Value::BulkString("SUBSCRIBE".into())])).await.unwrap();
let push = c.next().await.unwrap().unwrap();
assert!(matches!(push, Value::Push(_)));
let ok = c.next().await.unwrap().unwrap();
assert_eq!(ok, Value::SimpleString("OK".into()));
}