Skip to main content

reifydb_store_multi/store/
multi.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	collections::{BTreeMap, HashMap, HashSet},
6	ops::{Bound, RangeBounds},
7};
8
9use reifydb_core::{
10	common::CommitVersion,
11	delta::Delta,
12	encoded::{
13		encoded::EncodedValues,
14		key::{EncodedKey, EncodedKeyRange},
15	},
16	event::metric::{StorageDelete, StorageStatsRecordedEvent, StorageWrite},
17	interface::store::{
18		MultiVersionBatch, MultiVersionCommit, MultiVersionContains, MultiVersionGet, MultiVersionGetPrevious,
19		MultiVersionStore, MultiVersionValues,
20	},
21};
22use reifydb_type::util::{cowvec::CowVec, hex};
23use tracing::instrument;
24
25use super::{
26	StandardMultiStore,
27	router::{classify_key, is_single_version_semantics_key},
28	version::{VersionedGetResult, get_at_version},
29	worker::{DropMessage, DropRequest},
30};
31use crate::tier::{EntryKind, EntryKind::Multi, RangeCursor, TierStorage};
32
/// Fixed chunk size for internal tier scans.
///
/// This is the limit handed to each tier's `range_next` / `range_rev_next` call,
/// i.e. the maximum number of versioned entries requested from one tier per scan
/// round. It is independent of the caller-supplied `batch_size`.
const TIER_SCAN_CHUNK_SIZE: usize = 4096;
36
37impl MultiVersionGet for StandardMultiStore {
38	#[instrument(name = "store::multi::get", level = "trace", skip(self), fields(key_hex = %hex::encode(key.as_ref()), version = version.0))]
39	fn get(&self, key: &EncodedKey, version: CommitVersion) -> crate::Result<Option<MultiVersionValues>> {
40		let table = classify_key(key);
41
42		// Try hot tier first
43		if let Some(hot) = &self.hot {
44			match get_at_version(hot, table, key.as_ref(), version)? {
45				VersionedGetResult::Value {
46					value,
47					version: v,
48				} => {
49					return Ok(Some(MultiVersionValues {
50						key: key.clone(),
51						values: EncodedValues(value),
52						version: v,
53					}));
54				}
55				VersionedGetResult::Tombstone => return Ok(None),
56				VersionedGetResult::NotFound => {}
57			}
58		}
59
60		// Try warm tier
61		if let Some(warm) = &self.warm {
62			match get_at_version(warm, table, key.as_ref(), version)? {
63				VersionedGetResult::Value {
64					value,
65					version: v,
66				} => {
67					return Ok(Some(MultiVersionValues {
68						key: key.clone(),
69						values: EncodedValues(value),
70						version: v,
71					}));
72				}
73				VersionedGetResult::Tombstone => return Ok(None),
74				VersionedGetResult::NotFound => {}
75			}
76		}
77
78		// Try cold tier
79		if let Some(cold) = &self.cold {
80			match get_at_version(cold, table, key.as_ref(), version)? {
81				VersionedGetResult::Value {
82					value,
83					version: v,
84				} => {
85					return Ok(Some(MultiVersionValues {
86						key: key.clone(),
87						values: EncodedValues(value),
88						version: v,
89					}));
90				}
91				VersionedGetResult::Tombstone => return Ok(None),
92				VersionedGetResult::NotFound => {}
93			}
94		}
95
96		Ok(None)
97	}
98}
99
100impl MultiVersionContains for StandardMultiStore {
101	#[instrument(name = "store::multi::contains", level = "trace", skip(self), fields(key_hex = %hex::encode(key.as_ref()), version = version.0), ret)]
102	fn contains(&self, key: &EncodedKey, version: CommitVersion) -> crate::Result<bool> {
103		Ok(MultiVersionGet::get(self, key, version)?.is_some())
104	}
105}
106
107impl MultiVersionCommit for StandardMultiStore {
108	#[instrument(name = "store::multi::commit", level = "debug", skip(self, deltas), fields(delta_count = deltas.len(), version = version.0))]
109	fn commit(&self, deltas: CowVec<Delta>, version: CommitVersion) -> crate::Result<()> {
110		// Get the hot storage tier (warm and cold are placeholders for now)
111		let Some(storage) = &self.hot else {
112			return Ok(());
113		};
114
115		let mut pending_set_keys: HashSet<Vec<u8>> = HashSet::new();
116		let mut writes: Vec<StorageWrite> = Vec::new();
117		let mut deletes: Vec<StorageDelete> = Vec::new();
118		let mut batches: HashMap<EntryKind, Vec<(CowVec<u8>, Option<CowVec<u8>>)>> = HashMap::new();
119		let mut explicit_drops: Vec<(EntryKind, EncodedKey, Option<CommitVersion>, Option<usize>)> = Vec::new();
120
121		for delta in deltas.iter() {
122			let key = delta.key();
123
124			let table = classify_key(key);
125			let is_single_version = is_single_version_semantics_key(key);
126
127			match delta {
128				Delta::Set {
129					key,
130					values,
131				} => {
132					if is_single_version {
133						pending_set_keys.insert(key.as_ref().to_vec());
134					}
135
136					writes.push(StorageWrite {
137						key: key.clone(),
138						value_bytes: values.len() as u64,
139					});
140
141					// Pass the logical key directly - version is a separate parameter
142					let logical_key = CowVec::new(key.as_ref().to_vec());
143					batches.entry(table)
144						.or_default()
145						.push((logical_key, Some(CowVec::new(values.as_ref().to_vec()))));
146				}
147				Delta::Unset {
148					key,
149					values,
150				} => {
151					deletes.push(StorageDelete {
152						key: key.clone(),
153						value_bytes: values.len() as u64,
154					});
155
156					// Pass the logical key directly - tombstone with version
157					let logical_key = CowVec::new(key.as_ref().to_vec());
158					batches.entry(table).or_default().push((logical_key, None));
159				}
160				Delta::Remove {
161					key,
162				} => {
163					// Pass the logical key directly - tombstone with version
164					let logical_key = CowVec::new(key.as_ref().to_vec());
165					batches.entry(table).or_default().push((logical_key, None));
166				}
167				Delta::Drop {
168					key,
169					up_to_version,
170					keep_last_versions,
171				} => {
172					explicit_drops.push((table, key.clone(), *up_to_version, *keep_last_versions));
173				}
174			}
175		}
176
177		// Process explicit drops now that pending_set_keys is complete
178		let mut drop_batch = Vec::with_capacity(explicit_drops.len() + pending_set_keys.len());
179		for (table, key, up_to_version, keep_last_versions) in explicit_drops {
180			let pending_version = if pending_set_keys.contains(key.as_ref()) {
181				Some(version)
182			} else {
183				None
184			};
185
186			drop_batch.push(DropRequest {
187				table,
188				key: key.0.clone(),
189				up_to_version,
190				keep_last_versions,
191				commit_version: version,
192				pending_version,
193			});
194		}
195
196		// Add implicit drops for single-version-semantics keys
197		for key_bytes in pending_set_keys.iter() {
198			let key = CowVec::new(key_bytes.clone());
199			let table = classify_key(&EncodedKey(key.clone()));
200			drop_batch.push(DropRequest {
201				table,
202				key,
203				up_to_version: None,
204				keep_last_versions: Some(1),
205				commit_version: version,
206				pending_version: Some(version),
207			});
208		}
209
210		if !drop_batch.is_empty() {
211			let _ = self.drop_actor.send(DropMessage::Batch(drop_batch));
212		}
213
214		// Pass version explicitly to storage
215		storage.set(version, batches)?;
216
217		// Emit storage stats event for this commit
218		if !writes.is_empty() || !deletes.is_empty() {
219			self.event_bus.emit(StorageStatsRecordedEvent::new(writes, deletes, vec![], version));
220		}
221
222		Ok(())
223	}
224}
225
/// Cursor state for multi-version range streaming.
///
/// Holds one [`RangeCursor`] per storage tier so that each tier's scan position is
/// tracked independently across successive batch requests, plus a flag that is set
/// once a scan round finds no tier able to make further progress.
///
/// The derived `Default` is the start-of-scan state used by [`MultiVersionRangeCursor::new`].
#[derive(Debug, Clone, Default)]
pub struct MultiVersionRangeCursor {
	/// Cursor for hot tier
	pub hot: RangeCursor,
	/// Cursor for warm tier
	pub warm: RangeCursor,
	/// Cursor for cold tier
	pub cold: RangeCursor,
	/// Whether all tiers are exhausted; once set, further batch requests return
	/// an empty batch immediately.
	pub exhausted: bool,
}
241
242impl MultiVersionRangeCursor {
243	/// Create a new cursor at the start.
244	pub fn new() -> Self {
245		Self::default()
246	}
247
248	/// Check if all tiers are exhausted.
249	pub fn is_exhausted(&self) -> bool {
250		self.exhausted
251	}
252}
253
254impl StandardMultiStore {
255	/// Fetch the next batch of entries, continuing from cursor position.
256	///
257	/// This properly handles high version density by scanning until `batch_size`
258	/// unique logical keys are collected OR all tiers are exhausted.
259	pub fn range_next(
260		&self,
261		cursor: &mut MultiVersionRangeCursor,
262		range: EncodedKeyRange,
263		version: CommitVersion,
264		batch_size: u64,
265	) -> crate::Result<MultiVersionBatch> {
266		if cursor.exhausted {
267			return Ok(MultiVersionBatch {
268				items: Vec::new(),
269				has_more: false,
270			});
271		}
272
273		let table = classify_key_range(&range);
274		let (start, end) = make_range_bounds(&range);
275		let batch_size = batch_size as usize;
276
277		// Collected entries: logical_key -> (version, value)
278		let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
279
280		// Keep scanning until we have batch_size unique logical keys OR all tiers exhausted
281		while collected.len() < batch_size {
282			let mut any_progress = false;
283
284			// Scan chunk from hot tier
285			if let Some(hot) = &self.hot {
286				if !cursor.hot.exhausted {
287					let progress = Self::scan_tier_chunk(
288						hot,
289						table,
290						&mut cursor.hot,
291						&start,
292						&end,
293						version,
294						&range,
295						&mut collected,
296					)?;
297					any_progress |= progress;
298				}
299			}
300
301			// Scan chunk from warm tier
302			if let Some(warm) = &self.warm {
303				if !cursor.warm.exhausted {
304					let progress = Self::scan_tier_chunk(
305						warm,
306						table,
307						&mut cursor.warm,
308						&start,
309						&end,
310						version,
311						&range,
312						&mut collected,
313					)?;
314					any_progress |= progress;
315				}
316			}
317
318			// Scan chunk from cold tier
319			if let Some(cold) = &self.cold {
320				if !cursor.cold.exhausted {
321					let progress = Self::scan_tier_chunk(
322						cold,
323						table,
324						&mut cursor.cold,
325						&start,
326						&end,
327						version,
328						&range,
329						&mut collected,
330					)?;
331					any_progress |= progress;
332				}
333			}
334
335			if !any_progress {
336				// All tiers exhausted
337				cursor.exhausted = true;
338				break;
339			}
340		}
341
342		// Convert to MultiVersionValues in sorted key order, filtering out tombstones
343		let items: Vec<MultiVersionValues> = collected
344			.into_iter()
345			.take(batch_size)
346			.filter_map(|(key_bytes, (v, value))| {
347				value.map(|val| MultiVersionValues {
348					key: EncodedKey(CowVec::new(key_bytes)),
349					values: EncodedValues(val),
350					version: v,
351				})
352			})
353			.collect();
354
355		let has_more = items.len() >= batch_size || !cursor.exhausted;
356
357		Ok(MultiVersionBatch {
358			items,
359			has_more,
360		})
361	}
362
363	/// Scan a chunk from a single tier and merge into collected entries.
364	/// Returns true if any entries were processed (i.e., made progress).
365	fn scan_tier_chunk<S: TierStorage>(
366		storage: &S,
367		table: EntryKind,
368		cursor: &mut RangeCursor,
369		start: &[u8],
370		end: &[u8],
371		version: CommitVersion,
372		range: &EncodedKeyRange,
373		collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
374	) -> crate::Result<bool> {
375		let batch = storage.range_next(
376			table,
377			cursor,
378			Bound::Included(start),
379			Bound::Included(end),
380			version,
381			TIER_SCAN_CHUNK_SIZE,
382		)?;
383
384		if batch.entries.is_empty() {
385			return Ok(false);
386		}
387
388		for entry in batch.entries {
389			// Entry key is already the logical key, entry.version is the version
390			let original_key = entry.key.as_slice().to_vec();
391			let entry_version = entry.version;
392
393			// Skip if key is not within the requested logical range
394			let original_key_encoded = EncodedKey(CowVec::new(original_key.clone()));
395			if !range.contains(&original_key_encoded) {
396				continue;
397			}
398
399			// Update if no entry exists or this is a higher version
400			let should_update = match collected.get(&original_key) {
401				None => true,
402				Some((existing_version, _)) => entry_version > *existing_version,
403			};
404
405			if should_update {
406				collected.insert(original_key, (entry_version, entry.value));
407			}
408		}
409
410		Ok(true)
411	}
412
413	/// Create an iterator for forward range queries.
414	///
415	/// This properly handles high version density by scanning until batch_size
416	/// unique logical keys are collected. The iterator yields individual entries
417	/// and maintains cursor state internally.
418	pub fn range(
419		&self,
420		range: EncodedKeyRange,
421		version: CommitVersion,
422		batch_size: usize,
423	) -> MultiVersionRangeIter {
424		MultiVersionRangeIter {
425			store: self.clone(),
426			cursor: MultiVersionRangeCursor::new(),
427			range,
428			version,
429			batch_size,
430			current_batch: Vec::new(),
431			current_index: 0,
432		}
433	}
434
435	/// Create an iterator for reverse range queries.
436	///
437	/// This properly handles high version density by scanning until batch_size
438	/// unique logical keys are collected. The iterator yields individual entries
439	/// in reverse key order and maintains cursor state internally.
440	pub fn range_rev(
441		&self,
442		range: EncodedKeyRange,
443		version: CommitVersion,
444		batch_size: usize,
445	) -> MultiVersionRangeRevIter {
446		MultiVersionRangeRevIter {
447			store: self.clone(),
448			cursor: MultiVersionRangeCursor::new(),
449			range,
450			version,
451			batch_size,
452			current_batch: Vec::new(),
453			current_index: 0,
454		}
455	}
456
457	/// Fetch the next batch of entries in reverse order, continuing from cursor position.
458	///
459	/// This properly handles high version density by scanning until `batch_size`
460	/// unique logical keys are collected OR all tiers are exhausted.
461	fn range_rev_next(
462		&self,
463		cursor: &mut MultiVersionRangeCursor,
464		range: EncodedKeyRange,
465		version: CommitVersion,
466		batch_size: u64,
467	) -> crate::Result<MultiVersionBatch> {
468		if cursor.exhausted {
469			return Ok(MultiVersionBatch {
470				items: Vec::new(),
471				has_more: false,
472			});
473		}
474
475		let table = classify_key_range(&range);
476		let (start, end) = make_range_bounds(&range);
477		let batch_size = batch_size as usize;
478
479		// Collected entries: logical_key -> (version, value)
480		let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
481
482		// Keep scanning until we have batch_size unique logical keys OR all tiers exhausted
483		while collected.len() < batch_size {
484			let mut any_progress = false;
485
486			// Scan chunk from hot tier (reverse)
487			if let Some(hot) = &self.hot {
488				if !cursor.hot.exhausted {
489					let progress = Self::scan_tier_chunk_rev(
490						hot,
491						table,
492						&mut cursor.hot,
493						&start,
494						&end,
495						version,
496						&range,
497						&mut collected,
498					)?;
499					any_progress |= progress;
500				}
501			}
502
503			// Scan chunk from warm tier (reverse)
504			if let Some(warm) = &self.warm {
505				if !cursor.warm.exhausted {
506					let progress = Self::scan_tier_chunk_rev(
507						warm,
508						table,
509						&mut cursor.warm,
510						&start,
511						&end,
512						version,
513						&range,
514						&mut collected,
515					)?;
516					any_progress |= progress;
517				}
518			}
519
520			// Scan chunk from cold tier (reverse)
521			if let Some(cold) = &self.cold {
522				if !cursor.cold.exhausted {
523					let progress = Self::scan_tier_chunk_rev(
524						cold,
525						table,
526						&mut cursor.cold,
527						&start,
528						&end,
529						version,
530						&range,
531						&mut collected,
532					)?;
533					any_progress |= progress;
534				}
535			}
536
537			if !any_progress {
538				// All tiers exhausted
539				cursor.exhausted = true;
540				break;
541			}
542		}
543
544		// Convert to MultiVersionValues in REVERSE sorted key order, filtering out tombstones
545		let items: Vec<MultiVersionValues> = collected
546			.into_iter()
547			.rev()
548			.take(batch_size)
549			.filter_map(|(key_bytes, (v, value))| {
550				value.map(|val| MultiVersionValues {
551					key: EncodedKey(CowVec::new(key_bytes)),
552					values: EncodedValues(val),
553					version: v,
554				})
555			})
556			.collect();
557
558		let has_more = items.len() >= batch_size || !cursor.exhausted;
559
560		Ok(MultiVersionBatch {
561			items,
562			has_more,
563		})
564	}
565
566	/// Scan a chunk from a single tier in reverse and merge into collected entries.
567	/// Returns true if any entries were processed (i.e., made progress).
568	fn scan_tier_chunk_rev<S: TierStorage>(
569		storage: &S,
570		table: EntryKind,
571		cursor: &mut RangeCursor,
572		start: &[u8],
573		end: &[u8],
574		version: CommitVersion,
575		range: &EncodedKeyRange,
576		collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
577	) -> crate::Result<bool> {
578		let batch = storage.range_rev_next(
579			table,
580			cursor,
581			Bound::Included(start),
582			Bound::Included(end),
583			version,
584			TIER_SCAN_CHUNK_SIZE,
585		)?;
586
587		if batch.entries.is_empty() {
588			return Ok(false);
589		}
590
591		for entry in batch.entries {
592			// Entry key is already the logical key, entry.version is the version
593			let original_key = entry.key.as_slice().to_vec();
594			let entry_version = entry.version;
595
596			// Skip if key is not within the requested logical range
597			let original_key_encoded = EncodedKey(CowVec::new(original_key.clone()));
598			if !range.contains(&original_key_encoded) {
599				continue;
600			}
601
602			// Update if no entry exists or this is a higher version
603			let should_update = match collected.get(&original_key) {
604				None => true,
605				Some((existing_version, _)) => entry_version > *existing_version,
606			};
607
608			if should_update {
609				collected.insert(original_key, (entry_version, entry.value));
610			}
611		}
612
613		Ok(true)
614	}
615}
616
617impl MultiVersionGetPrevious for StandardMultiStore {
618	fn get_previous_version(
619		&self,
620		key: &EncodedKey,
621		before_version: CommitVersion,
622	) -> crate::Result<Option<MultiVersionValues>> {
623		if before_version.0 == 0 {
624			return Ok(None);
625		}
626
627		// Hot storage must be available for version lookups
628		let storage = self.hot.as_ref().expect("hot storage required for version lookups");
629
630		let table = classify_key(key);
631		let prev_version = CommitVersion(before_version.0 - 1);
632
633		match get_at_version(storage, table, key.as_ref(), prev_version) {
634			Ok(VersionedGetResult::Value {
635				value,
636				version,
637			}) => Ok(Some(MultiVersionValues {
638				key: key.clone(),
639				values: EncodedValues(CowVec::new(value.to_vec())),
640				version,
641			})),
642			Ok(VersionedGetResult::Tombstone) | Ok(VersionedGetResult::NotFound) => Ok(None),
643			Err(e) => Err(e),
644		}
645	}
646}
647
648impl MultiVersionStore for StandardMultiStore {}
649
/// Iterator for forward multi-version range queries.
///
/// Created by `StandardMultiStore::range`. Buffers one batch at a time and
/// yields its entries one by one.
pub struct MultiVersionRangeIter {
	// Store clone used to fetch further batches.
	store: StandardMultiStore,
	// Per-tier scan position, advanced by every batch fetch.
	cursor: MultiVersionRangeCursor,
	// Logical key range being scanned.
	range: EncodedKeyRange,
	// Version the scan reads at.
	version: CommitVersion,
	// Number of unique logical keys requested per batch fetch.
	batch_size: usize,
	// Entries of the most recently fetched batch.
	current_batch: Vec<MultiVersionValues>,
	// Index into `current_batch` of the next entry to yield.
	current_index: usize,
}
660
661impl Iterator for MultiVersionRangeIter {
662	type Item = crate::Result<MultiVersionValues>;
663
664	fn next(&mut self) -> Option<Self::Item> {
665		// If we have items in the current batch, return them
666		if self.current_index < self.current_batch.len() {
667			let item = self.current_batch[self.current_index].clone();
668			self.current_index += 1;
669			return Some(Ok(item));
670		}
671
672		// If cursor is exhausted, we're done
673		if self.cursor.exhausted {
674			return None;
675		}
676
677		// Fetch the next batch
678		match self.store.range_next(&mut self.cursor, self.range.clone(), self.version, self.batch_size as u64)
679		{
680			Ok(batch) => {
681				if batch.items.is_empty() {
682					return None;
683				}
684				self.current_batch = batch.items;
685				self.current_index = 0;
686				self.next()
687			}
688			Err(e) => Some(Err(e)),
689		}
690	}
691}
692
/// Iterator for reverse multi-version range queries.
///
/// Created by `StandardMultiStore::range_rev`. Buffers one batch at a time and
/// yields its entries one by one.
pub struct MultiVersionRangeRevIter {
	// Store clone used to fetch further batches.
	store: StandardMultiStore,
	// Per-tier scan position, advanced by every batch fetch.
	cursor: MultiVersionRangeCursor,
	// Logical key range being scanned.
	range: EncodedKeyRange,
	// Version the scan reads at.
	version: CommitVersion,
	// Number of unique logical keys requested per batch fetch.
	batch_size: usize,
	// Entries of the most recently fetched batch.
	current_batch: Vec<MultiVersionValues>,
	// Index into `current_batch` of the next entry to yield.
	current_index: usize,
}
703
704impl Iterator for MultiVersionRangeRevIter {
705	type Item = crate::Result<MultiVersionValues>;
706
707	fn next(&mut self) -> Option<Self::Item> {
708		// If we have items in the current batch, return them
709		if self.current_index < self.current_batch.len() {
710			let item = self.current_batch[self.current_index].clone();
711			self.current_index += 1;
712			return Some(Ok(item));
713		}
714
715		// If cursor is exhausted, we're done
716		if self.cursor.exhausted {
717			return None;
718		}
719
720		// Fetch the next batch
721		match self.store.range_rev_next(
722			&mut self.cursor,
723			self.range.clone(),
724			self.version,
725			self.batch_size as u64,
726		) {
727			Ok(batch) => {
728				if batch.items.is_empty() {
729					return None;
730				}
731				self.current_batch = batch.items;
732				self.current_index = 0;
733				self.next()
734			}
735			Err(e) => Some(Err(e)),
736		}
737	}
738}
739
740/// Classify a range to determine which table it belongs to.
741fn classify_key_range(range: &EncodedKeyRange) -> EntryKind {
742	use super::router::classify_range;
743
744	classify_range(range).unwrap_or(Multi)
745}
746
747/// Create range bounds from an EncodedKeyRange.
748/// Returns the start and end byte slices for the range query.
749fn make_range_bounds(range: &EncodedKeyRange) -> (Vec<u8>, Vec<u8>) {
750	let start = match &range.start {
751		Bound::Included(key) => key.as_ref().to_vec(),
752		Bound::Excluded(key) => key.as_ref().to_vec(),
753		Bound::Unbounded => vec![],
754	};
755
756	let end = match &range.end {
757		Bound::Included(key) => key.as_ref().to_vec(),
758		Bound::Excluded(key) => key.as_ref().to_vec(),
759		Bound::Unbounded => vec![0xFFu8; 256],
760	};
761
762	(start, end)
763}