use super::super::super::{
instruments::Instruments,
ogre_queues::{
OgreQueue,
OgreBlockingQueue,
atomic::atomic_zero_copy::AtomicZeroCopy,
meta_publisher::MetaPublisher,
meta_subscriber::MetaSubscriber,
meta_container::MetaContainer,
},
};
use std::{
fmt::Debug,
sync::atomic::{AtomicU64,Ordering::Relaxed},
time::Duration,
};
use parking_lot::{
RawMutex,
lock_api::{
RawMutex as _RawMutex,
RawMutexTimed,
},
};
use log::{trace};
use crate::ogre_std::ogre_alloc::ogre_array_pool_allocator::OgreArrayPoolAllocator;
/// Fixed-capacity queue that layers blocking semantics on top of an [`AtomicZeroCopy`] base queue.
///
/// Two `parking_lot` raw mutexes are used as "gates" rather than as ordinary critical-section locks:
/// a thread that finds the queue full (or empty) waits on the corresponding guard until the opposite
/// operation releases it.
///
/// Generic parameters:
///  - `SlotType`: the element type;
///  - `BUFFER_SIZE`: forwarded to the base queue and its allocator;
///  - `LOCK_TIMEOUT_MILLIS`: how long a blocked operation waits on a guard; `0` means wait indefinitely;
///  - `INSTRUMENTS`: converted with `Instruments::from()` to decide whether tracing / metrics /
///    diagnostics code runs.
///
/// NOTE(review): the 64-byte alignment is presumably meant to match a cache line -- confirm.
#[repr(C, align(64))]
pub struct BlockingQueue<SlotType,
                         const BUFFER_SIZE: usize,
                         const LOCK_TIMEOUT_MILLIS: usize = 0,
                         const INSTRUMENTS: usize = 0>
where
    SlotType: Copy + Debug + Send + Sync,
{
    /// number of successful enqueues (only updated when metrics are enabled)
    enqueue_count: AtomicU64,
    /// number of times an enqueue attempt found the queue full (only updated when metrics are enabled)
    queue_full_count: AtomicU64,
    /// gate on which producers wait while the queue is full
    full_guard: RawMutex,
    /// the non-blocking queue that actually stores the elements
    base_queue: AtomicZeroCopy<SlotType, OgreArrayPoolAllocator<SlotType, super::atomic_move::AtomicMove<u32, BUFFER_SIZE>, BUFFER_SIZE>, BUFFER_SIZE>,
    /// gate on which consumers wait while the queue is empty
    empty_guard: RawMutex,
    /// number of successful dequeues (only updated when metrics are enabled)
    dequeue_count: AtomicU64,
    /// number of times a dequeue attempt found the queue empty (only updated when metrics are enabled)
    queue_empty_count: AtomicU64,
    /// human-readable name, used in log and diagnostics messages
    queue_name: String,
}
/// Private helpers: the full/empty "gates" and the periodic diagnostics dump.
impl<SlotType,
     const BUFFER_SIZE: usize,
     const LOCK_TIMEOUT_MILLIS: usize,
     const INSTRUMENTS: usize>
