redis-protocol 6.0.0

An implementation of the RESP2 and RESP3 protocols.
Documentation
use futures::{SinkExt, StreamExt};
use redis_protocol::{
  codec::{resp2_encode_command, resp3_encode_command, Resp2, Resp3},
  resp2::types::{
    BorrowedFrame as Resp2BorrowedFrame,
    BytesFrame as Resp2BytesFrame,
    FrameKind as Resp2FrameKind,
    Resp2Frame,
  },
  resp3::types::{
    BorrowedFrame as Resp3BorrowedFrame,
    BytesFrame as Resp3BytesFrame,
    FrameKind as Resp3FrameKind,
    FrameMap,
    Resp3Frame,
    RespVersion,
  },
};
use std::env;
use tokio::net::TcpStream;
use tokio_util::codec::Framed;

fn read_env_var(name: &str) -> Option<String> {
  env::var_os(name).and_then(|s| s.into_string().ok())
}

fn read_redis_centralized_host() -> String {
  read_env_var("FRED_REDIS_CENTRALIZED_HOST").unwrap_or("redis-main".into())
}

fn read_redis_centralized_port() -> u16 {
  read_env_var("FRED_REDIS_CENTRALIZED_PORT")
    .and_then(|s| s.parse::<u16>().ok())
    .unwrap_or(6383)
}

fn read_redis_password() -> String {
  read_env_var("REDIS_PASSWORD").expect("Failed to read REDIS_PASSWORD env")
}

fn read_redis_username() -> String {
  read_env_var("REDIS_USERNAME").expect("Failed to read REDIS_USERNAME env")
}

async fn connect_resp2(int_as_bulkstring: bool) -> Framed<TcpStream, Resp2> {
  let addr = format!("{}:{}", read_redis_centralized_host(), read_redis_centralized_port());
  debug!("Connecting to {}", addr);
  let socket = TcpStream::connect(addr).await.unwrap();
  Framed::new(socket, Resp2::new(int_as_bulkstring))
}

async fn connect_resp3(int_as_blobstring: bool) -> Framed<TcpStream, Resp3> {
  let addr = format!("{}:{}", read_redis_centralized_host(), read_redis_centralized_port());
  debug!("Connecting to {}", addr);
  let socket = TcpStream::connect(addr).await.unwrap();
  Framed::new(socket, Resp3::new(int_as_blobstring))
}

#[tokio::test(flavor = "multi_thread")]
async fn should_use_resp2_codec_ping() {
  let _ = pretty_env_logger::try_init();
  let mut socket = connect_resp2(false).await;

  let ping = Resp2BytesFrame::Array(vec![Resp2BytesFrame::BulkString("PING".into())]);
  socket.send(ping).await.unwrap();
  let response = socket.next().await.unwrap().unwrap();
  assert_eq!(response.as_str().unwrap(), "PONG")
}

#[tokio::test(flavor = "multi_thread")]
async fn should_use_resp2_codec_borrowed_ping_incr() {
  let _ = pretty_env_logger::try_init();
  let mut socket = connect_resp2(true).await;

  let inner = vec![Resp2BorrowedFrame::BulkString("PING".as_bytes())];
  let ping = Resp2BorrowedFrame::Array(&inner);
  socket.send(ping).await.unwrap();
  let response = socket.next().await.unwrap().unwrap();
  assert_eq!(response.as_str().unwrap(), "PONG");

  let inner = vec![
    Resp2BorrowedFrame::BulkString("INCRBY".as_bytes()),
    Resp2BorrowedFrame::BulkString("baz".as_bytes()),
    Resp2BorrowedFrame::BulkString("2".as_bytes()),
  ];
  let incr = Resp2BorrowedFrame::Array(&inner);
  socket.send(incr).await.unwrap();
  let response = socket.next().await.unwrap().unwrap();
  assert_eq!(response, Resp2BytesFrame::Integer(2));
}

#[tokio::test(flavor = "multi_thread")]
async fn should_use_resp2_codec_get_set() {
  let _ = pretty_env_logger::try_init();
  let mut socket = connect_resp2(false).await;

  let get_foo = resp2_encode_command("GET foo");
  socket.send(get_foo).await.unwrap();
  let response = socket.next().await.unwrap().unwrap();
  assert_eq!(response.kind(), Resp2FrameKind::Null);

  let set_foo = resp2_encode_command("SET foo bar");
  socket.send(set_foo).await.unwrap();
  let response = socket.next().await.unwrap().unwrap();
  assert_eq!(response.as_str().unwrap(), "OK");

  let get_foo = resp2_encode_command("GET foo");
  socket.send(get_foo).await.unwrap();
  let response = socket.next().await.unwrap().unwrap();
  assert_eq!(response.as_str().unwrap(), "bar");

  let del_foo = resp2_encode_command("DEL foo");
  socket.send(del_foo).await.unwrap();
  let response = socket.next().await.unwrap().unwrap();
  assert_eq!(response.to_string().unwrap(), "1");
}

