fred 10.1.0

An async client for Redis and Valkey.
Documentation
use super::*;
use crate::{
  error::*,
  interfaces,
  modules::inner::ClientInner,
  protocol::{
    command::{Command, CommandKind},
    responders::ResponseKind,
    types::*,
  },
  runtime::{channel, RefCount},
  types::{scan::*, ClusterHash, Key, Value},
  utils,
};
use bytes_utils::Str;
use futures::stream::{Stream, TryStreamExt};

static STARTING_CURSOR: &str = "0";

fn values_args(key: Key, pattern: Str, count: Option<u32>) -> Vec<Value> {
  let mut args = Vec::with_capacity(6);
  args.push(key.into());
  args.push(static_val!(STARTING_CURSOR));
  args.push(static_val!(MATCH));
  args.push(pattern.into());

  if let Some(count) = count {
    args.push(static_val!(COUNT));
    args.push(count.into());
  }

  args
}

fn create_scan_args(
  args: &mut Vec<Value>,
  pattern: Str,
  count: Option<u32>,
  r#type: Option<ScanType>,
  cursor: Option<Value>,
) {
  args.push(cursor.unwrap_or_else(|| static_val!(STARTING_CURSOR)));
  args.push(static_val!(MATCH));
  args.push(pattern.into());

  if let Some(count) = count {
    args.push(static_val!(COUNT));
    args.push(count.into());
  }
  if let Some(r#type) = r#type {
    args.push(static_val!(TYPE));
    args.push(r#type.to_str().into());
  }
}

fn pattern_hash_slot(inner: &RefCount<ClientInner>, pattern: &str) -> Option<u16> {
  if inner.config.server.is_clustered() {
    if utils::clustered_scan_pattern_has_hash_tag(inner, pattern) {
      Some(redis_protocol::redis_keyslot(pattern.as_bytes()))
    } else {
      None
    }
  } else {
    None
  }
}

pub fn scan_cluster(
  inner: &RefCount<ClientInner>,
  pattern: Str,
  count: Option<u32>,
  r#type: Option<ScanType>,
) -> impl Stream<Item = Result<ScanResult, Error>> {
  let (tx, rx) = channel(0);

  let hash_slots = inner.with_cluster_state(|state| Ok(state.unique_hash_slots()));
  let hash_slots = match hash_slots {
    Ok(slots) => slots,
    Err(e) => {
      let _ = tx.try_send(Err(e));
      return rx.into_stream();
    },
  };

  let mut args = Vec::with_capacity(7);
  create_scan_args(&mut args, pattern, count, r#type, None);
  for slot in hash_slots.into_iter() {
    _trace!(inner, "Scan cluster hash slot server: {}", slot);
    let response = ResponseKind::KeyScan(KeyScanInner {
      hash_slot:  Some(slot),
      args:       args.clone(),
      cursor_idx: 0,
      tx:         tx.clone(),
      server:     None,
    });
    let command: Command = (CommandKind::Scan, Vec::new(), response).into();

    if let Err(e) = interfaces::default_send_command(inner, command) {
      let _ = tx.try_send(Err(e));
      break;
    }
  }

  rx.into_stream()
}

pub fn scan_cluster_buffered(
  inner: &RefCount<ClientInner>,
  pattern: Str,
  count: Option<u32>,
  r#type: Option<ScanType>,
) -> impl Stream<Item = Result<Key, Error>> {
  let (tx, rx) = channel(0);

  let hash_slots = inner.with_cluster_state(|state| Ok(state.unique_hash_slots()));
  let hash_slots = match hash_slots {
    Ok(slots) => slots,
    Err(e) => {
      let _ = tx.try_send(Err(e));
      return rx.into_stream();
    },
  };

  let mut args = Vec::with_capacity(7);
  create_scan_args(&mut args, pattern, count, r#type, None);
  for slot in hash_slots.into_iter() {
    _trace!(inner, "Scan cluster buffered hash slot server: {}", slot);
    let response = ResponseKind::KeyScanBuffered(KeyScanBufferedInner {
      hash_slot:  Some(slot),
      args:       args.clone(),
      cursor_idx: 0,
      tx:         tx.clone(),
      server:     None,
    });
    let command: Command = (CommandKind::Scan, Vec::new(), response).into();

    if let Err(e) = interfaces::default_send_command(inner, command) {
      let _ = tx.try_send(Err(e));
      break;
    }
  }

  rx.into_stream()
}

pub async fn scan_page<C: ClientLike>(
  client: &C,
  cursor: Str,
  pattern: Str,
  count: Option<u32>,
  r#type: Option<ScanType>,
  server: Option<Server>,
  cluster_hash: Option<ClusterHash>,
) -> Result<Value, Error> {
  let frame = utils::request_response(client, move || {
    let hash_slot = pattern_hash_slot(client.inner(), &pattern);
    let mut args = Vec::with_capacity(7);
    create_scan_args(&mut args, pattern, count, r#type, Some(cursor.into()));

    let mut command = Command::new(CommandKind::Scan, args);
    if let Some(server) = server {
      command.cluster_node = Some(server);
    } else if let Some(hasher) = cluster_hash {
      command.hasher = hasher;
    } else if let Some(slot) = hash_slot {
      command.hasher = ClusterHash::Custom(slot);
    }
    Ok(command)
  })
  .await?;

  protocol_utils::frame_to_results(frame)
}

pub fn scan(
  inner: &RefCount<ClientInner>,
  pattern: Str,
  count: Option<u32>,
  r#type: Option<ScanType>,
) -> impl Stream<Item = Result<ScanResult, Error>> {
  let (tx, rx) = channel(0);

  let hash_slot = pattern_hash_slot(inner, &pattern);
  let mut args = Vec::with_capacity(7);
  create_scan_args(&mut args, pattern, count, r#type, None);
  let response = ResponseKind::KeyScan(KeyScanInner {
    hash_slot,
    args,
    server: None,
    cursor_idx: 0,
    tx: tx.clone(),
  });
  let command: Command = (CommandKind::Scan, Vec::new(), response).into();

  if let Err(e) = interfaces::default_send_command(inner, command) {
    let _ = tx.try_send(Err(e));
  }

  rx.into_stream()
}

pub fn scan_buffered(
  inner: &RefCount<ClientInner>,
  pattern: Str,
  count: Option<u32>,
  r#type: Option<ScanType>,
  server: Option<Server>,
) -> impl Stream<Item = Result<Key, Error>> {
  let (tx, rx) = channel(0);

  let hash_slot = pattern_hash_slot(inner, &pattern);
  let mut args = Vec::with_capacity(7);
  create_scan_args(&mut args, pattern, count, r#type, None);
  let response = ResponseKind::KeyScanBuffered(KeyScanBufferedInner {
    hash_slot,
    args,
    server,
    cursor_idx: 0,
    tx: tx.clone(),
  });
  let command: Command = (CommandKind::Scan, Vec::new(), response).into();

  if let Err(e) = interfaces::default_send_command(inner, command) {
    let _ = tx.try_send(Err(e));
  }

  rx.into_stream()
}

pub fn hscan(
  inner: &RefCount<ClientInner>,
  key: Key,
  pattern: Str,
  count: Option<u32>,
) -> impl Stream<Item = Result<HScanResult, Error>> {
  let (tx, rx) = channel(0);
  let args = values_args(key, pattern, count);

  let response = ResponseKind::ValueScan(ValueScanInner {
    tx: tx.clone(),
    cursor_idx: 1,
    args,
  });
  let command: Command = (CommandKind::Hscan, Vec::new(), response).into();
  if let Err(e) = interfaces::default_send_command(inner, command) {
    let _ = tx.try_send(Err(e));
  }

  rx.into_stream().try_filter_map(|result| async move {
    match result {
      ValueScanResult::HScan(res) => Ok(Some(res)),
      _ => Err(Error::new(ErrorKind::Protocol, "Expected HSCAN result.")),
    }
  })
}

pub fn sscan(
  inner: &RefCount<ClientInner>,
  key: Key,
  pattern: Str,
  count: Option<u32>,
) -> impl Stream<Item = Result<SScanResult, Error>> {
  let (tx, rx) = channel(0);
  let args = values_args(key, pattern, count);

  let response = ResponseKind::ValueScan(ValueScanInner {
    tx: tx.clone(),
    cursor_idx: 1,
    args,
  });
  let command: Command = (CommandKind::Sscan, Vec::new(), response).into();

  if let Err(e) = interfaces::default_send_command(inner, command) {
    let _ = tx.try_send(Err(e));
  }

  rx.into_stream().try_filter_map(|result| async move {
    match result {
      ValueScanResult::SScan(res) => Ok(Some(res)),
      _ => Err(Error::new(ErrorKind::Protocol, "Expected SSCAN result.")),
    }
  })
}

pub fn zscan(
  inner: &RefCount<ClientInner>,
  key: Key,
  pattern: Str,
  count: Option<u32>,
) -> impl Stream<Item = Result<ZScanResult, Error>> {
  let inner = inner.clone();
  let (tx, rx) = channel(0);
  let args = values_args(key, pattern, count);

  let response = ResponseKind::ValueScan(ValueScanInner {
    tx: tx.clone(),
    cursor_idx: 1,
    args,
  });
  let command: Command = (CommandKind::Zscan, Vec::new(), response).into();

  if let Err(e) = interfaces::default_send_command(&inner, command) {
    let _ = tx.try_send(Err(e));
  }

  rx.into_stream().try_filter_map(|result| async move {
    match result {
      ValueScanResult::ZScan(res) => Ok(Some(res)),
      _ => Err(Error::new(ErrorKind::Protocol, "Expected ZSCAN result.")),
    }
  })
}