BlockingQueue<SlotType, BUFFER_SIZE, LOCK_TIMEOUT_MILLIS, INSTRUMENTS>
where
    SlotType: Copy + Debug + Send + Sync,
{
    /// Maximum time a timed wait on a guard may take (only used when `LOCK_TIMEOUT_MILLIS != 0`)
    const TRY_LOCK_DURATION: Duration = Duration::from_millis(LOCK_TIMEOUT_MILLIS as u64);

    /// Tells whether trace logging is enabled by `INSTRUMENTS`
    #[inline(always)]
    fn tracing_on() -> bool {
        Instruments::from(INSTRUMENTS).tracing()
    }

    /// Tells whether the metrics counters are enabled by `INSTRUMENTS`
    #[inline(always)]
    fn metrics_on() -> bool {
        Instruments::from(INSTRUMENTS).metrics()
    }

    /// Tells whether the periodic diagnostics dump is enabled by `INSTRUMENTS`
    #[inline(always)]
    fn diagnostics_on() -> bool {
        Instruments::from(INSTRUMENTS).metrics_diagnostics()
    }

    /// Called when a dequeue attempt found no elements: waits on `empty_guard`.
    ///
    /// Returns `true` once the guard could be acquired and `false` if the timed wait expired.
    /// With `LOCK_TIMEOUT_MILLIS == 0` the wait has no time limit, so only `true` is returned.
    #[inline(always)]
    fn report_empty(&self) -> bool {
        if Self::metrics_on() {
            self.queue_empty_count.fetch_add(1, Relaxed);
        }
        if Self::diagnostics_on() {
            self.metrics_diagnostics();
        }

        // non-blocking attempt: ensures the guard is held, so the wait below only returns
        // after someone unlocks it
        self.empty_guard.try_lock();

        if LOCK_TIMEOUT_MILLIS == 0 {
            if Self::tracing_on() {
                trace!("### QUEUE '{}' is empty. Waiting for a new element (indefinitely) so DEQUEUE may proceed...", self.queue_name);
            }
            self.empty_guard.lock();
            return true;
        }

        if Self::tracing_on() {
            trace!("### QUEUE '{}' is empty. Waiting for a new element (for up to {}ms) so DEQUEUE may proceed...", self.queue_name, LOCK_TIMEOUT_MILLIS);
        }
        let acquired = self.empty_guard.try_lock_for(Self::TRY_LOCK_DURATION);
        if !acquired && Self::tracing_on() {
            trace!("Blocking QUEUE '{}', said to be empty, waited too much for an element to be available so it could be dequeued. Bailing out after waiting for ~{}ms", self.queue_name, LOCK_TIMEOUT_MILLIS);
        }
        acquired
    }

    /// Releases `empty_guard` -- if it is currently locked -- so a waiter in `report_empty()` may proceed
    #[inline(always)]
    fn report_no_longer_empty(&self) {
        if !self.empty_guard.is_locked() {
            return;
        }
        if Self::tracing_on() {
            trace!("Blocking QUEUE '{}' is said to have just come out of the EMPTY state...", self.queue_name);
        }
        // the guard was observed as locked just above
        unsafe { self.empty_guard.unlock() };
    }

    /// Called when an enqueue attempt found no free slot: waits on `full_guard`.
    ///
    /// Returns `true` once the guard could be acquired and `false` if the timed wait expired.
    /// With `LOCK_TIMEOUT_MILLIS == 0` the wait has no time limit, so only `true` is returned.
    #[inline(always)]
    fn report_full(&self) -> bool {
        if Self::metrics_on() {
            self.queue_full_count.fetch_add(1, Relaxed);
        }
        if Self::diagnostics_on() {
            self.metrics_diagnostics();
        }

        // non-blocking attempt: ensures the guard is held, so the wait below only returns
        // after someone unlocks it
        self.full_guard.try_lock();

        if LOCK_TIMEOUT_MILLIS == 0 {
            if Self::tracing_on() {
                trace!("### QUEUE '{}' is full. Waiting for a free slot (indefinitely) so ENQUEUE may proceed...", self.queue_name);
            }
            self.full_guard.lock();
            return true;
        }

        if Self::tracing_on() {
            trace!("### QUEUE '{}' is full. Waiting for a free slot (for up to {}ms) so ENQUEUE may proceed...", self.queue_name, LOCK_TIMEOUT_MILLIS);
        }
        let acquired = self.full_guard.try_lock_for(Self::TRY_LOCK_DURATION);
        if !acquired && Self::tracing_on() {
            trace!("Blocking QUEUE '{}', said to be full, waited too much for a free slot to enqueue a new element. Bailing out after waiting for ~{}ms", self.queue_name, LOCK_TIMEOUT_MILLIS);
        }
        acquired
    }

    /// Releases `full_guard` -- if it is currently locked -- so a waiter in `report_full()` may proceed
    #[inline(always)]
    fn report_no_longer_full(&self) {
        if !self.full_guard.is_locked() {
            return;
        }
        if Self::tracing_on() {
            trace!("Blocking QUEUE '{}' is said to have just come out of the FULL state...", self.queue_name);
        }
        // the guard was observed as locked just above
        unsafe { self.full_guard.unlock() }
    }

    /// Prints a snapshot of the queue state to stdout whenever the sum of the four metrics
    /// counters is a multiple of 2^20
    #[inline(always)]
    fn metrics_diagnostics(&self) {
        let locked_for_enqueueing = self.full_guard.is_locked();
        let locked_for_dequeueing = self.empty_guard.is_locked();
        let elements = self.len();
        let enqueued = self.enqueue_count.load(Relaxed);
        let full_reports = self.queue_full_count.load(Relaxed);
        let dequeued = self.dequeue_count.load(Relaxed);
        let empty_reports = self.queue_empty_count.load(Relaxed);

        let events = enqueued + full_reports + dequeued + empty_reports;
        if events % (1<<20) != 0 {
            return;
        }
        println!("Atomic BlockingQueue '{}' state:", self.queue_name);
        println!(" CONTENTS: {:12} elements, {:12} buffer -- locks: enqueueing={locked_for_enqueueing}; dequeueing={locked_for_dequeueing}", elements, BUFFER_SIZE);
        println!(" PRODUCTION: {:12} successful, {:12} reported queue was full", enqueued, full_reports);
        println!(" CONSUMPTION: {:12} successful, {:12} reported queue was empty", dequeued, empty_reports);
    }
}
/// Blocking flavour of the [`OgreQueue`] API: `enqueue()` waits while the queue is full and
/// `dequeue()` waits while it is empty, each for up to `LOCK_TIMEOUT_MILLIS` (indefinitely when `0`).
impl<SlotType: Copy+Debug + Send + Sync,
     const BUFFER_SIZE: usize,
     const LOCK_TIMEOUT_MILLIS: usize,
     const INSTRUMENTS: usize>
