// concread/bptree/asynch.rs

1//! Async `BptreeMap` - See the documentation for the sync `BptreeMap`
2
3#[cfg(feature = "serde")]
4use serde::{
5    de::{Deserialize, Deserializer},
6    ser::{Serialize, SerializeMap, Serializer},
7};
8
9#[cfg(feature = "serde")]
10use crate::utils::MapCollector;
11
12use crate::internals::lincowcell_async::{LinCowCell, LinCowCellReadTxn, LinCowCellWriteTxn};
13
14include!("impl.rs");
15
16impl<K: Clone + Ord + Debug + Sync + Send + 'static, V: Clone + Sync + Send + 'static>
17    BptreeMap<K, V>
18{
19    /// Initiate a read transaction for the tree, concurrent to any
20    /// other readers or writers.
21    pub fn read<'x>(&'x self) -> BptreeMapReadTxn<'x, K, V> {
22        let inner = self.inner.read();
23        BptreeMapReadTxn { inner }
24    }
25
26    /// Initiate a write transaction for the tree, exclusive to this
27    /// writer, and concurrently to all existing reads.
28    pub async fn write<'x>(&'x self) -> BptreeMapWriteTxn<'x, K, V> {
29        let inner = self.inner.write().await;
30        BptreeMapWriteTxn { inner }
31    }
32}
33
34impl<K: Clone + Ord + Debug + Sync + Send + 'static, V: Clone + Sync + Send + 'static>
35    BptreeMapWriteTxn<'_, K, V>
36{
37    /// Commit the changes from this write transaction. Readers after this point
38    /// will be able to perceive these changes.
39    ///
40    /// To abort (unstage changes), just do not call this function.
41    pub fn commit(self) {
42        self.inner.commit();
43    }
44}
45
#[cfg(feature = "serde")]
impl<K, V> Serialize for BptreeMapReadTxn<'_, K, V>
where
    K: Serialize + Clone + Ord + Debug + Sync + Send + 'static,
    V: Serialize + Clone + Sync + Send + 'static,
{
    /// Serialize the contents visible to this read transaction as a map,
    /// emitting one entry per key/value pair in iteration order.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // The entry count is known up front, so pass it as the length hint.
        let mut map_ser = serializer.serialize_map(Some(self.len()))?;

        // Stops at, and propagates, the first entry that fails to serialize.
        self.iter()
            .try_for_each(|(k, v)| map_ser.serialize_entry(k, v))?;

        map_ser.end()
    }
}
65
#[cfg(feature = "serde")]
impl<K, V> Serialize for BptreeMap<K, V>
where
    K: Serialize + Clone + Ord + Debug + Sync + Send + 'static,
    V: Serialize + Clone + Sync + Send + 'static,
{
    /// Serialize the map by opening a read transaction and serializing
    /// what that transaction sees.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let snapshot = self.read();
        Serialize::serialize(&snapshot, serializer)
    }
}
79
#[cfg(feature = "serde")]
impl<'de, K, V> Deserialize<'de> for BptreeMap<K, V>
where
    K: Deserialize<'de> + Clone + Ord + Debug + Sync + Send + 'static,
    V: Deserialize<'de> + Clone + Sync + Send + 'static,
{
    /// Deserialize a map from the input by driving a `MapCollector`
    /// visitor over the deserializer's map representation.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        D::deserialize_map(deserializer, MapCollector::new())
    }
}
93
#[cfg(test)]
mod tests {
    //! Tests for the async `BptreeMap`: write/rollback behaviour, isolation
    //! between read and write transactions, bulk construction, and (with the
    //! `serde` feature) a serialize/deserialize round trip.

    use super::BptreeMap;
    use crate::internals::bptree::node::{assert_released, L_CAPACITY};
    use rand::seq::SliceRandom;

