Skip to main content

reifydb_store_multi/store/
multi.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	collections::{BTreeMap, HashMap, HashSet},
6	ops::{Bound, RangeBounds},
7};
8
9use reifydb_core::{
10	common::CommitVersion,
11	delta::Delta,
12	encoded::{
13		key::{EncodedKey, EncodedKeyRange},
14		row::EncodedRow,
15	},
16	event::metric::{StorageDelete, StorageStatsRecordedEvent, StorageWrite},
17	interface::store::{
18		MultiVersionBatch, MultiVersionCommit, MultiVersionContains, MultiVersionGet, MultiVersionGetPrevious,
19		MultiVersionRow, MultiVersionStore,
20	},
21};
22use reifydb_type::util::{cowvec::CowVec, hex};
23use tracing::{instrument, warn};
24
25use super::{
26	StandardMultiStore,
27	router::{classify_key, classify_range, is_single_version_semantics_key},
28	version::{VersionedGetResult, get_at_version},
29	worker::{DropMessage, DropRequest},
30};
31use crate::{
32	Result,
33	tier::{EntryKind, EntryKind::Multi, RangeCursor, TierBatch, TierStorage},
34};
35
/// Fixed chunk size for internal tier scans.
///
/// This is the number of versioned entries requested from a single tier per
/// `range_next` / `range_rev_next` call while a multi-version range scan is
/// collecting unique logical keys. It is independent of the caller's batch size.
const TIER_SCAN_CHUNK_SIZE: usize = 4096;
39
40impl MultiVersionGet for StandardMultiStore {
41	#[instrument(name = "store::multi::get", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref()), version = version.0))]
42	fn get(&self, key: &EncodedKey, version: CommitVersion) -> Result<Option<MultiVersionRow>> {
43		let table = classify_key(key);
44
45		// Try hot tier first
46		if let Some(hot) = &self.hot {
47			match get_at_version(hot, table, key.as_ref(), version)? {
48				VersionedGetResult::Value {
49					value,
50					version: v,
51				} => {
52					return Ok(Some(MultiVersionRow {
53						key: key.clone(),
54						row: EncodedRow(value),
55						version: v,
56					}));
57				}
58				VersionedGetResult::Tombstone => return Ok(None),
59				VersionedGetResult::NotFound => {}
60			}
61		}
62
63		// Try warm tier
64		if let Some(warm) = &self.warm {
65			match get_at_version(warm, table, key.as_ref(), version)? {
66				VersionedGetResult::Value {
67					value,
68					version: v,
69				} => {
70					return Ok(Some(MultiVersionRow {
71						key: key.clone(),
72						row: EncodedRow(value),
73						version: v,
74					}));
75				}
76				VersionedGetResult::Tombstone => return Ok(None),
77				VersionedGetResult::NotFound => {}
78			}
79		}
80
81		// Try cold tier
82		if let Some(cold) = &self.cold {
83			match get_at_version(cold, table, key.as_ref(), version)? {
84				VersionedGetResult::Value {
85					value,
86					version: v,
87				} => {
88					return Ok(Some(MultiVersionRow {
89						key: key.clone(),
90						row: EncodedRow(value),
91						version: v,
92					}));
93				}
94				VersionedGetResult::Tombstone => return Ok(None),
95				VersionedGetResult::NotFound => {}
96			}
97		}
98
99		Ok(None)
100	}
101}
102
103impl MultiVersionContains for StandardMultiStore {
104	#[instrument(name = "store::multi::contains", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref()), version = version.0), ret)]
105	fn contains(&self, key: &EncodedKey, version: CommitVersion) -> Result<bool> {
106		Ok(MultiVersionGet::get(self, key, version)?.is_some())
107	}
108}
109
impl MultiVersionCommit for StandardMultiStore {
	/// Apply `deltas` to the store at commit `version`.
	///
	/// Only the hot tier is written. If no hot tier is configured, the deltas are
	/// discarded and `Ok(())` is returned.
	///
	/// Besides the tier write, this:
	/// - sends one `DropMessage::Batch` to the drop actor. It contains every explicit
	///   `Delta::Drop`, plus an implicit "keep the last 1 version" request for each
	///   single-version-semantics key that is `Set` in this commit. A failed send is
	///   only logged;
	/// - emits a `StorageStatsRecordedEvent` when at least one `Set` or `Unset` was
	///   applied. `Remove` writes a tombstone but records no delete stat.
	///
	/// The drop batch is sent *before* the tier write. Each drop request carries
	/// `pending_version`, presumably so the worker can account for a version that is
	/// not yet in storage. NOTE(review): confirm against the drop worker.
	///
	/// # Errors
	/// Propagates any error from the hot tier's `set`.
	#[instrument(name = "store::multi::commit", level = "debug", skip(self, deltas), fields(delta_count = deltas.len(), version = version.0))]
	fn commit(&self, deltas: CowVec<Delta>, version: CommitVersion) -> Result<()> {
		// Get the hot storage tier (warm and cold are placeholders for now)
		let Some(storage) = &self.hot else {
			return Ok(());
		};

		// Single-version-semantics keys written by this commit; each gets an implicit drop below.
		let mut pending_set_keys: HashSet<CowVec<u8>> = HashSet::new();
		// Per-key byte counts reported in the stats event.
		let mut writes: Vec<StorageWrite> = Vec::new();
		let mut deletes: Vec<StorageDelete> = Vec::new();
		// Tier writes grouped by table; `None` values are tombstones.
		let mut batches: TierBatch = HashMap::new();
		// Explicit drops are deferred until `pending_set_keys` is fully known.
		let mut explicit_drops: Vec<(EntryKind, EncodedKey, Option<CommitVersion>, Option<usize>)> = Vec::new();

		for delta in deltas.iter() {
			let key = delta.key();

			let table = classify_key(key);
			let is_single_version = is_single_version_semantics_key(key);

			match delta {
				Delta::Set {
					key,
					row,
				} => {
					if is_single_version {
						pending_set_keys.insert(key.0.clone());
					}

					writes.push(StorageWrite {
						key: key.clone(),
						value_bytes: row.len() as u64,
					});

					batches.entry(table).or_default().push((key.0.clone(), Some(row.0.clone())));
				}
				Delta::Unset {
					key,
					row,
				} => {
					deletes.push(StorageDelete {
						key: key.clone(),
						value_bytes: row.len() as u64,
					});

					batches.entry(table).or_default().push((key.0.clone(), None));
				}
				// Tombstone without a stats entry (no row bytes are available here).
				Delta::Remove {
					key,
				} => {
					batches.entry(table).or_default().push((key.0.clone(), None));
				}
				Delta::Drop {
					key,
					up_to_version,
					keep_last_versions,
				} => {
					explicit_drops.push((table, key.clone(), *up_to_version, *keep_last_versions));
				}
			}
		}

		// Process explicit drops now that pending_set_keys is complete
		let mut drop_batch = Vec::with_capacity(explicit_drops.len() + pending_set_keys.len());
		for (table, key, up_to_version, keep_last_versions) in explicit_drops {
			// Tell the worker when this same commit also writes the key.
			let pending_version = if pending_set_keys.contains(key.as_ref()) {
				Some(version)
			} else {
				None
			};

			drop_batch.push(DropRequest {
				table,
				key: key.0.clone(),
				up_to_version,
				keep_last_versions,
				commit_version: version,
				pending_version,
			});
		}

		// Add implicit drops for single-version-semantics keys
		for key in pending_set_keys.iter() {
			let table = classify_key(&EncodedKey(key.clone()));
			drop_batch.push(DropRequest {
				table,
				key: key.clone(),
				up_to_version: None,
				keep_last_versions: Some(1),
				commit_version: version,
				pending_version: Some(version),
			});
		}

		// Best effort: a send failure is logged, not returned.
		if !drop_batch.is_empty() && self.drop_actor.send_blocking(DropMessage::Batch(drop_batch)).is_err() {
			warn!("Failed to send drop batch");
		}

		// Pass version explicitly to storage
		storage.set(version, batches)?;

		// Emit storage stats event for this commit
		if !writes.is_empty() || !deletes.is_empty() {
			self.event_bus.emit(StorageStatsRecordedEvent::new(writes, deletes, vec![], version));
		}

		Ok(())
	}
}
219
/// Cursor state for multi-version range streaming.
///
/// Tracks the position in each tier independently. This lets a scan resume across
/// successive `range_next` / `range_rev_next` calls until enough unique logical keys
/// have been collected.
#[derive(Debug, Clone, Default)]
pub struct MultiVersionRangeCursor {
	/// Scan position within the hot tier.
	pub hot: RangeCursor,
	/// Scan position within the warm tier.
	pub warm: RangeCursor,
	/// Scan position within the cold tier.
	pub cold: RangeCursor,
	/// Set once the range scan has no further entries in any tier. Later batch
	/// requests then return immediately with no items and `has_more == false`.
	pub exhausted: bool,
}
235
236impl MultiVersionRangeCursor {
237	/// Create a new cursor at the start.
238	pub fn new() -> Self {
239		Self::default()
240	}
241
242	/// Check if all tiers are exhausted.
243	pub fn is_exhausted(&self) -> bool {
244		self.exhausted
245	}
246}
247
/// Parameters shared by tier scan operations (forward and reverse).
struct TierScanQuery<'a> {
	/// Table (entry kind) the key range was classified into.
	table: EntryKind,
	/// Lower byte bound handed to the tier scan (always passed as inclusive).
	start: &'a [u8],
	/// Upper byte bound handed to the tier scan (always passed as inclusive).
	end: &'a [u8],
	/// Version the range is read at.
	version: CommitVersion,
	/// Exact logical range. `start`/`end` only approximate it (exclusive bounds are
	/// widened to inclusive), so every scanned key is re-checked against this.
	range: &'a EncodedKeyRange,
}
256
257impl StandardMultiStore {
258	/// Fetch the next batch of entries, continuing from cursor position.
259	///
260	/// This properly handles high version density by scanning until `batch_size`
261	/// unique logical keys are collected OR all tiers are exhausted.
262	pub fn range_next(
263		&self,
264		cursor: &mut MultiVersionRangeCursor,
265		range: EncodedKeyRange,
266		version: CommitVersion,
267		batch_size: u64,
268	) -> Result<MultiVersionBatch> {
269		if cursor.exhausted {
270			return Ok(MultiVersionBatch {
271				items: Vec::new(),
272				has_more: false,
273			});
274		}
275
276		let table = classify_key_range(&range);
277		let (start, end) = make_range_bounds(&range);
278		let batch_size = batch_size as usize;
279		let scan = TierScanQuery {
280			table,
281			start: &start,
282			end: &end,
283			version,
284			range: &range,
285		};
286
287		// Collected entries: logical_key -> (version, value)
288		let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
289
290		// Keep scanning until we have batch_size unique logical keys OR all tiers exhausted
291		while collected.len() < batch_size {
292			let mut any_progress = false;
293
294			// Scan chunk from hot tier
295			if let Some(hot) = &self.hot
296				&& !cursor.hot.exhausted
297			{
298				let progress = Self::scan_tier_chunk(hot, &mut cursor.hot, &scan, &mut collected)?;
299				any_progress |= progress;
300			}
301
302			// Scan chunk from warm tier
303			if let Some(warm) = &self.warm
304				&& !cursor.warm.exhausted
305			{
306				let progress = Self::scan_tier_chunk(warm, &mut cursor.warm, &scan, &mut collected)?;
307				any_progress |= progress;
308			}
309
310			// Scan chunk from cold tier
311			if let Some(cold) = &self.cold
312				&& !cursor.cold.exhausted
313			{
314				let progress = Self::scan_tier_chunk(cold, &mut cursor.cold, &scan, &mut collected)?;
315				any_progress |= progress;
316			}
317
318			if !any_progress {
319				// All tiers exhausted
320				cursor.exhausted = true;
321				break;
322			}
323		}
324
325		// Convert to MultiVersionRow in sorted key order, filtering out tombstones
326		let items: Vec<MultiVersionRow> = collected
327			.into_iter()
328			.take(batch_size)
329			.filter_map(|(key_bytes, (v, value))| {
330				value.map(|val| MultiVersionRow {
331					key: EncodedKey(CowVec::new(key_bytes)),
332					row: EncodedRow(val),
333					version: v,
334				})
335			})
336			.collect();
337
338		let has_more = items.len() >= batch_size || !cursor.exhausted;
339
340		Ok(MultiVersionBatch {
341			items,
342			has_more,
343		})
344	}
345
346	/// Scan a chunk from a single tier and merge into collected entries.
347	/// Returns true if any entries were processed (i.e., made progress).
348	fn scan_tier_chunk<S: TierStorage>(
349		storage: &S,
350		cursor: &mut RangeCursor,
351		scan: &TierScanQuery,
352		collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
353	) -> Result<bool> {
354		let batch = storage.range_next(
355			scan.table,
356			cursor,
357			Bound::Included(scan.start),
358			Bound::Included(scan.end),
359			scan.version,
360			TIER_SCAN_CHUNK_SIZE,
361		)?;
362
363		if batch.entries.is_empty() {
364			return Ok(false);
365		}
366
367		for entry in batch.entries {
368			// Entry key is already the logical key, entry.version is the version
369			let original_key = entry.key.as_slice().to_vec();
370			let entry_version = entry.version;
371
372			// Skip if key is not within the requested logical range
373			let original_key_encoded = EncodedKey(CowVec::new(original_key.clone()));
374			if !scan.range.contains(&original_key_encoded) {
375				continue;
376			}
377
378			// Update if no entry exists or this is a higher version
379			let should_update = match collected.get(&original_key) {
380				None => true,
381				Some((existing_version, _)) => entry_version > *existing_version,
382			};
383
384			if should_update {
385				collected.insert(original_key, (entry_version, entry.value));
386			}
387		}
388
389		Ok(true)
390	}
391
392	/// Create an iterator for forward range queries.
393	///
394	/// This properly handles high version density by scanning until batch_size
395	/// unique logical keys are collected. The iterator yields individual entries
396	/// and maintains cursor state internally.
397	pub fn range(
398		&self,
399		range: EncodedKeyRange,
400		version: CommitVersion,
401		batch_size: usize,
402	) -> MultiVersionRangeIter {
403		MultiVersionRangeIter {
404			store: self.clone(),
405			cursor: MultiVersionRangeCursor::new(),
406			range,
407			version,
408			batch_size,
409			current_batch: Vec::new(),
410			current_index: 0,
411		}
412	}
413
414	/// Create an iterator for reverse range queries.
415	///
416	/// This properly handles high version density by scanning until batch_size
417	/// unique logical keys are collected. The iterator yields individual entries
418	/// in reverse key order and maintains cursor state internally.
419	pub fn range_rev(
420		&self,
421		range: EncodedKeyRange,
422		version: CommitVersion,
423		batch_size: usize,
424	) -> MultiVersionRangeRevIter {
425		MultiVersionRangeRevIter {
426			store: self.clone(),
427			cursor: MultiVersionRangeCursor::new(),
428			range,
429			version,
430			batch_size,
431			current_batch: Vec::new(),
432			current_index: 0,
433		}
434	}
435
436	/// Fetch the next batch of entries in reverse order, continuing from cursor position.
437	///
438	/// This properly handles high version density by scanning until `batch_size`
439	/// unique logical keys are collected OR all tiers are exhausted.
440	fn range_rev_next(
441		&self,
442		cursor: &mut MultiVersionRangeCursor,
443		range: EncodedKeyRange,
444		version: CommitVersion,
445		batch_size: u64,
446	) -> Result<MultiVersionBatch> {
447		if cursor.exhausted {
448			return Ok(MultiVersionBatch {
449				items: Vec::new(),
450				has_more: false,
451			});
452		}
453
454		let table = classify_key_range(&range);
455		let (start, end) = make_range_bounds(&range);
456		let batch_size = batch_size as usize;
457		let scan = TierScanQuery {
458			table,
459			start: &start,
460			end: &end,
461			version,
462			range: &range,
463		};
464
465		// Collected entries: logical_key -> (version, value)
466		let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
467
468		// Keep scanning until we have batch_size unique logical keys OR all tiers exhausted
469		while collected.len() < batch_size {
470			let mut any_progress = false;
471
472			// Scan chunk from hot tier (reverse)
473			if let Some(hot) = &self.hot
474				&& !cursor.hot.exhausted
475			{
476				let progress = Self::scan_tier_chunk_rev(hot, &mut cursor.hot, &scan, &mut collected)?;
477				any_progress |= progress;
478			}
479
480			// Scan chunk from warm tier (reverse)
481			if let Some(warm) = &self.warm
482				&& !cursor.warm.exhausted
483			{
484				let progress =
485					Self::scan_tier_chunk_rev(warm, &mut cursor.warm, &scan, &mut collected)?;
486				any_progress |= progress;
487			}
488
489			// Scan chunk from cold tier (reverse)
490			if let Some(cold) = &self.cold
491				&& !cursor.cold.exhausted
492			{
493				let progress =
494					Self::scan_tier_chunk_rev(cold, &mut cursor.cold, &scan, &mut collected)?;
495				any_progress |= progress;
496			}
497
498			if !any_progress {
499				// All tiers exhausted
500				cursor.exhausted = true;
501				break;
502			}
503		}
504
505		// Convert to MultiVersionRow in REVERSE sorted key order, filtering out tombstones
506		let items: Vec<MultiVersionRow> = collected
507			.into_iter()
508			.rev()
509			.take(batch_size)
510			.filter_map(|(key_bytes, (v, value))| {
511				value.map(|val| MultiVersionRow {
512					key: EncodedKey(CowVec::new(key_bytes)),
513					row: EncodedRow(val),
514					version: v,
515				})
516			})
517			.collect();
518
519		let has_more = items.len() >= batch_size || !cursor.exhausted;
520
521		Ok(MultiVersionBatch {
522			items,
523			has_more,
524		})
525	}
526
527	/// Scan a chunk from a single tier in reverse and merge into collected entries.
528	/// Returns true if any entries were processed (i.e., made progress).
529	fn scan_tier_chunk_rev<S: TierStorage>(
530		storage: &S,
531		cursor: &mut RangeCursor,
532		scan: &TierScanQuery,
533		collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
534	) -> Result<bool> {
535		let batch = storage.range_rev_next(
536			scan.table,
537			cursor,
538			Bound::Included(scan.start),
539			Bound::Included(scan.end),
540			scan.version,
541			TIER_SCAN_CHUNK_SIZE,
542		)?;
543
544		if batch.entries.is_empty() {
545			return Ok(false);
546		}
547
548		for entry in batch.entries {
549			// Entry key is already the logical key, entry.version is the version
550			let original_key = entry.key.as_slice().to_vec();
551			let entry_version = entry.version;
552
553			// Skip if key is not within the requested logical range
554			let original_key_encoded = EncodedKey(CowVec::new(original_key.clone()));
555			if !scan.range.contains(&original_key_encoded) {
556				continue;
557			}
558
559			// Update if no entry exists or this is a higher version
560			let should_update = match collected.get(&original_key) {
561				None => true,
562				Some((existing_version, _)) => entry_version > *existing_version,
563			};
564
565			if should_update {
566				collected.insert(original_key, (entry_version, entry.value));
567			}
568		}
569
570		Ok(true)
571	}
572}
573
574impl MultiVersionGetPrevious for StandardMultiStore {
575	fn get_previous_version(
576		&self,
577		key: &EncodedKey,
578		before_version: CommitVersion,
579	) -> Result<Option<MultiVersionRow>> {
580		if before_version.0 == 0 {
581			return Ok(None);
582		}
583
584		// Hot storage must be available for version lookups
585		let storage = self.hot.as_ref().expect("hot storage required for version lookups");
586
587		let table = classify_key(key);
588		let prev_version = CommitVersion(before_version.0 - 1);
589
590		match get_at_version(storage, table, key.as_ref(), prev_version) {
591			Ok(VersionedGetResult::Value {
592				value,
593				version,
594			}) => Ok(Some(MultiVersionRow {
595				key: key.clone(),
596				row: EncodedRow(CowVec::new(value.to_vec())),
597				version,
598			})),
599			Ok(VersionedGetResult::Tombstone) | Ok(VersionedGetResult::NotFound) => Ok(None),
600			Err(e) => Err(e),
601		}
602	}
603}
604
// Empty impl: `MultiVersionStore` presumably only aggregates the store traits
// implemented above (get / contains / commit / get-previous). TODO confirm.
impl MultiVersionStore for StandardMultiStore {}
606
/// Iterator for forward multi-version range queries.
///
/// Created by `StandardMultiStore::range`. Rows are fetched lazily in batches via
/// `StandardMultiStore::range_next` and yielded one at a time in ascending key order.
pub struct MultiVersionRangeIter {
	/// Store handle cloned from the store that created the iterator.
	store: StandardMultiStore,
	/// Per-tier scan position, carried across batches.
	cursor: MultiVersionRangeCursor,
	/// Logical key range being scanned.
	range: EncodedKeyRange,
	/// Version the range is read at.
	version: CommitVersion,
	/// Target number of unique keys per fetched batch.
	batch_size: usize,
	/// Rows of the most recently fetched batch.
	current_batch: Vec<MultiVersionRow>,
	/// Index of the next row in `current_batch` to yield.
	current_index: usize,
}
617
618impl Iterator for MultiVersionRangeIter {
619	type Item = Result<MultiVersionRow>;
620
621	fn next(&mut self) -> Option<Self::Item> {
622		// If we have items in the current batch, return them
623		if self.current_index < self.current_batch.len() {
624			let item = self.current_batch[self.current_index].clone();
625			self.current_index += 1;
626			return Some(Ok(item));
627		}
628
629		// If cursor is exhausted, we're done
630		if self.cursor.exhausted {
631			return None;
632		}
633
634		// Fetch the next batch
635		match self.store.range_next(&mut self.cursor, self.range.clone(), self.version, self.batch_size as u64)
636		{
637			Ok(batch) => {
638				if batch.items.is_empty() {
639					return None;
640				}
641				self.current_batch = batch.items;
642				self.current_index = 0;
643				self.next()
644			}
645			Err(e) => Some(Err(e)),
646		}
647	}
648}
649
/// Iterator for reverse multi-version range queries.
///
/// Created by `StandardMultiStore::range_rev`. Rows are fetched lazily in batches
/// via `StandardMultiStore::range_rev_next` and yielded one at a time in
/// descending key order.
pub struct MultiVersionRangeRevIter {
	/// Store handle cloned from the store that created the iterator.
	store: StandardMultiStore,
	/// Per-tier scan position, carried across batches.
	cursor: MultiVersionRangeCursor,
	/// Logical key range being scanned.
	range: EncodedKeyRange,
	/// Version the range is read at.
	version: CommitVersion,
	/// Target number of unique keys per fetched batch.
	batch_size: usize,
	/// Rows of the most recently fetched batch.
	current_batch: Vec<MultiVersionRow>,
	/// Index of the next row in `current_batch` to yield.
	current_index: usize,
}
660
661impl Iterator for MultiVersionRangeRevIter {
662	type Item = Result<MultiVersionRow>;
663
664	fn next(&mut self) -> Option<Self::Item> {
665		// If we have items in the current batch, return them
666		if self.current_index < self.current_batch.len() {
667			let item = self.current_batch[self.current_index].clone();
668			self.current_index += 1;
669			return Some(Ok(item));
670		}
671
672		// If cursor is exhausted, we're done
673		if self.cursor.exhausted {
674			return None;
675		}
676
677		// Fetch the next batch
678		match self.store.range_rev_next(
679			&mut self.cursor,
680			self.range.clone(),
681			self.version,
682			self.batch_size as u64,
683		) {
684			Ok(batch) => {
685				if batch.items.is_empty() {
686					return None;
687				}
688				self.current_batch = batch.items;
689				self.current_index = 0;
690				self.next()
691			}
692			Err(e) => Some(Err(e)),
693		}
694	}
695}
696
697/// Classify a range to determine which table it belongs to.
698fn classify_key_range(range: &EncodedKeyRange) -> EntryKind {
699	classify_range(range).unwrap_or(Multi)
700}
701
702/// Create range bounds from an EncodedKeyRange.
703/// Returns the start and end byte slices for the range query.
704fn make_range_bounds(range: &EncodedKeyRange) -> (Vec<u8>, Vec<u8>) {
705	let start = match &range.start {
706		Bound::Included(key) => key.as_ref().to_vec(),
707		Bound::Excluded(key) => key.as_ref().to_vec(),
708		Bound::Unbounded => vec![],
709	};
710
711	let end = match &range.end {
712		Bound::Included(key) => key.as_ref().to_vec(),
713		Bound::Excluded(key) => key.as_ref().to_vec(),
714		Bound::Unbounded => vec![0xFFu8; 256],
715	};
716
717	(start, end)
718}