1use std::sync::atomic::{AtomicU32, Ordering};
2
3use data_bucket::Link;
4use data_bucket::page::PageId;
5use derive_more::Into;
6use indexset::concurrent::multimap::BTreeMultiMap;
7use indexset::concurrent::set::BTreeSet;
8use parking_lot::FairMutex;
9
10use crate::in_memory::DATA_INNER_LENGTH;
11
12#[derive(Copy, Clone, Debug, Eq, PartialEq, Into)]
14pub struct IndexOrdLink<const DATA_LENGTH: usize = DATA_INNER_LENGTH>(pub Link);
15
16impl<const DATA_LENGTH: usize> IndexOrdLink<DATA_LENGTH> {
17 fn absolute_index(&self) -> u64 {
19 let page_id: u32 = self.0.page_id.into();
20 (page_id as u64 * DATA_LENGTH as u64) + self.0.offset as u64
21 }
22
23 fn unite_with_right_neighbor(&self, other: &Self) -> Option<Self> {
24 let self_end = self.absolute_index() + self.0.length as u64;
25 let other_start = other.absolute_index();
26
27 if self.0.page_id != other.0.page_id {
28 return None;
29 }
30
31 if self_end == other_start {
32 let new_length = self.0.length + other.0.length;
33 Some(IndexOrdLink(Link {
34 page_id: self.0.page_id,
35 offset: self.0.offset,
36 length: new_length,
37 }))
38 } else {
39 None
40 }
41 }
42
43 fn unite_with_left_neighbor(&self, other: &Self) -> Option<Self> {
44 let other_end = other.absolute_index() + other.0.length as u64;
45 let self_start = self.absolute_index();
46
47 if self.0.page_id != other.0.page_id {
48 return None;
49 }
50
51 if other_end == self_start {
52 let new_offset = other.0.offset;
53 let new_length = self.0.length + other.0.length;
54 Some(IndexOrdLink(Link {
55 page_id: other.0.page_id,
56 offset: new_offset,
57 length: new_length,
58 }))
59 } else {
60 None
61 }
62 }
63}
64
65impl<const DATA_LENGTH: usize> PartialOrd for IndexOrdLink<DATA_LENGTH> {
66 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
67 Some(self.cmp(other))
68 }
69}
70
71impl<const DATA_LENGTH: usize> Ord for IndexOrdLink<DATA_LENGTH> {
72 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
73 self.absolute_index().cmp(&other.absolute_index())
74 }
75}
76
/// Registry of "empty" links — presumably reusable regions inside data pages
/// (NOTE(review): confirm against the callers).
///
/// The same links are indexed three ways so each operation can find what it needs:
/// - by position, used by `push` to merge adjacent links;
/// - by length, used by `pop_max`;
/// - by page, used by `remove_link_for_page`.
#[derive(Debug)]
pub struct EmptyLinkRegistry<const DATA_LENGTH: usize = DATA_INNER_LENGTH> {
    /// Links ordered by absolute position; `push` searches it for left/right neighbors.
    index_ord_links: BTreeSet<IndexOrdLink<DATA_LENGTH>>,
    /// Links keyed by their length; `pop_max` takes the entry with the greatest key.
    length_ord_links: BTreeMultiMap<u32, Link>,

    /// Links keyed by the page they belong to.
    pub(crate) page_links_map: BTreeMultiMap<PageId, Link>,

    /// Running total of `length` over registered links, updated by
    /// `insert_link` / `remove_link`.
    sum_links_len: AtomicU32,

