concread/bptree/
asynch.rs1#[cfg(feature = "serde")]
4use serde::{
5 de::{Deserialize, Deserializer},
6 ser::{Serialize, SerializeMap, Serializer},
7};
8
9#[cfg(feature = "serde")]
10use crate::utils::MapCollector;
11
12use crate::internals::lincowcell_async::{LinCowCell, LinCowCellReadTxn, LinCowCellWriteTxn};
13
14include!("impl.rs");
15
16impl<K: Clone + Ord + Debug + Sync + Send + 'static, V: Clone + Sync + Send + 'static>
17 BptreeMap<K, V>
18{
19 pub fn read<'x>(&'x self) -> BptreeMapReadTxn<'x, K, V> {
22 let inner = self.inner.read();
23 BptreeMapReadTxn { inner }
24 }
25
26 pub async fn write<'x>(&'x self) -> BptreeMapWriteTxn<'x, K, V> {
29 let inner = self.inner.write().await;
30 BptreeMapWriteTxn { inner }
31 }
32}
33
34impl<K: Clone + Ord + Debug + Sync + Send + 'static, V: Clone + Sync + Send + 'static>
35 BptreeMapWriteTxn<'_, K, V>
36{
37 pub fn commit(self) {
42 self.inner.commit();
43 }
44}
45
#[cfg(feature = "serde")]
impl<K, V> Serialize for BptreeMapReadTxn<'_, K, V>
where
    K: Serialize + Clone + Ord + Debug + Sync + Send + 'static,
    V: Serialize + Clone + Sync + Send + 'static,
{
    /// Serializes the contents visible to this read transaction as a map.
    ///
    /// The entry count from `self.len()` is passed to the serializer as the
    /// size hint, then every `(key, value)` pair yielded by `self.iter()` is
    /// written as one map entry. The first entry that fails to serialize
    /// aborts the whole operation with that error.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut map_ser = serializer.serialize_map(Some(self.len()))?;
        self.iter()
            .try_for_each(|(k, v)| map_ser.serialize_entry(k, v))?;
        map_ser.end()
    }
}
65
#[cfg(feature = "serde")]
impl<K, V> Serialize for BptreeMap<K, V>
where
    K: Serialize + Clone + Ord + Debug + Sync + Send + 'static,
    V: Serialize + Clone + Sync + Send + 'static,
{
    /// Serializes the map by opening a read transaction and delegating to
    /// that transaction's `Serialize` implementation.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let snapshot = self.read();
        snapshot.serialize(serializer)
    }
}
79
#[cfg(feature = "serde")]
impl<'de, K, V> Deserialize<'de> for BptreeMap<K, V>
where
    K: Deserialize<'de> + Clone + Ord + Debug + Sync + Send + 'static,
    V: Deserialize<'de> + Clone + Sync + Send + 'static,
{
    /// Deserializes a map by handing a `MapCollector` visitor to the
    /// deserializer's map entry point.
    ///
    /// NOTE(review): `MapCollector` is defined in `crate::utils`; presumably
    /// it gathers the incoming entries into the target collection - confirm
    /// there.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        let visitor = MapCollector::new();
        deserializer.deserialize_map(visitor)
    }
}
93
94#[cfg(test)]
95mod tests {
96 use super::BptreeMap;
97 use crate::internals::bptree::node::{assert_released, L_CAPACITY};
98 use rand::seq::SliceRandom;
100
101 #[tokio::test]
102 async fn test_bptree2_map_basic_write() {
103 let bptree: BptreeMap<usize, usize> = BptreeMap::new();
104 {
105 let mut bpwrite = bptree.write().await;
106 bpwrite.insert(0, 0);
108 bpwrite.insert(1, 1);
109 assert!(bpwrite.get(&0) == Some(&0));
110 assert!(bpwrite.get(&1) == Some(&1));
111 bpwrite.insert(2, 2);
112 bpwrite.commit();
113 }
115 {
116 let mut bpwrite = bptree.write().await;
118 bpwrite.clear();
119 }
122 {
123 let bpwrite = bptree.write().await;
124 assert!(bpwrite.get(&0) == Some(&0));
125 assert!(bpwrite.get(&1) == Some(&1));
126 }
128 std::mem::drop(bptree);
129 assert_released();
130 }
131
132 #[tokio::test]
133 async fn test_bptree2_map_cursed_get_mut() {
134 let bptree: BptreeMap<usize, usize> = BptreeMap::new();
135 {
136 let mut w = bptree.write().await;
137 w.insert(0, 0);
138 w.commit();
139 }
140 let r1 = bptree.read();
141 {
142 let mut w = bptree.write().await;
143 let cursed_zone = w.get_mut(&0).unwrap();
144 *cursed_zone = 1;
145 w.commit();
150 }
151 let r2 = bptree.read();
152 assert!(r1.get(&0) == Some(&0));
153 assert!(r2.get(&0) == Some(&1));
154
155 std::mem::drop(r1);
163 std::mem::drop(r2);
164 std::mem::drop(bptree);
165 assert_released();
166 }
167
168 #[tokio::test]
169 async fn test_bptree2_map_from_iter_1() {
170 let ins: Vec<usize> = (0..(L_CAPACITY << 4)).collect();
171
172 let map = BptreeMap::from_iter(ins.into_iter().map(|v| (v, v)));
173
174 {
175 let w = map.write().await;
176 assert!(w.verify());
177 println!("{:?}", w.tree_density());
178 }
179 std::mem::drop(map);
181 assert_released();
182 }
183
184 #[tokio::test]
185 async fn test_bptree2_map_from_iter_2() {
186 let mut rng = rand::rng();
187 let mut ins: Vec<usize> = (0..(L_CAPACITY << 4)).collect();
188 ins.shuffle(&mut rng);
189
190 let map = BptreeMap::from_iter(ins.into_iter().map(|v| (v, v)));
191
192 {
193 let w = map.write().await;
194 assert!(w.verify());
195 assert!(w.verify());
197 }
199
200 std::mem::drop(map);
201 assert_released();
202 }
203
204 async fn bptree_map_basic_concurrency(lower: usize, upper: usize) {
205 let map = BptreeMap::new();
207
208 {
210 let mut w = map.write().await;
211 w.extend((0..lower).map(|v| (v, v)));
212 w.commit();
213 }
214
215 let r = map.read();
217 assert!(r.len() == lower);
218 for i in 0..lower {
219 assert!(r.contains_key(&i))
220 }
221
222 {
224 let mut w = map.write().await;
225 w.extend((lower..upper).map(|v| (v, v)));
226 w.commit();
227 }
228
229 assert!(r.len() == lower);
230
231 let r2 = map.read();
233 assert!(r2.len() == upper);
234 for i in 0..upper {
235 assert!(r2.contains_key(&i))
236 }
237
238 {
240 let mut w = map.write().await;
241 for i in 0..upper {
242 assert!(w.remove(&i).is_some())
243 }
244 w.commit();
245 }
246
247 assert!(r.len() == lower);
249 assert!(r2.len() == upper);
250 for i in 0..upper {
251 assert!(r2.contains_key(&i))
252 }
253
254 let r3 = map.read();
255 assert!(r3.is_empty());
257
258 std::mem::drop(r);
259 std::mem::drop(r2);
260 std::mem::drop(r3);
261
262 std::mem::drop(map);
263 assert_released();
264 }
265
266 #[tokio::test]
267 async fn test_bptree2_map_acb_order() {
268 let map = BptreeMap::new();
272 {
274 let mut w = map.write().await;
275 w.extend((0..(L_CAPACITY * 2)).map(|v| (v * 2, v * 2)));
276 w.commit();
277 }
278 let ro_txn_a = map.read();
279
280 {
282 let mut w = map.write().await;
283 w.insert(1, 1);
284 w.commit();
285 }
286
287 let ro_txn_b = map.read();
288 {
292 let mut w = map.write().await;
293 w.insert(1, 10001);
294 w.commit();
295 }
296
297 let ro_txn_c = map.read();
298 assert!(ro_txn_b.verify());
301 std::mem::drop(ro_txn_b);
302 assert!(ro_txn_a.verify());
304 assert!(ro_txn_c.verify());
305 std::mem::drop(ro_txn_a);
307 std::mem::drop(ro_txn_c);
308 std::mem::drop(map);
309 assert_released();
310 }
311
312 #[tokio::test]
313 async fn test_bptree2_map_weird_txn_behaviour() {
314 let map: BptreeMap<usize, usize> = BptreeMap::new();
315
316 let mut wr = map.write().await;
317 let rd = map.read();
318
319 wr.insert(1, 1);
320 assert!(rd.get(&1).is_none());
321 wr.commit();
322 assert!(rd.get(&1).is_none());
323 }
324
325 #[tokio::test]
326 #[cfg_attr(miri, ignore)]
327 async fn test_bptree2_map_basic_concurrency_small() {
328 bptree_map_basic_concurrency(100, 200).await
329 }
330
331 #[tokio::test]
332 #[cfg_attr(miri, ignore)]
333 async fn test_bptree2_map_basic_concurrency_large() {
334 bptree_map_basic_concurrency(10_000, 20_000).await
335 }
336
337 #[cfg(feature = "serde")]
338 #[tokio::test]
339 async fn test_bptree2_serialize_deserialize() {
340 let map: BptreeMap<usize, usize> = vec![(10, 11), (15, 16), (20, 21)].into_iter().collect();
341
342 let value = serde_json::to_value(&map).unwrap();
343 assert_eq!(value, serde_json::json!({ "10": 11, "15": 16, "20": 21 }));
344
345 let map: BptreeMap<usize, usize> = serde_json::from_value(value).unwrap();
346 let mut vec: Vec<(usize, usize)> = map.read().iter().map(|(k, v)| (*k, *v)).collect();
347 vec.sort_unstable();
348 assert_eq!(vec, [(10, 11), (15, 16), (20, 21)]);
349 }
350}