fred 10.1.0

An async client for Redis and Valkey.
Documentation
use super::*;
use crate::{
  protocol::{command::CommandKind, utils as protocol_utils},
  types::*,
  utils,
};
use redis_protocol::resp3::types::{BytesFrame as Resp3Frame, Resp3Frame as _Resp3Frame};
use std::{convert::TryInto, str};

/// The literal `FIELDS` token that the `i-hexpire` commands in this file push (via `static_val!`)
/// immediately before the number of fields.
pub static FIELDS: &str = "FIELDS";

/// Whether `frame` is a simple or blob string whose payload is valid UTF-8 and equal to `QUEUED`.
///
/// Any other frame variant, and any payload that is not valid UTF-8, yields `false`.
fn frame_is_queued(frame: &Resp3Frame) -> bool {
  let payload = match frame {
    Resp3Frame::SimpleString { data, .. } | Resp3Frame::BlobString { data, .. } => data,
    _ => return false,
  };

  str::from_utf8(payload).map_or(false, |text| text == QUEUED)
}

/// Issue the `HDel` command for `key` with the provided `fields`.
///
/// The argument list is `key` followed by each entry of `fields`, in order. The reply frame is converted
/// with `protocol_utils::frame_to_results`; request and conversion errors are propagated.
pub async fn hdel<C: ClientLike>(client: &C, key: Key, fields: MultipleKeys) -> Result<Value, Error> {
  let reply = utils::request_response(client, move || {
    let mut args: Vec<Value> = Vec::with_capacity(1 + fields.len());
    args.push(key.into());
    args.extend(fields.inner().into_iter().map(Into::<Value>::into));

    Ok((CommandKind::HDel, args))
  })
  .await?;

  protocol_utils::frame_to_results(reply)
}

/// Issue the `HExists` command with `key` and `field` as its two arguments.
///
/// The arguments are handed to `args_value_cmd`, whose result is returned unchanged.
pub async fn hexists<C: ClientLike>(client: &C, key: Key, field: Key) -> Result<Value, Error> {
  let key_arg: Value = key.into();
  let field_arg: Value = field.into();

  args_value_cmd(client, CommandKind::HExists, vec![key_arg, field_arg]).await
}

/// Issue the `HGet` command with `key` and `field` as its two arguments.
///
/// The arguments are handed to `args_value_cmd`, whose result is returned unchanged.
pub async fn hget<C: ClientLike>(client: &C, key: Key, field: Key) -> Result<Value, Error> {
  let key_arg: Value = key.into();
  let field_arg: Value = field.into();

  args_value_cmd(client, CommandKind::HGet, vec![key_arg, field_arg]).await
}

/// Issue the `HGetAll` command for `key`.
///
/// When the reply is a string equal to `QUEUED` it is converted with `protocol_utils::frame_to_results`;
/// every other reply is converted to a map via `protocol_utils::frame_to_map` and wrapped in
/// `Value::Map`.
pub async fn hgetall<C: ClientLike>(client: &C, key: Key) -> Result<Value, Error> {
  let reply = utils::request_response(client, move || Ok((CommandKind::HGetAll, vec![key.into()]))).await?;

  let queued = matches!(reply.as_str(), Some(text) if text == QUEUED);
  if queued {
    return protocol_utils::frame_to_results(reply);
  }

  protocol_utils::frame_to_map(reply).map(Value::Map)
}

/// Issue the `HIncrBy` command with `key`, `field` and `increment` as its arguments, in that order.
///
/// The arguments are handed to `args_value_cmd`, whose result is returned unchanged.
pub async fn hincrby<C: ClientLike>(client: &C, key: Key, field: Key, increment: i64) -> Result<Value, Error> {
  let key_arg: Value = key.into();
  let field_arg: Value = field.into();
  let delta: Value = increment.into();

  args_value_cmd(client, CommandKind::HIncrBy, vec![key_arg, field_arg, delta]).await
}

