ferriskey/cluster/scan.rs
1//! This module implements cluster-wide scanning operations for clusters.
2//!
3//! # Overview
4//!
5//! The module provides functionality to scan keys across all nodes in a cluster,
6//! handling topology changes, failovers, and partial cluster coverage scenarios.
7//! It maintains state between scan iterations and ensures consistent scanning even
8//! when cluster topology changes.
9//!
10//! # Key Components
11//!
12//! - [`ClusterScanArgs`]: Configuration for scan operations including filtering and behavior options
13//! - [`ScanStateRC`]: Thread-safe reference-counted wrapper for scan state management
14//! - [`ScanState`]: Internal state tracking for cluster-wide scanning progress
15//! - [`ObjectType`]: Supported data types for filtering scan results
16//!
17//! # Key Features
18//!
19//! - Resumable scanning across cluster nodes
20//! - Automatic handling of cluster topology changes
21//! - Support for all regular SCAN options
22//! - Resilient to node failures and resharding
23//!
24//! # Implementation Details
25//!
26//! The scanning process is implemented using a bitmap to track scanned slots and
27//! maintains epoch information to handle topology changes. The implementation:
28//!
29//! - Uses a 64-bit aligned bitmap for efficient slot tracking
30//! - Maintains cursor position per node
31//! - Handles partial cluster coverage scenarios
32//! - Provides automatic recovery from node failures
33//! - Ensures consistent scanning across topology changes
34//!
35//! # Error Handling
36//!
37//! The module handles various error scenarios including:
38//! - Node failures during scanning
39//! - Cluster topology changes
40//! - Network connectivity issues
41//! - Invalid routing scenarios
42
43use crate::cluster::routing::SlotAddr;
44use crate::cluster::topology::{SLOT_SIZE, get_slot};
45use crate::cluster::{ClusterConnInner, Connect, InnerCore, RefreshPolicy};
46use crate::cmd::cmd;
47use crate::connection::ConnectionLike;
48use crate::value::{ErrorKind, Error, Result, Value, from_value};
49use std::sync::Arc;
50use strum_macros::{Display, EnumString};
51
/// Number of slot bits stored in each `u64` word of the scanned-slots bitmap.
const BITS_PER_U64: u16 = u64::BITS as u16;
/// Total number of slots tracked by a scan, taken from [`SLOT_SIZE`].
const NUM_OF_SLOTS: u16 = SLOT_SIZE;
/// Number of `u64` words needed to hold one bit per slot.
const BITS_ARRAY_SIZE: u16 = NUM_OF_SLOTS / BITS_PER_U64;
/// Sentinel slot value meaning "no slot left to scan"; `next_slot` returns it
/// once every bit of the bitmap is set.
const END_OF_SCAN: u16 = NUM_OF_SLOTS;
/// Bitmap with one bit per slot; a set bit marks the slot as already scanned.
type SlotsBitsArray = [u64; BITS_ARRAY_SIZE as usize];
57
/// Holds configuration for a cluster scan operation.
///
/// # Fields
/// - `scan_state_cursor`: Internal state tracking scan progress
/// - `match_pattern`: Optional pattern to filter keys
/// - `count`: Optional hint for the number of keys returned per iteration
/// - `object_type`: Optional filter for specific data types
/// - `allow_non_covered_slots`: Whether to continue if some slots are uncovered
///
/// # Examples
///
/// Create a new `ClusterScanArgs` instance using the builder pattern:
///
/// ```rust,no_run,ignore
/// use ferriskey::ClusterScanArgs;
/// use ferriskey::ObjectType;
///
/// // Create basic scan args with defaults
/// let basic_scan = ClusterScanArgs::builder().build();
///
/// // Create scan args with custom options
/// let custom_scan = ClusterScanArgs::builder()
///     .with_match_pattern("user:*") // Match keys starting with "user:"
///     .with_count(100) // Hint: roughly 100 keys per iteration
///     .with_object_type(ObjectType::Hash) // Only scan hash objects
///     .allow_non_covered_slots(true) // Continue scanning even if some slots aren't covered
///     .build();
///
/// // The builder can be used to create multiple configurations
/// let another_scan = ClusterScanArgs::builder()
///     .with_match_pattern("session:*")
///     .with_object_type(ObjectType::String)
///     .build();
/// ```
#[derive(Clone, Default)]
pub struct ClusterScanArgs {
    /// Reference-counted scan state cursor, managed internally.
    pub scan_state_cursor: ScanStateRC,

    /// Optional pattern to match keys during the scan.
    pub match_pattern: Option<Vec<u8>>,

    /// A "hint" to the cluster of how many keys to return per scan iteration.
    /// When `None`, no `COUNT` argument is sent and the server default applies.
    pub count: Option<u32>,

    /// Optional filter to include only keys of a specific data type.
    pub object_type: Option<ObjectType>,

