use kevy_client::{PubsubEvent, Subscriber};
use std::io::{Read, Write};
use std::net::TcpListener;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn mock_server(expect_in_at_least: usize, reply_bytes: &'static [u8]) -> u16 {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let port = listener.local_addr().unwrap().port();
let (started_tx, started_rx) = mpsc::channel();
thread::spawn(move || {
started_tx.send(()).unwrap();
let (mut sock, _) = listener.accept().unwrap();
sock.set_read_timeout(Some(Duration::from_secs(2))).unwrap();
let mut buf = vec![0u8; 1024];
let mut total = 0;
while total < expect_in_at_least {
match sock.read(&mut buf) {
Ok(0) => break,
Ok(n) => total += n,
Err(_) => break,
}
}
let _ = sock.write_all(reply_bytes);
thread::sleep(Duration::from_millis(50));
});
started_rx.recv().unwrap();
port
}
const SUBSCRIBE_CHAN_REQ_LEN: usize = 31;
#[test]
fn open_subscribes_and_receives_subscribe_ack() {
let port = mock_server(
SUBSCRIBE_CHAN_REQ_LEN,
b"*3\r\n$9\r\nsubscribe\r\n$4\r\nchan\r\n:1\r\n",
);
let mut sub = Subscriber::open(&format!("kevy://127.0.0.1:{port}"), &[b"chan"]).unwrap();
let ev = sub.recv().unwrap();
assert_eq!(
ev,
PubsubEvent::Subscribe {
channel: b"chan".to_vec(),
count: 1,
}
);
}
#[test]
fn message_frame_classified_with_payload() {
let port = mock_server(
SUBSCRIBE_CHAN_REQ_LEN,
b"*3\r\n$9\r\nsubscribe\r\n$4\r\nnews\r\n:1\r\n\
*3\r\n$7\r\nmessage\r\n$4\r\nnews\r\n$5\r\nhello\r\n",
);
let mut sub = Subscriber::open(&format!("kevy://127.0.0.1:{port}"), &[b"news"]).unwrap();
let _ = sub.recv().unwrap();
let ev = sub.recv().unwrap();
assert_eq!(
ev,
PubsubEvent::Message {
channel: b"news".to_vec(),
payload: b"hello".to_vec(),
}
);
}
#[test]
fn psubscribe_then_pmessage_round_trip() {
let port = mock_server(
34,
b"*3\r\n$10\r\npsubscribe\r\n$6\r\nnews.*\r\n:1\r\n\
*4\r\n$8\r\npmessage\r\n$6\r\nnews.*\r\n$9\r\nnews.tech\r\n$2\r\nhi\r\n",
);
let mut sub = Subscriber::connect(&format!("kevy://127.0.0.1:{port}")).unwrap();
sub.psubscribe(&[b"news.*"]).unwrap();
assert_eq!(
sub.recv().unwrap(),
PubsubEvent::Psubscribe {
pattern: b"news.*".to_vec(),
count: 1,
}
);
assert_eq!(
sub.recv().unwrap(),
PubsubEvent::Pmessage {
pattern: b"news.*".to_vec(),
channel: b"news.tech".to_vec(),
payload: b"hi".to_vec(),
}
);
}
#[test]
fn unsubscribe_with_nil_channel_classified_as_none() {
let port = mock_server(
SUBSCRIBE_CHAN_REQ_LEN,
b"*3\r\n$11\r\nunsubscribe\r\n$-1\r\n:0\r\n",
);
let mut sub = Subscriber::open(&format!("kevy://127.0.0.1:{port}"), &[b"chan"]).unwrap();
let ev = sub.recv().unwrap();
assert_eq!(
ev,
PubsubEvent::Unsubscribe {
channel: None,
count: 0,
}
);
}
#[test]
fn server_close_yields_unexpected_eof() {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let port = listener.local_addr().unwrap().port();
let (started_tx, started_rx) = mpsc::channel();
thread::spawn(move || {
started_tx.send(()).unwrap();
let (mut sock, _) = listener.accept().unwrap();
let mut buf = vec![0u8; 1024];
let _ = sock.read(&mut buf);
drop(sock);
});
started_rx.recv().unwrap();
let mut sub = Subscriber::open(&format!("kevy://127.0.0.1:{port}"), &[b"chan"]).unwrap();
let err = sub.recv().unwrap_err();
assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
}
#[test]
fn malformed_frame_yields_invalid_data() {
let port = mock_server(SUBSCRIBE_CHAN_REQ_LEN, b"!totally-bogus\r\n");
let mut sub = Subscriber::open(&format!("kevy://127.0.0.1:{port}"), &[b"chan"]).unwrap();
let err = sub.recv().unwrap_err();
assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
}
#[test]
fn unknown_pubsub_kind_yields_invalid_data() {
let port = mock_server(
SUBSCRIBE_CHAN_REQ_LEN,
b"*3\r\n$5\r\nbogus\r\n$1\r\nx\r\n:0\r\n",
);
let mut sub = Subscriber::open(&format!("kevy://127.0.0.1:{port}"), &[b"chan"]).unwrap();
let err = sub.recv().unwrap_err();
assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
}
#[test]
fn read_timeout_blocks_recv() {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let port = listener.local_addr().unwrap().port();
let (started_tx, started_rx) = mpsc::channel();
thread::spawn(move || {
started_tx.send(()).unwrap();
let (mut sock, _) = listener.accept().unwrap();
let mut buf = vec![0u8; 1024];
let _ = sock.read(&mut buf);
thread::sleep(Duration::from_millis(500));
});
started_rx.recv().unwrap();
let mut sub = Subscriber::open(&format!("kevy://127.0.0.1:{port}"), &[b"chan"]).unwrap();
sub.set_read_timeout(Some(Duration::from_millis(100))).unwrap();
let err = sub.recv().unwrap_err();
let k = err.kind();
assert!(
matches!(
k,
std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
),
"unexpected kind: {k:?}"
);
}