multisql/databases/sled/
base.rs

1use {
2	super::{
3		err_into, fetch_schema,
4		mutable::{index_prefix, indexed_key},
5		SledDatabase,
6	},
7	crate::{
8		join_iters, DBBase, IndexFilter, JoinType, NullOrd, Plane, Result, Row, Schema, Value,
9	},
10	async_trait::async_trait,
11	rayon::slice::ParallelSliceMut,
12	sled::IVec,
13	std::{cmp::Ordering, convert::Into},
14};
15
16#[async_trait(?Send)]
17impl DBBase for SledDatabase {
18	async fn fetch_schema(&self, table_name: &str) -> Result<Option<Schema>> {
19		fetch_schema(&self.tree, table_name).map(|(_, schema)| schema)
20	}
21	async fn scan_schemas(&self) -> Result<Vec<Schema>> {
22		let prefix = "schema/".to_string();
23		self.tree
24			.scan_prefix(prefix.as_bytes())
25			.map(|item| {
26				let (_, bytes) = item.map_err(err_into)?;
27				bincode::deserialize(&bytes).map_err(err_into)
28			})
29			.collect()
30	}
31
32	async fn scan_data(&self, table_name: &str) -> Result<Plane> {
33		let prefix = format!("data/{}/", table_name);
34
35		self.tree
36			.scan_prefix(prefix.as_bytes())
37			.map(|item| {
38				let (key, value) = item.map_err(err_into)?;
39				let value = bincode::deserialize(&value).map_err(err_into)?;
40
41				Ok(((&key).into(), value))
42			})
43			.collect::<Result<Vec<(Value, Row)>>>()
44	}
45
46	async fn scan_data_indexed(
47		&self,
48		table_name: &str,
49		index_filter: IndexFilter,
50	) -> Result<Plane> {
51		let index_results = self.scan_index(table_name, index_filter).await?;
52		let row_results = index_results.into_iter().map(|pk| {
53			if let Value::Bytes(pk) = pk {
54				self.tree
55					.get(&pk)
56					.map(|row| (pk, row.unwrap() /*TODO: Handle!*/))
57			} else {
58				unreachable!();
59			}
60		});
61		row_results
62			.map(|item| {
63				let (pk, value) = item.map_err(err_into)?;
64				let value = bincode::deserialize(&value).map_err(err_into)?;
65
66				Ok((Value::Bytes(pk.to_vec()), value))
67			})
68			.collect::<Result<Vec<(Value, Row)>>>()
69	}
70
71	async fn scan_index(&self, table_name: &str, index_filter: IndexFilter) -> Result<Vec<Value>> {
72		use IndexFilter::*;
73		match index_filter.clone() {
74			LessThan(index_name, ..) | MoreThan(index_name, ..) => {
75				// TODO: Genericise and optimise
76				let prefix = index_prefix(table_name, &index_name);
77				let abs_min = IVec::from(prefix.as_bytes());
78				let abs_max = IVec::from([prefix.as_bytes(), &[0xFF]].concat());
79
80				let index_results = match index_filter {
81					LessThan(_, max) => self.tree.range(abs_min..indexed_key(&prefix, &max)?),
82					MoreThan(_, min) => self.tree.range(indexed_key(&prefix, &min)?..abs_max),
83					_ => unreachable!(),
84				};
85				let mut index_results = index_results
86					.map(|item| {
87						let (_, pk) = item.map_err(err_into)?;
88						let pk = Value::Bytes(pk.to_vec());
89
90						Ok(pk)
91					})
92					.collect::<Result<Vec<Value>>>()?;
93
94				index_results.par_sort_unstable_by(|a, b| a.null_cmp(b).unwrap_or(Ordering::Equal));
95				Ok(index_results)
96			}
97			Inner(left, right) => {
98				let (left, right) = (
99					self.scan_index(table_name, *left),
100					self.scan_index(table_name, *right),
101				);
102				let (left, right) = (left.await?, right.await?);
103				Ok(join_iters(JoinType::Inner, left, right))
104			}
105			Outer(left, right) => {
106				let (left, right) = (
107					self.scan_index(table_name, *left),
108					self.scan_index(table_name, *right),
109				);
110				let (left, right) = (left.await?, right.await?);
111				Ok(join_iters(JoinType::Outer, left, right))
112			}
113		}
114	}
115}