async_priority_lock/queue/
arena.rs1use super::*;
6use crate::priority::Priority;
7#[cfg(not(feature = "std"))]
8use alloc::alloc::{alloc, dealloc, realloc};
9use core::{alloc::Layout, cmp::Ordering, marker::PhantomData, mem::MaybeUninit};
10#[cfg(feature = "std")]
11use std::alloc::{alloc, dealloc, realloc};
12
13#[cfg(not(feature = "std"))]
14extern crate alloc;
15
16#[doc(hidden)]
21pub trait ArenaQueueNode: Sized {
22 type Data: Priority;
23 fn next(&self) -> usize;
24 fn set_next(&mut self, val: usize);
25 fn prev(&self) -> usize;
26 fn set_prev(&mut self, val: usize);
27 fn data(&self) -> &Self::Data;
28 fn data_mut(&mut self) -> &mut MaybeUninit<Self::Data>;
29
30 const HAS_PREV: bool;
31}
32
33pub type SingleLinkArenaQueue<P> = ArenaQueue<SingleLinkArenaQueueNode<P>>;
37pub type DualLinkArenaQueue<P> = ArenaQueue<DualLinkArenaQueueNode<P>>;
41
42pub struct ArenaQueue<N: ArenaQueueNode> {
55 data: *mut N,
57 size: usize,
59 used: usize,
61 good: usize,
64 head: usize,
66}
67
#[cfg(feature = "const-default")]
impl<N: ArenaQueueNode> const_default::ConstDefault for ArenaQueue<N> {
    /// Empty, unallocated queue usable in `const` contexts.
    ///
    /// `head` must be `SENTINEL`, exactly as in the `Default` impl: offset `0` is the byte
    /// offset of the first slot, so a zero head would make an empty queue look like it has
    /// a head node. `core::ptr` is used so this also builds without the `std` feature.
    const DEFAULT: Self = Self {
        data: core::ptr::null_mut(),
        size: 0,
        used: 0,
        good: 0,
        head: SENTINEL,
    };
}
78
79impl<D: ArenaQueueNode> core::fmt::Debug for ArenaQueue<D> {
80 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
81 f.debug_struct("SeqQueue")
82 .field("data", &self.data)
83 .field("size", &self.size)
84 .field("used", &self.used)
85 .field("good", &self.good)
86 .field("head", &self.head)
87 .finish()
88 }
89}
90
91unsafe impl<D: Send + ArenaQueueNode> Send for ArenaQueue<D> {}
92unsafe impl<D: Sync + ArenaQueueNode> Sync for ArenaQueue<D> {}
93
94impl<N: ArenaQueueNode> Default for ArenaQueue<N> {
95 fn default() -> Self {
96 Self {
97 data: Default::default(),
98 size: Default::default(),
99 used: Default::default(),
100 good: Default::default(),
101 head: 1,
102 }
103 }
104}
105
/// Link value meaning "no node": terminates the list and marks an empty queue in `head`.
// NOTE(review): presumably chosen odd so it can never equal a node's byte offset — confirm.
const SENTINEL: usize = 1;

