use std::ops::Deref;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Waker;
use atomic_waker::AtomicWaker;
use concurrent_queue::{ConcurrentQueue, PushError};
use crate::bus::{AppIngressErrorKind, AppIngressSource};
use crate::ids::CommandId;
/// Cap of 4096 (4 KiB when counted in bytes).
///
/// NOTE(review): not referenced in this file; presumably bounds the size of a
/// completion failure detail string — confirm at the use sites.
pub const COMPLETION_DETAIL_CAP: usize = 4 << 10;
/// Base capacity (1024) from which the default ingress capacity is derived.
const DEFAULT_BUS_CAPACITY: usize = 1 << 10;
/// Capacity used by `IngressQueue::new`: four times `DEFAULT_BUS_CAPACITY`.
pub const DEFAULT_INGRESS_CAPACITY: usize = 4 * DEFAULT_BUS_CAPACITY;
/// An event carried by an `IngressQueue`.
///
/// Only the shape of each variant is established in this file; the producers
/// and the consumer live elsewhere, so semantic notes below are hedged where
/// they go beyond the field names and types.
#[derive(Debug)]
pub enum IngressEvent {
/// A wire envelope attributed to `src_peer`.
///
/// `src_observed_address` optionally carries an address for the sender;
/// `IngressEvent::from_in_process` fills it with a p2p address built from
/// the peer id.
EnvelopeFrom {
src_peer: crate::ids::PeerId,
envelope: crate::envelope::WireEnvelope,
src_observed_address: Option<crate::framework::Address>,
},
/// Raw bytes addressed to input `input_name` of module `module_name`.
///
/// NOTE(review): presumably submitted by the host application — confirm
/// against the producers.
AppEvent {
module_name: String,
input_name: String,
value_bytes: Vec<u8>,
},
/// A timer notification carrying `at_ns`.
///
/// NOTE(review): `at_ns` looks like a nanosecond timestamp; the clock/epoch
/// it refers to is not visible here — confirm.
TimerMatured {
at_ns: u64,
},
/// A request naming a module, a list of `(name, bytes)` inputs and an
/// execution id.
///
/// NOTE(review): presumably asks the consumer to run `module_name` with the
/// given inputs under `exec_id` — confirm.
Invoke {
module_name: String,
inputs: Vec<(String, Vec<u8>)>,
exec_id: crate::ids::ExecId,
},
/// Result payloads (one byte vector each) associated with command `cmd_id`.
Completion {
cmd_id: CommandId,
results: Vec<Vec<u8>>,
},
/// A textual failure `detail` associated with command `cmd_id`.
///
/// NOTE(review): `detail` is presumably limited by `COMPLETION_DETAIL_CAP`;
/// nothing in this file enforces that — confirm at the producer.
CompletionFailed {
cmd_id: CommandId,
detail: String,
},
/// A send failure record: a wire request id, the peer as raw bytes, and a
/// static reason string.
///
/// NOTE(review): the encoding of `peer` is not visible here — confirm.
SendFailed {
wire_req_id: u64,
peer: Vec<u8>,
reason: &'static str,
},
/// An application-ingress error record: its source, a byte count and an
/// error kind.
///
/// NOTE(review): what `byte_count` measures (rejected payload size?) is not
/// visible here — confirm.
AppIngressError {
source: AppIngressSource,
byte_count: usize,
kind: AppIngressErrorKind,
},
}
impl IngressEvent {
    /// Wraps `envelope` in an [`IngressEvent::EnvelopeFrom`] attributed to
    /// `src_peer`.
    ///
    /// The observed address is always present: it is an empty
    /// `crate::framework::Address` extended with the p2p component for
    /// `src_peer`.
    ///
    /// NOTE(review): the name suggests this is meant for envelopes that did
    /// not arrive over a network transport — confirm with callers.
    pub fn from_in_process(
        src_peer: crate::ids::PeerId,
        envelope: crate::envelope::WireEnvelope,
    ) -> Self {
        let observed = crate::framework::Address::empty().p2p(src_peer);
        Self::EnvelopeFrom {
            src_peer,
            envelope,
            src_observed_address: Some(observed),
        }
    }
}
/// A bounded concurrent queue of [`IngressEvent`]s with a consumer waker and
/// an overflow counter.
///
/// All methods take `&self`; state is held in the concurrent queue and in
/// atomics.
pub struct IngressQueue {
/// Pending events; created with `ConcurrentQueue::bounded`.
queue: ConcurrentQueue<IngressEvent>,
/// Waker set by `register_waker` and woken after each successful `push`.
waker: AtomicWaker,
/// Count of pushes rejected because the queue was full.
dropped_overflow: AtomicU64,
/// Value exposed through `completion_result_cap`; starts at `usize::MAX`.
/// This file only stores and returns it — enforcement, if any, is elsewhere.
completion_result_cap: AtomicUsize,
}
impl IngressQueue {
    /// Creates a queue bounded at [`DEFAULT_INGRESS_CAPACITY`] events.
    pub fn new() -> Self {
        Self::with_capacity(DEFAULT_INGRESS_CAPACITY)
    }

