1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use {
super::{err_into, key, lock, SledStorage, Snapshot, State},
async_trait::async_trait,
gluesql_core::{
data::{Key, Row, Schema},
result::{Error, Result},
store::{RowIter, Store},
},
};
/// Read-side [`Store`] implementation backed by the sled tree held in
/// [`SledStorage`].
///
/// Every value read from the tree is deserialized with `bincode` into a
/// [`Snapshot`] and then narrowed with `Snapshot::extract(txid, lock_txid)`;
/// entries for which `extract` yields `None` are reported as absent.
// NOTE(review): `extract` presumably selects the version visible to `txid`
// given the current lock holder — confirm against `Snapshot`.
#[async_trait(?Send)]
impl Store for SledStorage {
    /// Looks up the schema stored under the key `schema/{table_name}`.
    ///
    /// Unlike the row accessors, this method also works while the storage is
    /// in [`State::Idle`]: a temporary lock entry is registered for the
    /// duration of the read and unregistered before returning.
    ///
    /// # Errors
    ///
    /// Returns an error if registering the temporary lock, fetching the lock,
    /// reading from the tree, or deserializing the stored snapshot fails.
    /// When the read itself fails, the temporary lock is still unregistered
    /// (best effort) and the read error is the one returned. When the read
    /// succeeds but unregistering fails, the unregister error is returned.
    async fn fetch_schema(&self, table_name: &str) -> Result<Option<Schema>> {
        // `temp` records whether the lock entry was registered by this call
        // and therefore has to be released by this call as well.
        let (txid, created_at, temp) = match self.state {
            State::Transaction {
                txid, created_at, ..
            } => (txid, created_at, false),
            State::Idle => lock::register(&self.tree, self.id_offset)
                .map(|(txid, created_at)| (txid, created_at, true))?,
        };

        let tree = &self.tree;
        let tx_timeout = self.tx_timeout;

        // The fallible part is wrapped in a closure so that an early `?`
        // cannot skip the cleanup of the temporary lock below.
        let read = || -> Result<Option<Schema>> {
            let lock_txid = lock::fetch(tree, txid, created_at, tx_timeout)?;
            let key = format!("schema/{}", table_name);

            let schema = tree
                .get(key.as_bytes())
                .map_err(err_into)?
                .map(|v| bincode::deserialize(&v))
                .transpose()
                .map_err(err_into)?
                .and_then(|snapshot: Snapshot<Schema>| snapshot.extract(txid, lock_txid));

            Ok(schema)
        };
        let schema = read();

        if temp {
            // Always release the temporary lock, even when the read failed.
            let released = lock::unregister(tree, txid);

            // Surface the unregister failure only when there is no earlier
            // read error to report; the read error is the root cause.
            if schema.is_ok() {
                released?;
            }
        }

        schema
    }

    /// Fetches the row stored for `key` in `table_name`.
    ///
    /// Returns `Ok(None)` when no entry exists under the key or when the
    /// stored snapshot yields no row for the current transaction.
    ///
    /// # Errors
    ///
    /// Returns [`Error::StorageMsg`] when the storage is in [`State::Idle`]
    /// (no transaction, hence no lock), and propagates lock, tree and
    /// deserialization failures.
    async fn fetch_data(&self, table_name: &str, key: &Key) -> Result<Option<Row>> {
        let (txid, created_at) = match self.state {
            State::Transaction {
                txid, created_at, ..
            } => (txid, created_at),
            State::Idle => {
                return Err(Error::StorageMsg(
                    "conflict - fetch_data failed, lock does not exist".to_owned(),
                ));
            }
        };

        let lock_txid = lock::fetch(&self.tree, txid, created_at, self.tx_timeout)?;
        let key = key::data(table_name, key.to_cmp_be_bytes());

        let row = self
            .tree
            .get(&key)
            .map_err(err_into)?
            .map(|v| bincode::deserialize(&v))
            .transpose()
            .map_err(err_into)?
            .and_then(|snapshot: Snapshot<Row>| snapshot.extract(txid, lock_txid));

        Ok(row)
    }

    /// Returns a lazy iterator over all rows of `table_name`.
    ///
    /// The tree is scanned by the prefix from `key::data_prefix(table_name)`.
    /// Each yielded key is the stored key with that prefix stripped, wrapped
    /// in [`Key::Bytea`]. Entries whose snapshot yields no row for the current
    /// transaction are skipped; per-item read or deserialization failures are
    /// yielded as `Err` items rather than aborting the scan up front.
    ///
    /// # Errors
    ///
    /// Returns [`Error::StorageMsg`] when the storage is in [`State::Idle`],
    /// and propagates a failure to fetch the lock.
    async fn scan_data(&self, table_name: &str) -> Result<RowIter> {
        let (txid, created_at) = match self.state {
            State::Transaction {
                txid, created_at, ..
            } => (txid, created_at),
            State::Idle => {
                return Err(Error::StorageMsg(
                    "conflict - scan_data failed, lock does not exist".to_owned(),
                ));
            }
        };

        let lock_txid = lock::fetch(&self.tree, txid, created_at, self.tx_timeout)?;
        let prefix = key::data_prefix(table_name);
        let prefix_len = prefix.len();

        let result_set = self
            .tree
            .scan_prefix(prefix.as_bytes())
            .map(move |item| {
                let (key, value) = item.map_err(err_into)?;
                // Drop the table prefix so only the row's own key bytes remain.
                let key = key.subslice(prefix_len, key.len() - prefix_len).to_vec();
                let snapshot: Snapshot<Row> = bincode::deserialize(&value).map_err(err_into)?;
                let row = snapshot.extract(txid, lock_txid);
                let item = row.map(|row| (Key::Bytea(key), row));

                Ok(item)
            })
            // `Result<Option<_>>` -> `Option<Result<_>>`: `Ok(None)` entries
            // are filtered out, errors are kept.
            .filter_map(|item| item.transpose());

        Ok(Box::new(result_set))
    }
}