reifydb_store_multi/tier/mod.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Common storage tier traits and types.
5//!
6//! This module defines the minimal interface that all storage tiers (hot, warm, cold)
7//! must implement. All MVCC, CDC, and routing logic belongs in the store layer above.
8
9use std::{collections::HashMap, ops::Bound};
10
11use reifydb_core::{
12 common::CommitVersion,
13 interface::catalog::{flow::FlowNodeId, shape::ShapeId},
14};
15use reifydb_type::{Result, util::cowvec::CowVec};
16
/// A batch of key-value entries grouped by entry kind, used for atomic multi-table writes.
///
/// Each map entry holds the `(key, value)` pairs destined for one [`EntryKind`].
/// A value of `None` asks for a tombstone (an MVCC deletion marker) to be written
/// for that key instead of a value.
pub type TierBatch = HashMap<EntryKind, Vec<(CowVec<u8>, Option<CowVec<u8>>)>>;
19
20/// Identifies a logical table/namespace in storage.
21///
22/// The store layer routes keys to the appropriate storage based on key type.
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
24pub enum EntryKind {
25 /// Multi-version storage for general data
26 Multi,
27 /// Per-source table for row data
28 Source(ShapeId),
29 /// Per-operator table for flow node state
30 Operator(FlowNodeId),
31}
32
33/// A raw storage entry with version.
34///
35/// Value is None for tombstones (deletions).
36#[derive(Debug, Clone)]
37pub struct RawEntry {
38 pub key: CowVec<u8>,
39 pub version: CommitVersion,
40 pub value: Option<CowVec<u8>>,
41}
42
43/// A batch of range results with continuation info for pagination.
44#[derive(Debug, Clone)]
45pub struct RangeBatch {
46 /// The entries in this batch.
47 pub entries: Vec<RawEntry>,
48 /// Whether there are more entries after this batch.
49 pub has_more: bool,
50}
51
52impl RangeBatch {
53 /// Creates an empty batch with no more results.
54 pub fn empty() -> Self {
55 Self {
56 entries: Vec::new(),
57 has_more: false,
58 }
59 }
60
61 /// Returns true if this batch contains no entries.
62 pub fn is_empty(&self) -> bool {
63 self.entries.is_empty()
64 }
65}
66
67/// Cursor state for streaming range queries.
68///
69/// Tracks position within a range scan, enabling efficient continuation
70/// across multiple batches without re-scanning from the beginning.
71#[derive(Debug, Clone)]
72pub struct RangeCursor {
73 /// Last key seen in the previous batch (for Bound::Excluded continuation)
74 pub last_key: Option<CowVec<u8>>,
75 /// Whether this stream is exhausted
76 pub exhausted: bool,
77}
78
79impl RangeCursor {
80 /// Create a new cursor at the start of a range.
81 pub fn new() -> Self {
82 Self {
83 last_key: None,
84 exhausted: false,
85 }
86 }
87
88 /// Check if the stream is exhausted.
89 pub fn is_exhausted(&self) -> bool {
90 self.exhausted
91 }
92}
93
94impl Default for RangeCursor {
95 fn default() -> Self {
96 Self::new()
97 }
98}
99
100/// The tier storage trait.
101///
102/// This is intentionally minimal - just raw bytes in/out.
103/// Version is a first-class parameter for all operations.
104/// All MVCC, CDC, and routing logic belongs in the store layer above.
105///
106/// Implementations must be thread-safe and cloneable.
107pub trait TierStorage: Send + Sync + Clone + 'static {
108 /// Get the value for a key at or before the given version.
109 fn get(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<Option<CowVec<u8>>>;
110
111 /// Check if a key exists at or before the given version.
112 fn contains(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<bool> {
113 Ok(self.get(table, key, version)?.is_some())
114 }
115
116 /// Write entries to multiple tables atomically at the given version.
117 ///
118 /// All entries across all tables are written in a single transaction.
119 /// This ensures durability and atomicity for multi-table commits.
120 fn set(&self, version: CommitVersion, batches: TierBatch) -> Result<()>;
121
122 /// Fetch the next batch of entries in key order at or before version.
123 ///
124 /// Uses the cursor to track position. On first call, cursor should be new.
125 /// On subsequent calls, pass the same cursor to continue from where left off.
126 /// Returns up to `batch_size` entries. The cursor is updated with the last
127 /// key seen, and `exhausted` is set to true when no more entries remain.
128 fn range_next(
129 &self,
130 table: EntryKind,
131 cursor: &mut RangeCursor,
132 start: Bound<&[u8]>,
133 end: Bound<&[u8]>,
134 version: CommitVersion,
135 batch_size: usize,
136 ) -> Result<RangeBatch>;
137
138 /// Fetch the next batch of entries in reverse key order at or before version.
139 ///
140 /// Uses the cursor to track position. On first call, cursor should be new.
141 /// On subsequent calls, pass the same cursor to continue from where left off.
142 /// Returns up to `batch_size` entries. The cursor is updated with the last
143 /// key seen, and `exhausted` is set to true when no more entries remain.
144 fn range_rev_next(
145 &self,
146 table: EntryKind,
147 cursor: &mut RangeCursor,
148 start: Bound<&[u8]>,
149 end: Bound<&[u8]>,
150 version: CommitVersion,
151 batch_size: usize,
152 ) -> Result<RangeBatch>;
153
154 /// Ensure a table exists (creates if needed).
155 ///
156 /// For memory backends this is typically a no-op.
157 /// For SQL backends this may create tables.
158 fn ensure_table(&self, table: EntryKind) -> Result<()>;
159
160 /// Delete all entries in a table.
161 fn clear_table(&self, table: EntryKind) -> Result<()>;
162
163 /// Physically drop specific versions of entries from storage.
164 ///
165 /// Unlike `set()` with None values which inserts tombstones for MVCC,
166 /// this method actually removes entries from storage to reclaim memory.
167 /// Used by the drop worker to erase old versions after they're no longer needed.
168 ///
169 /// Each entry in the batch is a (key, version) pair identifying the specific
170 /// version of the key to remove.
171 fn drop(&self, batches: HashMap<EntryKind, Vec<(CowVec<u8>, CommitVersion)>>) -> Result<()>;
172
173 /// Get all versions of a specific key (for internal cleanup operations).
174 ///
175 /// Unlike `get()` which does MVCC resolution, this returns ALL stored versions
176 /// of the key with their values. Used by the drop worker to discover which
177 /// versions exist before deciding which to clean up.
178 ///
179 /// Returns a vector of (version, value) pairs, sorted by version descending.
180 fn get_all_versions(&self, table: EntryKind, key: &[u8]) -> Result<Vec<(CommitVersion, Option<CowVec<u8>>)>>;
181}
182
/// Marker trait for storage tiers that support the tier storage interface.
///
/// Declares no methods of its own; its only requirement is the [`TierStorage`]
/// supertrait.
// NOTE(review): no blanket impl is visible in this module, so implementors
// presumably opt in explicitly — confirm against the tier implementations.
pub trait TierBackend: TierStorage {}