1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::ops::Deref;
4use std::sync::mpsc::{self, Sender};
5use std::sync::{Arc, RwLock};
6
7use serde::de::DeserializeOwned;
8use serde::Serialize;
9use sled::{Db, Tree};
10
11use crate::constraint::{Constraint, ConstraintInner};
12use crate::encoding::{decode, encode};
13use crate::index::{Index, IndexInner, IndexType};
14use crate::record::Record;
15use crate::result::DbResult;
16use crate::subscriber::{Event, Subscriber};
17
/// Shared, lock-protected map from a `u64` sender id to the channel `Sender`
/// registered under that id.
///
/// The `Arc` lets several owners hold the same map; readers take the read lock
/// to iterate the senders and writers take the write lock to insert entries.
pub(crate) type SenderMap<T> = Arc<RwLock<HashMap<u64, Sender<T>>>>;
19
20pub trait TableType: Serialize + DeserializeOwned + Clone + Debug {}
21impl<T: Serialize + DeserializeOwned + Debug + Clone> TableType for T {}
22
23pub struct Table<T: TableType + 'static>(pub(crate) Arc<TableInner<T>>);
25
26impl<T: TableType> Clone for Table<T> {
27 fn clone(&self) -> Self {
28 Self(self.0.clone())
29 }
30}
31
32impl<T: TableType> Deref for Table<T> {
33 type Target = Arc<TableInner<T>>;
34
35 fn deref(&self) -> &Self::Target {
36 &self.0
37 }
38}
39
40pub struct TableInner<T>
41where
42 T: TableType + 'static,
43{
44 pub(crate) engine: Db,
45 root: Tree,
46 name: String,
47 senders: SenderMap<Event<T>>,
48 constraints: RwLock<Vec<Constraint<T>>>,
49}
50
51impl<T> TableInner<T>
52where
53 T: TableType,
54{
55 pub(crate) fn new(engine: &Db, name: &str) -> DbResult<Self> {
65 let root = engine.open_tree(name)?;
66
67 Ok(Self {
68 engine: engine.clone(),
69 root,
70 name: name.to_owned(),
71 senders: Arc::new(RwLock::new(HashMap::new())),
72 constraints: RwLock::new(Vec::new()),
73 })
74 }
75
76 pub fn insert(&self, value: T) -> DbResult<u64> {
86 let record = Record {
87 id: self.engine.generate_id()?,
88 data: value.clone(),
89 };
90
91 self.check_constraint(&record, &vec![])?;
92
93 self.root.insert(encode(&record.id)?, encode(&value)?)?;
94 self.dispatch_event(Event::Insert(record.clone()));
95
96 Ok(record.id)
97 }
98
99 fn check_constraint(&self, record: &Record<T>, additional_items: &Vec<T>) -> DbResult<()> {
102 for constraint in self.constraints.read().unwrap().iter() {
103 match &constraint.0 {
104 ConstraintInner::Unique(index) => {
105 let matches = index.record_exists(record)?;
106 if matches.len() > 1 || matches.len() == 1 && matches[0] != record.id {
108 return Err(crate::result::TinyBaseError::Exists {
109 constraint: index.idx_name(),
110 id: record.id,
111 });
112 }
113
114 let mut matches = vec![];
115 for additional in additional_items {
116 let key = index.gen_key(&additional)?;
117 if matches.contains(&key) {
118 return Err(crate::result::TinyBaseError::BatchOperationConstraints);
119 }
120
121 matches.push(key);
122 }
123 }
124 ConstraintInner::Check(condition) => {
125 if !condition(&record.data) {
126 return Err(crate::result::TinyBaseError::Condition);
127 }
128 }
129 };
130 }
131
132 Ok(())
133 }
134
135 pub fn select(&self, id: u64) -> DbResult<Option<Record<T>>> {
145 if let Some(serialized) = self.root.get(encode(&id)?)? {
146 Ok(Some(Record {
147 id,
148 data: decode(&serialized)?,
149 }))
150 } else {
151 Ok(None)
152 }
153 }
154
155 pub fn delete(&self, id: u64) -> DbResult<Option<Record<T>>> {
165 let serialized_id = encode(&id)?;
166 if let Some(serialized) = self.root.remove(serialized_id)? {
167 let record = Record {
168 id,
169 data: decode(&serialized)?,
170 };
171
172 self.dispatch_event(Event::Remove(record.clone()));
173
174 Ok(Some(record))
175 } else {
176 Ok(None)
177 }
178 }
179
180 pub fn update(&self, ids: &[u64], updater: fn(T) -> T) -> DbResult<Vec<Record<T>>> {
191 let mut records = vec![];
192 for id in ids {
193 if let Some(old) = self.select(*id)? {
194 records.push(Record {
195 id: old.id,
196 data: updater(old.data),
197 });
198 }
199 }
200
201 let additional: Vec<T> = records.iter().map(|r| r.data.clone()).collect();
202 for record in &records {
203 self.check_constraint(record, &additional)?;
204 }
205
206 let mut updated = vec![];
207 for record in records {
208 self.root
209 .update_and_fetch(encode(&record.id)?, |old_value| {
210 if let Some(old_value) = old_value {
211 updated.push(record.clone());
212
213 self.dispatch_event(Event::Update {
214 id: record.id.clone(),
215 old_data: decode(old_value).unwrap(),
216 new_data: record.data.clone(),
217 });
218
219 Some(encode(&record.data).unwrap())
220 } else {
221 None
222 }
223 })?;
224 }
225
226 Ok(updated)
227 }
228
229 pub fn create_index<I: IndexType>(
240 &self,
241 name: &str,
242 key_func: impl Fn(&T) -> I + Send + Sync + 'static,
243 ) -> DbResult<Index<T, I>> {
244 let sender_id = self.engine.generate_id()?;
245 let (tx, rx) = mpsc::channel();
246
247 let subscriber = Subscriber::new(sender_id, rx, self.senders.clone());
248 self.senders.write().unwrap().insert(sender_id, tx);
249
250 Ok(Index(Arc::new(IndexInner::new(
251 &format!("{}_idx_{}", self.name, name),
252 &self.engine,
253 &self.root,
254 key_func,
255 subscriber,
256 )?)))
257 }
258
259 pub fn constraint(&self, constraint: Constraint<T>) -> DbResult<()> {
265 let mut constraint_map = self.constraints.write().unwrap();
266
267 match &constraint.0 {
268 ConstraintInner::Unique(index) => {
270 let index_name = index.idx_name();
271
272 if constraint_map
273 .iter()
274 .find(|idx| {
275 if let ConstraintInner::Unique(unique) = &idx.0 {
276 unique.idx_name() == index_name
277 } else {
278 false
279 }
280 })
281 .is_none()
282 {
283 constraint_map.push(constraint);
284 }
285 }
286 ConstraintInner::Check(_) => constraint_map.push(constraint),
287 };
288
289 Ok(())
290 }
291
292 fn dispatch_event(&self, event: Event<T>) {
294 for sender in self.senders.read().unwrap().values() {
295 sender.send(event.clone()).unwrap();
296 }
297 }
298}
299
#[cfg(test)]
mod tests {
    use super::*;
    use crate::TinyBase;

