atomo/
table.rs

1use std::{
2    any::{Any, TypeId},
3    borrow::Borrow,
4    cell::RefCell,
5    hash::Hash,
6    marker::PhantomData,
7    sync::Arc,
8};
9
10use fxhash::FxHashSet;
11use serde::{de::DeserializeOwned, Serialize};
12
13use crate::{
14    batch::{BatchReference, Operation, VerticalBatch},
15    db::TableId,
16    inner::AtomoInner,
17    keys::VerticalKeys,
18    serder::SerdeBackend,
19    snapshot::Snapshot,
20    KeyIterator,
21};
22
23pub struct TableMeta {
24    pub _name: String,
25    pub k_id: TypeId,
26    pub v_id: TypeId,
27}
28
29/// A resolved table reference can be used to cache the lookup of a table by its string name
30/// and the type validations and can be used to speed up the [`TableSelector::get_table`] function.
31///
32/// This can be achieved by pre-computing a bunch of [`ResolvedTableReference`]s before invoking
33/// the `run` function and use the resolved references to get a [`TableRef`].
34///
35/// [`ResolvedTableReference`]s of one `Atomo` instance can not be used for another instance, and
36/// that will cause a panic.
37#[derive(Clone, Copy)]
38pub struct ResolvedTableReference<K, V> {
39    /// The ID for the `Atomo` instance.
40    atomo_id: usize,
41    /// The index of the table.
42    index: TableId,
43    kv: PhantomData<(K, V)>,
44}
45
46/// The table selector contain multiple tables and is provided to the user at the beginning of a
47/// query or an update. You can think about this as the execution context.
48///
49/// This is given as a parameter to the callback which you pass to the [`crate::Atomo::run`]
50/// function.
51///
52/// Once you have an instance of a table selector, you can get a reference to a specific table.
53///
54/// And at that point you can use that table reference instance to operate on a table or retrieve
55/// values from it.
56pub struct TableSelector<S: SerdeBackend> {
57    /// The [`Atomo`] instance.
58    atomo: Arc<AtomoInner<S>>,
59    /// The current version of the data.
60    snapshot: Snapshot<VerticalBatch, VerticalKeys>,
61    /// A set of already claimed tables.
62    // TODO(qti3e): Replace this with a UnsafeCell or a `SingleThreadedBoolVec`.
63    selected: RefCell<FxHashSet<TableId>>,
64    /// The *hot* changes happening here in this run.
65    batch: VerticalBatch,
66    /// The new version of the keys.
67    keys: RefCell<VerticalKeys>,
68}
69
70/// A reference to a table inside an execution context (i.e [`TableSelector`]). A table reference
71/// can be used as a reference to a table to operate on it.
72pub struct TableRef<
73    'selector,
74    K: Hash + Eq + Serialize + DeserializeOwned + Any,
75    V: Serialize + DeserializeOwned + Any,
76    S: SerdeBackend,
77> {
78    tid: TableId,
79    batch: BatchReference,
80    selector: &'selector TableSelector<S>,
81    kv: PhantomData<(K, V)>,
82}
83
84impl TableMeta {
85    #[inline(always)]
86    pub fn new<K: Any, V: Any>(_name: String) -> Self {
87        let k_id = TypeId::of::<K>();
88        let v_id = TypeId::of::<V>();
89        Self { _name, k_id, v_id }
90    }
91}
92
93// When a table ref is dropped make it available to be claimed again.
94impl<'selector, K, V, S: SerdeBackend> Drop for TableRef<'selector, K, V, S>
95where
96    K: Hash + Eq + Serialize + DeserializeOwned + Any,
97    V: Serialize + DeserializeOwned + Any,
98{
99    fn drop(&mut self) {
100        self.selector.selected.borrow_mut().remove(&self.tid);
101    }
102}
103
104impl<S: SerdeBackend> TableSelector<S> {
105    /// Create a new table selector for the head of an Atomo instance.
106    #[inline]
107    pub fn new(atomo: Arc<AtomoInner<S>>) -> Self {
108        let num_tables = atomo.tables.len();
109        let batch = VerticalBatch::new(num_tables);
110        let snapshot = atomo.snapshot_list.current();
111        let keys = snapshot.get_metadata().clone();
112
113        Self {
114            atomo,
115            snapshot,
116            selected: RefCell::new(FxHashSet::default()),
117            batch,
118            keys: RefCell::new(keys),
119        }
120    }
121
122    #[inline]
123    pub(crate) fn into_raw(self) -> (VerticalBatch, VerticalKeys) {
124        (self.batch, self.keys.into_inner())
125    }
126
127    /// Return the table reference for the table with the provided name and K, V type.
128    ///
129    /// # Panics
130    ///
131    /// If the information provided are not correct and such a table does not exists.
132    pub fn get_table<K, V>(&self, name: impl AsRef<str>) -> TableRef<K, V, S>
133    where
134        K: Hash + Eq + Serialize + DeserializeOwned + Any,
135        V: Serialize + DeserializeOwned + Any,
136    {
137        self.atomo.resolve::<K, V>(name).get(self)
138    }
139}
140
141impl<K, V> ResolvedTableReference<K, V> {
142    pub(crate) fn new(atomo_id: usize, index: TableId) -> Self {
143        ResolvedTableReference {
144            atomo_id,
145            index,
146            kv: PhantomData,
147        }
148    }
149
150    /// Returns the table reference for this table.
151    ///
152    /// # Panics
153    ///
154    /// If the table is already claimed.
155    pub fn get<'selector, S: SerdeBackend>(
156        &self,
157        selector: &'selector TableSelector<S>,
158    ) -> TableRef<'selector, K, V, S>
159    where
160        K: Hash + Eq + Serialize + DeserializeOwned + Any,
161        V: Serialize + DeserializeOwned + Any,
162    {
163        assert_eq!(
164            self.atomo_id, selector.atomo.id,
165            "Table reference of another Atomo instance was used."
166        );
167
168        if !selector.selected.borrow_mut().insert(self.index) {
169            panic!("Table reference is already claimed.");
170        }
171
172        // Safety:
173        //
174        // 1. Using the previous assertion, we ensure that the table is not yet
175        // claimed, and hence only one mutable reference to a table exists.
176        //
177        // 2. The returned reference has lifetime `'selector` and batch is owned by
178        // the selector, and since a selector never replaces its batch, we ensure that
179        //      1. The returned reference never outlives the 'batch'.
180        //      2. The batch never go out of scope before returned reference is dropped.
181        let batch = unsafe { selector.batch.claim(self.index as usize) };
182
183        TableRef {
184            tid: self.index,
185            batch,
186            selector,
187            kv: PhantomData,
188        }
189    }
190}
191
192impl<'selector, K, V, S: SerdeBackend> TableRef<'selector, K, V, S>
193where
194    K: Hash + Eq + Serialize + DeserializeOwned + Any,
195    V: Serialize + DeserializeOwned + Any,
196{
197    /// Insert a new `key` and `value` pair into the table.
198    pub fn insert(&mut self, key: impl Borrow<K>, value: impl Borrow<V>) {
199        let k = S::serialize(key.borrow()).into_boxed_slice();
200        let v = S::serialize(value.borrow()).into_boxed_slice();
201        self.selector
202            .keys
203            .borrow_mut()
204            .update(self.tid, |collection| {
205                collection.insert(k.clone());
206            });
207        self.batch.as_mut().insert(k, Operation::Insert(v));
208    }
209
210    /// Remove the given key from the table.
211    pub fn remove(&mut self, key: impl Borrow<K>) {
212        let k = S::serialize(key.borrow()).into_boxed_slice();
213        self.selector
214            .keys
215            .borrow_mut()
216            .update(self.tid, |collection| {
217                collection.remove(&k);
218            });
219        self.batch.as_mut().insert(k, Operation::Remove);
220    }
221
222    /// Returns the value associated with the provided key. If the key doesn't exits in the table
223    /// [`None`] is returned.
224    pub fn get(&self, key: impl Borrow<K>) -> Option<V> {
225        let k = S::serialize(key.borrow()).into_boxed_slice();
226
227        match self.batch.get(&k) {
228            Some(Operation::Insert(value)) => return Some(S::deserialize(value)),
229            Some(Operation::Remove) => return None,
230            _ => {},
231        }
232
233        let index = self.tid as usize;
234        if let Some(operation) = self
235            .selector
236            .snapshot
237            .find(|batch| batch.get(index).get(&k))
238        {
239            return match operation {
240                Operation::Remove => None,
241                Operation::Insert(value) => Some(S::deserialize(value)),
242            };
243        }
244
245        self.selector.atomo.get::<V>(self.tid, &k)
246    }
247
248    /// Returns `true` if the key exists in the table.
249    pub fn contains_key(&self, key: impl Borrow<K>) -> bool {
250        let k = S::serialize(key.borrow()).into_boxed_slice();
251
252        {
253            let keys_ref = self.selector.keys.borrow();
254            if let Some(im) = keys_ref.get(self.tid) {
255                return im.contains(&k);
256            }
257        }
258
259        match self.batch.get(&k) {
260            Some(Operation::Insert(_)) => return true,
261            Some(Operation::Remove) => return false,
262            _ => {},
263        }
264
265        let index = self.tid as usize;
266        if let Some(op) = self
267            .selector
268            .snapshot
269            .find(|batch| batch.get(index).get(&k))
270        {
271            return match op {
272                Operation::Insert(_) => true,
273                Operation::Remove => false,
274            };
275        }
276
277        self.selector.atomo.contains_key(self.tid, &k)
278    }
279
280    /// Returns an iterator of the keys in this table.
281    ///
282    /// # Panics
283    ///
284    /// If the current table is not opened with iterator support when opening the
285    /// Atomo instance. See the documentation for [`crate::AtomoBuilder::enable_iter`]
286    /// for more information.
287    pub fn keys(&self) -> KeyIterator<K> {
288        let keys = self
289            .selector
290            .keys
291            .borrow()
292            .get(self.tid)
293            .clone()
294            .expect("Iterator functionality is not enabled for the table.");
295
296        KeyIterator::new(keys)
297    }
298}