/// One memory-mapped XDP socket ring queue (fill, completion, receive or transmit), shared with the kernel.
///
/// `XRQK` selects producer or consumer behaviour at the type level; `D` is the descriptor type stored in the ring.
#[doc(hidden)]
#[derive(Debug)]
pub struct XskRingQueue<XRQK: XskRingQueueKind, D: Descriptor>
{
/// Total number of entries in the ring; `array_index()` masks with `ring_queue_depth.mask()`, which implies the depth is a power of two — TODO confirm in `RingQueueDepth`.
ring_queue_depth: RingQueueDepth,
/// Pointer into the shared mapping to the producer index, advanced by whichever side produces.
producer: NonNull<u32>,
/// Locally cached copy of the producer index, to avoid re-reading shared memory on every operation.
cached_producer: Cell<u32>,
/// Pointer into the shared mapping to the consumer index, advanced by whichever side consumes.
consumer: NonNull<u32>,
/// Locally cached copy of the consumer index; for producer rings it is biased by `ring_queue_depth` (see `from_ring_queue_offsets()`).
cached_consumer: Cell<u32>,
/// Start of the descriptor array inside the shared mapping.
ring: *mut D,
/// Pointer to the ring's flags word (e.g. `XDP_RING_NEED_WAKEUP`).
flags: NonNull<u32>,
/// Owns the memory mapping; keeps the raw pointers above valid for the lifetime of this struct.
memory: MappedMemory,
/// Zero-sized; records the `XRQK` and `D` type parameters without storing values of them.
marker: PhantomData<(XRQK, D)>,
}
impl<XRQK: XskRingQueueKind, D: Descriptor> XskRingQueue<XRQK, D>
{
	/// `true` if `number_of_frames_to_transmit` does not exceed the ring's total capacity.
	#[inline(always)]
	pub(crate) fn number_of_frames_to_transmit_is_within_or_at_capacity(&self, number_of_frames_to_transmit: NonZeroU32) -> bool
	{
		number_of_frames_to_transmit.get() <= (self.ring_queue_depth as u32)
	}

	/// Shared reference to the descriptor at `index` (the index is masked to the ring's depth).
	#[inline(always)]
	fn ring_entry(&self, index: RingQueueEntryIndex) -> &D
	{
		unsafe { & * self.ring_entry_raw(index) }
	}

	/// Non-null pointer to the descriptor at `index`, for writing (the index is masked to the ring's depth).
	#[inline(always)]
	fn ring_entry_mut(&self, index: RingQueueEntryIndex) -> NonNull<D>
	{
		new_non_null(unsafe { self.ring_entry_raw(index) })
	}

	/// Raw pointer to the descriptor at `index`.
	///
	/// # Safety
	///
	/// The returned pointer is only valid while `self.memory` remains mapped.
	#[inline(always)]
	unsafe fn ring_entry_raw(&self, index: RingQueueEntryIndex) -> *mut D
	{
		self.ring.add(self.array_index(index))
	}