    /// An inserted value can be read back under the id returned by `insert`.
    #[test]
    fn table_insert_and_select() {
        let db = TinyBase::new(None, true);
        let table: Table<String> = db.open_table("test_table").unwrap();

        let inserted_id = table.insert(String::from("test_value")).unwrap();
        let fetched = table
            .select(inserted_id)
            .unwrap()
            .expect("Record not found");

        assert_eq!(fetched.id, inserted_id);
        assert_eq!(fetched.data, "test_value");
    }

    /// `delete` returns the removed record and the id is gone afterwards.
    #[test]
    fn table_delete() {
        let db = TinyBase::new(None, true);
        let table: Table<String> = db.open_table("test_table").unwrap();

        let inserted_id = table.insert(String::from("test_value")).unwrap();

        let removed = table
            .delete(inserted_id)
            .unwrap()
            .expect("Record not found");

        assert_eq!(removed.id, inserted_id);
        assert_eq!(removed.data, "test_value");

        let lookup = table.select(inserted_id).unwrap();
        assert!(lookup.is_none());
    }

    /// `update` rewrites every listed record and returns them in id order.
    #[test]
    fn table_update() {
        let db = TinyBase::new(None, true);
        let table: Table<String> = db.open_table("test_table").unwrap();

        let first_id = table.insert(String::from("value1")).unwrap();
        let second_id = table.insert(String::from("value2")).unwrap();

        let updated = table
            .update(&[first_id, second_id], |_| String::from("updated_value"))
            .expect("Update failed");

        assert_eq!(updated.len(), 2);
        for (record, expected_id) in updated.iter().zip(&[first_id, second_id]) {
            assert_eq!(record.id, *expected_id);
            assert_eq!(record.data, "updated_value");
        }
    }
}