Skip to main content

ferriskey/cluster/
scan.rs

1//! This module implements cluster-wide key scanning (`SCAN`) across the nodes of a cluster.
2//!
3//! # Overview
4//!
5//! The module provides functionality to scan keys across all nodes in a cluster,
6//! handling topology changes, failovers, and partial cluster coverage scenarios.
7//! It maintains state between scan iterations and ensures consistent scanning even
8//! when cluster topology changes.
9//!
10//! # Key Components
11//!
12//! - [`ClusterScanArgs`]: Configuration for scan operations including filtering and behavior options
13//! - [`ScanStateRC`]: Thread-safe reference-counted wrapper for scan state management
14//! - [`ScanState`]: Internal state tracking for cluster-wide scanning progress
15//! - [`ObjectType`]: Supported data types for filtering scan results
16//!
17//! # Key Features
18//!
19//! - Resumable scanning across cluster nodes
20//! - Automatic handling of cluster topology changes
21//! - Support for all regular SCAN options
22//! - Resilient to node failures and resharding
23//!
24//! # Implementation Details
25//!
26//! The scanning process is implemented using a bitmap to track scanned slots and
27//! maintains epoch information to handle topology changes. The implementation:
28//!
29//! - Uses a 64-bit aligned bitmap for efficient slot tracking
30//! - Maintains cursor position per node
31//! - Handles partial cluster coverage scenarios
32//! - Provides automatic recovery from node failures
33//! - Ensures consistent scanning across topology changes
34//!
35//! # Error Handling
36//!
37//! The module handles various error scenarios including:
38//! - Node failures during scanning
39//! - Cluster topology changes
40//! - Network connectivity issues
41//! - Invalid routing scenarios
42
43use crate::cluster::routing::SlotAddr;
44use crate::cluster::topology::{SLOT_SIZE, get_slot};
45use crate::cluster::{ClusterConnInner, Connect, InnerCore, RefreshPolicy};
46use crate::cmd::cmd;
47use crate::connection::ConnectionLike;
48use crate::value::{ErrorKind, Error, Result, Value, from_value};
49use std::sync::Arc;
50use strum_macros::{Display, EnumString};
51
52const BITS_PER_U64: u16 = u64::BITS as u16;
53const NUM_OF_SLOTS: u16 = SLOT_SIZE;
54const BITS_ARRAY_SIZE: u16 = NUM_OF_SLOTS / BITS_PER_U64;
55const END_OF_SCAN: u16 = NUM_OF_SLOTS;
56type SlotsBitsArray = [u64; BITS_ARRAY_SIZE as usize];
57
58/// Holds configuration for a cluster scan operation.
59///
60/// # Fields
61/// - `scan_state_cursor`: Internal state tracking scan progress
62/// - `match_pattern`: Optional pattern to filter keys
63/// - `count`: Optional limit on number of keys returned per iteration
64/// - `object_type`: Optional filter for specific data types
65/// - `allow_non_covered_slots`: Whether to continue if some slots are uncovered
66///
67/// See examples below for usage with the builder pattern.
68/// # Examples
69///
70/// Create a new `ClusterScanArgs` instance using the builder pattern:
71///
72/// ```rust,no_run,ignore
73/// use ferriskey::ClusterScanArgs;
74/// use ferriskey::ObjectType;
75///
76/// // Create basic scan args with defaults
77/// let basic_scan = ClusterScanArgs::builder().build();
78///
79/// // Create scan args with custom options
80/// let custom_scan = ClusterScanArgs::builder()
81///     .with_match_pattern("user:*")      // Match keys starting with "user:"
82///     .with_count(100)                   // Return 100 keys per iteration
83///     .with_object_type(ObjectType::Hash) // Only scan hash objects
84///     .allow_non_covered_slots(true)     // Continue scanning even if some slots aren't covered
85///     .build();
86///
87/// // The builder can be used to create multiple configurations
88/// let another_scan = ClusterScanArgs::builder()
89///     .with_match_pattern("session:*")
90///     .with_object_type(ObjectType::String)
91///     .build();
92/// ```ignore
93
94#[derive(Clone, Default)]
95pub struct ClusterScanArgs {
96    /// Reference-counted scan state cursor, managed internally.
97    pub scan_state_cursor: ScanStateRC,
98
99    /// Optional pattern to match keys during the scan.
100    pub match_pattern: Option<Vec<u8>>,
101
102    /// A "hint" to the cluster of how much keys to return per scan iteration, if none is sent to the server, the default value is 10.
103    pub count: Option<u32>,
104
105    /// Optional filter to include only keys of a specific data type.
106    pub object_type: Option<ObjectType>,
107
108    /// Flag indicating whether to allow scanning when there are slots not covered by the cluster, by default it is set to false and the scan will stop if some slots are not covered.
109    pub allow_non_covered_slots: bool,
110}
111
112impl ClusterScanArgs {
113    /// Creates a new [`ClusterScanArgsBuilder`] instance.
114    ///
115    /// # Returns
116    ///
117    /// A [`ClusterScanArgsBuilder`] instance for configuring cluster scan arguments.
118    pub fn builder() -> ClusterScanArgsBuilder {
119        ClusterScanArgsBuilder::default()
120    }
121    pub(crate) fn set_scan_state_cursor(&mut self, scan_state_cursor: ScanStateRC) {
122        self.scan_state_cursor = scan_state_cursor;
123    }
124}
125
126#[derive(Default)]
127/// Builder pattern for creating cluster scan arguments.
128///
129/// This struct allows configuring various parameters for scanning keys in a cluster:
130/// * Pattern matching for key filtering
131/// * Count limit for returned keys
132/// * Filtering by object type
133/// * Control over scanning non-covered slots
134///
135/// # Example
136/// ```ignore
137/// use ferriskey::{ClusterScanArgs, ObjectType};
138///
139/// let args = ClusterScanArgs::builder()
140///     .with_match_pattern(b"user:*")
141///     .with_count(100)
142///     .with_object_type(ObjectType::Hash)
143///     .build();
144/// ```ignore
145pub struct ClusterScanArgsBuilder {
146    /// By default, the match pattern is set to `None` and no filtering is applied.
147    match_pattern: Option<Vec<u8>>,
148    /// A "hint" to the cluster of how much keys to return per scan iteration, by default none is sent to the server, the default value is 10.
149    count: Option<u32>,
150    /// By default, the object type is set to `None` and no filtering is applied.
151    object_type: Option<ObjectType>,
152    /// By default, the flag to allow scanning non-covered slots is set to `false`, meaning scanning will stop if some slots are not covered.
153    allow_non_covered_slots: Option<bool>,
154}
155
156impl ClusterScanArgsBuilder {
157    /// Sets the match pattern for the scan operation.
158    ///
159    /// # Arguments
160    ///
161    /// * `pattern` - The pattern to match keys against.
162    ///
163    /// # Returns
164    ///
165    /// The updated [`ClusterScanArgsBuilder`] instance.
166    pub fn with_match_pattern<T: Into<Vec<u8>>>(mut self, pattern: T) -> Self {
167        self.match_pattern = Some(pattern.into());
168        self
169    }
170
171    /// Sets the count for the scan operation.
172    ///
173    /// # Arguments
174    ///
175    /// * `count` - A "hint" to the cluster of how much keys to return per scan iteration.
176    ///
177    /// The actual number of keys returned may be less or more than the count specified.
178    ///
179    /// 4,294,967,295 is the maximum keys possible in a cluster, so higher values will be capped.
180    /// Hence the count is represented as a `u32` instead of `usize`.
181    ///
182    /// The default value is 10, if nothing is sent to the server, meaning nothing set in the builder.
183    ///
184    /// # Returns
185    ///
186    /// The updated [`ClusterScanArgsBuilder`] instance.
187    pub fn with_count(mut self, count: u32) -> Self {
188        self.count = Some(count);
189        self
190    }
191
192    /// Sets the object type for the scan operation.
193    ///
194    /// # Arguments
195    ///
196    /// * `object_type` - The type of object to filter keys by.
197    ///
198    /// See [`ObjectType`] for supported data types.
199    ///
200    /// # Returns
201    ///
202    /// The updated [`ClusterScanArgsBuilder`] instance.
203    pub fn with_object_type(mut self, object_type: ObjectType) -> Self {
204        self.object_type = Some(object_type);
205        self
206    }
207
208    /// Sets the flag to allow scanning non-covered slots.
209    ///
210    /// # Arguments
211    ///
212    /// * `allow` - A boolean flag indicating whether to allow scanning non-covered slots.
213    ///
214    /// # Returns
215    ///
216    /// The updated [`ClusterScanArgsBuilder`] instance.
217    pub fn allow_non_covered_slots(mut self, allow: bool) -> Self {
218        self.allow_non_covered_slots = Some(allow);
219        self
220    }
221
222    /// Builds the [`ClusterScanArgs`] instance with the provided configuration.
223    ///
224    /// # Returns
225    ///
226    /// A [`ClusterScanArgs`] instance with the configured options.
227    pub fn build(self) -> ClusterScanArgs {
228        ClusterScanArgs {
229            scan_state_cursor: ScanStateRC::new(),
230            match_pattern: self.match_pattern,
231            count: self.count,
232            object_type: self.object_type,
233            allow_non_covered_slots: self.allow_non_covered_slots.unwrap_or(false),
234        }
235    }
236}
237
238/// Represents the type of an object used to filter keys by data type.
239///
240/// This enum is used with the `TYPE` option in the `SCAN` command to
241/// filter keys by their data type.
242#[derive(Debug, Clone, Display, PartialEq, EnumString)]
243pub enum ObjectType {
244    /// String data type.
245    String,
246    /// List data type.
247    List,
248    /// Set data type.
249    Set,
250    /// Sorted set data type.
251    ZSet,
252    /// Hash data type.
253    Hash,
254    /// Stream data type.
255    Stream,
256}
257
258impl From<String> for ObjectType {
259    fn from(s: String) -> Self {
260        match s.to_lowercase().as_str() {
261            "string" => ObjectType::String,
262            "list" => ObjectType::List,
263            "set" => ObjectType::Set,
264            "zset" => ObjectType::ZSet,
265            "hash" => ObjectType::Hash,
266            "stream" => ObjectType::Stream,
267            _ => ObjectType::String,
268        }
269    }
270}
271
/// Stage of a cluster-wide scan.
#[derive(Debug, Clone, Default, PartialEq)]
pub enum ScanStateStage {
    /// No scan request was issued yet; this is the default stage.
    #[default]
    Initiating,
    /// The scan started and has not completed.
    InProgress,
    /// The scan completed.
    Finished,
}
279
280/// Wrapper struct for managing the state of a cluster scan operation.
281///
282/// This struct holds an `Arc` to the actual scan state and a status indicating
283/// whether the scan is initiating, in progress, or finished.
284#[derive(Debug, Clone, Default)]
285pub struct ScanStateRC {
286    scan_state_rc: Arc<Option<ScanState>>,
287    status: ScanStateStage,
288}
289
290impl ScanStateRC {
291    /// Creates a new instance of [`ScanStateRC`] from a given [`ScanState`].
292    fn from_scan_state(scan_state: ScanState) -> Self {
293        Self {
294            scan_state_rc: Arc::new(Some(scan_state)),
295            status: ScanStateStage::InProgress,
296        }
297    }
298
299    /// Creates a new instance of [`ScanStateRC`].
300    ///
301    /// This method initializes the [`ScanStateRC`] with a reference to a [`ScanState`] that is initially set to `None`.
302    /// An empty ScanState is equivalent to a 0 cursor.
303    pub fn new() -> Self {
304        Self {
305            scan_state_rc: Arc::new(None),
306            status: ScanStateStage::Initiating,
307        }
308    }
309    /// create a new instance of [`ScanStateRC`] with finished state and empty scan state.
310    fn create_finished() -> Self {
311        Self {
312            scan_state_rc: Arc::new(None),
313            status: ScanStateStage::Finished,
314        }
315    }
316    /// Returns `true` if the scan state is finished.
317    pub fn is_finished(&self) -> bool {
318        self.status == ScanStateStage::Finished
319    }
320
321    /// Returns a clone of the scan state, if it exist.
322    pub(crate) fn state_from_wrapper(&self) -> Option<ScanState> {
323        if self.status == ScanStateStage::Initiating || self.status == ScanStateStage::Finished {
324            None
325        } else {
326            self.scan_state_rc.as_ref().clone()
327        }
328    }
329}
330
331/// Represents the state of a cluster scan operation.
332///
333/// This struct keeps track of the current cursor, which slots have been scanned,
334/// the address currently being scanned, and the epoch of that address.
335#[derive(PartialEq, Debug, Clone)]
336pub(crate) struct ScanState {
337    // the real cursor in the scan operation
338    cursor: u64,
339    // a map of the slots that have been scanned
340    scanned_slots_map: SlotsBitsArray,
341    // the address that is being scanned currently, based on the next slot set to 0 in the scanned_slots_map, and the address that "owns" the slot
342    // in the SlotMap
343    pub(crate) address_in_scan: Arc<String>,
344    // epoch represent the version of the address, when a failover happens or slots migrate in the epoch will be updated to +1
345    address_epoch: u64,
346    // the status of the scan operation
347    scan_status: ScanStateStage,
348}
349
350impl ScanState {
351    /// Create a new instance of ScanState.
352    ///
353    /// # Arguments
354    ///
355    /// * `cursor` - The cursor position.
356    /// * `scanned_slots_map` - The scanned slots map.
357    /// * `address_in_scan` - The address being scanned.
358    /// * `address_epoch` - The epoch of the address being scanned.
359    /// * `scan_status` - The status of the scan operation.
360    ///
361    /// # Returns
362    ///
363    /// A new instance of ScanState.
364    pub fn new(
365        cursor: u64,
366        scanned_slots_map: SlotsBitsArray,
367        address_in_scan: Arc<String>,
368        address_epoch: u64,
369        scan_status: ScanStateStage,
370    ) -> Self {
371        Self {
372            cursor,
373            scanned_slots_map,
374            address_in_scan,
375            address_epoch,
376            scan_status,
377        }
378    }
379
380    fn create_finished_state() -> Self {
381        Self {
382            cursor: 0,
383            scanned_slots_map: [0; BITS_ARRAY_SIZE as usize],
384            address_in_scan: Default::default(),
385            address_epoch: 0,
386            scan_status: ScanStateStage::Finished,
387        }
388    }
389
390    /// Initialize a new scan operation.
391    /// This method creates a new scan state with the cursor set to 0, the scanned slots map initialized to 0,
392    /// and the address set to the address associated with slot 0.
393    /// The address epoch is set to the epoch of the address.
394    /// If the address epoch cannot be retrieved, the method returns an error.
395    async fn initiate_scan<C>(
396        core: &InnerCore<C>,
397        allow_non_covered_slots: bool,
398        match_pattern: Option<&[u8]>,
399    ) -> Result<ScanState>
400    where
401        C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
402    {
403        // Hash-tag-scoped scan: if the match pattern embeds a
404        // Valkey hash tag (`{tag}`), every matching key hashes
405        // into the single slot owned by that tag. Pre-mark every
406        // other slot as scanned so the scan only traverses the
407        // one primary that owns the tag's slot and finishes
408        // after that node's cursor wraps. Without this, ferriskey
409        // iterates every primary serially — correct, but wasteful
410        // when the caller has already pinned a hash tag.
411        let (mut new_scanned_slots_map, start_slot) = match match_pattern
412            .and_then(hashtag_from_pattern)
413        {
414            Some(tag) => {
415                let pinned = get_slot(tag);
416                let mut map: SlotsBitsArray = [u64::MAX; BITS_ARRAY_SIZE as usize];
417                unmark_slot_as_scanned(&mut map, pinned);
418                (map, pinned)
419            }
420            None => ([0; BITS_ARRAY_SIZE as usize], 0),
421        };
422        let new_cursor = 0;
423        let address = next_address_to_scan(
424            core,
425            start_slot,
426            &mut new_scanned_slots_map,
427            allow_non_covered_slots,
428        )
429        .await?;
430
431        match address {
432            NextNodeResult::AllSlotsCompleted => Ok(ScanState::create_finished_state()),
433            NextNodeResult::Address(address) => {
434                let address_epoch = core.address_epoch(&address).await.unwrap_or(0);
435                Ok(ScanState::new(
436                    new_cursor,
437                    new_scanned_slots_map,
438                    address,
439                    address_epoch,
440                    ScanStateStage::InProgress,
441                ))
442            }
443        }
444    }
445
446    /// Update the scan state without updating the scanned slots map.
447    /// This method is used when the address epoch has changed, and we can't determine which slots are new.
448    /// In this case, we skip updating the scanned slots map and only update the address and cursor.
449    async fn new_scan_state<C>(
450        &self,
451        core: Arc<InnerCore<C>>,
452        allow_non_covered_slots: bool,
453        new_scanned_slots_map: Option<SlotsBitsArray>,
454    ) -> Result<ScanState>
455    where
456        C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
457    {
458        // If the new scanned slots map is not provided, use the current scanned slots map.
459        // The new scanned slots map is provided in the general case when the address epoch has not changed,
460        // meaning that we could safely update the scanned slots map with the slots owned by the node.
461        // Epoch change means that some slots are new, and we can't determine which slots been there from the beginning and which are new.
462        let mut scanned_slots_map = new_scanned_slots_map.unwrap_or(self.scanned_slots_map);
463        let next_slot = next_slot(&scanned_slots_map).unwrap_or(0);
464        match next_address_to_scan(
465            &core,
466            next_slot,
467            &mut scanned_slots_map,
468            allow_non_covered_slots,
469        )
470        .await
471        {
472            Ok(NextNodeResult::Address(new_address)) => {
473                let new_epoch = core.address_epoch(&new_address).await.unwrap_or(0);
474                Ok(ScanState::new(
475                    0,
476                    scanned_slots_map,
477                    new_address,
478                    new_epoch,
479                    ScanStateStage::InProgress,
480                ))
481            }
482            Ok(NextNodeResult::AllSlotsCompleted) => Ok(ScanState::create_finished_state()),
483            Err(err) => Err(err),
484        }
485    }
486
487    /// Update the scan state and get the next address to scan.
488    /// This method is called when the cursor reaches 0, indicating that the current address has been scanned.
489    /// This method updates the scan state based on the scanned slots map and retrieves the next address to scan.
490    /// If the address epoch has changed, the method skips updating the scanned slots map and only updates the address and cursor.
491    /// If the address epoch has not changed, the method updates the scanned slots map with the slots owned by the address.
492    /// The method returns the new scan state with the updated cursor, scanned slots map, address, and epoch.
493    async fn create_updated_scan_state_for_completed_address<C>(
494        &mut self,
495        core: Arc<InnerCore<C>>,
496        allow_non_covered_slots: bool,
497    ) -> Result<ScanState>
498    where
499        C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
500    {
501        ClusterConnInner::check_topology_and_refresh_if_diff(
502            core.clone(),
503            &RefreshPolicy::NotThrottable,
504        )
505        .await?;
506
507        let mut scanned_slots_map = self.scanned_slots_map;
508        // If the address epoch changed it mean that some slots in the address are new, so we cant know which slots been there from the beginning and which are new, or out and in later.
509        // In this case we will skip updating the scanned_slots_map and will just update the address and the cursor
510        let new_address_epoch = core.address_epoch(&self.address_in_scan).await.unwrap_or(0);
511        if new_address_epoch != self.address_epoch {
512            return self
513                .new_scan_state(core, allow_non_covered_slots, None)
514                .await;
515        }
516        // If epoch wasn't changed, the slots owned by the address after the refresh are all valid as slots that been scanned
517        // So we will update the scanned_slots_map with the slots owned by the address
518        let slots_scanned = core.slots_of_address(self.address_in_scan.clone()).await;
519        for slot in slots_scanned {
520            mark_slot_as_scanned(&mut scanned_slots_map, slot);
521        }
522        // Get the next address to scan and its param base on the next slot set to 0 in the scanned_slots_map
523        self.new_scan_state(core, allow_non_covered_slots, Some(scanned_slots_map))
524            .await
525    }
526}
527
528fn mark_slot_as_scanned(scanned_slots_map: &mut SlotsBitsArray, slot: u16) {
529    let slot_index = (slot as u64 / BITS_PER_U64 as u64) as usize;
530    let slot_bit = slot as u64 % (BITS_PER_U64 as u64);
531    scanned_slots_map[slot_index] |= 1 << slot_bit;
532}
533
534/// Clear the bit for `slot` in the bitmap, marking it as NOT
535/// scanned. Used by hash-tag-scoped initiation, where the bitmap
536/// starts with every slot marked scanned except the pinned slot
537/// so `next_address_to_scan` locates only the primary that owns
538/// the hash tag.
539fn unmark_slot_as_scanned(scanned_slots_map: &mut SlotsBitsArray, slot: u16) {
540    let slot_index = (slot as u64 / BITS_PER_U64 as u64) as usize;
541    let slot_bit = slot as u64 % (BITS_PER_U64 as u64);
542    scanned_slots_map[slot_index] &= !(1 << slot_bit);
543}
544
/// Extracts the hash tag (`{...}`) that pins a MATCH pattern to a single slot.
///
/// Returns the bytes between the first `{` and the first `}` after it, but
/// only when every key matching the pattern is guaranteed to carry exactly
/// that hash tag. `None` is returned when:
///
/// * the pattern has no `{`, or no `}` after the first `{`;
/// * the tag is empty (`{}`);
/// * a glob metacharacter (`*`, `?`, `[` or `\`) appears anywhere up to the
///   closing brace. A wildcard before or inside the braces can match keys
///   whose own first `{...}` differs (`*{tag}*` matches `{other}x{tag}`), so
///   those keys may live in other slots and pinning the scan to one slot
///   would silently skip them.
///
/// Metacharacters after the closing brace are fine: they cannot change the
/// tag of a matching key. Returning `None` only disables the single-slot
/// shortcut; the caller then scans all slots.
///
/// The brace handling is intended to agree with the tag extraction used for
/// slot routing in [`crate::cluster::topology`].
fn hashtag_from_pattern(pattern: &[u8]) -> Option<&[u8]> {
    let open = pattern.iter().position(|&b| b == b'{')?;
    let tag_len = pattern[open + 1..].iter().position(|&b| b == b'}')?;
    let close = open + 1 + tag_len;

    let is_glob_meta = |b: &u8| matches!(*b, b'*' | b'?' | b'[' | b'\\');
    if tag_len == 0 || pattern[..close].iter().any(is_glob_meta) {
        return None;
    }
    Some(&pattern[open + 1..close])
}
555
/// Outcome of looking for the next node to scan.
///
/// # Variants
///
/// * `Address` - Shared string with the address of the node to scan next.
/// * `AllSlotsCompleted` - No slot is left to scan.
#[derive(Debug, Clone, PartialEq)]
enum NextNodeResult {
    Address(Arc<String>),
    AllSlotsCompleted,
}
567
568/// Determines the next node address to scan within the cluster.
569///
570/// This asynchronous function iterates through cluster slots to find the next available
571/// node responsible for scanning. If a slot is not covered and `allow_non_covered_slots`
572/// is enabled, it marks the slot as scanned and proceeds to the next one. The process
573/// continues until a valid address is found or all slots have been scanned.
574///
575/// # Arguments
576///
577/// * `core` - Reference to the cluster's inner core connection.
578/// * `slot` - The current slot number to scan.
579/// * `scanned_slots_map` - Mutable reference to the bitmap tracking scanned slots.
580/// * `allow_non_covered_slots` - Flag indicating whether to allow scanning of uncovered slots.
581///
582/// # Returns
583///
584/// * `Result<NextNodeResult>` - Returns the next node address to scan or indicates completion.
585///
586/// # Type Parameters
587///
588/// * `C`: The connection type that must implement `ConnectionLike`, `Connect`, `Clone`, `Send`, `Sync`, and `'static`.
589///
590async fn next_address_to_scan<C>(
591    core: &InnerCore<C>,
592    mut slot: u16,
593    scanned_slots_map: &mut SlotsBitsArray,
594    allow_non_covered_slots: bool,
595) -> Result<NextNodeResult>
596where
597    C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
598{
599    loop {
600        if slot == END_OF_SCAN {
601            return Ok(NextNodeResult::AllSlotsCompleted);
602        }
603
604        if let Some(addr) = core
605            .conn_lock
606            .read()
607            .slot_map
608            .node_address_for_slot(slot, SlotAddr::ReplicaRequired)
609        {
610            // Found a valid address for the slot
611            return Ok(NextNodeResult::Address(addr));
612        } else if allow_non_covered_slots {
613            // Mark the current slot as scanned
614            mark_slot_as_scanned(scanned_slots_map, slot);
615            slot = next_slot(scanned_slots_map).unwrap();
616        } else {
617            // Error if slots are not covered and scanning is not allowed
618            return Err(Error::from((
619                    ErrorKind::NotAllSlotsCovered,
620                    "Could not find an address covering a slot, SCAN operation cannot continue \n 
621                    If you want to continue scanning even if some slots are not covered, set allow_non_covered_slots to true \n 
622                    Note that this may lead to incomplete scanning, and the SCAN operation lose its all guarantees ",
623                )));
624        }
625    }
626}
627
628/// Get the next slot to be scanned based on the scanned slots map.
629/// If all slots have been scanned, the method returns [`END_OF_SCAN`].
630fn next_slot(scanned_slots_map: &SlotsBitsArray) -> Option<u16> {
631    let all_slots_scanned = scanned_slots_map.iter().all(|&word| word == u64::MAX);
632    if all_slots_scanned {
633        return Some(END_OF_SCAN);
634    }
635    for (i, slot) in scanned_slots_map.iter().enumerate() {
636        let mut mask = 1;
637        for j in 0..BITS_PER_U64 {
638            if (slot & mask) == 0 {
639                return Some(i as u16 * BITS_PER_U64 + j);
640            }
641            mask <<= 1;
642        }
643    }
644    None
645}
646
647/// Performs a cluster-wide `SCAN` operation.
648///
649/// This function scans the cluster for keys based on the provided arguments.
650/// It handles the initiation of a new scan or continues an existing scan, manages
651/// scan state, handles routing failures, and ensures consistent scanning across
652/// cluster topology changes.
653///
654/// # Arguments
655///
656/// * `core` - An `Arc`-wrapped reference to the cluster connection (`InnerCore<C>`).
657/// * `cluster_scan_args` - Configuration and arguments for the scan operation.
658///
659/// # Returns
660///
661/// * `Result<(ScanStateRC, Vec<Value>)>` -
662///   - On success: A tuple containing the updated scan state (`ScanStateRC`) and a vector of `Value`s representing the found keys.
663///   - On failure: A `Error` detailing the reason for the failure.
664///
665/// # Type Parameters
666///
667/// * `C`: The connection type that must implement `ConnectionLike`, `Connect`, `Clone`, `Send`, `Sync`, and `'static`.
668///
669pub(crate) async fn cluster_scan<C>(
670    core: Arc<InnerCore<C>>,
671    cluster_scan_args: ClusterScanArgs,
672) -> Result<(ScanStateRC, Vec<Value>)>
673where
674    C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
675{
676    // Extract the current scan state cursor and the flag for non-covered slots
677    let scan_state_cursor = &cluster_scan_args.scan_state_cursor;
678    let allow_non_covered_slots = cluster_scan_args.allow_non_covered_slots;
679
680    // Determine the current scan state:
681    // - If an existing scan state is present, use it.
682    // - Otherwise, initiate a new scan.
683    let scan_state = match scan_state_cursor.state_from_wrapper() {
684        Some(state) => state,
685        None => match ScanState::initiate_scan(
686            &core,
687            allow_non_covered_slots,
688            cluster_scan_args.match_pattern.as_deref(),
689        )
690        .await
691        {
692            Ok(state) => state,
693            Err(err) => {
694                // Early return if initiating the scan fails
695                return Err(err);
696            }
697        },
698    };
699    // Send the SCAN command using the current scan state and scan arguments
700    let ((new_cursor, new_keys), mut scan_state) =
701        try_scan(&scan_state, &cluster_scan_args, core.clone()).await?;
702
703    // Check if the cursor indicates the end of the current scan segment
704    if new_cursor == 0 {
705        // Update the scan state to move to the next address/node in the cluster
706        scan_state = scan_state
707            .create_updated_scan_state_for_completed_address(core, allow_non_covered_slots)
708            .await?;
709    }
710
711    // Verify if the entire cluster has been scanned
712    if scan_state.scan_status == ScanStateStage::Finished {
713        // Return the final scan state and the collected keys
714        return Ok((ScanStateRC::create_finished(), new_keys));
715    }
716
717    // Update the scan state with the new cursor and maintain the progress
718    scan_state = ScanState::new(
719        new_cursor,
720        scan_state.scanned_slots_map,
721        scan_state.address_in_scan,
722        scan_state.address_epoch,
723        ScanStateStage::InProgress,
724    );
725
726    // Return the updated scan state and the newly found keys
727    Ok((ScanStateRC::from_scan_state(scan_state), new_keys))
728}
729
730/// Sends the `SCAN` command to the specified address.
731///
732/// # Arguments
733///
734/// * `scan_state` - The current scan state.
735/// * `cluster_scan_args` - Arguments for the scan operation, including match pattern, count, object type, and allow_non_covered_slots.
736/// * `core` - The cluster connection.
737///
738/// # Returns
739///
740/// A `Result` containing the response from the `SCAN` command.
741async fn send_scan<C>(
742    scan_state: &ScanState,
743    cluster_scan_args: &ClusterScanArgs,
744    core: Arc<InnerCore<C>>,
745) -> Result<Value>
746where
747    C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
748{
749    if let Some(conn_future) = core
750        .connection_for_address(&scan_state.address_in_scan)
751        .await
752    {
753        let mut conn = conn_future.await;
754        let mut scan_command = cmd("SCAN");
755        scan_command.arg(scan_state.cursor);
756        if let Some(match_pattern) = cluster_scan_args.match_pattern.as_ref() {
757            scan_command.arg("MATCH").arg(match_pattern);
758        }
759        if let Some(count) = cluster_scan_args.count {
760            scan_command.arg("COUNT").arg(count);
761        }
762        if let Some(object_type) = &cluster_scan_args.object_type {
763            scan_command.arg("TYPE").arg(object_type.to_string());
764        }
765        conn.req_packed_command(&scan_command).await
766    } else {
767        Err(Error::from((
768            ErrorKind::ConnectionNotFoundForRoute,
769            "Cluster scan failed. No connection available for address: ",
770            format!("{}", scan_state.address_in_scan),
771        )))
772    }
773}
774
775/// Checks if the error is retryable during scanning.
776/// Retryable errors include network issues, cluster topology changes, and unavailable connections.
777/// Scan operations are not keyspace operations, so they are not affected by keyspace errors like `MOVED`.
778fn is_scanwise_retryable_error(err: &Error) -> bool {
779    matches!(
780        err.kind(),
781        ErrorKind::IoError
782            | ErrorKind::AllConnectionsUnavailable
783            | ErrorKind::ConnectionNotFoundForRoute
784            | ErrorKind::ClusterDown
785            | ErrorKind::FatalSendError
786    )
787}
788
789/// Gets the next scan state by finding the next address to scan.
790/// The method updates the scanned slots map and retrieves the next address to scan.
791/// If the address epoch has changed, the method creates a new scan state without updating the scanned slots map.
792/// If the address epoch has not changed, the method updates the scanned slots map with the slots owned by the address.
793/// The method returns the new scan state with the updated cursor, scanned slots map, address, and epoch.
794/// The method is used to continue scanning the cluster after completing a scan segment.
795async fn next_scan_state<C>(
796    core: &Arc<InnerCore<C>>,
797    scan_state: &ScanState,
798    cluster_scan_args: &ClusterScanArgs,
799) -> Result<Option<ScanState>>
800where
801    C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
802{
803    let next_slot = next_slot(&scan_state.scanned_slots_map).unwrap_or(0);
804    let mut scanned_slots_map = scan_state.scanned_slots_map;
805    match next_address_to_scan(
806        core,
807        next_slot,
808        &mut scanned_slots_map,
809        cluster_scan_args.allow_non_covered_slots,
810    )
811    .await
812    {
813        Ok(NextNodeResult::Address(new_address)) => {
814            let new_epoch = core.address_epoch(&new_address).await.unwrap_or(0);
815            Ok(Some(ScanState::new(
816                0,
817                scanned_slots_map,
818                new_address,
819                new_epoch,
820                ScanStateStage::InProgress,
821            )))
822        }
823        Ok(NextNodeResult::AllSlotsCompleted) => Ok(None),
824        Err(err) => Err(err),
825    }
826}
827
828/// Attempts to scan the cluster for keys based on the current scan state.
829/// Sends the `SCAN` command to the current address and processes the response.
830/// On retryable errors, refreshes the cluster topology and retries the scan.
831/// Returns the new cursor and keys found upon success.
832async fn try_scan<C>(
833    scan_state: &ScanState,
834    cluster_scan_args: &ClusterScanArgs,
835    core: Arc<InnerCore<C>>,
836) -> Result<((u64, Vec<Value>), ScanState)>
837where
838    C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
839{
840    let mut new_scan_state = scan_state.clone();
841    const MAX_SCAN_RETRIES: usize = 10;
842    let mut retries = 0;
843
844    loop {
845        match send_scan(&new_scan_state, cluster_scan_args, core.clone()).await {
846            Ok(scan_response) => {
847                let (new_cursor, new_keys) =
848                    from_value::<(u64, Vec<Value>)>(&scan_response)?;
849                return Ok(((new_cursor, new_keys), new_scan_state));
850            }
851            Err(err) if is_scanwise_retryable_error(&err) => {
852                retries += 1;
853                if retries > MAX_SCAN_RETRIES {
854                    return Err(Error::from((
855                        ErrorKind::AllConnectionsUnavailable,
856                        "Cluster scan exceeded maximum retry count",
857                        format!(
858                            "Failed after {} retries. Last error: {}",
859                            MAX_SCAN_RETRIES, err
860                        ),
861                    )));
862                }
863
864                ClusterConnInner::check_topology_and_refresh_if_diff(
865                    core.clone(),
866                    &RefreshPolicy::NotThrottable,
867                )
868                .await?;
869
870                if let Some(next_scan_state) =
871                    next_scan_state(&core, &new_scan_state, cluster_scan_args).await?
872                {
873                    new_scan_state = next_scan_state;
874                } else {
875                    return Ok(((0, Vec::new()), ScanState::create_finished_state()));
876                }
877            }
878            Err(err) => return Err(err),
879        }
880    }
881}
882
#[cfg(test)]
mod tests {
    use super::*;