    /// Committed inserts persist; an uncommitted `clear` is rolled back.
    #[tokio::test]
    async fn test_bptree2_map_basic_write() {
        let bptree: BptreeMap<usize, usize> = BptreeMap::new();
        {
            let mut bpwrite = bptree.write().await;
            // We should be able to insert.
            bpwrite.insert(0, 0);
            bpwrite.insert(1, 1);
            assert_eq!(bpwrite.get(&0), Some(&0));
            assert_eq!(bpwrite.get(&1), Some(&1));
            bpwrite.insert(2, 2);
            bpwrite.commit();
        }
        {
            // Do a clear, but roll it back.
            let mut bpwrite = bptree.write().await;
            bpwrite.clear();
            // DO NOT commit, this triggers the rollback.
        }
        {
            // The previously committed values must still be present.
            let bpwrite = bptree.write().await;
            assert_eq!(bpwrite.get(&0), Some(&0));
            assert_eq!(bpwrite.get(&1), Some(&1));
        }
        drop(bptree);
        assert_released();
    }

    /// Mutating through `get_mut` in a write transaction must not be visible
    /// to a reader that was opened before the commit.
    #[tokio::test]
    async fn test_bptree2_map_cursed_get_mut() {
        let bptree: BptreeMap<usize, usize> = BptreeMap::new();
        {
            let mut w = bptree.write().await;
            w.insert(0, 0);
            w.commit();
        }
        let r1 = bptree.read();
        {
            let mut w = bptree.write().await;
            let cursed_zone = w.get_mut(&0).unwrap();
            *cursed_zone = 1;
            // Correctly fails to work as it's a second borrow, which isn't
            // possible once w.remove occurs
            // w.remove(&0);
            // *cursed_zone = 2;
            w.commit();
        }
        let r2 = bptree.read();
        assert_eq!(r1.get(&0), Some(&0));
        assert_eq!(r2.get(&0), Some(&1));

        /*
        // Correctly fails to compile. PHEW!
        let fail = {
            let mut w = bptree.write().await;
            w.get_mut(&0).unwrap()
        };
        */
        drop(r1);
        drop(r2);
        drop(bptree);
        assert_released();
    }

    /// Building from an ordered iterator yields a tree that verifies.
    #[tokio::test]
    async fn test_bptree2_map_from_iter_1() {
        let ins: Vec<usize> = (0..(L_CAPACITY << 4)).collect();

        let map = BptreeMap::from_iter(ins.into_iter().map(|v| (v, v)));

        {
            let w = map.write().await;
            assert!(w.verify());
            println!("{:?}", w.tree_density());
        }
        // assert!(w.tree_density() == ((L_CAPACITY << 4), (L_CAPACITY << 4)));
        drop(map);
        assert_released();
    }

    /// Building from a shuffled iterator yields a tree that verifies.
    #[tokio::test]
    async fn test_bptree2_map_from_iter_2() {
        let mut rng = rand::rng();
        let mut ins: Vec<usize> = (0..(L_CAPACITY << 4)).collect();
        ins.shuffle(&mut rng);

        let map = BptreeMap::from_iter(ins.into_iter().map(|v| (v, v)));

        {
            let w = map.write().await;
            assert!(w.verify());
            // w.compact_force();
            assert!(w.verify());
            // assert!(w.tree_density() == ((L_CAPACITY << 4), (L_CAPACITY << 4)));
        }

        drop(map);
        assert_released();
    }