/// Issue the `HIncrByFloat` command with `key`, `field` and `increment` as its arguments, in that order.
///
/// # Errors
///
/// Returns early with an error if `increment` cannot be converted into a `Value`; otherwise the result
/// of `args_value_cmd` is returned unchanged.
pub async fn hincrbyfloat<C: ClientLike>(client: &C, key: Key, field: Key, increment: f64) -> Result<Value, Error> {
  let key_arg: Value = key.into();
  let field_arg: Value = field.into();
  let delta: Value = increment.try_into()?;

  args_value_cmd(client, CommandKind::HIncrByFloat, vec![key_arg, field_arg, delta]).await
}

/// Issue the `HKeys` command with `key` as its only argument.
///
/// The reply frame is converted with `protocol_utils::frame_to_results`.
pub async fn hkeys<C: ClientLike>(client: &C, key: Key) -> Result<Value, Error> {
  let reply = utils::request_response(client, move || {
    let args = vec![key.into()];
    Ok((CommandKind::HKeys, args))
  })
  .await?;

  protocol_utils::frame_to_results(reply)
}

/// Issue the `HLen` command with `key` as its only argument via `one_arg_value_cmd`.
pub async fn hlen<C: ClientLike>(client: &C, key: Key) -> Result<Value, Error> {
  let key_arg = key.into();
  one_arg_value_cmd(client, CommandKind::HLen, key_arg).await
}

/// Issue the `HMGet` command for `key` with the provided `fields`.
///
/// The argument list is `key` followed by each entry of `fields`, in order. The reply frame is converted
/// with `protocol_utils::frame_to_results`; request and conversion errors are propagated.
pub async fn hmget<C: ClientLike>(client: &C, key: Key, fields: MultipleKeys) -> Result<Value, Error> {
  let reply = utils::request_response(client, move || {
    let mut args: Vec<Value> = Vec::with_capacity(1 + fields.len());
    args.push(key.into());
    args.extend(fields.inner().into_iter().map(Into::<Value>::into));

    Ok((CommandKind::HMGet, args))
  })
  .await?;

  protocol_utils::frame_to_results(reply)
}

/// Issue the `HMSet` command for `key` with the field/value pairs in `values`.
///
/// The argument list is `key` followed by each field and its value. Pairs whose value reports
/// `is_null()` are skipped and never sent. The reply frame is converted with
/// `protocol_utils::frame_to_results`.
pub async fn hmset<C: ClientLike>(client: &C, key: Key, values: Map) -> Result<Value, Error> {
  let reply = utils::request_response(client, move || {
    let mut args: Vec<Value> = Vec::with_capacity(1 + (values.len() * 2));
    args.push(key.into());

    let non_null_pairs = values.inner().into_iter().filter(|(_, value)| !value.is_null());
    for (field, value) in non_null_pairs {
      args.extend([field.into(), value]);
    }

    Ok((CommandKind::HMSet, args))
  })
  .await?;

  protocol_utils::frame_to_results(reply)
}

/// Issue the `HSet` command for `key` with the field/value pairs in `values`.
///
/// The argument list is `key` followed by each field and its value. Pairs whose value reports
/// `is_null()` are skipped and never sent. The reply frame is converted with
/// `protocol_utils::frame_to_results`.
pub async fn hset<C: ClientLike>(client: &C, key: Key, values: Map) -> Result<Value, Error> {
  let reply = utils::request_response(client, move || {
    let mut args: Vec<Value> = Vec::with_capacity(1 + (values.len() * 2));
    args.push(key.into());

    let non_null_pairs = values.inner().into_iter().filter(|(_, value)| !value.is_null());
    for (field, value) in non_null_pairs {
      args.extend([field.into(), value]);
    }

    Ok((CommandKind::HSet, args))
  })
  .await?;

  protocol_utils::frame_to_results(reply)
}

/// Issue the `HSetNx` command with `key`, `field` and `value` as its arguments, in that order.
///
/// The reply frame is converted with `protocol_utils::frame_to_results`.
pub async fn hsetnx<C: ClientLike>(client: &C, key: Key, field: Key, value: Value) -> Result<Value, Error> {
  let reply = utils::request_response(client, move || {
    let args: Vec<Value> = vec![key.into(), field.into(), value];
    Ok((CommandKind::HSetNx, args))
  })
  .await?;

  protocol_utils::frame_to_results(reply)
}

