1use alloc::borrow::ToOwned;
2use hashbrown::HashMap;
3use alloc::fmt::Debug;
4use core::ops::Deref;
5use thingbuf::mpsc::{self, Sender};
6use alloc::sync::Arc;
7use spin::RwLock;
8
9use serde::de::DeserializeOwned;
10use serde::Serialize;
11use sled::{Db, Tree};
12
13use crate::constraint::{Constraint, ConstraintInner};
14use crate::encoding::{decode, encode};
15use crate::index::{Index, IndexInner, IndexType};
16use crate::record::Record;
17use crate::result::DbResult;
18use crate::subscriber::{Event, Subscriber};
19
/// Shared, lock-protected map from a subscriber id to the sending half of that
/// subscriber's event channel.
///
/// `Table::create_index` inserts an entry per index and `TableInner::dispatch_event`
/// iterates the map to fan events out.
pub(crate) type SenderMap<T> = Arc<RwLock<HashMap<u64, Sender<T>>>>;
21
22#[allow(unused_imports)]
23use alloc::{boxed::Box, format, vec, vec::Vec, string::{String, ToString}};
24
25pub trait TableType: Serialize + DeserializeOwned + Clone + Debug {}
26impl<T: Serialize + DeserializeOwned + Debug + Clone> TableType for T {}
27
28pub struct Table<T: TableType + 'static>(pub(crate) Arc<TableInner<T>>);
30
31impl<T: TableType + 'static> Table<T> {
32 pub fn create_index<I: IndexType>(
43 &self,
44 name: &str,
45 key_func: impl Fn(&T) -> I + Send + Sync + 'static,
46 ) -> DbResult<Index<T, I>> {
47 let sender_id = self.engine.generate_id()?;
48 let (tx, rx) = mpsc::channel(usize::MAX);
49
50 let subscriber = Subscriber::new(sender_id, rx, self.senders.clone());
51 self.senders.write().insert(sender_id, tx);
52
53 let weak_self = Arc::downgrade(&self.0);
54
55 Ok(Index(Arc::new(IndexInner::new(
56 &format!("{}_idx_{}", self.name, name),
57 &self.engine,
58 weak_self,
59 key_func,
60 subscriber,
61 )?)))
62 }
63}
64
65impl<T: TableType> Clone for Table<T> {
66 fn clone(&self) -> Self {
67 Self(self.0.clone())
68 }
69}
70
71impl<T: TableType> Deref for Table<T> {
72 type Target = Arc<TableInner<T>>;
73
74 fn deref(&self) -> &Self::Target {
75 &self.0
76 }
77}
78
79pub struct TableInner<T>
80where
81 T: TableType + 'static,
82{
83 pub(crate) engine: Db,
84 pub(crate) root: RwLock<Tree>,
86 name: String,
87 senders: SenderMap<Event<T>>,
88 constraints: RwLock<Vec<Constraint<T>>>,
89}
90
91impl<T> TableInner<T>
92where
93 T: TableType,
94{
95 pub(crate) fn new(engine: &Db, name: &str) -> DbResult<Self> {
105 let root = RwLock::new(engine.open_tree(name)?);
106
107 Ok(Self {
108 engine: engine.clone(),
109 root,
110 name: name.to_owned(),
111 senders: Arc::new(RwLock::new(HashMap::new())),
112 constraints: RwLock::new(Vec::new()),
113 })
114 }
115
116 pub fn insert(&self, value: T) -> DbResult<u64> {
126 let root = self.root.write();
127
128 let record = Record {
129 id: self.engine.generate_id()?,
130 data: value.clone(),
131 };
132
133 self.check_constraint(&root, &record, &vec![])?;
134 root.insert(encode(&record.id)?, encode(&value)?)?;
135
136 self.dispatch_event(Event::Insert(record.clone()));
137
138 Ok(record.id)
139 }
140
141 fn check_constraint(
145 &self,
146 tree: &Tree,
147 record: &Record<T>,
148 additional_items: &Vec<T>,
149 ) -> DbResult<()> {
150 for constraint in self.constraints.read().iter() {
151 match &constraint.0 {
152 ConstraintInner::Unique(index) => {
153 let matches = index.tree_exists(tree, record)?;
154 if matches.len() > 1 || matches.len() == 1 && matches[0] != record.id {
156 return Err(crate::result::TinyBaseError::Exists {
157 constraint: index.idx_name(),
158 id: record.id,
159 });
160 }
161
162 let mut matches = vec![];
163 for additional in additional_items {
164 let key = index.gen_key(&additional)?;
165 if matches.contains(&key) {
166 return Err(crate::result::TinyBaseError::BatchOperationConstraints);
167 }
168
169 matches.push(key);
170 }
171 }
172 ConstraintInner::Check(condition) => {
173 if !condition(&record.data) {
174 return Err(crate::result::TinyBaseError::Condition);
175 }
176 }
177 };
178 }
179
180 Ok(())
181 }
182
183 pub fn select(&self, id: u64) -> DbResult<Option<Record<T>>> {
193 self.tree_select(&self.root.read(), id)
194 }
195
196 pub(crate) fn tree_select(&self, tree: &Tree, id: u64) -> DbResult<Option<Record<T>>> {
198 match tree.get(encode(&id)?)? { Some(serialized) => {
199 Ok(Some(Record {
200 id,
201 data: decode(&serialized)?,
202 }))
203 } _ => {
204 Ok(None)
205 }}
206 }
207
208 pub fn delete(&self, id: u64) -> DbResult<Option<Record<T>>> {
218 let serialized_id = encode(&id)?;
219
220 match self.root.read().remove(serialized_id)? { Some(serialized) => {
222 let record = Record {
223 id,
224 data: decode(&serialized)?,
225 };
226
227 self.dispatch_event(Event::Remove(record.clone()));
228
229 Ok(Some(record))
230 } _ => {
231 Ok(None)
232 }}
233 }
234
235 pub fn update(&self, ids: &[u64], updater: fn(T) -> T) -> DbResult<Vec<Record<T>>> {
246 let root = self.root.write();
247
248 let mut records = vec![];
249 for id in ids {
250 if let Some(old) = self.tree_select(&root, *id)? {
251 records.push(Record {
252 id: old.id,
253 data: updater(old.data),
254 });
255 }
256 }
257
258 let additional: Vec<T> = records.iter().map(|r| r.data.clone()).collect();
259 for record in &records {
260 self.check_constraint(&root, record, &additional)?;
261 }
262
263 let mut updated = vec![];
264 for record in records {
265 root.update_and_fetch(encode(&record.id)?, |old_value| {
266 if let Some(old_value) = old_value {
267 updated.push(record.clone());
268
269 self.dispatch_event(Event::Update {
270 id: record.id.clone(),
271 old_data: decode(old_value).unwrap(),
272 new_data: record.data.clone(),
273 });
274
275 Some(encode(&record.data).unwrap())
276 } else {
277 None
278 }
279 })?;
280 }
281
282 Ok(updated)
283 }
284
285 pub fn constraint(&self, constraint: Constraint<T>) -> DbResult<()> {
291 let mut constraint_map = self.constraints.write();
292
293 match &constraint.0 {
294 ConstraintInner::Unique(index) => {
296 let index_name = index.idx_name();
297
298 if constraint_map
299 .iter()
300 .find(|idx| {
301 if let ConstraintInner::Unique(unique) = &idx.0 {
302 unique.idx_name() == index_name
303 } else {
304 false
305 }
306 })
307 .is_none()
308 {
309 constraint_map.push(constraint);
310 }
311 }
312 ConstraintInner::Check(_) => constraint_map.push(constraint),
313 };
314
315 Ok(())
316 }
317
318 fn dispatch_event(&self, event: Event<T>) {
320 for sender in self.senders.read().values() {
321 #[allow(unused_must_use)]
322 sender.send(event.clone());
323 }
324 }
325}
326
#[cfg(test)]
mod tests {
    use super::*;
    use crate::TinyBase;

