1use std::{
5 collections::BTreeMap,
6 ops::{Bound, Deref},
7 sync::Arc,
8};
9
10use reifydb_core::{
11 delta::Delta,
12 encoded::{
13 key::{EncodedKey, EncodedKeyRange},
14 row::EncodedRow,
15 },
16 event::EventBus,
17 interface::store::SingleVersionRow,
18};
19use reifydb_runtime::{
20 actor::system::ActorSystem,
21 context::clock::Clock,
22 pool::{PoolConfig, Pools},
23};
24use reifydb_type::util::{cowvec::CowVec, hex};
25use tracing::instrument;
26
27use crate::{
28 HotConfig, Result, SingleVersionBatch, SingleVersionCommit, SingleVersionContains, SingleVersionGet,
29 SingleVersionRange, SingleVersionRangeRev, SingleVersionRemove, SingleVersionSet, SingleVersionStore,
30 config::SingleStoreConfig,
31 hot::tier::HotTier,
32 tier::{RangeCursor, TierStorage},
33};
34
/// Handle to a single-version store.
///
/// The handle wraps its state in an [`Arc`], so cloning it yields another
/// handle to the same [`StandardSingleStoreInner`] rather than a copy of it.
#[derive(Clone)]
pub struct StandardSingleStore(Arc<StandardSingleStoreInner>);
37
/// State held behind the `Arc` inside [`StandardSingleStore`].
pub struct StandardSingleStoreInner {
    /// Hot tier backing the store, or `None` when the configuration did not
    /// provide one.
    pub(crate) hot: Option<HotTier>,
}
41
42impl StandardSingleStore {
43 #[instrument(name = "store::single::new", level = "debug", skip(config), fields(
44 has_hot = config.hot.is_some(),
45 ))]
46 pub fn new(config: SingleStoreConfig) -> Result<Self> {
47 let hot = config.hot.map(|c| c.storage);
48
49 Ok(Self(Arc::new(StandardSingleStoreInner {
50 hot,
51 })))
52 }
53
54 pub fn hot(&self) -> Option<&HotTier> {
58 self.hot.as_ref()
59 }
60}
61
62impl Deref for StandardSingleStore {
63 type Target = StandardSingleStoreInner;
64
65 fn deref(&self) -> &Self::Target {
66 &self.0
67 }
68}
69
70impl StandardSingleStore {
71 pub fn testing_memory() -> Self {
72 let pools = Pools::new(PoolConfig::default());
73 let actor_system = ActorSystem::new(pools, Clock::Real);
74 Self::testing_memory_with_eventbus(EventBus::new(&actor_system))
75 }
76
77 pub fn testing_memory_with_eventbus(event_bus: EventBus) -> Self {
78 Self::new(SingleStoreConfig {
79 hot: Some(HotConfig {
80 storage: HotTier::memory(),
81 }),
82 event_bus,
83 })
84 .unwrap()
85 }
86}
87
88impl SingleVersionGet for StandardSingleStore {
89 #[instrument(name = "store::single::get", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref())))]
90 fn get(&self, key: &EncodedKey) -> Result<Option<SingleVersionRow>> {
91 if let Some(hot) = &self.hot
92 && let Some(value) = hot.get(key.as_ref())?
93 {
94 return Ok(Some(SingleVersionRow {
95 key: key.clone(),
96 row: EncodedRow(value),
97 }));
98 }
99
100 Ok(None)
101 }
102}
103
104impl SingleVersionContains for StandardSingleStore {
105 #[instrument(name = "store::single::contains", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref())), ret)]
106 fn contains(&self, key: &EncodedKey) -> Result<bool> {
107 if let Some(hot) = &self.hot
108 && hot.contains(key.as_ref())?
109 {
110 return Ok(true);
111 }
112
113 Ok(false)
114 }
115}
116
117impl SingleVersionCommit for StandardSingleStore {
118 #[instrument(name = "store::single::commit", level = "debug", skip(self, deltas), fields(delta_count = deltas.len()))]
119 fn commit(&mut self, deltas: CowVec<Delta>) -> Result<()> {
120 let Some(storage) = &self.hot else {
122 return Ok(());
123 };
124
125 let entries: Vec<_> = deltas
127 .iter()
128 .map(|delta| match delta {
129 Delta::Set {
130 key,
131 row,
132 } => (CowVec::new(key.as_ref().to_vec()), Some(CowVec::new(row.as_ref().to_vec()))),
133 Delta::Unset {
134 key,
135 ..
136 }
137 | Delta::Remove {
138 key,
139 }
140 | Delta::Drop {
141 key,
142 } => (CowVec::new(key.as_ref().to_vec()), None),
143 })
144 .collect();
145
146 storage.set(entries)?;
147
148 Ok(())
149 }
150}
151
// Both impls are intentionally empty: no trait items are overridden here.
// NOTE(review): presumably the traits supply default methods built on
// `commit` — confirm against the trait definitions.
impl SingleVersionSet for StandardSingleStore {}
impl SingleVersionRemove for StandardSingleStore {}
154
155impl SingleVersionRange for StandardSingleStore {
156 #[instrument(name = "store::single::range_batch", level = "debug", skip(self), fields(batch_size = batch_size))]
157 fn range_batch(&self, range: EncodedKeyRange, batch_size: u64) -> Result<SingleVersionBatch> {
158 let mut all_entries: BTreeMap<CowVec<u8>, Option<CowVec<u8>>> = BTreeMap::new();
159
160 let (start, end) = make_range_bounds(&range);
161
162 if let Some(hot) = &self.hot {
164 let mut cursor = RangeCursor::new();
165
166 loop {
167 let batch =
168 hot.range_next(&mut cursor, bound_as_ref(&start), bound_as_ref(&end), 4096)?;
169
170 for entry in batch.entries {
171 all_entries.entry(entry.key).or_insert(entry.value);
172 }
173
174 if cursor.exhausted {
175 break;
176 }
177 }
178 }
179
180 let items: Vec<SingleVersionRow> = all_entries
182 .into_iter()
183 .filter_map(|(key_bytes, value)| {
184 value.map(|val| SingleVersionRow {
185 key: EncodedKey(key_bytes),
186 row: EncodedRow(val),
187 })
188 })
189 .take(batch_size as usize)
190 .collect();
191
192 let has_more = items.len() >= batch_size as usize;
193
194 Ok(SingleVersionBatch {
195 items,
196 has_more,
197 })
198 }
199}
200
201impl SingleVersionRangeRev for StandardSingleStore {
202 #[instrument(name = "store::single::range_rev_batch", level = "debug", skip(self), fields(batch_size = batch_size))]
203 fn range_rev_batch(&self, range: EncodedKeyRange, batch_size: u64) -> Result<SingleVersionBatch> {
204 let mut all_entries: BTreeMap<CowVec<u8>, Option<CowVec<u8>>> = BTreeMap::new();
205
206 let (start, end) = make_range_bounds(&range);
207
208 if let Some(hot) = &self.hot {
210 let mut cursor = RangeCursor::new();
211
212 loop {
213 let batch = hot.range_rev_next(
214 &mut cursor,
215 bound_as_ref(&start),
216 bound_as_ref(&end),
217 4096,
218 )?;
219
220 for entry in batch.entries {
221 all_entries.entry(entry.key).or_insert(entry.value);
222 }
223
224 if cursor.exhausted {
225 break;
226 }
227 }
228 }
229
230 let items: Vec<SingleVersionRow> = all_entries
232 .into_iter()
233 .rev() .filter_map(|(key_bytes, value)| {
235 value.map(|val| SingleVersionRow {
236 key: EncodedKey(key_bytes),
237 row: EncodedRow(val),
238 })
239 })
240 .take(batch_size as usize)
241 .collect();
242
243 let has_more = items.len() >= batch_size as usize;
244
245 Ok(SingleVersionBatch {
246 items,
247 has_more,
248 })
249 }
250}
251
// Empty impl: no trait items are overridden here.
// NOTE(review): presumably a marker/umbrella trait over the single-version
// traits implemented above — confirm against the trait definition.
impl SingleVersionStore for StandardSingleStore {}
253
/// Borrows an owned byte bound as a slice bound, keeping the bound kind
/// (`Included`, `Excluded` or `Unbounded`) unchanged.
fn bound_as_ref(bound: &Bound<Vec<u8>>) -> Bound<&[u8]> {
    bound.as_ref().map(|bytes| bytes.as_slice())
}
262
263fn make_range_bounds(range: &EncodedKeyRange) -> (Bound<Vec<u8>>, Bound<Vec<u8>>) {
265 let start = match &range.start {
266 Bound::Included(key) => Bound::Included(key.as_ref().to_vec()),
267 Bound::Excluded(key) => Bound::Excluded(key.as_ref().to_vec()),
268 Bound::Unbounded => Bound::Unbounded,
269 };
270
271 let end = match &range.end {
272 Bound::Included(key) => Bound::Included(key.as_ref().to_vec()),
273 Bound::Excluded(key) => Bound::Excluded(key.as_ref().to_vec()),
274 Bound::Unbounded => Bound::Unbounded,
275 };
276
277 (start, end)
278}