OgreQueue<SlotType>
for BlockingQueue<SlotType, BUFFER_SIZE, LOCK_TIMEOUT_MILLIS, INSTRUMENTS> {

    /// Creates an empty queue named `queue_name`.
    ///
    /// Both guards are locked right away, so the first thread that reports the queue as
    /// full / empty really waits until the opposite operation unlocks the guard.
    fn new<IntoString: Into<String>>(queue_name: IntoString) -> Self {
        let instance = Self {
            enqueue_count:     AtomicU64::new(0),
            queue_full_count:  AtomicU64::new(0),
            full_guard:        RawMutex::INIT,
            base_queue:        AtomicZeroCopy::new(),
            empty_guard:       RawMutex::INIT,
            dequeue_count:     AtomicU64::new(0),
            queue_empty_count: AtomicU64::new(0),
            queue_name:        queue_name.into(),
        };
        instance.full_guard.try_lock();
        instance.empty_guard.try_lock();
        instance
    }

    /// Publishes `element`, waiting on the "full" guard whenever the base queue rejects it.
    ///
    /// Returns `None` when the element was enqueued; gives the element back (`Some`) if the
    /// timed wait for a free slot expired.
    ///
    /// # Panics
    /// If the base queue rejects the element without handing it back.
    #[inline(always)]
    fn enqueue(&self, element: SlotType) -> Option<SlotType> {
        let mut pending = element;
        loop {
            match self.base_queue.publish_movable(pending) {
                (Some(len_after), _none_element) => {
                    if Instruments::from(INSTRUMENTS).tracing() {
                        trace!("### QUEUE '{}' enqueued element '{:?}'", self.queue_name, pending);
                    }
                    if Instruments::from(INSTRUMENTS).metrics() {
                        self.enqueue_count.fetch_add(1, Relaxed);
                    }
                    if Instruments::from(INSTRUMENTS).metrics_diagnostics() {
                        self.metrics_diagnostics();
                    }
                    // first element in: consumers waiting on the "empty" guard may go on
                    if len_after.get() == 1 {
                        self.report_no_longer_empty();
                    }
                    return None;
                },
                (None, rejected) => {
                    // queue is full: wait for a slot; give up if the wait timed out
                    if !self.report_full() {
                        return rejected;
                    }
                    pending = rejected.expect("the base queue should hand a rejected element back");
                },
            }
        }
    }

    /// Takes one element out of the queue, waiting through `report_empty()` when there is none.
    ///
    /// Returns whatever the base queue's `consume()` yields.
    /// NOTE(review): presumably `None` means the wait for an element timed out -- confirm
    /// against `AtomicZeroCopy::consume()`.
    #[inline(always)]
    fn dequeue(&self) -> Option<SlotType> {
        self.base_queue.consume(|slot| *slot,
                                || self.report_empty(),
                                |len| {
                                    if Instruments::from(INSTRUMENTS).tracing() {
                                        trace!("### QUEUE '{}' dequeued an element", self.queue_name);
                                    }
                                    if Instruments::from(INSTRUMENTS).metrics() {
                                        self.dequeue_count.fetch_add(1, Relaxed);
                                    }
                                    if Instruments::from(INSTRUMENTS).metrics_diagnostics() {
                                        self.metrics_diagnostics();
                                    }
                                    // one slot below capacity: producers waiting on the "full" guard may go on
                                    if len == self.base_queue.max_size() as i32 - 1 {
                                        self.report_no_longer_full();
                                    }
                                })
    }

    /// Number of elements currently available for dequeueing, as reported by the base queue
    #[inline(always)]
    fn len(&self) -> usize {
        self.base_queue.available_elements_count()
    }

    /// Capacity of the base queue
    #[inline(always)]
    fn max_size(&self) -> usize {
        self.base_queue.max_size()
    }

    /// `true` if `INSTRUMENTS` enables tracing
    fn debug_enabled(&self) -> bool {
        Instruments::from(INSTRUMENTS).tracing()
    }

    /// `true` if `INSTRUMENTS` enables metrics
    fn metrics_enabled(&self) -> bool {
        Instruments::from(INSTRUMENTS).metrics()
    }

    /// The name given at construction time
    fn queue_name(&self) -> &str {
        self.queue_name.as_str()
    }

    /// Fixed, human-readable name of this implementation
    fn implementation_name(&self) -> &str {
        "Atomic/Parking-lot-Blocking Queue"
    }

    /// Releases both guards and then panics, as interruption is not fully implemented.
    ///
    /// # Panics
    /// Always.
    fn interrupt(&self) {
        // `RawMutex::unlock()` may only be called on a held mutex, so each guard is released
        // only if it is observed as locked -- same check `report_no_longer_*()` does.
        if self.empty_guard.is_locked() {
            // SAFETY: the guard was observed as locked just above
            unsafe { self.empty_guard.unlock() };
        }
        if self.full_guard.is_locked() {
            // SAFETY: the guard was observed as locked just above
            unsafe { self.full_guard.unlock() };
        }
        panic!("interrupt() is not fully implemented: an 'interrupted' flag with checks on report_full() and report_empty() is missing. Since checking it impacts performance, it is not implemented by now");
    }
}
/// Non-blocking counterparts of `enqueue()` / `dequeue()`: they never wait on the guards, but
/// still keep them (and the metrics) consistent with the blocking operations.
impl<SlotType: Copy+Debug + Send + Sync,
     const BUFFER_SIZE: usize,
     const LOCK_TIMEOUT_MILLIS: usize,
     const INSTRUMENTS: usize>