    /// Taken by `push`, `pop_max` and `remove_link_for_page`. It is `pub(crate)`, so
    /// other crate code may take it too.
    pub(crate) op_lock: FairMutex<()>,
    /// Acquired via `lock_vacuum`; while it is held, `pop_max` returns `None`.
    vacuum_lock: tokio::sync::Mutex<()>,
}
89
90impl<const DATA_LENGTH: usize> Default for EmptyLinkRegistry<DATA_LENGTH> {
91 fn default() -> Self {
92 Self {
93 index_ord_links: BTreeSet::new(),
94 length_ord_links: BTreeMultiMap::new(),
95 page_links_map: BTreeMultiMap::new(),
96 sum_links_len: Default::default(),
97 op_lock: Default::default(),
98 vacuum_lock: Default::default(),
99 }
100 }
101}
102
103impl<const DATA_LENGTH: usize> EmptyLinkRegistry<DATA_LENGTH> {
104 pub fn remove_link<L: Into<Link>>(&self, link: L) {
105 let link = link.into();
106 self.index_ord_links.remove(&IndexOrdLink(link));
107 self.length_ord_links.remove(&link.length, &link);
108 self.page_links_map.remove(&link.page_id, &link);
109
110 self.sum_links_len.fetch_sub(link.length, Ordering::AcqRel);
111 }
112
113 fn insert_link<L: Into<Link>>(&self, link: L) {
114 let link = link.into();
115 self.index_ord_links.insert(IndexOrdLink(link));
116 self.length_ord_links.insert(link.length, link);
117 self.page_links_map.insert(link.page_id, link);
118
119 self.sum_links_len.fetch_add(link.length, Ordering::AcqRel);
120 }
121
122 pub fn remove_link_for_page(&self, page_id: PageId) {
123 let _g = self.op_lock.lock();
124 let links = self
125 .page_links_map
126 .get(&page_id)
127 .map(|(_, l)| *l)
128 .collect::<Vec<_>>();
129 for l in links {
130 self.remove_link(l);
131 }
132 }
133
134 pub fn push(&self, link: Link) {
135 let mut index_ord_link = IndexOrdLink(link);
136 let _g = self.op_lock.lock();
137
138 {
139 let mut iter = self.index_ord_links.range(..index_ord_link).rev();
140 if let Some(possible_left_neighbor) = iter.next() {
141 let possible_left_neighbor = *possible_left_neighbor;
142 if let Some(united_link) =
143 index_ord_link.unite_with_left_neighbor(&possible_left_neighbor)
144 {
145 drop(iter);
146
147 self.remove_link(possible_left_neighbor);
149
150 index_ord_link = united_link;
151 }
152 }
153 }
154
155 {
156 let mut iter = self.index_ord_links.range(index_ord_link..);
157 if let Some(possible_right_neighbor) = iter.next() {
158 let possible_right_neighbor = *possible_right_neighbor;
159 if let Some(united_link) =
160 index_ord_link.unite_with_right_neighbor(&possible_right_neighbor)
161 {
162 drop(iter);
163
164 self.remove_link(possible_right_neighbor);
166
167 index_ord_link = united_link;
168 }
169 }
170 }
171
172 self.insert_link(index_ord_link);
173 }
174
175 pub fn pop_max(&self) -> Option<Link> {
176 if self.vacuum_lock.try_lock().is_err() {
177 return None;
178 }
179
180 let _g = self.op_lock.lock();
181
182 let mut iter = self.length_ord_links.iter().rev();
183 let (_, max_length_link) = iter.next()?;
184 drop(iter);
185
186 self.remove_link(*max_length_link);
187
188 Some(*max_length_link)
189 }
190
191 pub fn iter(&self) -> impl Iterator<Item = Link> + '_ {
192 self.index_ord_links.iter().map(|l| l.0)
193 }
194
195 pub fn get_empty_links_size_bytes(&self) -> u32 {
196 self.sum_links_len.load(Ordering::Acquire)
197 }
198
199 pub async fn lock_vacuum(&self) -> tokio::sync::MutexGuard<'_, ()> {
200 self.vacuum_lock.lock().await
201 }
202}
203
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_unite_with_right_neighbor() {
        let left = IndexOrdLink::<DATA_INNER_LENGTH>(Link {
            page_id: 1.into(),
            offset: 0,
            length: 100,
        });

        let right = IndexOrdLink::<DATA_INNER_LENGTH>(Link {
            page_id: 1.into(),
            offset: 100,
            length: 50,
        });

        let united = left.unite_with_right_neighbor(&right).unwrap();
        assert_eq!(united.0.page_id, 1.into());
        assert_eq!(united.0.offset, 0);
        assert_eq!(united.0.length, 150);
    }

    #[test]
    fn test_unite_with_left_neighbor() {
        let left = IndexOrdLink::<DATA_INNER_LENGTH>(Link {
            page_id: 1.into(),
            offset: 0,
            length: 100,
        });

        let right = IndexOrdLink::<DATA_INNER_LENGTH>(Link {
            page_id: 1.into(),
            offset: 100,
            length: 50,
        });

        let united = right.unite_with_left_neighbor(&left).unwrap();
        assert_eq!(united.0.page_id, 1.into());
        assert_eq!(united.0.offset, 0);
        assert_eq!(united.0.length, 150);
    }

    #[test]
    fn test_unite_fails_on_gap() {
        let link1 = IndexOrdLink::<DATA_INNER_LENGTH>(Link {
            page_id: 1.into(),
            offset: 0,
            length: 100,
        });

        let link2 = IndexOrdLink::<DATA_INNER_LENGTH>(Link {
            page_id: 1.into(),
            offset: 200,
            length: 50,
        });

        assert!(link1.unite_with_right_neighbor(&link2).is_none());
        assert!(link2.unite_with_left_neighbor(&link1).is_none());
    }

    #[test]
    fn test_unite_fails_on_different_pages() {
        let link1 = IndexOrdLink::<DATA_INNER_LENGTH>(Link {
            page_id: 1.into(),
            offset: 0,
            length: 100,
        });

        let link2 = IndexOrdLink::<DATA_INNER_LENGTH>(Link {
            page_id: 2.into(),
            offset: 100,
            length: 50,
        });

        assert!(link1.unite_with_right_neighbor(&link2).is_none());
        assert!(link2.unite_with_left_neighbor(&link1).is_none());
    }

    #[test]
    fn test_index_ord_link_ordering() {
        const TEST_DATA_LENGTH: usize = 1000;

        let link1 = IndexOrdLink::<TEST_DATA_LENGTH>(Link {
            page_id: 1.into(),
            offset: 0,
            length: 100,
        });

        let link2 = IndexOrdLink::<TEST_DATA_LENGTH>(Link {
            page_id: 1.into(),
            offset: 100,
            length: 50,
        });

        let link3 = IndexOrdLink::<TEST_DATA_LENGTH>(Link {
            page_id: 2.into(),
            offset: 0,
            length: 200,
        });

        assert!(link1 < link2);
        assert!(link2 < link3);
        assert!(link1 < link3);
    }

    #[test]
    fn test_push_merges_both_sides() {
        let registry = EmptyLinkRegistry::<DATA_INNER_LENGTH>::default();

        let left = Link {
            page_id: 1.into(),
            offset: 0,
            length: 100,
        };

        let middle = Link {
            page_id: 1.into(),
            offset: 100,
            length: 50,
        };

        let right = Link {
            page_id: 1.into(),
            offset: 150,
            length: 75,
        };

        registry.push(left);
        registry.push(right);
        // `middle` bridges the gap, so all three collapse into one link.
        registry.push(middle);

        let result = registry.pop_max().unwrap();
        assert_eq!(result.page_id, 1.into());
        assert_eq!(result.offset, 0);
        assert_eq!(result.length, 225);
        assert_eq!(registry.pop_max(), None);
    }

    #[test]
    fn test_push_non_adjacent_no_merge() {
        let registry = EmptyLinkRegistry::<DATA_INNER_LENGTH>::default();

        let link1 = Link {
            page_id: 1.into(),
            offset: 0,
            length: 100,
        };

        let link2 = Link {
            page_id: 1.into(),
            offset: 200,
            length: 50,
        };

        registry.push(link1);
        registry.push(link2);

        let pop1 = registry.pop_max().unwrap();
        let pop2 = registry.pop_max().unwrap();

        assert_eq!(pop1.length, 100);
        assert_eq!(pop2.length, 50);
    }

    #[test]
    fn test_pop_max_returns_largest() {
        let registry = EmptyLinkRegistry::<DATA_INNER_LENGTH>::default();

        let small = Link {
            page_id: 1.into(),
            offset: 0,
            length: 50,
        };

        let large = Link {
            page_id: 1.into(),
            offset: 100,
            length: 200,
        };

        let medium = Link {
            page_id: 1.into(),
            offset: 300,
            length: 100,
        };

        registry.push(small);
        registry.push(large);
        registry.push(medium);

        // `medium` starts where `large` ends, so they merge into a single 300-long link.
        assert_eq!(registry.pop_max().unwrap().length, 300);
        assert_eq!(registry.pop_max().unwrap().length, 50);
        assert_eq!(registry.pop_max(), None);
    }

    #[test]
    fn test_iter_returns_all_links() {
        let registry = EmptyLinkRegistry::<DATA_INNER_LENGTH>::default();

        let link1 = Link {
            page_id: 1.into(),
            offset: 0,
            length: 100,
        };

        let link2 = Link {
            page_id: 2.into(),
            offset: 0,
            length: 150,
        };

        let link3 = Link {
            page_id: 3.into(),
            offset: 0,
            length: 200,
        };

        registry.push(link1);
        registry.push(link2);
        registry.push(link3);

        let links: Vec<Link> = registry.iter().collect();
        assert_eq!(links.len(), 3);
        // Iteration follows position order: page 1, then 2, then 3.
        assert_eq!(links, vec![link1, link2, link3]);
    }

    #[test]
    fn test_remove_link_for_page_only_touches_that_page() {
        let registry = EmptyLinkRegistry::<DATA_INNER_LENGTH>::default();

        let on_page_1 = Link {
            page_id: 1.into(),
            offset: 0,
            length: 100,
        };

        let on_page_2 = Link {
            page_id: 2.into(),
            offset: 0,
            length: 150,
        };

        registry.push(on_page_1);
        registry.push(on_page_2);

        registry.remove_link_for_page(1.into());

        assert_eq!(registry.iter().collect::<Vec<_>>(), vec![on_page_2]);
        assert_eq!(registry.get_empty_links_size_bytes(), 150);
        assert_eq!(registry.pop_max(), Some(on_page_2));
        assert_eq!(registry.pop_max(), None);
    }

    #[test]
    fn test_empty_registry() {
        let registry = EmptyLinkRegistry::<DATA_INNER_LENGTH>::default();

        assert_eq!(registry.pop_max(), None);
        assert_eq!(registry.iter().count(), 0);
        assert_eq!(registry.get_empty_links_size_bytes(), 0);
    }

    #[test]
    fn test_sum_links_counter() {
        let registry = EmptyLinkRegistry::<DATA_INNER_LENGTH>::default();

        let link1 = Link {
            page_id: 1.into(),
            offset: 0,
            length: 100,
        };

        let link2 = Link {
            page_id: 1.into(),
            offset: 100,
            length: 150,
        };

        registry.push(link1);
        assert_eq!(registry.sum_links_len.load(Ordering::Acquire), 100);

        registry.push(link2);
        assert_eq!(registry.sum_links_len.load(Ordering::Acquire), 250);

        registry.pop_max();
        assert_eq!(registry.sum_links_len.load(Ordering::Acquire), 0);
    }

    #[tokio::test]
    async fn test_lock_vacuum_prevents_pop() {
        let registry = EmptyLinkRegistry::<DATA_INNER_LENGTH>::default();

        let link = Link {
            page_id: 1.into(),
            offset: 0,
            length: 100,
        };

        registry.push(link);

        let popped = registry.pop_max();
        assert!(popped.is_some());
        assert_eq!(popped.unwrap().length, 100);

        registry.push(Link {
            page_id: 1.into(),
            offset: 0,
            length: 100,
        });

        let vacuum_guard = registry.lock_vacuum().await;
        let popped_locked = registry.pop_max();
        assert!(
            popped_locked.is_none(),
            "pop_max should return None when vacuum lock is held"
        );

        drop(vacuum_guard);
        let popped_after_unlock = registry.pop_max();
        assert!(
            popped_after_unlock.is_some(),
            "pop_max should return link after vacuum lock is released"
        );
        assert_eq!(popped_after_unlock.unwrap().length, 100);
    }
}