    /// Node address used by tests that need a scan state.
    const TEST_ADDRESS: &str = "127.0.0.1:6379";

    /// Bitmap in which no slot is marked as scanned.
    fn empty_bitmap() -> SlotsBitsArray {
        [0; BITS_ARRAY_SIZE as usize]
    }

    /// Bitmap in which every bit is set.
    fn full_bitmap() -> SlotsBitsArray {
        [u64::MAX; BITS_ARRAY_SIZE as usize]
    }

    #[tokio::test]
    async fn test_cluster_scan_args_builder() {
        let built = ClusterScanArgs::builder()
            .with_match_pattern("user:*")
            .with_count(100)
            .with_object_type(ObjectType::Hash)
            .allow_non_covered_slots(true)
            .build();

        // Every builder option must be reflected in the built arguments.
        assert_eq!(built.match_pattern, Some(b"user:*".to_vec()));
        assert_eq!(built.count, Some(100));
        assert_eq!(built.object_type, Some(ObjectType::Hash));
        assert!(built.allow_non_covered_slots);
    }

    #[tokio::test]
    async fn test_scan_state_new() {
        let address = Arc::new(TEST_ADDRESS.to_string());
        let state = ScanState::new(
            0,
            empty_bitmap(),
            address.clone(),
            1,
            ScanStateStage::InProgress,
        );

        // The constructor stores its arguments verbatim.
        assert_eq!(state.cursor, 0);
        assert_eq!(state.scanned_slots_map, empty_bitmap());
        assert_eq!(state.address_in_scan, address);
        assert_eq!(state.address_epoch, 1);
        assert_eq!(state.scan_status, ScanStateStage::InProgress);
    }

