multisql/databases/memory/
base.rs1use std::collections::{BTreeMap, HashMap};
2
3use crate::{join_iters, JoinType, Row};
4
5use {
6 crate::{
7 DBBase, IndexFilter, MemoryDatabase, MemoryDatabaseError, Plane, Result, Schema, Value,
8 },
9 async_trait::async_trait,
10};
11
12#[async_trait(?Send)]
13impl DBBase for MemoryDatabase {
14 async fn fetch_schema(&self, table_name: &str) -> Result<Option<Schema>> {
15 Ok(self.tables.get(&table_name.to_string()).cloned())
16 }
17 async fn scan_schemas(&self) -> Result<Vec<Schema>> {
18 Ok(self.tables.values().cloned().collect())
19 }
20
21 async fn scan_data(&self, table_name: &str) -> Result<Plane> {
22 self.data
23 .get(&table_name.to_string())
24 .cloned()
25 .ok_or(MemoryDatabaseError::TableNotFound.into())
26 .map(|rows| rows.into_iter().collect())
27 }
28
29 async fn scan_data_indexed(
30 &self,
31 table_name: &str,
32 index_filter: IndexFilter,
33 ) -> Result<Plane> {
34 let index_results = self.scan_index(table_name, index_filter).await?;
35 let default = HashMap::new();
36 let rows = self.data.get(&table_name.to_string()).unwrap_or(&default);
37 Ok(index_results
38 .into_iter()
39 .filter_map(|pk| rows.get(&pk).map(|row| (pk.clone(), row.clone())))
40 .collect::<Vec<(Value, Row)>>())
41 }
42
43 async fn scan_index(&self, table_name: &str, index_filter: IndexFilter) -> Result<Vec<Value>> {
44 use IndexFilter::*;
45 match index_filter.clone() {
46 LessThan(index_name, ..) | MoreThan(index_name, ..) => {
47 let default = BTreeMap::new();
48 let index = self
49 .indexes
50 .get(&table_name.to_string())
51 .and_then(|indexes| indexes.get(&index_name))
52 .unwrap_or(&default);
53 let index_results = match index_filter {
54 LessThan(_, max) => index.range(..max),
55 MoreThan(_, min) => index.range(min..),
56 _ => unreachable!(),
57 }
58 .map(|(_, pk)| pk.clone())
59 .collect();
60 Ok(index_results)
61 }
62 Inner(left, right) => {
63 let (left, right) = (
64 self.scan_index(table_name, *left),
65 self.scan_index(table_name, *right),
66 );
67 let (left, right) = (left.await?, right.await?);
68 Ok(join_iters(JoinType::Inner, left, right))
69 }
70 Outer(left, right) => {
71 let (left, right) = (
72 self.scan_index(table_name, *left),
73 self.scan_index(table_name, *right),
74 );
75 let (left, right) = (left.await?, right.await?);
76 Ok(join_iters(JoinType::Outer, left, right))
77 }
78 }
79 }
80}