/// `next` value stamped on slots that currently hold no entry.
const UNALLOCATED: usize = usize::MAX;
110
111#[repr(align(2))]
117
118pub struct SingleLinkArenaQueueNode<D: Priority> {
123 next: usize,
124 data: MaybeUninit<D>,
125}
126
127impl<D: Priority> ArenaQueueNode for SingleLinkArenaQueueNode<D> {
128 type Data = D;
129
130 #[inline(always)]
131 fn next(&self) -> usize {
132 self.next
133 }
134
135 #[inline(always)]
136 fn set_next(&mut self, val: usize) {
137 self.next = val
138 }
139
140 #[inline(always)]
141 fn prev(&self) -> usize {
142 unreachable!("this fn should never be compiled")
143 }
144
145 #[inline(always)]
146 fn set_prev(&mut self, _: usize) {}
147 #[inline(always)]
149 fn data(&self) -> &Self::Data {
150 unsafe { self.data.assume_init_ref() }
151 }
152
153 #[inline(always)]
154 fn data_mut(&mut self) -> &mut MaybeUninit<Self::Data> {
155 &mut self.data
156 }
157
158 const HAS_PREV: bool = false;
159}
160
161pub struct DualLinkArenaQueueNode<P: Priority> {
166 next: usize,
167 prev: usize,
168 data: MaybeUninit<P>,
169}
170
171impl<P: Priority> ArenaQueueNode for DualLinkArenaQueueNode<P> {
172 type Data = P;
173
174 #[inline(always)]
175 fn next(&self) -> usize {
176 self.next
177 }
178
179 #[inline(always)]
180 fn set_next(&mut self, val: usize) {
181 self.next = val;
182 }
183
184 #[inline(always)]
185 fn prev(&self) -> usize {
186 self.prev
187 }
188
189 #[inline(always)]
190 fn set_prev(&mut self, val: usize) {
191 self.prev = val
192 }
193
194 #[inline(always)]
195 fn data(&self) -> &Self::Data {
196 unsafe { self.data.assume_init_ref() }
197 }
198
199 #[inline(always)]
200 fn data_mut(&mut self) -> &mut MaybeUninit<Self::Data> {
201 &mut self.data
202 }
203
204 const HAS_PREV: bool = true;
205}
206
207pub struct ArenaQueueHandle<N: ArenaQueueNode> {
209 offset: usize,
210 data: PhantomData<N>,
211}
212
213impl<N: ArenaQueueNode> ArenaQueueHandle<N> {
214 const fn new(offset: usize) -> Self {
215 Self {
216 offset,
217 data: PhantomData,
218 }
219 }
220}
221
222#[repr(transparent)]
223pub struct SharedArenaQueueHandle<N: ArenaQueueNode>(ArenaQueueHandle<N>);
225
226impl<N: ArenaQueueNode> SharedArenaQueueHandle<N> {
227 #[inline(always)]
228 const fn maybe_new(offset: usize) -> Option<Self> {
229 if offset != SENTINEL {
230 Some(Self(ArenaQueueHandle::new(offset)))
231 } else {
232 None
233 }
234 }
235}
236
237impl<N: ArenaQueueNode> AsRef<ArenaQueueHandle<N>> for SharedArenaQueueHandle<N> {
238 #[inline(always)]
239 fn as_ref(&self) -> &ArenaQueueHandle<N> {
240 &self.0
241 }
242}
243
244impl<N: ArenaQueueNode> From<ArenaQueueHandle<N>> for SharedArenaQueueHandle<N> {
245 #[inline(always)]
246 fn from(value: ArenaQueueHandle<N>) -> Self {
247 Self(value)
248 }
249}
250
251impl<N: ArenaQueueNode> PriorityQueueHandle<N::Data> for ArenaQueueHandle<N> {
252 const LOAD_PURE: Option<unsafe fn(&Self) -> &N::Data> = None;
253}
254
255impl<N: ArenaQueueNode> ArenaQueue<N> {
256 #[inline]
257 fn nodes<'a>(&self) -> &'a mut [N] {
258 unsafe { core::slice::from_raw_parts_mut(self.data, self.size) }
259 }
260
261 #[inline]
262 fn node<'a>(&self, offset: usize) -> &'a mut N {
263 unsafe { &mut *self.data.byte_add(offset) }
264 }
265
266 #[inline]
267 fn offset_of(&self, node: *const N) -> usize {
268 node as usize - self.data as usize
269 }
270
271 #[inline]
272 fn iter_at(&self, start: usize) -> ArenaQueueIterator<'_, N> {
273 ArenaQueueIterator {
274 queue: self,
275 current: if start != SENTINEL {
276 Some(self.node(start))
277 } else {
278 None
279 },
280 }
281 }
282
283 #[inline]
284 fn expand(&mut self) {
285 let old_size = self.size;
286 self.data = if self.size == 0 {
287 self.size = 4;
288
289 unsafe {
290 alloc(Layout::array::<N>(self.size).unwrap())
292 }
293 } else {
294 let old_layout = Layout::array::<N>(self.size).unwrap();
295 self.size <<= 1;
296
297 unsafe { realloc(self.data.cast(), old_layout, old_layout.size() << 1) }
298 }
299 .cast::<N>();
300
301 for item in &mut self.nodes()[old_size..] {
302 item.set_next(UNALLOCATED);
303 }
304
305 self.good = old_size;
306 }
307
308 #[inline]
314 fn emplace<'a>(&mut self, data: N::Data) -> (&'a mut N, Option<&'a mut N>, bool) {
315 let nodes = self.nodes();
316
317 let mut lowest_prio_prev: Option<&mut N> = None;
318 let mut found_prev: bool = false;
319 let idx_mask = self.size - 1;
321 let mut idx_to_try = self.good;
322
323 loop {
324 let node = &mut self.nodes()[idx_to_try];
325 let node_data = node.data();
326
327 if node.next() == UNALLOCATED {
328 break;
329 }
330
331 idx_to_try = (idx_to_try + 1) & idx_mask;
333
334 if found_prev {
335 continue;
336 }
337
338 match data.compare_new(node_data) {
339 Ordering::Greater => {}
340 Ordering::Equal => {
341 lowest_prio_prev = Some(node);
342 found_prev = true;
343 }
344 Ordering::Less => {
345 if lowest_prio_prev
346 .as_ref()
347 .is_none_or(|b| node_data.compare(b.data()).is_gt())
348 {
349 lowest_prio_prev = Some(node);
350 }
351 }
352 }
353 }
354
355 let new_node = &mut nodes[idx_to_try];
356 self.good = (idx_to_try + 1) & idx_mask;
357 self.used += 1;
358 new_node.data_mut().write(data);
359
360 (new_node, lowest_prio_prev, found_prev)
361 }
362
363 #[inline]
364 fn prev_for<'a>(&self, node: &N, offset: usize) -> &'a mut N {
366 if N::HAS_PREV {
367 return self.node(node.prev());
368 }
369
370 for x in self.nodes() {
372 if x.next() == offset {
373 return x;
374 }
375 }
376
377 unreachable!()
378 }
379
380 #[inline]
381 fn reposition(
383 &mut self,
384 node: &mut N,
385 node_offset: usize,
386 prev: Option<&mut N>,
387 next: Option<&mut N>,
388 ) {
389 if let Some(pr) = prev {
390 pr.set_next(node.next());
391 if N::HAS_PREV {
392 if let Some(nxt) = next {
393 nxt.set_prev(node.prev())
394 }
395 }
396
397 return self.insert(node, node_offset, None);
398 }
399
400 self.head = node.next();
401 if let Some(nxt) = next {
402 nxt.set_prev(SENTINEL);
403 }
404
405 self.insert(node, node_offset, Some(self.node(self.head)));
407 }
408
409 #[inline]
410 fn insert(&mut self, new_node: &mut N, new_offset: usize, after: Option<&mut N>) {
411 let new_data = new_node.data();
412 let mut prev = match after {
414 Some(x) => x,
417 None => {
418 if self.head == SENTINEL {
420 self.head = new_offset;
423 new_node.set_next(SENTINEL);
424
425 new_node.set_prev(SENTINEL);
426
427 return;
428 }
429
430 let head = self.node(self.head);
431
432 if new_data.compare_new(head.data()).is_ge() {
435 new_node.set_next(self.head);
436 self.head = new_offset;
437 new_node.set_prev(SENTINEL);
438 head.set_prev(new_offset);
439
440 return;
441 }
442
443 head
444 }
445 };
446
447 while prev.next() != SENTINEL {
449 let next = self.node(prev.next());
450
451 if new_data.compare_new(next.data()).is_ge() {
452 next.set_prev(new_offset);
454 break;
455 }
456
457 prev = next;
458 }
459
460 new_node.set_next(prev.next());
461 new_node.set_prev(self.offset_of(prev));
462 prev.set_next(new_offset);
463 }
464}
465
466struct ArenaQueueIterator<'a, N: ArenaQueueNode> {
467 queue: &'a ArenaQueue<N>,
468 current: Option<&'a N>,
469}
470
471impl<'a, N: ArenaQueueNode> Iterator for ArenaQueueIterator<'a, N> {
472 type Item = &'a N::Data;
473
474 #[inline]
475 fn next(&mut self) -> Option<Self::Item> {
476 let current = self.current.take()?;
477
478 if current.next() != 1 {
479 self.current = Some(self.queue.node(current.next()))
480 }
481
482 Some(current.data())
483 }
484}
485
486impl<N: ArenaQueueNode> Drop for ArenaQueue<N> {
487 fn drop(&mut self) {
488 if self.used != 0 {
489 #[cfg(debug_assertions)]
490 panic!("dropped a queue with remaining nodes!!!! {:?}", self);
493
494 #[cfg(not(debug_assertions))]
495 for node in self.nodes() {
496 if node.next() != UNALLOCATED {
497 unsafe {
498 node.data_mut().assume_init_drop();
499 }
500 }
501 }
502 }
503
504 if self.size != 0 {
505 unsafe { dealloc(self.data.cast(), Layout::array::<N>(self.size).unwrap()) };
506 };
507 }
508}
509
510unsafe impl<N: ArenaQueueNode> PriorityQueue<N::Data> for ArenaQueue<N> {
511 type Handle = ArenaQueueHandle<N>;
512 type SharedHandle = SharedArenaQueueHandle<N>;
513
514 #[inline]
515 fn enqueue(&mut self, data: N::Data) -> Self::Handle {
516 if self.size == self.used {
517 self.expand();
518 }
519
520 let (new_node, biggest_prev, found_prev) = self.emplace(data);
521 let new_offset = self.offset_of(new_node);
522 let handle = ArenaQueueHandle {
523 offset: new_offset,
524 data: PhantomData,
525 };
526
527 if found_prev {
529 let prev = biggest_prev.unwrap();
530 let ex_next = prev.next();
531
532 new_node.set_next(ex_next);
533 prev.set_next(new_offset);
534 if ex_next != SENTINEL {
537 self.node(ex_next).set_prev(new_offset);
538 }
539 new_node.set_prev(self.offset_of(prev));
540
541 return handle;
542 }
543
544 self.insert(new_node, new_offset, biggest_prev);
545
546 handle
547 }
548
549 #[inline]
550 fn dequeue(&mut self, handle: Self::Handle) -> (Option<&N::Data>, Option<Self::SharedHandle>) {
551 let old_node = self.node(handle.offset);
552 self.good = handle.offset / core::mem::size_of::<N>();
553
554 unsafe { old_node.data_mut().assume_init_drop() };
555 let old_next = old_node.next();
556
557 old_node.set_next(UNALLOCATED);
558 self.used -= 1;
559
560 if N::HAS_PREV && old_next != SENTINEL {
561 self.node(old_next).set_prev(old_node.prev());
562 }
563
564 if handle.offset == self.head {
567 self.head = old_next;
568
569 return (None, SharedArenaQueueHandle::maybe_new(old_next));
570 };
571
572 let prev = self.prev_for(&old_node, handle.offset);
574 prev.set_next(old_next);
575
576 return (
577 Some(prev.data()),
578 SharedArenaQueueHandle::maybe_new(old_next),
579 );
580 }
581
582 #[inline]
583 fn get_by_handle(&self, handle: &Self::Handle) -> &N::Data {
584 unsafe { &*self.data.byte_add(handle.offset) }.data()
585 }
586
587 #[inline(always)]
588 fn head_handle(&self) -> Option<Self::SharedHandle> {
589 (self.head != SENTINEL).then(|| ArenaQueueHandle::new(self.head).into())
590 }
591
592 #[inline]
593 fn iter_at<'a>(&'a self, after: Option<&Self::Handle>) -> impl Iterator<Item = &'a N::Data>
594 where
595 N::Data: 'a,
596 {
597 self.iter_at(after.map(|x| x.offset).unwrap_or_else(|| self.head))
598 }
599
600 fn update_node(&mut self, handle: &Self::Handle, update: impl FnOnce(&mut N::Data) -> bool) {
601 let node = self.node(handle.offset);
602
603 if !update(unsafe { node.data_mut().assume_init_mut() }) {
604 return;
605 }
606 let node_off = self.offset_of(node);
607
608 let data = node.data();
609 let next_off = node.next();
610
611 let mut maybe_prev =
612 (handle.offset != self.head).then(|| self.prev_for(node, handle.offset));
613
614 if let Some(prev) = maybe_prev.as_mut() {
617 if data.compare(prev.data()).is_gt() {
618 self.reposition(
619 node,
620 node_off,
621 Some(prev),
622 (next_off != SENTINEL).then(|| self.node(next_off)),
623 );
624
625 return;
626 }
627 }
628
629 if next_off != SENTINEL {
631 let next = self.node(next_off);
632 if data.compare(next.data()).is_lt() {
633 self.reposition(node, node_off, maybe_prev, Some(next));
634 }
635 }
636 }
637
638 #[inline]
639 fn get_next_handle(&self, handle: &Self::Handle) -> Option<Self::SharedHandle> {
640 let next = self.node(handle.offset).next();
641
642 (next != SENTINEL).then(|| {
643 ArenaQueueHandle {
644 offset: next,
645 data: PhantomData,
646 }
647 .into()
648 })
649 }
650}