OgreBlockingQueue<'_, SlotType>
for BlockingQueue<SlotType, BUFFER_SIZE, LOCK_TIMEOUT_MILLIS, INSTRUMENTS> {

    /// Unused leftover of the trait.
    ///
    /// # Panics
    /// Always (`todo!()`).
    fn set_empty_guard_ref(&mut self, _empty_guard_ref: &'_ RawMutex) {
        todo!("no longer used method. remove as soon as it is guaranteed to be useless")
    }

    /// Attempts to publish `element` without waiting.
    ///
    /// Returns `None` when the element was enqueued; otherwise returns what the base queue
    /// handed back for the rejected element.
    fn try_enqueue(&self, element: SlotType) -> Option<SlotType> {
        match self.base_queue.publish_movable(element) {
            ( Some(len_after), _none_element ) => {
                if Instruments::from(INSTRUMENTS).tracing() {
                    trace!("### QUEUE '{}' enqueued element '{:?}' in non-blocking mode", self.queue_name, element);
                }
                if Instruments::from(INSTRUMENTS).metrics() {
                    self.enqueue_count.fetch_add(1, Relaxed);
                }
                if Instruments::from(INSTRUMENTS).metrics_diagnostics() {
                    self.metrics_diagnostics();
                }
                // first element in: consumers waiting on the "empty" guard may go on
                if len_after.get() == 1 {
                    self.report_no_longer_empty();
                }
                None
            },
            ( None, some_element ) => {
                if Instruments::from(INSTRUMENTS).tracing() {
                    trace!("### QUEUE '{}' is full when enqueueing element '{:?}' in non-blocking mode", self.queue_name, element);
                }
                if Instruments::from(INSTRUMENTS).metrics() {
                    self.queue_full_count.fetch_add(1, Relaxed);
                }
                if Instruments::from(INSTRUMENTS).metrics_diagnostics() {
                    self.metrics_diagnostics();
                }
                // make sure the "full" guard is held (without waiting on it)
                self.full_guard.try_lock();
                some_element
            },
        }
    }

    /// Attempts to take one element out of the queue without waiting.
    ///
    /// The "empty" callback always answers `false` to the base queue.
    /// NOTE(review): presumably that makes `consume()` give up and return `None` -- confirm
    /// against `AtomicZeroCopy::consume()`.
    fn try_dequeue(&self) -> Option<SlotType> {
        self.base_queue.consume(|slot| *slot,
                                || {
                                    if Instruments::from(INSTRUMENTS).tracing() {
                                        trace!("### QUEUE '{}' is empty when dequeueing an element in non-blocking mode", self.queue_name);
                                    }
                                    if Instruments::from(INSTRUMENTS).metrics() {
                                        self.queue_empty_count.fetch_add(1, Relaxed);
                                    }
                                    if Instruments::from(INSTRUMENTS).metrics_diagnostics() {
                                        self.metrics_diagnostics();
                                    }
                                    // make sure the "empty" guard is held (without waiting on it)
                                    self.empty_guard.try_lock();
                                    false
                                },
                                |len| {
                                    if Instruments::from(INSTRUMENTS).tracing() {
                                        trace!("### QUEUE '{}' dequeued an element in non-blocking mode", self.queue_name);
                                    }
                                    if Instruments::from(INSTRUMENTS).metrics() {
                                        self.dequeue_count.fetch_add(1, Relaxed);
                                    }
                                    if Instruments::from(INSTRUMENTS).metrics_diagnostics() {
                                        self.metrics_diagnostics();
                                    }
                                    // The queue leaves the FULL state when it gets one slot below its
                                    // capacity -- the same threshold the blocking `dequeue()` uses.
                                    // The threshold must be the fixed capacity, not the live element
                                    // count, or producers waiting on the "full" guard may never be released.
                                    if len == self.base_queue.max_size() as i32 - 1 {
                                        self.report_no_longer_full();
                                    }
                                })
    }
}
/// Unit tests for [`BlockingQueue`], delegating the actual scenarios to the shared `test_commons` module.
#[cfg(any(test,doc))]
mod tests {
    use super::*;
    use super::super::super::super::test_commons::{self,ContainerKind,Blocking};

