use std::{cell::Cell, sync::atomic::Ordering};

use crate::queue_meta::YCQueueU64Meta;

use crate::utils::get_bit;
use crate::{YCQueueError, YCQueueSharedMeta, utils};

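/// A slot loaned to a producer: the slot's ring index plus exclusive access to
/// its backing data region until it is returned via `mark_slot_produced`.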
#[derive(Debug)]
pub struct YCQueueProduceSlot<'a> {
    pub index: u16,
    pub data: &'a mut [u8],
}

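/// A slot loaned to a consumer: the slot's ring index plus exclusive access to
/// its backing data region until it is returned via `mark_slot_consumed`.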
#[derive(Debug)]
pub struct YCQueueConsumeSlot<'a> {
    pub index: u16,
    pub data: &'a mut [u8],
}

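/// Which side currently owns a slot, as encoded by its bit in the shared
/// ownership bitmap (`0` = producer, `1` = consumer).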
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum YCQueueOwner {
    Producer,
    Consumer,
}

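/// A fixed-capacity ring of equally sized slots. All coordination state
/// (produce/consume indices, in-flight count, ownership bitmap) lives in the
/// shared metadata, so multiple `YCQueue` handles can drive the same region.
///
/// Illustrative sketch only (not a doctest): it assumes the `YCQueueOwnedData`
/// helper from `queue_alloc_helpers`, used by the tests below, is available to
/// callers.
///
/// ```ignore
/// let owned = YCQueueOwnedData::new(4, 32);
/// let mut queue = YCQueue::from_owned_data(&owned)?;
///
/// let slot = queue.get_produce_slot()?;
/// slot.data.fill(0xAB);
/// queue.mark_slot_produced(slot)?;
///
/// let slot = queue.get_consume_slot()?;
/// assert_eq!(slot.data[0], 0xAB);
/// queue.mark_slot_consumed(slot)?;
/// ```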
pub struct YCQueue<'a> {
    shared_metadata: YCQueueSharedMeta<'a>,
    slots: Vec<Cell<Option<&'a mut [u8]>>>,
    slot_count: u16,
    slot_size: u16,
}

impl<'a> YCQueue<'a> {
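    /// Builds a queue view over `data_region`, splitting it into slots of the
    /// size recorded in `shared_metadata`. Returns `InvalidArgs` if the region
    /// does not divide evenly into slots or the resulting slot count disagrees
    /// with the shared metadata.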
    pub fn new(
        shared_metadata: YCQueueSharedMeta<'a>,
        data_region: &'a mut [u8],
    ) -> Result<YCQueue<'a>, YCQueueError> {
        let slot_size = shared_metadata.slot_size.load(Ordering::Acquire) as usize;

        // Reject a zero slot size (the division below would panic) and a data
        // region that is not an exact multiple of the slot size.
        if slot_size == 0 || !data_region.len().is_multiple_of(slot_size) {
            return Err(YCQueueError::InvalidArgs);
        }
        let slot_count = data_region.len() / slot_size;

        let mut slots = Vec::<Cell<Option<&'a mut [u8]>>>::with_capacity(slot_count);
        for slot in data_region.chunks_exact_mut(slot_size) {
            slots.push(Cell::new(Some(slot)));
        }

        if shared_metadata.slot_count.load(Ordering::Acquire) as usize != slot_count {
            return Err(YCQueueError::InvalidArgs);
        }

        Ok(YCQueue {
            shared_metadata,
            slots,
            slot_count: slot_count as u16,
            slot_size: slot_size as u16,
        })
    }

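    /// Counts how many of the `range` slots starting at `idx` (wrapping around
    /// the ring) are currently owned by `owner`, stopping at the first slot
    /// with a different owner. Out-of-range arguments count as zero.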
    fn check_owner(&self, idx: u16, range: u16, owner: YCQueueOwner) -> u16 {
        if range == 0 || idx >= self.slot_count || range > self.slot_count {
            return 0;
        }

        let mut processed: u16 = 0;
        let mut remaining = range as u32;
        let mut current = idx as u32;
        let slot_count = self.slot_count as u32;

        while remaining > 0 {
            if current >= slot_count {
                current -= slot_count;
            }

            let chunk_idx = (current / u64::BITS) as usize;
            let bit_offset = (current % u64::BITS) as u8;
            let bits_available = u64::BITS - bit_offset as u32;
            let slots_until_end = slot_count - current;
            let span = remaining.min(bits_available).min(slots_until_end);
            debug_assert!(span > 0);

            let span_mask = if span == u64::BITS {
                !0u64
            } else {
                (1u64 << span) - 1
            };
            let value = self.shared_metadata.ownership[chunk_idx].load(Ordering::Acquire);
            let masked = (value >> bit_offset) & span_mask;

            match owner {
                YCQueueOwner::Producer => {
                    if masked != 0 {
                        let offset = masked.trailing_zeros() as u16;
                        return processed + offset;
                    }
                }
                YCQueueOwner::Consumer => {
                    if masked != span_mask {
                        let missing = (!masked) & span_mask;
                        let offset = missing.trailing_zeros() as u16;
                        return processed + offset;
                    }
                }
            }

            processed += span as u16;
            remaining -= span;
            current += span;
        }

        processed
    }

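    /// Reads the current owner of slot `idx` from the ownership bitmap.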
    fn get_owner(&self, idx: u16) -> YCQueueOwner {
        let atomic_idx = idx / u64::BITS as u16;
        let bit_idx = (idx % u64::BITS as u16) as u8;
        let atomic = &self.shared_metadata.ownership[atomic_idx as usize];
        let value = atomic.load(Ordering::Acquire);

        match utils::get_bit(&value, bit_idx) {
            false => YCQueueOwner::Producer,
            true => YCQueueOwner::Consumer,
        }
    }

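    /// Atomically sets the owner of slot `idx` and returns the previous owner.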
    fn set_owner(&mut self, idx: u16, owner: YCQueueOwner) -> YCQueueOwner {
        loop {
            let atomic_idx = idx / u64::BITS as u16;
            let bit_idx = (idx % u64::BITS as u16) as u8;
            let atomic = &self.shared_metadata.ownership[atomic_idx as usize];
            let value = atomic.load(Ordering::Acquire);

            let new_value = match owner {
                YCQueueOwner::Producer => utils::clear_bit(&value, bit_idx),
                YCQueueOwner::Consumer => utils::set_bit(&value, bit_idx),
            };

            match atomic.compare_exchange(value, new_value, Ordering::AcqRel, Ordering::Acquire) {
                Ok(_) => {
                    if get_bit(&value, bit_idx) {
                        return YCQueueOwner::Consumer;
                    } else {
                        return YCQueueOwner::Producer;
                    }
                }
                Err(_) => continue,
            }
        }
    }

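    /// Atomically hands `range` contiguous (wrapping) slots starting at `idx`
    /// over to `owner`, updating the ownership bitmap one 64-bit word at a
    /// time. Every slot in the range is expected to belong to the other side.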
    fn set_owner_range(
        &mut self,
        idx: u16,
        range: u16,
        owner: YCQueueOwner,
    ) -> Result<(), YCQueueError> {
        if range == 0 {
            return Ok(());
        }
        if idx >= self.slot_count || range > self.slot_count {
            return Err(YCQueueError::InvalidArgs);
        }
        let mut remaining = range as u32;
        let mut current = idx as u32;
        let slot_count = self.slot_count as u32;

        while remaining > 0 {
            if current >= slot_count {
                current -= slot_count;
            }

            let chunk_idx = (current / u64::BITS) as usize;
            let bit_offset = (current % u64::BITS) as u8;
            let bits_available = u64::BITS - bit_offset as u32;
            let slots_until_end = slot_count - current;
            let span = remaining.min(bits_available).min(slots_until_end);
            debug_assert!(span > 0);

            let mask = if span == u64::BITS {
                !0u64
            } else {
                ((1u64 << span) - 1) << bit_offset
            };

            loop {
                let value = self.shared_metadata.ownership[chunk_idx].load(Ordering::Acquire);

                let new_value = match owner {
                    YCQueueOwner::Producer => {
                        debug_assert_eq!(value & mask, mask);
                        value & !mask
                    }
                    YCQueueOwner::Consumer => {
                        debug_assert_eq!(value & mask, 0);
                        value | mask
                    }
                };

                match self.shared_metadata.ownership[chunk_idx].compare_exchange(
                    value,
                    new_value,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => break,
                    Err(_) => continue,
                }
            }

            remaining -= span;
            current += span;
        }

        Ok(())
    }

    fn get_u64_meta(&self) -> YCQueueU64Meta {
        YCQueueU64Meta::from_u64(self.shared_metadata.u64_meta.load(Ordering::Acquire))
    }

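    /// Number of slots currently claimed for production but not yet claimed
    /// for consumption.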
    #[inline]
    pub fn in_flight_count(&self) -> u16 {
        self.get_u64_meta().in_flight
    }

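    /// Index of the next slot that will be handed to a producer.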
    #[inline]
    pub fn produce_idx(&self) -> u16 {
        self.get_u64_meta().produce_idx
    }

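    /// Index of the next slot that will be handed to a consumer.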
    #[inline]
    pub fn consume_idx(&self) -> u16 {
        self.get_u64_meta().consume_idx
    }

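    /// Total number of slots in the queue.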
    #[inline]
    pub fn capacity(&self) -> u16 {
        self.slot_count
    }

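    /// Claims up to `num_slots` contiguous slots for production, starting at
    /// the current produce index. With `best_effort` set, returns however many
    /// leading slots (at least one) are producer-owned; otherwise the full
    /// batch must be ready. Fails with `OutOfSpace` if `num_slots` would push
    /// the in-flight count past capacity, and `SlotNotReady` if the slots have
    /// not yet been released by consumers.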
    #[inline]
    pub fn get_produce_slots(
        &mut self,
        mut num_slots: u16,
        best_effort: bool,
    ) -> Result<Vec<YCQueueProduceSlot<'a>>, YCQueueError> {
        if num_slots == 0 || num_slots > self.slot_count {
            return Err(YCQueueError::InvalidArgs);
        }

        let start_index = loop {
            let value = self.shared_metadata.u64_meta.load(Ordering::Acquire);
            let mut meta = YCQueueU64Meta::from_u64(value);

            if meta.in_flight as u32 + num_slots as u32 > self.slot_count as u32 {
                return Err(YCQueueError::OutOfSpace);
            }

            let available_slots =
                self.check_owner(meta.produce_idx, num_slots, YCQueueOwner::Producer);

            if (!best_effort && available_slots != num_slots)
                || (best_effort && available_slots == 0)
            {
                return Err(YCQueueError::SlotNotReady);
            }

            debug_assert!(available_slots > 0);
            debug_assert!(available_slots <= num_slots);

            num_slots = available_slots;
            let produce_idx = meta.produce_idx;
            meta.in_flight += num_slots;
            meta.produce_idx += num_slots;
            // Wrap the produce index around the end of the ring.
            if meta.produce_idx >= self.slot_count {
                meta.produce_idx -= self.slot_count;
            }

            let new_value = meta.to_u64();
            match self.shared_metadata.u64_meta.compare_exchange(
                value,
                new_value,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => break produce_idx,
                Err(_) => continue,
            }
        };

        let mut slots = Vec::with_capacity(num_slots as usize);
        let mut index = start_index;
        for _ in 0..num_slots {
            debug_assert_eq!(self.get_owner(index), YCQueueOwner::Producer);

            let slot_data = self.slots[index as usize].replace(None);
            match slot_data {
                Some(data) => slots.push(YCQueueProduceSlot { index, data }),
                None => panic!("We double-loaned out produce index {index:?}"),
            }

            index += 1;
            // Wrap the slot index around the end of the ring.
            if index >= self.slot_count {
                index -= self.slot_count;
            }
        }

        Ok(slots)
    }

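    /// Claims a single slot for production.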
    #[inline]
    pub fn get_produce_slot(&mut self) -> Result<YCQueueProduceSlot<'a>, YCQueueError> {
        let mut slots = self.get_produce_slots(1, false)?;

        Ok(slots
            .pop()
            .expect("get_produce_slots(1, false) returned without a slot"))
    }

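    /// Returns a produced slot's data region to the queue and hands ownership
    /// of the slot to the consumer side.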
    pub fn mark_slot_produced(
        &mut self,
        queue_slot: YCQueueProduceSlot<'a>,
    ) -> Result<(), YCQueueError> {
        if queue_slot.data.len() != self.slot_size as usize {
            return Err(YCQueueError::InvalidArgs);
        }

        let produce_idx = queue_slot.index;
        let old_data = self.slots[produce_idx as usize].replace(Some(queue_slot.data));

        debug_assert_eq!(old_data, None);

        let old_owner = self.set_owner(produce_idx, YCQueueOwner::Consumer);
        debug_assert_eq!(old_owner, YCQueueOwner::Producer);

        Ok(())
    }

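    /// Returns a contiguous batch of produced slots (in the order they were
    /// claimed) and hands them all to the consumer side in one ownership
    /// update.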
    pub fn mark_slots_produced(
        &mut self,
        queue_slots: Vec<YCQueueProduceSlot<'a>>,
    ) -> Result<(), YCQueueError> {
        if queue_slots.is_empty() {
            return Ok(());
        }

        let slot_size = self.slot_size as usize;
        let count = queue_slots.len();
        if count > self.slot_count as usize {
            return Err(YCQueueError::InvalidArgs);
        }

        let start_index = queue_slots[0].index;
        let slot_count = self.slot_count as usize;

        for (offset, slot) in queue_slots.iter().enumerate() {
            if slot.data.len() != slot_size {
                return Err(YCQueueError::InvalidArgs);
            }

            let expected = ((start_index as usize + offset) % slot_count) as u16;
            if slot.index != expected {
                return Err(YCQueueError::InvalidArgs);
            }
        }

        for slot in queue_slots.into_iter() {
            let old_data = self.slots[slot.index as usize].replace(Some(slot.data));
            debug_assert!(old_data.is_none());
        }

        self.set_owner_range(start_index, count as u16, YCQueueOwner::Consumer)
    }

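    /// Claims up to `num_slots` contiguous slots for consumption, starting at
    /// the current consume index. With `best_effort` set, returns however many
    /// leading slots (at least one) are consumer-owned; otherwise the full
    /// batch must be ready. Fails with `EmptyQueue` if fewer than `num_slots`
    /// slots are in flight, and `SlotNotReady` if the slots have not yet been
    /// marked produced.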
    #[inline]
    pub fn get_consume_slots(
        &mut self,
        mut num_slots: u16,
        best_effort: bool,
    ) -> Result<Vec<YCQueueConsumeSlot<'a>>, YCQueueError> {
        if num_slots == 0 || num_slots > self.slot_count {
            return Err(YCQueueError::InvalidArgs);
        }

        let start_index = loop {
            let value = self.shared_metadata.u64_meta.load(Ordering::Acquire);
            let mut meta = YCQueueU64Meta::from_u64(value);

            if meta.in_flight < num_slots {
                return Err(YCQueueError::EmptyQueue);
            }

            let available_slots =
                self.check_owner(meta.consume_idx, num_slots, YCQueueOwner::Consumer);
            if (!best_effort && available_slots != num_slots)
                || (best_effort && available_slots == 0)
            {
                return Err(YCQueueError::SlotNotReady);
            }

            debug_assert!(available_slots > 0);
            debug_assert!(available_slots <= num_slots);
            num_slots = available_slots;

            let consume_idx = meta.consume_idx;
            let mut next_idx = consume_idx as u32 + num_slots as u32;
            if next_idx >= self.slot_count as u32 {
                next_idx -= self.slot_count as u32;
            }
            meta.consume_idx = next_idx as u16;
            meta.in_flight -= num_slots;

            let new_value = meta.to_u64();
            match self.shared_metadata.u64_meta.compare_exchange(
                value,
                new_value,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => break consume_idx,
                Err(_) => continue,
            }
        };

        let mut slots = Vec::with_capacity(num_slots as usize);
        let mut index = start_index;
        for _ in 0..num_slots {
            debug_assert_eq!(self.get_owner(index), YCQueueOwner::Consumer);

            let slot_data = self.slots[index as usize].replace(None);
            match slot_data {
                Some(data) => slots.push(YCQueueConsumeSlot { index, data }),
                None => panic!("We double-loaned out consume index {index:?}"),
            }

            index += 1;
            if index >= self.slot_count {
                index -= self.slot_count;
            }
        }

        Ok(slots)
    }

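    /// Claims a single slot for consumption.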
    pub fn get_consume_slot(&mut self) -> Result<YCQueueConsumeSlot<'a>, YCQueueError> {
        let mut slots = self.get_consume_slots(1, false)?;

        Ok(slots
            .pop()
            .expect("get_consume_slots(1, false) returned without a slot"))
    }

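    /// Returns a consumed slot's data region to the queue and hands ownership
    /// of the slot back to the producer side.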
    pub fn mark_slot_consumed(
        &mut self,
        queue_slot: YCQueueConsumeSlot<'a>,
    ) -> Result<(), YCQueueError> {
        if queue_slot.data.len() != self.slot_size as usize {
            return Err(YCQueueError::InvalidArgs);
        }

        let consume_idx = queue_slot.index;
        let old_data = self.slots[consume_idx as usize].replace(Some(queue_slot.data));

        debug_assert_eq!(old_data, None);

        let old_owner = self.set_owner(consume_idx, YCQueueOwner::Producer);
        debug_assert_eq!(old_owner, YCQueueOwner::Consumer);

        Ok(())
    }

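    /// Returns a contiguous batch of consumed slots (in the order they were
    /// claimed) and hands them all back to the producer side in one ownership
    /// update.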
    pub fn mark_slots_consumed(
        &mut self,
        queue_slots: Vec<YCQueueConsumeSlot<'a>>,
    ) -> Result<(), YCQueueError> {
        if queue_slots.is_empty() {
            return Ok(());
        }

        let slot_size = self.slot_size as usize;
        let count = queue_slots.len();
        if count > self.slot_count as usize {
            return Err(YCQueueError::InvalidArgs);
        }

        let start_index = queue_slots[0].index;
        let slot_count = self.slot_count as usize;

        for (offset, slot) in queue_slots.iter().enumerate() {
            if slot.data.len() != slot_size {
                return Err(YCQueueError::InvalidArgs);
            }

            let expected = ((start_index as usize + offset) % slot_count) as u16;
            if slot.index != expected {
                return Err(YCQueueError::InvalidArgs);
            }
        }

        for slot in queue_slots.into_iter() {
            let old_data = self.slots[slot.index as usize].replace(Some(slot.data));
            debug_assert!(old_data.is_none());
        }

        self.set_owner_range(start_index, count as u16, YCQueueOwner::Producer)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::YCQueueError;
    use crate::queue_alloc_helpers::YCQueueOwnedData;

    #[test]
    fn simple_produce_consume_test() {
        let slot_count: u16 = 4;
        let slot_size: u16 = 32;

        let owned = YCQueueOwnedData::new(slot_count, slot_size);
        let mut queue = YCQueue::from_owned_data(&owned).unwrap();

        assert_eq!(
            queue.check_owner(0, slot_count, YCQueueOwner::Producer),
            slot_count
        );
        assert_eq!(queue.in_flight_count(), 0);
        assert_eq!(queue.produce_idx(), 0);
        assert_eq!(queue.consume_idx(), 0);

        let slot = queue.get_produce_slot().unwrap();
        assert_eq!(slot.index, 0);
        assert_eq!(queue.produce_idx(), 1);
        assert_eq!(queue.in_flight_count(), 1);
        assert_eq!(queue.check_owner(0, 1, YCQueueOwner::Producer), 1u16);

        slot.data.fill(0xAB);
        queue.mark_slot_produced(slot).unwrap();
        assert_eq!(queue.in_flight_count(), 1);
        assert_eq!(queue.check_owner(0, 1, YCQueueOwner::Consumer), 1u16);

        let consume_slot = queue.get_consume_slot().unwrap();
        assert_eq!(consume_slot.index, 0);
        assert_eq!(queue.consume_idx(), 1);
        assert_eq!(queue.in_flight_count(), 0);
        assert_eq!(queue.check_owner(0, 1, YCQueueOwner::Consumer), 1u16);

        queue.mark_slot_consumed(consume_slot).unwrap();
        assert_eq!(
            queue.check_owner(0, slot_count, YCQueueOwner::Producer),
            slot_count
        );
        assert_eq!(queue.in_flight_count(), 0);
        assert_eq!(queue.produce_idx(), 1);
        assert_eq!(queue.consume_idx(), 1);
    }

    #[test]
    fn batched_produce_consume_test() {
        let slot_count: u16 = 8;
        let slot_size: u16 = 64;

        let owned = YCQueueOwnedData::new(slot_count, slot_size);
        let mut queue = YCQueue::from_owned_data(&owned).unwrap();

        assert_eq!(
            queue.check_owner(0, slot_count, YCQueueOwner::Producer),
            slot_count
        );
        assert_eq!(queue.in_flight_count(), 0);
        assert_eq!(queue.produce_idx(), 0);
        assert_eq!(queue.consume_idx(), 0);

        let produce_batch = 3;
        let produce_slots = queue.get_produce_slots(produce_batch, false).unwrap();
        let produced_indices: Vec<_> = produce_slots.iter().map(|slot| slot.index).collect();
        assert_eq!(produced_indices, vec![0, 1, 2]);
        assert_eq!(queue.in_flight_count(), produce_batch);
        assert_eq!(queue.produce_idx(), produce_batch);
        assert_eq!(
            queue.check_owner(0, produce_batch, YCQueueOwner::Producer),
            produce_batch
        );

        queue.mark_slots_produced(produce_slots).unwrap();
        assert_eq!(
            queue.check_owner(0, produce_batch, YCQueueOwner::Consumer),
            produce_batch
        );
        assert_eq!(queue.in_flight_count(), produce_batch);

        let consume_slots_first = queue.get_consume_slots(2, false).unwrap();
        let consumed_indices: Vec<_> = consume_slots_first.iter().map(|slot| slot.index).collect();
        assert_eq!(consumed_indices, vec![0, 1]);
        assert_eq!(queue.consume_idx(), 2);
        assert_eq!(queue.in_flight_count(), 1);
        assert_eq!(queue.check_owner(0, 2, YCQueueOwner::Consumer), 2u16);
        assert_eq!(queue.check_owner(2, 1, YCQueueOwner::Consumer), 1u16);

        queue.mark_slots_consumed(consume_slots_first).unwrap();
        assert_eq!(queue.check_owner(0, 2, YCQueueOwner::Producer), 2u16);
        assert_eq!(queue.check_owner(2, 1, YCQueueOwner::Consumer), 1u16);
        assert_eq!(queue.in_flight_count(), 1);

        let final_slot = queue.get_consume_slots(1, false).unwrap();
        assert_eq!(final_slot[0].index, 2);
        assert_eq!(queue.in_flight_count(), 0);
        assert_eq!(queue.consume_idx(), 3);

        queue.mark_slots_consumed(final_slot).unwrap();
        assert_eq!(
            queue.check_owner(0, slot_count, YCQueueOwner::Producer),
            slot_count
        );
        assert_eq!(queue.in_flight_count(), 0);
        assert_eq!(queue.produce_idx(), 3);
        assert_eq!(queue.consume_idx(), 3);
    }

    #[test]
    fn best_effort_produce_partial_batch() {
        let slot_count: u16 = 4;
        let slot_size: u16 = 32;

        let owned = YCQueueOwnedData::new(slot_count, slot_size);
        let mut queue = YCQueue::from_owned_data(&owned).unwrap();

        // Fill three of the four slots and publish them.
        let produce_slots = queue.get_produce_slots(3, false).unwrap();
        queue.mark_slots_produced(produce_slots).unwrap();

        // Consume two: return one to the queue and keep the other outstanding.
        let mut consume_slots = queue.get_consume_slots(2, false).unwrap();
        let first = consume_slots.remove(0);
        queue.mark_slot_consumed(first).unwrap();
        let pending = consume_slots
            .pop()
            .expect("expect a remaining slot to stay in consumer hands");

        // Only slots 3 and 0 are producer-owned, so a best-effort request for
        // three yields a partial batch of two.
        let partial = queue.get_produce_slots(3, true).unwrap();
        assert_eq!(partial.len(), 2);
        assert_eq!(partial[0].index, 3);
        assert_eq!(partial[1].index, 0);

        // Publish the partial batch, release the outstanding slot, and drain
        // everything that is left.
        queue.mark_slots_produced(partial).unwrap();
        queue.mark_slot_consumed(pending).unwrap();

        let remaining = queue.get_consume_slots(3, false).unwrap();
        queue.mark_slots_consumed(remaining).unwrap();
    }

    #[test]
    fn best_effort_consume_partial_batch() {
        let slot_count: u16 = 4;
        let slot_size: u16 = 32;

        let owned = YCQueueOwnedData::new(slot_count, slot_size);
        let mut queue = YCQueue::from_owned_data(&owned).unwrap();

        let mut produce = queue.get_produce_slots(2, false).unwrap();
        let first_ready = produce.remove(0);
        let second_in_progress = produce
            .pop()
            .expect("second slot should be available for deferred publish");

        // Nothing has been marked produced yet, so even a best-effort consume
        // finds no ready slot.
        assert_eq!(
            queue.get_consume_slots(1, true).unwrap_err(),
            YCQueueError::SlotNotReady
        );

        queue.mark_slot_produced(first_ready).unwrap();

        // Only the first slot is ready, so a best-effort request for two
        // returns a partial batch of one.
        let mut consume = queue.get_consume_slots(2, true).unwrap();
        assert_eq!(consume.len(), 1);
        assert_eq!(consume[0].index, 0);

        let ready = consume.pop().unwrap();
        queue.mark_slot_consumed(ready).unwrap();

        queue.mark_slot_produced(second_in_progress).unwrap();
        let leftover = queue.get_consume_slot().unwrap();
        queue.mark_slot_consumed(leftover).unwrap();
    }

    #[test]
    fn wrap_test() {
        let slot_count: u16 = 4;
        let slot_size: u16 = 32;

        let owned = YCQueueOwnedData::new(slot_count, slot_size);
        let mut queue = YCQueue::from_owned_data(&owned).unwrap();

        let initial_slots = queue.get_produce_slots(slot_count, false).unwrap();
        assert_eq!(queue.in_flight_count(), slot_count);
        assert_eq!(queue.produce_idx(), 0);

        queue.mark_slots_produced(initial_slots).unwrap();
        assert_eq!(
            queue.check_owner(0, slot_count, YCQueueOwner::Consumer),
            slot_count
        );

        let first_consumed = queue.get_consume_slots(slot_count, false).unwrap();
        assert_eq!(queue.in_flight_count(), 0);
        assert_eq!(queue.consume_idx(), 0);

        queue.mark_slots_consumed(first_consumed).unwrap();
        assert_eq!(
            queue.check_owner(0, slot_count, YCQueueOwner::Producer),
            slot_count
        );
        assert_eq!(queue.in_flight_count(), 0);
        assert_eq!(queue.produce_idx(), 0);
        assert_eq!(queue.consume_idx(), 0);

        let mut wrap_slots = queue.get_produce_slots(3, false).unwrap();
        let start_idx = wrap_slots[0].index;
        // After one full produce/consume cycle the produce index has wrapped
        // back to the start of the ring.
        assert_eq!(start_idx, 0);

        wrap_slots[0].data.fill(0xAA);
        wrap_slots[1].data.fill(0xBB);
        wrap_slots[2].data.fill(0xCC);

        queue.mark_slots_produced(wrap_slots).unwrap();
        assert_eq!(queue.in_flight_count(), 3);

        let consumed = queue.get_consume_slots(3, false).unwrap();
        let values: Vec<u8> = consumed.iter().map(|slot| slot.data[0]).collect();
        assert_eq!(values, vec![0xAA, 0xBB, 0xCC]);
        assert_eq!(queue.consume_idx(), (start_idx + 3) % slot_count);

        queue.mark_slots_consumed(consumed).unwrap();
        assert_eq!(
            queue.check_owner(0, slot_count, YCQueueOwner::Producer),
            slot_count
        );
        assert_eq!(queue.in_flight_count(), 0);
    }

    #[test]
    fn batched_produce_consume_crossing_word_boundaries() {
        let slot_count: u16 = 128;
        let slot_size: u16 = 16;
        let batch_size: u16 = 67;
        let iterations = 5;

        let owned = YCQueueOwnedData::new(slot_count, slot_size);
        let mut queue = YCQueue::from_owned_data(&owned).unwrap();

        assert_eq!(
            queue.check_owner(0, slot_count, YCQueueOwner::Producer),
            slot_count
        );
        assert_eq!(queue.in_flight_count(), 0);
        assert_eq!(queue.produce_idx(), 0);
        assert_eq!(queue.consume_idx(), 0);

        for iteration in 0..iterations {
            let expected_start = ((batch_size as usize * iteration) % slot_count as usize) as u16;
            assert_eq!(queue.produce_idx(), expected_start);

            let produce_slots = queue.get_produce_slots(batch_size, false).unwrap();
            assert_eq!(produce_slots.len(), batch_size as usize);
            assert_eq!(produce_slots[0].index, expected_start);

            for (offset, slot) in produce_slots.iter().enumerate() {
                let expected_index =
                    ((expected_start as usize + offset) % slot_count as usize) as u16;
                assert_eq!(slot.index, expected_index);
            }

            queue.mark_slots_produced(produce_slots).unwrap();
            assert_eq!(queue.in_flight_count(), batch_size);

            let consume_slots = queue.get_consume_slots(batch_size, false).unwrap();
            assert_eq!(consume_slots.len(), batch_size as usize);

            for (offset, slot) in consume_slots.iter().enumerate() {
                let expected_index =
                    ((expected_start as usize + offset) % slot_count as usize) as u16;
                assert_eq!(slot.index, expected_index);
            }

            queue.mark_slots_consumed(consume_slots).unwrap();

            let expected_idx =
                ((batch_size as usize * (iteration + 1)) % slot_count as usize) as u16;
            assert_eq!(queue.in_flight_count(), 0);
            assert_eq!(queue.produce_idx(), expected_idx);
            assert_eq!(queue.consume_idx(), expected_idx);
            assert_eq!(
                queue.check_owner(0, slot_count, YCQueueOwner::Producer),
                slot_count
            );
        }
    }
}