    /// Shared body for the concurrency tests: fills `0..lower`, then
    /// `lower..upper`, then drains everything, checking after each commit
    /// that previously opened readers keep seeing their own snapshot.
    async fn bptree_map_basic_concurrency(lower: usize, upper: usize) {
        // Create a map
        let map = BptreeMap::new();

        // add values
        {
            let mut w = map.write().await;
            w.extend((0..lower).map(|v| (v, v)));
            w.commit();
        }

        // read
        let r = map.read();
        assert_eq!(r.len(), lower);
        for i in 0..lower {
            assert!(r.contains_key(&i), "first reader is missing key {i}");
        }

        // Check a second write doesn't interfere
        {
            let mut w = map.write().await;
            w.extend((lower..upper).map(|v| (v, v)));
            w.commit();
        }

        assert_eq!(r.len(), lower);

        // But a new read can see the second commit
        let r2 = map.read();
        assert_eq!(r2.len(), upper);
        for i in 0..upper {
            assert!(r2.contains_key(&i), "second reader is missing key {i}");
        }

        // Now drain the tree, and the readers should be unaffected.
        {
            let mut w = map.write().await;
            for i in 0..upper {
                assert!(w.remove(&i).is_some(), "writer could not remove key {i}");
            }
            w.commit();
        }

        // All consistent!
        assert_eq!(r.len(), lower);
        assert_eq!(r2.len(), upper);
        for i in 0..upper {
            assert!(
                r2.contains_key(&i),
                "second reader lost key {i} after the drain"
            );
        }

        let r3 = map.read();
        assert!(r3.is_empty());

        drop(r);
        drop(r2);
        drop(r3);

        drop(map);
        assert_released();
    }

    /// Read transactions a, b, c are opened across successive commits and
    /// then dropped out of order (b first); a and c must remain valid.
    #[tokio::test]
    async fn test_bptree2_map_acb_order() {
        // Need to ensure that txns are dropped in order.

        // Add data, enough to cause a split. All data should be *2
        let map = BptreeMap::new();
        // add values
        {
            let mut w = map.write().await;
            w.extend((0..(L_CAPACITY * 2)).map(|v| (v * 2, v * 2)));
            w.commit();
        }
        let ro_txn_a = map.read();

        // New write, add 1 val
        {
            let mut w = map.write().await;
            w.insert(1, 1);
            w.commit();
        }

        let ro_txn_b = map.read();
        // ro_txn_b now owns nodes from a

        // New write, update a value
        {
            let mut w = map.write().await;
            w.insert(1, 10001);
            w.commit();
        }

        let ro_txn_c = map.read();
        // Drop ro_txn_b first, while a and c are still alive.
        assert!(ro_txn_b.verify());
        drop(ro_txn_b);
        // Are both still valid?
        assert!(ro_txn_a.verify());
        assert!(ro_txn_c.verify());
        // Drop remaining
        drop(ro_txn_a);
        drop(ro_txn_c);
        drop(map);
        assert_released();
    }

    /// A reader opened while a write transaction is in flight never sees that
    /// writer's changes, neither before nor after the commit.
    #[tokio::test]
    async fn test_bptree2_map_weird_txn_behaviour() {
        let map: BptreeMap<usize, usize> = BptreeMap::new();

        let mut wr = map.write().await;
        let rd = map.read();

        wr.insert(1, 1);
        assert!(rd.get(&1).is_none());
        wr.commit();
        assert!(rd.get(&1).is_none());
    }

    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn test_bptree2_map_basic_concurrency_small() {
        bptree_map_basic_concurrency(100, 200).await
    }

    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn test_bptree2_map_basic_concurrency_large() {
        bptree_map_basic_concurrency(10_000, 20_000).await
    }

    /// Round-trips a small map through `serde_json` and checks both the JSON
    /// shape and the recovered entries.
    #[cfg(feature = "serde")]
    #[tokio::test]
    async fn test_bptree2_serialize_deserialize() {
        let map: BptreeMap<usize, usize> = vec![(10, 11), (15, 16), (20, 21)].into_iter().collect();

        let value = serde_json::to_value(&map).unwrap();
        assert_eq!(value, serde_json::json!({ "10": 11, "15": 16, "20": 21 }));

        let map: BptreeMap<usize, usize> = serde_json::from_value(value).unwrap();
        let mut vec: Vec<(usize, usize)> = map.read().iter().map(|(k, v)| (*k, *v)).collect();
        vec.sort_unstable();
        assert_eq!(vec, [(10, 11), (15, 16), (20, 21)]);
    }
}