    /// Instruments selector shared by all tests.
    ///
    /// NOTE(review): in the 3-argument instantiations below this value lands on the third generic
    /// parameter, which is `LOCK_TIMEOUT_MILLIS` (leaving `INSTRUMENTS` at its default of `0`) --
    /// confirm whether that is intended before changing it, as a timeout of `0` means "wait forever".
    const METRICS_WITH_DIAGNOSTICS: usize = Instruments::MetricsWithDiagnostics.into();

    /// Exercises the basic container contract on a small queue
    #[cfg_attr(not(doc),test)]
    fn basic_queue_use_cases() {
        let queue = BlockingQueue::<i32, 16, METRICS_WITH_DIAGNOSTICS>::new("basic_use_cases' test queue");
        test_commons::basic_container_use_cases(
            queue.queue_name(),
            ContainerKind::Queue,
            Blocking::Blocking,
            queue.max_size(),
            |e| queue.enqueue(e).is_none(),
            || queue.dequeue(),
            || queue.len(),
        );
    }

    /// One producer feeding several consumers
    #[cfg_attr(not(doc),test)]
    #[ignore]
    fn single_producer_multiple_consumers() {
        let queue = BlockingQueue::<u32, 65536, METRICS_WITH_DIAGNOSTICS>::new("single_producer_multiple_consumers' test queue");
        test_commons::container_single_producer_multiple_consumers(
            queue.queue_name(),
            |e| queue.enqueue(e).is_none(),
            || queue.dequeue(),
        );
    }

