tinybase/
index.rs

1use std::any::Any;
2use std::ops::Deref;
3use std::sync::Arc;
4use std::vec;
5
6use serde::de::DeserializeOwned;
7use serde::Serialize;
8use sled::{Db, IVec, Tree};
9
10use crate::encoding::{decode, encode};
11use crate::record::Record;
12use crate::result::DbResult;
13use crate::subscriber::{self, Subscriber};
14use crate::table::TableType;
15
16pub trait IndexType: Serialize + DeserializeOwned {}
17impl<T: Serialize + DeserializeOwned> IndexType for T {}
18
19/// Provides methods for interacting with an index on a typed table.
20pub struct Index<T: TableType, I: IndexType>(pub(crate) Arc<IndexInner<T, I>>);
21
22impl<T: TableType, I: IndexType> Clone for Index<T, I> {
23    fn clone(&self) -> Self {
24        Self(self.0.clone())
25    }
26}
27
28impl<T: TableType, I: IndexType> Deref for Index<T, I> {
29    type Target = Arc<IndexInner<T, I>>;
30
31    fn deref(&self) -> &Self::Target {
32        &self.0
33    }
34}
35
36/// Inner state of an index on a typed table.
37pub struct IndexInner<T: TableType, I: IndexType> {
38    table_data: Tree,
39    /// Function which will be used to compute the key per insert.
40    key_func: Box<dyn Fn(&T) -> I + Send + Sync>,
41    /// Built index, each key can have multiple matching records.
42    indexed_data: Tree,
43    /// Reference to uncommitted operation log.
44    subscriber: Subscriber<T>,
45}
46
47impl<T: TableType, I: IndexType> IndexInner<T, I> {
48    /// Creates a new index with the given name, engine, table data, key function, and subscriber.
49    ///
50    /// This method is intended for internal use and should not be called directly. Instead, use the
51    /// [`crate::Table`]'s `create_index()` method.
52    ///
53    /// # Arguments
54    ///
55    /// * `idx_name` - The name of the index.
56    /// * `engine` - The database engine.
57    /// * `table_data` - The data of the table to be indexed.
58    /// * `key_func` - A function which computes the index key for each record.
59    /// * `subscriber` - A subscriber to uncommitted operation log.
60    ///
61    /// # Returns
62    ///
63    /// The new [`IndexInner`] instance.
64    pub(crate) fn new(
65        idx_name: &str,
66        engine: &Db,
67        table_data: &Tree,
68        key_func: impl Fn(&T) -> I + Send + Sync + 'static,
69        subscriber: Subscriber<T>,
70    ) -> DbResult<Self> {
71        let need_sync = !engine.tree_names().contains(&IVec::from(idx_name));
72
73        let new_index = Self {
74            table_data: table_data.clone(),
75            key_func: Box::new(key_func),
76            indexed_data: engine.open_tree(idx_name)?,
77            subscriber,
78        };
79
80        // Index is new, sync data
81        if need_sync {
82            new_index.sync()?;
83        }
84
85        Ok(new_index)
86    }
87
88    /// Resync index to be up to date with table.
89    pub fn sync(&self) -> DbResult<()> {
90        self.indexed_data.clear()?;
91        for key in self.table_data.iter().keys() {
92            // This should always succeed
93            if let Some(data) = self.table_data.get(&key.clone()?)? {
94                self.insert(&Record {
95                    id: decode(&key?)?,
96                    data: decode(&data)?,
97                })?;
98            }
99        }
100
101        Ok(())
102    }
103
104    /// Commits the received events from the main table to the index.
105    fn commit_log(&self) -> DbResult<()> {
106        // Commit log of events on the main table.
107        while let Ok(event) = self.subscriber.rx.try_recv() {
108            match event {
109                subscriber::Event::Remove(record) => self.remove(&record)?,
110                subscriber::Event::Insert(record) => self.insert(&record)?,
111                subscriber::Event::Update {
112                    id,
113                    old_data,
114                    new_data,
115                } => {
116                    self.remove(&Record { id, data: old_data })?;
117                    self.insert(&Record { id, data: new_data })?;
118                }
119            }
120        }
121
122        Ok(())
123    }
124
125    /// Insert a record into the index. The index key will be computed.
126    ///
127    /// # Arguments
128    ///
129    /// * `record` - The record to insert.
130    fn insert(&self, record: &Record<T>) -> DbResult<()> {
131        let key = encode(&(self.key_func)(&record.data))?;
132
133        if let Some(data) = self.indexed_data.get(&key)? {
134            let mut vec: Vec<u64> = decode(&data)?;
135            vec.push(record.id);
136            self.indexed_data.insert(key, encode(&vec)?)?;
137        } else {
138            self.indexed_data.insert(key, encode(&vec![record.id])?)?;
139        }
140
141        Ok(())
142    }
143
144    /// Delete a record from the index.
145    /// The record will compute an index key to delete by.
146    ///
147    /// # Arguments
148    ///
149    /// * `record` - The record to delete.
150    fn remove(&self, record: &Record<T>) -> DbResult<()> {
151        let key = encode(&(self.key_func)(&record.data))?;
152
153        if let Some(data) = self.indexed_data.get(&key)? {
154            let mut index_values: Vec<u64> = decode(&data)?;
155
156            // We can remove the entire node here since its one element.
157            if index_values.len() < 2 {
158                self.indexed_data.remove(&key)?;
159            } else {
160                // Remove the single ID from here.
161                if let Some(pos) = index_values.iter().position(|id| *id == record.id) {
162                    index_values.remove(pos);
163                    // Replace the row with one that doesn't have the element.
164                    self.indexed_data.insert(&key, encode(&index_values)?)?;
165                }
166            }
167        }
168
169        Ok(())
170    }
171
172    /// Delete records from the table and the index based on the given query.
173    ///
174    /// # Arguments
175    ///
176    /// * `query` - A reference to the query key.
177    ///
178    /// # Returns
179    ///
180    /// All the deleted [`Record`] instances.
181    pub fn delete(&self, query: &I) -> DbResult<Vec<Record<T>>> {
182        let records = self.select(query)?;
183
184        for record in &records {
185            self.table_data.remove(&encode(&record.id)?)?;
186            self.remove(&record)?;
187        }
188
189        Ok(records)
190    }
191
192    /// Select records from the table based on the given query.
193    ///
194    /// # Arguments
195    ///
196    /// * `query` - A reference to the query key.
197    ///
198    /// # Returns
199    ///
200    /// All selected [`Record`] instances.
201    pub fn select(&self, query: &I) -> DbResult<Vec<Record<T>>> {
202        self.commit_log()?;
203
204        Ok(
205            if let Ok(Some(bytes)) = self.indexed_data.get(encode(&query)?) {
206                let ids: Vec<u64> = decode(&bytes)?;
207
208                let mut results = vec![];
209                for id in ids {
210                    let encoded_data = self.table_data.get(encode(&id)?)?;
211                    if let Some(encoded_data) = encoded_data {
212                        results.push(Record {
213                            id,
214                            data: decode::<T>(&encoded_data)?,
215                        })
216                    }
217                }
218
219                results
220            } else {
221                Vec::new()
222            },
223        )
224    }
225
226    /// Check if a record matches the built index key.
227    pub fn exists_record(&self, record: &Record<T>) -> DbResult<Vec<u64>> {
228        self.exists((self.key_func)(&record.data))
229    }
230
231    /// Check if a record exists by the index key.
232    ///
233    /// # Arguments
234    ///
235    /// * `key` - The index key to check for existence.
236    pub fn exists(&self, key: I) -> DbResult<Vec<u64>> {
237        Ok(self.select(&key)?.iter().map(|record| record.id).collect())
238    }
239
240    pub fn index_name(&self) -> String {
241        std::str::from_utf8(&self.indexed_data.name())
242            .unwrap()
243            .to_string()
244    }
245
246    pub fn generate_key(&self, data: &T) -> DbResult<Vec<u8>> {
247        encode(&(self.key_func)(&data))
248    }
249}
250
251/// Type which [`Index`] can be casted to which doesn't require the `I` type parameter.
252pub trait AnyIndex<T: TableType> {
253    /// Alias for `exists_record`.
254    fn record_exists(&self, record: &Record<T>) -> DbResult<Vec<u64>>;
255    /// Select which allows any type.
256    fn search(&self, value: Box<dyn Any>) -> DbResult<Vec<Record<T>>>;
257    /// Alias for `index_name`.
258    fn idx_name(&self) -> String;
259    /// Generate a key and return encoded value.
260    fn gen_key(&self, data: &T) -> DbResult<Vec<u8>>;
261}
262
263impl<T, I> AnyIndex<T> for Index<T, I>
264where
265    T: TableType,
266    I: IndexType + 'static,
267{
268    fn search(&self, value: Box<dyn Any>) -> DbResult<Vec<Record<T>>> {
269        let i = *value.downcast::<I>().unwrap();
270        self.select(&i)
271    }
272
273    fn idx_name(&self) -> String {
274        self.index_name()
275    }
276
277    fn record_exists(&self, record: &Record<T>) -> DbResult<Vec<u64>> {
278        self.exists_record(record)
279    }
280
281    fn gen_key(&self, data: &T) -> DbResult<Vec<u8>> {
282        self.generate_key(data)
283    }
284}
285
286#[cfg(test)]
287mod tests {
288    use super::*;
289    use crate::{Table, TinyBase};
290
291    #[test]
292    fn index_sync() {
293        let db = TinyBase::new(None, true);
294        let table: Table<String> = db.open_table("test_table").unwrap();
295
296        // Insert a string value into the table
297        let id = table.insert("value1".to_string()).unwrap();
298        let id2 = table.insert("value2".to_string()).unwrap();
299
300        // Create an index for the table
301        let index = table.create_index("length", |value| value.len()).unwrap();
302
303        assert!(index.sync().is_ok());
304
305        let results = index.select(&6).unwrap();
306
307        assert_eq!(results.len(), 2);
308        assert_eq!(results[0].id, id);
309        assert_eq!(results[1].id, id2);
310    }
311
312    #[test]
313    fn index_select() {
314        let db = TinyBase::new(None, true);
315        let table: Table<String> = db.open_table("test_table").unwrap();
316
317        // Insert a string value into the table
318        table.insert("value1".to_string()).unwrap();
319        table.insert("value2".to_string()).unwrap();
320
321        // Create an index for the table
322        let index = table
323            .create_index("name", |value| value.to_owned())
324            .unwrap();
325
326        let record: Vec<Record<String>> =
327            index.select(&"value1".to_string()).expect("Select failed");
328
329        assert_eq!(record.len(), 1);
330        assert_eq!(record[0].data, "value1");
331
332        let record_2 = index
333            .select(&"non_existent_value".to_string())
334            .expect("Select failed");
335
336        assert_eq!(record_2.len(), 0);
337    }
338
339    #[test]
340    fn index_exists() {
341        let db = TinyBase::new(None, true);
342        let table: Table<String> = db.open_table("test_table").unwrap();
343
344        // Create an index for the table
345        let index = table
346            .create_index("index_name", |value| value.to_owned())
347            .unwrap();
348
349        // Insert a string value into the table
350        let id = table.insert("value1".to_string()).unwrap();
351
352        let record = Record {
353            id,
354            data: "value1".to_string(),
355        };
356
357        assert!(!index
358            .exists_record(&record)
359            .expect("Exists check failed")
360            .is_empty());
361
362        let record_not_exist = Record {
363            id: 999,
364            data: "non_existent_value".to_string(),
365        };
366
367        assert!(index
368            .exists_record(&record_not_exist)
369            .expect("Exists check failed")
370            .is_empty());
371    }
372}