1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use {
	super::{
		err_into, fetch_schema,
		mutable::{index_prefix, indexed_key},
		SledDatabase,
	},
	crate::{
		join_iters, DBBase, IndexFilter, JoinType, NullOrd, Plane, Result, Row, Schema, Value,
	},
	async_trait::async_trait,
	rayon::slice::ParallelSliceMut,
	sled::IVec,
	std::{cmp::Ordering, convert::Into},
};

#[async_trait(?Send)]
impl DBBase for SledDatabase {
	async fn fetch_schema(&self, table_name: &str) -> Result<Option<Schema>> {
		fetch_schema(&self.tree, table_name).map(|(_, schema)| schema)
	}
	/// Returns every schema stored under the `schema/` key prefix.
	///
	/// Each matching entry's value is decoded with bincode; the first storage or
	/// decoding error aborts the scan and is returned.
	async fn scan_schemas(&self) -> Result<Vec<Schema>> {
		let mut schemas = Vec::new();
		for entry in self.tree.scan_prefix("schema/".as_bytes()) {
			// The key is not needed here: only the encoded schema is read.
			let (_, encoded) = entry.map_err(err_into)?;
			let schema: Schema = bincode::deserialize(&encoded).map_err(err_into)?;
			schemas.push(schema);
		}
		Ok(schemas)
	}

	/// Reads every row stored for `table_name`.
	///
	/// Scans all entries whose key starts with `data/{table_name}/`, decodes each value
	/// with bincode and pairs it with its storage key converted into a `Value`.
	/// The first storage or decoding error aborts the scan and is returned.
	async fn scan_data(&self, table_name: &str) -> Result<Plane> {
		let prefix = format!("data/{}/", table_name);

		let mut rows: Vec<(Value, Row)> = Vec::new();
		for entry in self.tree.scan_prefix(prefix.as_bytes()) {
			let (raw_key, raw_row) = entry.map_err(err_into)?;
			let row: Row = bincode::deserialize(&raw_row).map_err(err_into)?;
			let key: Value = (&raw_key).into();
			rows.push((key, row));
		}
		Ok(rows)
	}

	async fn scan_data_indexed(
		&self,
		table_name: &str,
		index_filter: IndexFilter,
	) -> Result<Plane> {
		let index_results = self.scan_index(table_name, index_filter).await?;
		let row_results = index_results.into_iter().map(|pk| {
			if let Value::Bytes(pk) = pk {
				self.tree
					.get(&pk)
					.map(|row| (pk, row.unwrap() /*TODO: Handle!*/))
			} else {
				unreachable!();
			}
		});
		row_results
			.map(|item| {
				let (pk, value) = item.map_err(err_into)?;
				let value = bincode::deserialize(&value).map_err(err_into)?;

				Ok((Value::Bytes(pk.to_vec()), value))
			})
			.collect::<Result<Vec<(Value, Row)>>>()
	}

	/// Resolves `index_filter` against the indexes of `table_name` into primary keys.
	///
	/// * `LessThan(index, max)` scans the index from its prefix up to (excluding) the
	///   key built from `max`.
	/// * `MoreThan(index, min)` scans from the key built from `min` up to (excluding)
	///   the index prefix followed by a single `0xFF` byte.
	/// * `Inner` / `Outer` resolve both sub-filters (left first, then right) and combine
	///   the two key lists with `join_iters` using the matching `JoinType`.
	///
	/// Keys produced by a range scan are returned as `Value::Bytes`, sorted with
	/// `null_cmp`; pairs that `null_cmp` cannot order are treated as equal.
	///
	/// # Errors
	///
	/// Returns an error if building a bound key fails, if reading the tree fails, or if
	/// resolving either side of a join fails.
	async fn scan_index(&self, table_name: &str, index_filter: IndexFilter) -> Result<Vec<Value>> {
		use IndexFilter::*;

		// Match by value: the filter is consumed exactly once, so no clone of the
		// (recursive) filter tree is needed and every variant is handled explicitly.
		let key_range = match index_filter {
			LessThan(index_name, max) => {
				let prefix = index_prefix(table_name, &index_name);
				let abs_min = IVec::from(prefix.as_bytes());
				abs_min..indexed_key(&prefix, &max)?
			}
			MoreThan(index_name, min) => {
				let prefix = index_prefix(table_name, &index_name);
				// NOTE(review): assumes no index key continues with a 0xFF byte right
				// after the prefix; such keys would fall outside this bound — confirm
				// against the key encoding in `indexed_key`.
				let abs_max = IVec::from([prefix.as_bytes(), &[0xFF]].concat());
				indexed_key(&prefix, &min)?..abs_max
			}
			Inner(left, right) => {
				let left = self.scan_index(table_name, *left).await?;
				let right = self.scan_index(table_name, *right).await?;
				return Ok(join_iters(JoinType::Inner, left, right));
			}
			Outer(left, right) => {
				let left = self.scan_index(table_name, *left).await?;
				let right = self.scan_index(table_name, *right).await?;
				return Ok(join_iters(JoinType::Outer, left, right));
			}
		};

		// TODO: Genericise and optimise
		let mut primary_keys = self
			.tree
			.range(key_range)
			.map(|entry| {
				// Index entries store the primary key as their value.
				let (_, pk) = entry.map_err(err_into)?;
				Ok(Value::Bytes(pk.to_vec()))
			})
			.collect::<Result<Vec<Value>>>()?;

		primary_keys.par_sort_unstable_by(|a, b| a.null_cmp(b).unwrap_or(Ordering::Equal));
		Ok(primary_keys)
	}
}