    /// Several producers feeding one consumer
    #[cfg_attr(not(doc),test)]
    #[ignore]
    fn multiple_producers_single_consumer() {
        let queue = BlockingQueue::<u32, 65536, METRICS_WITH_DIAGNOSTICS>::new("multiple_producers_single_consumer' test queue");
        test_commons::container_multiple_producers_single_consumer(
            queue.queue_name(),
            |e| queue.enqueue(e).is_none(),
            || queue.dequeue(),
        );
    }

    /// Several producers and consumers -- "all in and out" scenario of `test_commons`
    #[cfg_attr(not(doc),test)]
    #[ignore]
    pub fn multiple_producers_and_consumers_all_in_and_out() {
        let queue = BlockingQueue::<u32, 65536, METRICS_WITH_DIAGNOSTICS>::new("multiple_producers_and_consumers_all_in_and_out' test queue");
        test_commons::container_multiple_producers_and_consumers_all_in_and_out(
            queue.queue_name(),
            Blocking::Blocking,
            queue.max_size(),
            |e| queue.enqueue(e).is_none(),
            || queue.dequeue(),
        );
    }

    /// Several producers and consumers -- "single in and out" scenario of `test_commons`
    #[cfg_attr(not(doc),test)]
    #[ignore]
    pub fn multiple_producers_and_consumers_single_in_and_out() {
        let queue = BlockingQueue::<u32, 65536, METRICS_WITH_DIAGNOSTICS>::new("multiple_producers_and_consumers_single_in_and_out' test queue");
        test_commons::container_multiple_producers_and_consumers_single_in_and_out(
            queue.queue_name(),
            |e| queue.enqueue(e).is_none(),
            || queue.dequeue(),
        );
    }

    /// Checks the blocking behavior, using a 100ms timeout on a 16-slot queue
    #[cfg_attr(not(doc),test)]
    #[ignore]
    fn test_blocking() {
        const TIMEOUT_MILLIS: usize = 100;
        const QUEUE_SIZE:     usize = 16;
        let queue = BlockingQueue::<usize, QUEUE_SIZE, TIMEOUT_MILLIS, METRICS_WITH_DIAGNOSTICS>::new("test_blocking' queue");
        test_commons::blocking_behavior(
            queue.queue_name(),
            QUEUE_SIZE,
            |e| queue.enqueue(e).is_none(),
            || queue.dequeue(),
            |e| queue.try_enqueue(e).is_none(),
            || queue.try_dequeue(),
            false,
            || {},
        );
    }
}