uefi_async/no_alloc/
lifo.rs

//! This code is inspired by the LIFO work-stealing queue of the `st3` Rust crate.
2
3use crate::no_alloc::task::TaskHeader;
4use core::iter::FusedIterator;
5use core::panic::{RefUnwindSafe, UnwindSafe};
6use core::ptr::null_mut;
7use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};
8use core::sync::atomic::{AtomicPtr, AtomicU32, AtomicU64};
9
/// Reason why booking items for a steal or drain operation failed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StealError {
    /// No item was available, or the number of items to take evaluated to zero.
    Empty,
    /// The booked head and the completed head differ, i.e. an earlier steal or
    /// drain on the same queue has not finished yet.
    Busy,
}
/// Packs two `u32` values into one `u64`: `v1` occupies the upper 32 bits and
/// `v2` the lower 32 bits.
#[inline]
fn pack(v1: u32, v2: u32) -> u64 {
    let upper = u64::from(v1) << u32::BITS;
    let lower = u64::from(v2);
    upper | lower
}
/// Splits a `u64` into its two 32-bit halves, returned as `(upper, lower)`.
#[inline]
fn unpack(v: u64) -> (u32, u32) {
    let upper = (v >> u32::BITS) as u32;
    // Truncating cast on purpose: only the low 32 bits are kept.
    let lower = v as u32;
    (upper, lower)
}
14
15#[derive(Debug)] #[repr(C, align(128))]
16pub struct Ptr<const N: usize> ([AtomicPtr<TaskHeader>; N]);
/// Atomic state through which the worker's `pop` and the stealers coordinate.
///
/// The struct is aligned to 128 bytes.
#[derive(Debug)]
#[repr(C, align(128))]
pub struct StealerData {
    /// Pop count (upper 32 bits) packed with the booked head position (lower
    /// 32 bits). The head half is advanced as soon as items are booked.
    pub pop_count_and_head: AtomicU64,
    /// Head position stored once a steal or drain has finished reading the
    /// items it booked.
    pub head: AtomicU32,
}
19#[derive(Debug)] #[repr(C, align(128))]
20pub struct Queue<const N: usize> {
21    pub push_count: AtomicU32, pub stealer_data: StealerData, pub buffer: Ptr<N>,
22}
23#[derive(Debug)] #[repr(transparent)]
24pub struct Stealer<const N: usize> (pub &'static Queue<N>);
25#[derive(Debug)] #[repr(transparent)]
26pub struct Worker<const N: usize> (pub &'static Queue<N>);
27#[derive(Debug)]
28pub struct Drain<'a, const N: usize> { queue: &'a Queue<N>, current: u32, end: u32 }
29
30impl<const N: usize> Ptr<N> {
31    pub const fn new() -> Self {
32        const EMPTY_PTR: AtomicPtr<TaskHeader> = AtomicPtr::new(null_mut());
33        Self([EMPTY_PTR; N])
34    }
35}
36impl<const N: usize> Queue<N> {
37    const _CHECK_N: () = assert!(N.is_power_of_two(), "N must be a power of two");
38    const MASK: u32 = (N - 1) as u32;
39    pub const fn new() -> Self {
40        Self {
41            push_count: AtomicU32::new(0),
42            stealer_data: StealerData {
43                pop_count_and_head: AtomicU64::new(0),
44                head: AtomicU32::new(0),
45            },
46            buffer: Ptr::new(),
47        }
48    }
49    #[inline(always)]
50    unsafe fn read_at(&self, position: u32) -> *mut TaskHeader {
51        let index = (position & Self::MASK) as usize;
52        self.buffer.0[index].load(Acquire)
53    }
54    #[inline(always)]
55    unsafe fn write_at(&self, position: u32, item: *mut TaskHeader) {
56        let index = (position & Self::MASK) as usize;
57        self.buffer.0[index].store(item, Release);
58    }
59    #[inline]
60    fn book_items<C>(&self, mut count_fn: C, max_count: u32, ) -> Result<(u32, u32, u32), StealError>
61    where C: FnMut(usize) -> usize {
62        let mut pop_count_and_head = self.stealer_data.pop_count_and_head.load(Acquire);
63        let old_head = self.stealer_data.head.load(Acquire);
64        loop {
65            let (pop_count, head) = unpack(pop_count_and_head);
66            if old_head != head {
67                return Err(StealError::Busy);
68            }
69            let push_count = self.push_count.load(Acquire);
70            let tail = push_count.wrapping_sub(pop_count);
71            let item_count = tail.wrapping_sub(head);
72            if item_count == 0 {
73                return Err(StealError::Empty);
74            }
75            let count = (count_fn(item_count as usize).min(max_count as usize) as u32)
76                .min(item_count);
77            if count == 0 {
78                return Err(StealError::Empty);
79            }
80            let new_head = head.wrapping_add(count);
81            let new_pop_count_and_head = pack(pop_count, new_head);
82            match self.stealer_data.pop_count_and_head.compare_exchange_weak(
83                pop_count_and_head,
84                new_pop_count_and_head,
85                Acquire,
86                Acquire,
87            ) {
88                Ok(_) => return Ok((head, new_head, count)),
89                Err(current) => pop_count_and_head = current,
90            }
91        }
92    }
93}
94impl<const N: usize> Worker<N> {
95    pub const fn new(queue: &'static Queue<N>) -> Self { Worker(queue) }
96    #[inline(always)]
97    pub fn stealer(&self) -> Stealer<N> { Stealer(self.0) }
98    pub fn spare_capacity(&self) -> usize {
99        let push_count = self.0.push_count.load(Relaxed);
100        let pop_count = unpack(self.0.stealer_data.pop_count_and_head.load(Relaxed)).0;
101        let tail = push_count.wrapping_sub(pop_count);
102        let head = self.0.stealer_data.head.load(Relaxed);
103        let len = tail.wrapping_sub(head) as usize;
104        N - len
105    }
106    pub fn is_empty(&self) -> bool {
107        let push_count = self.0.push_count.load(Relaxed);
108        let (pop_count, head) = unpack(self.0.stealer_data.pop_count_and_head.load(Relaxed));
109        push_count.wrapping_sub(pop_count) == head
110    }
111    pub fn push(&self, item: *mut TaskHeader) -> Result<(), *mut TaskHeader> {
112        let push_count = self.0.push_count.load(Relaxed);
113        let pop_count_and_head = self.0.stealer_data.pop_count_and_head.load(Acquire);
114        let (pop_count, head) = unpack(pop_count_and_head);
115        let tail = push_count.wrapping_sub(pop_count);
116        if tail.wrapping_sub(head) >= N as u32 {
117            return Err(item);
118        }
119        unsafe { self.0.write_at(tail, item) };
120        self.0.push_count.store(push_count.wrapping_add(1), Release);
121        Ok(())
122    }
123    pub fn extend<I: IntoIterator<Item = *mut TaskHeader>>(&self, iter: I) {
124        let push_count = self.0.push_count.load(Relaxed);
125        let pop_count = unpack(self.0.stealer_data.pop_count_and_head.load(Relaxed)).0;
126        let mut tail = push_count.wrapping_sub(pop_count);
127        let head = self.0.stealer_data.head.load(Acquire);
128
129        let max_tail = head.wrapping_add(N as u32);
130        for item in iter {
131            if tail == max_tail { break }
132            unsafe { self.0.write_at(tail, item) };
133            tail = tail.wrapping_add(1);
134        }
135        self.0.push_count.store(tail.wrapping_add(pop_count), Release);
136    }
137    pub fn pop(&self) -> Option<*mut TaskHeader> {
138        let mut pop_count_and_head = self.0.stealer_data.pop_count_and_head.load(Relaxed);
139        let push_count = self.0.push_count.load(Relaxed);
140        let (pop_count, mut head) = unpack(pop_count_and_head);
141        let tail = push_count.wrapping_sub(pop_count);
142        let new_pop_count = pop_count.wrapping_add(1);
143
144        loop {
145            if tail == head { return None }
146            let new_pop_count_and_head = pack(new_pop_count, head);
147            match self.0.stealer_data.pop_count_and_head.compare_exchange_weak(
148                pop_count_and_head, new_pop_count_and_head, Release, Relaxed,
149            ) {
150                Ok(_) => break,
151                Err(current) => {
152                    pop_count_and_head = current;
153                    head = unpack(current).1;
154                }
155            }
156        }
157        unsafe { Some(self.0.read_at(tail.wrapping_sub(1))) }
158    }
159    pub fn drain<C>(&self, count_fn: C) -> Result<Drain<'_, N>, StealError>
160    where C: FnMut(usize) -> usize {
161        let (old_head, new_head, _) = self.0.book_items(count_fn, u32::MAX)?;
162        Ok(Drain { queue: &self.0, current: old_head, end: new_head })
163    }
164    pub fn clear<F>(&self, mut dropper: F) where F: FnMut(*mut TaskHeader) {
165        if let Ok(drain) = self.drain(|count| count) {
166            for ptr in drain { dropper(ptr) }
167        }
168    }
169}
170impl<const N: usize> Stealer<N> {
171    pub fn steal<C>(&self, dest: &Worker<N>, count_fn: C) -> Result<usize, StealError>
172    where C: FnMut(usize) -> usize {
173        let dest_push_count = dest.0.push_count.load(Relaxed);
174        let dest_pop_count = unpack(dest.0.stealer_data.pop_count_and_head.load(Relaxed)).0;
175        let dest_tail = dest_push_count.wrapping_sub(dest_pop_count);
176        let dest_head = dest.0.stealer_data.head.load(Acquire);
177        let dest_free_capacity = N as u32 - dest_tail.wrapping_sub(dest_head);
178        let (old_head, new_head, transfer_count) =
179            self.0.book_items(count_fn, dest_free_capacity)?;
180        for offset in 0..transfer_count {
181            unsafe {
182                let item = self.0.read_at(old_head.wrapping_add(offset));
183                dest.0.write_at(dest_tail.wrapping_add(offset), item);
184            }
185        }
186        dest.0.push_count.store(dest_push_count.wrapping_add(transfer_count), Release);
187        self.0.stealer_data.head.store(new_head, Release);
188        Ok(transfer_count as usize)
189    }
190    pub fn steal_and_pop<C>(&self, dest: &Worker<N>, count_fn: C) -> Result<(*mut TaskHeader, usize), StealError>
191    where C: FnMut(usize) -> usize {
192        let dest_push_count = dest.0.push_count.load(Relaxed);
193        let dest_pop_count = unpack(dest.0.stealer_data.pop_count_and_head.load(Relaxed)).0;
194        let dest_tail = dest_push_count.wrapping_sub(dest_pop_count);
195        let dest_head = dest.0.stealer_data.head.load(Acquire);
196        let dest_free_capacity = N as u32 - dest_tail.wrapping_sub(dest_head);
197        let (old_head, new_head, count) =
198            self.0.book_items(count_fn, dest_free_capacity + 1)?;
199        let transfer_count = count - 1;
200        for offset in 0..transfer_count {
201            unsafe {
202                let item = self.0.read_at(old_head.wrapping_add(offset));
203                dest.0.write_at(dest_tail.wrapping_add(offset), item);
204            }
205        }
206        let last_item = unsafe { self.0.read_at(old_head.wrapping_add(transfer_count)) };
207        dest.0.push_count.store(dest_push_count.wrapping_add(transfer_count), Release);
208        self.0.stealer_data.head.store(new_head, Release);
209        Ok((last_item, transfer_count as usize))
210    }
211}
212impl<'a, const N: usize> Iterator for Drain<'a, N> {
213    type Item = *mut TaskHeader;
214    fn next(&mut self) -> Option<*mut TaskHeader> {
215        if self.current == self.end { return None }
216        let item = unsafe { self.queue.read_at(self.current) };
217        self.current = self.current.wrapping_add(1);
218        if self.current == self.end { self.queue.stealer_data.head.store(self.end, Release) }
219        Some(item)
220    }
221    fn size_hint(&self) -> (usize, Option<usize>) {
222        let sz = self.end.wrapping_sub(self.current) as usize;
223        (sz, Some(sz))
224    }
225}
226impl<'a, const N: usize> Drop for Drain<'a, N> { fn drop(&mut self) { self.for_each(drop) } }
227impl<const N: usize> Clone for Stealer<N> { #[inline(always)] fn clone(&self) -> Self { *self } }
228impl<const N: usize> PartialEq for Stealer<N> {
229    #[inline(always)] fn eq(&self, other: &Self) -> bool { core::ptr::eq(self.0, other.0) }
230}
231impl<const N: usize> Eq for Stealer<N> {}
232impl<const N: usize> UnwindSafe for Stealer<N> {}
233impl<const N: usize> RefUnwindSafe for Stealer<N> {}
234unsafe impl<const N: usize> Send for Stealer<N> {}
235unsafe impl<const N: usize> Sync for Stealer<N> {}
236impl<const N: usize> Copy for Stealer<N> {}
237impl<const N: usize> UnwindSafe for Worker<N> {}
238impl<const N: usize> RefUnwindSafe for Worker<N> {}
239unsafe impl<const N: usize> Send for Worker<N> {}
240impl<'a, const N: usize> ExactSizeIterator for Drain<'a, N> {}
241impl<'a, const N: usize> FusedIterator for Drain<'a, N> {}
242impl<'a, const N: usize> UnwindSafe for Drain<'a, N> {}
243impl<'a, const N: usize> RefUnwindSafe for Drain<'a, N> {}
244unsafe impl<'a, const N: usize> Send for Drain<'a, N> {}
245unsafe impl<'a, const N: usize> Sync for Drain<'a, N> {}