use std::sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
};
use crossbeam::queue::ArrayQueue;
use uuid::Uuid;
use crate::{
cell::{Cell, CellImmutable, CellMutable},
signal::Signal,
subscription::SubscriptionGuard,
traits::{CellValue, DepNode, Gettable, Mutable, Watchable},
};
/// Strategy `BoundedInput::push` applies when its buffer is already full.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OverflowPolicy {
/// Retry the push, yielding the thread between attempts, until it succeeds
/// or the input is closed.
Block,
/// Evict the oldest buffered value to make room for the incoming one.
DropOldest,
/// Discard the incoming value; the push still reports `Ok(())`.
DropNewest,
/// Call `fail` on the underlying cell with an overflow error and hand the
/// value back to the caller as `Err`.
Error,
}
/// Event counters for a bounded input.
///
/// Every counter starts at zero (via `Default`) and is read and written with
/// `Ordering::Relaxed`.
#[derive(Debug, Default)]
pub struct BoundedInputMetrics {
    /// Number of recorded pushes.
    total_pushed: AtomicU64,
    /// Number of recorded drops.
    dropped_count: AtomicU64,
    /// Number of recorded backpressure events.
    backpressure_events: AtomicU64,
}

impl BoundedInputMetrics {
    /// Adds one to `counter`.
    #[inline]
    fn increment(counter: &AtomicU64) {
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Reads the current value of `counter`.
    #[inline]
    fn snapshot(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Records one push.
    #[inline]
    pub fn record_push(&self) {
        Self::increment(&self.total_pushed);
    }

    /// Records one dropped value.
    #[inline]
    pub fn record_drop(&self) {
        Self::increment(&self.dropped_count);
    }

    /// Records one backpressure event.
    #[inline]
    pub fn record_backpressure(&self) {
        Self::increment(&self.backpressure_events);
    }

    /// Returns how many pushes have been recorded so far.
    pub fn total_pushed(&self) -> u64 {
        Self::snapshot(&self.total_pushed)
    }

    /// Returns how many drops have been recorded so far.
    pub fn dropped_count(&self) -> u64 {
        Self::snapshot(&self.dropped_count)
    }

    /// Returns how many backpressure events have been recorded so far.
    pub fn backpressure_events(&self) -> u64 {
        Self::snapshot(&self.backpressure_events)
    }
}
/// State shared (behind an `Arc`) by every clone of a `BoundedInput` handle.
struct BoundedInputInner<T> {
/// Fixed-capacity queue holding pushed values until `flush` moves them into `cell`.
buffer: ArrayQueue<T>,
/// Cell that receives flushed values and backs `get` and subscriptions.
cell: Cell<T, CellMutable>,
/// Overflow behaviour chosen at construction.
policy: OverflowPolicy,
/// Set to `true` by `close`; `push` rejects values once it is set.
closed: AtomicBool,
/// Push / drop / backpressure counters.
metrics: BoundedInputMetrics,
/// Capacity requested at construction (the size passed to `ArrayQueue::new`).
capacity: usize,
}
/// Handle to a bounded, buffered input.
///
/// Pushed values wait in a fixed-capacity queue and are written into the
/// underlying cell when the input is flushed. All clones of a handle share the
/// same state.
pub struct BoundedInput<T> {
/// Shared state; cloning the handle only bumps this reference count.
inner: Arc<BoundedInputInner<T>>,
}
impl<T> Clone for BoundedInput<T> {
fn clone(&self) -> Self {
BoundedInput {
inner: Arc::clone(&self.inner),
}
}
}
impl<T: CellValue> BoundedInput<T> {
    /// Creates an input whose cell starts at `initial_value` and whose buffer
    /// holds at most `capacity` pending values; `policy` decides what `push`
    /// does once the buffer is full.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero.
    #[track_caller]
    pub fn new(initial_value: T, capacity: usize, policy: OverflowPolicy) -> Self {
        assert!(capacity > 0, "capacity must be positive");
        BoundedInput {
            inner: Arc::new(BoundedInputInner {
                buffer: ArrayQueue::new(capacity),
                cell: Cell::new(initial_value),
                policy,
                closed: AtomicBool::new(false),
                metrics: BoundedInputMetrics::default(),
                capacity,
            }),
        }
    }

    /// Enqueues `value` for delivery on the next `flush`.
    ///
    /// When the buffer is full a backpressure event is recorded and the
    /// configured [`OverflowPolicy`] decides the outcome.
    ///
    /// # Errors
    ///
    /// Returns the value back as `Err` when the input is closed, or when the
    /// overflow policy rejects it.
    pub fn push(&self, value: T) -> Result<(), T> {
        if self.inner.closed.load(Ordering::SeqCst) {
            return Err(value);
        }
        match self.inner.buffer.push(value) {
            Ok(()) => {
                self.inner.metrics.record_push();
                Ok(())
            }
            Err(value) => {
                self.inner.metrics.record_backpressure();
                self.handle_overflow(value)
            }
        }
    }

    /// Pushes `value` and then flushes the buffer into the cell.
    ///
    /// The flush runs even when the push itself failed; the push result is
    /// returned unchanged.
    pub fn push_flush(&self, value: T) -> Result<(), T> {
        let result = self.push(value);
        self.flush();
        result
    }

    /// Applies the configured overflow policy to a `value` that did not fit.
    fn handle_overflow(&self, value: T) -> Result<(), T> {
        match self.inner.policy {
            OverflowPolicy::Block => {
                // Spin (yielding between attempts) until a slot frees up or
                // the input is closed. Within this type, slots are only freed
                // by `flush` (also reached via `get`/`close`), so some other
                // caller must be draining for this loop to finish.
                let mut current_value = value;
                loop {
                    if self.inner.closed.load(Ordering::SeqCst) {
                        return Err(current_value);
                    }
                    match self.inner.buffer.push(current_value) {
                        Ok(()) => {
                            self.inner.metrics.record_push();
                            return Ok(());
                        }
                        Err(v) => {
                            current_value = v;
                            std::thread::yield_now();
                        }
                    }
                }
            }
            OverflowPolicy::DropOldest => {
                let mut pending = value;
                loop {
                    if self.inner.closed.load(Ordering::SeqCst) {
                        return Err(pending);
                    }
                    // Count a drop only when a value was actually evicted: a
                    // concurrent `flush` may already have emptied the buffer.
                    if self.inner.buffer.pop().is_some() {
                        self.inner.metrics.record_drop();
                    }
                    match self.inner.buffer.push(pending) {
                        Ok(()) => {
                            self.inner.metrics.record_push();
                            return Ok(());
                        }
                        // Another producer took the freed slot; evict again
                        // rather than rejecting a value this policy accepts.
                        Err(rejected) => pending = rejected,
                    }
                }
            }
            OverflowPolicy::DropNewest => {
                // The incoming value is discarded, yet the caller sees success.
                self.inner.metrics.record_drop();
                Ok(())
            }
            OverflowPolicy::Error => {
                self.inner.cell.fail(anyhow::anyhow!(
                    "BoundedInput overflow: buffer full (capacity {})",
                    self.inner.capacity
                ));
                Err(value)
            }
        }
    }

    /// Drains the buffer in FIFO order, calling `set` on the cell for each
    /// value.
    pub fn flush(&self) {
        while let Some(value) = self.inner.buffer.pop() {
            self.inner.cell.set(value);
        }
    }

    /// Marks the input closed, flushes what is buffered, and completes the
    /// cell.
    ///
    /// NOTE: `push` checks the closed flag before enqueueing, and the two steps
    /// are not atomic, so a push racing with `close` may enqueue a value after
    /// this final flush.
    pub fn close(&self) {
        self.inner.closed.store(true, Ordering::SeqCst);
        self.flush();
        self.inner.cell.complete();
    }

    /// Returns the push / drop / backpressure counters.
    pub fn metrics(&self) -> &BoundedInputMetrics {
        &self.inner.metrics
    }

    /// Returns the capacity passed to [`BoundedInput::new`].
    pub fn capacity(&self) -> usize {
        self.inner.capacity
    }

    /// Returns `true` once [`BoundedInput::close`] has been called.
    pub fn is_closed(&self) -> bool {
        self.inner.closed.load(Ordering::SeqCst)
    }

    /// Returns a `Cell<T, CellImmutable>` obtained by cloning the underlying
    /// cell and calling `lock()` on the clone.
    pub fn to_cell(&self) -> Cell<T, CellImmutable> {
        self.inner.cell.clone().lock()
    }
}
impl<T: CellValue> Gettable<T> for BoundedInput<T> {
    /// Flushes any buffered values into the cell, then returns the result of
    /// the cell's `get`.
    fn get(&self) -> T {
        let shared = &self.inner;
        // Drain first so the read observes every value pushed so far.
        self.flush();
        shared.cell.get()
    }
}
impl<T: CellValue> DepNode for BoundedInput<T> {
    /// Returns the id of the underlying cell.
    fn id(&self) -> Uuid {
        let node = &self.inner.cell.inner;
        node.id
    }

    /// Returns an owned copy of the underlying cell's name, if one is set.
    ///
    /// # Panics
    ///
    /// Panics with "cell name poisoned" if locking the name fails.
    fn name(&self) -> Option<String> {
        let node = &self.inner.cell.inner;
        let guard = node.name.lock().expect("cell name poisoned");
        guard.as_ref().map(|label| label.to_string())
    }

    /// Always returns an empty list: this node reports no dependencies.
    fn deps(&self) -> Vec<Arc<dyn DepNode>> {
        Vec::new()
    }
}
impl<T: CellValue> Watchable<T> for BoundedInput<T> {
    /// Registers `callback` on the underlying cell and returns the cell's
    /// subscription guard.
    fn subscribe(
        &self,
        callback: impl Fn(&Signal<T>) + Send + Sync + 'static,
    ) -> SubscriptionGuard {
        let shared = &self.inner;
        shared.cell.subscribe(callback)
    }

    /// Forwards to the underlying cell's `unsubscribe` with `id`.
    fn unsubscribe(&self, id: Uuid) {
        let shared = &self.inner;
        shared.cell.unsubscribe(id)
    }

    /// Reports whether the underlying cell is complete.
    fn is_complete(&self) -> bool {
        let shared = &self.inner;
        shared.cell.is_complete()
    }

    /// Reports whether the underlying cell is in an error state.
    fn is_error(&self) -> bool {
        let shared = &self.inner;
        shared.cell.is_error()
    }

    /// Returns the underlying cell's error, if any.
    fn error(&self) -> Option<Arc<anyhow::Error>> {
        let shared = &self.inner;
        shared.cell.error()
    }
}