Skip to main content

reifydb_store_multi/tier/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Common storage tier traits and types.
5//!
6//! This module defines the minimal interface that all storage tiers (hot, warm, cold)
7//! must implement. All MVCC, CDC, and routing logic belongs in the store layer above.
8
9use std::{collections::HashMap, ops::Bound};
10
11use reifydb_core::{
12	common::CommitVersion,
13	interface::catalog::{flow::FlowNodeId, shape::ShapeId},
14};
15use reifydb_type::{Result, util::cowvec::CowVec};
16
/// A batch of key-value entries grouped by entry kind, used for atomic multi-table writes.
///
/// Each map key names the target table; the associated vector holds
/// `(key, value)` pairs as raw bytes. A `None` value is written as a tombstone
/// (see the contrast drawn in the docs of `TierStorage::drop`).
pub type TierBatch = HashMap<EntryKind, Vec<(CowVec<u8>, Option<CowVec<u8>>)>>;
19
20/// Identifies a logical table/namespace in storage.
21///
22/// The store layer routes keys to the appropriate storage based on key type.
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
24pub enum EntryKind {
25	/// Multi-version storage for general data
26	Multi,
27	/// Per-source table for row data
28	Source(ShapeId),
29	/// Per-operator table for flow node state
30	Operator(FlowNodeId),
31}
32
33/// A raw storage entry with version.
34///
35/// Value is None for tombstones (deletions).
36#[derive(Debug, Clone)]
37pub struct RawEntry {
38	pub key: CowVec<u8>,
39	pub version: CommitVersion,
40	pub value: Option<CowVec<u8>>,
41}
42
43/// A batch of range results with continuation info for pagination.
44#[derive(Debug, Clone)]
45pub struct RangeBatch {
46	/// The entries in this batch.
47	pub entries: Vec<RawEntry>,
48	/// Whether there are more entries after this batch.
49	pub has_more: bool,
50}
51
52impl RangeBatch {
53	/// Creates an empty batch with no more results.
54	pub fn empty() -> Self {
55		Self {
56			entries: Vec::new(),
57			has_more: false,
58		}
59	}
60
61	/// Returns true if this batch contains no entries.
62	pub fn is_empty(&self) -> bool {
63		self.entries.is_empty()
64	}
65}
66
67/// Cursor state for streaming range queries.
68///
69/// Tracks position within a range scan, enabling efficient continuation
70/// across multiple batches without re-scanning from the beginning.
71#[derive(Debug, Clone)]
72pub struct RangeCursor {
73	/// Last key seen in the previous batch (for Bound::Excluded continuation)
74	pub last_key: Option<CowVec<u8>>,
75	/// Whether this stream is exhausted
76	pub exhausted: bool,
77}
78
79impl RangeCursor {
80	/// Create a new cursor at the start of a range.
81	pub fn new() -> Self {
82		Self {
83			last_key: None,
84			exhausted: false,
85		}
86	}
87
88	/// Check if the stream is exhausted.
89	pub fn is_exhausted(&self) -> bool {
90		self.exhausted
91	}
92}
93
94impl Default for RangeCursor {
95	fn default() -> Self {
96		Self::new()
97	}
98}
99
100/// The tier storage trait.
101///
102/// This is intentionally minimal - just raw bytes in/out.
103/// Version is a first-class parameter for all operations.
104/// All MVCC, CDC, and routing logic belongs in the store layer above.
105///
106/// Implementations must be thread-safe and cloneable.
107pub trait TierStorage: Send + Sync + Clone + 'static {
108	/// Get the value for a key at or before the given version.
109	fn get(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<Option<CowVec<u8>>>;
110
111	/// Check if a key exists at or before the given version.
112	fn contains(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<bool> {
113		Ok(self.get(table, key, version)?.is_some())
114	}
115
116	/// Write entries to multiple tables atomically at the given version.
117	///
118	/// All entries across all tables are written in a single transaction.
119	/// This ensures durability and atomicity for multi-table commits.
120	fn set(&self, version: CommitVersion, batches: TierBatch) -> Result<()>;
121
122	/// Fetch the next batch of entries in key order at or before version.
123	///
124	/// Uses the cursor to track position. On first call, cursor should be new.
125	/// On subsequent calls, pass the same cursor to continue from where left off.
126	/// Returns up to `batch_size` entries. The cursor is updated with the last
127	/// key seen, and `exhausted` is set to true when no more entries remain.
128	fn range_next(
129		&self,
130		table: EntryKind,
131		cursor: &mut RangeCursor,
132		start: Bound<&[u8]>,
133		end: Bound<&[u8]>,
134		version: CommitVersion,
135		batch_size: usize,
136	) -> Result<RangeBatch>;
137
138	/// Fetch the next batch of entries in reverse key order at or before version.
139	///
140	/// Uses the cursor to track position. On first call, cursor should be new.
141	/// On subsequent calls, pass the same cursor to continue from where left off.
142	/// Returns up to `batch_size` entries. The cursor is updated with the last
143	/// key seen, and `exhausted` is set to true when no more entries remain.
144	fn range_rev_next(
145		&self,
146		table: EntryKind,
147		cursor: &mut RangeCursor,
148		start: Bound<&[u8]>,
149		end: Bound<&[u8]>,
150		version: CommitVersion,
151		batch_size: usize,
152	) -> Result<RangeBatch>;
153
154	/// Ensure a table exists (creates if needed).
155	///
156	/// For memory backends this is typically a no-op.
157	/// For SQL backends this may create tables.
158	fn ensure_table(&self, table: EntryKind) -> Result<()>;
159
160	/// Delete all entries in a table.
161	fn clear_table(&self, table: EntryKind) -> Result<()>;
162
163	/// Physically drop specific versions of entries from storage.
164	///
165	/// Unlike `set()` with None values which inserts tombstones for MVCC,
166	/// this method actually removes entries from storage to reclaim memory.
167	/// Used by the drop worker to erase old versions after they're no longer needed.
168	///
169	/// Each entry in the batch is a (key, version) pair identifying the specific
170	/// version of the key to remove.
171	fn drop(&self, batches: HashMap<EntryKind, Vec<(CowVec<u8>, CommitVersion)>>) -> Result<()>;
172
173	/// Get all versions of a specific key (for internal cleanup operations).
174	///
175	/// Unlike `get()` which does MVCC resolution, this returns ALL stored versions
176	/// of the key with their values. Used by the drop worker to discover which
177	/// versions exist before deciding which to clean up.
178	///
179	/// Returns a vector of (version, value) pairs, sorted by version descending.
180	fn get_all_versions(&self, table: EntryKind, key: &[u8]) -> Result<Vec<(CommitVersion, Option<CowVec<u8>>)>>;
181}
182
/// Marker trait for storage tiers that support the tier storage interface.
///
/// Declares no methods of its own; it only requires `TierStorage` as a
/// supertrait. No blanket implementation is visible in this module, so each
/// backend presumably opts in with an explicit `impl` — confirm against the
/// tier implementations.
pub trait TierBackend: TierStorage {}