	/// Memory-maps one of an XDP socket's ring queues at `offset` and captures pointers to its
	/// producer index, consumer index, flags word and descriptor array.
	///
	/// When `XRQK::UseRingQueueDepthForConsumer` is `true` the cached consumer index is pre-biased
	/// by `ring_queue_depth`, presumably so a producer ring can compute free space as
	/// `cached_consumer - cached_producer` (see the producer impl's `number_free()`) — TODO confirm
	/// against libbpf's equivalent logic.
	///
	/// # Panics
	///
	/// Panics if the memory mapping can not be created.
	/// NOTE(review): the expect message says "fill ring queue" but this constructor appears to be
	/// shared by all ring kinds — the message may be misleading on failure.
	#[inline(always)]
	fn from_ring_queue_offsets(express_data_path_socket_file_descriptor: &ExpressDataPathSocketFileDescriptor, ring_queue_offsets: &xdp_ring_offset, ring_queue_depth: RingQueueDepth, offset: u64) -> Self
	{
		let page_size_or_huge_page_size_settings = PageSizeOrHugePageSizeSettings::for_default_page_size();
		let length = ring_queue_offsets.length_of_memory_to_map::<D>(ring_queue_depth);
		let memory = MappedMemory::from_file(express_data_path_socket_file_descriptor, offset, length, AddressHint::any(), Protection::ReadWrite, Sharing::Shared, true, false, &page_size_or_huge_page_size_settings).expect("Could not memory map XDP fill ring queue");
		let producer = ring_queue_offsets.producer_pointer(&memory);
		let consumer = ring_queue_offsets.consumer_pointer(&memory);
		Self
		{
			ring_queue_depth,
			producer,
			cached_producer: Cell::new(Self::dereference_u32_pointer(producer)),
			consumer,
			cached_consumer: Cell::new
			(
				Self::dereference_u32_pointer(consumer) + if XRQK::UseRingQueueDepthForConsumer
				{
					ring_queue_depth as u32
				}
				else
				{
					0
				}
			),
			ring: ring_queue_offsets.ring_pointer(&memory),
			flags: ring_queue_offsets.flags_pointer(&memory),
			memory,
			marker: PhantomData,
		}
	}

	/// Maps a free-running ring index to a slot in the descriptor array by masking; relies on the depth being a power of two.
	#[inline(always)]
	fn array_index(&self, index: RingQueueEntryIndex) -> usize
	{
		(index.0 & self.mask()) as usize
	}

	/// Updates the locally cached producer index (does not touch shared memory).
	#[inline(always)]
	fn set_cached_producer(&self, value: u32)
	{
		self.cached_producer.set(value)
	}

	/// The locally cached producer index.
	#[inline(always)]
	fn cached_producer(&self) -> u32
	{
		self.cached_producer.get()
	}

	/// Updates the locally cached consumer index (does not touch shared memory).
	#[inline(always)]
	fn set_cached_consumer(&self, value: u32)
	{
		self.cached_consumer.set(value)
	}

	/// The locally cached consumer index.
	#[inline(always)]
	fn cached_consumer(&self) -> u32
	{
		self.cached_consumer.get()
	}

	/// Writes the shared producer index, making produced entries visible to the kernel.
	#[inline(always)]
	fn set_producer(&self, value: u32)
	{
		Self::set_u32_pointer(self.producer, value)
	}

	/// Reads the shared producer index.
	#[inline(always)]
	fn producer(&self) -> u32
	{
		Self::dereference_u32_pointer(self.producer)
	}

	/// Writes the shared consumer index, returning consumed entries to the kernel.
	#[inline(always)]
	fn set_consumer(&self, value: u32)
	{
		Self::set_u32_pointer(self.consumer, value)
	}

	/// Reads the shared consumer index.
	#[inline(always)]
	fn consumer(&self) -> u32
	{
		Self::dereference_u32_pointer(self.consumer)
	}

	/// Plain (non-atomic) read of a `u32` shared with the kernel.
	///
	/// NOTE(review): correctness of these plain reads/writes depends on the explicit barrier
	/// functions below being called at the right points — confirm against libbpf's xsk ring accessors.
	#[inline(always)]
	fn dereference_u32_pointer(u32_pointer: NonNull<u32>) -> u32
	{
		unsafe { *u32_pointer.as_ptr() }
	}

	/// Plain (non-atomic) write of a `u32` shared with the kernel.
	#[inline(always)]
	fn set_u32_pointer(u32_pointer: NonNull<u32>, value: u32)
	{
		unsafe { *u32_pointer.as_ptr() = value }
	}

	/// Mask for converting a free-running index into an array slot.
	#[inline(always)]
	fn mask(&self) -> u32
	{
		self.ring_queue_depth.mask()
	}

	/// Reads the ring's flags word (e.g. to test `XDP_RING_NEED_WAKEUP`).
	#[inline(always)]
	fn flags(&self) -> u32
	{
		unsafe { *self.flags.as_ref() }
	}

	// NOTE(review): `llvm_asm!` was deprecated in Rust 1.56 and removed in 1.59; these barriers
	// should eventually migrate to `core::arch::asm!` or `std::sync::atomic::{fence, compiler_fence}`.

