1use std::{
5 collections::BTreeMap,
6 ops::{Bound, Deref},
7 sync::Arc,
8};
9
10use reifydb_core::{
11 delta::Delta,
12 encoded::{
13 encoded::EncodedValues,
14 key::{EncodedKey, EncodedKeyRange},
15 },
16 event::EventBus,
17 interface::store::SingleVersionValues,
18};
19use reifydb_runtime::{SharedRuntimeConfig, actor::system::ActorSystem};
20use reifydb_type::util::{cowvec::CowVec, hex};
21use tracing::instrument;
22
23use crate::{
24 HotConfig, Result, SingleVersionBatch, SingleVersionCommit, SingleVersionContains, SingleVersionGet,
25 SingleVersionRange, SingleVersionRangeRev, SingleVersionRemove, SingleVersionSet, SingleVersionStore,
26 config::SingleStoreConfig,
27 hot::tier::HotTier,
28 tier::{RangeCursor, TierStorage},
29};
30
31#[derive(Clone)]
32pub struct StandardSingleStore(Arc<StandardSingleStoreInner>);
33
34pub struct StandardSingleStoreInner {
35 pub(crate) hot: Option<HotTier>,
36}
37
38impl StandardSingleStore {
39 #[instrument(name = "store::single::new", level = "debug", skip(config), fields(
40 has_hot = config.hot.is_some(),
41 ))]
42 pub fn new(config: SingleStoreConfig) -> Result<Self> {
43 let hot = config.hot.map(|c| c.storage);
44
45 Ok(Self(Arc::new(StandardSingleStoreInner {
46 hot,
47 })))
48 }
49
50 pub fn hot(&self) -> Option<&HotTier> {
54 self.hot.as_ref()
55 }
56}
57
58impl Deref for StandardSingleStore {
59 type Target = StandardSingleStoreInner;
60
61 fn deref(&self) -> &Self::Target {
62 &self.0
63 }
64}
65
66impl StandardSingleStore {
67 pub fn testing_memory() -> Self {
68 let actor_system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
69 Self::testing_memory_with_eventbus(EventBus::new(&actor_system))
70 }
71
72 pub fn testing_memory_with_eventbus(event_bus: EventBus) -> Self {
73 Self::new(SingleStoreConfig {
74 hot: Some(HotConfig {
75 storage: HotTier::memory(),
76 }),
77 event_bus,
78 })
79 .unwrap()
80 }
81}
82
83impl SingleVersionGet for StandardSingleStore {
84 #[instrument(name = "store::single::get", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref())))]
85 fn get(&self, key: &EncodedKey) -> Result<Option<SingleVersionValues>> {
86 if let Some(hot) = &self.hot {
87 if let Some(value) = hot.get(key.as_ref())? {
88 return Ok(Some(SingleVersionValues {
89 key: key.clone(),
90 values: EncodedValues(value),
91 }));
92 }
93 }
94
95 Ok(None)
96 }
97}
98
99impl SingleVersionContains for StandardSingleStore {
100 #[instrument(name = "store::single::contains", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref())), ret)]
101 fn contains(&self, key: &EncodedKey) -> Result<bool> {
102 if let Some(hot) = &self.hot {
103 if hot.contains(key.as_ref())? {
104 return Ok(true);
105 }
106 }
107
108 Ok(false)
109 }
110}
111
112impl SingleVersionCommit for StandardSingleStore {
113 #[instrument(name = "store::single::commit", level = "debug", skip(self, deltas), fields(delta_count = deltas.len()))]
114 fn commit(&mut self, deltas: CowVec<Delta>) -> Result<()> {
115 let Some(storage) = &self.hot else {
117 return Ok(());
118 };
119
120 let entries: Vec<_> = deltas
122 .iter()
123 .map(|delta| match delta {
124 Delta::Set {
125 key,
126 values,
127 } => (CowVec::new(key.as_ref().to_vec()), Some(CowVec::new(values.as_ref().to_vec()))),
128 Delta::Unset {
129 key,
130 ..
131 }
132 | Delta::Remove {
133 key,
134 }
135 | Delta::Drop {
136 key,
137 ..
138 } => (CowVec::new(key.as_ref().to_vec()), None),
139 })
140 .collect();
141
142 storage.set(entries)?;
143
144 Ok(())
145 }
146}
147
148impl SingleVersionSet for StandardSingleStore {}
149impl SingleVersionRemove for StandardSingleStore {}
150
151impl SingleVersionRange for StandardSingleStore {
152 #[instrument(name = "store::single::range_batch", level = "debug", skip(self), fields(batch_size = batch_size))]
153 fn range_batch(&self, range: EncodedKeyRange, batch_size: u64) -> Result<SingleVersionBatch> {
154 let mut all_entries: BTreeMap<CowVec<u8>, Option<CowVec<u8>>> = BTreeMap::new();
155
156 let (start, end) = make_range_bounds(&range);
157
158 if let Some(hot) = &self.hot {
160 let mut cursor = RangeCursor::new();
161
162 loop {
163 let batch =
164 hot.range_next(&mut cursor, bound_as_ref(&start), bound_as_ref(&end), 4096)?;
165
166 for entry in batch.entries {
167 all_entries.entry(entry.key).or_insert(entry.value);
168 }
169
170 if cursor.exhausted {
171 break;
172 }
173 }
174 }
175
176 let items: Vec<SingleVersionValues> = all_entries
178 .into_iter()
179 .filter_map(|(key_bytes, value)| {
180 value.map(|val| SingleVersionValues {
181 key: EncodedKey(key_bytes),
182 values: EncodedValues(val),
183 })
184 })
185 .take(batch_size as usize)
186 .collect();
187
188 let has_more = items.len() >= batch_size as usize;
189
190 Ok(SingleVersionBatch {
191 items,
192 has_more,
193 })
194 }
195}
196
197impl SingleVersionRangeRev for StandardSingleStore {
198 #[instrument(name = "store::single::range_rev_batch", level = "debug", skip(self), fields(batch_size = batch_size))]
199 fn range_rev_batch(&self, range: EncodedKeyRange, batch_size: u64) -> Result<SingleVersionBatch> {
200 let mut all_entries: BTreeMap<CowVec<u8>, Option<CowVec<u8>>> = BTreeMap::new();
201
202 let (start, end) = make_range_bounds(&range);
203
204 if let Some(hot) = &self.hot {
206 let mut cursor = RangeCursor::new();
207
208 loop {
209 let batch = hot.range_rev_next(
210 &mut cursor,
211 bound_as_ref(&start),
212 bound_as_ref(&end),
213 4096,
214 )?;
215
216 for entry in batch.entries {
217 all_entries.entry(entry.key).or_insert(entry.value);
218 }
219
220 if cursor.exhausted {
221 break;
222 }
223 }
224 }
225
226 let items: Vec<SingleVersionValues> = all_entries
228 .into_iter()
229 .rev() .filter_map(|(key_bytes, value)| {
231 value.map(|val| SingleVersionValues {
232 key: EncodedKey(key_bytes),
233 values: EncodedValues(val),
234 })
235 })
236 .take(batch_size as usize)
237 .collect();
238
239 let has_more = items.len() >= batch_size as usize;
240
241 Ok(SingleVersionBatch {
242 items,
243 has_more,
244 })
245 }
246}
247
248impl SingleVersionStore for StandardSingleStore {}
249
250fn bound_as_ref(bound: &Bound<Vec<u8>>) -> Bound<&[u8]> {
252 match bound {
253 Bound::Included(v) => Bound::Included(v.as_slice()),
254 Bound::Excluded(v) => Bound::Excluded(v.as_slice()),
255 Bound::Unbounded => Bound::Unbounded,
256 }
257}
258
259fn make_range_bounds(range: &EncodedKeyRange) -> (Bound<Vec<u8>>, Bound<Vec<u8>>) {
261 let start = match &range.start {
262 Bound::Included(key) => Bound::Included(key.as_ref().to_vec()),
263 Bound::Excluded(key) => Bound::Excluded(key.as_ref().to_vec()),
264 Bound::Unbounded => Bound::Unbounded,
265 };
266
267 let end = match &range.end {
268 Bound::Included(key) => Bound::Included(key.as_ref().to_vec()),
269 Bound::Excluded(key) => Bound::Excluded(key.as_ref().to_vec()),
270 Bound::Unbounded => Bound::Unbounded,
271 };
272
273 (start, end)
274}