Skip to main content

reifydb_store_single/
store.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	collections::BTreeMap,
6	ops::{Bound, Deref},
7	sync::Arc,
8};
9
10use reifydb_core::{
11	delta::Delta,
12	encoded::{
13		encoded::EncodedValues,
14		key::{EncodedKey, EncodedKeyRange},
15	},
16	event::EventBus,
17	interface::store::SingleVersionValues,
18};
19use reifydb_runtime::{SharedRuntimeConfig, actor::system::ActorSystem};
20use reifydb_type::util::{cowvec::CowVec, hex};
21use tracing::instrument;
22
23use crate::{
24	HotConfig, Result, SingleVersionBatch, SingleVersionCommit, SingleVersionContains, SingleVersionGet,
25	SingleVersionRange, SingleVersionRangeRev, SingleVersionRemove, SingleVersionSet, SingleVersionStore,
26	config::SingleStoreConfig,
27	hot::tier::HotTier,
28	tier::{RangeCursor, TierStorage},
29};
30
/// Handle to the single-version store.
///
/// The handle wraps a [`StandardSingleStoreInner`] in an `Arc`, so cloning it
/// yields another handle to the same inner state rather than a copy of it.
#[derive(Clone)]
pub struct StandardSingleStore(Arc<StandardSingleStoreInner>);
33
/// Shared state behind a [`StandardSingleStore`] handle.
pub struct StandardSingleStoreInner {
	/// Hot storage tier; `None` when no hot tier was configured.
	pub(crate) hot: Option<HotTier>,
}
37
38impl StandardSingleStore {
39	#[instrument(name = "store::single::new", level = "debug", skip(config), fields(
40		has_hot = config.hot.is_some(),
41	))]
42	pub fn new(config: SingleStoreConfig) -> Result<Self> {
43		let hot = config.hot.map(|c| c.storage);
44
45		Ok(Self(Arc::new(StandardSingleStoreInner {
46			hot,
47		})))
48	}
49
50	/// Get access to the hot storage tier.
51	///
52	/// Returns `None` if the hot tier is not configured.
53	pub fn hot(&self) -> Option<&HotTier> {
54		self.hot.as_ref()
55	}
56}
57
58impl Deref for StandardSingleStore {
59	type Target = StandardSingleStoreInner;
60
61	fn deref(&self) -> &Self::Target {
62		&self.0
63	}
64}
65
66impl StandardSingleStore {
67	pub fn testing_memory() -> Self {
68		let actor_system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
69		Self::testing_memory_with_eventbus(EventBus::new(&actor_system))
70	}
71
72	pub fn testing_memory_with_eventbus(event_bus: EventBus) -> Self {
73		Self::new(SingleStoreConfig {
74			hot: Some(HotConfig {
75				storage: HotTier::memory(),
76			}),
77			event_bus,
78		})
79		.unwrap()
80	}
81}
82
83impl SingleVersionGet for StandardSingleStore {
84	#[instrument(name = "store::single::get", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref())))]
85	fn get(&self, key: &EncodedKey) -> Result<Option<SingleVersionValues>> {
86		if let Some(hot) = &self.hot {
87			if let Some(value) = hot.get(key.as_ref())? {
88				return Ok(Some(SingleVersionValues {
89					key: key.clone(),
90					values: EncodedValues(value),
91				}));
92			}
93		}
94
95		Ok(None)
96	}
97}
98
99impl SingleVersionContains for StandardSingleStore {
100	#[instrument(name = "store::single::contains", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref())), ret)]
101	fn contains(&self, key: &EncodedKey) -> Result<bool> {
102		if let Some(hot) = &self.hot {
103			if hot.contains(key.as_ref())? {
104				return Ok(true);
105			}
106		}
107
108		Ok(false)
109	}
110}
111
112impl SingleVersionCommit for StandardSingleStore {
113	#[instrument(name = "store::single::commit", level = "debug", skip(self, deltas), fields(delta_count = deltas.len()))]
114	fn commit(&mut self, deltas: CowVec<Delta>) -> Result<()> {
115		// Get the hot storage tier (warm and cold are placeholders for now)
116		let Some(storage) = &self.hot else {
117			return Ok(());
118		};
119
120		// Process deltas as a batch
121		let entries: Vec<_> = deltas
122			.iter()
123			.map(|delta| match delta {
124				Delta::Set {
125					key,
126					values,
127				} => (CowVec::new(key.as_ref().to_vec()), Some(CowVec::new(values.as_ref().to_vec()))),
128				Delta::Unset {
129					key,
130					..
131				}
132				| Delta::Remove {
133					key,
134				}
135				| Delta::Drop {
136					key,
137					..
138				} => (CowVec::new(key.as_ref().to_vec()), None),
139			})
140			.collect();
141
142		storage.set(entries)?;
143
144		Ok(())
145	}
146}
147
// Both impl bodies are empty: nothing is overridden here, so the store relies
// entirely on whatever provided items `SingleVersionSet` and
// `SingleVersionRemove` define.
impl SingleVersionSet for StandardSingleStore {}
impl SingleVersionRemove for StandardSingleStore {}
150
151impl SingleVersionRange for StandardSingleStore {
152	#[instrument(name = "store::single::range_batch", level = "debug", skip(self), fields(batch_size = batch_size))]
153	fn range_batch(&self, range: EncodedKeyRange, batch_size: u64) -> Result<SingleVersionBatch> {
154		let mut all_entries: BTreeMap<CowVec<u8>, Option<CowVec<u8>>> = BTreeMap::new();
155
156		let (start, end) = make_range_bounds(&range);
157
158		// Process hot tier
159		if let Some(hot) = &self.hot {
160			let mut cursor = RangeCursor::new();
161
162			loop {
163				let batch =
164					hot.range_next(&mut cursor, bound_as_ref(&start), bound_as_ref(&end), 4096)?;
165
166				for entry in batch.entries {
167					all_entries.entry(entry.key).or_insert(entry.value);
168				}
169
170				if cursor.exhausted {
171					break;
172				}
173			}
174		}
175
176		// Convert to SingleVersionValues, filtering out tombstones
177		let items: Vec<SingleVersionValues> = all_entries
178			.into_iter()
179			.filter_map(|(key_bytes, value)| {
180				value.map(|val| SingleVersionValues {
181					key: EncodedKey(key_bytes),
182					values: EncodedValues(val),
183				})
184			})
185			.take(batch_size as usize)
186			.collect();
187
188		let has_more = items.len() >= batch_size as usize;
189
190		Ok(SingleVersionBatch {
191			items,
192			has_more,
193		})
194	}
195}
196
197impl SingleVersionRangeRev for StandardSingleStore {
198	#[instrument(name = "store::single::range_rev_batch", level = "debug", skip(self), fields(batch_size = batch_size))]
199	fn range_rev_batch(&self, range: EncodedKeyRange, batch_size: u64) -> Result<SingleVersionBatch> {
200		let mut all_entries: BTreeMap<CowVec<u8>, Option<CowVec<u8>>> = BTreeMap::new();
201
202		let (start, end) = make_range_bounds(&range);
203
204		// Process hot tier
205		if let Some(hot) = &self.hot {
206			let mut cursor = RangeCursor::new();
207
208			loop {
209				let batch = hot.range_rev_next(
210					&mut cursor,
211					bound_as_ref(&start),
212					bound_as_ref(&end),
213					4096,
214				)?;
215
216				for entry in batch.entries {
217					all_entries.entry(entry.key).or_insert(entry.value);
218				}
219
220				if cursor.exhausted {
221					break;
222				}
223			}
224		}
225
226		// Convert to SingleVersionValues in reverse order, filtering out tombstones
227		let items: Vec<SingleVersionValues> = all_entries
228			.into_iter()
229			.rev() // Reverse for descending order
230			.filter_map(|(key_bytes, value)| {
231				value.map(|val| SingleVersionValues {
232					key: EncodedKey(key_bytes),
233					values: EncodedValues(val),
234				})
235			})
236			.take(batch_size as usize)
237			.collect();
238
239		let has_more = items.len() >= batch_size as usize;
240
241		Ok(SingleVersionBatch {
242			items,
243			has_more,
244		})
245	}
246}
247
// Empty impl: no items are overridden for `SingleVersionStore` here.
impl SingleVersionStore for StandardSingleStore {}
249
/// Borrows an owned byte-vector bound as a byte-slice bound.
///
/// The bound kind (included / excluded / unbounded) is preserved and no bytes
/// are copied.
fn bound_as_ref(bound: &Bound<Vec<u8>>) -> Bound<&[u8]> {
	bound.as_ref().map(|bytes| bytes.as_slice())
}
258
259/// Convert EncodedKeyRange to primitive storage bounds (owned for )
260fn make_range_bounds(range: &EncodedKeyRange) -> (Bound<Vec<u8>>, Bound<Vec<u8>>) {
261	let start = match &range.start {
262		Bound::Included(key) => Bound::Included(key.as_ref().to_vec()),
263		Bound::Excluded(key) => Bound::Excluded(key.as_ref().to_vec()),
264		Bound::Unbounded => Bound::Unbounded,
265	};
266
267	let end = match &range.end {
268		Bound::Included(key) => Bound::Included(key.as_ref().to_vec()),
269		Bound::Excluded(key) => Bound::Excluded(key.as_ref().to_vec()),
270		Bound::Unbounded => Bound::Unbounded,
271	};
272
273	(start, end)
274}