tinybase/
table.rs

1use alloc::borrow::ToOwned;
2use hashbrown::HashMap;
3use alloc::fmt::Debug;
4use core::ops::Deref;
5use thingbuf::mpsc::{self, Sender};
6use alloc::sync::Arc;
7use spin::RwLock;
8
9use serde::de::DeserializeOwned;
10use serde::Serialize;
11use sled::{Db, Tree};
12
13use crate::constraint::{Constraint, ConstraintInner};
14use crate::encoding::{decode, encode};
15use crate::index::{Index, IndexInner, IndexType};
16use crate::record::Record;
17use crate::result::DbResult;
18use crate::subscriber::{Event, Subscriber};
19
20pub(crate) type SenderMap<T> = Arc<RwLock<HashMap<u64, Sender<T>>>>;
21
22#[allow(unused_imports)]
23use alloc::{boxed::Box, format, vec, vec::Vec, string::{String, ToString}};
24
25pub trait TableType: Serialize + DeserializeOwned + Clone + Debug {}
26impl<T: Serialize + DeserializeOwned + Debug + Clone> TableType for T {}
27
28/// Provides methods for interacting with a typed table.
29pub struct Table<T: TableType + 'static>(pub(crate) Arc<TableInner<T>>);
30
31impl<T: TableType + 'static> Table<T> {
32    /// Create an index on the table.
33    ///
34    /// # Arguments
35    ///
36    /// * `name` - The name of the index.
37    /// * `key_func` - A function which computes the index key for each record.
38    ///
39    /// # Returns
40    ///
41    /// An [`Index`] instance for the created index.
42    pub fn create_index<I: IndexType>(
43        &self,
44        name: &str,
45        key_func: impl Fn(&T) -> I + Send + Sync + 'static,
46    ) -> DbResult<Index<T, I>> {
47        let sender_id = self.engine.generate_id()?;
48        let (tx, rx) = mpsc::channel(usize::MAX);
49
50        let subscriber = Subscriber::new(sender_id, rx, self.senders.clone());
51        self.senders.write().insert(sender_id, tx);
52
53        let weak_self = Arc::downgrade(&self.0);
54
55        Ok(Index(Arc::new(IndexInner::new(
56            &format!("{}_idx_{}", self.name, name),
57            &self.engine,
58            weak_self,
59            key_func,
60            subscriber,
61        )?)))
62    }
63}
64
65impl<T: TableType> Clone for Table<T> {
66    fn clone(&self) -> Self {
67        Self(self.0.clone())
68    }
69}
70
71impl<T: TableType> Deref for Table<T> {
72    type Target = Arc<TableInner<T>>;
73
74    fn deref(&self) -> &Self::Target {
75        &self.0
76    }
77}
78
79pub struct TableInner<T>
80where
81    T: TableType + 'static,
82{
83    pub(crate) engine: Db,
84    /// This has a global lock to make sure that constraints are honored during inserts.
85    pub(crate) root: RwLock<Tree>,
86    name: String,
87    senders: SenderMap<Event<T>>,
88    constraints: RwLock<Vec<Constraint<T>>>,
89}
90
91impl<T> TableInner<T>
92where
93    T: TableType,
94{
95    /// Creates a new table with the given engine and name.
96    ///
97    /// This method is intended for internal use and should not be called directly. Instead, use the
98    /// [`crate::TinyBase`]'s `open_table()` method.
99    ///
100    /// # Arguments
101    ///
102    /// * `engine` - The database engine.
103    /// * `name` - The name of the table.
104    pub(crate) fn new(engine: &Db, name: &str) -> DbResult<Self> {
105        let root = RwLock::new(engine.open_tree(name)?);
106
107        Ok(Self {
108            engine: engine.clone(),
109            root,
110            name: name.to_owned(),
111            senders: Arc::new(RwLock::new(HashMap::new())),
112            constraints: RwLock::new(Vec::new()),
113        })
114    }
115
116    /// Insert a new record into the table.
117    ///
118    /// # Arguments
119    ///
120    /// * `value` - The value to insert.
121    ///
122    /// # Returns
123    ///
124    /// The ID of the new record.
125    pub fn insert(&self, value: T) -> DbResult<u64> {
126        let root = self.root.write();
127
128        let record = Record {
129            id: self.engine.generate_id()?,
130            data: value.clone(),
131        };
132
133        self.check_constraint(&root, &record, &vec![])?;
134        root.insert(encode(&record.id)?, encode(&value)?)?;
135
136        self.dispatch_event(Event::Insert(record.clone()));
137
138        Ok(record.id)
139    }
140
141    /// Check if constraint is met.
142    /// Additional items can be specified if there are some items that aren't inserted yet.
143    /// Any time you pass the tree it should probably be obtained via a write lock.
144    fn check_constraint(
145        &self,
146        tree: &Tree,
147        record: &Record<T>,
148        additional_items: &Vec<T>,
149    ) -> DbResult<()> {
150        for constraint in self.constraints.read().iter() {
151            match &constraint.0 {
152                ConstraintInner::Unique(index) => {
153                    let matches = index.tree_exists(tree, record)?;
154                    // Check if record being changed is the same record that has the index error.
155                    if matches.len() > 1 || matches.len() == 1 && matches[0] != record.id {
156                        return Err(crate::result::TinyBaseError::Exists {
157                            constraint: index.idx_name(),
158                            id: record.id,
159                        });
160                    }
161
162                    let mut matches = vec![];
163                    for additional in additional_items {
164                        let key = index.gen_key(&additional)?;
165                        if matches.contains(&key) {
166                            return Err(crate::result::TinyBaseError::BatchOperationConstraints);
167                        }
168
169                        matches.push(key);
170                    }
171                }
172                ConstraintInner::Check(condition) => {
173                    if !condition(&record.data) {
174                        return Err(crate::result::TinyBaseError::Condition);
175                    }
176                }
177            };
178        }
179
180        Ok(())
181    }
182
183    /// Select a record by its ID.
184    ///
185    /// # Arguments
186    ///
187    /// * `id` - The ID of the record to select.
188    ///
189    /// # Returns
190    ///
191    /// An [`Option`] containing the selected record if it exists, or [`None`] otherwise.
192    pub fn select(&self, id: u64) -> DbResult<Option<Record<T>>> {
193        self.tree_select(&self.root.read(), id)
194    }
195
196    /// Select that doesn't obtain a read lock.
197    pub(crate) fn tree_select(&self, tree: &Tree, id: u64) -> DbResult<Option<Record<T>>> {
198        match tree.get(encode(&id)?)? { Some(serialized) => {
199            Ok(Some(Record {
200                id,
201                data: decode(&serialized)?,
202            }))
203        } _ => {
204            Ok(None)
205        }}
206    }
207
208    /// Delete a record by its ID.
209    ///
210    /// # Arguments
211    ///
212    /// * `id` - The ID of the record to delete.
213    ///
214    /// # Returns
215    ///
216    /// An [`Option`] containing the deleted record if it exists, or [`None`] otherwise.
217    pub fn delete(&self, id: u64) -> DbResult<Option<Record<T>>> {
218        let serialized_id = encode(&id)?;
219
220        // We don't need to lock table even though we write because deleting will never invalidate unique constraint.
221        match self.root.read().remove(serialized_id)? { Some(serialized) => {
222            let record = Record {
223                id,
224                data: decode(&serialized)?,
225            };
226
227            self.dispatch_event(Event::Remove(record.clone()));
228
229            Ok(Some(record))
230        } _ => {
231            Ok(None)
232        }}
233    }
234
235    /// Update one or more records by their IDs.
236    ///
237    /// # Arguments
238    ///
239    /// * `ids` - The IDs of the records to update.
240    /// * `updater` - Closure to generate the new data based on the old data.
241    ///
242    /// # Returns
243    ///
244    /// All updated records.
245    pub fn update(&self, ids: &[u64], updater: fn(T) -> T) -> DbResult<Vec<Record<T>>> {
246        let root = self.root.write();
247
248        let mut records = vec![];
249        for id in ids {
250            if let Some(old) = self.tree_select(&root, *id)? {
251                records.push(Record {
252                    id: old.id,
253                    data: updater(old.data),
254                });
255            }
256        }
257
258        let additional: Vec<T> = records.iter().map(|r| r.data.clone()).collect();
259        for record in &records {
260            self.check_constraint(&root, record, &additional)?;
261        }
262
263        let mut updated = vec![];
264        for record in records {
265            root.update_and_fetch(encode(&record.id)?, |old_value| {
266                if let Some(old_value) = old_value {
267                    updated.push(record.clone());
268
269                    self.dispatch_event(Event::Update {
270                        id: record.id.clone(),
271                        old_data: decode(old_value).unwrap(),
272                        new_data: record.data.clone(),
273                    });
274
275                    Some(encode(&record.data).unwrap())
276                } else {
277                    None
278                }
279            })?;
280        }
281
282        Ok(updated)
283    }
284
285    /// Add a constraint to the table.
286    ///
287    /// # Arguments
288    ///
289    /// * `constraint` - The constraint to add.
290    pub fn constraint(&self, constraint: Constraint<T>) -> DbResult<()> {
291        let mut constraint_map = self.constraints.write();
292
293        match &constraint.0 {
294            // Check if index has already been added if constraint is unique.
295            ConstraintInner::Unique(index) => {
296                let index_name = index.idx_name();
297
298                if constraint_map
299                    .iter()
300                    .find(|idx| {
301                        if let ConstraintInner::Unique(unique) = &idx.0 {
302                            unique.idx_name() == index_name
303                        } else {
304                            false
305                        }
306                    })
307                    .is_none()
308                {
309                    constraint_map.push(constraint);
310                }
311            }
312            ConstraintInner::Check(_) => constraint_map.push(constraint),
313        };
314
315        Ok(())
316    }
317
318    /// Dispatch event to all receivers.
319    fn dispatch_event(&self, event: Event<T>) {
320        for sender in self.senders.read().values() {
321            #[allow(unused_must_use)]
322            sender.send(event.clone());
323        }
324    }
325}
326
#[cfg(test)]
mod tests {
    use super::*;
    use crate::TinyBase;

