// worktable/in_memory/empty_link_registry.rs

1use std::sync::atomic::{AtomicU32, Ordering};
2
3use data_bucket::Link;
4use data_bucket::page::PageId;
5use derive_more::Into;
6use indexset::concurrent::multimap::BTreeMultiMap;
7use indexset::concurrent::set::BTreeSet;
8use parking_lot::FairMutex;
9
10use crate::in_memory::DATA_INNER_LENGTH;
11
12/// A link wrapper that implements `Ord` based on absolute index calculation.
13#[derive(Copy, Clone, Debug, Eq, PartialEq, Into)]
14pub struct IndexOrdLink<const DATA_LENGTH: usize = DATA_INNER_LENGTH>(pub Link);
15
16impl<const DATA_LENGTH: usize> IndexOrdLink<DATA_LENGTH> {
17    /// Calculates the absolute index of the link.
18    fn absolute_index(&self) -> u64 {
19        let page_id: u32 = self.0.page_id.into();
20        (page_id as u64 * DATA_LENGTH as u64) + self.0.offset as u64
21    }
22
23    fn unite_with_right_neighbor(&self, other: &Self) -> Option<Self> {
24        let self_end = self.absolute_index() + self.0.length as u64;
25        let other_start = other.absolute_index();
26
27        if self.0.page_id != other.0.page_id {
28            return None;
29        }
30
31        if self_end == other_start {
32            let new_length = self.0.length + other.0.length;
33            Some(IndexOrdLink(Link {
34                page_id: self.0.page_id,
35                offset: self.0.offset,
36                length: new_length,
37            }))
38        } else {
39            None
40        }
41    }
42
43    fn unite_with_left_neighbor(&self, other: &Self) -> Option<Self> {
44        let other_end = other.absolute_index() + other.0.length as u64;
45        let self_start = self.absolute_index();
46
47        if self.0.page_id != other.0.page_id {
48            return None;
49        }
50
51        if other_end == self_start {
52            let new_offset = other.0.offset;
53            let new_length = self.0.length + other.0.length;
54            Some(IndexOrdLink(Link {
55                page_id: other.0.page_id,
56                offset: new_offset,
57                length: new_length,
58            }))
59        } else {
60            None
61        }
62    }
63}
64
65impl<const DATA_LENGTH: usize> PartialOrd for IndexOrdLink<DATA_LENGTH> {
66    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
67        Some(self.cmp(other))
68    }
69}
70
71impl<const DATA_LENGTH: usize> Ord for IndexOrdLink<DATA_LENGTH> {
72    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
73        self.absolute_index().cmp(&other.absolute_index())
74    }
75}
76
77#[derive(Debug)]
78pub struct EmptyLinkRegistry<const DATA_LENGTH: usize = DATA_INNER_LENGTH> {
79    index_ord_links: BTreeSet<IndexOrdLink<DATA_LENGTH>>,
80    length_ord_links: BTreeMultiMap<u32, Link>,
81
82    pub(crate) page_links_map: BTreeMultiMap<PageId, Link>,
83
84    sum_links_len: AtomicU32,
85
86    pub(crate) op_lock: FairMutex<()>,
87    vacuum_lock: tokio::sync::Mutex<()>,
88}
89
90impl<const DATA_LENGTH: usize> Default for EmptyLinkRegistry<DATA_LENGTH> {
91    fn default() -> Self {
92        Self {
93            index_ord_links: BTreeSet::new(),
94            length_ord_links: BTreeMultiMap::new(),
95            page_links_map: BTreeMultiMap::new(),
96            sum_links_len: Default::default(),
97            op_lock: Default::default(),
98            vacuum_lock: Default::default(),
99        }
100    }
101}
102
103impl<const DATA_LENGTH: usize> EmptyLinkRegistry<DATA_LENGTH> {
104    pub fn remove_link<L: Into<Link>>(&self, link: L) {
105        let link = link.into();
106        self.index_ord_links.remove(&IndexOrdLink(link));
107        self.length_ord_links.remove(&link.length, &link);
108        self.page_links_map.remove(&link.page_id, &link);
109
110        self.sum_links_len.fetch_sub(link.length, Ordering::AcqRel);
111    }
112
113    fn insert_link<L: Into<Link>>(&self, link: L) {
114        let link = link.into();
115        self.index_ord_links.insert(IndexOrdLink(link));
116        self.length_ord_links.insert(link.length, link);
117        self.page_links_map.insert(link.page_id, link);
118
119        self.sum_links_len.fetch_add(link.length, Ordering::AcqRel);
120    }
121
122    pub fn remove_link_for_page(&self, page_id: PageId) {
123        let _g = self.op_lock.lock();
124        let links = self
125            .page_links_map
126            .get(&page_id)
127            .map(|(_, l)| *l)
128            .collect::<Vec<_>>();
129        for l in links {
130            self.remove_link(l);
131        }
132    }
133
134    pub fn push(&self, link: Link) {
135        let mut index_ord_link = IndexOrdLink(link);
136        let _g = self.op_lock.lock();
137
138        {
139            let mut iter = self.index_ord_links.range(..index_ord_link).rev();
140            if let Some(possible_left_neighbor) = iter.next() {
141                let possible_left_neighbor = *possible_left_neighbor;
142                if let Some(united_link) =
143                    index_ord_link.unite_with_left_neighbor(&possible_left_neighbor)
144                {
145                    drop(iter);
146
147                    // Remove left neighbor
148                    self.remove_link(possible_left_neighbor);
149
150                    index_ord_link = united_link;
151                }
152            }
153        }
154
155        {
156            let mut iter = self.index_ord_links.range(index_ord_link..);
157            if let Some(possible_right_neighbor) = iter.next() {
158                let possible_right_neighbor = *possible_right_neighbor;
159                if let Some(united_link) =
160                    index_ord_link.unite_with_right_neighbor(&possible_right_neighbor)
161                {
162                    drop(iter);
163
164                    // Remove right neighbor
165                    self.remove_link(possible_right_neighbor);
166
167                    index_ord_link = united_link;
168                }
169            }
170        }
171
172        self.insert_link(index_ord_link);
173    }
174
175    pub fn pop_max(&self) -> Option<Link> {
176        if self.vacuum_lock.try_lock().is_err() {
177            return None;
178        }
179
180        let _g = self.op_lock.lock();
181
182        let mut iter = self.length_ord_links.iter().rev();
183        let (_, max_length_link) = iter.next()?;
184        drop(iter);
185
186        self.remove_link(*max_length_link);
187
188        Some(*max_length_link)
189    }
190
191    pub fn iter(&self) -> impl Iterator<Item = Link> + '_ {
192        self.index_ord_links.iter().map(|l| l.0)
193    }
194
195    pub fn get_empty_links_size_bytes(&self) -> u32 {
196        self.sum_links_len.load(Ordering::Acquire)
197    }
198
199    pub async fn lock_vacuum(&self) -> tokio::sync::MutexGuard<'_, ()> {
200        self.vacuum_lock.lock().await
201    }
202}
203
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a plain [`Link`] from its three components.
    fn link(page_id: PageId, offset: u32, length: u32) -> Link {
        Link {
            page_id,
            offset,
            length,
        }
    }

    /// Builds an [`IndexOrdLink`] with the default inner data length.
    fn ord_link(page_id: PageId, offset: u32, length: u32) -> IndexOrdLink<DATA_INNER_LENGTH> {
        IndexOrdLink(link(page_id, offset, length))
    }

    #[test]
    fn test_unite_with_right_neighbor() {
        let left = ord_link(1.into(), 0, 100);
        let right = ord_link(1.into(), 100, 50);

        let united = left
            .unite_with_right_neighbor(&right)
            .expect("contiguous links on one page must unite");
        assert_eq!(united.0, link(1.into(), 0, 150));
    }

    #[test]
    fn test_unite_with_left_neighbor() {
        let left = ord_link(1.into(), 0, 100);
        let right = ord_link(1.into(), 100, 50);

        let united = right
            .unite_with_left_neighbor(&left)
            .expect("contiguous links on one page must unite");
        assert_eq!(united.0, link(1.into(), 0, 150));
    }

    #[test]
    fn test_unite_fails_on_gap() {
        // [0, 100) and [200, 250) leave a hole between them.
        let first = ord_link(1.into(), 0, 100);
        let second = ord_link(1.into(), 200, 50);

        assert_eq!(first.unite_with_right_neighbor(&second), None);
        assert_eq!(second.unite_with_left_neighbor(&first), None);
    }

    #[test]
    fn test_unite_fails_on_different_pages() {
        // The offsets would touch if both links lived on the same page.
        let first = ord_link(1.into(), 0, 100);
        let second = ord_link(2.into(), 100, 50);

        assert_eq!(first.unite_with_right_neighbor(&second), None);
        assert_eq!(second.unite_with_left_neighbor(&first), None);
    }

    #[test]
    fn test_index_ord_link_ordering() {
        const TEST_DATA_LENGTH: usize = 1000;

        let a = IndexOrdLink::<TEST_DATA_LENGTH>(link(1.into(), 0, 100));
        let b = IndexOrdLink::<TEST_DATA_LENGTH>(link(1.into(), 100, 50));
        let c = IndexOrdLink::<TEST_DATA_LENGTH>(link(2.into(), 0, 200));

        assert!(a < b);
        assert!(b < c);
        assert!(a < c);
    }

    #[test]
    fn test_push_merges_both_sides() {
        let registry = EmptyLinkRegistry::<DATA_INNER_LENGTH>::default();

        // Outer pieces go in first so the middle piece has to merge in both directions.
        registry.push(link(1.into(), 0, 100));
        registry.push(link(1.into(), 150, 75));
        registry.push(link(1.into(), 100, 50));

        assert_eq!(registry.pop_max(), Some(link(1.into(), 0, 225)));
    }

    #[test]
    fn test_push_non_adjacent_no_merge() {
        let registry = EmptyLinkRegistry::<DATA_INNER_LENGTH>::default();

        registry.push(link(1.into(), 0, 100));
        registry.push(link(1.into(), 200, 50));

        let first = registry.pop_max().unwrap();
        let second = registry.pop_max().unwrap();
        assert_eq!((first.length, second.length), (100, 50));
    }

    #[test]
    fn test_pop_max_returns_largest() {
        let registry = EmptyLinkRegistry::<DATA_INNER_LENGTH>::default();

        for l in [
            link(1.into(), 0, 50),
            link(1.into(), 100, 200),
            link(1.into(), 300, 100),
        ] {
            registry.push(l);
        }

        // [100, 300) and [300, 400) touch, so they are united into a single 300-long link.
        assert_eq!(registry.pop_max().map(|l| l.length), Some(300));
        assert_eq!(registry.pop_max().map(|l| l.length), Some(50));
    }

    #[test]
    fn test_iter_returns_all_links() {
        let registry = EmptyLinkRegistry::<DATA_INNER_LENGTH>::default();

        registry.push(link(1.into(), 0, 100));
        registry.push(link(2.into(), 0, 150));
        registry.push(link(3.into(), 0, 200));

        assert_eq!(registry.iter().count(), 3);
    }

    #[test]
    fn test_empty_registry() {
        let registry = EmptyLinkRegistry::<DATA_INNER_LENGTH>::default();

        assert!(registry.pop_max().is_none());
        assert!(registry.iter().next().is_none());
    }

    #[test]
    fn test_sum_links_counter() {
        let registry = EmptyLinkRegistry::<DATA_INNER_LENGTH>::default();

        registry.push(link(1.into(), 0, 100));
        assert_eq!(registry.get_empty_links_size_bytes(), 100);

        registry.push(link(1.into(), 100, 150));
        assert_eq!(registry.get_empty_links_size_bytes(), 250);

        registry.pop_max();
        assert_eq!(registry.get_empty_links_size_bytes(), 0);
    }

    #[tokio::test]
    async fn test_lock_vacuum_prevents_pop() {
        let registry = EmptyLinkRegistry::<DATA_INNER_LENGTH>::default();

        registry.push(link(1.into(), 0, 100));
        assert_eq!(registry.pop_max().map(|l| l.length), Some(100));

        registry.push(link(1.into(), 0, 100));

        let guard = registry.lock_vacuum().await;
        assert!(
            registry.pop_max().is_none(),
            "pop_max should return None when vacuum lock is held"
        );

        drop(guard);
        let popped_after_unlock = registry.pop_max();
        assert!(
            popped_after_unlock.is_some(),
            "pop_max should return link after vacuum lock is released"
        );
        assert_eq!(popped_after_unlock.unwrap().length, 100);
    }
}