memcached-async 0.0.2

Asynchronous memcached protocol parser
Documentation
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;

use bytes::Bytes;
use memcached_async::extract::{Exptime, Flags, Key, Keys, State, Value};
use memcached_async::{
    MetaCode, MetaResponse, Op, Response, Router, Server, ServerConfig, Stored, ValueEntry,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Mutex;

#[tokio::test]
async fn ascii_set_get() {
    let store = Arc::new(Mutex::new(HashMap::<Bytes, (Bytes, u32)>::new()));
    let app =
        Router::from_state(store.clone())
            .route(
                Op::Set,
                |Key(key): Key,
                 Value(value): Value,
                 Flags(flags): Flags,
                 Exptime(_ttl): Exptime,
                 State(store): State<Arc<Mutex<HashMap<Bytes, (Bytes, u32)>>>>| async move {
                    let mut guard = store.lock().await;
                    guard.insert(key, (value, flags));
                    Stored
                },
            )
            .route(
                Op::Get,
                |Keys(keys): Keys,
                 State(store): State<Arc<Mutex<HashMap<Bytes, (Bytes, u32)>>>>| async move {
                    let guard = store.lock().await;
                    let mut entries = Vec::new();
                    for key in keys {
                        if let Some((value, flags)) = guard.get(&key) {
                            entries.push(ValueEntry {
                                key: key.clone(),
                                value: value.clone(),
                                flags: *flags,
                                cas: None,
                            });
                        }
                    }
                    Response::Values(entries)
                },
            );

    let listener = TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
        .await
        .unwrap();
    let addr = listener.local_addr().unwrap();

    let server = Server::bind(addr.to_string())
        .with_config(ServerConfig::default())
        .serve_with_listener(listener, app);
    tokio::spawn(server);

    let mut stream = TcpStream::connect(addr).await.unwrap();
    stream
        .write_all(b"set key 1 10 5\r\nhello\r\n")
        .await
        .unwrap();
    let mut resp = [0u8; 8];
    stream.read_exact(&mut resp).await.unwrap();
    assert_eq!(&resp, b"STORED\r\n");

    stream.write_all(b"get key\r\n").await.unwrap();
    let mut buf = vec![0u8; 27];
    stream.read_exact(&mut buf).await.unwrap();
    assert_eq!(&buf, b"VALUE key 1 5\r\nhello\r\nEND\r\n");
}

#[tokio::test]
async fn meta_q_mn_flush() {
    let app = Router::from_state(()).route(Op::MetaGet, |_key: Key| async move {
        Response::Meta(MetaResponse::new(MetaCode::En))
    });

    let listener = TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
        .await
        .unwrap();
    let addr = listener.local_addr().unwrap();

    let server = Server::bind(addr.to_string())
        .with_config(ServerConfig::default())
        .serve_with_listener(listener, app);
    tokio::spawn(server);

    let mut stream = TcpStream::connect(addr).await.unwrap();
    stream.write_all(b"mg key q\r\nmn\r\n").await.unwrap();
    let mut buf = [0u8; 4];
    stream.read_exact(&mut buf).await.unwrap();
    assert_eq!(&buf, b"MN\r\n");
}

#[tokio::test]
async fn binary_quiet_get_noop() {
    let store = Arc::new(Mutex::new(HashMap::<Bytes, Bytes>::new()));
    store
        .lock()
        .await
        .insert(Bytes::from_static(b"k"), Bytes::from_static(b"v"));
    let app = Router::from_state(store.clone())
        .route(
            Op::Get,
            |Key(key): Key, State(store): State<Arc<Mutex<HashMap<Bytes, Bytes>>>>| async move {
                let guard = store.lock().await;
                if let Some(value) = guard.get(&key) {
                    Response::Value(ValueEntry {
                        key,
                        value: value.clone(),
                        flags: 0,
                        cas: None,
                    })
                } else {
                    Response::NotFound
                }
            },
        )
        .route(Op::Noop, || async move { Response::Noop });

    let listener = TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
        .await
        .unwrap();
    let addr = listener.local_addr().unwrap();

    let server = Server::bind(addr.to_string())
        .with_config(ServerConfig::default())
        .serve_with_listener(listener, app);
    tokio::spawn(server);

    let mut stream = TcpStream::connect(addr).await.unwrap();

    let mut req = Vec::new();
    // getq for key "k"
    req.extend_from_slice(&[
        0x80, 0x09, 0x00, 0x01, // magic, opcode, key len
        0x00, 0x00, 0x00, 0x00, // extras len, datatype, reserved
        0x00, 0x00, 0x00, 0x01, // body len
        0x00, 0x00, 0x00, 0x00, // opaque
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // cas
    ]);
    req.extend_from_slice(b"k");
    // noop
    req.extend_from_slice(&[
        0x80, 0x0a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    ]);

    stream.write_all(&req).await.unwrap();

    // read get response header
    let mut header = [0u8; 24];
    stream.read_exact(&mut header).await.unwrap();
    assert_eq!(header[0], 0x81);
    assert_eq!(header[1], 0x09);
    let body_len = u32::from_be_bytes([header[8], header[9], header[10], header[11]]) as usize;
    let mut body = vec![0u8; body_len];
    stream.read_exact(&mut body).await.unwrap();

    // read noop response header
    let mut header = [0u8; 24];
    stream.read_exact(&mut header).await.unwrap();
    assert_eq!(header[1], 0x0a);
}

#[tokio::test]
async fn ascii_quit_closes() {
    let app = Router::from_state(());
    let listener = TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
        .await
        .unwrap();
    let addr = listener.local_addr().unwrap();

    let server = Server::bind(addr.to_string())
        .with_config(ServerConfig::default())
        .serve_with_listener(listener, app);
    tokio::spawn(server);

    let mut stream = TcpStream::connect(addr).await.unwrap();
    stream.write_all(b"quit\r\n").await.unwrap();
    let mut buf = [0u8; 1];
    let read = stream.read(&mut buf).await.unwrap();
    assert_eq!(read, 0);
}