    /// Whether to keep scanning when some slots are not covered by the cluster.
    /// Defaults to `false`, in which case the scan stops with an error on the
    /// first uncovered slot.
    pub allow_non_covered_slots: bool,
}
111
112impl ClusterScanArgs {
113 /// Creates a new [`ClusterScanArgsBuilder`] instance.
114 ///
115 /// # Returns
116 ///
117 /// A [`ClusterScanArgsBuilder`] instance for configuring cluster scan arguments.
118 pub fn builder() -> ClusterScanArgsBuilder {
119 ClusterScanArgsBuilder::default()
120 }
121 pub(crate) fn set_scan_state_cursor(&mut self, scan_state_cursor: ScanStateRC) {
122 self.scan_state_cursor = scan_state_cursor;
123 }
124}
125
#[derive(Default)]
/// Builder pattern for creating cluster scan arguments.
///
/// This struct allows configuring various parameters for scanning keys in a cluster:
/// * Pattern matching for key filtering
/// * Count hint for returned keys
/// * Filtering by object type
/// * Control over scanning non-covered slots
///
/// # Example
/// ```ignore
/// use ferriskey::{ClusterScanArgs, ObjectType};
///
/// let args = ClusterScanArgs::builder()
///     .with_match_pattern(b"user:*")
///     .with_count(100)
///     .with_object_type(ObjectType::Hash)
///     .build();
/// ```
pub struct ClusterScanArgsBuilder {
    /// By default, the match pattern is set to `None` and no filtering is applied.
    match_pattern: Option<Vec<u8>>,
    /// A "hint" to the cluster of how many keys to return per scan iteration.
    /// By default it is `None`, so nothing is sent and the server default applies.
    count: Option<u32>,
    /// By default, the object type is set to `None` and no filtering is applied.
    object_type: Option<ObjectType>,
    /// `None` until explicitly set; `build` treats `None` as `false`, meaning
    /// scanning stops if some slots are not covered.
    allow_non_covered_slots: Option<bool>,
}
155
156impl ClusterScanArgsBuilder {
157 /// Sets the match pattern for the scan operation.
158 ///
159 /// # Arguments
160 ///
161 /// * `pattern` - The pattern to match keys against.
162 ///
163 /// # Returns
164 ///
165 /// The updated [`ClusterScanArgsBuilder`] instance.
166 pub fn with_match_pattern<T: Into<Vec<u8>>>(mut self, pattern: T) -> Self {
167 self.match_pattern = Some(pattern.into());
168 self
169 }
170
171 /// Sets the count for the scan operation.
172 ///
173 /// # Arguments
174 ///
175 /// * `count` - A "hint" to the cluster of how much keys to return per scan iteration.
176 ///
177 /// The actual number of keys returned may be less or more than the count specified.
178 ///
179 /// 4,294,967,295 is the maximum keys possible in a cluster, so higher values will be capped.
180 /// Hence the count is represented as a `u32` instead of `usize`.
181 ///
182 /// The default value is 10, if nothing is sent to the server, meaning nothing set in the builder.
183 ///
184 /// # Returns
185 ///
186 /// The updated [`ClusterScanArgsBuilder`] instance.
187 pub fn with_count(mut self, count: u32) -> Self {
188 self.count = Some(count);
189 self
190 }
191
192 /// Sets the object type for the scan operation.
193 ///
194 /// # Arguments
195 ///
196 /// * `object_type` - The type of object to filter keys by.
197 ///
198 /// See [`ObjectType`] for supported data types.
199 ///
200 /// # Returns
201 ///
202 /// The updated [`ClusterScanArgsBuilder`] instance.
203 pub fn with_object_type(mut self, object_type: ObjectType) -> Self {
204 self.object_type = Some(object_type);
205 self
206 }
207
208 /// Sets the flag to allow scanning non-covered slots.
209 ///
210 /// # Arguments
211 ///
212 /// * `allow` - A boolean flag indicating whether to allow scanning non-covered slots.
213 ///
214 /// # Returns
215 ///
216 /// The updated [`ClusterScanArgsBuilder`] instance.
217 pub fn allow_non_covered_slots(mut self, allow: bool) -> Self {
218 self.allow_non_covered_slots = Some(allow);
219 self
220 }
221
222 /// Builds the [`ClusterScanArgs`] instance with the provided configuration.
223 ///
224 /// # Returns
225 ///
226 /// A [`ClusterScanArgs`] instance with the configured options.
227 pub fn build(self) -> ClusterScanArgs {
228 ClusterScanArgs {
229 scan_state_cursor: ScanStateRC::new(),
230 match_pattern: self.match_pattern,
231 count: self.count,
232 object_type: self.object_type,
233 allow_non_covered_slots: self.allow_non_covered_slots.unwrap_or(false),
234 }
235 }
236}
237
/// Represents the type of an object used to filter keys by data type.
///
/// This enum is used with the `TYPE` option in the `SCAN` command to
/// filter keys by their data type. Its `Display` output (derived via
/// `strum_macros`) is what gets sent as the `TYPE` argument.
#[derive(Debug, Clone, Display, PartialEq, EnumString)]
pub enum ObjectType {
    /// String data type.
    String,
    /// List data type.
    List,
    /// Set data type.
    Set,
    /// Sorted set data type.
    ZSet,
    /// Hash data type.
    Hash,
    /// Stream data type.
    Stream,
}
257
258impl From<String> for ObjectType {
259 fn from(s: String) -> Self {
260 match s.to_lowercase().as_str() {
261 "string" => ObjectType::String,
262 "list" => ObjectType::List,
263 "set" => ObjectType::Set,
264 "zset" => ObjectType::ZSet,
265 "hash" => ObjectType::Hash,
266 "stream" => ObjectType::Stream,
267 _ => ObjectType::String,
268 }
269 }
270}
271
/// Lifecycle stage of a cluster scan.
#[derive(PartialEq, Debug, Clone, Default)]
pub enum ScanStateStage {
    /// No scan state exists yet; this is the default stage and the one set by
    /// `ScanStateRC::new`.
    #[default]
    Initiating,
    /// A scan state exists and the scan still has work left.
    InProgress,
    /// The scan has covered everything it is going to cover.
    Finished,
}
279
280/// Wrapper struct for managing the state of a cluster scan operation.
281///
282/// This struct holds an `Arc` to the actual scan state and a status indicating
283/// whether the scan is initiating, in progress, or finished.
284#[derive(Debug, Clone, Default)]
285pub struct ScanStateRC {
286 scan_state_rc: Arc<Option<ScanState>>,
287 status: ScanStateStage,
288}
289
290impl ScanStateRC {
291 /// Creates a new instance of [`ScanStateRC`] from a given [`ScanState`].
292 fn from_scan_state(scan_state: ScanState) -> Self {
293 Self {
294 scan_state_rc: Arc::new(Some(scan_state)),
295 status: ScanStateStage::InProgress,
296 }
297 }
298
299 /// Creates a new instance of [`ScanStateRC`].
300 ///
301 /// This method initializes the [`ScanStateRC`] with a reference to a [`ScanState`] that is initially set to `None`.
302 /// An empty ScanState is equivalent to a 0 cursor.
303 pub fn new() -> Self {
304 Self {
305 scan_state_rc: Arc::new(None),
306 status: ScanStateStage::Initiating,
307 }
308 }
309 /// create a new instance of [`ScanStateRC`] with finished state and empty scan state.
310 fn create_finished() -> Self {
311 Self {
312 scan_state_rc: Arc::new(None),
313 status: ScanStateStage::Finished,
314 }
315 }
316 /// Returns `true` if the scan state is finished.
317 pub fn is_finished(&self) -> bool {
318 self.status == ScanStateStage::Finished
319 }
320
321 /// Returns a clone of the scan state, if it exist.
322 pub(crate) fn state_from_wrapper(&self) -> Option<ScanState> {
323 if self.status == ScanStateStage::Initiating || self.status == ScanStateStage::Finished {
324 None
325 } else {
326 self.scan_state_rc.as_ref().clone()
327 }
328 }
329}
330
/// Represents the state of a cluster scan operation.
///
/// This struct keeps track of the current cursor, which slots have been scanned,
/// the address currently being scanned, and the epoch of that address.
#[derive(PartialEq, Debug, Clone)]
pub(crate) struct ScanState {
    /// The real `SCAN` cursor to send to the node currently being scanned.
    cursor: u64,
    /// Bitmap of the slots that have already been scanned (set bit = scanned).
    scanned_slots_map: SlotsBitsArray,
    /// The address currently being scanned. It is chosen as the address that
    /// owns, in the slot map, the next slot whose bit is still 0 in
    /// `scanned_slots_map`.
    pub(crate) address_in_scan: Arc<String>,
    /// Epoch of `address_in_scan` captured when this state was created. It is
    /// compared against the current epoch to detect failovers or slot
    /// migrations that happened while the node was being scanned.
    address_epoch: u64,
    /// The stage of the scan operation.
    scan_status: ScanStateStage,
}
349
350impl ScanState {
351 /// Create a new instance of ScanState.
352 ///
353 /// # Arguments
354 ///
355 /// * `cursor` - The cursor position.
356 /// * `scanned_slots_map` - The scanned slots map.
357 /// * `address_in_scan` - The address being scanned.
358 /// * `address_epoch` - The epoch of the address being scanned.
359 /// * `scan_status` - The status of the scan operation.
360 ///
361 /// # Returns
362 ///
363 /// A new instance of ScanState.
364 pub fn new(
365 cursor: u64,
366 scanned_slots_map: SlotsBitsArray,
367 address_in_scan: Arc<String>,
368 address_epoch: u64,
369 scan_status: ScanStateStage,
370 ) -> Self {
371 Self {
372 cursor,
373 scanned_slots_map,
374 address_in_scan,
375 address_epoch,
376 scan_status,
377 }
378 }
379
380 fn create_finished_state() -> Self {
381 Self {
382 cursor: 0,
383 scanned_slots_map: [0; BITS_ARRAY_SIZE as usize],
384 address_in_scan: Default::default(),
385 address_epoch: 0,
386 scan_status: ScanStateStage::Finished,
387 }
388 }
389
390 /// Initialize a new scan operation.
391 /// This method creates a new scan state with the cursor set to 0, the scanned slots map initialized to 0,
392 /// and the address set to the address associated with slot 0.
393 /// The address epoch is set to the epoch of the address.
394 /// If the address epoch cannot be retrieved, the method returns an error.
395 async fn initiate_scan<C>(
396 core: &InnerCore<C>,
397 allow_non_covered_slots: bool,
398 match_pattern: Option<&[u8]>,
399 ) -> Result<ScanState>
400 where
401 C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
402 {
403 // Hash-tag-scoped scan: if the match pattern embeds a
404 // Valkey hash tag (`{tag}`), every matching key hashes
405 // into the single slot owned by that tag. Pre-mark every
406 // other slot as scanned so the scan only traverses the
407 // one primary that owns the tag's slot and finishes
408 // after that node's cursor wraps. Without this, ferriskey
409 // iterates every primary serially — correct, but wasteful
410 // when the caller has already pinned a hash tag.
411 let (mut new_scanned_slots_map, start_slot) = match match_pattern
412 .and_then(hashtag_from_pattern)
413 {
414 Some(tag) => {
415 let pinned = get_slot(tag);
416 let mut map: SlotsBitsArray = [u64::MAX; BITS_ARRAY_SIZE as usize];
417 unmark_slot_as_scanned(&mut map, pinned);
418 (map, pinned)
419 }
420 None => ([0; BITS_ARRAY_SIZE as usize], 0),
421 };
422 let new_cursor = 0;
423 let address = next_address_to_scan(
424 core,
425 start_slot,
426 &mut new_scanned_slots_map,
427 allow_non_covered_slots,
428 )
429 .await?;
430
431 match address {
432 NextNodeResult::AllSlotsCompleted => Ok(ScanState::create_finished_state()),
433 NextNodeResult::Address(address) => {
434 let address_epoch = core.address_epoch(&address).await.unwrap_or(0);
435 Ok(ScanState::new(
436 new_cursor,
437 new_scanned_slots_map,
438 address,
439 address_epoch,
440 ScanStateStage::InProgress,
441 ))
442 }
443 }
444 }
445
446 /// Update the scan state without updating the scanned slots map.
447 /// This method is used when the address epoch has changed, and we can't determine which slots are new.
448 /// In this case, we skip updating the scanned slots map and only update the address and cursor.
449 async fn new_scan_state<C>(
450 &self,
451 core: Arc<InnerCore<C>>,
452 allow_non_covered_slots: bool,
453 new_scanned_slots_map: Option<SlotsBitsArray>,
454 ) -> Result<ScanState>
455 where
456 C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
457 {
458 // If the new scanned slots map is not provided, use the current scanned slots map.
459 // The new scanned slots map is provided in the general case when the address epoch has not changed,
460 // meaning that we could safely update the scanned slots map with the slots owned by the node.
461 // Epoch change means that some slots are new, and we can't determine which slots been there from the beginning and which are new.
462 let mut scanned_slots_map = new_scanned_slots_map.unwrap_or(self.scanned_slots_map);
463 let next_slot = next_slot(&scanned_slots_map).unwrap_or(0);
464 match next_address_to_scan(
465 &core,
466 next_slot,
467 &mut scanned_slots_map,
468 allow_non_covered_slots,
469 )
470 .await
471 {
472 Ok(NextNodeResult::Address(new_address)) => {
473 let new_epoch = core.address_epoch(&new_address).await.unwrap_or(0);
474 Ok(ScanState::new(
475 0,
476 scanned_slots_map,
477 new_address,
478 new_epoch,
479 ScanStateStage::InProgress,
480 ))
481 }
482 Ok(NextNodeResult::AllSlotsCompleted) => Ok(ScanState::create_finished_state()),
483 Err(err) => Err(err),
484 }
485 }
486
487 /// Update the scan state and get the next address to scan.
488 /// This method is called when the cursor reaches 0, indicating that the current address has been scanned.
489 /// This method updates the scan state based on the scanned slots map and retrieves the next address to scan.
490 /// If the address epoch has changed, the method skips updating the scanned slots map and only updates the address and cursor.
491 /// If the address epoch has not changed, the method updates the scanned slots map with the slots owned by the address.
492 /// The method returns the new scan state with the updated cursor, scanned slots map, address, and epoch.
493 async fn create_updated_scan_state_for_completed_address<C>(
494 &mut self,
495 core: Arc<InnerCore<C>>,
496 allow_non_covered_slots: bool,
497 ) -> Result<ScanState>
498 where
499 C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
500 {
501 ClusterConnInner::check_topology_and_refresh_if_diff(
502 core.clone(),
503 &RefreshPolicy::NotThrottable,
504 )
505 .await?;
506
507 let mut scanned_slots_map = self.scanned_slots_map;
508 // If the address epoch changed it mean that some slots in the address are new, so we cant know which slots been there from the beginning and which are new, or out and in later.
509 // In this case we will skip updating the scanned_slots_map and will just update the address and the cursor
510 let new_address_epoch = core.address_epoch(&self.address_in_scan).await.unwrap_or(0);
511 if new_address_epoch != self.address_epoch {
512 return self
513 .new_scan_state(core, allow_non_covered_slots, None)
514 .await;
515 }
516 // If epoch wasn't changed, the slots owned by the address after the refresh are all valid as slots that been scanned
517 // So we will update the scanned_slots_map with the slots owned by the address
518 let slots_scanned = core.slots_of_address(self.address_in_scan.clone()).await;
519 for slot in slots_scanned {
520 mark_slot_as_scanned(&mut scanned_slots_map, slot);
521 }
522 // Get the next address to scan and its param base on the next slot set to 0 in the scanned_slots_map
523 self.new_scan_state(core, allow_non_covered_slots, Some(scanned_slots_map))
524 .await
525 }
526}
527
528fn mark_slot_as_scanned(scanned_slots_map: &mut SlotsBitsArray, slot: u16) {
529 let slot_index = (slot as u64 / BITS_PER_U64 as u64) as usize;
530 let slot_bit = slot as u64 % (BITS_PER_U64 as u64);
531 scanned_slots_map[slot_index] |= 1 << slot_bit;
532}
533
534/// Clear the bit for `slot` in the bitmap, marking it as NOT
535/// scanned. Used by hash-tag-scoped initiation, where the bitmap
536/// starts with every slot marked scanned except the pinned slot
537/// so `next_address_to_scan` locates only the primary that owns
538/// the hash tag.
539fn unmark_slot_as_scanned(scanned_slots_map: &mut SlotsBitsArray, slot: u16) {
540 let slot_index = (slot as u64 / BITS_PER_U64 as u64) as usize;
541 let slot_bit = slot as u64 % (BITS_PER_U64 as u64);
542 scanned_slots_map[slot_index] &= !(1 << slot_bit);
543}
544
/// Extract the bytes of the Valkey hash tag (`{...}`) that a MATCH pattern
/// pins every matching key to, or `None` if the pattern does not pin one.
///
/// The tag is located with the same rule used for keys: the first `{`, then
/// the first `}` after it, with a non-empty body in between.
///
/// A MATCH pattern is a glob, not a key, so a tag is only reported when
/// everything up to the closing `}` is literal text. If a glob metacharacter
/// (`*`, `?`, `[` or `\`) appears before or inside the braces, matching keys
/// are not guaranteed to share that tag:
///
/// * `*{abc}` also matches `{x}foo{abc}`, whose hash tag is `x`;
/// * `{user*}` matches `{user1}` and `{user2}`, which hash to different slots.
///
/// Returning `None` in those cases makes the caller fall back to a full
/// cluster scan instead of silently skipping keys. Anything after the closing
/// `}` may contain wildcards freely.
fn hashtag_from_pattern(pattern: &[u8]) -> Option<&[u8]> {
    // Bytes with special meaning in glob-style MATCH patterns.
    const GLOB_META: &[u8] = b"*?[\\";

    let open = pattern.iter().position(|&b| b == b'{')?;
    let tag_len = pattern[open + 1..].iter().position(|&b| b == b'}')?;
    let close = open + 1 + tag_len;

    // The literal prefix and the tag body must both be wildcard-free for the
    // tag to apply to every key the pattern can match.
    if pattern[..close].iter().any(|b| GLOB_META.contains(b)) {
        return None;
    }

    let tag = &pattern[open + 1..close];
    (!tag.is_empty()).then_some(tag)
}
555
#[derive(PartialEq, Debug, Clone)]
/// Outcome of looking for the next node to scan.
///
/// # Variants
///
/// * `Address` - A shared string containing the address of the node to scan next
/// * `AllSlotsCompleted` - Indicates that all slots have been scanned
enum NextNodeResult {
    Address(Arc<String>),
    AllSlotsCompleted,
}
567
568/// Determines the next node address to scan within the cluster.
569///
570/// This asynchronous function iterates through cluster slots to find the next available
571/// node responsible for scanning. If a slot is not covered and `allow_non_covered_slots`
572/// is enabled, it marks the slot as scanned and proceeds to the next one. The process
573/// continues until a valid address is found or all slots have been scanned.
574///
575/// # Arguments
576///
577/// * `core` - Reference to the cluster's inner core connection.
578/// * `slot` - The current slot number to scan.
579/// * `scanned_slots_map` - Mutable reference to the bitmap tracking scanned slots.
580/// * `allow_non_covered_slots` - Flag indicating whether to allow scanning of uncovered slots.
581///
582/// # Returns
583///
584/// * `Result<NextNodeResult>` - Returns the next node address to scan or indicates completion.
585///
586/// # Type Parameters
587///
588/// * `C`: The connection type that must implement `ConnectionLike`, `Connect`, `Clone`, `Send`, `Sync`, and `'static`.
589///
590async fn next_address_to_scan<C>(
591 core: &InnerCore<C>,
592 mut slot: u16,
593 scanned_slots_map: &mut SlotsBitsArray,
594 allow_non_covered_slots: bool,
595) -> Result<NextNodeResult>
596where
597 C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
598{
599 loop {
600 if slot == END_OF_SCAN {
601 return Ok(NextNodeResult::AllSlotsCompleted);
602 }
603
604 if let Some(addr) = core
605 .conn_lock
606 .read()
607 .slot_map
608 .node_address_for_slot(slot, SlotAddr::ReplicaRequired)
609 {
610 // Found a valid address for the slot
611 return Ok(NextNodeResult::Address(addr));
612 } else if allow_non_covered_slots {
613 // Mark the current slot as scanned
614 mark_slot_as_scanned(scanned_slots_map, slot);
615 slot = next_slot(scanned_slots_map).unwrap();
616 } else {
617 // Error if slots are not covered and scanning is not allowed
618 return Err(Error::from((
619 ErrorKind::NotAllSlotsCovered,
620 "Could not find an address covering a slot, SCAN operation cannot continue \n
621 If you want to continue scanning even if some slots are not covered, set allow_non_covered_slots to true \n
622 Note that this may lead to incomplete scanning, and the SCAN operation lose its all guarantees ",
623 )));
624 }
625 }
626}
627
628/// Get the next slot to be scanned based on the scanned slots map.
629/// If all slots have been scanned, the method returns [`END_OF_SCAN`].
630fn next_slot(scanned_slots_map: &SlotsBitsArray) -> Option<u16> {
631 let all_slots_scanned = scanned_slots_map.iter().all(|&word| word == u64::MAX);
632 if all_slots_scanned {
633 return Some(END_OF_SCAN);
634 }
635 for (i, slot) in scanned_slots_map.iter().enumerate() {
636 let mut mask = 1;
637 for j in 0..BITS_PER_U64 {
638 if (slot & mask) == 0 {
639 return Some(i as u16 * BITS_PER_U64 + j);
640 }
641 mask <<= 1;
642 }
643 }
644 None
645}
646
647/// Performs a cluster-wide `SCAN` operation.
648///
649/// This function scans the cluster for keys based on the provided arguments.
650/// It handles the initiation of a new scan or continues an existing scan, manages
651/// scan state, handles routing failures, and ensures consistent scanning across
652/// cluster topology changes.
653///
654/// # Arguments
655///
656/// * `core` - An `Arc`-wrapped reference to the cluster connection (`InnerCore<C>`).
657/// * `cluster_scan_args` - Configuration and arguments for the scan operation.
658///
659/// # Returns
660///
661/// * `Result<(ScanStateRC, Vec<Value>)>` -
662/// - On success: A tuple containing the updated scan state (`ScanStateRC`) and a vector of `Value`s representing the found keys.
663/// - On failure: A `Error` detailing the reason for the failure.
664///
665/// # Type Parameters
666///
667/// * `C`: The connection type that must implement `ConnectionLike`, `Connect`, `Clone`, `Send`, `Sync`, and `'static`.
668///
669pub(crate) async fn cluster_scan<C>(
670 core: Arc<InnerCore<C>>,
671 cluster_scan_args: ClusterScanArgs,
672) -> Result<(ScanStateRC, Vec<Value>)>
673where
674 C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
675{
676 // Extract the current scan state cursor and the flag for non-covered slots
677 let scan_state_cursor = &cluster_scan_args.scan_state_cursor;
678 let allow_non_covered_slots = cluster_scan_args.allow_non_covered_slots;
679
680 // Determine the current scan state:
681 // - If an existing scan state is present, use it.
682 // - Otherwise, initiate a new scan.
683 let scan_state = match scan_state_cursor.state_from_wrapper() {
684 Some(state) => state,
685 None => match ScanState::initiate_scan(
686 &core,
687 allow_non_covered_slots,
688 cluster_scan_args.match_pattern.as_deref(),
689 )
690 .await
691 {
692 Ok(state) => state,
693 Err(err) => {
694 // Early return if initiating the scan fails
695 return Err(err);
696 }
697 },
698 };
699 // Send the SCAN command using the current scan state and scan arguments
700 let ((new_cursor, new_keys), mut scan_state) =
701 try_scan(&scan_state, &cluster_scan_args, core.clone()).await?;
702
703 // Check if the cursor indicates the end of the current scan segment
704 if new_cursor == 0 {
705 // Update the scan state to move to the next address/node in the cluster
706 scan_state = scan_state
707 .create_updated_scan_state_for_completed_address(core, allow_non_covered_slots)
708 .await?;
709 }
710
711 // Verify if the entire cluster has been scanned
712 if scan_state.scan_status == ScanStateStage::Finished {
713 // Return the final scan state and the collected keys
714 return Ok((ScanStateRC::create_finished(), new_keys));
715 }
716
717 // Update the scan state with the new cursor and maintain the progress
718 scan_state = ScanState::new(
719 new_cursor,
720 scan_state.scanned_slots_map,
721 scan_state.address_in_scan,
722 scan_state.address_epoch,
723 ScanStateStage::InProgress,
724 );
725
726 // Return the updated scan state and the newly found keys
727 Ok((ScanStateRC::from_scan_state(scan_state), new_keys))
728}
729
730/// Sends the `SCAN` command to the specified address.
731///
732/// # Arguments
733///
734/// * `scan_state` - The current scan state.
735/// * `cluster_scan_args` - Arguments for the scan operation, including match pattern, count, object type, and allow_non_covered_slots.
736/// * `core` - The cluster connection.
737///
738/// # Returns
739///
740/// A `Result` containing the response from the `SCAN` command.
741async fn send_scan<C>(
742 scan_state: &ScanState,
743 cluster_scan_args: &ClusterScanArgs,
744 core: Arc<InnerCore<C>>,
745) -> Result<Value>
746where
747 C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
748{
749 if let Some(conn_future) = core
750 .connection_for_address(&scan_state.address_in_scan)
751 .await
752 {
753 let mut conn = conn_future.await;
754 let mut scan_command = cmd("SCAN");
755 scan_command.arg(scan_state.cursor);
756 if let Some(match_pattern) = cluster_scan_args.match_pattern.as_ref() {
757 scan_command.arg("MATCH").arg(match_pattern);
758 }
759 if let Some(count) = cluster_scan_args.count {
760 scan_command.arg("COUNT").arg(count);
761 }
762 if let Some(object_type) = &cluster_scan_args.object_type {
763 scan_command.arg("TYPE").arg(object_type.to_string());
764 }
765 conn.req_packed_command(&scan_command).await
766 } else {
767 Err(Error::from((
768 ErrorKind::ConnectionNotFoundForRoute,
769 "Cluster scan failed. No connection available for address: ",
770 format!("{}", scan_state.address_in_scan),
771 )))
772 }
773}
774
775/// Checks if the error is retryable during scanning.
776/// Retryable errors include network issues, cluster topology changes, and unavailable connections.
777/// Scan operations are not keyspace operations, so they are not affected by keyspace errors like `MOVED`.
778fn is_scanwise_retryable_error(err: &Error) -> bool {
779 matches!(
780 err.kind(),
781 ErrorKind::IoError
782 | ErrorKind::AllConnectionsUnavailable
783 | ErrorKind::ConnectionNotFoundForRoute
784 | ErrorKind::ClusterDown
785 | ErrorKind::FatalSendError
786 )
787}
788
789/// Gets the next scan state by finding the next address to scan.
790/// The method updates the scanned slots map and retrieves the next address to scan.
791/// If the address epoch has changed, the method creates a new scan state without updating the scanned slots map.
792/// If the address epoch has not changed, the method updates the scanned slots map with the slots owned by the address.
793/// The method returns the new scan state with the updated cursor, scanned slots map, address, and epoch.
794/// The method is used to continue scanning the cluster after completing a scan segment.
795async fn next_scan_state<C>(
796 core: &Arc<InnerCore<C>>,
797 scan_state: &ScanState,
798 cluster_scan_args: &ClusterScanArgs,
799) -> Result<Option<ScanState>>
800where
801 C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
802{
803 let next_slot = next_slot(&scan_state.scanned_slots_map).unwrap_or(0);
804 let mut scanned_slots_map = scan_state.scanned_slots_map;
805 match next_address_to_scan(
806 core,
807 next_slot,
808 &mut scanned_slots_map,
809 cluster_scan_args.allow_non_covered_slots,
810 )
811 .await
812 {
813 Ok(NextNodeResult::Address(new_address)) => {
814 let new_epoch = core.address_epoch(&new_address).await.unwrap_or(0);
815 Ok(Some(ScanState::new(
816 0,
817 scanned_slots_map,
818 new_address,
819 new_epoch,
820 ScanStateStage::InProgress,
821 )))
822 }
823 Ok(NextNodeResult::AllSlotsCompleted) => Ok(None),
824 Err(err) => Err(err),
825 }
826}
827
828/// Attempts to scan the cluster for keys based on the current scan state.
829/// Sends the `SCAN` command to the current address and processes the response.
830/// On retryable errors, refreshes the cluster topology and retries the scan.
831/// Returns the new cursor and keys found upon success.
832async fn try_scan<C>(
833 scan_state: &ScanState,
834 cluster_scan_args: &ClusterScanArgs,
835 core: Arc<InnerCore<C>>,
836) -> Result<((u64, Vec<Value>), ScanState)>
837where
838 C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
839{
840 let mut new_scan_state = scan_state.clone();
841 const MAX_SCAN_RETRIES: usize = 10;
842 let mut retries = 0;
843
844 loop {
845 match send_scan(&new_scan_state, cluster_scan_args, core.clone()).await {
846 Ok(scan_response) => {
847 let (new_cursor, new_keys) =
848 from_value::<(u64, Vec<Value>)>(&scan_response)?;
849 return Ok(((new_cursor, new_keys), new_scan_state));
850 }
851 Err(err) if is_scanwise_retryable_error(&err) => {
852 retries += 1;
853 if retries > MAX_SCAN_RETRIES {
854 return Err(Error::from((
855 ErrorKind::AllConnectionsUnavailable,
856 "Cluster scan exceeded maximum retry count",
857 format!(
858 "Failed after {} retries. Last error: {}",
859 MAX_SCAN_RETRIES, err
860 ),
861 )));
862 }
863
864 ClusterConnInner::check_topology_and_refresh_if_diff(
865 core.clone(),
866 &RefreshPolicy::NotThrottable,
867 )
868 .await?;
869
870 if let Some(next_scan_state) =
871 next_scan_state(&core, &new_scan_state, cluster_scan_args).await?
872 {
873 new_scan_state = next_scan_state;
874 } else {
875 return Ok(((0, Vec::new()), ScanState::create_finished_state()));
876 }
877 }
878 Err(err) => return Err(err),
879 }
880 }
881}
882
/// Unit tests for the synchronous parts of the scan module: the args builder,
/// `ScanState` construction and the slot-bitmap helpers. None of them awaits
/// anything, so they are plain `#[test]` functions and need no async runtime.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_cluster_scan_args_builder() {
        let args = ClusterScanArgs::builder()
            .with_match_pattern("user:*")
            .with_count(100)
            .with_object_type(ObjectType::Hash)
            .allow_non_covered_slots(true)
            .build();

        assert_eq!(args.match_pattern, Some(b"user:*".to_vec()));
        assert_eq!(args.count, Some(100));
        assert_eq!(args.object_type, Some(ObjectType::Hash));
        assert!(args.allow_non_covered_slots);
    }

    #[test]
    fn test_scan_state_new() {
        let address = Arc::new("127.0.0.1:6379".to_string());
        let scan_state = ScanState::new(
            0,
            [0; BITS_ARRAY_SIZE as usize],
            address.clone(),
            1,
            ScanStateStage::InProgress,
        );

        assert_eq!(scan_state.cursor, 0);
        assert_eq!(scan_state.scanned_slots_map, [0; BITS_ARRAY_SIZE as usize]);
        assert_eq!(scan_state.address_in_scan, address);
        assert_eq!(scan_state.address_epoch, 1);
        assert_eq!(scan_state.scan_status, ScanStateStage::InProgress);
    }

    #[test]
    fn test_scan_state_create_finished() {
        let scan_state = ScanState::create_finished_state();

        assert_eq!(scan_state.cursor, 0);
        assert_eq!(scan_state.scanned_slots_map, [0; BITS_ARRAY_SIZE as usize]);
        assert_eq!(scan_state.address_in_scan, Arc::new(String::new()));
        assert_eq!(scan_state.address_epoch, 0);
        assert_eq!(scan_state.scan_status, ScanStateStage::Finished);
    }

    #[test]
    fn test_mark_slot_as_scanned() {
        let mut scanned_slots_map = [0; BITS_ARRAY_SIZE as usize];
        mark_slot_as_scanned(&mut scanned_slots_map, 5);

        assert_eq!(scanned_slots_map[0], 1 << 5);
    }

    #[test]
    fn test_hashtag_from_pattern() {
        assert_eq!(
            hashtag_from_pattern(b"ff:idx:{abc}:lane:*"),
            Some(&b"abc"[..])
        );
        assert_eq!(hashtag_from_pattern(b"no-tag-here:*"), None);
        assert_eq!(hashtag_from_pattern(b"{}empty"), None);
        assert_eq!(hashtag_from_pattern(b"{only-open"), None);
        // Only first occurrence considered.
        assert_eq!(
            hashtag_from_pattern(b"{first}and{second}"),
            Some(&b"first"[..])
        );
    }

    #[test]
    fn test_unmark_slot_as_scanned() {
        let mut map: SlotsBitsArray = [u64::MAX; BITS_ARRAY_SIZE as usize];
        unmark_slot_as_scanned(&mut map, 5);
        // Slot 5 cleared, everything else set.
        assert_eq!(map[0], !(1u64 << 5));
        for word in &map[1..] {
            assert_eq!(*word, u64::MAX);
        }
        // next_slot should now return exactly 5.
        assert_eq!(next_slot(&map), Some(5));
    }

    #[test]
    fn test_hashtag_scoped_bitmap_finishes_after_pinned_slot() {
        // Simulate the hash-tag-scoped initial bitmap: every slot
        // marked scanned except the pinned one. Once the pinned
        // node's slots (a super-set including the pinned slot)
        // get marked scanned by the completed-address path,
        // next_slot returns END_OF_SCAN so the scan finishes on
        // a single node rather than iterating every primary.
        let pinned = get_slot(b"abc");
        let mut map: SlotsBitsArray = [u64::MAX; BITS_ARRAY_SIZE as usize];
        unmark_slot_as_scanned(&mut map, pinned);
        assert_eq!(next_slot(&map), Some(pinned));
        mark_slot_as_scanned(&mut map, pinned);
        assert_eq!(next_slot(&map), Some(END_OF_SCAN));
    }

    #[test]
    fn test_next_slot() {
        let scan_state = ScanState::new(
            0,
            [0; BITS_ARRAY_SIZE as usize],
            Arc::new("127.0.0.1:6379".to_string()),
            1,
            ScanStateStage::InProgress,
        );
        let next_slot = next_slot(&scan_state.scanned_slots_map);

        assert_eq!(next_slot, Some(0));
    }
}
991}