	/// Read memory barrier (libbpf's `libbpf_smp_rmb`); on x86_64 only a compiler barrier is emitted.
	#[cfg(target_arch = "x86_64")]
	#[inline(always)]
	fn libbpf_smp_rmb()
	{
		Self::memory_barrier()
	}

	/// Read memory barrier (libbpf's `libbpf_smp_rmb`); `dmb ishld` on AArch64.
	#[cfg(target_arch = "aarch64")]
	#[inline(always)]
	fn libbpf_smp_rmb()
	{
		unsafe
		{
			llvm_asm!
			(
				"
					dmb ishld
				"
				:
				:
				:
					"memory"
				:
					"volatile"
			);
		}
	}

	/// Write memory barrier (libbpf's `libbpf_smp_wmb`); on x86_64 only a compiler barrier is emitted.
	#[cfg(target_arch = "x86_64")]
	#[inline(always)]
	fn libbpf_smp_wmb()
	{
		Self::memory_barrier()
	}

	/// Write memory barrier (libbpf's `libbpf_smp_wmb`); `dmb ishst` on AArch64.
	#[cfg(target_arch = "aarch64")]
	#[inline(always)]
	fn libbpf_smp_wmb()
	{
		unsafe
		{
			llvm_asm!
			(
				"
					dmb ishst
				"
				:
				:
				:
					"memory"
				:
					"volatile"
			);
		}
	}

	/// Read-write memory barrier (libbpf's `libbpf_smp_rwmb`); on x86_64 only a compiler barrier is emitted.
	#[cfg(target_arch = "x86_64")]
	#[inline(always)]
	fn libbpf_smp_rwmb()
	{
		Self::memory_barrier()
	}

	/// Read-write memory barrier (libbpf's `libbpf_smp_rwmb`); `dmb ish` on AArch64 via `memory_barrier()`.
	#[cfg(target_arch = "aarch64")]
	#[inline(always)]
	fn libbpf_smp_rwmb()
	{
		// Fix: was `Self::memory_barrier()()` — the trailing `()` tried to call the unit value
		// returned by `memory_barrier()` and so did not compile on aarch64.
		Self::memory_barrier()
	}

	/// Compiler-only barrier on x86_64 (empty asm with a "memory" clobber); x86's strong memory
	/// model needs no fence instruction for these orderings.
	#[cfg(target_arch = "x86_64")]
	#[inline(always)]
	fn memory_barrier()
	{
		unsafe
		{
			llvm_asm!
			(
				""
				:
				:
				:
					"memory"
				:
					"volatile"
			);
		}
	}

