1use std::{collections::BTreeMap, convert::TryInto};
2
3use tokio::sync::mpsc::{self, Sender};
4use tokio::sync::oneshot;
5
6pub mod model;
7
8use model::Types;
9
10enum Action {
11 Insert(String, Types),
12 Contains(String),
13 Get(String),
14 Len,
15 Keys,
16 Values,
17 Remove(String),
18 RemoveEntry(String),
19}
20
21pub struct BTree {
26 tx: Sender<(Action, tokio::sync::oneshot::Sender<Option<Types>>)>,
27}
28
29impl BTree {
30 pub fn start(buffer_size: usize) -> Self {
33 let (tx, mut rx) = mpsc::channel(buffer_size);
34 tokio::spawn(async move {
35 let mut btree: BTreeMap<String, Types> = BTreeMap::new();
36 loop {
37 if let Some((action, tx_o)) = rx.recv().await {
38 let tx_o: tokio::sync::oneshot::Sender<Option<Types>> = tx_o;
39 match action {
40 Action::Insert(k, v) => {
41 let insert = btree.insert(k, v);
42 if let Err(_) = tx_o.send(insert) {
43 println!("the receiver dropped, mpsc insert");
44 }
45 }
46 Action::Contains(k) => {
47 let contains = btree.contains_key(&k);
48 if let Err(_) = tx_o.send(Some(Types::Boolean(contains))) {
49 println!("the receiver dropped, mpsc contains k: {}", k);
50 }
51 }
52 Action::Get(k) => {
53 let get = btree.get(&k);
54 let get = if let Some(types) = get {
55 Some(types.to_owned())
56 } else {
57 None
58 };
59 if let Err(_) = tx_o.send(get) {
60 println!("the receiver dropped, mpsc get k: {}", k);
61 }
62 }
63 Action::Keys => {
64 let get = btree.keys();
65 let keys: Vec<Types> = get.map(|k| k.to_owned().into()).collect();
66
67 if let Err(_) = tx_o.send(Some(Types::Vector(keys))) {
68 println!("the receiver dropped, mpsc get keys");
69 }
70 }
71 Action::Values => {
72 let get = btree.values();
73 let values: Vec<Types> = get.map(|k| k.to_owned()).collect();
74
75 if let Err(_) = tx_o.send(Some(Types::Vector(values))) {
76 println!("the receiver dropped, mpsc get values");
77 }
78 }
79 Action::Len => {
80 let len = btree.len();
81 if let Err(_) = tx_o.send(Some(Types::UInteger(len))) {
82 println!("the receiver dropped, mpsc len");
83 }
84 }
85 Action::Remove(k) => {
86 let remove = btree.remove(&k);
87
88 if let Err(_) = tx_o.send(remove) {
89 println!("the receiver dropped, mpsc remove for key: {}", k);
90 }
91 }
92 Action::RemoveEntry(k) => {
93 let remove = btree.remove_entry(&k);
94 let key_val = if let Some((key, value)) = remove {
95 Some(Types::KeyValue(key, Box::new(value)))
96 } else {
97 None
98 };
99 if let Err(_) = tx_o.send(key_val) {
100 println!("the receiver dropped, mpsc remove_entry for key: {}", k);
101 }
102 }
103 }
104 }
105 }
106 });
107
108 Self { tx }
109 }
110
111 pub async fn insert<V: Into<Types>>(&self, k: String, v: V) -> Result<Option<Types>, String> {
115 let v = v.into();
116 let tx = self.tx.clone();
117 let (tx_o, rx_o) = oneshot::channel();
118 let action = Action::Insert(k.clone(), v.clone());
119 let send = (action, tx_o);
120
121 tx.send(send)
122 .await
123 .map_err(|_| format!("receiver dropped, insert key {}, value {:?}", k, v))?;
124
125 rx_o.await
126 .map_err(|_| format!("insert failed {}, value {:?}", k, v))
127 }
128
129 pub async fn contains(&self, k: String) -> Result<bool, String> {
133 let tx = self.tx.clone();
134 let (tx_o, rx_o) = oneshot::channel();
135 let action = Action::Contains(k.clone());
136 let send = (action, tx_o);
137
138 tx.send(send)
139 .await
140 .map_err(|_| format!("receiver dropped, contains key {}", k))?;
141 match rx_o.await {
142 Ok(Some(Types::Boolean(true))) => Ok(true),
143 Err(e) => Err(format!("{:?}", e)),
144 _ => Ok(false),
145 }
146 }
147
148 pub async fn get(&self, k: String) -> Result<Option<Types>, String> {
152 let tx = self.tx.clone();
153 let (tx_o, rx_o) = oneshot::channel();
154 let action = Action::Get(k.clone());
155 let send = (action, tx_o);
156
157 tx.send(send)
158 .await
159 .map_err(|_| format!("receiver dropped, get key {}", k))?;
160
161 match rx_o.await {
162 Ok(types) => Ok(types),
163 Err(e) => Err(format!("get failed {} with error: {:?}", k, e)),
164 }
165 }
166
167 pub async fn len(&self) -> Result<usize, String> {
170 let tx = self.tx.clone();
171 let (tx_o, rx_o) = oneshot::channel();
172 let action = Action::Len;
173 let send = (action, tx_o);
174
175 tx.send(send)
176 .await
177 .map_err(|_| format!("receiver dropped, len",))?;
178
179 match rx_o.await {
180 Ok(Some(Types::UInteger(len))) => Ok(len),
181 _ => Err(format!("len failed")),
182 }
183 }
184
185 pub async fn keys(&self) -> Result<Vec<String>, String> {
189 let tx = self.tx.clone();
190 let (tx_o, rx_o) = oneshot::channel();
191 let action = Action::Keys;
192 let send = (action, tx_o);
193
194 tx.send(send)
195 .await
196 .map_err(|_| format!("receiver dropped, get keys"))?;
197
198 match rx_o.await {
199 Ok(Some(Types::Vector(types))) => {
200 let vec = types
201 .into_iter()
202 .map(|k| k.try_into())
203 .collect::<Result<Vec<String>, String>>();
204 vec
205 }
206 Err(e) => Err(format!("get keys failed with error: {:?}", e)),
207 _ => Err(format!("get keys failed")),
208 }
209 }
210
211 pub async fn values(&self) -> Result<Vec<Types>, String> {
214 let tx = self.tx.clone();
215 let (tx_o, rx_o) = oneshot::channel();
216 let action = Action::Values;
217 let send = (action, tx_o);
218
219 tx.send(send)
220 .await
221 .map_err(|_| format!("receiver dropped, get values"))?;
222
223 match rx_o.await {
224 Ok(Some(Types::Vector(types))) => Ok(types),
225 Err(e) => Err(format!("get values failed with error: {:?}", e)),
226 _ => Err(format!("get values failed")),
227 }
228 }
229
230 pub async fn remove(&self, k: String) -> Result<Option<Types>, String> {
234 let tx = self.tx.clone();
235 let (tx_o, rx_o) = oneshot::channel();
236 let action = Action::Remove(k.clone());
237 let send = (action, tx_o);
238
239 tx.send(send)
240 .await
241 .map_err(|_| format!("receiver dropped, remove key {}", k))?;
242
243 match rx_o.await {
244 Ok(types) => Ok(types),
245 Err(e) => Err(format!("remove failed {} with error: {:?}", k, e)),
246 }
247 }
248
249 pub async fn remove_entry(&self, k: String) -> Result<Option<Types>, String> {
253 let tx = self.tx.clone();
254 let (tx_o, rx_o) = oneshot::channel();
255 let action = Action::RemoveEntry(k.clone());
256 let send = (action, tx_o);
257
258 tx.send(send)
259 .await
260 .map_err(|_| format!("receiver dropped, remove_entry key {}", k))?;
261
262 match rx_o.await {
263 Ok(types) => Ok(types),
264 Err(e) => Err(format!("remove_entry failed {} with error: {:?}", k, e)),
265 }
266 }
267}