observable_tree/
lib.rs

1use std::{collections::BTreeMap, convert::TryInto};
2
3use tokio::sync::mpsc::{self, Sender};
4use tokio::sync::oneshot;
5
6pub mod model;
7
8use model::Types;
9
10enum Action {
11    Insert(String, Types),
12    Contains(String),
13    Get(String),
14    Len,
15    Keys,
16    Values,
17    Remove(String),
18    RemoveEntry(String),
19}
20
21/// `BTree` is where the informatio `Sender` is contained.
22/// Its inner implementation has a tuple containing the action to be taken as well as a oneshot channel to receive data.
23/// To start the `BTree` thread just execute `BTree::start(buffer_size: usize)`. If you `buffer_size` is too short
24/// it may cause synchronization problems, so it should be well ajusted to your application needs.
25pub struct BTree {
26    tx: Sender<(Action, tokio::sync::oneshot::Sender<Option<Types>>)>,
27}
28
29impl BTree {
30    /// `BTree::start(buffer_size: usize)` is the entrypoint to start using `BTree` methods.
31    /// It creates a thread containing the BTreeMap and keeps listening to entries.
32    pub fn start(buffer_size: usize) -> Self {
33        let (tx, mut rx) = mpsc::channel(buffer_size);
34        tokio::spawn(async move {
35            let mut btree: BTreeMap<String, Types> = BTreeMap::new();
36            loop {
37                if let Some((action, tx_o)) = rx.recv().await {
38                    let tx_o: tokio::sync::oneshot::Sender<Option<Types>> = tx_o;
39                    match action {
40                        Action::Insert(k, v) => {
41                            let insert = btree.insert(k, v);
42                            if let Err(_) = tx_o.send(insert) {
43                                println!("the receiver dropped, mpsc insert");
44                            }
45                        }
46                        Action::Contains(k) => {
47                            let contains = btree.contains_key(&k);
48                            if let Err(_) = tx_o.send(Some(Types::Boolean(contains))) {
49                                println!("the receiver dropped, mpsc contains k: {}", k);
50                            }
51                        }
52                        Action::Get(k) => {
53                            let get = btree.get(&k);
54                            let get = if let Some(types) = get {
55                                Some(types.to_owned())
56                            } else {
57                                None
58                            };
59                            if let Err(_) = tx_o.send(get) {
60                                println!("the receiver dropped, mpsc get k: {}", k);
61                            }
62                        }
63                        Action::Keys => {
64                            let get = btree.keys();
65                            let keys: Vec<Types> = get.map(|k| k.to_owned().into()).collect();
66
67                            if let Err(_) = tx_o.send(Some(Types::Vector(keys))) {
68                                println!("the receiver dropped, mpsc get keys");
69                            }
70                        }
71                        Action::Values => {
72                            let get = btree.values();
73                            let values: Vec<Types> = get.map(|k| k.to_owned()).collect();
74
75                            if let Err(_) = tx_o.send(Some(Types::Vector(values))) {
76                                println!("the receiver dropped, mpsc get values");
77                            }
78                        }
79                        Action::Len => {
80                            let len = btree.len();
81                            if let Err(_) = tx_o.send(Some(Types::UInteger(len))) {
82                                println!("the receiver dropped, mpsc len");
83                            }
84                        }
85                        Action::Remove(k) => {
86                            let remove = btree.remove(&k);
87
88                            if let Err(_) = tx_o.send(remove) {
89                                println!("the receiver dropped, mpsc remove for key: {}", k);
90                            }
91                        }
92                        Action::RemoveEntry(k) => {
93                            let remove = btree.remove_entry(&k);
94                            let key_val = if let Some((key, value)) = remove {
95                                Some(Types::KeyValue(key, Box::new(value)))
96                            } else {
97                                None
98                            };
99                            if let Err(_) = tx_o.send(key_val) {
100                                println!("the receiver dropped, mpsc remove_entry for key: {}", k);
101                            }
102                        }
103                    }
104                }
105            }
106        });
107
108        Self { tx }
109    }
110
111    /// Method `insert` is equivalent to [`std::collection::BTreeMap insert`](https://doc.rust-lang.org/std/collections/struct.BTreeMap.html#method.insert),
112    /// it returns `None` if the key does not exist and it returns `Some(Types::_)` with the previous value,
113    /// if the key already exists.
114    pub async fn insert<V: Into<Types>>(&self, k: String, v: V) -> Result<Option<Types>, String> {
115        let v = v.into();
116        let tx = self.tx.clone();
117        let (tx_o, rx_o) = oneshot::channel();
118        let action = Action::Insert(k.clone(), v.clone());
119        let send = (action, tx_o);
120
121        tx.send(send)
122            .await
123            .map_err(|_| format!("receiver dropped, insert key {}, value {:?}", k, v))?;
124
125        rx_o.await
126            .map_err(|_| format!("insert failed {}, value {:?}", k, v))
127    }
128
129    /// Method `contains` is equivalent to [`std::collection::BTreeMap contains_key`](https://doc.rust-lang.org/std/collections/struct.BTreeMap.html#method.contains_key),
130    /// It checks if a key already exists in the `BTree`. If the key exists the return is `Ok(true)`,
131    /// if it doesn't exist it returns `Ok(false)`
132    pub async fn contains(&self, k: String) -> Result<bool, String> {
133        let tx = self.tx.clone();
134        let (tx_o, rx_o) = oneshot::channel();
135        let action = Action::Contains(k.clone());
136        let send = (action, tx_o);
137
138        tx.send(send)
139            .await
140            .map_err(|_| format!("receiver dropped, contains key {}", k))?;
141        match rx_o.await {
142            Ok(Some(Types::Boolean(true))) => Ok(true),
143            Err(e) => Err(format!("{:?}", e)),
144            _ => Ok(false),
145        }
146    }
147
148    /// Method `get` is equivalent to [`std::collection::BTreeMap get`](https://doc.rust-lang.org/std/collections/struct.BTreeMap.html#method.get),
149    /// It returns the value contained at the key passed as argument. If no key is found the return is `Ok(None)`,
150    /// else it returns `Ok(Some(Types::_))`.
151    pub async fn get(&self, k: String) -> Result<Option<Types>, String> {
152        let tx = self.tx.clone();
153        let (tx_o, rx_o) = oneshot::channel();
154        let action = Action::Get(k.clone());
155        let send = (action, tx_o);
156
157        tx.send(send)
158            .await
159            .map_err(|_| format!("receiver dropped, get key {}", k))?;
160
161        match rx_o.await {
162            Ok(types) => Ok(types),
163            Err(e) => Err(format!("get failed {} with error: {:?}", k, e)),
164        }
165    }
166
167    /// Method `len` is equivalent to [`std::collection::BTreeMap len`](https://doc.rust-lang.org/std/collections/struct.BTreeMap.html#method.len),
168    /// It returns the length of the btree as a usize.
169    pub async fn len(&self) -> Result<usize, String> {
170        let tx = self.tx.clone();
171        let (tx_o, rx_o) = oneshot::channel();
172        let action = Action::Len;
173        let send = (action, tx_o);
174
175        tx.send(send)
176            .await
177            .map_err(|_| format!("receiver dropped, len",))?;
178
179        match rx_o.await {
180            Ok(Some(Types::UInteger(len))) => Ok(len),
181            _ => Err(format!("len failed")),
182        }
183    }
184
185    /// Method `keys` is equivalent to [`std::collection::BTreeMap keys`](https://doc.rust-lang.org/std/collections/struct.BTreeMap.html#method.keys),
186    /// It returns a vector containing all the keys sorted.
187    /// For `BTree` the keys are always `Strings`.
188    pub async fn keys(&self) -> Result<Vec<String>, String> {
189        let tx = self.tx.clone();
190        let (tx_o, rx_o) = oneshot::channel();
191        let action = Action::Keys;
192        let send = (action, tx_o);
193
194        tx.send(send)
195            .await
196            .map_err(|_| format!("receiver dropped, get keys"))?;
197
198        match rx_o.await {
199            Ok(Some(Types::Vector(types))) => {
200                let vec = types
201                    .into_iter()
202                    .map(|k| k.try_into())
203                    .collect::<Result<Vec<String>, String>>();
204                vec
205            }
206            Err(e) => Err(format!("get keys failed with error: {:?}", e)),
207            _ => Err(format!("get keys failed")),
208        }
209    }
210
211    /// Method `values` is equivalent to [`std::collection::BTreeMap values`](https://doc.rust-lang.org/std/collections/struct.BTreeMap.html#method.values),
212    /// It returns a vector containing all the values sorted by their respective keys order.
213    pub async fn values(&self) -> Result<Vec<Types>, String> {
214        let tx = self.tx.clone();
215        let (tx_o, rx_o) = oneshot::channel();
216        let action = Action::Values;
217        let send = (action, tx_o);
218
219        tx.send(send)
220            .await
221            .map_err(|_| format!("receiver dropped, get values"))?;
222
223        match rx_o.await {
224            Ok(Some(Types::Vector(types))) => Ok(types),
225            Err(e) => Err(format!("get values failed with error: {:?}", e)),
226            _ => Err(format!("get values failed")),
227        }
228    }
229
230    /// Method `remove` is equivalent to [`std::collection::BTreeMap remove`](https://doc.rust-lang.org/std/collections/struct.BTreeMap.html#method.remove),
231    /// It returns the value removed from the `BTree` for the key passed as argument. If no key is found the return is `Ok(None)`,
232    /// else it returns `Ok(Some(Types::_))`.
233    pub async fn remove(&self, k: String) -> Result<Option<Types>, String> {
234        let tx = self.tx.clone();
235        let (tx_o, rx_o) = oneshot::channel();
236        let action = Action::Remove(k.clone());
237        let send = (action, tx_o);
238
239        tx.send(send)
240            .await
241            .map_err(|_| format!("receiver dropped, remove key {}", k))?;
242
243        match rx_o.await {
244            Ok(types) => Ok(types),
245            Err(e) => Err(format!("remove failed {} with error: {:?}", k, e)),
246        }
247    }
248
249    /// Method `remove_entry` is equivalent to [`std::collection::BTreeMap remove_entry`](https://doc.rust-lang.org/std/collections/struct.BTreeMap.html#method.remove_entry),
250    /// It returns the key and value removed from the `BTree` for the key passed as argument as a `Option<Types::KeyValue(_,_)>`.
251    /// If no key is found the return is `Ok(None)`,
252    pub async fn remove_entry(&self, k: String) -> Result<Option<Types>, String> {
253        let tx = self.tx.clone();
254        let (tx_o, rx_o) = oneshot::channel();
255        let action = Action::RemoveEntry(k.clone());
256        let send = (action, tx_o);
257
258        tx.send(send)
259            .await
260            .map_err(|_| format!("receiver dropped, remove_entry key {}", k))?;
261
262        match rx_o.await {
263            Ok(types) => Ok(types),
264            Err(e) => Err(format!("remove_entry failed {} with error: {:?}", k, e)),
265        }
266    }
267}