/// Issue the `HRandField` command for `key`, optionally with a count and the `WITH_VALUES` token.
///
/// `count` is `Some((count, with_values))` to append the count and, when `with_values` is `true`, the
/// `WITH_VALUES` token; `None` sends `key` alone.
///
/// When values were requested and the reply is not a string equal to `QUEUED`, the reply is flattened
/// with `protocol_utils::flatten_frame` and converted to a `Value::Map`. In every other case the reply is
/// converted with `protocol_utils::frame_to_results`.
pub async fn hrandfield<C: ClientLike>(client: &C, key: Key, count: Option<(i64, bool)>) -> Result<Value, Error> {
  // Only a request that carried both a count and the values flag can produce field/value pairs.
  let wants_pairs = matches!(count, Some((_, true)));

  let reply = utils::request_response(client, move || {
    let mut args: Vec<Value> = Vec::with_capacity(3);
    args.push(key.into());

    if let Some((amount, with_values)) = count {
      args.push(amount.into());
      if with_values {
        args.push(static_val!(WITH_VALUES));
      }
    }

    Ok((CommandKind::HRandField, args))
  })
  .await?;

  if wants_pairs && reply.as_str().map_or(true, |text| text != QUEUED) {
    let flattened = protocol_utils::flatten_frame(reply);
    return protocol_utils::frame_to_map(flattened).map(Value::Map);
  }

  protocol_utils::frame_to_results(reply)
}

/// Issue the `HStrLen` command with `key` and `field` as its two arguments.
///
/// The reply frame is converted with `protocol_utils::frame_to_results`.
pub async fn hstrlen<C: ClientLike>(client: &C, key: Key, field: Key) -> Result<Value, Error> {
  let reply = utils::request_response(client, move || {
    let args: Vec<Value> = vec![key.into(), field.into()];
    Ok((CommandKind::HStrLen, args))
  })
  .await?;

  protocol_utils::frame_to_results(reply)
}

/// Issue the `HVals` command with `key` as its only argument via `one_arg_values_cmd`.
pub async fn hvals<C: ClientLike>(client: &C, key: Key) -> Result<Value, Error> {
  let key_arg = key.into();
  one_arg_values_cmd(client, CommandKind::HVals, key_arg).await
}

/// Issue the `HTtl` command for the given `fields` of `key`.
///
/// The argument list is `key`, the `FIELDS` token, the number of fields, then each field in order. The
/// reply frame is converted with `protocol_utils::frame_to_results`.
///
/// # Errors
///
/// Fails if the field count cannot be converted into a `Value`, or if the request or the reply
/// conversion fails.
#[cfg(feature = "i-hexpire")]
pub async fn httl<C: ClientLike>(client: &C, key: Key, fields: MultipleKeys) -> Result<Value, Error> {
  let reply = utils::request_response(client, move || {
    let mut args: Vec<Value> = Vec::with_capacity(fields.len() + 3);
    args.push(key.into());
    args.push(static_val!(FIELDS));
    args.push(fields.len().try_into()?);
    args.extend(fields.inner().into_iter().map(Into::<Value>::into));

    Ok((CommandKind::HTtl, args))
  })
  .await?;

  protocol_utils::frame_to_results(reply)
}

/// Issue the `HExpire` command for the given `fields` of `key`.
///
/// The argument list is `key`, `seconds`, the string form of `options` (only when it is `Some`), the
/// `FIELDS` token, the number of fields, then each field in order. The reply frame is converted with
/// `protocol_utils::frame_to_results`.
///
/// # Errors
///
/// Fails if the field count cannot be converted into a `Value`, or if the request or the reply
/// conversion fails.
#[cfg(feature = "i-hexpire")]
pub async fn hexpire<C: ClientLike>(
  client: &C,
  key: Key,
  seconds: i64,
  options: Option<ExpireOptions>,
  fields: MultipleKeys,
) -> Result<Value, Error> {
  let frame = utils::request_response(client, move || {
    // key + seconds + FIELDS + field count + one slot per field, plus one more slot when an option
    // flag is sent, so the buffer never has to grow.
    let capacity = fields.len() + 4 + usize::from(options.is_some());
    let mut args = Vec::with_capacity(capacity);
    args.extend([key.into(), seconds.into()]);
    if let Some(options) = options {
      args.push(options.to_str().into());
    }
    args.extend([static_val!(FIELDS), fields.len().try_into()?]);
    for field in fields.inner().into_iter() {
      args.push(field.into());
    }

    Ok((CommandKind::HExpire, args))
  })
  .await?;

  protocol_utils::frame_to_results(frame)
}

