use crate::errors::QueueError;
use crate::message::Message;
use crate::policy::{AdmissionDecision, BatchingPolicy, EdgePolicy, WatermarkState};
use crate::prelude::{BatchView, HeaderStore, Payload};
use crate::types::MessageToken;
// Submodules that are always compiled.
pub mod link;
pub mod spsc_array;
// Only built when the `alloc` feature is enabled.
#[cfg(feature = "alloc")]
pub mod spsc_vecdeque;
// Only built when the `std` feature is enabled.
#[cfg(feature = "std")]
pub mod spsc_concurrent;
// Only built when the `spsc_raw` feature is enabled.
#[cfg(feature = "spsc_raw")]
pub mod spsc_raw;
pub mod spsc_priority2;
// Benchmark and contract-test support: compiled for test builds or when the
// `bench` feature is enabled.
#[cfg(any(test, feature = "bench"))]
pub mod bench;
#[cfg(any(test, feature = "bench"))]
pub mod contract_tests;
// Re-export every public item of `contract_tests` from this module under the
// same test/bench gate.
#[cfg(any(test, feature = "bench"))]
pub use contract_tests::*;
/// Outcome reported by [`Edge::try_push`].
///
/// Marked `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EnqueueResult {
/// The push was accepted.
Enqueued,
/// NOTE(review): presumably the incoming (newest) item was discarded rather
/// than stored — confirm against the queue implementations.
DroppedNewest,
/// The push was refused. This is what [`NoQueue`] returns for every push.
Rejected,
}
/// Occupancy report for an edge: an item count, a byte count, and a
/// [`WatermarkState`].
///
/// This is the value returned by [`Edge::occupancy`]. The fields are private;
/// build one with [`EdgeOccupancy::new`] and read it through the accessors.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EdgeOccupancy {
/// Item count.
items: usize,
/// Byte count.
bytes: usize,
/// Watermark classification stored alongside the counts.
watermark: WatermarkState,
}
impl EdgeOccupancy {
    /// Builds an occupancy report from its three components.
    ///
    /// The values are stored as given; no validation or derivation happens here.
    #[inline]
    pub const fn new(items: usize, bytes: usize, watermark: WatermarkState) -> Self {
        Self {
            items,
            bytes,
            watermark,
        }
    }

    /// Returns the stored item count.
    ///
    /// `const` so that a report built with the `const` constructor can also be
    /// read in a const context. The by-reference return type is kept for
    /// compatibility with existing callers.
    #[inline]
    pub const fn items(&self) -> &usize {
        &self.items
    }

    /// Returns the stored byte count.
    ///
    /// `const` for the same reason as [`EdgeOccupancy::items`].
    #[inline]
    pub const fn bytes(&self) -> &usize {
        &self.bytes
    }

    /// Returns the stored watermark state.
    ///
    /// `const` for the same reason as [`EdgeOccupancy::items`].
    #[inline]
    pub const fn watermark(&self) -> &WatermarkState {
        &self.watermark
    }
}
/// Queue-like interface for an edge that stores [`MessageToken`]s.
///
/// Message headers are not owned by the edge; methods that need them receive a
/// caller-supplied [`HeaderStore`]. The required methods carry whatever
/// contract the implementing queue defines; the provided methods are described
/// from their default bodies below.
pub trait Edge {
    /// Offers `token` to the edge under `policy` and reports the outcome.
    ///
    /// `headers` is the store an implementation can use to look up the token's
    /// header.
    fn try_push<H: HeaderStore>(
        &mut self,
        token: MessageToken,
        policy: &EdgePolicy,
        headers: &H,
    ) -> EnqueueResult;

    /// Attempts to take the next token off the edge.
    ///
    /// # Errors
    /// Returns a [`QueueError`] when no token can be produced.
    fn try_pop<H: HeaderStore>(&mut self, headers: &H) -> Result<MessageToken, QueueError>;

    /// Reports the edge's current occupancy, evaluated with `policy`.
    fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy;

    /// Returns `true` when the edge holds no tokens.
    fn is_empty(&self) -> bool;

    /// Returns the token at the front of the edge without taking `&mut self`.
    ///
    /// # Errors
    /// Returns a [`QueueError`] when there is nothing to peek.
    fn try_peek(&self) -> Result<MessageToken, QueueError>;

    /// Returns the token at position `index` without taking `&mut self`.
    ///
    /// NOTE(review): the indexing convention (presumably `0` is the front) is
    /// defined by each implementation — confirm.
    ///
    /// # Errors
    /// Returns a [`QueueError`] when no token is available at `index`.
    fn try_peek_at(&self, index: usize) -> Result<MessageToken, QueueError>;

