1use std::any::Any;
2use std::ops::Deref;
3use std::sync::Arc;
4use std::vec;
5
6use serde::de::DeserializeOwned;
7use serde::Serialize;
8use sled::{Db, IVec, Tree};
9
10use crate::encoding::{decode, encode};
11use crate::record::Record;
12use crate::result::DbResult;
13use crate::subscriber::{self, Subscriber};
14use crate::table::TableType;
15
16pub trait IndexType: Serialize + DeserializeOwned {}
17impl<T: Serialize + DeserializeOwned> IndexType for T {}
18
19pub struct Index<T: TableType, I: IndexType>(pub(crate) Arc<IndexInner<T, I>>);
21
22impl<T: TableType, I: IndexType> Clone for Index<T, I> {
23 fn clone(&self) -> Self {
24 Self(self.0.clone())
25 }
26}
27
28impl<T: TableType, I: IndexType> Deref for Index<T, I> {
29 type Target = Arc<IndexInner<T, I>>;
30
31 fn deref(&self) -> &Self::Target {
32 &self.0
33 }
34}
35
36pub struct IndexInner<T: TableType, I: IndexType> {
38 table_data: Tree,
39 key_func: Box<dyn Fn(&T) -> I + Send + Sync>,
41 indexed_data: Tree,
43 subscriber: Subscriber<T>,
45}
46
47impl<T: TableType, I: IndexType> IndexInner<T, I> {
48 pub(crate) fn new(
65 idx_name: &str,
66 engine: &Db,
67 table_data: &Tree,
68 key_func: impl Fn(&T) -> I + Send + Sync + 'static,
69 subscriber: Subscriber<T>,
70 ) -> DbResult<Self> {
71 let need_sync = !engine.tree_names().contains(&IVec::from(idx_name));
72
73 let new_index = Self {
74 table_data: table_data.clone(),
75 key_func: Box::new(key_func),
76 indexed_data: engine.open_tree(idx_name)?,
77 subscriber,
78 };
79
80 if need_sync {
82 new_index.sync()?;
83 }
84
85 Ok(new_index)
86 }
87
88 pub fn sync(&self) -> DbResult<()> {
90 self.indexed_data.clear()?;
91 for key in self.table_data.iter().keys() {
92 if let Some(data) = self.table_data.get(&key.clone()?)? {
94 self.insert(&Record {
95 id: decode(&key?)?,
96 data: decode(&data)?,
97 })?;
98 }
99 }
100
101 Ok(())
102 }
103
104 fn commit_log(&self) -> DbResult<()> {
106 while let Ok(event) = self.subscriber.rx.try_recv() {
108 match event {
109 subscriber::Event::Remove(record) => self.remove(&record)?,
110 subscriber::Event::Insert(record) => self.insert(&record)?,
111 subscriber::Event::Update {
112 id,
113 old_data,
114 new_data,
115 } => {
116 self.remove(&Record { id, data: old_data })?;
117 self.insert(&Record { id, data: new_data })?;
118 }
119 }
120 }
121
122 Ok(())
123 }
124
125 fn insert(&self, record: &Record<T>) -> DbResult<()> {
131 let key = encode(&(self.key_func)(&record.data))?;
132
133 if let Some(data) = self.indexed_data.get(&key)? {
134 let mut vec: Vec<u64> = decode(&data)?;
135 vec.push(record.id);
136 self.indexed_data.insert(key, encode(&vec)?)?;
137 } else {
138 self.indexed_data.insert(key, encode(&vec![record.id])?)?;
139 }
140
141 Ok(())
142 }
143
144 fn remove(&self, record: &Record<T>) -> DbResult<()> {
151 let key = encode(&(self.key_func)(&record.data))?;
152
153 if let Some(data) = self.indexed_data.get(&key)? {
154 let mut index_values: Vec<u64> = decode(&data)?;
155
156 if index_values.len() < 2 {
158 self.indexed_data.remove(&key)?;
159 } else {
160 if let Some(pos) = index_values.iter().position(|id| *id == record.id) {
162 index_values.remove(pos);
163 self.indexed_data.insert(&key, encode(&index_values)?)?;
165 }
166 }
167 }
168
169 Ok(())
170 }
171
172 pub fn delete(&self, query: &I) -> DbResult<Vec<Record<T>>> {
182 let records = self.select(query)?;
183
184 for record in &records {
185 self.table_data.remove(&encode(&record.id)?)?;
186 self.remove(&record)?;
187 }
188
189 Ok(records)
190 }
191
192 pub fn select(&self, query: &I) -> DbResult<Vec<Record<T>>> {
202 self.commit_log()?;
203
204 Ok(
205 if let Ok(Some(bytes)) = self.indexed_data.get(encode(&query)?) {
206 let ids: Vec<u64> = decode(&bytes)?;
207
208 let mut results = vec![];
209 for id in ids {
210 let encoded_data = self.table_data.get(encode(&id)?)?;
211 if let Some(encoded_data) = encoded_data {
212 results.push(Record {
213 id,
214 data: decode::<T>(&encoded_data)?,
215 })
216 }
217 }
218
219 results
220 } else {
221 Vec::new()
222 },
223 )
224 }
225
226 pub fn exists_record(&self, record: &Record<T>) -> DbResult<Vec<u64>> {
228 self.exists((self.key_func)(&record.data))
229 }
230
231 pub fn exists(&self, key: I) -> DbResult<Vec<u64>> {
237 Ok(self.select(&key)?.iter().map(|record| record.id).collect())
238 }
239
240 pub fn index_name(&self) -> String {
241 std::str::from_utf8(&self.indexed_data.name())
242 .unwrap()
243 .to_string()
244 }
245
246 pub fn generate_key(&self, data: &T) -> DbResult<Vec<u8>> {
247 encode(&(self.key_func)(&data))
248 }
249}
250
251pub trait AnyIndex<T: TableType> {
253 fn record_exists(&self, record: &Record<T>) -> DbResult<Vec<u64>>;
255 fn search(&self, value: Box<dyn Any>) -> DbResult<Vec<Record<T>>>;
257 fn idx_name(&self) -> String;
259 fn gen_key(&self, data: &T) -> DbResult<Vec<u8>>;
261}
262
263impl<T, I> AnyIndex<T> for Index<T, I>
264where
265 T: TableType,
266 I: IndexType + 'static,
267{
268 fn search(&self, value: Box<dyn Any>) -> DbResult<Vec<Record<T>>> {
269 let i = *value.downcast::<I>().unwrap();
270 self.select(&i)
271 }
272
273 fn idx_name(&self) -> String {
274 self.index_name()
275 }
276
277 fn record_exists(&self, record: &Record<T>) -> DbResult<Vec<u64>> {
278 self.exists_record(record)
279 }
280
281 fn gen_key(&self, data: &T) -> DbResult<Vec<u8>> {
282 self.generate_key(data)
283 }
284}
285
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Table, TinyBase};

    /// Re-syncing an index keeps every record reachable, in insertion order.
    #[test]
    fn index_sync() {
        let db = TinyBase::new(None, true);
        let table: Table<String> = db.open_table("test_table").unwrap();

        // Both values are six bytes long, so they share one index key.
        let first_id = table.insert("value1".to_string()).unwrap();
        let second_id = table.insert("value2".to_string()).unwrap();

        let by_length = table.create_index("length", |value| value.len()).unwrap();

        assert!(by_length.sync().is_ok());

        let hits = by_length.select(&6).unwrap();

        assert_eq!(hits.len(), 2);
        assert_eq!(hits[0].id, first_id);
        assert_eq!(hits[1].id, second_id);
    }

    /// Selecting by key returns exactly the matching records, and nothing for
    /// a key that was never indexed.
    #[test]
    fn index_select() {
        let db = TinyBase::new(None, true);
        let table: Table<String> = db.open_table("test_table").unwrap();

        table.insert("value1".to_string()).unwrap();
        table.insert("value2".to_string()).unwrap();

        let by_name = table
            .create_index("name", |value| value.to_owned())
            .unwrap();

        let present_key = "value1".to_string();
        let found: Vec<Record<String>> = by_name.select(&present_key).expect("Select failed");

        assert_eq!(found.len(), 1);
        assert_eq!(found[0].data, "value1");

        let absent_key = "non_existent_value".to_string();
        let missing = by_name.select(&absent_key).expect("Select failed");

        assert_eq!(missing.len(), 0);
    }

    /// `exists_record` reports ids for an indexed record and an empty list
    /// for one that was never stored.
    #[test]
    fn index_exists() {
        let db = TinyBase::new(None, true);
        let table: Table<String> = db.open_table("test_table").unwrap();

        let by_value = table
            .create_index("index_name", |value| value.to_owned())
            .unwrap();

        // Inserted after the index was created.
        let id = table.insert("value1".to_string()).unwrap();

        let stored = Record {
            id,
            data: "value1".to_string(),
        };
        let stored_ids = by_value
            .exists_record(&stored)
            .expect("Exists check failed");

        assert!(!stored_ids.is_empty());

        let never_stored = Record {
            id: 999,
            data: "non_existent_value".to_string(),
        };
        let absent_ids = by_value
            .exists_record(&never_stored)
            .expect("Exists check failed");

        assert!(absent_ids.is_empty());
    }
}