/// Issue the `HExpireAt` command for the given `fields` of `key`.
///
/// The argument list is `key`, `time`, the string form of `options` (only when it is `Some`), the
/// `FIELDS` token, the number of fields, then each field in order. The reply frame is converted with
/// `protocol_utils::frame_to_results`.
///
/// # Errors
///
/// Fails if the field count cannot be converted into a `Value`, or if the request or the reply
/// conversion fails.
#[cfg(feature = "i-hexpire")]
pub async fn hexpire_at<C: ClientLike>(
  client: &C,
  key: Key,
  time: i64,
  options: Option<ExpireOptions>,
  fields: MultipleKeys,
) -> Result<Value, Error> {
  let frame = utils::request_response(client, move || {
    // key + time + FIELDS + field count + one slot per field, plus one more slot when an option flag
    // is sent, so the buffer never has to grow.
    let capacity = fields.len() + 4 + usize::from(options.is_some());
    let mut args = Vec::with_capacity(capacity);
    args.extend([key.into(), time.into()]);
    if let Some(options) = options {
      args.push(options.to_str().into());
    }
    args.extend([static_val!(FIELDS), fields.len().try_into()?]);
    for field in fields.inner().into_iter() {
      args.push(field.into());
    }

    Ok((CommandKind::HExpireAt, args))
  })
  .await?;

  protocol_utils::frame_to_results(frame)
}

/// Issue the `HExpireTime` command for the given `fields` of `key`.
///
/// The argument list is `key`, the `FIELDS` token, the number of fields, then each field in order. The
/// reply frame is converted with `protocol_utils::frame_to_results`.
///
/// # Errors
///
/// Fails if the field count cannot be converted into a `Value`, or if the request or the reply
/// conversion fails.
#[cfg(feature = "i-hexpire")]
pub async fn hexpire_time<C: ClientLike>(client: &C, key: Key, fields: MultipleKeys) -> Result<Value, Error> {
  let reply = utils::request_response(client, move || {
    let mut args: Vec<Value> = Vec::with_capacity(fields.len() + 3);
    args.push(key.into());
    args.push(static_val!(FIELDS));
    args.push(fields.len().try_into()?);
    args.extend(fields.inner().into_iter().map(Into::<Value>::into));

    Ok((CommandKind::HExpireTime, args))
  })
  .await?;

  protocol_utils::frame_to_results(reply)
}

/// Issue the `HPTtl` command for the given `fields` of `key`.
///
/// The argument list is `key`, the `FIELDS` token, the number of fields, then each field in order. The
/// reply frame is converted with `protocol_utils::frame_to_results`.
///
/// # Errors
///
/// Fails if the field count cannot be converted into a `Value`, or if the request or the reply
/// conversion fails.
#[cfg(feature = "i-hexpire")]
pub async fn hpttl<C: ClientLike>(client: &C, key: Key, fields: MultipleKeys) -> Result<Value, Error> {
  let reply = utils::request_response(client, move || {
    let mut args: Vec<Value> = Vec::with_capacity(fields.len() + 3);
    args.push(key.into());
    args.push(static_val!(FIELDS));
    args.push(fields.len().try_into()?);
    args.extend(fields.inner().into_iter().map(Into::<Value>::into));

    Ok((CommandKind::HPTtl, args))
  })
  .await?;

  protocol_utils::frame_to_results(reply)
}