#[tokio::test(flavor = "multi_thread")]
async fn should_use_resp2_codec_hgetall() {
  let _ = pretty_env_logger::try_init();
  let mut socket = connect_resp2(false).await;

  let hmset = resp2_encode_command("HMSET foo a b c d");
  socket.send(hmset).await.unwrap();
  let response = socket.next().await.unwrap().unwrap();
  assert_eq!(response.as_str().unwrap(), "OK");

  let hgetall = resp2_encode_command("HGETALL foo");
  socket.send(hgetall).await.unwrap();
  let response = socket.next().await.unwrap().unwrap();
  assert_eq!(
    response,
    Resp2BytesFrame::Array(vec![
      Resp2BytesFrame::BulkString("a".into()),
      Resp2BytesFrame::BulkString("b".into()),
      Resp2BytesFrame::BulkString("c".into()),
      Resp2BytesFrame::BulkString("d".into()),
    ])
  );

  let del_foo = resp2_encode_command("DEL foo");
  socket.send(del_foo).await.unwrap();
  let response = socket.next().await.unwrap().unwrap();
  assert_eq!(response.to_string().unwrap(), "1");
}

#[tokio::test(flavor = "multi_thread")]
async fn should_use_resp3_codec_hello() {
  let _ = pretty_env_logger::try_init();
  let mut socket = connect_resp3(false).await;

  let hello = Resp3BytesFrame::Hello {
    version: RespVersion::RESP3,
    auth:    None,
    setname: None,
  };
  socket.send(hello).await.unwrap();
  let response = socket.next().await.unwrap().unwrap();
  assert!(!matches!(
    response.kind(),
    Resp3FrameKind::SimpleError | Resp3FrameKind::BlobError
  ));

  let hello = Resp3BytesFrame::Hello {
    version: RespVersion::RESP3,
    auth:    Some((read_redis_username().into(), read_redis_password().into())),
    setname: None,
  };
  socket.send(hello).await.unwrap();
  let response = socket.next().await.unwrap().unwrap();
  assert!(!matches!(
    response.kind(),
    Resp3FrameKind::SimpleError | Resp3FrameKind::BlobError
  ));

  let hello = Resp3BytesFrame::Hello {
    version: RespVersion::RESP3,
    auth:    Some((read_redis_username().into(), read_redis_password().into())),
    setname: Some("baz".into()),
  };
  socket.send(hello).await.unwrap();
  let response = socket.next().await.unwrap().unwrap();
  assert!(!matches!(
    response.kind(),
    Resp3FrameKind::SimpleError | Resp3FrameKind::BlobError
  ));
}

#[tokio::test(flavor = "multi_thread")]
async fn should_use_resp3_codec_get_set() {
  let _ = pretty_env_logger::try_init();
  let mut socket = connect_resp3(false).await;

  let hello = Resp3BytesFrame::Hello {
    version: RespVersion::RESP3,
    auth:    None,
    setname: None,
  };
  socket.send(hello).await.unwrap();
  let response = socket.next().await.unwrap().unwrap();
  assert!(!matches!(
    response.kind(),
    Resp3FrameKind::SimpleError | Resp3FrameKind::BlobError
  ));

  let get_foo = resp3_encode_command("GET foo");
  socket.send(get_foo).await.unwrap();
  let response = socket.next().await.unwrap().unwrap();
  assert_eq!(response.kind(), Resp3FrameKind::Null);

  let set_foo = resp3_encode_command("SET foo bar");
  socket.send(set_foo).await.unwrap();
  let response = socket.next().await.unwrap().unwrap();
  assert_eq!(response.as_str().unwrap(), "OK");

  let get_foo = resp3_encode_command("GET foo");
  socket.send(get_foo).await.unwrap();
  let response = socket.next().await.unwrap().unwrap();
  assert_eq!(response.as_str().unwrap(), "bar");

  let del_foo = resp3_encode_command("DEL foo");
  socket.send(del_foo).await.unwrap();
  let response = socket.next().await.unwrap().unwrap();
  assert_eq!(response.to_string().unwrap(), "1");
}

#[tokio::test(flavor = "multi_thread")]
async fn should_use_resp3_codec_hgetall() {
  let _ = pretty_env_logger::try_init();
  let mut socket = connect_resp3(false).await;

  let hello = Resp3BytesFrame::Hello {
    version: RespVersion::RESP3,
    auth:    None,
    setname: None,
  };
  socket.send(hello).await.unwrap();
  let response = socket.next().await.unwrap().unwrap();
  assert!(!matches!(
    response.kind(),
    Resp3FrameKind::SimpleError | Resp3FrameKind::BlobError
  ));

  let hmset = resp3_encode_command("HMSET foo a b c d");
  socket.send(hmset).await.unwrap();
  let response = socket.next().await.unwrap().unwrap();
  assert_eq!(response.as_str().unwrap(), "OK");

  let mut expected = FrameMap::new();
  expected.insert(
    Resp3BytesFrame::BlobString {
      data:       "a".into(),
      attributes: None,
    },
    Resp3BytesFrame::BlobString {
      data:       "b".into(),
      attributes: None,
    },
  );
  expected.insert(
    Resp3BytesFrame::BlobString {
      data:       "c".into(),
      attributes: None,
    },
    Resp3BytesFrame::BlobString {
      data:       "d".into(),
      attributes: None,
    },
  );
  let expected = Resp3BytesFrame::Map {
    data:       expected,
    attributes: None,
  };

  let hgetall = resp3_encode_command("HGETALL foo");
  socket.send(hgetall).await.unwrap();
  let response = socket.next().await.unwrap().unwrap();
  assert_eq!(response, expected);

  let del_foo = resp3_encode_command("DEL foo");
  socket.send(del_foo).await.unwrap();
  let response = socket.next().await.unwrap().unwrap();
  assert_eq!(response.to_string().unwrap(), "1");
}