Skip to main content

reifydb_store_multi/store/
multi.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	collections::{BTreeMap, HashMap, HashSet},
6	ops::{Bound, RangeBounds},
7};
8
9use reifydb_core::{
10	actors::drop::{DropMessage, DropRequest},
11	common::CommitVersion,
12	delta::Delta,
13	encoded::{
14		key::{EncodedKey, EncodedKeyRange},
15		row::EncodedRow,
16	},
17	event::metric::{MultiCommittedEvent, MultiDelete, MultiWrite},
18	interface::store::{
19		EntryKind, MultiVersionBatch, MultiVersionCommit, MultiVersionContains, MultiVersionGet,
20		MultiVersionGetPrevious, MultiVersionRow, MultiVersionStore,
21	},
22};
23use reifydb_type::util::{cowvec::CowVec, hex};
24use tracing::{instrument, warn};
25
26use super::{
27	StandardMultiStore,
28	router::{classify_key, classify_range, is_single_version_semantics_key},
29	version::{VersionedGetResult, get_at_version},
30};
31use crate::{
32	Result,
33	tier::{RangeCursor, TierBatch, TierStorage},
34};
35
/// Number of versioned entries requested from a single tier on each scan
/// iteration. Range scans pass this value to the tier regardless of the
/// batch size asked for by the caller.
const TIER_SCAN_CHUNK_SIZE: usize = 4096;
39
40impl MultiVersionGet for StandardMultiStore {
41	#[instrument(name = "store::multi::get", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref()), version = version.0))]
42	fn get(&self, key: &EncodedKey, version: CommitVersion) -> Result<Option<MultiVersionRow>> {
43		let table = classify_key(key);
44
45		// Try hot tier first
46		if let Some(hot) = &self.hot {
47			match get_at_version(hot, table, key.as_ref(), version)? {
48				VersionedGetResult::Value {
49					value,
50					version: v,
51				} => {
52					return Ok(Some(MultiVersionRow {
53						key: key.clone(),
54						row: EncodedRow(value),
55						version: v,
56					}));
57				}
58				VersionedGetResult::Tombstone => return Ok(None),
59				VersionedGetResult::NotFound => {}
60			}
61		}
62
63		// Try warm tier
64		if let Some(warm) = &self.warm {
65			match get_at_version(warm, table, key.as_ref(), version)? {
66				VersionedGetResult::Value {
67					value,
68					version: v,
69				} => {
70					return Ok(Some(MultiVersionRow {
71						key: key.clone(),
72						row: EncodedRow(value),
73						version: v,
74					}));
75				}
76				VersionedGetResult::Tombstone => return Ok(None),
77				VersionedGetResult::NotFound => {}
78			}
79		}
80
81		// Try cold tier
82		if let Some(cold) = &self.cold {
83			match get_at_version(cold, table, key.as_ref(), version)? {
84				VersionedGetResult::Value {
85					value,
86					version: v,
87				} => {
88					return Ok(Some(MultiVersionRow {
89						key: key.clone(),
90						row: EncodedRow(value),
91						version: v,
92					}));
93				}
94				VersionedGetResult::Tombstone => return Ok(None),
95				VersionedGetResult::NotFound => {}
96			}
97		}
98
99		Ok(None)
100	}
101}
102
103impl MultiVersionContains for StandardMultiStore {
104	#[instrument(name = "store::multi::contains", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref()), version = version.0), ret)]
105	fn contains(&self, key: &EncodedKey, version: CommitVersion) -> Result<bool> {
106		Ok(MultiVersionGet::get(self, key, version)?.is_some())
107	}
108}
109
110impl MultiVersionCommit for StandardMultiStore {
111	#[instrument(name = "store::multi::commit", level = "debug", skip(self, deltas), fields(delta_count = deltas.len(), version = version.0))]
112	fn commit(&self, deltas: CowVec<Delta>, version: CommitVersion) -> Result<()> {
113		// Get the hot storage tier (warm and cold are placeholders for now)
114		let Some(storage) = &self.hot else {
115			return Ok(());
116		};
117
118		let mut pending_set_keys: HashSet<CowVec<u8>> = HashSet::new();
119		let mut writes: Vec<MultiWrite> = Vec::new();
120		let mut deletes: Vec<MultiDelete> = Vec::new();
121		let mut batches: TierBatch = HashMap::new();
122		let mut explicit_drops: Vec<(EntryKind, EncodedKey)> = Vec::new();
123
124		for delta in deltas.iter() {
125			let key = delta.key();
126
127			let table = classify_key(key);
128			let is_single_version = is_single_version_semantics_key(key);
129
130			match delta {
131				Delta::Set {
132					key,
133					row,
134				} => {
135					if is_single_version {
136						pending_set_keys.insert(key.0.clone());
137					}
138
139					writes.push(MultiWrite {
140						key: key.clone(),
141						value_bytes: row.len() as u64,
142					});
143
144					batches.entry(table).or_default().push((key.0.clone(), Some(row.0.clone())));
145				}
146				Delta::Unset {
147					key,
148					row,
149				} => {
150					deletes.push(MultiDelete {
151						key: key.clone(),
152						value_bytes: row.len() as u64,
153					});
154
155					batches.entry(table).or_default().push((key.0.clone(), None));
156				}
157				Delta::Remove {
158					key,
159				} => {
160					batches.entry(table).or_default().push((key.0.clone(), None));
161				}
162				Delta::Drop {
163					key,
164				} => {
165					explicit_drops.push((table, key.clone()));
166				}
167			}
168		}
169
170		// Process explicit drops now that pending_set_keys is complete
171		let mut drop_batch = Vec::with_capacity(explicit_drops.len() + pending_set_keys.len());
172		for (table, key) in explicit_drops {
173			let pending_version = if pending_set_keys.contains(key.as_ref()) {
174				Some(version)
175			} else {
176				None
177			};
178
179			drop_batch.push(DropRequest {
180				table,
181				key: key.0.clone(),
182				commit_version: version,
183				pending_version,
184			});
185		}
186
187		// Add implicit drops for single-version-semantics keys
188		for key in pending_set_keys.iter() {
189			let table = classify_key(&EncodedKey(key.clone()));
190			drop_batch.push(DropRequest {
191				table,
192				key: key.clone(),
193				commit_version: version,
194				pending_version: Some(version),
195			});
196		}
197
198		if !drop_batch.is_empty() && self.drop_actor.send_blocking(DropMessage::Batch(drop_batch)).is_err() {
199			warn!("Failed to send drop batch");
200		}
201
202		// Pass version explicitly to storage
203		storage.set(version, batches)?;
204
205		// Emit storage stats event for this commit
206		if !writes.is_empty() || !deletes.is_empty() {
207			self.event_bus.emit(MultiCommittedEvent::new(writes, deletes, vec![], version));
208		}
209
210		Ok(())
211	}
212}
213
214/// Cursor state for multi-version range streaming.
215///
216/// Tracks position in each tier independently, allowing the scan to continue
217/// until enough unique logical keys are collected.
218#[derive(Debug, Clone, Default)]
219pub struct MultiVersionRangeCursor {
220	/// Cursor for hot tier
221	pub hot: RangeCursor,
222	/// Cursor for warm tier
223	pub warm: RangeCursor,
224	/// Cursor for cold tier
225	pub cold: RangeCursor,
226	/// Whether all tiers are exhausted
227	pub exhausted: bool,
228}
229
230impl MultiVersionRangeCursor {
231	/// Create a new cursor at the start.
232	pub fn new() -> Self {
233		Self::default()
234	}
235
236	/// Check if all tiers are exhausted.
237	pub fn is_exhausted(&self) -> bool {
238		self.exhausted
239	}
240}
241
242/// Parameters shared by tier scan operations (forward and reverse).
243struct TierScanQuery<'a> {
244	table: EntryKind,
245	start: &'a [u8],
246	end: &'a [u8],
247	version: CommitVersion,
248	range: &'a EncodedKeyRange,
249}
250
251impl StandardMultiStore {
252	/// Fetch the next batch of entries, continuing from cursor position.
253	///
254	/// This properly handles high version density by scanning until `batch_size`
255	/// unique logical keys are collected OR all tiers are exhausted.
256	pub fn range_next(
257		&self,
258		cursor: &mut MultiVersionRangeCursor,
259		range: EncodedKeyRange,
260		version: CommitVersion,
261		batch_size: u64,
262	) -> Result<MultiVersionBatch> {
263		if cursor.exhausted {
264			return Ok(MultiVersionBatch {
265				items: Vec::new(),
266				has_more: false,
267			});
268		}
269
270		let table = classify_key_range(&range);
271		let (start, end) = make_range_bounds(&range);
272		let batch_size = batch_size as usize;
273		let scan = TierScanQuery {
274			table,
275			start: &start,
276			end: &end,
277			version,
278			range: &range,
279		};
280
281		// Collected entries: logical_key -> (version, value)
282		let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
283
284		// Keep scanning until we have batch_size unique logical keys OR all tiers exhausted
285		while collected.len() < batch_size {
286			let mut any_progress = false;
287
288			// Scan chunk from hot tier
289			if let Some(hot) = &self.hot
290				&& !cursor.hot.exhausted
291			{
292				let progress = Self::scan_tier_chunk(hot, &mut cursor.hot, &scan, &mut collected)?;
293				any_progress |= progress;
294			}
295
296			// Scan chunk from warm tier
297			if let Some(warm) = &self.warm
298				&& !cursor.warm.exhausted
299			{
300				let progress = Self::scan_tier_chunk(warm, &mut cursor.warm, &scan, &mut collected)?;
301				any_progress |= progress;
302			}
303
304			// Scan chunk from cold tier
305			if let Some(cold) = &self.cold
306				&& !cursor.cold.exhausted
307			{
308				let progress = Self::scan_tier_chunk(cold, &mut cursor.cold, &scan, &mut collected)?;
309				any_progress |= progress;
310			}
311
312			if !any_progress {
313				// All tiers exhausted
314				cursor.exhausted = true;
315				break;
316			}
317		}
318
319		// Convert to MultiVersionRow in sorted key order, filtering out tombstones
320		let items: Vec<MultiVersionRow> = collected
321			.into_iter()
322			.take(batch_size)
323			.filter_map(|(key_bytes, (v, value))| {
324				value.map(|val| MultiVersionRow {
325					key: EncodedKey(CowVec::new(key_bytes)),
326					row: EncodedRow(val),
327					version: v,
328				})
329			})
330			.collect();
331
332		let has_more = items.len() >= batch_size || !cursor.exhausted;
333
334		Ok(MultiVersionBatch {
335			items,
336			has_more,
337		})
338	}
339
340	/// Scan a chunk from a single tier and merge into collected entries.
341	/// Returns true if any entries were processed (i.e., made progress).
342	fn scan_tier_chunk<S: TierStorage>(
343		storage: &S,
344		cursor: &mut RangeCursor,
345		scan: &TierScanQuery,
346		collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
347	) -> Result<bool> {
348		let batch = storage.range_next(
349			scan.table,
350			cursor,
351			Bound::Included(scan.start),
352			Bound::Included(scan.end),
353			scan.version,
354			TIER_SCAN_CHUNK_SIZE,
355		)?;
356
357		if batch.entries.is_empty() {
358			return Ok(false);
359		}
360
361		for entry in batch.entries {
362			// Entry key is already the logical key, entry.version is the version
363			let original_key = entry.key.as_slice().to_vec();
364			let entry_version = entry.version;
365
366			// Skip if key is not within the requested logical range
367			let original_key_encoded = EncodedKey(CowVec::new(original_key.clone()));
368			if !scan.range.contains(&original_key_encoded) {
369				continue;
370			}
371
372			// Update if no entry exists or this is a higher version
373			let should_update = match collected.get(&original_key) {
374				None => true,
375				Some((existing_version, _)) => entry_version > *existing_version,
376			};
377
378			if should_update {
379				collected.insert(original_key, (entry_version, entry.value));
380			}
381		}
382
383		Ok(true)
384	}
385
386	/// Create an iterator for forward range queries.
387	///
388	/// This properly handles high version density by scanning until batch_size
389	/// unique logical keys are collected. The iterator yields individual entries
390	/// and maintains cursor state internally.
391	pub fn range(
392		&self,
393		range: EncodedKeyRange,
394		version: CommitVersion,
395		batch_size: usize,
396	) -> MultiVersionRangeIter {
397		MultiVersionRangeIter {
398			store: self.clone(),
399			cursor: MultiVersionRangeCursor::new(),
400			range,
401			version,
402			batch_size,
403			current_batch: Vec::new(),
404			current_index: 0,
405		}
406	}
407
408	/// Create an iterator for reverse range queries.
409	///
410	/// This properly handles high version density by scanning until batch_size
411	/// unique logical keys are collected. The iterator yields individual entries
412	/// in reverse key order and maintains cursor state internally.
413	pub fn range_rev(
414		&self,
415		range: EncodedKeyRange,
416		version: CommitVersion,
417		batch_size: usize,
418	) -> MultiVersionRangeRevIter {
419		MultiVersionRangeRevIter {
420			store: self.clone(),
421			cursor: MultiVersionRangeCursor::new(),
422			range,
423			version,
424			batch_size,
425			current_batch: Vec::new(),
426			current_index: 0,
427		}
428	}
429
430	/// Fetch the next batch of entries in reverse order, continuing from cursor position.
431	///
432	/// This properly handles high version density by scanning until `batch_size`
433	/// unique logical keys are collected OR all tiers are exhausted.
434	fn range_rev_next(
435		&self,
436		cursor: &mut MultiVersionRangeCursor,
437		range: EncodedKeyRange,
438		version: CommitVersion,
439		batch_size: u64,
440	) -> Result<MultiVersionBatch> {
441		if cursor.exhausted {
442			return Ok(MultiVersionBatch {
443				items: Vec::new(),
444				has_more: false,
445			});
446		}
447
448		let table = classify_key_range(&range);
449		let (start, end) = make_range_bounds(&range);
450		let batch_size = batch_size as usize;
451		let scan = TierScanQuery {
452			table,
453			start: &start,
454			end: &end,
455			version,
456			range: &range,
457		};
458
459		// Collected entries: logical_key -> (version, value)
460		let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
461
462		// Keep scanning until we have batch_size unique logical keys OR all tiers exhausted
463		while collected.len() < batch_size {
464			let mut any_progress = false;
465
466			// Scan chunk from hot tier (reverse)
467			if let Some(hot) = &self.hot
468				&& !cursor.hot.exhausted
469			{
470				let progress = Self::scan_tier_chunk_rev(hot, &mut cursor.hot, &scan, &mut collected)?;
471				any_progress |= progress;
472			}
473
474			// Scan chunk from warm tier (reverse)
475			if let Some(warm) = &self.warm
476				&& !cursor.warm.exhausted
477			{
478				let progress =
479					Self::scan_tier_chunk_rev(warm, &mut cursor.warm, &scan, &mut collected)?;
480				any_progress |= progress;
481			}
482
483			// Scan chunk from cold tier (reverse)
484			if let Some(cold) = &self.cold
485				&& !cursor.cold.exhausted
486			{
487				let progress =
488					Self::scan_tier_chunk_rev(cold, &mut cursor.cold, &scan, &mut collected)?;
489				any_progress |= progress;
490			}
491
492			if !any_progress {
493				// All tiers exhausted
494				cursor.exhausted = true;
495				break;
496			}
497		}
498
499		// Convert to MultiVersionRow in REVERSE sorted key order, filtering out tombstones
500		let items: Vec<MultiVersionRow> = collected
501			.into_iter()
502			.rev()
503			.take(batch_size)
504			.filter_map(|(key_bytes, (v, value))| {
505				value.map(|val| MultiVersionRow {
506					key: EncodedKey(CowVec::new(key_bytes)),
507					row: EncodedRow(val),
508					version: v,
509				})
510			})
511			.collect();
512
513		let has_more = items.len() >= batch_size || !cursor.exhausted;
514
515		Ok(MultiVersionBatch {
516			items,
517			has_more,
518		})
519	}
520
521	/// Scan a chunk from a single tier in reverse and merge into collected entries.
522	/// Returns true if any entries were processed (i.e., made progress).
523	fn scan_tier_chunk_rev<S: TierStorage>(
524		storage: &S,
525		cursor: &mut RangeCursor,
526		scan: &TierScanQuery,
527		collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
528	) -> Result<bool> {
529		let batch = storage.range_rev_next(
530			scan.table,
531			cursor,
532			Bound::Included(scan.start),
533			Bound::Included(scan.end),
534			scan.version,
535			TIER_SCAN_CHUNK_SIZE,
536		)?;
537
538		if batch.entries.is_empty() {
539			return Ok(false);
540		}
541
542		for entry in batch.entries {
543			// Entry key is already the logical key, entry.version is the version
544			let original_key = entry.key.as_slice().to_vec();
545			let entry_version = entry.version;
546
547			// Skip if key is not within the requested logical range
548			let original_key_encoded = EncodedKey(CowVec::new(original_key.clone()));
549			if !scan.range.contains(&original_key_encoded) {
550				continue;
551			}
552
553			// Update if no entry exists or this is a higher version
554			let should_update = match collected.get(&original_key) {
555				None => true,
556				Some((existing_version, _)) => entry_version > *existing_version,
557			};
558
559			if should_update {
560				collected.insert(original_key, (entry_version, entry.value));
561			}
562		}
563
564		Ok(true)
565	}
566}
567
568impl MultiVersionGetPrevious for StandardMultiStore {
569	fn get_previous_version(
570		&self,
571		key: &EncodedKey,
572		before_version: CommitVersion,
573	) -> Result<Option<MultiVersionRow>> {
574		if before_version.0 == 0 {
575			return Ok(None);
576		}
577
578		// Hot storage must be available for version lookups
579		let storage = self.hot.as_ref().expect("hot storage required for version lookups");
580
581		let table = classify_key(key);
582		let prev_version = CommitVersion(before_version.0 - 1);
583
584		match get_at_version(storage, table, key.as_ref(), prev_version) {
585			Ok(VersionedGetResult::Value {
586				value,
587				version,
588			}) => Ok(Some(MultiVersionRow {
589				key: key.clone(),
590				row: EncodedRow(CowVec::new(value.to_vec())),
591				version,
592			})),
593			Ok(VersionedGetResult::Tombstone) | Ok(VersionedGetResult::NotFound) => Ok(None),
594			Err(e) => Err(e),
595		}
596	}
597}
598
599impl MultiVersionStore for StandardMultiStore {}
600
601/// Iterator for forward multi-version range queries.
602pub struct MultiVersionRangeIter {
603	store: StandardMultiStore,
604	cursor: MultiVersionRangeCursor,
605	range: EncodedKeyRange,
606	version: CommitVersion,
607	batch_size: usize,
608	current_batch: Vec<MultiVersionRow>,
609	current_index: usize,
610}
611
612impl Iterator for MultiVersionRangeIter {
613	type Item = Result<MultiVersionRow>;
614
615	fn next(&mut self) -> Option<Self::Item> {
616		// If we have items in the current batch, return them
617		if self.current_index < self.current_batch.len() {
618			let item = self.current_batch[self.current_index].clone();
619			self.current_index += 1;
620			return Some(Ok(item));
621		}
622
623		// If cursor is exhausted, we're done
624		if self.cursor.exhausted {
625			return None;
626		}
627
628		// Fetch the next batch
629		match self.store.range_next(&mut self.cursor, self.range.clone(), self.version, self.batch_size as u64)
630		{
631			Ok(batch) => {
632				if batch.items.is_empty() {
633					return None;
634				}
635				self.current_batch = batch.items;
636				self.current_index = 0;
637				self.next()
638			}
639			Err(e) => Some(Err(e)),
640		}
641	}
642}
643
644/// Iterator for reverse multi-version range queries.
645pub struct MultiVersionRangeRevIter {
646	store: StandardMultiStore,
647	cursor: MultiVersionRangeCursor,
648	range: EncodedKeyRange,
649	version: CommitVersion,
650	batch_size: usize,
651	current_batch: Vec<MultiVersionRow>,
652	current_index: usize,
653}
654
655impl Iterator for MultiVersionRangeRevIter {
656	type Item = Result<MultiVersionRow>;
657
658	fn next(&mut self) -> Option<Self::Item> {
659		// If we have items in the current batch, return them
660		if self.current_index < self.current_batch.len() {
661			let item = self.current_batch[self.current_index].clone();
662			self.current_index += 1;
663			return Some(Ok(item));
664		}
665
666		// If cursor is exhausted, we're done
667		if self.cursor.exhausted {
668			return None;
669		}
670
671		// Fetch the next batch
672		match self.store.range_rev_next(
673			&mut self.cursor,
674			self.range.clone(),
675			self.version,
676			self.batch_size as u64,
677		) {
678			Ok(batch) => {
679				if batch.items.is_empty() {
680					return None;
681				}
682				self.current_batch = batch.items;
683				self.current_index = 0;
684				self.next()
685			}
686			Err(e) => Some(Err(e)),
687		}
688	}
689}
690
691/// Classify a range to determine which table it belongs to.
692fn classify_key_range(range: &EncodedKeyRange) -> EntryKind {
693	classify_range(range).unwrap_or(EntryKind::Multi)
694}
695
696/// Create range bounds from an EncodedKeyRange.
697/// Returns the start and end byte slices for the range query.
698fn make_range_bounds(range: &EncodedKeyRange) -> (Vec<u8>, Vec<u8>) {
699	let start = match &range.start {
700		Bound::Included(key) => key.as_ref().to_vec(),
701		Bound::Excluded(key) => key.as_ref().to_vec(),
702		Bound::Unbounded => vec![],
703	};
704
705	let end = match &range.end {
706		Bound::Included(key) => key.as_ref().to_vec(),
707		Bound::Excluded(key) => key.as_ref().to_vec(),
708		Bound::Unbounded => vec![0xFFu8; 256],
709	};
710
711	(start, end)
712}