yep_coc/
queue.rs

1use std::{cell::Cell, sync::atomic::Ordering};
2
3use crate::queue_meta::YCQueueU64Meta;
4
5use crate::utils::get_bit;
6use crate::{YCQueueError, YCQueueSharedMeta, utils};
7
8#[derive(Debug)]
9pub struct YCQueueProduceSlot<'a> {
10    pub index: u16,
11    pub data: &'a mut [u8],
12}
13
14#[derive(Debug)]
15pub struct YCQueueConsumeSlot<'a> {
16    pub index: u16,
17    pub data: &'a mut [u8],
18}
19
20#[derive(Clone, Copy, Eq, PartialEq, Debug)]
21pub enum YCQueueOwner {
22    Producer,
23    Consumer,
24}
25
26pub struct YCQueue<'a> {
27    shared_metadata: YCQueueSharedMeta<'a>,
28    slots: Vec<Cell<Option<&'a mut [u8]>>>,
29    slot_count: u16,
30    slot_size: u16,
31}
32
33impl<'a> YCQueue<'a> {
34    /// Create a queue backed by shared metadata and a contiguous data region.
35    ///
36    /// # Arguments
37    /// * `shared_metadata` - Shared ownership state that tracks slot usage across threads.
38    /// * `data_region` - Contiguous slice that will be divided into fixed-size slots.
39    ///
40    /// # Returns
41    /// `Ok(YCQueue)` when `data_region` aligns with the metadata configuration, or `Err(YCQueueError::InvalidArgs)` if the arguments disagree.
42    ///
43    /// # Examples
44    /// ```
45    /// use yep_coc::queue_alloc_helpers::YCQueueOwnedData;
46    /// use yep_coc::{YCQueue, YCQueueSharedMeta};
47    ///
48    /// let mut owned = YCQueueOwnedData::new(4, 32);
49    /// let shared = YCQueueSharedMeta::new(&owned.meta);
50    /// let queue = YCQueue::new(shared, owned.data.as_mut_slice());
51    /// assert!(queue.is_ok());
52    /// ```
53    pub fn new(
54        shared_metadata: YCQueueSharedMeta<'a>,
55        data_region: &'a mut [u8],
56    ) -> Result<YCQueue<'a>, YCQueueError> {
57        let slot_size = shared_metadata.slot_size.load(Ordering::Acquire) as usize;
58        let slot_count = data_region.len() / slot_size;
59
60        if !data_region.len().is_multiple_of(slot_size) {
61            return Err(YCQueueError::InvalidArgs);
62        }
63
64        let mut slots = Vec::<Cell<Option<&'a mut [u8]>>>::with_capacity(slot_count);
65        for slot in data_region.chunks_exact_mut(slot_size) {
66            slots.push(Cell::new(Some(slot)));
67        }
68
69        if shared_metadata.slot_count.load(Ordering::Acquire) as usize != slot_count {
70            return Err(YCQueueError::InvalidArgs);
71        }
72
73        Ok(YCQueue {
74            shared_metadata,
75            slots,
76            slot_count: slot_count as u16,
77            slot_size: slot_size as u16,
78        })
79    }
80
81    /// Count how many slots starting at `idx` are currently owned by `owner`, up to `range`.
82    ///
83    /// Returns the number of consecutive slots that matched before encountering one owned by the
84    /// opposite party, wrapping around the ring as needed.
85    fn check_owner(&self, idx: u16, range: u16, owner: YCQueueOwner) -> u16 {
86        if range == 0 || idx >= self.slot_count || range > self.slot_count {
87            return 0;
88        }
89
90        let mut processed: u16 = 0;
91        let mut remaining = range as u32;
92        let mut current = idx as u32;
93        let slot_count = self.slot_count as u32;
94
95        while remaining > 0 {
96            if current >= slot_count {
97                current -= slot_count;
98            }
99
100            let chunk_idx = (current / u64::BITS) as usize;
101            let bit_offset = (current % u64::BITS) as u8;
102            let bits_available = u64::BITS - bit_offset as u32;
103            let slots_until_end = slot_count - current;
104            let span = remaining.min(bits_available).min(slots_until_end);
105            debug_assert!(span > 0);
106
107            let span_mask = if span == u64::BITS {
108                !0u64
109            } else {
110                (1u64 << span) - 1
111            };
112            let value = self.shared_metadata.ownership[chunk_idx].load(Ordering::Acquire);
113            let masked = (value >> bit_offset) & span_mask;
114
115            match owner {
116                YCQueueOwner::Producer => {
117                    if masked != 0 {
118                        let offset = masked.trailing_zeros() as u16;
119                        return processed + offset;
120                    }
121                }
122                YCQueueOwner::Consumer => {
123                    if masked != span_mask {
124                        let missing = (!masked) & span_mask;
125                        let offset = missing.trailing_zeros() as u16;
126                        return processed + offset;
127                    }
128                }
129            }
130
131            processed += span as u16;
132            remaining -= span;
133            current += span;
134        }
135
136        processed
137    }
138
139    /// Load the current owner bit for a single slot.
140    ///
141    /// Returns `YCQueueOwner::Producer` when the slot is available to producers and `Consumer`
142    /// otherwise.
143    fn get_owner(&self, idx: u16) -> YCQueueOwner {
144        let atomic_idx = idx / u64::BITS as u16;
145        let bit_idx = (idx % u64::BITS as u16) as u8;
146        let atomic = &self.shared_metadata.ownership[atomic_idx as usize];
147        let value = atomic.load(Ordering::Acquire);
148
149        match utils::get_bit(&value, bit_idx) {
150            false => YCQueueOwner::Producer,
151            true => YCQueueOwner::Consumer,
152        }
153    }
154
155    /// Atomically set the owner of a single slot, returning the previous owner.
156    ///
157    /// Used by producer/consumer transitions to flip the ownership bit while maintaining ordering.
158    fn set_owner(&mut self, idx: u16, owner: YCQueueOwner) -> YCQueueOwner {
159        loop {
160            let atomic_idx = idx / u64::BITS as u16;
161            let bit_idx = (idx % u64::BITS as u16) as u8;
162            let atomic = &self.shared_metadata.ownership[atomic_idx as usize];
163            let value = atomic.load(Ordering::Acquire);
164
165            let new_value = match owner {
166                YCQueueOwner::Producer => utils::clear_bit(&value, bit_idx),
167                YCQueueOwner::Consumer => utils::set_bit(&value, bit_idx),
168            };
169
170            match atomic.compare_exchange(value, new_value, Ordering::AcqRel, Ordering::Acquire) {
171                Ok(_) => {
172                    if get_bit(&value, bit_idx) {
173                        return YCQueueOwner::Consumer;
174                    } else {
175                        return YCQueueOwner::Producer;
176                    }
177                }
178                Err(_) => continue,
179            }
180        }
181    }
182
183    /// Atomically set the owner for a contiguous range of slots.
184    ///
185    /// Returns `Err(YCQueueError::InvalidArgs)` when the starting index is out of bounds or the
186    /// range length exceeds the queue capacity.
187    fn set_owner_range(
188        &mut self,
189        idx: u16,
190        range: u16,
191        owner: YCQueueOwner,
192    ) -> Result<(), YCQueueError> {
193        if range == 0 {
194            return Ok(());
195        }
196        if idx >= self.slot_count || range > self.slot_count {
197            return Err(YCQueueError::InvalidArgs);
198        }
199        let mut remaining = range as u32;
200        let mut current = idx as u32;
201        let slot_count = self.slot_count as u32;
202
203        while remaining > 0 {
204            // wrap around if needed
205            if current >= slot_count {
206                // TODO: tag as cold path
207                current -= slot_count;
208            }
209
210            let chunk_idx = (current / u64::BITS) as usize;
211            let bit_offset = (current % u64::BITS) as u8;
212            let bits_available = u64::BITS - bit_offset as u32;
213            let slots_until_end = slot_count - current;
214            let span = remaining.min(bits_available).min(slots_until_end);
215            debug_assert!(span > 0);
216
217            let mask = if span == u64::BITS {
218                !0u64
219            } else {
220                ((1u64 << span) - 1) << bit_offset
221            };
222
223            loop {
224                let value = self.shared_metadata.ownership[chunk_idx].load(Ordering::Acquire);
225
226                let new_value = match owner {
227                    YCQueueOwner::Producer => {
228                        debug_assert_eq!(value & mask, mask);
229                        value & !mask
230                    }
231                    YCQueueOwner::Consumer => {
232                        debug_assert_eq!(value & mask, 0);
233                        value | mask
234                    }
235                };
236
237                match self.shared_metadata.ownership[chunk_idx].compare_exchange(
238                    value,
239                    new_value,
240                    Ordering::AcqRel,
241                    Ordering::Acquire,
242                ) {
243                    Ok(_) => break,
244                    Err(_) => continue,
245                }
246            }
247
248            remaining -= span;
249            current += span;
250        }
251
252        Ok(())
253    }
254
255    /// Snapshot the packed metadata that tracks indices and in-flight count.
256    fn get_u64_meta(&self) -> YCQueueU64Meta {
257        YCQueueU64Meta::from_u64(self.shared_metadata.u64_meta.load(Ordering::Acquire))
258    }
259
260    /// Returns the number of slots that have been produced (or are being produced into) but not yet consumed.
261    ///
262    /// # Returns
263    /// Count of slots currently in flight between producers and consumers.
264    ///
265    /// # Examples
266    /// ```
267    /// use yep_coc::queue_alloc_helpers::YCQueueOwnedData;
268    /// use yep_coc::{YCQueue, YCQueueSharedMeta};
269    ///
270    /// let mut owned = YCQueueOwnedData::new(2, 16);
271    /// let shared = YCQueueSharedMeta::new(&owned.meta);
272    /// let mut queue = YCQueue::new(shared, owned.data.as_mut_slice()).unwrap();
273    ///
274    /// assert_eq!(queue.in_flight_count(), 0);
275    /// let slot = queue.get_produce_slot().unwrap();
276    /// queue.mark_slot_produced(slot).unwrap();
277    /// assert_eq!(queue.in_flight_count(), 1);
278    /// ```
279    #[inline]
280    pub fn in_flight_count(&self) -> u16 {
281        self.get_u64_meta().in_flight
282    }
283
284    /// Returns the circular index that will be reserved by the next producer call.
285    ///
286    /// # Returns
287    /// The slot index measured modulo the queue capacity.
288    #[inline]
289    pub fn produce_idx(&self) -> u16 {
290        self.get_u64_meta().produce_idx
291    }
292
293    /// Returns the circular index that will be reserved by the next consumer call.
294    ///
295    /// # Returns
296    /// The slot index measured modulo the queue capacity.
297    #[inline]
298    pub fn consume_idx(&self) -> u16 {
299        self.get_u64_meta().consume_idx
300    }
301
302    /// Returns the total number of slots managed by this queue.
303    #[inline]
304    pub fn capacity(&self) -> u16 {
305        self.slot_count
306    }
307
308    /// Reserve contiguous slots for producers, optionally in best-effort mode.
309    ///
310    /// When `best_effort` is `false`, the function succeeds only if all `num_slots` are available;
311    /// otherwise it returns as many contiguous slots as currently ready.
312    ///
313    /// # Arguments
314    /// * `num_slots` - Maximum number of contiguous slots to attempt to reserve.
315    /// * `best_effort` - When `true`, grants a partial batch instead of requiring `num_slots`.
316    ///
317    /// # Returns
318    /// `Ok` containing one or more slots when reservation succeeds.
319    ///
320    /// # Errors
321    /// Returns `YCQueueError::InvalidArgs` when `num_slots` is zero or exceeds capacity,
322    /// `YCQueueError::OutOfSpace` when there is no remaining capacity, and
323    /// `YCQueueError::SlotNotReady` when the requested slots are not owned by the producer.
324    ///
325    /// # Examples
326    /// ```
327    /// use yep_coc::queue_alloc_helpers::YCQueueOwnedData;
328    /// use yep_coc::{YCQueue, YCQueueSharedMeta};
329    ///
330    /// let mut owned = YCQueueOwnedData::new(4, 16);
331    /// let shared = YCQueueSharedMeta::new(&owned.meta);
332    /// let mut queue = YCQueue::new(shared, owned.data.as_mut_slice()).unwrap();
333    ///
334    /// let slots = queue.get_produce_slots(2, false).unwrap();
335    /// assert_eq!(slots.len(), 2);
336    /// queue.mark_slots_produced(slots).unwrap();
337    ///
338    /// let partial = queue.get_produce_slots(2, true).unwrap();
339    /// assert!(!partial.is_empty());
340    /// queue.mark_slots_produced(partial).unwrap();
341    /// ```
342    #[inline]
343    pub fn get_produce_slots(
344        &mut self,
345        mut num_slots: u16,
346        best_effort: bool,
347    ) -> Result<Vec<YCQueueProduceSlot<'a>>, YCQueueError> {
348        if num_slots == 0 || num_slots > self.slot_count {
349            return Err(YCQueueError::InvalidArgs);
350        }
351
352        let start_index = loop {
353            let value = self.shared_metadata.u64_meta.load(Ordering::Acquire);
354            let mut meta = YCQueueU64Meta::from_u64(value);
355
356            if meta.in_flight as u32 + num_slots as u32 > self.slot_count as u32 {
357                return Err(YCQueueError::OutOfSpace);
358            }
359
360            // make sure all the slots we want are owned by the producer
361            let available_slots =
362                self.check_owner(meta.produce_idx, num_slots, YCQueueOwner::Producer);
363
364            if (!best_effort && available_slots != num_slots)
365                || (best_effort && available_slots == 0)
366            {
367                return Err(YCQueueError::SlotNotReady);
368            }
369
370            debug_assert!(available_slots > 0);
371            debug_assert!(available_slots <= num_slots);
372
373            num_slots = available_slots;
374            let produce_idx = meta.produce_idx;
375            meta.in_flight += num_slots;
376            meta.produce_idx += num_slots;
377            // wrap around if needed
378            if meta.produce_idx >= self.slot_count {
379                // TODO: tag as cold path
380                meta.produce_idx -= self.slot_count;
381            }
382
383            let new_value = meta.to_u64();
384            match self.shared_metadata.u64_meta.compare_exchange(
385                value,
386                new_value,
387                Ordering::AcqRel,
388                Ordering::Acquire,
389            ) {
390                Ok(_) => break produce_idx,
391                Err(_) => continue,
392            }
393        };
394
395        let mut slots = Vec::with_capacity(num_slots as usize);
396        let mut index = start_index;
397        for _ in 0..num_slots {
398            debug_assert_eq!(self.get_owner(index), YCQueueOwner::Producer);
399
400            let slot_data = self.slots[index as usize].replace(None);
401            match slot_data {
402                Some(data) => slots.push(YCQueueProduceSlot { index, data }),
403                None => panic!("We double-loaned out produce index {index:?}"),
404            }
405
406            index += 1;
407            // wrap around if needed
408            if index >= self.slot_count {
409                // TODO: tag as cold path
410                index -= self.slot_count;
411            }
412        }
413
414        Ok(slots)
415    }
416
417    /// Reserve a single slot to produce into.
418    ///
419    /// # Returns
420    /// `Ok` with the next available `YCQueueProduceSlot` when space exists.
421    ///
422    /// # Errors
423    /// Propagates the same errors as [`get_produce_slots`](Self::get_produce_slots) when no capacity is available.
424    ///
425    /// # Examples
426    /// ```
427    /// use yep_coc::queue_alloc_helpers::YCQueueOwnedData;
428    /// use yep_coc::{YCQueue, YCQueueSharedMeta};
429    ///
430    /// let mut owned = YCQueueOwnedData::new(2, 16);
431    /// let shared = YCQueueSharedMeta::new(&owned.meta);
432    /// let mut queue = YCQueue::new(shared, owned.data.as_mut_slice()).unwrap();
433    ///
434    /// // Reserve a slot
435    /// let slot = queue.get_produce_slot().unwrap();
436    /// // Fill it with data
437    /// slot.data.fill(0xAB);
438    /// ```
439    #[inline]
440    pub fn get_produce_slot(&mut self) -> Result<YCQueueProduceSlot<'a>, YCQueueError> {
441        let mut slots = self.get_produce_slots(1, false)?;
442
443        Ok(slots
444            .pop()
445            .expect("get_produce_slots(1, false) returned without a slot"))
446    }
447
448    /// Mark a slot as produced into. This makes it available to consumers.
449    ///
450    /// # Arguments
451    /// * `queue_slot` - Slot previously obtained from [`get_produce_slot`](Self::get_produce_slot) or [`get_produce_slots`](Self::get_produce_slots).
452    ///
453    /// # Returns
454    /// `Ok(())` when the slot is successfully handed off to consumers.
455    ///
456    /// # Errors
457    /// Returns `YCQueueError::InvalidArgs` when the slot has an unexpected size.
458    ///
459    /// # Examples
460    /// ```
461    /// use yep_coc::queue_alloc_helpers::YCQueueOwnedData;
462    /// use yep_coc::{YCQueue, YCQueueSharedMeta};
463    ///
464    /// let mut owned = YCQueueOwnedData::new(2, 16);
465    /// let shared = YCQueueSharedMeta::new(&owned.meta);
466    /// let mut queue = YCQueue::new(shared, owned.data.as_mut_slice()).unwrap();
467    ///
468    /// let slot = queue.get_produce_slot().unwrap();
469    /// queue.mark_slot_produced(slot).unwrap();
470    /// assert_eq!(queue.in_flight_count(), 1);
471    /// ```
472    pub fn mark_slot_produced(
473        &mut self,
474        queue_slot: YCQueueProduceSlot<'a>,
475    ) -> Result<(), YCQueueError> {
476        /*
477         * Marking a slot as produced gives it to the consumer to consume. These are not required
478         * to happen in the same order the slots were reserved. This updates the in-flight count.
479         */
480
481        if queue_slot.data.len() != self.slot_size as usize {
482            return Err(YCQueueError::InvalidArgs);
483        }
484
485        // yoink back the slot data
486        let produce_idx = queue_slot.index;
487        let old_data = self.slots[produce_idx as usize].replace(Some(queue_slot.data));
488
489        debug_assert_eq!(old_data, None);
490
491        // update the bitfield.
492        let old_owner = self.set_owner(produce_idx, YCQueueOwner::Consumer);
493        debug_assert_eq!(old_owner, YCQueueOwner::Producer);
494
495        Ok(())
496    }
497
498    /// Mark multiple slots as produced into. This makes them available to consumers.
499    ///
500    /// # Arguments
501    /// * `queue_slots` - Contiguous slots previously obtained from [`get_produce_slots`](Self::get_produce_slots).
502    ///
503    /// # Returns
504    /// `Ok(())` when ownership is returned to consumers.
505    ///
506    /// # Errors
507    /// Returns `YCQueueError::InvalidArgs` when the slots are empty, non-contiguous, exceed capacity, or hold slices of the wrong length.
508    ///
509    /// # Examples
510    /// ```
511    /// use yep_coc::queue_alloc_helpers::YCQueueOwnedData;
512    /// use yep_coc::{YCQueue, YCQueueSharedMeta};
513    ///
514    /// let mut owned = YCQueueOwnedData::new(4, 16);
515    /// let shared = YCQueueSharedMeta::new(&owned.meta);
516    /// let mut queue = YCQueue::new(shared, owned.data.as_mut_slice()).unwrap();
517    ///
518    /// let slots = queue.get_produce_slots(4, false).unwrap();
519    /// queue.mark_slots_produced(slots).unwrap();
520    /// assert_eq!(queue.in_flight_count(), 4);
521    /// ```
522    pub fn mark_slots_produced(
523        &mut self,
524        queue_slots: Vec<YCQueueProduceSlot<'a>>,
525    ) -> Result<(), YCQueueError> {
526        if queue_slots.is_empty() {
527            return Ok(());
528        }
529
530        let slot_size = self.slot_size as usize;
531        let count = queue_slots.len();
532        if count > self.slot_count as usize {
533            return Err(YCQueueError::InvalidArgs);
534        }
535
536        let start_index = queue_slots[0].index;
537        let slot_count = self.slot_count as usize;
538
539        for (offset, slot) in queue_slots.iter().enumerate() {
540            if slot.data.len() != slot_size {
541                return Err(YCQueueError::InvalidArgs);
542            }
543
544            let expected = ((start_index as usize + offset) % slot_count) as u16;
545            if slot.index != expected {
546                return Err(YCQueueError::InvalidArgs);
547            }
548        }
549
550        for slot in queue_slots.into_iter() {
551            let old_data = self.slots[slot.index as usize].replace(Some(slot.data));
552            debug_assert!(old_data.is_none());
553        }
554
555        self.set_owner_range(start_index, count as u16, YCQueueOwner::Consumer)
556    }
557
558    /// Reserve contiguous slots for consumers, optionally in best-effort mode.
559    ///
560    /// When `best_effort` is `false`, the function succeeds only if all `num_slots` are ready;
561    /// otherwise it returns as many contiguous slots as currently published.
562    ///
563    /// # Arguments
564    /// * `num_slots` - Maximum number of contiguous slots to attempt to dequeue.
565    /// * `best_effort` - When `true`, grants a partial batch instead of requiring `num_slots`.
566    ///
567    /// # Returns
568    /// `Ok` containing one or more ready slots when the reservation succeeds.
569    ///
570    /// # Errors
571    /// Returns `YCQueueError::InvalidArgs` when `num_slots` is zero or exceeds capacity,
572    /// `YCQueueError::EmptyQueue` when nothing has been produced yet, and
573    /// `YCQueueError::SlotNotReady` when the requested slots have not all been published.
574    ///
575    /// # Examples
576    /// ```
577    /// use yep_coc::queue_alloc_helpers::YCQueueOwnedData;
578    /// use yep_coc::{YCQueue, YCQueueSharedMeta};
579    ///
580    /// let mut owned = YCQueueOwnedData::new(4, 16);
581    /// let shared = YCQueueSharedMeta::new(&owned.meta);
582    /// let mut queue = YCQueue::new(shared, owned.data.as_mut_slice()).unwrap();
583    ///
584    /// let produce = queue.get_produce_slots(4, false).unwrap();
585    /// queue.mark_slots_produced(produce).unwrap();
586    ///
587    /// let consume = queue.get_consume_slots(2, false).unwrap();
588    /// assert_eq!(consume.len(), 2);
589    ///
590    /// let partial = queue.get_consume_slots(2, true).unwrap();
591    /// assert!(!partial.is_empty());
592    /// ```
593    #[inline]
594    pub fn get_consume_slots(
595        &mut self,
596        mut num_slots: u16,
597        best_effort: bool,
598    ) -> Result<Vec<YCQueueConsumeSlot<'a>>, YCQueueError> {
599        if num_slots == 0 || num_slots > self.slot_count {
600            return Err(YCQueueError::InvalidArgs);
601        }
602
603        let start_index = loop {
604            let value = self.shared_metadata.u64_meta.load(Ordering::Acquire);
605            let mut meta = YCQueueU64Meta::from_u64(value);
606
607            if meta.in_flight < num_slots {
608                return Err(YCQueueError::EmptyQueue);
609            }
610
611            let available_slots =
612                self.check_owner(meta.consume_idx, num_slots, YCQueueOwner::Consumer);
613            if (!best_effort && available_slots != num_slots)
614                || (best_effort && available_slots == 0)
615            {
616                return Err(YCQueueError::SlotNotReady);
617            }
618
619            debug_assert!(available_slots > 0);
620            debug_assert!(available_slots <= num_slots);
621            num_slots = available_slots;
622
623            let consume_idx = meta.consume_idx;
624            let mut next_idx = consume_idx as u32 + num_slots as u32;
625            if next_idx >= self.slot_count as u32 {
626                next_idx -= self.slot_count as u32;
627            }
628            meta.consume_idx = next_idx as u16;
629            meta.in_flight -= num_slots;
630
631            let new_value = meta.to_u64();
632            match self.shared_metadata.u64_meta.compare_exchange(
633                value,
634                new_value,
635                Ordering::AcqRel,
636                Ordering::Acquire,
637            ) {
638                Ok(_) => break consume_idx,
639                Err(_) => continue,
640            }
641        };
642
643        let mut slots = Vec::with_capacity(num_slots as usize);
644        let mut index = start_index;
645        for _ in 0..num_slots {
646            debug_assert_eq!(self.get_owner(index), YCQueueOwner::Consumer);
647
648            let slot_data = self.slots[index as usize].replace(None);
649            match slot_data {
650                Some(data) => slots.push(YCQueueConsumeSlot { index, data }),
651                None => panic!("We double-loaned out consume index {index:?}"),
652            }
653
654            index += 1;
655            if index >= self.slot_count {
656                index -= self.slot_count;
657            }
658        }
659
660        Ok(slots)
661    }
662
663    /// Reserve a single slot for consumption.
664    ///
665    /// # Returns
666    /// `Ok` with the next ready `YCQueueConsumeSlot` when data is available.
667    ///
668    /// # Errors
669    /// Propagates the same errors as [`get_consume_slots`](Self::get_consume_slots) when no slots are ready.
670    ///
671    /// # Examples
672    /// ```
673    /// use yep_coc::queue_alloc_helpers::YCQueueOwnedData;
674    /// use yep_coc::{YCQueue, YCQueueSharedMeta};
675    ///
676    /// let mut owned = YCQueueOwnedData::new(2, 16);
677    /// let shared = YCQueueSharedMeta::new(&owned.meta);
678    /// let mut queue = YCQueue::new(shared, owned.data.as_mut_slice()).unwrap();
679    ///
680    /// let slot = queue.get_produce_slot().unwrap();
681    /// queue.mark_slot_produced(slot).unwrap();
682    ///
683    /// let consume = queue.get_consume_slot().unwrap();
684    /// assert_eq!(consume.index, 0);
685    /// ```
686    pub fn get_consume_slot(&mut self) -> Result<YCQueueConsumeSlot<'a>, YCQueueError> {
687        let mut slots = self.get_consume_slots(1, false)?;
688
689        Ok(slots
690            .pop()
691            .expect("get_consume_slots(1, false) returned without a slot"))
692    }
693
694    /// Return an individual consumption slot back to the producer pool.
695    ///
696    /// # Arguments
697    /// * `queue_slot` - Slot previously obtained from [`get_consume_slot`](Self::get_consume_slot) or [`get_consume_slots`](Self::get_consume_slots).
698    ///
699    /// # Returns
700    /// `Ok(())` when the slot is successfully reclaimed for producers.
701    ///
702    /// # Errors
703    /// Returns `YCQueueError::InvalidArgs` when the slot data length is unexpected.
704    ///
705    /// # Examples
706    /// ```
707    /// use yep_coc::queue_alloc_helpers::YCQueueOwnedData;
708    /// use yep_coc::{YCQueue, YCQueueSharedMeta};
709    ///
710    /// let mut owned = YCQueueOwnedData::new(2, 16);
711    /// let shared = YCQueueSharedMeta::new(&owned.meta);
712    /// let mut queue = YCQueue::new(shared, owned.data.as_mut_slice()).unwrap();
713    ///
714    /// let slot = queue.get_produce_slot().unwrap();
715    /// queue.mark_slot_produced(slot).unwrap();
716    /// let consume = queue.get_consume_slot().unwrap();
717    /// queue.mark_slot_consumed(consume).unwrap();
718    /// assert_eq!(queue.in_flight_count(), 0);
719    /// ```
720    pub fn mark_slot_consumed(
721        &mut self,
722        queue_slot: YCQueueConsumeSlot<'a>,
723    ) -> Result<(), YCQueueError> {
724        if queue_slot.data.len() != self.slot_size as usize {
725            return Err(YCQueueError::InvalidArgs);
726        }
727
728        // yoink back the slot data
729        let consume_idx = queue_slot.index;
730        let old_data = self.slots[consume_idx as usize].replace(Some(queue_slot.data));
731
732        debug_assert_eq!(old_data, None);
733
734        // update the bitfield now
735        let old_owner = self.set_owner(consume_idx, YCQueueOwner::Producer);
736        debug_assert_eq!(old_owner, YCQueueOwner::Consumer);
737
738        Ok(())
739    }
740
741    /// Return multiple consumption slots back to the producer pool at once.
742    ///
743    /// # Arguments
744    /// * `queue_slots` - Contiguous slots previously obtained from [`get_consume_slots`](Self::get_consume_slots).
745    ///
746    /// # Returns
747    /// `Ok(())` when all slots are reclaimed for producers.
748    ///
749    /// # Errors
750    /// Returns `YCQueueError::InvalidArgs` when any slot has an unexpected length, is out of order, or when the batch length exceeds the queue capacity.
751    ///
752    /// # Examples
753    /// ```
754    /// use yep_coc::queue_alloc_helpers::YCQueueOwnedData;
755    /// use yep_coc::{YCQueue, YCQueueSharedMeta};
756    ///
757    /// let mut owned = YCQueueOwnedData::new(4, 16);
758    /// let shared = YCQueueSharedMeta::new(&owned.meta);
759    /// let mut queue = YCQueue::new(shared, owned.data.as_mut_slice()).unwrap();
760    ///
761    /// let produce = queue.get_produce_slots(4, false).unwrap();
762    /// queue.mark_slots_produced(produce).unwrap();
763    /// let consume = queue.get_consume_slots(4, false).unwrap();
764    /// queue.mark_slots_consumed(consume).unwrap();
765    /// assert_eq!(queue.in_flight_count(), 0);
766    /// ```
767    pub fn mark_slots_consumed(
768        &mut self,
769        queue_slots: Vec<YCQueueConsumeSlot<'a>>,
770    ) -> Result<(), YCQueueError> {
771        if queue_slots.is_empty() {
772            return Ok(());
773        }
774
775        let slot_size = self.slot_size as usize;
776        let count = queue_slots.len();
777        if count > self.slot_count as usize {
778            return Err(YCQueueError::InvalidArgs);
779        }
780
781        let start_index = queue_slots[0].index;
782        let slot_count = self.slot_count as usize;
783
784        for (offset, slot) in queue_slots.iter().enumerate() {
785            if slot.data.len() != slot_size {
786                return Err(YCQueueError::InvalidArgs);
787            }
788
789            let expected = ((start_index as usize + offset) % slot_count) as u16;
790            if slot.index != expected {
791                return Err(YCQueueError::InvalidArgs);
792            }
793        }
794
795        for slot in queue_slots.into_iter() {
796            let old_data = self.slots[slot.index as usize].replace(Some(slot.data));
797            debug_assert!(old_data.is_none());
798        }
799
800        self.set_owner_range(start_index, count as u16, YCQueueOwner::Producer)
801    }
802}
803
804#[cfg(test)]
805mod tests {
806    use super::*;
807    use crate::YCQueueError;
808    use crate::queue_alloc_helpers::YCQueueOwnedData;
809
810    #[test]
811    fn simple_produce_consume_test() {
812        let slot_count: u16 = 4;
813        let slot_size: u16 = 32;
814
815        let owned = YCQueueOwnedData::new(slot_count, slot_size);
816        let mut queue = YCQueue::from_owned_data(&owned).unwrap();
817
818        assert_eq!(
819            queue.check_owner(0, slot_count, YCQueueOwner::Producer),
820            slot_count
821        );
822        assert_eq!(queue.in_flight_count(), 0);
823        assert_eq!(queue.produce_idx(), 0);
824        assert_eq!(queue.consume_idx(), 0);
825
826        let slot = queue.get_produce_slot().unwrap();
827        assert_eq!(slot.index, 0);
828        assert_eq!(queue.produce_idx(), 1);
829        assert_eq!(queue.in_flight_count(), 1);
830        assert_eq!(queue.check_owner(0, 1, YCQueueOwner::Producer), 1u16);
831
832        slot.data.fill(0xAB);
833        queue.mark_slot_produced(slot).unwrap();
834        assert_eq!(queue.in_flight_count(), 1);
835        assert_eq!(queue.check_owner(0, 1, YCQueueOwner::Consumer), 1u16);
836
837        let consume_slot = queue.get_consume_slot().unwrap();
838        assert_eq!(consume_slot.index, 0);
839        assert_eq!(queue.consume_idx(), 1);
840        assert_eq!(queue.in_flight_count(), 0);
841        assert_eq!(queue.check_owner(0, 1, YCQueueOwner::Consumer), 1u16);
842
843        queue.mark_slot_consumed(consume_slot).unwrap();
844        assert_eq!(
845            queue.check_owner(0, slot_count, YCQueueOwner::Producer),
846            slot_count
847        );
848        assert_eq!(queue.in_flight_count(), 0);
849        assert_eq!(queue.produce_idx(), 1);
850        assert_eq!(queue.consume_idx(), 1);
851    }
852
853    #[test]
854    fn batched_produce_consume_test() {
855        let slot_count: u16 = 8;
856        let slot_size: u16 = 64;
857
858        let owned = YCQueueOwnedData::new(slot_count, slot_size);
859        let mut queue = YCQueue::from_owned_data(&owned).unwrap();
860
861        assert_eq!(
862            queue.check_owner(0, slot_count, YCQueueOwner::Producer),
863            slot_count
864        );
865        assert_eq!(queue.in_flight_count(), 0);
866        assert_eq!(queue.produce_idx(), 0);
867        assert_eq!(queue.consume_idx(), 0);
868
869        let produce_batch = 3;
870        let produce_slots = queue.get_produce_slots(produce_batch, false).unwrap();
871        let produced_indices: Vec<_> = produce_slots.iter().map(|slot| slot.index).collect();
872        assert_eq!(produced_indices, vec![0, 1, 2]);
873        assert_eq!(queue.in_flight_count(), produce_batch);
874        assert_eq!(queue.produce_idx(), produce_batch);
875        assert_eq!(
876            queue.check_owner(0, produce_batch, YCQueueOwner::Producer),
877            produce_batch
878        );
879
880        queue.mark_slots_produced(produce_slots).unwrap();
881        assert_eq!(
882            queue.check_owner(0, produce_batch, YCQueueOwner::Consumer),
883            produce_batch
884        );
885        assert_eq!(queue.in_flight_count(), produce_batch);
886
887        let consume_slots_first = queue.get_consume_slots(2, false).unwrap();
888        let consumed_indices: Vec<_> = consume_slots_first.iter().map(|slot| slot.index).collect();
889        assert_eq!(consumed_indices, vec![0, 1]);
890        assert_eq!(queue.consume_idx(), 2);
891        assert_eq!(queue.in_flight_count(), 1);
892        assert_eq!(queue.check_owner(0, 2, YCQueueOwner::Consumer), 2u16);
893        assert_eq!(queue.check_owner(2, 1, YCQueueOwner::Consumer), 1u16);
894
895        queue.mark_slots_consumed(consume_slots_first).unwrap();
896        assert_eq!(queue.check_owner(0, 2, YCQueueOwner::Producer), 2u16);
897        assert_eq!(queue.check_owner(2, 1, YCQueueOwner::Consumer), 1u16);
898        assert_eq!(queue.in_flight_count(), 1);
899
900        let final_slot = queue.get_consume_slots(1, false).unwrap();
901        assert_eq!(final_slot[0].index, 2);
902        assert_eq!(queue.in_flight_count(), 0);
903        assert_eq!(queue.consume_idx(), 3);
904
905        queue.mark_slots_consumed(final_slot).unwrap();
906        assert_eq!(
907            queue.check_owner(0, slot_count, YCQueueOwner::Producer),
908            slot_count
909        );
910        assert_eq!(queue.in_flight_count(), 0);
911        assert_eq!(queue.produce_idx(), 3);
912        assert_eq!(queue.consume_idx(), 3);
913    }
914
915    #[test]
916    fn best_effort_produce_partial_batch() {
917        let slot_count: u16 = 4;
918        let slot_size: u16 = 32;
919
920        let owned = YCQueueOwnedData::new(slot_count, slot_size);
921        let mut queue = YCQueue::from_owned_data(&owned).unwrap();
922
923        // Publish three slots so the consumer can hold one and block the wrap-around slot.
924        let produce_slots = queue.get_produce_slots(3, false).unwrap();
925        queue.mark_slots_produced(produce_slots).unwrap();
926
927        // Hold one consumed slot to keep ownership on the far side of the ring.
928        let mut consume_slots = queue.get_consume_slots(2, false).unwrap();
929        let first = consume_slots.remove(0);
930        queue.mark_slot_consumed(first).unwrap();
931        let pending = consume_slots
932            .pop()
933            .expect("expect a remaining slot to stay in consumer hands");
934
935        // Best-effort reservation should only hand out the contiguous run before the pending slot.
936        let partial = queue.get_produce_slots(3, true).unwrap();
937        assert_eq!(partial.len(), 2);
938        assert_eq!(partial[0].index, 3);
939        assert_eq!(partial[1].index, 0);
940
941        // Return the slots to keep the queue consistent for the rest of the test harness.
942        queue.mark_slots_produced(partial).unwrap();
943        queue.mark_slot_consumed(pending).unwrap();
944
945        let remaining = queue.get_consume_slots(3, false).unwrap();
946        queue.mark_slots_consumed(remaining).unwrap();
947    }
948
949    #[test]
950    fn best_effort_consume_partial_batch() {
951        let slot_count: u16 = 4;
952        let slot_size: u16 = 32;
953
954        let owned = YCQueueOwnedData::new(slot_count, slot_size);
955        let mut queue = YCQueue::from_owned_data(&owned).unwrap();
956
957        let mut produce = queue.get_produce_slots(2, false).unwrap();
958        let first_ready = produce.remove(0);
959        let second_in_progress = produce
960            .pop()
961            .expect("second slot should be available for deferred publish");
962
963        // No slots have been published yet, so best-effort should report a temporary stall.
964        assert_eq!(
965            queue.get_consume_slots(1, true).unwrap_err(),
966            YCQueueError::SlotNotReady
967        );
968
969        queue.mark_slot_produced(first_ready).unwrap();
970
971        // Only the published slot should be returned even though two were reserved.
972        let mut consume = queue.get_consume_slots(2, true).unwrap();
973        assert_eq!(consume.len(), 1);
974        assert_eq!(consume[0].index, 0);
975
976        // Clean up by finishing both outstanding slots.
977        let ready = consume.pop().unwrap();
978        queue.mark_slot_consumed(ready).unwrap();
979
980        queue.mark_slot_produced(second_in_progress).unwrap();
981        let leftover = queue.get_consume_slot().unwrap();
982        queue.mark_slot_consumed(leftover).unwrap();
983    }
984
985    #[test]
986    fn wrap_test() {
987        let slot_count: u16 = 4;
988        let slot_size: u16 = 32;
989
990        let owned = YCQueueOwnedData::new(slot_count, slot_size);
991        let mut queue = YCQueue::from_owned_data(&owned).unwrap();
992
993        let initial_slots = queue.get_produce_slots(slot_count, false).unwrap();
994        assert_eq!(queue.in_flight_count(), slot_count);
995        assert_eq!(queue.produce_idx(), 0);
996
997        queue.mark_slots_produced(initial_slots).unwrap();
998        assert_eq!(
999            queue.check_owner(0, slot_count, YCQueueOwner::Consumer),
1000            slot_count
1001        );
1002
1003        let first_consumed = queue.get_consume_slots(slot_count, false).unwrap();
1004        assert_eq!(queue.in_flight_count(), 0);
1005        assert_eq!(queue.consume_idx(), 0);
1006
1007        queue.mark_slots_consumed(first_consumed).unwrap();
1008        assert_eq!(
1009            queue.check_owner(0, slot_count, YCQueueOwner::Producer),
1010            slot_count
1011        );
1012        assert_eq!(queue.in_flight_count(), 0);
1013        assert_eq!(queue.produce_idx(), 0);
1014        assert_eq!(queue.consume_idx(), 0);
1015
1016        let mut wrap_slots = queue.get_produce_slots(3, false).unwrap();
1017        let start_idx = wrap_slots[0].index;
1018        assert!(start_idx <= slot_count - 3 || start_idx == slot_count - 3);
1019
1020        wrap_slots[0].data.fill(0xAA);
1021        wrap_slots[1].data.fill(0xBB);
1022        wrap_slots[2].data.fill(0xCC);
1023
1024        queue.mark_slots_produced(wrap_slots).unwrap();
1025        assert_eq!(queue.in_flight_count(), 3);
1026
1027        let consumed = queue.get_consume_slots(3, false).unwrap();
1028        let values: Vec<u8> = consumed.iter().map(|slot| slot.data[0]).collect();
1029        assert_eq!(values, vec![0xAA, 0xBB, 0xCC]);
1030        assert_eq!(queue.consume_idx(), (start_idx + 3) % slot_count);
1031
1032        queue.mark_slots_consumed(consumed).unwrap();
1033        assert_eq!(
1034            queue.check_owner(0, slot_count, YCQueueOwner::Producer),
1035            slot_count
1036        );
1037        assert_eq!(queue.in_flight_count(), 0);
1038    }
1039
1040    #[test]
1041    fn batched_produce_consume_crossing_word_boundaries() {
1042        let slot_count: u16 = 128;
1043        let slot_size: u16 = 16;
1044        let batch_size: u16 = 67;
1045        let iterations = 5;
1046
1047        let owned = YCQueueOwnedData::new(slot_count, slot_size);
1048        let mut queue = YCQueue::from_owned_data(&owned).unwrap();
1049
1050        assert_eq!(
1051            queue.check_owner(0, slot_count, YCQueueOwner::Producer),
1052            slot_count
1053        );
1054        assert_eq!(queue.in_flight_count(), 0);
1055        assert_eq!(queue.produce_idx(), 0);
1056        assert_eq!(queue.consume_idx(), 0);
1057
1058        for iteration in 0..iterations {
1059            let expected_start = ((batch_size as usize * iteration) % slot_count as usize) as u16;
1060            assert_eq!(queue.produce_idx(), expected_start);
1061
1062            let produce_slots = queue.get_produce_slots(batch_size, false).unwrap();
1063            assert_eq!(produce_slots.len(), batch_size as usize);
1064            assert_eq!(produce_slots[0].index, expected_start);
1065
1066            for (offset, slot) in produce_slots.iter().enumerate() {
1067                let expected_index =
1068                    ((expected_start as usize + offset) % slot_count as usize) as u16;
1069                assert_eq!(slot.index, expected_index);
1070            }
1071
1072            queue.mark_slots_produced(produce_slots).unwrap();
1073            assert_eq!(queue.in_flight_count(), batch_size);
1074
1075            let consume_slots = queue.get_consume_slots(batch_size, false).unwrap();
1076            assert_eq!(consume_slots.len(), batch_size as usize);
1077
1078            for (offset, slot) in consume_slots.iter().enumerate() {
1079                let expected_index =
1080                    ((expected_start as usize + offset) % slot_count as usize) as u16;
1081                assert_eq!(slot.index, expected_index);
1082            }
1083
1084            queue.mark_slots_consumed(consume_slots).unwrap();
1085
1086            let expected_idx =
1087                ((batch_size as usize * (iteration + 1)) % slot_count as usize) as u16;
1088            assert_eq!(queue.in_flight_count(), 0);
1089            assert_eq!(queue.produce_idx(), expected_idx);
1090            assert_eq!(queue.consume_idx(), expected_idx);
1091            assert_eq!(
1092                queue.check_owner(0, slot_count, YCQueueOwner::Producer),
1093                slot_count
1094            );
1095        }
1096    }
1097}