tinybase/
table.rs

1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::ops::Deref;
4use std::sync::mpsc::{self, Sender};
5use std::sync::{Arc, RwLock};
6
7use serde::de::DeserializeOwned;
8use serde::Serialize;
9use sled::{Db, Tree};
10
11use crate::constraint::{Constraint, ConstraintInner};
12use crate::encoding::{decode, encode};
13use crate::index::{Index, IndexInner, IndexType};
14use crate::record::Record;
15use crate::result::DbResult;
16use crate::subscriber::{Event, Subscriber};
17
/// Shared, lock-protected map from a `u64` ID to the [`Sender`] half of an mpsc channel.
///
/// In this file the key is the sender ID generated in `create_index`, and the map is
/// cloned (as an [`Arc`]) into each `Subscriber`.
pub(crate) type SenderMap<T> = Arc<RwLock<HashMap<u64, Sender<T>>>>;
19
20pub trait TableType: Serialize + DeserializeOwned + Clone + Debug {}
21impl<T: Serialize + DeserializeOwned + Debug + Clone> TableType for T {}
22
23/// Provides methods for interacting with a typed table.
24pub struct Table<T: TableType + 'static>(pub(crate) Arc<TableInner<T>>);
25
26impl<T: TableType> Clone for Table<T> {
27    fn clone(&self) -> Self {
28        Self(self.0.clone())
29    }
30}
31
32impl<T: TableType> Deref for Table<T> {
33    type Target = Arc<TableInner<T>>;
34
35    fn deref(&self) -> &Self::Target {
36        &self.0
37    }
38}
39
40pub struct TableInner<T>
41where
42    T: TableType + 'static,
43{
44    pub(crate) engine: Db,
45    root: Tree,
46    name: String,
47    senders: SenderMap<Event<T>>,
48    constraints: RwLock<Vec<Constraint<T>>>,
49}
50
51impl<T> TableInner<T>
52where
53    T: TableType,
54{
55    /// Creates a new table with the given engine and name.
56    ///
57    /// This method is intended for internal use and should not be called directly. Instead, use the
58    /// [`crate::TinyBase`]'s `open_table()` method.
59    ///
60    /// # Arguments
61    ///
62    /// * `engine` - The database engine.
63    /// * `name` - The name of the table.
64    pub(crate) fn new(engine: &Db, name: &str) -> DbResult<Self> {
65        let root = engine.open_tree(name)?;
66
67        Ok(Self {
68            engine: engine.clone(),
69            root,
70            name: name.to_owned(),
71            senders: Arc::new(RwLock::new(HashMap::new())),
72            constraints: RwLock::new(Vec::new()),
73        })
74    }
75
76    /// Insert a new record into the table.
77    ///
78    /// # Arguments
79    ///
80    /// * `value` - The value to insert.
81    ///
82    /// # Returns
83    ///
84    /// The ID of the new record.
85    pub fn insert(&self, value: T) -> DbResult<u64> {
86        let record = Record {
87            id: self.engine.generate_id()?,
88            data: value.clone(),
89        };
90
91        self.check_constraint(&record, &vec![])?;
92
93        self.root.insert(encode(&record.id)?, encode(&value)?)?;
94        self.dispatch_event(Event::Insert(record.clone()));
95
96        Ok(record.id)
97    }
98
99    /// Check if constraint is met.
100    /// Additional items can be specified if there are some items that aren't inserted yet.
101    fn check_constraint(&self, record: &Record<T>, additional_items: &Vec<T>) -> DbResult<()> {
102        for constraint in self.constraints.read().unwrap().iter() {
103            match &constraint.0 {
104                ConstraintInner::Unique(index) => {
105                    let matches = index.record_exists(record)?;
106                    // Check if record being changed is the same record that has the index error.
107                    if matches.len() > 1 || matches.len() == 1 && matches[0] != record.id {
108                        return Err(crate::result::TinyBaseError::Exists {
109                            constraint: index.idx_name(),
110                            id: record.id,
111                        });
112                    }
113
114                    let mut matches = vec![];
115                    for additional in additional_items {
116                        let key = index.gen_key(&additional)?;
117                        if matches.contains(&key) {
118                            return Err(crate::result::TinyBaseError::BatchOperationConstraints);
119                        }
120
121                        matches.push(key);
122                    }
123                }
124                ConstraintInner::Check(condition) => {
125                    if !condition(&record.data) {
126                        return Err(crate::result::TinyBaseError::Condition);
127                    }
128                }
129            };
130        }
131
132        Ok(())
133    }
134
135    /// Select a record by its ID.
136    ///
137    /// # Arguments
138    ///
139    /// * `id` - The ID of the record to select.
140    ///
141    /// # Returns
142    ///
143    /// An [`Option`] containing the selected record if it exists, or [`None`] otherwise.
144    pub fn select(&self, id: u64) -> DbResult<Option<Record<T>>> {
145        if let Some(serialized) = self.root.get(encode(&id)?)? {
146            Ok(Some(Record {
147                id,
148                data: decode(&serialized)?,
149            }))
150        } else {
151            Ok(None)
152        }
153    }
154
155    /// Delete a record by its ID.
156    ///
157    /// # Arguments
158    ///
159    /// * `id` - The ID of the record to delete.
160    ///
161    /// # Returns
162    ///
163    /// An [`Option`] containing the deleted record if it exists, or [`None`] otherwise.
164    pub fn delete(&self, id: u64) -> DbResult<Option<Record<T>>> {
165        let serialized_id = encode(&id)?;
166        if let Some(serialized) = self.root.remove(serialized_id)? {
167            let record = Record {
168                id,
169                data: decode(&serialized)?,
170            };
171
172            self.dispatch_event(Event::Remove(record.clone()));
173
174            Ok(Some(record))
175        } else {
176            Ok(None)
177        }
178    }
179
180    /// Update one or more records by their IDs.
181    ///
182    /// # Arguments
183    ///
184    /// * `ids` - The IDs of the records to update.
185    /// * `updater` - Closure to generate the new data based on the old data.
186    ///
187    /// # Returns
188    ///
189    /// All updated records.
190    pub fn update(&self, ids: &[u64], updater: fn(T) -> T) -> DbResult<Vec<Record<T>>> {
191        let mut records = vec![];
192        for id in ids {
193            if let Some(old) = self.select(*id)? {
194                records.push(Record {
195                    id: old.id,
196                    data: updater(old.data),
197                });
198            }
199        }
200
201        let additional: Vec<T> = records.iter().map(|r| r.data.clone()).collect();
202        for record in &records {
203            self.check_constraint(record, &additional)?;
204        }
205
206        let mut updated = vec![];
207        for record in records {
208            self.root
209                .update_and_fetch(encode(&record.id)?, |old_value| {
210                    if let Some(old_value) = old_value {
211                        updated.push(record.clone());
212
213                        self.dispatch_event(Event::Update {
214                            id: record.id.clone(),
215                            old_data: decode(old_value).unwrap(),
216                            new_data: record.data.clone(),
217                        });
218
219                        Some(encode(&record.data).unwrap())
220                    } else {
221                        None
222                    }
223                })?;
224        }
225
226        Ok(updated)
227    }
228
229    /// Create an index on the table.
230    ///
231    /// # Arguments
232    ///
233    /// * `name` - The name of the index.
234    /// * `key_func` - A function which computes the index key for each record.
235    ///
236    /// # Returns
237    ///
238    /// An [`Index`] instance for the created index.
239    pub fn create_index<I: IndexType>(
240        &self,
241        name: &str,
242        key_func: impl Fn(&T) -> I + Send + Sync + 'static,
243    ) -> DbResult<Index<T, I>> {
244        let sender_id = self.engine.generate_id()?;
245        let (tx, rx) = mpsc::channel();
246
247        let subscriber = Subscriber::new(sender_id, rx, self.senders.clone());
248        self.senders.write().unwrap().insert(sender_id, tx);
249
250        Ok(Index(Arc::new(IndexInner::new(
251            &format!("{}_idx_{}", self.name, name),
252            &self.engine,
253            &self.root,
254            key_func,
255            subscriber,
256        )?)))
257    }
258
259    /// Add a constraint to the table.
260    ///
261    /// # Arguments
262    ///
263    /// * `constraint` - The constraint to add.
264    pub fn constraint(&self, constraint: Constraint<T>) -> DbResult<()> {
265        let mut constraint_map = self.constraints.write().unwrap();
266
267        match &constraint.0 {
268            // Check if index has already been added if constraint is unique.
269            ConstraintInner::Unique(index) => {
270                let index_name = index.idx_name();
271
272                if constraint_map
273                    .iter()
274                    .find(|idx| {
275                        if let ConstraintInner::Unique(unique) = &idx.0 {
276                            unique.idx_name() == index_name
277                        } else {
278                            false
279                        }
280                    })
281                    .is_none()
282                {
283                    constraint_map.push(constraint);
284                }
285            }
286            ConstraintInner::Check(_) => constraint_map.push(constraint),
287        };
288
289        Ok(())
290    }
291
292    /// Dispatch event to all receivers.
293    fn dispatch_event(&self, event: Event<T>) {
294        for sender in self.senders.read().unwrap().values() {
295            sender.send(event.clone()).unwrap();
296        }
297    }
298}
299
300#[cfg(test)]
301mod tests {
302    use super::*;
303    use crate::TinyBase;
304
305    #[test]
306    fn table_insert_and_select() {
307        let db = TinyBase::new(None, true);
308        let table: Table<String> = db.open_table("test_table").unwrap();
309
310        // Insert a string value into the table
311        let id = table.insert("test_value".to_string()).unwrap();
312        let record = table.select(id).unwrap().expect("Record not found");
313
314        assert_eq!(record.id, id);
315        assert_eq!(record.data, "test_value");
316    }
317
318    #[test]
319    fn table_delete() {
320        let db = TinyBase::new(None, true);
321        let table: Table<String> = db.open_table("test_table").unwrap();
322
323        // Insert a string value into the table
324        let id = table.insert("test_value".to_string()).unwrap();
325
326        // Delete the record with the given ID
327        let deleted_record = table.delete(id).unwrap().expect("Record not found");
328
329        assert_eq!(deleted_record.id, id);
330        assert_eq!(deleted_record.data, "test_value");
331
332        // Check if the record is really deleted
333        assert!(table.select(id).unwrap().is_none());
334    }
335
336    #[test]
337    fn table_update() {
338        let db = TinyBase::new(None, true);
339        let table: Table<String> = db.open_table("test_table").unwrap();
340
341        // Insert a string value into the table
342        let id1 = table.insert("value1".to_string()).unwrap();
343        let id2 = table.insert("value2".to_string()).unwrap();
344
345        // Update the records with new values
346        let updated_records = table
347            .update(&[id1, id2], |_| "updated_value".to_string())
348            .expect("Update failed");
349
350        assert_eq!(updated_records.len(), 2);
351        assert_eq!(updated_records[0].id, id1);
352        assert_eq!(updated_records[0].data, "updated_value");
353
354        assert_eq!(updated_records[1].id, id2);
355        assert_eq!(updated_records[1].data, "updated_value");
356    }
357}