Skip to main content

reifydb_store_multi/store/
multi.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	collections::{BTreeMap, HashMap, HashSet},
6	ops::{Bound, RangeBounds},
7};
8
9use reifydb_core::{
10	common::CommitVersion,
11	delta::Delta,
12	encoded::{
13		encoded::EncodedValues,
14		key::{EncodedKey, EncodedKeyRange},
15	},
16	event::metric::{StorageDelete, StorageStatsRecordedEvent, StorageWrite},
17	interface::store::{
18		MultiVersionBatch, MultiVersionCommit, MultiVersionContains, MultiVersionGet, MultiVersionGetPrevious,
19		MultiVersionStore, MultiVersionValues,
20	},
21};
22use reifydb_type::util::{cowvec::CowVec, hex};
23use tracing::{instrument, warn};
24
25use super::{
26	StandardMultiStore,
27	router::{classify_key, classify_range, is_single_version_semantics_key},
28	version::{VersionedGetResult, get_at_version},
29	worker::{DropMessage, DropRequest},
30};
31use crate::{
32	Result,
33	tier::{EntryKind, EntryKind::Multi, RangeCursor, TierStorage},
34};
35
/// Number of versioned entries requested from a single tier per scan step.
///
/// The value is fixed: range scans ask each tier for this many entries at a
/// time, independent of the batch size requested by the caller.
const TIER_SCAN_CHUNK_SIZE: usize = 4 * 1024;
39
40impl MultiVersionGet for StandardMultiStore {
41	#[instrument(name = "store::multi::get", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref()), version = version.0))]
42	fn get(&self, key: &EncodedKey, version: CommitVersion) -> Result<Option<MultiVersionValues>> {
43		let table = classify_key(key);
44
45		// Try hot tier first
46		if let Some(hot) = &self.hot {
47			match get_at_version(hot, table, key.as_ref(), version)? {
48				VersionedGetResult::Value {
49					value,
50					version: v,
51				} => {
52					return Ok(Some(MultiVersionValues {
53						key: key.clone(),
54						values: EncodedValues(value),
55						version: v,
56					}));
57				}
58				VersionedGetResult::Tombstone => return Ok(None),
59				VersionedGetResult::NotFound => {}
60			}
61		}
62
63		// Try warm tier
64		if let Some(warm) = &self.warm {
65			match get_at_version(warm, table, key.as_ref(), version)? {
66				VersionedGetResult::Value {
67					value,
68					version: v,
69				} => {
70					return Ok(Some(MultiVersionValues {
71						key: key.clone(),
72						values: EncodedValues(value),
73						version: v,
74					}));
75				}
76				VersionedGetResult::Tombstone => return Ok(None),
77				VersionedGetResult::NotFound => {}
78			}
79		}
80
81		// Try cold tier
82		if let Some(cold) = &self.cold {
83			match get_at_version(cold, table, key.as_ref(), version)? {
84				VersionedGetResult::Value {
85					value,
86					version: v,
87				} => {
88					return Ok(Some(MultiVersionValues {
89						key: key.clone(),
90						values: EncodedValues(value),
91						version: v,
92					}));
93				}
94				VersionedGetResult::Tombstone => return Ok(None),
95				VersionedGetResult::NotFound => {}
96			}
97		}
98
99		Ok(None)
100	}
101}
102
103impl MultiVersionContains for StandardMultiStore {
104	#[instrument(name = "store::multi::contains", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref()), version = version.0), ret)]
105	fn contains(&self, key: &EncodedKey, version: CommitVersion) -> Result<bool> {
106		Ok(MultiVersionGet::get(self, key, version)?.is_some())
107	}
108}
109
110impl MultiVersionCommit for StandardMultiStore {
111	#[instrument(name = "store::multi::commit", level = "debug", skip(self, deltas), fields(delta_count = deltas.len(), version = version.0))]
112	fn commit(&self, deltas: CowVec<Delta>, version: CommitVersion) -> Result<()> {
113		// Get the hot storage tier (warm and cold are placeholders for now)
114		let Some(storage) = &self.hot else {
115			return Ok(());
116		};
117
118		let mut pending_set_keys: HashSet<CowVec<u8>> = HashSet::new();
119		let mut writes: Vec<StorageWrite> = Vec::new();
120		let mut deletes: Vec<StorageDelete> = Vec::new();
121		let mut batches: HashMap<EntryKind, Vec<(CowVec<u8>, Option<CowVec<u8>>)>> = HashMap::new();
122		let mut explicit_drops: Vec<(EntryKind, EncodedKey, Option<CommitVersion>, Option<usize>)> = Vec::new();
123
124		for delta in deltas.iter() {
125			let key = delta.key();
126
127			let table = classify_key(key);
128			let is_single_version = is_single_version_semantics_key(key);
129
130			match delta {
131				Delta::Set {
132					key,
133					values,
134				} => {
135					if is_single_version {
136						pending_set_keys.insert(key.0.clone());
137					}
138
139					writes.push(StorageWrite {
140						key: key.clone(),
141						value_bytes: values.len() as u64,
142					});
143
144					batches.entry(table).or_default().push((key.0.clone(), Some(values.0.clone())));
145				}
146				Delta::Unset {
147					key,
148					values,
149				} => {
150					deletes.push(StorageDelete {
151						key: key.clone(),
152						value_bytes: values.len() as u64,
153					});
154
155					batches.entry(table).or_default().push((key.0.clone(), None));
156				}
157				Delta::Remove {
158					key,
159				} => {
160					batches.entry(table).or_default().push((key.0.clone(), None));
161				}
162				Delta::Drop {
163					key,
164					up_to_version,
165					keep_last_versions,
166				} => {
167					explicit_drops.push((table, key.clone(), *up_to_version, *keep_last_versions));
168				}
169			}
170		}
171
172		// Process explicit drops now that pending_set_keys is complete
173		let mut drop_batch = Vec::with_capacity(explicit_drops.len() + pending_set_keys.len());
174		for (table, key, up_to_version, keep_last_versions) in explicit_drops {
175			let pending_version = if pending_set_keys.contains(key.as_ref()) {
176				Some(version)
177			} else {
178				None
179			};
180
181			drop_batch.push(DropRequest {
182				table,
183				key: key.0.clone(),
184				up_to_version,
185				keep_last_versions,
186				commit_version: version,
187				pending_version,
188			});
189		}
190
191		// Add implicit drops for single-version-semantics keys
192		for key in pending_set_keys.iter() {
193			let table = classify_key(&EncodedKey(key.clone()));
194			drop_batch.push(DropRequest {
195				table,
196				key: key.clone(),
197				up_to_version: None,
198				keep_last_versions: Some(1),
199				commit_version: version,
200				pending_version: Some(version),
201			});
202		}
203
204		if !drop_batch.is_empty() {
205			if self.drop_actor.send_blocking(DropMessage::Batch(drop_batch)).is_err() {
206				warn!("Failed to send drop batch");
207			}
208		}
209
210		// Pass version explicitly to storage
211		storage.set(version, batches)?;
212
213		// Emit storage stats event for this commit
214		if !writes.is_empty() || !deletes.is_empty() {
215			self.event_bus.emit(StorageStatsRecordedEvent::new(writes, deletes, vec![], version));
216		}
217
218		Ok(())
219	}
220}
221
222/// Cursor state for multi-version range streaming.
223///
224/// Tracks position in each tier independently, allowing the scan to continue
225/// until enough unique logical keys are collected.
226#[derive(Debug, Clone, Default)]
227pub struct MultiVersionRangeCursor {
228	/// Cursor for hot tier
229	pub hot: RangeCursor,
230	/// Cursor for warm tier
231	pub warm: RangeCursor,
232	/// Cursor for cold tier
233	pub cold: RangeCursor,
234	/// Whether all tiers are exhausted
235	pub exhausted: bool,
236}
237
238impl MultiVersionRangeCursor {
239	/// Create a new cursor at the start.
240	pub fn new() -> Self {
241		Self::default()
242	}
243
244	/// Check if all tiers are exhausted.
245	pub fn is_exhausted(&self) -> bool {
246		self.exhausted
247	}
248}
249
250impl StandardMultiStore {
251	/// Fetch the next batch of entries, continuing from cursor position.
252	///
253	/// This properly handles high version density by scanning until `batch_size`
254	/// unique logical keys are collected OR all tiers are exhausted.
255	pub fn range_next(
256		&self,
257		cursor: &mut MultiVersionRangeCursor,
258		range: EncodedKeyRange,
259		version: CommitVersion,
260		batch_size: u64,
261	) -> Result<MultiVersionBatch> {
262		if cursor.exhausted {
263			return Ok(MultiVersionBatch {
264				items: Vec::new(),
265				has_more: false,
266			});
267		}
268
269		let table = classify_key_range(&range);
270		let (start, end) = make_range_bounds(&range);
271		let batch_size = batch_size as usize;
272
273		// Collected entries: logical_key -> (version, value)
274		let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
275
276		// Keep scanning until we have batch_size unique logical keys OR all tiers exhausted
277		while collected.len() < batch_size {
278			let mut any_progress = false;
279
280			// Scan chunk from hot tier
281			if let Some(hot) = &self.hot {
282				if !cursor.hot.exhausted {
283					let progress = Self::scan_tier_chunk(
284						hot,
285						table,
286						&mut cursor.hot,
287						&start,
288						&end,
289						version,
290						&range,
291						&mut collected,
292					)?;
293					any_progress |= progress;
294				}
295			}
296
297			// Scan chunk from warm tier
298			if let Some(warm) = &self.warm {
299				if !cursor.warm.exhausted {
300					let progress = Self::scan_tier_chunk(
301						warm,
302						table,
303						&mut cursor.warm,
304						&start,
305						&end,
306						version,
307						&range,
308						&mut collected,
309					)?;
310					any_progress |= progress;
311				}
312			}
313
314			// Scan chunk from cold tier
315			if let Some(cold) = &self.cold {
316				if !cursor.cold.exhausted {
317					let progress = Self::scan_tier_chunk(
318						cold,
319						table,
320						&mut cursor.cold,
321						&start,
322						&end,
323						version,
324						&range,
325						&mut collected,
326					)?;
327					any_progress |= progress;
328				}
329			}
330
331			if !any_progress {
332				// All tiers exhausted
333				cursor.exhausted = true;
334				break;
335			}
336		}
337
338		// Convert to MultiVersionValues in sorted key order, filtering out tombstones
339		let items: Vec<MultiVersionValues> = collected
340			.into_iter()
341			.take(batch_size)
342			.filter_map(|(key_bytes, (v, value))| {
343				value.map(|val| MultiVersionValues {
344					key: EncodedKey(CowVec::new(key_bytes)),
345					values: EncodedValues(val),
346					version: v,
347				})
348			})
349			.collect();
350
351		let has_more = items.len() >= batch_size || !cursor.exhausted;
352
353		Ok(MultiVersionBatch {
354			items,
355			has_more,
356		})
357	}
358
359	/// Scan a chunk from a single tier and merge into collected entries.
360	/// Returns true if any entries were processed (i.e., made progress).
361	fn scan_tier_chunk<S: TierStorage>(
362		storage: &S,
363		table: EntryKind,
364		cursor: &mut RangeCursor,
365		start: &[u8],
366		end: &[u8],
367		version: CommitVersion,
368		range: &EncodedKeyRange,
369		collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
370	) -> Result<bool> {
371		let batch = storage.range_next(
372			table,
373			cursor,
374			Bound::Included(start),
375			Bound::Included(end),
376			version,
377			TIER_SCAN_CHUNK_SIZE,
378		)?;
379
380		if batch.entries.is_empty() {
381			return Ok(false);
382		}
383
384		for entry in batch.entries {
385			// Entry key is already the logical key, entry.version is the version
386			let original_key = entry.key.as_slice().to_vec();
387			let entry_version = entry.version;
388
389			// Skip if key is not within the requested logical range
390			let original_key_encoded = EncodedKey(CowVec::new(original_key.clone()));
391			if !range.contains(&original_key_encoded) {
392				continue;
393			}
394
395			// Update if no entry exists or this is a higher version
396			let should_update = match collected.get(&original_key) {
397				None => true,
398				Some((existing_version, _)) => entry_version > *existing_version,
399			};
400
401			if should_update {
402				collected.insert(original_key, (entry_version, entry.value));
403			}
404		}
405
406		Ok(true)
407	}
408
409	/// Create an iterator for forward range queries.
410	///
411	/// This properly handles high version density by scanning until batch_size
412	/// unique logical keys are collected. The iterator yields individual entries
413	/// and maintains cursor state internally.
414	pub fn range(
415		&self,
416		range: EncodedKeyRange,
417		version: CommitVersion,
418		batch_size: usize,
419	) -> MultiVersionRangeIter {
420		MultiVersionRangeIter {
421			store: self.clone(),
422			cursor: MultiVersionRangeCursor::new(),
423			range,
424			version,
425			batch_size,
426			current_batch: Vec::new(),
427			current_index: 0,
428		}
429	}
430
431	/// Create an iterator for reverse range queries.
432	///
433	/// This properly handles high version density by scanning until batch_size
434	/// unique logical keys are collected. The iterator yields individual entries
435	/// in reverse key order and maintains cursor state internally.
436	pub fn range_rev(
437		&self,
438		range: EncodedKeyRange,
439		version: CommitVersion,
440		batch_size: usize,
441	) -> MultiVersionRangeRevIter {
442		MultiVersionRangeRevIter {
443			store: self.clone(),
444			cursor: MultiVersionRangeCursor::new(),
445			range,
446			version,
447			batch_size,
448			current_batch: Vec::new(),
449			current_index: 0,
450		}
451	}
452
453	/// Fetch the next batch of entries in reverse order, continuing from cursor position.
454	///
455	/// This properly handles high version density by scanning until `batch_size`
456	/// unique logical keys are collected OR all tiers are exhausted.
457	fn range_rev_next(
458		&self,
459		cursor: &mut MultiVersionRangeCursor,
460		range: EncodedKeyRange,
461		version: CommitVersion,
462		batch_size: u64,
463	) -> Result<MultiVersionBatch> {
464		if cursor.exhausted {
465			return Ok(MultiVersionBatch {
466				items: Vec::new(),
467				has_more: false,
468			});
469		}
470
471		let table = classify_key_range(&range);
472		let (start, end) = make_range_bounds(&range);
473		let batch_size = batch_size as usize;
474
475		// Collected entries: logical_key -> (version, value)
476		let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
477
478		// Keep scanning until we have batch_size unique logical keys OR all tiers exhausted
479		while collected.len() < batch_size {
480			let mut any_progress = false;
481
482			// Scan chunk from hot tier (reverse)
483			if let Some(hot) = &self.hot {
484				if !cursor.hot.exhausted {
485					let progress = Self::scan_tier_chunk_rev(
486						hot,
487						table,
488						&mut cursor.hot,
489						&start,
490						&end,
491						version,
492						&range,
493						&mut collected,
494					)?;
495					any_progress |= progress;
496				}
497			}
498
499			// Scan chunk from warm tier (reverse)
500			if let Some(warm) = &self.warm {
501				if !cursor.warm.exhausted {
502					let progress = Self::scan_tier_chunk_rev(
503						warm,
504						table,
505						&mut cursor.warm,
506						&start,
507						&end,
508						version,
509						&range,
510						&mut collected,
511					)?;
512					any_progress |= progress;
513				}
514			}
515
516			// Scan chunk from cold tier (reverse)
517			if let Some(cold) = &self.cold {
518				if !cursor.cold.exhausted {
519					let progress = Self::scan_tier_chunk_rev(
520						cold,
521						table,
522						&mut cursor.cold,
523						&start,
524						&end,
525						version,
526						&range,
527						&mut collected,
528					)?;
529					any_progress |= progress;
530				}
531			}
532
533			if !any_progress {
534				// All tiers exhausted
535				cursor.exhausted = true;
536				break;
537			}
538		}
539
540		// Convert to MultiVersionValues in REVERSE sorted key order, filtering out tombstones
541		let items: Vec<MultiVersionValues> = collected
542			.into_iter()
543			.rev()
544			.take(batch_size)
545			.filter_map(|(key_bytes, (v, value))| {
546				value.map(|val| MultiVersionValues {
547					key: EncodedKey(CowVec::new(key_bytes)),
548					values: EncodedValues(val),
549					version: v,
550				})
551			})
552			.collect();
553
554		let has_more = items.len() >= batch_size || !cursor.exhausted;
555
556		Ok(MultiVersionBatch {
557			items,
558			has_more,
559		})
560	}
561
562	/// Scan a chunk from a single tier in reverse and merge into collected entries.
563	/// Returns true if any entries were processed (i.e., made progress).
564	fn scan_tier_chunk_rev<S: TierStorage>(
565		storage: &S,
566		table: EntryKind,
567		cursor: &mut RangeCursor,
568		start: &[u8],
569		end: &[u8],
570		version: CommitVersion,
571		range: &EncodedKeyRange,
572		collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
573	) -> Result<bool> {
574		let batch = storage.range_rev_next(
575			table,
576			cursor,
577			Bound::Included(start),
578			Bound::Included(end),
579			version,
580			TIER_SCAN_CHUNK_SIZE,
581		)?;
582
583		if batch.entries.is_empty() {
584			return Ok(false);
585		}
586
587		for entry in batch.entries {
588			// Entry key is already the logical key, entry.version is the version
589			let original_key = entry.key.as_slice().to_vec();
590			let entry_version = entry.version;
591
592			// Skip if key is not within the requested logical range
593			let original_key_encoded = EncodedKey(CowVec::new(original_key.clone()));
594			if !range.contains(&original_key_encoded) {
595				continue;
596			}
597
598			// Update if no entry exists or this is a higher version
599			let should_update = match collected.get(&original_key) {
600				None => true,
601				Some((existing_version, _)) => entry_version > *existing_version,
602			};
603
604			if should_update {
605				collected.insert(original_key, (entry_version, entry.value));
606			}
607		}
608
609		Ok(true)
610	}
611}
612
613impl MultiVersionGetPrevious for StandardMultiStore {
614	fn get_previous_version(
615		&self,
616		key: &EncodedKey,
617		before_version: CommitVersion,
618	) -> Result<Option<MultiVersionValues>> {
619		if before_version.0 == 0 {
620			return Ok(None);
621		}
622
623		// Hot storage must be available for version lookups
624		let storage = self.hot.as_ref().expect("hot storage required for version lookups");
625
626		let table = classify_key(key);
627		let prev_version = CommitVersion(before_version.0 - 1);
628
629		match get_at_version(storage, table, key.as_ref(), prev_version) {
630			Ok(VersionedGetResult::Value {
631				value,
632				version,
633			}) => Ok(Some(MultiVersionValues {
634				key: key.clone(),
635				values: EncodedValues(CowVec::new(value.to_vec())),
636				version,
637			})),
638			Ok(VersionedGetResult::Tombstone) | Ok(VersionedGetResult::NotFound) => Ok(None),
639			Err(e) => Err(e),
640		}
641	}
642}
643
644impl MultiVersionStore for StandardMultiStore {}
645
646/// Iterator for forward multi-version range queries.
647pub struct MultiVersionRangeIter {
648	store: StandardMultiStore,
649	cursor: MultiVersionRangeCursor,
650	range: EncodedKeyRange,
651	version: CommitVersion,
652	batch_size: usize,
653	current_batch: Vec<MultiVersionValues>,
654	current_index: usize,
655}
656
657impl Iterator for MultiVersionRangeIter {
658	type Item = Result<MultiVersionValues>;
659
660	fn next(&mut self) -> Option<Self::Item> {
661		// If we have items in the current batch, return them
662		if self.current_index < self.current_batch.len() {
663			let item = self.current_batch[self.current_index].clone();
664			self.current_index += 1;
665			return Some(Ok(item));
666		}
667
668		// If cursor is exhausted, we're done
669		if self.cursor.exhausted {
670			return None;
671		}
672
673		// Fetch the next batch
674		match self.store.range_next(&mut self.cursor, self.range.clone(), self.version, self.batch_size as u64)
675		{
676			Ok(batch) => {
677				if batch.items.is_empty() {
678					return None;
679				}
680				self.current_batch = batch.items;
681				self.current_index = 0;
682				self.next()
683			}
684			Err(e) => Some(Err(e)),
685		}
686	}
687}
688
689/// Iterator for reverse multi-version range queries.
690pub struct MultiVersionRangeRevIter {
691	store: StandardMultiStore,
692	cursor: MultiVersionRangeCursor,
693	range: EncodedKeyRange,
694	version: CommitVersion,
695	batch_size: usize,
696	current_batch: Vec<MultiVersionValues>,
697	current_index: usize,
698}
699
700impl Iterator for MultiVersionRangeRevIter {
701	type Item = Result<MultiVersionValues>;
702
703	fn next(&mut self) -> Option<Self::Item> {
704		// If we have items in the current batch, return them
705		if self.current_index < self.current_batch.len() {
706			let item = self.current_batch[self.current_index].clone();
707			self.current_index += 1;
708			return Some(Ok(item));
709		}
710
711		// If cursor is exhausted, we're done
712		if self.cursor.exhausted {
713			return None;
714		}
715
716		// Fetch the next batch
717		match self.store.range_rev_next(
718			&mut self.cursor,
719			self.range.clone(),
720			self.version,
721			self.batch_size as u64,
722		) {
723			Ok(batch) => {
724				if batch.items.is_empty() {
725					return None;
726				}
727				self.current_batch = batch.items;
728				self.current_index = 0;
729				self.next()
730			}
731			Err(e) => Some(Err(e)),
732		}
733	}
734}
735
736/// Classify a range to determine which table it belongs to.
737fn classify_key_range(range: &EncodedKeyRange) -> EntryKind {
738	classify_range(range).unwrap_or(Multi)
739}
740
741/// Create range bounds from an EncodedKeyRange.
742/// Returns the start and end byte slices for the range query.
743fn make_range_bounds(range: &EncodedKeyRange) -> (Vec<u8>, Vec<u8>) {
744	let start = match &range.start {
745		Bound::Included(key) => key.as_ref().to_vec(),
746		Bound::Excluded(key) => key.as_ref().to_vec(),
747		Bound::Unbounded => vec![],
748	};
749
750	let end = match &range.end {
751		Bound::Included(key) => key.as_ref().to_vec(),
752		Bound::Excluded(key) => key.as_ref().to_vec(),
753		Bound::Unbounded => vec![0xFFu8; 256],
754	};
755
756	(start, end)
757}