1use std::{
5 collections::{BTreeMap, HashMap, HashSet},
6 ops::{Bound, RangeBounds},
7};
8
9use reifydb_core::{
10 actors::drop::{DropMessage, DropRequest},
11 common::CommitVersion,
12 delta::Delta,
13 encoded::{
14 key::{EncodedKey, EncodedKeyRange},
15 row::EncodedRow,
16 },
17 event::metric::{MultiCommittedEvent, MultiDelete, MultiWrite},
18 interface::store::{
19 EntryKind, MultiVersionBatch, MultiVersionCommit, MultiVersionContains, MultiVersionGet,
20 MultiVersionGetPrevious, MultiVersionRow, MultiVersionStore,
21 },
22};
23use reifydb_type::util::{cowvec::CowVec, hex};
24use tracing::{instrument, warn};
25
26use super::{
27 StandardMultiStore,
28 router::{classify_key, classify_range, is_single_version_semantics_key},
29 version::{VersionedGetResult, get_at_version},
30};
31use crate::{
32 Result,
33 tier::{RangeCursor, TierBatch, TierStorage},
34};
35
/// Chunk size handed to each storage tier's `range_next` / `range_rev_next`
/// call while a range batch is being assembled.
// NOTE(review): presumably an upper bound on the entries a tier returns per call;
// confirm against the `TierStorage` contract.
const TIER_SCAN_CHUNK_SIZE: usize = 4096;
39
40impl MultiVersionGet for StandardMultiStore {
41 #[instrument(name = "store::multi::get", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref()), version = version.0))]
42 fn get(&self, key: &EncodedKey, version: CommitVersion) -> Result<Option<MultiVersionRow>> {
43 let table = classify_key(key);
44
45 if let Some(hot) = &self.hot {
47 match get_at_version(hot, table, key.as_ref(), version)? {
48 VersionedGetResult::Value {
49 value,
50 version: v,
51 } => {
52 return Ok(Some(MultiVersionRow {
53 key: key.clone(),
54 row: EncodedRow(value),
55 version: v,
56 }));
57 }
58 VersionedGetResult::Tombstone => return Ok(None),
59 VersionedGetResult::NotFound => {}
60 }
61 }
62
63 if let Some(warm) = &self.warm {
65 match get_at_version(warm, table, key.as_ref(), version)? {
66 VersionedGetResult::Value {
67 value,
68 version: v,
69 } => {
70 return Ok(Some(MultiVersionRow {
71 key: key.clone(),
72 row: EncodedRow(value),
73 version: v,
74 }));
75 }
76 VersionedGetResult::Tombstone => return Ok(None),
77 VersionedGetResult::NotFound => {}
78 }
79 }
80
81 if let Some(cold) = &self.cold {
83 match get_at_version(cold, table, key.as_ref(), version)? {
84 VersionedGetResult::Value {
85 value,
86 version: v,
87 } => {
88 return Ok(Some(MultiVersionRow {
89 key: key.clone(),
90 row: EncodedRow(value),
91 version: v,
92 }));
93 }
94 VersionedGetResult::Tombstone => return Ok(None),
95 VersionedGetResult::NotFound => {}
96 }
97 }
98
99 Ok(None)
100 }
101}
102
103impl MultiVersionContains for StandardMultiStore {
104 #[instrument(name = "store::multi::contains", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref()), version = version.0), ret)]
105 fn contains(&self, key: &EncodedKey, version: CommitVersion) -> Result<bool> {
106 Ok(MultiVersionGet::get(self, key, version)?.is_some())
107 }
108}
109
110impl MultiVersionCommit for StandardMultiStore {
111 #[instrument(name = "store::multi::commit", level = "debug", skip(self, deltas), fields(delta_count = deltas.len(), version = version.0))]
112 fn commit(&self, deltas: CowVec<Delta>, version: CommitVersion) -> Result<()> {
113 let Some(storage) = &self.hot else {
115 return Ok(());
116 };
117
118 let mut pending_set_keys: HashSet<CowVec<u8>> = HashSet::new();
119 let mut writes: Vec<MultiWrite> = Vec::new();
120 let mut deletes: Vec<MultiDelete> = Vec::new();
121 let mut batches: TierBatch = HashMap::new();
122 let mut explicit_drops: Vec<(EntryKind, EncodedKey)> = Vec::new();
123
124 for delta in deltas.iter() {
125 let key = delta.key();
126
127 let table = classify_key(key);
128 let is_single_version = is_single_version_semantics_key(key);
129
130 match delta {
131 Delta::Set {
132 key,
133 row,
134 } => {
135 if is_single_version {
136 pending_set_keys.insert(key.0.clone());
137 }
138
139 writes.push(MultiWrite {
140 key: key.clone(),
141 value_bytes: row.len() as u64,
142 });
143
144 batches.entry(table).or_default().push((key.0.clone(), Some(row.0.clone())));
145 }
146 Delta::Unset {
147 key,
148 row,
149 } => {
150 deletes.push(MultiDelete {
151 key: key.clone(),
152 value_bytes: row.len() as u64,
153 });
154
155 batches.entry(table).or_default().push((key.0.clone(), None));
156 }
157 Delta::Remove {
158 key,
159 } => {
160 batches.entry(table).or_default().push((key.0.clone(), None));
161 }
162 Delta::Drop {
163 key,
164 } => {
165 explicit_drops.push((table, key.clone()));
166 }
167 }
168 }
169
170 let mut drop_batch = Vec::with_capacity(explicit_drops.len() + pending_set_keys.len());
172 for (table, key) in explicit_drops {
173 let pending_version = if pending_set_keys.contains(key.as_ref()) {
174 Some(version)
175 } else {
176 None
177 };
178
179 drop_batch.push(DropRequest {
180 table,
181 key: key.0.clone(),
182 commit_version: version,
183 pending_version,
184 });
185 }
186
187 for key in pending_set_keys.iter() {
189 let table = classify_key(&EncodedKey(key.clone()));
190 drop_batch.push(DropRequest {
191 table,
192 key: key.clone(),
193 commit_version: version,
194 pending_version: Some(version),
195 });
196 }
197
198 if !drop_batch.is_empty() && self.drop_actor.send_blocking(DropMessage::Batch(drop_batch)).is_err() {
199 warn!("Failed to send drop batch");
200 }
201
202 storage.set(version, batches)?;
204
205 if !writes.is_empty() || !deletes.is_empty() {
207 self.event_bus.emit(MultiCommittedEvent::new(writes, deletes, vec![], version));
208 }
209
210 Ok(())
211 }
212}
213
/// Resumable position of a range scan that spans the hot, warm and cold tiers.
///
/// The cursor is passed by mutable reference to the range-scan methods, which
/// advance the per-tier cursors and set `exhausted` when scanning is finished.
#[derive(Debug, Clone, Default)]
pub struct MultiVersionRangeCursor {
    /// Scan position within the hot tier.
    pub hot: RangeCursor,
    /// Scan position within the warm tier.
    pub warm: RangeCursor,
    /// Scan position within the cold tier.
    pub cold: RangeCursor,
    /// Set once a scan round finds that no tier produced any further entries.
    pub exhausted: bool,
}
229
230impl MultiVersionRangeCursor {
231 pub fn new() -> Self {
233 Self::default()
234 }
235
236 pub fn is_exhausted(&self) -> bool {
238 self.exhausted
239 }
240}
241
/// Parameters shared by every per-tier chunk scan within one range request.
struct TierScanQuery<'a> {
    /// Table the range was classified into.
    table: EntryKind,
    /// Lower key bytes, handed to the tier as an inclusive bound.
    start: &'a [u8],
    /// Upper key bytes, handed to the tier as an inclusive bound.
    end: &'a [u8],
    /// Version passed through to the tier scan.
    version: CommitVersion,
    /// The caller's original range; every returned key is re-checked against it,
    /// which is what enforces the range's own bounds.
    range: &'a EncodedKeyRange,
}
250
251impl StandardMultiStore {
252 pub fn range_next(
257 &self,
258 cursor: &mut MultiVersionRangeCursor,
259 range: EncodedKeyRange,
260 version: CommitVersion,
261 batch_size: u64,
262 ) -> Result<MultiVersionBatch> {
263 if cursor.exhausted {
264 return Ok(MultiVersionBatch {
265 items: Vec::new(),
266 has_more: false,
267 });
268 }
269
270 let table = classify_key_range(&range);
271 let (start, end) = make_range_bounds(&range);
272 let batch_size = batch_size as usize;
273 let scan = TierScanQuery {
274 table,
275 start: &start,
276 end: &end,
277 version,
278 range: &range,
279 };
280
281 let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
283
284 while collected.len() < batch_size {
286 let mut any_progress = false;
287
288 if let Some(hot) = &self.hot
290 && !cursor.hot.exhausted
291 {
292 let progress = Self::scan_tier_chunk(hot, &mut cursor.hot, &scan, &mut collected)?;
293 any_progress |= progress;
294 }
295
296 if let Some(warm) = &self.warm
298 && !cursor.warm.exhausted
299 {
300 let progress = Self::scan_tier_chunk(warm, &mut cursor.warm, &scan, &mut collected)?;
301 any_progress |= progress;
302 }
303
304 if let Some(cold) = &self.cold
306 && !cursor.cold.exhausted
307 {
308 let progress = Self::scan_tier_chunk(cold, &mut cursor.cold, &scan, &mut collected)?;
309 any_progress |= progress;
310 }
311
312 if !any_progress {
313 cursor.exhausted = true;
315 break;
316 }
317 }
318
319 let items: Vec<MultiVersionRow> = collected
321 .into_iter()
322 .take(batch_size)
323 .filter_map(|(key_bytes, (v, value))| {
324 value.map(|val| MultiVersionRow {
325 key: EncodedKey(CowVec::new(key_bytes)),
326 row: EncodedRow(val),
327 version: v,
328 })
329 })
330 .collect();
331
332 let has_more = items.len() >= batch_size || !cursor.exhausted;
333
334 Ok(MultiVersionBatch {
335 items,
336 has_more,
337 })
338 }
339
340 fn scan_tier_chunk<S: TierStorage>(
343 storage: &S,
344 cursor: &mut RangeCursor,
345 scan: &TierScanQuery,
346 collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
347 ) -> Result<bool> {
348 let batch = storage.range_next(
349 scan.table,
350 cursor,
351 Bound::Included(scan.start),
352 Bound::Included(scan.end),
353 scan.version,
354 TIER_SCAN_CHUNK_SIZE,
355 )?;
356
357 if batch.entries.is_empty() {
358 return Ok(false);
359 }
360
361 for entry in batch.entries {
362 let original_key = entry.key.as_slice().to_vec();
364 let entry_version = entry.version;
365
366 let original_key_encoded = EncodedKey(CowVec::new(original_key.clone()));
368 if !scan.range.contains(&original_key_encoded) {
369 continue;
370 }
371
372 let should_update = match collected.get(&original_key) {
374 None => true,
375 Some((existing_version, _)) => entry_version > *existing_version,
376 };
377
378 if should_update {
379 collected.insert(original_key, (entry_version, entry.value));
380 }
381 }
382
383 Ok(true)
384 }
385
386 pub fn range(
392 &self,
393 range: EncodedKeyRange,
394 version: CommitVersion,
395 batch_size: usize,
396 ) -> MultiVersionRangeIter {
397 MultiVersionRangeIter {
398 store: self.clone(),
399 cursor: MultiVersionRangeCursor::new(),
400 range,
401 version,
402 batch_size,
403 current_batch: Vec::new(),
404 current_index: 0,
405 }
406 }
407
408 pub fn range_rev(
414 &self,
415 range: EncodedKeyRange,
416 version: CommitVersion,
417 batch_size: usize,
418 ) -> MultiVersionRangeRevIter {
419 MultiVersionRangeRevIter {
420 store: self.clone(),
421 cursor: MultiVersionRangeCursor::new(),
422 range,
423 version,
424 batch_size,
425 current_batch: Vec::new(),
426 current_index: 0,
427 }
428 }
429
430 fn range_rev_next(
435 &self,
436 cursor: &mut MultiVersionRangeCursor,
437 range: EncodedKeyRange,
438 version: CommitVersion,
439 batch_size: u64,
440 ) -> Result<MultiVersionBatch> {
441 if cursor.exhausted {
442 return Ok(MultiVersionBatch {
443 items: Vec::new(),
444 has_more: false,
445 });
446 }
447
448 let table = classify_key_range(&range);
449 let (start, end) = make_range_bounds(&range);
450 let batch_size = batch_size as usize;
451 let scan = TierScanQuery {
452 table,
453 start: &start,
454 end: &end,
455 version,
456 range: &range,
457 };
458
459 let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
461
462 while collected.len() < batch_size {
464 let mut any_progress = false;
465
466 if let Some(hot) = &self.hot
468 && !cursor.hot.exhausted
469 {
470 let progress = Self::scan_tier_chunk_rev(hot, &mut cursor.hot, &scan, &mut collected)?;
471 any_progress |= progress;
472 }
473
474 if let Some(warm) = &self.warm
476 && !cursor.warm.exhausted
477 {
478 let progress =
479 Self::scan_tier_chunk_rev(warm, &mut cursor.warm, &scan, &mut collected)?;
480 any_progress |= progress;
481 }
482
483 if let Some(cold) = &self.cold
485 && !cursor.cold.exhausted
486 {
487 let progress =
488 Self::scan_tier_chunk_rev(cold, &mut cursor.cold, &scan, &mut collected)?;
489 any_progress |= progress;
490 }
491
492 if !any_progress {
493 cursor.exhausted = true;
495 break;
496 }
497 }
498
499 let items: Vec<MultiVersionRow> = collected
501 .into_iter()
502 .rev()
503 .take(batch_size)
504 .filter_map(|(key_bytes, (v, value))| {
505 value.map(|val| MultiVersionRow {
506 key: EncodedKey(CowVec::new(key_bytes)),
507 row: EncodedRow(val),
508 version: v,
509 })
510 })
511 .collect();
512
513 let has_more = items.len() >= batch_size || !cursor.exhausted;
514
515 Ok(MultiVersionBatch {
516 items,
517 has_more,
518 })
519 }
520
521 fn scan_tier_chunk_rev<S: TierStorage>(
524 storage: &S,
525 cursor: &mut RangeCursor,
526 scan: &TierScanQuery,
527 collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
528 ) -> Result<bool> {
529 let batch = storage.range_rev_next(
530 scan.table,
531 cursor,
532 Bound::Included(scan.start),
533 Bound::Included(scan.end),
534 scan.version,
535 TIER_SCAN_CHUNK_SIZE,
536 )?;
537
538 if batch.entries.is_empty() {
539 return Ok(false);
540 }
541
542 for entry in batch.entries {
543 let original_key = entry.key.as_slice().to_vec();
545 let entry_version = entry.version;
546
547 let original_key_encoded = EncodedKey(CowVec::new(original_key.clone()));
549 if !scan.range.contains(&original_key_encoded) {
550 continue;
551 }
552
553 let should_update = match collected.get(&original_key) {
555 None => true,
556 Some((existing_version, _)) => entry_version > *existing_version,
557 };
558
559 if should_update {
560 collected.insert(original_key, (entry_version, entry.value));
561 }
562 }
563
564 Ok(true)
565 }
566}
567
568impl MultiVersionGetPrevious for StandardMultiStore {
569 fn get_previous_version(
570 &self,
571 key: &EncodedKey,
572 before_version: CommitVersion,
573 ) -> Result<Option<MultiVersionRow>> {
574 if before_version.0 == 0 {
575 return Ok(None);
576 }
577
578 let storage = self.hot.as_ref().expect("hot storage required for version lookups");
580
581 let table = classify_key(key);
582 let prev_version = CommitVersion(before_version.0 - 1);
583
584 match get_at_version(storage, table, key.as_ref(), prev_version) {
585 Ok(VersionedGetResult::Value {
586 value,
587 version,
588 }) => Ok(Some(MultiVersionRow {
589 key: key.clone(),
590 row: EncodedRow(CowVec::new(value.to_vec())),
591 version,
592 })),
593 Ok(VersionedGetResult::Tombstone) | Ok(VersionedGetResult::NotFound) => Ok(None),
594 Err(e) => Err(e),
595 }
596 }
597}
598
// Empty impl: nothing beyond the trait's own defaults is provided here.
// NOTE(review): presumably `MultiVersionStore` bundles the get/contains/commit/
// get-previous traits implemented in this file; confirm against the trait definition.
impl MultiVersionStore for StandardMultiStore {}
600
/// Forward iterator over a key range that fetches rows from the store in batches.
pub struct MultiVersionRangeIter {
    /// Store the batches are read from (a clone of the store that created the iterator).
    store: StandardMultiStore,
    /// Scan position carried across batch fetches.
    cursor: MultiVersionRangeCursor,
    /// Key range being iterated.
    range: EncodedKeyRange,
    /// Version passed to every batch fetch.
    version: CommitVersion,
    /// Batch size requested from `range_next` on each fetch.
    batch_size: usize,
    /// Rows of the most recently fetched batch.
    current_batch: Vec<MultiVersionRow>,
    /// Index of the next row in `current_batch` to hand out.
    current_index: usize,
}
611
612impl Iterator for MultiVersionRangeIter {
613 type Item = Result<MultiVersionRow>;
614
615 fn next(&mut self) -> Option<Self::Item> {
616 if self.current_index < self.current_batch.len() {
618 let item = self.current_batch[self.current_index].clone();
619 self.current_index += 1;
620 return Some(Ok(item));
621 }
622
623 if self.cursor.exhausted {
625 return None;
626 }
627
628 match self.store.range_next(&mut self.cursor, self.range.clone(), self.version, self.batch_size as u64)
630 {
631 Ok(batch) => {
632 if batch.items.is_empty() {
633 return None;
634 }
635 self.current_batch = batch.items;
636 self.current_index = 0;
637 self.next()
638 }
639 Err(e) => Some(Err(e)),
640 }
641 }
642}
643
/// Reverse iterator over a key range that fetches rows from the store in batches.
pub struct MultiVersionRangeRevIter {
    /// Store the batches are read from (a clone of the store that created the iterator).
    store: StandardMultiStore,
    /// Scan position carried across batch fetches.
    cursor: MultiVersionRangeCursor,
    /// Key range being iterated.
    range: EncodedKeyRange,
    /// Version passed to every batch fetch.
    version: CommitVersion,
    /// Batch size requested from `range_rev_next` on each fetch.
    batch_size: usize,
    /// Rows of the most recently fetched batch.
    current_batch: Vec<MultiVersionRow>,
    /// Index of the next row in `current_batch` to hand out.
    current_index: usize,
}
654
655impl Iterator for MultiVersionRangeRevIter {
656 type Item = Result<MultiVersionRow>;
657
658 fn next(&mut self) -> Option<Self::Item> {
659 if self.current_index < self.current_batch.len() {
661 let item = self.current_batch[self.current_index].clone();
662 self.current_index += 1;
663 return Some(Ok(item));
664 }
665
666 if self.cursor.exhausted {
668 return None;
669 }
670
671 match self.store.range_rev_next(
673 &mut self.cursor,
674 self.range.clone(),
675 self.version,
676 self.batch_size as u64,
677 ) {
678 Ok(batch) => {
679 if batch.items.is_empty() {
680 return None;
681 }
682 self.current_batch = batch.items;
683 self.current_index = 0;
684 self.next()
685 }
686 Err(e) => Some(Err(e)),
687 }
688 }
689}
690
691fn classify_key_range(range: &EncodedKeyRange) -> EntryKind {
693 classify_range(range).unwrap_or(EntryKind::Multi)
694}
695
696fn make_range_bounds(range: &EncodedKeyRange) -> (Vec<u8>, Vec<u8>) {
699 let start = match &range.start {
700 Bound::Included(key) => key.as_ref().to_vec(),
701 Bound::Excluded(key) => key.as_ref().to_vec(),
702 Bound::Unbounded => vec![],
703 };
704
705 let end = match &range.end {
706 Bound::Included(key) => key.as_ref().to_vec(),
707 Bound::Excluded(key) => key.as_ref().to_vec(),
708 Bound::Unbounded => vec![0xFFu8; 256],
709 };
710
711 (start, end)
712}