    /// Creates a queue that holds at most `capacity` events.
    ///
    /// The overflow counter starts at zero and the completion result cap at
    /// `usize::MAX`.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero, because `ConcurrentQueue::bounded`
    /// rejects a zero capacity.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            queue: ConcurrentQueue::bounded(capacity),
            waker: AtomicWaker::new(),
            dropped_overflow: AtomicU64::new(0),
            completion_result_cap: AtomicUsize::new(usize::MAX),
        }
    }

    /// Stores a new completion result cap.
    ///
    /// The value is only stored here; this type does not enforce it.
    pub(crate) fn set_completion_result_cap(&self, cap: usize) {
        self.completion_result_cap.store(cap, Ordering::Relaxed);
    }

    /// Returns the most recently stored completion result cap
    /// (`usize::MAX` until one is set).
    pub fn completion_result_cap(&self) -> usize {
        self.completion_result_cap.load(Ordering::Relaxed)
    }

    /// Enqueues `event` and wakes the registered waker.
    ///
    /// # Errors
    ///
    /// Hands the event back unchanged when it could not be enqueued:
    /// - queue full: the overflow counter is incremented first;
    /// - queue closed: the counter is left untouched.
    ///
    /// The waker is not woken on failure.
    // The error type is the rejected event itself so the caller keeps
    // ownership of it; that makes the `Err` variant large by design.
    #[allow(clippy::result_large_err)]
    pub fn push(&self, event: IngressEvent) -> Result<(), IngressEvent> {
        match self.queue.push(event) {
            Ok(()) => {
                self.waker.wake();
                Ok(())
            }
            Err(PushError::Full(ev)) => {
                self.dropped_overflow.fetch_add(1, Ordering::Relaxed);
                Err(ev)
            }
            Err(PushError::Closed(ev)) => Err(ev),
        }
    }

    /// Pops events until the queue reports none left and returns them in pop
    /// order.
    ///
    /// Events pushed concurrently while draining may or may not be included.
    pub fn drain_all(&self) -> Vec<IngressEvent> {
        // Size the buffer from the current backlog instead of the full queue
        // capacity: reserving `capacity()` slots of a large enum on every
        // drain is wasteful when the queue is empty or nearly so. `len()` is
        // only a hint; the vector grows if producers add more meanwhile.
        let mut out = Vec::with_capacity(self.queue.len());
        while let Ok(event) = self.queue.pop() {
            out.push(event);
        }
        out
    }

    /// Registers `waker` as the one to wake after the next successful `push`.
    pub fn register_waker(&self, waker: &Waker) {
        self.waker.register(waker);
    }

    /// Returns `true` when the underlying queue reports no pending events.
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Returns the number of events the underlying queue currently reports.
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    /// Returns the queue's capacity, or `usize::MAX` if the underlying queue
    /// reports none.
    pub fn capacity(&self) -> usize {
        self.queue.capacity().unwrap_or(usize::MAX)
    }

    /// Returns how many pushes have been rejected because the queue was full.
    pub fn dropped_overflow(&self) -> u64 {
        self.dropped_overflow.load(Ordering::Relaxed)
    }
}
impl Default for IngressQueue {
fn default() -> Self {
Self::new()
}
}
/// Cloneable handle to a shared [`IngressQueue`].
///
/// Cloning the handle clones the inner `Arc`, so all clones refer to the same
/// queue.
#[derive(Clone)]
pub struct IngressQueueRef(Arc<IngressQueue>);
impl IngressQueueRef {
pub fn new(queue: Arc<IngressQueue>) -> Self {
Self(queue)
}
}
impl IngressQueueRef {
pub fn arc(&self) -> &Arc<IngressQueue> {
&self.0
}
}
/// Lets a handle be used wherever a `&IngressQueue` is expected, so the
/// queue's methods can be called directly on it.
impl Deref for IngressQueueRef {
    type Target = IngressQueue;

    fn deref(&self) -> &IngressQueue {
        &*self.0
    }
}
impl std::fmt::Debug for IngressQueueRef {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("IngressQueueRef")
.field("len", &self.len())
.field("dropped_overflow", &self.dropped_overflow())
.finish()
}
}