1use core::sync::atomic::{AtomicUsize, AtomicU32};
4use core::sync::atomic::Ordering::{SeqCst, Relaxed};
5use core::array::from_fn;
6
7use alloc::boxed::Box;
8use alloc::vec::Vec;
9
10use crate::slot::Slot;
11
12#[doc(hidden)]
13pub use storage::TmpArray;
14
15pub use storage::{Storage, InternalStorageApi};
16pub use block_size::{BlockSize, SmallBlockSize, DefaultBlockSize, LargeBlockSize, HugeBlockSize};
17use block_ptr::{BlockPointer, CollectedBlock};
18
19mod block;
20mod block_ptr;
21mod block_size;
22mod storage;
23
/// Number of per-revision visitor counters kept by a `Fifo`.
///
/// A revision `rev` is mapped onto its counter with `rev % REV_CAP`, and a
/// visit is only accepted while the visited revision is fewer than `REV_CAP`
/// steps behind the current one.
const REV_CAP: usize = 8;
25
/// List of blocks that were collected from the chain and are waiting to be
/// recycled. Each entry exposes a `revision` field that the maintenance pass
/// compares against the oldest revision that still has visitors.
type RecycleBin<const L: usize, const F: usize, T> = Vec<CollectedBlock<L, F, T>>;
27
/// Atomically replaces `old` with `new` inside `atomic_int`.
///
/// Returns `true` when the stored value was `old` and has been replaced, and
/// `false` when a different value was found (nothing is written then).
/// A successful exchange uses `SeqCst` ordering, a failed one `Relaxed`.
fn try_swap_int(atomic_int: &AtomicUsize, old: usize, new: usize) -> bool {
    match atomic_int.compare_exchange(old, new, SeqCst, Relaxed) {
        Ok(_) => true,
        Err(_) => false,
    }
}
31
/// Multi-producer / multi-consumer queue stored as a chain of blocks.
///
/// `L` is the number of item slots per block (each block covers `L`
/// consecutive absolute indices). `F` must satisfy `F * 8 == L`, which is
/// checked when the queue is built through `Default`; its exact meaning is
/// defined by the block types (presumably a per-block flag/bitmap size —
/// confirm in the `block` module).
pub struct Fifo<const L: usize, const F: usize, T> {
    /// Head of the block chain; following blocks are reached through `next`.
    first_block: BlockPointer<L, F, T>,
    /// Next absolute index to hand out to a producer; advanced with
    /// `fetch_add` by the number of items being pushed.
    prod_cursor: AtomicUsize,
    /// Next absolute index to hand out to a consumer; advanced with a
    /// compare-exchange once a consumer has negotiated its share.
    cons_cursor: AtomicUsize,
    /// Current revision; bumped by the maintenance pass whenever it has
    /// collected at least one block.
    revision: AtomicU32,
    /// Blocks collected but not yet recycled; taken out of the slot for the
    /// duration of a maintenance pass and put back afterwards.
    recycle_bin: Slot<RecycleBin<L, F, T>>,
    /// Visitor counters, one per revision slot (`revision % REV_CAP`),
    /// incremented when a visit starts and decremented when it stops.
    visitors: [AtomicU32; REV_CAP],
}
79
80impl<const L: usize, const F: usize, T> Default for Fifo<L, F, T> {
81 fn default() -> Self {
82 assert_eq!(F * 8, L);
83 let recycle_bin = Box::new(RecycleBin::new());
84
85 Self {
86 first_block: BlockPointer::new(),
87 prod_cursor: AtomicUsize::new(0),
88 cons_cursor: AtomicUsize::new(0),
89 revision: AtomicU32::new(0),
90 recycle_bin: Slot::new(recycle_bin),
91 visitors: from_fn(|_| AtomicU32::new(0)),
92 }
93 }
94}
95
96impl<const L: usize, const F: usize, T> Fifo<L, F, T> {
97 fn init_visit(&self) -> usize {
98 loop {
99 let rev = self.revision.load(SeqCst) as usize;
100 let rev_refcount = &self.visitors[rev % REV_CAP];
101 rev_refcount.fetch_add(1, SeqCst);
102
103 let new_rev = self.revision.load(SeqCst) as usize;
104 match (new_rev - rev) < REV_CAP {
105 true => break rev,
106 false => _ = rev_refcount.fetch_sub(1, SeqCst),
108 }
109 }
110 }
111
112 fn stop_visit(&self, rev: usize) {
113 self.visitors[rev % REV_CAP].fetch_sub(1, SeqCst);
114 }
115
116 fn try_maintain(&self) {
117 let Some(mut bin) = self.recycle_bin.try_take(false) else {
118 return;
120 };
121
122 let current_rev = self.revision.load(SeqCst) as usize;
123 let oldest_rev = current_rev.saturating_sub(REV_CAP - 1);
124
125 let mut oldest_visited_rev = current_rev;
127 for rev in oldest_rev..current_rev {
128 let rc_slot = rev % REV_CAP;
129 if self.visitors[rc_slot].load(SeqCst) != 0 {
130 oldest_visited_rev = rev;
131 break;
132 }
133 }
134
135 let oldest_used_slot = oldest_visited_rev % REV_CAP;
136
137 let next_rev = current_rev + 1;
138 let next_slot = next_rev % REV_CAP;
139
140 let can_increment = next_slot != oldest_used_slot;
142
143 let mut i = 0;
144 while i < bin.len() {
145 if bin[i].revision < oldest_visited_rev {
146 let block = bin.remove(i);
149 self.first_block.recycle(block);
150 } else {
151 i += 1;
152 }
153 }
154
155 if can_increment {
156 let mut has_collected = false;
157
158 while let Some(block) = self.first_block.try_collect(current_rev) {
159 bin.push(block);
160 has_collected = true;
161 }
162
163 if has_collected {
164 self.revision.store(next_rev as u32, SeqCst);
165 }
166 }
167
168 assert!(self.recycle_bin.try_insert(bin).is_ok());
169 }
170
171 fn produced(&self) -> usize {
173 let mut is_first_block = true;
174 let mut maybe_block = &self.first_block;
175 let mut total_produced = 0;
176
177 'outer: while let Some(block) = maybe_block.load() {
178 if is_first_block {
179 total_produced = block.offset.load(SeqCst);
180 is_first_block = false;
181 }
182
183 for i in 0..L {
184 match block.is_produced(i) {
185 false => break 'outer,
186 true => total_produced += 1,
187 }
188 }
189
190 maybe_block = &block.next;
191 }
192
193 total_produced
194 }
195}
196
// SAFETY: NOTE(review): this impl has no `T: Send` bound, yet `push(&self, ..)`
// accepts `T` values and `pull`/`iter` hand them back out through `&self`, so
// a `Fifo` can carry a `T` from one thread to another. That is only sound when
// `T: Send`. Adding the bound here also requires it on the `FifoApi` impl
// (whose supertraits are `Send + Sync`), so confirm and tighten both together.
unsafe impl<const L: usize, const F: usize, T> Send for Fifo<L, F, T> {}
// SAFETY: NOTE(review): all operations take `&self`, so sharing a `Fifo`
// between threads lets one thread push a `T` and another pull it. Without a
// `T: Send` bound this impl also covers non-`Send` item types, which looks
// unsound; confirm and add the bound (the `FifoApi` impl needs it as well).
unsafe impl<const L: usize, const F: usize, T> Sync for Fifo<L, F, T> {}
199
/// Interface of a queue that is independent of its block-size parameters, so
/// it can be used as `dyn FifoApi<T>` (as `PullIter` does).
pub trait FifoApi<T>: Send + Sync {
    /// Appends every item yielded by `iter` to the queue.
    fn push(&self, iter: &mut dyn ExactSizeIterator<Item = T>);
    /// Moves available items into `storage` and returns how many were moved.
    ///
    /// The `Fifo` implementation honours `storage.bounds()`: it moves at most
    /// the upper bound and nothing at all (returning `0`) when fewer items
    /// than the lower bound are available.
    fn pull(&self, storage: &mut dyn InternalStorageApi<T>) -> usize;
    /// Reserves the items that are currently available and returns an
    /// iterator yielding them.
    fn iter(&self) -> PullIter<'_, T>;
    /// Takes the item stored at absolute index `i`; called by `PullIter` for
    /// indices it has reserved.
    #[doc(hidden)]
    fn consume_item(&self, i: usize) -> T;
    /// Hook called when a `PullIter` is dropped.
    #[doc(hidden)]
    fn iter_drop(&self);
}
210
211impl<const L: usize, const F: usize, T> FifoApi<T> for Fifo<L, F, T> {
212 fn push(&self, iter: &mut dyn ExactSizeIterator<Item = T>) {
221 let revision = self.init_visit();
222
223 let mut remaining = iter.len();
224 let mut i = self.prod_cursor.fetch_add(remaining, SeqCst);
225
226 let mut is_first_block = true;
227 let mut block_offset = 0;
228 let mut maybe_block = &self.first_block;
229
230 while remaining > 0 {
231 let Some(block) = maybe_block.load() else {
232 maybe_block.append_new();
233 continue;
234 };
235
236 if is_first_block {
237 block_offset = block.offset.load(SeqCst);
238 is_first_block = false;
239 }
240
241 let next_block_offset = block_offset + L;
242 let block_range = block_offset..next_block_offset;
243
244 while block_range.contains(&i) && remaining > 0 {
246 let item = iter.next().unwrap();
247
248 let slot_i = i - block_offset;
249 block.produce(slot_i, item);
250
251 i += 1;
252 remaining -= 1;
253 }
254
255 block_offset = next_block_offset;
256 maybe_block = &block.next;
257 }
258
259 self.stop_visit(revision);
260 self.try_maintain();
261 }
262
263 fn pull(&self, storage: &mut dyn InternalStorageApi<T>) -> usize {
272 let (min, max) = storage.bounds();
273 let max = max.unwrap_or(usize::MAX);
274 let min = min.unwrap_or(1);
275 let revision = self.init_visit();
276
277 let mut success = false;
278 let mut i = 0;
279 let mut negotiated = 0;
280
281 while !success {
282 let produced = self.produced();
283 i = self.cons_cursor.load(SeqCst);
284 negotiated = match produced.checked_sub(i) {
285 Some(available) => available.min(max),
286 None => continue,
287 };
288
289 if negotiated < min {
290 negotiated = 0;
291 break;
292 }
293
294 success = try_swap_int(&self.cons_cursor, i, i + negotiated);
295 }
296
297 storage.reserve(negotiated);
298 let mut remaining = negotiated;
299 let mut is_first_block = true;
300 let mut block_offset = 0;
301 let mut maybe_block = &self.first_block;
302
303 while remaining > 0 {
304 let Some(block) = maybe_block.load() else {
305 maybe_block.append_new();
306 continue;
307 };
308
309 if is_first_block {
310 block_offset = block.offset.load(SeqCst);
311 is_first_block = false;
312 }
313
314 let next_block_offset = block_offset + L;
315 let block_range = block_offset..next_block_offset;
316
317 while block_range.contains(&i) && remaining > 0 {
319 let slot_i = i - block_offset;
320 let item = block.consume(slot_i);
321
322 let storage_index = negotiated - remaining;
323 storage.insert(storage_index, item);
324
325 i += 1;
326 remaining -= 1;
327 }
328
329 block_offset = next_block_offset;
330 maybe_block = &block.next;
331 }
332
333 self.stop_visit(revision);
334 self.try_maintain();
335
336 negotiated
337 }
338
339 fn iter(&self) -> PullIter<'_, T> {
340 let revision = self.init_visit();
341 let mut success = false;
342 let mut i = 0;
343 let mut negotiated = 0;
344
345 while !success {
346 let produced = self.produced();
347 i = self.cons_cursor.load(SeqCst);
348
349 negotiated = match produced.checked_sub(i) {
350 Some(available) => available,
351 None => continue,
352 };
353
354 success = try_swap_int(&self.cons_cursor, i, i + negotiated);
355 }
356
357 self.stop_visit(revision);
358
359 PullIter {
360 fifo: self,
361 i,
362 remaining: negotiated,
363 }
364 }
365
366 fn consume_item(&self, i: usize) -> T {
367 let revision = self.init_visit();
368 let mut is_first_block = true;
369 let mut block_offset = 0;
370 let mut maybe_block = &self.first_block;
371
372 let item = loop {
373 let block = maybe_block.load().unwrap();
374
375 if is_first_block {
376 block_offset = block.offset.load(SeqCst);
377 is_first_block = false;
378 }
379
380 let next_block_offset = block_offset + L;
381 let block_range = block_offset..next_block_offset;
382
383 if block_range.contains(&i) {
385 let slot_i = i - block_offset;
386 break block.consume(slot_i);
387 }
388
389 block_offset = next_block_offset;
390 maybe_block = &block.next;
391 };
392
393 self.stop_visit(revision);
394
395 item
396 }
397
/// Called when a `PullIter` is dropped; attempts a maintenance pass, as
/// `push` and `pull` do at their end.
fn iter_drop(&self) {
    self.try_maintain();
}
401}
402
/// Iterator over a range of items that has already been claimed from a queue.
///
/// Items are taken out of the queue one by one as the iterator advances;
/// whatever is left when the iterator is dropped is taken out and dropped.
pub struct PullIter<'a, T> {
    /// Queue the items are taken from.
    fifo: &'a dyn FifoApi<T>,
    /// Absolute index of the next item to take.
    i: usize,
    /// Number of claimed items not yet taken.
    remaining: usize,
}
408
409impl<'a, T> Iterator for PullIter<'a, T> {
410 type Item = T;
411
412 fn next(&mut self) -> Option<Self::Item> {
413 let next_rem = self.remaining.checked_sub(1)?;
414 let item = self.fifo.consume_item(self.i);
415
416 self.remaining = next_rem;
417 self.i += 1;
418
419 Some(item)
420 }
421
422 fn size_hint(&self) -> (usize, Option<usize>) {
423 (self.remaining, Some(self.remaining))
424 }
425}
426
// `size_hint` returns `remaining` as both bounds, so the default `len()` is exact.
impl<'a, T> ExactSizeIterator for PullIter<'a, T> {}
428
429impl<'a, T> Drop for PullIter<'a, T> {
430 fn drop(&mut self) {
431 let _ = self.count();
433 self.fifo.iter_drop();
434 }
435}