    /// A value that was inserted can be read back under the returned id.
    #[test]
    fn table_insert_and_select() {
        let db = TinyBase::new(None, true);
        let table: Table<String> = db.open_table("test_table").unwrap();

        let inserted_id = table.insert(String::from("test_value")).unwrap();

        let fetched = table
            .select(inserted_id)
            .unwrap()
            .expect("Record not found");

        assert_eq!(fetched.id, inserted_id);
        assert_eq!(fetched.data, "test_value");
    }

    /// Deleting returns the removed record and makes it unavailable to later selects.
    #[test]
    fn table_delete() {
        let db = TinyBase::new(None, true);
        let table: Table<String> = db.open_table("test_table").unwrap();

        let inserted_id = table.insert(String::from("test_value")).unwrap();

        let removed = table
            .delete(inserted_id)
            .unwrap()
            .expect("Record not found");

        assert_eq!(removed.id, inserted_id);
        assert_eq!(removed.data, "test_value");

        // The record must be gone afterwards.
        let after_delete = table.select(inserted_id).unwrap();
        assert!(after_delete.is_none());
    }

    /// Updating several ids returns the new records in the order the ids were given.
    #[test]
    fn table_update() {
        let db = TinyBase::new(None, true);
        let table: Table<String> = db.open_table("test_table").unwrap();

        let first_id = table.insert(String::from("value1")).unwrap();
        let second_id = table.insert(String::from("value2")).unwrap();

        let changed = table
            .update(&[first_id, second_id], |_| String::from("updated_value"))
            .expect("Update failed");

        assert_eq!(changed.len(), 2);

        let first = &changed[0];
        assert_eq!(first.id, first_id);
        assert_eq!(first.data, "updated_value");

        let second = &changed[1];
        assert_eq!(second.id, second_id);
        assert_eq!(second.data, "updated_value");
    }
}