fred 10.1.0

An async client for Redis and Valkey.
Documentation
use super::*;
use crate::{
  interfaces,
  protocol::{
    command::{CommandKind, RouterCommand},
    utils as protocol_utils,
  },
  runtime::oneshot_channel,
  types::{cluster::*, Key, MultipleHashSlots},
  utils,
};
use bytes_utils::Str;
use std::convert::TryInto;

// The cluster commands below are generated by macros that are not defined in this file
// (presumably brought into scope by `use super::*` - confirm). Each invocation pairs the name
// of the generated item with the `CommandKind` variant it is built around.
//
// NOTE(review): judging only by the macro names, `value_cmd!` / `values_cmd!` likely return the
// decoded response while `ok_cmd!` likely expects an `OK` reply - verify against the macro
// definitions before relying on this.
value_cmd!(cluster_bumpepoch, ClusterBumpEpoch);
ok_cmd!(cluster_flushslots, ClusterFlushSlots);
value_cmd!(cluster_myid, ClusterMyID);
value_cmd!(cluster_nodes, ClusterNodes);
ok_cmd!(cluster_saveconfig, ClusterSaveConfig);
values_cmd!(cluster_slots, ClusterSlots);

/// Send a `CommandKind::ClusterInfo` command with no arguments.
///
/// The response frame is decoded with `protocol_utils::frame_to_results` and returned as-is.
///
/// # Errors
///
/// Propagates any error from sending the request or from decoding the response frame.
pub async fn cluster_info<C: ClientLike>(client: &C) -> Result<Value, Error> {
  let reply = utils::request_response(client, || Ok((CommandKind::ClusterInfo, Vec::new()))).await?;
  protocol_utils::frame_to_results(reply)
}

pub async fn cluster_add_slots<C: ClientLike>(client: &C, slots: MultipleHashSlots) -> Result<(), Error> {
  let frame = utils::request_response(client, move || {
    let mut args = Vec::with_capacity(slots.len());

    for slot in slots.inner().into_iter() {
      args.push(slot.into());
    }

    Ok((CommandKind::ClusterAddSlots, args))
  })
  .await?;

  let response = protocol_utils::frame_to_results(frame)?;
  protocol_utils::expect_ok(&response)
}

/// Send a `CommandKind::ClusterCountFailureReports` command with `node_id` as its only argument.
///
/// Returns the response frame decoded with `protocol_utils::frame_to_results`.
///
/// # Errors
///
/// Propagates any error from sending the request or from decoding the response frame.
pub async fn cluster_count_failure_reports<C: ClientLike>(client: &C, node_id: Str) -> Result<Value, Error> {
  let reply = utils::request_response(client, move || {
    let args: Vec<Value> = vec![node_id.into()];
    Ok((CommandKind::ClusterCountFailureReports, args))
  })
  .await?;

  protocol_utils::frame_to_results(reply)
}

/// Send a `CommandKind::ClusterCountKeysInSlot` command with `slot` as its only argument.
///
/// Returns the response frame decoded with `protocol_utils::frame_to_results`.
///
/// # Errors
///
/// Propagates any error from sending the request or from decoding the response frame.
pub async fn cluster_count_keys_in_slot<C: ClientLike>(client: &C, slot: u16) -> Result<Value, Error> {
  let reply = utils::request_response(client, move || {
    let args: Vec<Value> = vec![slot.into()];
    Ok((CommandKind::ClusterCountKeysInSlot, args))
  })
  .await?;

  protocol_utils::frame_to_results(reply)
}

pub async fn cluster_del_slots<C: ClientLike>(client: &C, slots: MultipleHashSlots) -> Result<(), Error> {
  let frame = utils::request_response(client, move || {
    let mut args = Vec::with_capacity(slots.len());

    for slot in slots.inner().into_iter() {
      args.push(slot.into());
    }

    Ok((CommandKind::ClusterDelSlots, args))
  })
  .await?;

  let response = protocol_utils::frame_to_results(frame)?;
  protocol_utils::expect_ok(&response)
}

/// Send a `CommandKind::ClusterFailOver` command.
///
/// When `flag` is `Some`, its `to_str()` representation is sent as the single argument;
/// otherwise the command is sent without arguments. The decoded response is passed through
/// `protocol_utils::expect_ok`.
///
/// # Errors
///
/// Propagates any error from sending the request, decoding the response frame, or from
/// `protocol_utils::expect_ok`.
pub async fn cluster_failover<C: ClientLike>(client: &C, flag: Option<ClusterFailoverFlag>) -> Result<(), Error> {
  let reply = utils::request_response(client, move || {
    let args = match flag {
      Some(option) => vec![option.to_str().into()],
      None => Vec::new(),
    };

    Ok((CommandKind::ClusterFailOver, args))
  })
  .await?;

  let decoded = protocol_utils::frame_to_results(reply)?;
  protocol_utils::expect_ok(&decoded)
}

