1use std::{
2 any::{Any, TypeId},
3 borrow::Borrow,
4 cell::RefCell,
5 hash::Hash,
6 marker::PhantomData,
7 sync::Arc,
8};
9
10use fxhash::FxHashSet;
11use serde::{de::DeserializeOwned, Serialize};
12
13use crate::{
14 batch::{BatchReference, Operation, VerticalBatch},
15 db::TableId,
16 inner::AtomoInner,
17 keys::VerticalKeys,
18 serder::SerdeBackend,
19 snapshot::Snapshot,
20 KeyIterator,
21};
22
/// Type information recorded for a single table.
///
/// Holds the table's name together with the [`TypeId`]s of the key and value types
/// the table was declared with (see [`TableMeta::new`]).
pub struct TableMeta {
    /// Name of the table.
    pub _name: String,
    /// [`TypeId`] of the table's key type.
    pub k_id: TypeId,
    /// [`TypeId`] of the table's value type.
    pub v_id: TypeId,
}
28
29#[derive(Clone, Copy)]
38pub struct ResolvedTableReference<K, V> {
39 atomo_id: usize,
41 index: TableId,
43 kv: PhantomData<(K, V)>,
44}
45
46pub struct TableSelector<S: SerdeBackend> {
57 atomo: Arc<AtomoInner<S>>,
59 snapshot: Snapshot<VerticalBatch, VerticalKeys>,
61 selected: RefCell<FxHashSet<TableId>>,
64 batch: VerticalBatch,
66 keys: RefCell<VerticalKeys>,
68}
69
70pub struct TableRef<
73 'selector,
74 K: Hash + Eq + Serialize + DeserializeOwned + Any,
75 V: Serialize + DeserializeOwned + Any,
76 S: SerdeBackend,
77> {
78 tid: TableId,
79 batch: BatchReference,
80 selector: &'selector TableSelector<S>,
81 kv: PhantomData<(K, V)>,
82}
83
84impl TableMeta {
85 #[inline(always)]
86 pub fn new<K: Any, V: Any>(_name: String) -> Self {
87 let k_id = TypeId::of::<K>();
88 let v_id = TypeId::of::<V>();
89 Self { _name, k_id, v_id }
90 }
91}
92
93impl<'selector, K, V, S: SerdeBackend> Drop for TableRef<'selector, K, V, S>
95where
96 K: Hash + Eq + Serialize + DeserializeOwned + Any,
97 V: Serialize + DeserializeOwned + Any,
98{
99 fn drop(&mut self) {
100 self.selector.selected.borrow_mut().remove(&self.tid);
101 }
102}
103
104impl<S: SerdeBackend> TableSelector<S> {
105 #[inline]
107 pub fn new(atomo: Arc<AtomoInner<S>>) -> Self {
108 let num_tables = atomo.tables.len();
109 let batch = VerticalBatch::new(num_tables);
110 let snapshot = atomo.snapshot_list.current();
111 let keys = snapshot.get_metadata().clone();
112
113 Self {
114 atomo,
115 snapshot,
116 selected: RefCell::new(FxHashSet::default()),
117 batch,
118 keys: RefCell::new(keys),
119 }
120 }
121
122 #[inline]
123 pub(crate) fn into_raw(self) -> (VerticalBatch, VerticalKeys) {
124 (self.batch, self.keys.into_inner())
125 }
126
127 pub fn get_table<K, V>(&self, name: impl AsRef<str>) -> TableRef<K, V, S>
133 where
134 K: Hash + Eq + Serialize + DeserializeOwned + Any,
135 V: Serialize + DeserializeOwned + Any,
136 {
137 self.atomo.resolve::<K, V>(name).get(self)
138 }
139}
140
141impl<K, V> ResolvedTableReference<K, V> {
142 pub(crate) fn new(atomo_id: usize, index: TableId) -> Self {
143 ResolvedTableReference {
144 atomo_id,
145 index,
146 kv: PhantomData,
147 }
148 }
149
150 pub fn get<'selector, S: SerdeBackend>(
156 &self,
157 selector: &'selector TableSelector<S>,
158 ) -> TableRef<'selector, K, V, S>
159 where
160 K: Hash + Eq + Serialize + DeserializeOwned + Any,
161 V: Serialize + DeserializeOwned + Any,
162 {
163 assert_eq!(
164 self.atomo_id, selector.atomo.id,
165 "Table reference of another Atomo instance was used."
166 );
167
168 if !selector.selected.borrow_mut().insert(self.index) {
169 panic!("Table reference is already claimed.");
170 }
171
172 let batch = unsafe { selector.batch.claim(self.index as usize) };
182
183 TableRef {
184 tid: self.index,
185 batch,
186 selector,
187 kv: PhantomData,
188 }
189 }
190}
191
192impl<'selector, K, V, S: SerdeBackend> TableRef<'selector, K, V, S>
193where
194 K: Hash + Eq + Serialize + DeserializeOwned + Any,
195 V: Serialize + DeserializeOwned + Any,
196{
197 pub fn insert(&mut self, key: impl Borrow<K>, value: impl Borrow<V>) {
199 let k = S::serialize(key.borrow()).into_boxed_slice();
200 let v = S::serialize(value.borrow()).into_boxed_slice();
201 self.selector
202 .keys
203 .borrow_mut()
204 .update(self.tid, |collection| {
205 collection.insert(k.clone());
206 });
207 self.batch.as_mut().insert(k, Operation::Insert(v));
208 }
209
210 pub fn remove(&mut self, key: impl Borrow<K>) {
212 let k = S::serialize(key.borrow()).into_boxed_slice();
213 self.selector
214 .keys
215 .borrow_mut()
216 .update(self.tid, |collection| {
217 collection.remove(&k);
218 });
219 self.batch.as_mut().insert(k, Operation::Remove);
220 }
221
222 pub fn get(&self, key: impl Borrow<K>) -> Option<V> {
225 let k = S::serialize(key.borrow()).into_boxed_slice();
226
227 match self.batch.get(&k) {
228 Some(Operation::Insert(value)) => return Some(S::deserialize(value)),
229 Some(Operation::Remove) => return None,
230 _ => {},
231 }
232
233 let index = self.tid as usize;
234 if let Some(operation) = self
235 .selector
236 .snapshot
237 .find(|batch| batch.get(index).get(&k))
238 {
239 return match operation {
240 Operation::Remove => None,
241 Operation::Insert(value) => Some(S::deserialize(value)),
242 };
243 }
244
245 self.selector.atomo.get::<V>(self.tid, &k)
246 }
247
248 pub fn contains_key(&self, key: impl Borrow<K>) -> bool {
250 let k = S::serialize(key.borrow()).into_boxed_slice();
251
252 {
253 let keys_ref = self.selector.keys.borrow();
254 if let Some(im) = keys_ref.get(self.tid) {
255 return im.contains(&k);
256 }
257 }
258
259 match self.batch.get(&k) {
260 Some(Operation::Insert(_)) => return true,
261 Some(Operation::Remove) => return false,
262 _ => {},
263 }
264
265 let index = self.tid as usize;
266 if let Some(op) = self
267 .selector
268 .snapshot
269 .find(|batch| batch.get(index).get(&k))
270 {
271 return match op {
272 Operation::Insert(_) => true,
273 Operation::Remove => false,
274 };
275 }
276
277 self.selector.atomo.contains_key(self.tid, &k)
278 }
279
280 pub fn keys(&self) -> KeyIterator<K> {
288 let keys = self
289 .selector
290 .keys
291 .borrow()
292 .get(self.tid)
293 .clone()
294 .expect("Iterator functionality is not enabled for the table.");
295
296 KeyIterator::new(keys)
297 }
298}