/// Issue the `HPExpire` command for the given `fields` of `key`.
///
/// The argument list is `key`, `milliseconds`, the string form of `options` (only when it is `Some`),
/// the `FIELDS` token, the number of fields, then each field in order. The reply frame is converted with
/// `protocol_utils::frame_to_results`.
///
/// # Errors
///
/// Fails if the field count cannot be converted into a `Value`, or if the request or the reply
/// conversion fails.
#[cfg(feature = "i-hexpire")]
pub async fn hpexpire<C: ClientLike>(
  client: &C,
  key: Key,
  milliseconds: i64,
  options: Option<ExpireOptions>,
  fields: MultipleKeys,
) -> Result<Value, Error> {
  let frame = utils::request_response(client, move || {
    // key + milliseconds + FIELDS + field count + one slot per field, plus one more slot when an
    // option flag is sent, so the buffer never has to grow.
    let capacity = fields.len() + 4 + usize::from(options.is_some());
    let mut args = Vec::with_capacity(capacity);
    args.extend([key.into(), milliseconds.into()]);
    if let Some(options) = options {
      args.push(options.to_str().into());
    }
    args.extend([static_val!(FIELDS), fields.len().try_into()?]);
    for field in fields.inner().into_iter() {
      args.push(field.into());
    }

    Ok((CommandKind::HPExpire, args))
  })
  .await?;

  protocol_utils::frame_to_results(frame)
}

/// Issue the `HPExpireAt` command for the given `fields` of `key`.
///
/// The argument list is `key`, `time`, the string form of `options` (only when it is `Some`), the
/// `FIELDS` token, the number of fields, then each field in order. The reply frame is converted with
/// `protocol_utils::frame_to_results`.
///
/// # Errors
///
/// Fails if the field count cannot be converted into a `Value`, or if the request or the reply
/// conversion fails.
#[cfg(feature = "i-hexpire")]
pub async fn hpexpire_at<C: ClientLike>(
  client: &C,
  key: Key,
  time: i64,
  options: Option<ExpireOptions>,
  fields: MultipleKeys,
) -> Result<Value, Error> {
  let frame = utils::request_response(client, move || {
    // key + time + FIELDS + field count + one slot per field, plus one more slot when an option flag
    // is sent, so the buffer never has to grow.
    let capacity = fields.len() + 4 + usize::from(options.is_some());
    let mut args = Vec::with_capacity(capacity);
    args.extend([key.into(), time.into()]);
    if let Some(options) = options {
      args.push(options.to_str().into());
    }
    args.extend([static_val!(FIELDS), fields.len().try_into()?]);
    for field in fields.inner().into_iter() {
      args.push(field.into());
    }

    Ok((CommandKind::HPExpireAt, args))
  })
  .await?;

  protocol_utils::frame_to_results(frame)
}

/// Issue the `HPExpireTime` command for the given `fields` of `key`.
///
/// The argument list is `key`, the `FIELDS` token, the number of fields, then each field in order. The
/// reply frame is converted with `protocol_utils::frame_to_results`.
///
/// # Errors
///
/// Fails if the field count cannot be converted into a `Value`, or if the request or the reply
/// conversion fails.
#[cfg(feature = "i-hexpire")]
pub async fn hpexpire_time<C: ClientLike>(client: &C, key: Key, fields: MultipleKeys) -> Result<Value, Error> {
  let reply = utils::request_response(client, move || {
    let mut args: Vec<Value> = Vec::with_capacity(fields.len() + 3);
    args.push(key.into());
    args.push(static_val!(FIELDS));
    args.push(fields.len().try_into()?);
    args.extend(fields.inner().into_iter().map(Into::<Value>::into));

    Ok((CommandKind::HPExpireTime, args))
  })
  .await?;

  protocol_utils::frame_to_results(reply)
}

/// Issue the `HPersist` command for the given `fields` of `key`.
///
/// The argument list is `key`, the `FIELDS` token, the number of fields, then each field in order. The
/// reply frame is converted with `protocol_utils::frame_to_results`.
///
/// # Errors
///
/// Fails if the field count cannot be converted into a `Value`, or if the request or the reply
/// conversion fails.
#[cfg(feature = "i-hexpire")]
pub async fn hpersist<C: ClientLike>(client: &C, key: Key, fields: MultipleKeys) -> Result<Value, Error> {
  let reply = utils::request_response(client, move || {
    let mut args: Vec<Value> = Vec::with_capacity(fields.len() + 3);
    args.push(key.into());
    args.push(static_val!(FIELDS));
    args.push(fields.len().try_into()?);
    args.extend(fields.inner().into_iter().map(Into::<Value>::into));

    Ok((CommandKind::HPersist, args))
  })
  .await?;

  protocol_utils::frame_to_results(reply)
}