Skip to main content

reifydb_store_multi/store/
multi.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
use std::{
	collections::{BTreeMap, HashMap, HashSet},
	ops::{Bound, ControlFlow, RangeBounds},
};
8
9use reifydb_core::{
10	actors::drop::{DropMessage, DropRequest},
11	common::CommitVersion,
12	delta::Delta,
13	encoded::{
14		key::{EncodedKey, EncodedKeyRange},
15		row::EncodedRow,
16	},
17	event::metric::{MultiCommittedEvent, MultiDelete, MultiWrite},
18	interface::store::{
19		EntryKind, MultiVersionBatch, MultiVersionCommit, MultiVersionContains, MultiVersionGet,
20		MultiVersionGetPrevious, MultiVersionRow, MultiVersionStore,
21	},
22};
23use reifydb_type::util::{cowvec::CowVec, hex};
24use tracing::{instrument, warn};
25
26use super::{
27	StandardMultiStore,
28	router::{classify_key, classify_range, is_single_version_semantics_key},
29	version::{VersionedGetResult, get_at_version},
30};
31use crate::{
32	Result,
33	tier::{RangeCursor, TierBatch, TierStorage},
34};
35
36/// Fixed chunk size for internal tier scans.
37/// This is the number of versioned entries fetched per tier per iteration.
38const TIER_SCAN_CHUNK_SIZE: usize = 4096;
39
40impl MultiVersionGet for StandardMultiStore {
41	#[instrument(name = "store::multi::get", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref()), version = version.0))]
42	fn get(&self, key: &EncodedKey, version: CommitVersion) -> Result<Option<MultiVersionRow>> {
43		let table = classify_key(key);
44
45		// Try hot tier first
46		if let Some(hot) = &self.hot {
47			match get_at_version(hot, table, key.as_ref(), version)? {
48				VersionedGetResult::Value {
49					value,
50					version: v,
51				} => {
52					return Ok(Some(MultiVersionRow {
53						key: key.clone(),
54						row: EncodedRow(value),
55						version: v,
56					}));
57				}
58				VersionedGetResult::Tombstone => return Ok(None),
59				VersionedGetResult::NotFound => {}
60			}
61		}
62
63		// Try warm tier
64		if let Some(warm) = &self.warm {
65			match get_at_version(warm, table, key.as_ref(), version)? {
66				VersionedGetResult::Value {
67					value,
68					version: v,
69				} => {
70					return Ok(Some(MultiVersionRow {
71						key: key.clone(),
72						row: EncodedRow(value),
73						version: v,
74					}));
75				}
76				VersionedGetResult::Tombstone => return Ok(None),
77				VersionedGetResult::NotFound => {}
78			}
79		}
80
81		// Try cold tier
82		if let Some(cold) = &self.cold {
83			match get_at_version(cold, table, key.as_ref(), version)? {
84				VersionedGetResult::Value {
85					value,
86					version: v,
87				} => {
88					return Ok(Some(MultiVersionRow {
89						key: key.clone(),
90						row: EncodedRow(value),
91						version: v,
92					}));
93				}
94				VersionedGetResult::Tombstone => return Ok(None),
95				VersionedGetResult::NotFound => {}
96			}
97		}
98
99		Ok(None)
100	}
101}
102
103impl MultiVersionContains for StandardMultiStore {
104	#[instrument(name = "store::multi::contains", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref()), version = version.0), ret)]
105	fn contains(&self, key: &EncodedKey, version: CommitVersion) -> Result<bool> {
106		Ok(MultiVersionGet::get(self, key, version)?.is_some())
107	}
108}
109
110impl MultiVersionCommit for StandardMultiStore {
111	#[instrument(name = "store::multi::commit", level = "debug", skip(self, deltas), fields(delta_count = deltas.len(), version = version.0))]
112	fn commit(&self, deltas: CowVec<Delta>, version: CommitVersion) -> Result<()> {
113		// Get the hot storage tier (warm and cold are placeholders for now)
114		let Some(storage) = &self.hot else {
115			return Ok(());
116		};
117
118		let classified = classify_deltas(&deltas);
119		let drop_batch = build_drop_batch(classified.explicit_drops, &classified.pending_set_keys, version);
120		self.dispatch_drops(drop_batch);
121
122		storage.set(version, classified.batches)?;
123		self.emit_commit_metrics(classified.writes, classified.deletes, version);
124		Ok(())
125	}
126}
127
/// Per-delta classification produced for `commit`: Set/Unset go to
/// `batches` (and emit metric entries), Remove goes to `batches` only,
/// Drop is queued for the drop-actor with optional pending-version
/// tagging if the same key was Set in this commit
/// (single-version-semantics keys).
struct ClassifiedDeltas {
	// Raw key bytes of single-version-semantics keys Set in this commit;
	// these receive implicit drop requests for their older versions.
	pending_set_keys: HashSet<CowVec<u8>>,
	// Metric entries for Set deltas (key plus value byte count).
	writes: Vec<MultiWrite>,
	// Metric entries for Unset deltas (key plus value byte count).
	deletes: Vec<MultiDelete>,
	// Per-table key/value pairs handed to the hot tier's `set`;
	// `None` values are tombstones.
	batches: TierBatch,
	// Explicit `Delta::Drop` requests, paired with their table.
	explicit_drops: Vec<(EntryKind, EncodedKey)>,
}
139
140#[inline]
141fn classify_deltas(deltas: &CowVec<Delta>) -> ClassifiedDeltas {
142	let mut pending_set_keys: HashSet<CowVec<u8>> = HashSet::new();
143	let mut writes: Vec<MultiWrite> = Vec::new();
144	let mut deletes: Vec<MultiDelete> = Vec::new();
145	let mut batches: TierBatch = HashMap::new();
146	let mut explicit_drops: Vec<(EntryKind, EncodedKey)> = Vec::new();
147
148	for delta in deltas.iter() {
149		let key = delta.key();
150		let table = classify_key(key);
151		let is_single_version = is_single_version_semantics_key(key);
152
153		match delta {
154			Delta::Set {
155				key,
156				row,
157			} => {
158				if is_single_version {
159					pending_set_keys.insert(key.0.clone());
160				}
161				writes.push(MultiWrite {
162					key: key.clone(),
163					value_bytes: row.len() as u64,
164				});
165				batches.entry(table).or_default().push((key.0.clone(), Some(row.0.clone())));
166			}
167			Delta::Unset {
168				key,
169				row,
170			} => {
171				deletes.push(MultiDelete {
172					key: key.clone(),
173					value_bytes: row.len() as u64,
174				});
175				batches.entry(table).or_default().push((key.0.clone(), None));
176			}
177			Delta::Remove {
178				key,
179			} => {
180				batches.entry(table).or_default().push((key.0.clone(), None));
181			}
182			Delta::Drop {
183				key,
184			} => {
185				explicit_drops.push((table, key.clone()));
186			}
187		}
188	}
189
190	ClassifiedDeltas {
191		pending_set_keys,
192		writes,
193		deletes,
194		batches,
195		explicit_drops,
196	}
197}
198
199/// Combine explicit `Delta::Drop` requests with implicit drops for
200/// single-version-semantics keys that were also Set in this commit. Both kinds
201/// share the same commit version; explicit drops carry a pending_version only
202/// when the same key was Set in this commit (overlap case).
203#[inline]
204fn build_drop_batch(
205	explicit_drops: Vec<(EntryKind, EncodedKey)>,
206	pending_set_keys: &HashSet<CowVec<u8>>,
207	version: CommitVersion,
208) -> Vec<DropRequest> {
209	let mut drop_batch = Vec::with_capacity(explicit_drops.len() + pending_set_keys.len());
210	for (table, key) in explicit_drops {
211		let pending_version = if pending_set_keys.contains(key.as_ref()) {
212			Some(version)
213		} else {
214			None
215		};
216		drop_batch.push(DropRequest {
217			table,
218			key: key.0.clone(),
219			commit_version: version,
220			pending_version,
221		});
222	}
223	for key in pending_set_keys.iter() {
224		let table = classify_key(&EncodedKey(key.clone()));
225		drop_batch.push(DropRequest {
226			table,
227			key: key.clone(),
228			commit_version: version,
229			pending_version: Some(version),
230		});
231	}
232	drop_batch
233}
234
235impl StandardMultiStore {
236	#[inline]
237	fn dispatch_drops(&self, drop_batch: Vec<DropRequest>) {
238		if !drop_batch.is_empty() && self.drop_actor.send_blocking(DropMessage::Batch(drop_batch)).is_err() {
239			warn!("Failed to send drop batch");
240		}
241	}
242
243	#[inline]
244	fn emit_commit_metrics(&self, writes: Vec<MultiWrite>, deletes: Vec<MultiDelete>, version: CommitVersion) {
245		if writes.is_empty() && deletes.is_empty() {
246			return;
247		}
248		self.event_bus.emit(MultiCommittedEvent::new(writes, deletes, vec![], version));
249	}
250}
251
/// Cursor state for multi-version range streaming.
///
/// Tracks position in each tier independently, allowing the scan to continue
/// until enough unique logical keys are collected.
#[derive(Debug, Clone, Default)]
pub struct MultiVersionRangeCursor {
	/// Position within the hot tier's scan.
	pub hot: RangeCursor,
	/// Position within the warm tier's scan.
	pub warm: RangeCursor,
	/// Position within the cold tier's scan.
	pub cold: RangeCursor,
	/// Set once no tier yields further entries; subsequent fetches
	/// short-circuit to an empty batch.
	pub exhausted: bool,
}
267
268impl MultiVersionRangeCursor {
269	/// Create a new cursor at the start.
270	pub fn new() -> Self {
271		Self::default()
272	}
273
274	/// Check if all tiers are exhausted.
275	pub fn is_exhausted(&self) -> bool {
276		self.exhausted
277	}
278}
279
/// Parameters shared by tier scan operations (forward and reverse).
struct TierScanQuery<'a> {
	// Table the scanned range was classified into.
	table: EntryKind,
	// Lower bound bytes passed (inclusively) to the tier scan.
	start: &'a [u8],
	// Upper bound bytes passed (inclusively) to the tier scan.
	end: &'a [u8],
	// Snapshot version forwarded to the tier scan.
	version: CommitVersion,
	// Original logical range, used to re-filter scanned keys because the
	// byte bounds above are a conservative superset (see
	// `make_range_bounds`).
	range: &'a EncodedKeyRange,
}
288
289impl StandardMultiStore {
	/// Fetch the next batch of entries, continuing from cursor position.
	///
	/// This properly handles high version density by scanning until `batch_size`
	/// unique logical keys are collected OR all tiers are exhausted.
	///
	/// Per logical key the highest version seen across all tiers wins;
	/// tombstones are filtered out of the returned items but still count
	/// toward the collected-key budget while scanning.
	///
	/// NOTE(review): if one pass collects more than `batch_size` unique
	/// keys, `take(batch_size)` below discards the surplus even though
	/// the tier cursors have already advanced past them — confirm the
	/// cursor semantics re-surface those keys on the next call.
	pub fn range_next(
		&self,
		cursor: &mut MultiVersionRangeCursor,
		range: EncodedKeyRange,
		version: CommitVersion,
		batch_size: u64,
	) -> Result<MultiVersionBatch> {
		if cursor.exhausted {
			return Ok(MultiVersionBatch {
				items: Vec::new(),
				has_more: false,
			});
		}

		let table = classify_key_range(&range);
		let (start, end) = make_range_bounds(&range);
		let batch_size = batch_size as usize;
		let scan = TierScanQuery {
			table,
			start: &start,
			end: &end,
			version,
			range: &range,
		};

		// Collected entries: logical_key -> (version, value)
		let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();

		// Keep scanning until we have batch_size unique logical keys OR all tiers exhausted
		while collected.len() < batch_size {
			let mut any_progress = false;

			// Scan chunk from hot tier
			if let Some(hot) = &self.hot
				&& !cursor.hot.exhausted
			{
				let progress = Self::scan_tier_chunk(hot, &mut cursor.hot, &scan, &mut collected)?;
				any_progress |= progress;
			}

			// Scan chunk from warm tier
			if let Some(warm) = &self.warm
				&& !cursor.warm.exhausted
			{
				let progress = Self::scan_tier_chunk(warm, &mut cursor.warm, &scan, &mut collected)?;
				any_progress |= progress;
			}

			// Scan chunk from cold tier
			if let Some(cold) = &self.cold
				&& !cursor.cold.exhausted
			{
				let progress = Self::scan_tier_chunk(cold, &mut cursor.cold, &scan, &mut collected)?;
				any_progress |= progress;
			}

			if !any_progress {
				// All tiers exhausted
				cursor.exhausted = true;
				break;
			}
		}

		// Convert to MultiVersionRow in sorted key order, filtering out tombstones
		let items: Vec<MultiVersionRow> = collected
			.into_iter()
			.take(batch_size)
			.filter_map(|(key_bytes, (v, value))| {
				value.map(|val| MultiVersionRow {
					key: EncodedKey(CowVec::new(key_bytes)),
					row: EncodedRow(val),
					version: v,
				})
			})
			.collect();

		// `has_more` may overestimate (e.g. all remaining entries are
		// tombstones); it only promises that another fetch is worthwhile.
		let has_more = items.len() >= batch_size || !cursor.exhausted;

		Ok(MultiVersionBatch {
			items,
			has_more,
		})
	}
377
378	/// Scan a chunk from a single tier and merge into collected entries.
379	/// Returns true if any entries were processed (i.e., made progress).
380	fn scan_tier_chunk<S: TierStorage>(
381		storage: &S,
382		cursor: &mut RangeCursor,
383		scan: &TierScanQuery,
384		collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
385	) -> Result<bool> {
386		let batch = storage.range_next(
387			scan.table,
388			cursor,
389			Bound::Included(scan.start),
390			Bound::Included(scan.end),
391			scan.version,
392			TIER_SCAN_CHUNK_SIZE,
393		)?;
394
395		if batch.entries.is_empty() {
396			return Ok(false);
397		}
398
399		for entry in batch.entries {
400			// Entry key is already the logical key, entry.version is the version
401			let original_key = entry.key.as_slice().to_vec();
402			let entry_version = entry.version;
403
404			// Skip if key is not within the requested logical range
405			let original_key_encoded = EncodedKey(CowVec::new(original_key.clone()));
406			if !scan.range.contains(&original_key_encoded) {
407				continue;
408			}
409
410			// Update if no entry exists or this is a higher version
411			let should_update = match collected.get(&original_key) {
412				None => true,
413				Some((existing_version, _)) => entry_version > *existing_version,
414			};
415
416			if should_update {
417				collected.insert(original_key, (entry_version, entry.value));
418			}
419		}
420
421		Ok(true)
422	}
423
424	/// Create an iterator for forward range queries.
425	///
426	/// This properly handles high version density by scanning until batch_size
427	/// unique logical keys are collected. The iterator yields individual entries
428	/// and maintains cursor state internally.
429	pub fn range(
430		&self,
431		range: EncodedKeyRange,
432		version: CommitVersion,
433		batch_size: usize,
434	) -> MultiVersionRangeIter {
435		MultiVersionRangeIter {
436			store: self.clone(),
437			cursor: MultiVersionRangeCursor::new(),
438			range,
439			version,
440			batch_size,
441			current_batch: Vec::new(),
442			current_index: 0,
443		}
444	}
445
446	/// Create an iterator for reverse range queries.
447	///
448	/// This properly handles high version density by scanning until batch_size
449	/// unique logical keys are collected. The iterator yields individual entries
450	/// in reverse key order and maintains cursor state internally.
451	pub fn range_rev(
452		&self,
453		range: EncodedKeyRange,
454		version: CommitVersion,
455		batch_size: usize,
456	) -> MultiVersionRangeRevIter {
457		MultiVersionRangeRevIter {
458			store: self.clone(),
459			cursor: MultiVersionRangeCursor::new(),
460			range,
461			version,
462			batch_size,
463			current_batch: Vec::new(),
464			current_index: 0,
465		}
466	}
467
	/// Fetch the next batch of entries in reverse order, continuing from cursor position.
	///
	/// This properly handles high version density by scanning until `batch_size`
	/// unique logical keys are collected OR all tiers are exhausted.
	///
	/// Per logical key the highest version seen across all tiers wins;
	/// tombstones are filtered out of the returned items but still count
	/// toward the collected-key budget while scanning.
	///
	/// NOTE(review): mirrors `range_next` — if one pass collects more
	/// than `batch_size` unique keys, `take(batch_size)` discards the
	/// surplus although the tier cursors have already advanced past
	/// them; confirm the cursor semantics re-surface those keys.
	fn range_rev_next(
		&self,
		cursor: &mut MultiVersionRangeCursor,
		range: EncodedKeyRange,
		version: CommitVersion,
		batch_size: u64,
	) -> Result<MultiVersionBatch> {
		if cursor.exhausted {
			return Ok(MultiVersionBatch {
				items: Vec::new(),
				has_more: false,
			});
		}

		let table = classify_key_range(&range);
		let (start, end) = make_range_bounds(&range);
		let batch_size = batch_size as usize;
		let scan = TierScanQuery {
			table,
			start: &start,
			end: &end,
			version,
			range: &range,
		};

		// Collected entries: logical_key -> (version, value)
		let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();

		// Keep scanning until we have batch_size unique logical keys OR all tiers exhausted
		while collected.len() < batch_size {
			let mut any_progress = false;

			// Scan chunk from hot tier (reverse)
			if let Some(hot) = &self.hot
				&& !cursor.hot.exhausted
			{
				let progress = Self::scan_tier_chunk_rev(hot, &mut cursor.hot, &scan, &mut collected)?;
				any_progress |= progress;
			}

			// Scan chunk from warm tier (reverse)
			if let Some(warm) = &self.warm
				&& !cursor.warm.exhausted
			{
				let progress =
					Self::scan_tier_chunk_rev(warm, &mut cursor.warm, &scan, &mut collected)?;
				any_progress |= progress;
			}

			// Scan chunk from cold tier (reverse)
			if let Some(cold) = &self.cold
				&& !cursor.cold.exhausted
			{
				let progress =
					Self::scan_tier_chunk_rev(cold, &mut cursor.cold, &scan, &mut collected)?;
				any_progress |= progress;
			}

			if !any_progress {
				// All tiers exhausted
				cursor.exhausted = true;
				break;
			}
		}

		// Convert to MultiVersionRow in REVERSE sorted key order, filtering out tombstones
		let items: Vec<MultiVersionRow> = collected
			.into_iter()
			.rev()
			.take(batch_size)
			.filter_map(|(key_bytes, (v, value))| {
				value.map(|val| MultiVersionRow {
					key: EncodedKey(CowVec::new(key_bytes)),
					row: EncodedRow(val),
					version: v,
				})
			})
			.collect();

		// `has_more` may overestimate (e.g. all remaining entries are
		// tombstones); it only promises that another fetch is worthwhile.
		let has_more = items.len() >= batch_size || !cursor.exhausted;

		Ok(MultiVersionBatch {
			items,
			has_more,
		})
	}
558
559	/// Scan a chunk from a single tier in reverse and merge into collected entries.
560	/// Returns true if any entries were processed (i.e., made progress).
561	fn scan_tier_chunk_rev<S: TierStorage>(
562		storage: &S,
563		cursor: &mut RangeCursor,
564		scan: &TierScanQuery,
565		collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
566	) -> Result<bool> {
567		let batch = storage.range_rev_next(
568			scan.table,
569			cursor,
570			Bound::Included(scan.start),
571			Bound::Included(scan.end),
572			scan.version,
573			TIER_SCAN_CHUNK_SIZE,
574		)?;
575
576		if batch.entries.is_empty() {
577			return Ok(false);
578		}
579
580		for entry in batch.entries {
581			// Entry key is already the logical key, entry.version is the version
582			let original_key = entry.key.as_slice().to_vec();
583			let entry_version = entry.version;
584
585			// Skip if key is not within the requested logical range
586			let original_key_encoded = EncodedKey(CowVec::new(original_key.clone()));
587			if !scan.range.contains(&original_key_encoded) {
588				continue;
589			}
590
591			// Update if no entry exists or this is a higher version
592			let should_update = match collected.get(&original_key) {
593				None => true,
594				Some((existing_version, _)) => entry_version > *existing_version,
595			};
596
597			if should_update {
598				collected.insert(original_key, (entry_version, entry.value));
599			}
600		}
601
602		Ok(true)
603	}
604}
605
606impl MultiVersionGetPrevious for StandardMultiStore {
607	fn get_previous_version(
608		&self,
609		key: &EncodedKey,
610		before_version: CommitVersion,
611	) -> Result<Option<MultiVersionRow>> {
612		if before_version.0 == 0 {
613			return Ok(None);
614		}
615
616		// Hot storage must be available for version lookups
617		let storage = self.hot.as_ref().expect("hot storage required for version lookups");
618
619		let table = classify_key(key);
620		let prev_version = CommitVersion(before_version.0 - 1);
621
622		match get_at_version(storage, table, key.as_ref(), prev_version) {
623			Ok(VersionedGetResult::Value {
624				value,
625				version,
626			}) => Ok(Some(MultiVersionRow {
627				key: key.clone(),
628				row: EncodedRow(CowVec::new(value.to_vec())),
629				version,
630			})),
631			Ok(VersionedGetResult::Tombstone) | Ok(VersionedGetResult::NotFound) => Ok(None),
632			Err(e) => Err(e),
633		}
634	}
635}
636
// Marker implementation: `MultiVersionStore` bundles the Get / Contains /
// Commit / GetPrevious capabilities implemented above and appears to
// require no additional methods of its own.
impl MultiVersionStore for StandardMultiStore {}
638
/// Iterator for forward multi-version range queries.
pub struct MultiVersionRangeIter {
	// Cloned store handle used to fetch batches.
	store: StandardMultiStore,
	// Streaming position across all tiers.
	cursor: MultiVersionRangeCursor,
	// Logical key range being iterated.
	range: EncodedKeyRange,
	// Snapshot version forwarded to every batch fetch.
	version: CommitVersion,
	// Number of unique logical keys requested per batch.
	batch_size: usize,
	// Most recently fetched batch, drained via `current_index`.
	current_batch: Vec<MultiVersionRow>,
	// Next index to yield from `current_batch`.
	current_index: usize,
}
649
650impl Iterator for MultiVersionRangeIter {
651	type Item = Result<MultiVersionRow>;
652
653	fn next(&mut self) -> Option<Self::Item> {
654		// If we have items in the current batch, return them
655		if self.current_index < self.current_batch.len() {
656			let item = self.current_batch[self.current_index].clone();
657			self.current_index += 1;
658			return Some(Ok(item));
659		}
660
661		// If cursor is exhausted, we're done
662		if self.cursor.exhausted {
663			return None;
664		}
665
666		// Fetch the next batch
667		match self.store.range_next(&mut self.cursor, self.range.clone(), self.version, self.batch_size as u64)
668		{
669			Ok(batch) => {
670				if batch.items.is_empty() {
671					return None;
672				}
673				self.current_batch = batch.items;
674				self.current_index = 0;
675				self.next()
676			}
677			Err(e) => Some(Err(e)),
678		}
679	}
680}
681
/// Iterator for reverse multi-version range queries.
pub struct MultiVersionRangeRevIter {
	// Cloned store handle used to fetch batches.
	store: StandardMultiStore,
	// Streaming position across all tiers.
	cursor: MultiVersionRangeCursor,
	// Logical key range being iterated (yielded in reverse key order).
	range: EncodedKeyRange,
	// Snapshot version forwarded to every batch fetch.
	version: CommitVersion,
	// Number of unique logical keys requested per batch.
	batch_size: usize,
	// Most recently fetched batch, drained via `current_index`.
	current_batch: Vec<MultiVersionRow>,
	// Next index to yield from `current_batch`.
	current_index: usize,
}
692
693impl Iterator for MultiVersionRangeRevIter {
694	type Item = Result<MultiVersionRow>;
695
696	fn next(&mut self) -> Option<Self::Item> {
697		// If we have items in the current batch, return them
698		if self.current_index < self.current_batch.len() {
699			let item = self.current_batch[self.current_index].clone();
700			self.current_index += 1;
701			return Some(Ok(item));
702		}
703
704		// If cursor is exhausted, we're done
705		if self.cursor.exhausted {
706			return None;
707		}
708
709		// Fetch the next batch
710		match self.store.range_rev_next(
711			&mut self.cursor,
712			self.range.clone(),
713			self.version,
714			self.batch_size as u64,
715		) {
716			Ok(batch) => {
717				if batch.items.is_empty() {
718					return None;
719				}
720				self.current_batch = batch.items;
721				self.current_index = 0;
722				self.next()
723			}
724			Err(e) => Some(Err(e)),
725		}
726	}
727}
728
/// Classify a range to determine which table it belongs to.
///
/// Falls back to `EntryKind::Multi` when `classify_range` cannot
/// attribute the range to a specific table.
fn classify_key_range(range: &EncodedKeyRange) -> EntryKind {
	classify_range(range).unwrap_or(EntryKind::Multi)
}
733
/// Create range bounds from an EncodedKeyRange.
/// Returns the start and end byte slices for the range query.
///
/// Excluded bounds are deliberately widened to their key bytes: callers
/// pass both bounds inclusively to the tier scan and re-filter each
/// entry with `range.contains` (see `scan_tier_chunk`), so the widening
/// yields a conservative superset rather than a correctness issue.
fn make_range_bounds(range: &EncodedKeyRange) -> (Vec<u8>, Vec<u8>) {
	let start = match &range.start {
		Bound::Included(key) => key.as_ref().to_vec(),
		Bound::Excluded(key) => key.as_ref().to_vec(),
		// Empty slice sorts before every key, acting as "min key".
		Bound::Unbounded => vec![],
	};

	let end = match &range.end {
		Bound::Included(key) => key.as_ref().to_vec(),
		Bound::Excluded(key) => key.as_ref().to_vec(),
		// NOTE(review): 256 bytes of 0xFF approximates "max key"; keys
		// that sort above this sentinel would be missed — confirm the
		// key encoding keeps keys below this bound.
		Bound::Unbounded => vec![0xFFu8; 256],
	};

	(start, end)
}