	/// Full data memory barrier (`dmb ish`) on AArch64.
	#[cfg(target_arch = "aarch64")]
	#[inline(always)]
	fn memory_barrier()
	{
		unsafe
		{
			llvm_asm!
			(
				"
					dmb ish
				"
				:
				:
				:
					"memory"
				:
					"volatile"
			);
		}
	}
}
impl<D: Descriptor> XskRingQueue<ProducerXskRingQueueKind, D>
{
/// `true` if the kernel has set `XDP_RING_NEED_WAKEUP` on this ring, i.e. the producer side should wake the kernel (typically via a syscall) after producing.
#[inline(always)]
pub(super) fn needs_wake_up(&self) -> bool
{
self.flags() & XDP_RING_NEED_WAKEUP != 0
}
/// Number of free entries available to produce into.
///
/// First answers from the cached indices; only if fewer than `number` are free does it re-read
/// the kernel's consumer index (biased by the ring depth, matching the bias applied in
/// `from_ring_queue_offsets()`) to refresh the cache.
///
/// NOTE(review): the subtractions assume `cached_consumer >= cached_producer` in plain `u32`
/// arithmetic; once the free-running indices wrap `u32::MAX` this would panic in debug builds —
/// confirm whether wrapping arithmetic is intended here, as in libbpf's C implementation.
#[inline(always)]
pub(super) fn number_free(&self, number: u32) -> u32
{
let cached_producer = self.cached_producer();
let cached_consumer = self.cached_consumer();
debug_assert!(cached_consumer >= cached_producer, "cached_consumer is less than cached_producer");
let free_entries = cached_consumer - cached_producer;
if free_entries >= number
{
return free_entries
}
// Not enough cached free space: refresh from the shared consumer index, re-applying the depth bias.
let cached_consumer = self.consumer() + self.ring_queue_depth as u32;
self.set_cached_consumer(cached_consumer);
debug_assert!(cached_consumer >= cached_producer, "cached_consumer is less than cached_producer");
cached_consumer - cached_producer
}
/// Reserves `number` entries for production, all-or-nothing.
///
/// Returns the free-running (unmasked) ring index of the first reserved entry, or `None` if
/// fewer than `number` entries are free. Only the cached producer index is advanced; the
/// reservation becomes visible to the kernel when `submit()` is called.
#[inline(always)]
pub(super) fn reserve(&self, number: NonZeroU32) -> Option<RingQueueIndex>
{
if self.number_free(number.get()) < number.get()
{
None
}
else
{
let cached_producer = self.cached_producer();
let index = cached_producer;
self.set_cached_producer(cached_producer + number.get());
Some(RingQueueIndex(index))
}
}
/// Publishes `number` previously reserved (and filled-in) entries to the kernel.
///
/// The write barrier orders the descriptor writes before the shared producer-index update, so
/// the kernel never observes the new index with stale descriptor contents.
#[inline(always)]
pub(super) fn submit(&self, number: NonZeroU32)
{
Self::libbpf_smp_wmb();
self.set_producer(self.producer() + number.get());
}
}
impl<D: Descriptor> XskRingQueue<ConsumerXskRingQueueKind, D>
{
	/// Number of entries available to consume, capped at `number`.
	///
	/// Answers from the cached indices first; only when the cache says the ring is empty does it
	/// re-read the kernel's shared producer index and refresh the cache.
	#[inline(always)]
	fn number_available(&self, number: NonZeroU32) -> u32
	{
		let cached_producer = self.cached_producer();
		let cached_consumer = self.cached_consumer();
		// Fix: the message previously read "cached_consumer is less than cached_producer", which
		// describes the opposite of this condition's failure (copied from the producer impl).
		debug_assert!(cached_producer >= cached_consumer, "cached_producer is less than cached_consumer");
		let mut entries = cached_producer - cached_consumer;
		if entries == 0
		{
			// Cache exhausted: refresh from the shared producer index written by the kernel.
			let new_cached_producer = self.producer();
			self.set_cached_producer(new_cached_producer);
			entries = new_cached_producer - cached_consumer;
		}
		min(number.get(), entries)
	}

	/// Takes up to `number` entries for consumption, returning how many were taken and the
	/// free-running (unmasked) ring index of the first one; `None` if the ring is empty.
	///
	/// The read barrier orders the producer-index read before the subsequent descriptor reads, so
	/// stale descriptor contents are never observed for newly produced entries. Only the cached
	/// consumer index is advanced; entries are returned to the kernel by `release()`.
	#[inline(always)]
	pub(super) fn peek(&self, number: NonZeroU32) -> Option<(NonZeroU32, RingQueueIndex)>
	{
		let entries = self.number_available(number);
		if entries == 0
		{
			None
		}
		else
		{
			Self::libbpf_smp_rmb();
			let cached_consumer = self.cached_consumer();
			let index = cached_consumer;
			self.set_cached_consumer(cached_consumer + entries);
			// `entries` is non-zero here, so the `NonZeroU32` construction is sound.
			Some((new_non_zero_u32(entries), RingQueueIndex(index)))
		}
	}

	/// Returns `number` consumed entries to the kernel by advancing the shared consumer index.
	///
	/// The read-write barrier orders our descriptor reads before the index update, so the kernel
	/// can not overwrite entries we have not finished reading.
	#[inline(always)]
	pub(super) fn release(&self, number: NonZeroU32)
	{
		Self::libbpf_smp_rwmb();
		self.set_consumer(self.consumer() + number.get())
	}
}