use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use bytes::Bytes;
use memcached_async::extract::{Exptime, Flags, Key, Keys, State, Value};
use memcached_async::{
MetaCode, MetaResponse, Op, Response, Router, Server, ServerConfig, Stored, ValueEntry,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Mutex;
#[tokio::test]
async fn ascii_set_get() {
let store = Arc::new(Mutex::new(HashMap::<Bytes, (Bytes, u32)>::new()));
let app =
Router::from_state(store.clone())
.route(
Op::Set,
|Key(key): Key,
Value(value): Value,
Flags(flags): Flags,
Exptime(_ttl): Exptime,
State(store): State<Arc<Mutex<HashMap<Bytes, (Bytes, u32)>>>>| async move {
let mut guard = store.lock().await;
guard.insert(key, (value, flags));
Stored
},
)
.route(
Op::Get,
|Keys(keys): Keys,
State(store): State<Arc<Mutex<HashMap<Bytes, (Bytes, u32)>>>>| async move {
let guard = store.lock().await;
let mut entries = Vec::new();
for key in keys {
if let Some((value, flags)) = guard.get(&key) {
entries.push(ValueEntry {
key: key.clone(),
value: value.clone(),
flags: *flags,
cas: None,
});
}
}
Response::Values(entries)
},
);
let listener = TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
.await
.unwrap();
let addr = listener.local_addr().unwrap();
let server = Server::bind(addr.to_string())
.with_config(ServerConfig::default())
.serve_with_listener(listener, app);
tokio::spawn(server);
let mut stream = TcpStream::connect(addr).await.unwrap();
stream
.write_all(b"set key 1 10 5\r\nhello\r\n")
.await
.unwrap();
let mut resp = [0u8; 8];
stream.read_exact(&mut resp).await.unwrap();
assert_eq!(&resp, b"STORED\r\n");
stream.write_all(b"get key\r\n").await.unwrap();
let mut buf = vec![0u8; 27];
stream.read_exact(&mut buf).await.unwrap();
assert_eq!(&buf, b"VALUE key 1 5\r\nhello\r\nEND\r\n");
}
#[tokio::test]
async fn meta_q_mn_flush() {
let app = Router::from_state(()).route(Op::MetaGet, |_key: Key| async move {
Response::Meta(MetaResponse::new(MetaCode::En))
});
let listener = TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
.await
.unwrap();
let addr = listener.local_addr().unwrap();
let server = Server::bind(addr.to_string())
.with_config(ServerConfig::default())
.serve_with_listener(listener, app);
tokio::spawn(server);
let mut stream = TcpStream::connect(addr).await.unwrap();
stream.write_all(b"mg key q\r\nmn\r\n").await.unwrap();
let mut buf = [0u8; 4];
stream.read_exact(&mut buf).await.unwrap();
assert_eq!(&buf, b"MN\r\n");
}
#[tokio::test]
async fn binary_quiet_get_noop() {
let store = Arc::new(Mutex::new(HashMap::<Bytes, Bytes>::new()));
store
.lock()
.await
.insert(Bytes::from_static(b"k"), Bytes::from_static(b"v"));
let app = Router::from_state(store.clone())
.route(
Op::Get,
|Key(key): Key, State(store): State<Arc<Mutex<HashMap<Bytes, Bytes>>>>| async move {
let guard = store.lock().await;
if let Some(value) = guard.get(&key) {
Response::Value(ValueEntry {
key,
value: value.clone(),
flags: 0,
cas: None,
})
} else {
Response::NotFound
}
},
)
.route(Op::Noop, || async move { Response::Noop });
let listener = TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
.await
.unwrap();
let addr = listener.local_addr().unwrap();
let server = Server::bind(addr.to_string())
.with_config(ServerConfig::default())
.serve_with_listener(listener, app);
tokio::spawn(server);
let mut stream = TcpStream::connect(addr).await.unwrap();
let mut req = Vec::new();
req.extend_from_slice(&[
0x80, 0x09, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, ]);
req.extend_from_slice(b"k");
req.extend_from_slice(&[
0x80, 0x0a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
]);
stream.write_all(&req).await.unwrap();
let mut header = [0u8; 24];
stream.read_exact(&mut header).await.unwrap();
assert_eq!(header[0], 0x81);
assert_eq!(header[1], 0x09);
let body_len = u32::from_be_bytes([header[8], header[9], header[10], header[11]]) as usize;
let mut body = vec![0u8; body_len];
stream.read_exact(&mut body).await.unwrap();
let mut header = [0u8; 24];
stream.read_exact(&mut header).await.unwrap();
assert_eq!(header[1], 0x0a);
}
#[tokio::test]
async fn ascii_quit_closes() {
let app = Router::from_state(());
let listener = TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
.await
.unwrap();
let addr = listener.local_addr().unwrap();
let server = Server::bind(addr.to_string())
.with_config(ServerConfig::default())
.serve_with_listener(listener, app);
tokio::spawn(server);
let mut stream = TcpStream::connect(addr).await.unwrap();
stream.write_all(b"quit\r\n").await.unwrap();
let mut buf = [0u8; 1];
let read = stream.read(&mut buf).await.unwrap();
assert_eq!(read, 0);
}