    #[tokio::test]
    async fn test_scan_state_create_finished() {
        let finished = ScanState::create_finished_state();

        // A finished state carries zeroed fields and an empty address.
        assert_eq!(finished.cursor, 0);
        assert_eq!(finished.scanned_slots_map, empty_bitmap());
        assert_eq!(finished.address_in_scan, Arc::new(String::new()));
        assert_eq!(finished.address_epoch, 0);
        assert_eq!(finished.scan_status, ScanStateStage::Finished);
    }

    #[tokio::test]
    async fn test_mark_slot_as_scanned() {
        let mut bitmap = empty_bitmap();
        mark_slot_as_scanned(&mut bitmap, 5);

        // Slot 5 maps to bit 5 of the first word.
        assert_eq!(bitmap[0], 1u64 << 5);
    }

    #[test]
    fn test_hashtag_from_pattern() {
        // A well-formed tag is extracted without its braces.
        assert_eq!(
            hashtag_from_pattern(b"ff:idx:{abc}:lane:*"),
            Some(&b"abc"[..])
        );
        // No braces, empty braces, or an unterminated brace yield no tag.
        assert_eq!(hashtag_from_pattern(b"no-tag-here:*"), None);
        assert_eq!(hashtag_from_pattern(b"{}empty"), None);
        assert_eq!(hashtag_from_pattern(b"{only-open"), None);
        // Only the first occurrence is considered.
        assert_eq!(
            hashtag_from_pattern(b"{first}and{second}"),
            Some(&b"first"[..])
        );
    }