    /// Looks up the header of the token returned by [`Edge::try_peek`].
    ///
    /// # Errors
    /// Propagates the error from `try_peek` unchanged. Any failure from the
    /// header store is reported as [`QueueError::Empty`].
    ///
    /// NOTE(review): mapping every store failure to `Empty` hides the store's
    /// own error — confirm that this is intended.
    fn peek_header<'h, H: HeaderStore>(
        &self,
        headers: &'h H,
    ) -> Result<<H as HeaderStore>::HeaderGuard<'h>, QueueError> {
        let front = self.try_peek()?;
        match headers.peek_header(front) {
            Ok(guard) => Ok(guard),
            Err(_) => Err(QueueError::Empty),
        }
    }

    /// Attempts to take a batch of tokens as directed by `policy`.
    ///
    /// The returned [`BatchView`] borrows from `self`.
    ///
    /// # Errors
    /// Returns a [`QueueError`] when no batch can be produced.
    fn try_pop_batch<H: HeaderStore>(
        &mut self,
        policy: &BatchingPolicy,
        headers: &H,
    ) -> Result<BatchView<'_, MessageToken>, QueueError>;

    /// Asks `policy` whether the message identified by `token` may be admitted.
    ///
    /// The decision is computed from the edge's current item and byte counts
    /// plus the payload size, deadline and QoS read from the token's header.
    /// If the header cannot be read from `headers`, the result is
    /// [`AdmissionDecision::Reject`].
    fn get_admission_decision<H: HeaderStore>(
        &self,
        policy: &EdgePolicy,
        token: MessageToken,
        headers: &H,
    ) -> AdmissionDecision {
        // Occupancy is sampled before the header lookup, as in the original order.
        let snapshot = self.occupancy(policy);
        let Ok(header) = headers.peek_header(token) else {
            return AdmissionDecision::Reject;
        };
        policy.decide(
            *snapshot.items(),
            *snapshot.bytes(),
            *header.payload_size_bytes(),
            *header.deadline_ns(),
            *header.qos(),
        )
    }

    /// Asks `policy` whether `message` may be admitted.
    ///
    /// Same computation as [`Edge::get_admission_decision`], but the header is
    /// taken directly from `message`, so there is no header-store lookup and no
    /// lookup-failure path.
    fn get_admission_decision_from_message<P: Payload>(
        &self,
        policy: &EdgePolicy,
        message: &Message<P>,
    ) -> AdmissionDecision {
        let snapshot = self.occupancy(policy);
        let header = message.header();
        let payload_bytes = *header.payload_size_bytes();
        let deadline = *header.deadline_ns();
        let qos = *header.qos();
        policy.decide(
            *snapshot.items(),
            *snapshot.bytes(),
            payload_bytes,
            deadline,
            qos,
        )
    }
}
/// Selects which kind of handle is requested from `ScopedEdge::scoped_handle`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EdgeHandleKind {
/// Request a handle for the producing side.
/// NOTE(review): exact capabilities of each kind are defined by the
/// implementing queue — confirm there.
Producer,
/// Request a handle for the consuming side.
Consumer,
}
/// An [`Edge`] that can lend out handles borrowed from `&self`.
///
/// Only available when the `std` feature is enabled.
#[cfg(feature = "std")]
pub trait ScopedEdge: Edge {
/// Handle type produced by [`ScopedEdge::scoped_handle`].
///
/// It is itself an [`Edge`], is `Send`, and may borrow from the parent edge
/// for the lifetime `'a`.
type Handle<'a>: Edge + Send + 'a
where
Self: 'a;
/// Creates a handle of the requested `kind` that borrows this edge for `'a`.
fn scoped_handle<'a>(&'a self, kind: EdgeHandleKind) -> Self::Handle<'a>
where
Self: 'a;
}
/// Edge implementation with no storage.
///
/// Its [`Edge`] implementation rejects every push, reports itself as empty,
/// and returns `QueueError::Empty` from every pop and peek.
///
/// The type carries no state, so it derives `Debug`, `Clone`, `Copy` and
/// `Default`. This lets it be logged, embedded in containers that derive
/// those traits, and constructed generically.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoQueue;
/// [`Edge`] implementation for [`NoQueue`]: nothing is ever stored, so every
/// operation returns a fixed answer.
///
/// Every method is marked `#[inline]`. The non-generic ones (`is_empty`,
/// `try_peek`, `try_peek_at`) previously lacked the hint, although those are
/// the methods that need it to be inlinable across crates.
impl Edge for NoQueue {
    /// Always refuses the token with [`EnqueueResult::Rejected`].
    #[inline]
    fn try_push<H: HeaderStore>(
        &mut self,
        _token: MessageToken,
        _policy: &EdgePolicy,
        _headers: &H,
    ) -> EnqueueResult {
        EnqueueResult::Rejected
    }

    /// Always fails with [`QueueError::Empty`].
    #[inline]
    fn try_pop<H: HeaderStore>(&mut self, _headers: &H) -> Result<MessageToken, QueueError> {
        Err(QueueError::Empty)
    }

    /// Reports zero items and zero bytes with `WatermarkState::AtOrAboveHard`.
    ///
    /// NOTE(review): the hard-watermark state with zero counts is presumably
    /// chosen so that watermark-driven callers treat this edge as full —
    /// confirm.
    #[inline]
    fn occupancy(&self, _policy: &EdgePolicy) -> EdgeOccupancy {
        EdgeOccupancy::new(0, 0, WatermarkState::AtOrAboveHard)
    }

    /// Always `true`.
    #[inline]
    fn is_empty(&self) -> bool {
        true
    }

    /// Always fails with [`QueueError::Empty`].
    #[inline]
    fn try_peek(&self) -> Result<MessageToken, QueueError> {
        Err(QueueError::Empty)
    }

    /// Always fails with [`QueueError::Empty`], whatever the index.
    #[inline]
    fn try_peek_at(&self, _index: usize) -> Result<MessageToken, QueueError> {
        Err(QueueError::Empty)
    }

    /// Always fails with [`QueueError::Empty`].
    #[inline]
    fn try_pop_batch<H: HeaderStore>(
        &mut self,
        _policy: &BatchingPolicy,
        _headers: &H,
    ) -> Result<BatchView<'_, MessageToken>, QueueError> {
        Err(QueueError::Empty)
    }
}