use crate::{
clients::Client,
interfaces,
modules::inner::ClientInner,
protocol::{
command::{Command, CommandKind},
responders::ResponseKind,
types::{KeyScanInner, ValueScanInner},
},
runtime::RefCount,
types::{Key, Map, Value},
utils,
};
use bytes_utils::Str;
use std::borrow::Cow;
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ScanType {
Set,
String,
ZSet,
List,
Hash,
Stream,
}
impl ScanType {
pub(crate) fn to_str(&self) -> Str {
utils::static_str(match *self {
ScanType::Set => "set",
ScanType::String => "string",
ScanType::List => "list",
ScanType::ZSet => "zset",
ScanType::Hash => "hash",
ScanType::Stream => "stream",
})
}
}
pub trait Scanner {
type Page;
fn cursor(&self) -> Option<Cow<str>>;
fn has_more(&self) -> bool;
fn results(&self) -> &Option<Self::Page>;
fn take_results(&mut self) -> Option<Self::Page>;
fn create_client(&self) -> Client;
fn next(self);
fn cancel(self);
}
pub struct ScanResult {
pub(crate) results: Option<Vec<Key>>,
pub(crate) inner: RefCount<ClientInner>,
pub(crate) scan_state: Option<KeyScanInner>,
pub(crate) can_continue: bool,
}
fn next_key_page(inner: &RefCount<ClientInner>, state: &mut Option<KeyScanInner>) {
if let Some(state) = state.take() {
let cluster_node = state.server.clone();
let response = ResponseKind::KeyScan(state);
let mut cmd: Command = (CommandKind::Scan, Vec::new(), response).into();
cmd.cluster_node = cluster_node;
let _ = interfaces::default_send_command(inner, cmd);
}
}
impl Drop for ScanResult {
fn drop(&mut self) {
if self.can_continue {
next_key_page(&self.inner, &mut self.scan_state);
}
}
}
impl Scanner for ScanResult {
type Page = Vec<Key>;
fn cursor(&self) -> Option<Cow<str>> {
if let Some(ref state) = self.scan_state {
state.args[state.cursor_idx].as_str()
} else {
None
}
}
fn has_more(&self) -> bool {
self.can_continue
}
fn results(&self) -> &Option<Self::Page> {
&self.results
}
fn take_results(&mut self) -> Option<Self::Page> {
self.results.take()
}
fn create_client(&self) -> Client {
Client {
inner: self.inner.clone(),
}
}
fn next(self) {
if !self.can_continue {
return;
}
let mut _self = self;
next_key_page(&_self.inner, &mut _self.scan_state);
}
fn cancel(mut self) {
let _ = self.scan_state.take();
}
}
pub struct HScanResult {
pub(crate) results: Option<Map>,
pub(crate) inner: RefCount<ClientInner>,
pub(crate) scan_state: Option<ValueScanInner>,
pub(crate) can_continue: bool,
}
fn next_hscan_page(inner: &RefCount<ClientInner>, state: &mut Option<ValueScanInner>) {
if let Some(state) = state.take() {
let response = ResponseKind::ValueScan(state);
let cmd: Command = (CommandKind::Hscan, Vec::new(), response).into();
let _ = interfaces::default_send_command(inner, cmd);
}
}
impl Drop for HScanResult {
fn drop(&mut self) {
if self.can_continue {
next_hscan_page(&self.inner, &mut self.scan_state);
}
}
}
impl Scanner for HScanResult {
type Page = Map;
fn cursor(&self) -> Option<Cow<str>> {
if let Some(ref state) = self.scan_state {
state.args[state.cursor_idx].as_str()
} else {
None
}
}
fn has_more(&self) -> bool {
self.can_continue
}
fn results(&self) -> &Option<Self::Page> {
&self.results
}
fn take_results(&mut self) -> Option<Self::Page> {
self.results.take()
}
fn create_client(&self) -> Client {
Client {
inner: self.inner.clone(),
}
}
fn next(self) {
if !self.can_continue {
return;
}
let mut _self = self;
next_hscan_page(&_self.inner, &mut _self.scan_state);
}
fn cancel(mut self) {
let _ = self.scan_state.take();
}
}
pub struct SScanResult {
pub(crate) results: Option<Vec<Value>>,
pub(crate) inner: RefCount<ClientInner>,
pub(crate) scan_state: Option<ValueScanInner>,
pub(crate) can_continue: bool,
}
fn next_sscan_page(inner: &RefCount<ClientInner>, state: &mut Option<ValueScanInner>) {
if let Some(state) = state.take() {
let response = ResponseKind::ValueScan(state);
let cmd: Command = (CommandKind::Sscan, Vec::new(), response).into();
let _ = interfaces::default_send_command(inner, cmd);
}
}
impl Drop for SScanResult {
fn drop(&mut self) {
if self.can_continue {
next_sscan_page(&self.inner, &mut self.scan_state);
}
}
}
impl Scanner for SScanResult {
type Page = Vec<Value>;
fn cursor(&self) -> Option<Cow<str>> {
if let Some(ref state) = self.scan_state {
state.args[state.cursor_idx].as_str()
} else {
None
}
}
fn results(&self) -> &Option<Self::Page> {
&self.results
}
fn take_results(&mut self) -> Option<Self::Page> {
self.results.take()
}
fn has_more(&self) -> bool {
self.can_continue
}
fn create_client(&self) -> Client {
Client {
inner: self.inner.clone(),
}
}
fn next(self) {
if !self.can_continue {
return;
}
let mut _self = self;
next_sscan_page(&_self.inner, &mut _self.scan_state);
}
fn cancel(mut self) {
let _ = self.scan_state.take();
}
}
pub struct ZScanResult {
pub(crate) results: Option<Vec<(Value, f64)>>,
pub(crate) inner: RefCount<ClientInner>,
pub(crate) scan_state: Option<ValueScanInner>,
pub(crate) can_continue: bool,
}
fn next_zscan_page(inner: &RefCount<ClientInner>, state: &mut Option<ValueScanInner>) {
if let Some(state) = state.take() {
let response = ResponseKind::ValueScan(state);
let cmd: Command = (CommandKind::Zscan, Vec::new(), response).into();
let _ = interfaces::default_send_command(inner, cmd);
}
}
impl Drop for ZScanResult {
fn drop(&mut self) {
if self.can_continue {
next_zscan_page(&self.inner, &mut self.scan_state);
}
}
}
impl Scanner for ZScanResult {
type Page = Vec<(Value, f64)>;
fn cursor(&self) -> Option<Cow<str>> {
if let Some(ref state) = self.scan_state {
state.args[state.cursor_idx].as_str()
} else {
None
}
}
fn has_more(&self) -> bool {
self.can_continue
}
fn results(&self) -> &Option<Self::Page> {
&self.results
}
fn take_results(&mut self) -> Option<Self::Page> {
self.results.take()
}
fn create_client(&self) -> Client {
Client {
inner: self.inner.clone(),
}
}
fn next(self) {
if !self.can_continue {
return;
}
let mut _self = self;
next_zscan_page(&_self.inner, &mut _self.scan_state);
}
fn cancel(mut self) {
let _ = self.scan_state.take();
}
}