    #[test]
    fn test_unmark_slot_as_scanned() {
        let mut bitmap = full_bitmap();
        unmark_slot_as_scanned(&mut bitmap, 5);

        // Only bit 5 of the first word is cleared; all other words stay full.
        assert_eq!(bitmap[0], !(1u64 << 5));
        for word in bitmap.iter().skip(1) {
            assert_eq!(*word, u64::MAX);
        }
        // The single cleared slot is the next one to scan.
        assert_eq!(next_slot(&bitmap), Some(5));
    }

    #[test]
    fn test_hashtag_scoped_bitmap_finishes_after_pinned_slot() {
        // Models the hash-tag-scoped starting bitmap: everything is marked
        // scanned except the pinned slot. While that slot is open it is the
        // next slot to scan; once it is marked, next_slot reports END_OF_SCAN,
        // so the scan ends without visiting any further slot.
        let pinned_slot = get_slot(b"abc");
        let mut bitmap = full_bitmap();

        unmark_slot_as_scanned(&mut bitmap, pinned_slot);
        assert_eq!(next_slot(&bitmap), Some(pinned_slot));

        mark_slot_as_scanned(&mut bitmap, pinned_slot);
        assert_eq!(next_slot(&bitmap), Some(END_OF_SCAN));
    }

    #[tokio::test]
    async fn test_next_slot() {
        let state = ScanState::new(
            0,
            empty_bitmap(),
            Arc::new(TEST_ADDRESS.to_string()),
            1,
            ScanStateStage::InProgress,
        );

        // With nothing scanned yet, scanning starts at slot 0.
        let first_unscanned = next_slot(&state.scanned_slots_map);
        assert_eq!(first_unscanned, Some(0));
    }
}