multisql/databases/sled/
base.rs1use {
2 super::{
3 err_into, fetch_schema,
4 mutable::{index_prefix, indexed_key},
5 SledDatabase,
6 },
7 crate::{
8 join_iters, DBBase, IndexFilter, JoinType, NullOrd, Plane, Result, Row, Schema, Value,
9 },
10 async_trait::async_trait,
11 rayon::slice::ParallelSliceMut,
12 sled::IVec,
13 std::{cmp::Ordering, convert::Into},
14};
15
16#[async_trait(?Send)]
17impl DBBase for SledDatabase {
18 async fn fetch_schema(&self, table_name: &str) -> Result<Option<Schema>> {
19 fetch_schema(&self.tree, table_name).map(|(_, schema)| schema)
20 }
21 async fn scan_schemas(&self) -> Result<Vec<Schema>> {
22 let prefix = "schema/".to_string();
23 self.tree
24 .scan_prefix(prefix.as_bytes())
25 .map(|item| {
26 let (_, bytes) = item.map_err(err_into)?;
27 bincode::deserialize(&bytes).map_err(err_into)
28 })
29 .collect()
30 }
31
32 async fn scan_data(&self, table_name: &str) -> Result<Plane> {
33 let prefix = format!("data/{}/", table_name);
34
35 self.tree
36 .scan_prefix(prefix.as_bytes())
37 .map(|item| {
38 let (key, value) = item.map_err(err_into)?;
39 let value = bincode::deserialize(&value).map_err(err_into)?;
40
41 Ok(((&key).into(), value))
42 })
43 .collect::<Result<Vec<(Value, Row)>>>()
44 }
45
46 async fn scan_data_indexed(
47 &self,
48 table_name: &str,
49 index_filter: IndexFilter,
50 ) -> Result<Plane> {
51 let index_results = self.scan_index(table_name, index_filter).await?;
52 let row_results = index_results.into_iter().map(|pk| {
53 if let Value::Bytes(pk) = pk {
54 self.tree
55 .get(&pk)
56 .map(|row| (pk, row.unwrap() ))
57 } else {
58 unreachable!();
59 }
60 });
61 row_results
62 .map(|item| {
63 let (pk, value) = item.map_err(err_into)?;
64 let value = bincode::deserialize(&value).map_err(err_into)?;
65
66 Ok((Value::Bytes(pk.to_vec()), value))
67 })
68 .collect::<Result<Vec<(Value, Row)>>>()
69 }
70
71 async fn scan_index(&self, table_name: &str, index_filter: IndexFilter) -> Result<Vec<Value>> {
72 use IndexFilter::*;
73 match index_filter.clone() {
74 LessThan(index_name, ..) | MoreThan(index_name, ..) => {
75 let prefix = index_prefix(table_name, &index_name);
77 let abs_min = IVec::from(prefix.as_bytes());
78 let abs_max = IVec::from([prefix.as_bytes(), &[0xFF]].concat());
79
80 let index_results = match index_filter {
81 LessThan(_, max) => self.tree.range(abs_min..indexed_key(&prefix, &max)?),
82 MoreThan(_, min) => self.tree.range(indexed_key(&prefix, &min)?..abs_max),
83 _ => unreachable!(),
84 };
85 let mut index_results = index_results
86 .map(|item| {
87 let (_, pk) = item.map_err(err_into)?;
88 let pk = Value::Bytes(pk.to_vec());
89
90 Ok(pk)
91 })
92 .collect::<Result<Vec<Value>>>()?;
93
94 index_results.par_sort_unstable_by(|a, b| a.null_cmp(b).unwrap_or(Ordering::Equal));
95 Ok(index_results)
96 }
97 Inner(left, right) => {
98 let (left, right) = (
99 self.scan_index(table_name, *left),
100 self.scan_index(table_name, *right),
101 );
102 let (left, right) = (left.await?, right.await?);
103 Ok(join_iters(JoinType::Inner, left, right))
104 }
105 Outer(left, right) => {
106 let (left, right) = (
107 self.scan_index(table_name, *left),
108 self.scan_index(table_name, *right),
109 );
110 let (left, right) = (left.await?, right.await?);
111 Ok(join_iters(JoinType::Outer, left, right))
112 }
113 }
114 }
115}