    /// An inserted value can be read back under the id returned by `insert`.
    #[test]
    fn table_insert_and_select() {
        let db = TinyBase::new(None, true);
        let table: Table<String> = db.open_table("test_table").unwrap();

        let id = table.insert("test_value".to_string()).unwrap();
        let record = table.select(id).unwrap().expect("Record not found");

        assert_eq!(record.id, id);
        assert_eq!(record.data, "test_value");
    }

    /// `delete` returns the removed record, the record is gone afterwards and
    /// deleting it again reports that nothing was removed.
    #[test]
    fn table_delete() {
        let db = TinyBase::new(None, true);
        let table: Table<String> = db.open_table("test_table").unwrap();

        let id = table.insert("test_value".to_string()).unwrap();

        let deleted_record = table.delete(id).unwrap().expect("Record not found");

        assert_eq!(deleted_record.id, id);
        assert_eq!(deleted_record.data, "test_value");

        assert!(table.select(id).unwrap().is_none());
        assert!(table.delete(id).unwrap().is_none());
    }

    /// `update` returns the rewritten records in id order and the new values
    /// are what a subsequent `select` observes.
    #[test]
    fn table_update() {
        let db = TinyBase::new(None, true);
        let table: Table<String> = db.open_table("test_table").unwrap();

        let id1 = table.insert("value1".to_string()).unwrap();
        let id2 = table.insert("value2".to_string()).unwrap();

        let updated_records = table
            .update(&[id1, id2], |_| "updated_value".to_string())
            .expect("Update failed");

        assert_eq!(updated_records.len(), 2);
        assert_eq!(updated_records[0].id, id1);
        assert_eq!(updated_records[0].data, "updated_value");

        assert_eq!(updated_records[1].id, id2);
        assert_eq!(updated_records[1].data, "updated_value");

        // The returned records must match what was actually persisted.
        for id in [id1, id2] {
            let stored = table.select(id).unwrap().expect("Record not found");
            assert_eq!(stored.id, id);
            assert_eq!(stored.data, "updated_value");
        }
    }
}