Skip to main content

reifydb_store_single/
store.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	collections::BTreeMap,
6	ops::{Bound, Deref},
7	sync::Arc,
8};
9
10use reifydb_core::{
11	delta::Delta,
12	encoded::{
13		key::{EncodedKey, EncodedKeyRange},
14		row::EncodedRow,
15	},
16	event::EventBus,
17	interface::store::SingleVersionRow,
18};
19use reifydb_runtime::{
20	actor::system::ActorSystem,
21	context::clock::Clock,
22	pool::{PoolConfig, Pools},
23};
24use reifydb_type::util::{cowvec::CowVec, hex};
25use tracing::instrument;
26
27use crate::{
28	HotConfig, Result, SingleVersionBatch, SingleVersionCommit, SingleVersionContains, SingleVersionGet,
29	SingleVersionRange, SingleVersionRangeRev, SingleVersionRemove, SingleVersionSet, SingleVersionStore,
30	config::SingleStoreConfig,
31	hot::tier::HotTier,
32	tier::{RangeCursor, TierStorage},
33};
34
35#[derive(Clone)]
36pub struct StandardSingleStore(Arc<StandardSingleStoreInner>);
37
38pub struct StandardSingleStoreInner {
39	pub(crate) hot: Option<HotTier>,
40}
41
42impl StandardSingleStore {
43	#[instrument(name = "store::single::new", level = "debug", skip(config), fields(
44		has_hot = config.hot.is_some(),
45	))]
46	pub fn new(config: SingleStoreConfig) -> Result<Self> {
47		let hot = config.hot.map(|c| c.storage);
48
49		Ok(Self(Arc::new(StandardSingleStoreInner {
50			hot,
51		})))
52	}
53
54	/// Get access to the hot storage tier.
55	///
56	/// Returns `None` if the hot tier is not configured.
57	pub fn hot(&self) -> Option<&HotTier> {
58		self.hot.as_ref()
59	}
60}
61
62impl Deref for StandardSingleStore {
63	type Target = StandardSingleStoreInner;
64
65	fn deref(&self) -> &Self::Target {
66		&self.0
67	}
68}
69
70impl StandardSingleStore {
71	pub fn testing_memory() -> Self {
72		let pools = Pools::new(PoolConfig::default());
73		let actor_system = ActorSystem::new(pools, Clock::Real);
74		Self::testing_memory_with_eventbus(EventBus::new(&actor_system))
75	}
76
77	pub fn testing_memory_with_eventbus(event_bus: EventBus) -> Self {
78		Self::new(SingleStoreConfig {
79			hot: Some(HotConfig {
80				storage: HotTier::memory(),
81			}),
82			event_bus,
83		})
84		.unwrap()
85	}
86}
87
88impl SingleVersionGet for StandardSingleStore {
89	#[instrument(name = "store::single::get", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref())))]
90	fn get(&self, key: &EncodedKey) -> Result<Option<SingleVersionRow>> {
91		if let Some(hot) = &self.hot
92			&& let Some(value) = hot.get(key.as_ref())?
93		{
94			return Ok(Some(SingleVersionRow {
95				key: key.clone(),
96				row: EncodedRow(value),
97			}));
98		}
99
100		Ok(None)
101	}
102}
103
104impl SingleVersionContains for StandardSingleStore {
105	#[instrument(name = "store::single::contains", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref())), ret)]
106	fn contains(&self, key: &EncodedKey) -> Result<bool> {
107		if let Some(hot) = &self.hot
108			&& hot.contains(key.as_ref())?
109		{
110			return Ok(true);
111		}
112
113		Ok(false)
114	}
115}
116
117impl SingleVersionCommit for StandardSingleStore {
118	#[instrument(name = "store::single::commit", level = "debug", skip(self, deltas), fields(delta_count = deltas.len()))]
119	fn commit(&mut self, deltas: CowVec<Delta>) -> Result<()> {
120		// Get the hot storage tier (warm and cold are placeholders for now)
121		let Some(storage) = &self.hot else {
122			return Ok(());
123		};
124
125		// Process deltas as a batch
126		let entries: Vec<_> = deltas
127			.iter()
128			.map(|delta| match delta {
129				Delta::Set {
130					key,
131					row,
132				} => (CowVec::new(key.as_ref().to_vec()), Some(CowVec::new(row.as_ref().to_vec()))),
133				Delta::Unset {
134					key,
135					..
136				}
137				| Delta::Remove {
138					key,
139				}
140				| Delta::Drop {
141					key,
142				} => (CowVec::new(key.as_ref().to_vec()), None),
143			})
144			.collect();
145
146		storage.set(entries)?;
147
148		Ok(())
149	}
150}
151
152impl SingleVersionSet for StandardSingleStore {}
153impl SingleVersionRemove for StandardSingleStore {}
154
155impl SingleVersionRange for StandardSingleStore {
156	#[instrument(name = "store::single::range_batch", level = "debug", skip(self), fields(batch_size = batch_size))]
157	fn range_batch(&self, range: EncodedKeyRange, batch_size: u64) -> Result<SingleVersionBatch> {
158		let mut all_entries: BTreeMap<CowVec<u8>, Option<CowVec<u8>>> = BTreeMap::new();
159
160		let (start, end) = make_range_bounds(&range);
161
162		// Process hot tier
163		if let Some(hot) = &self.hot {
164			let mut cursor = RangeCursor::new();
165
166			loop {
167				let batch =
168					hot.range_next(&mut cursor, bound_as_ref(&start), bound_as_ref(&end), 4096)?;
169
170				for entry in batch.entries {
171					all_entries.entry(entry.key).or_insert(entry.value);
172				}
173
174				if cursor.exhausted {
175					break;
176				}
177			}
178		}
179
180		// Convert to SingleVersionRow, filtering out tombstones
181		let items: Vec<SingleVersionRow> = all_entries
182			.into_iter()
183			.filter_map(|(key_bytes, value)| {
184				value.map(|val| SingleVersionRow {
185					key: EncodedKey(key_bytes),
186					row: EncodedRow(val),
187				})
188			})
189			.take(batch_size as usize)
190			.collect();
191
192		let has_more = items.len() >= batch_size as usize;
193
194		Ok(SingleVersionBatch {
195			items,
196			has_more,
197		})
198	}
199}
200
201impl SingleVersionRangeRev for StandardSingleStore {
202	#[instrument(name = "store::single::range_rev_batch", level = "debug", skip(self), fields(batch_size = batch_size))]
203	fn range_rev_batch(&self, range: EncodedKeyRange, batch_size: u64) -> Result<SingleVersionBatch> {
204		let mut all_entries: BTreeMap<CowVec<u8>, Option<CowVec<u8>>> = BTreeMap::new();
205
206		let (start, end) = make_range_bounds(&range);
207
208		// Process hot tier
209		if let Some(hot) = &self.hot {
210			let mut cursor = RangeCursor::new();
211
212			loop {
213				let batch = hot.range_rev_next(
214					&mut cursor,
215					bound_as_ref(&start),
216					bound_as_ref(&end),
217					4096,
218				)?;
219
220				for entry in batch.entries {
221					all_entries.entry(entry.key).or_insert(entry.value);
222				}
223
224				if cursor.exhausted {
225					break;
226				}
227			}
228		}
229
230		// Convert to SingleVersionRow in reverse order, filtering out tombstones
231		let items: Vec<SingleVersionRow> = all_entries
232			.into_iter()
233			.rev() // Reverse for descending order
234			.filter_map(|(key_bytes, value)| {
235				value.map(|val| SingleVersionRow {
236					key: EncodedKey(key_bytes),
237					row: EncodedRow(val),
238				})
239			})
240			.take(batch_size as usize)
241			.collect();
242
243		let has_more = items.len() >= batch_size as usize;
244
245		Ok(SingleVersionBatch {
246			items,
247			has_more,
248		})
249	}
250}
251
252impl SingleVersionStore for StandardSingleStore {}
253
254/// Helper to convert owned Bound to ref
255fn bound_as_ref(bound: &Bound<Vec<u8>>) -> Bound<&[u8]> {
256	match bound {
257		Bound::Included(v) => Bound::Included(v.as_slice()),
258		Bound::Excluded(v) => Bound::Excluded(v.as_slice()),
259		Bound::Unbounded => Bound::Unbounded,
260	}
261}
262
263/// Convert EncodedKeyRange to primitive storage bounds (owned for )
264fn make_range_bounds(range: &EncodedKeyRange) -> (Bound<Vec<u8>>, Bound<Vec<u8>>) {
265	let start = match &range.start {
266		Bound::Included(key) => Bound::Included(key.as_ref().to_vec()),
267		Bound::Excluded(key) => Bound::Excluded(key.as_ref().to_vec()),
268		Bound::Unbounded => Bound::Unbounded,
269	};
270
271	let end = match &range.end {
272		Bound::Included(key) => Bound::Included(key.as_ref().to_vec()),
273		Bound::Excluded(key) => Bound::Excluded(key.as_ref().to_vec()),
274		Bound::Unbounded => Bound::Unbounded,
275	};
276
277	(start, end)
278}