pub async fn cluster_forget<C: ClientLike>(client: &C, node_id: Str) -> Result<(), Error> {
  one_arg_ok_cmd(client, CommandKind::ClusterForget, node_id.into()).await
}

/// Send a `CommandKind::ClusterGetKeysInSlot` command with `slot` and `count` as arguments, in
/// that order.
///
/// Returns the response frame decoded with `protocol_utils::frame_to_results`.
///
/// # Errors
///
/// Fails before anything is sent if `count` cannot be converted into a `Value`. Otherwise
/// propagates any error from sending the request or from decoding the response frame.
pub async fn cluster_get_keys_in_slot<C: ClientLike>(client: &C, slot: u16, count: u64) -> Result<Value, Error> {
  // Convert up front so a conversion failure is reported without issuing a request.
  let limit: Value = count.try_into()?;

  let reply = utils::request_response(client, move || {
    Ok((CommandKind::ClusterGetKeysInSlot, vec![slot.into(), limit]))
  })
  .await?;

  protocol_utils::frame_to_results(reply)
}

/// Run `CommandKind::ClusterKeySlot` with `key` as its only argument.
///
/// Delegates to `one_arg_value_cmd` and returns its result unchanged.
pub async fn cluster_keyslot<C: ClientLike>(client: &C, key: Key) -> Result<Value, Error> {
  let arg = key.into();
  one_arg_value_cmd(client, CommandKind::ClusterKeySlot, arg).await
}

pub async fn cluster_meet<C: ClientLike>(client: &C, ip: Str, port: u16) -> Result<(), Error> {
  args_ok_cmd(client, CommandKind::ClusterMeet, vec![ip.into(), port.into()]).await
}

pub async fn cluster_replicate<C: ClientLike>(client: &C, node_id: Str) -> Result<(), Error> {
  one_arg_ok_cmd(client, CommandKind::ClusterReplicate, node_id.into()).await
}

/// Run `CommandKind::ClusterReplicas` with `node_id` as its only argument.
///
/// Delegates to `one_arg_value_cmd` and returns its result unchanged.
pub async fn cluster_replicas<C: ClientLike>(client: &C, node_id: Str) -> Result<Value, Error> {
  let arg = node_id.into();
  one_arg_value_cmd(client, CommandKind::ClusterReplicas, arg).await
}

pub async fn cluster_reset<C: ClientLike>(client: &C, mode: Option<ClusterResetFlag>) -> Result<(), Error> {
  let frame = utils::request_response(client, move || {
    let mut args = Vec::with_capacity(1);
    if let Some(flag) = mode {
      args.push(flag.to_str().into());
    }

    Ok((CommandKind::ClusterReset, args))
  })
  .await?;

  let response = protocol_utils::frame_to_results(frame)?;
  protocol_utils::expect_ok(&response)
}

/// Run `CommandKind::ClusterSetConfigEpoch` with `epoch` as its only argument.
///
/// Delegates to `one_arg_ok_cmd` once the epoch has been converted.
///
/// # Errors
///
/// Fails before anything is sent if `epoch` cannot be converted into a `Value`; otherwise
/// returns whatever `one_arg_ok_cmd` returns.
pub async fn cluster_set_config_epoch<C: ClientLike>(client: &C, epoch: u64) -> Result<(), Error> {
  let config_epoch: Value = epoch.try_into()?;

  one_arg_ok_cmd(client, CommandKind::ClusterSetConfigEpoch, config_epoch).await
}

pub async fn cluster_setslot<C: ClientLike>(client: &C, slot: u16, state: ClusterSetSlotState) -> Result<(), Error> {
  let frame = utils::request_response(client, move || {
    let mut args = Vec::with_capacity(3);
    args.push(slot.into());

    let (state, arg) = state.to_str();
    args.push(state.into());
    if let Some(arg) = arg {
      args.push(arg.into());
    }

    Ok((CommandKind::ClusterSetSlot, args))
  })
  .await?;

  let response = protocol_utils::frame_to_results(frame)?;
  protocol_utils::expect_ok(&response)
}

/// Send a `RouterCommand::SyncCluster` to the router and wait for its reply.
///
/// A oneshot channel is created; the sending half travels inside the router command and the
/// receiving half is awaited for the result.
///
/// # Errors
///
/// Returns an error if the command cannot be handed to the router, if awaiting the receiver
/// fails, or if the value delivered over the channel is itself an error.
pub async fn sync_cluster<C: ClientLike>(client: &C) -> Result<(), Error> {
  let (tx, rx) = oneshot_channel();
  interfaces::send_to_router(client.inner(), RouterCommand::SyncCluster { tx })?;

  // The outer `?` covers a failed receive; the inner result is returned unchanged.
  rx.await?
}