use std::collections::VecDeque;
use tokio::sync::mpsc;
use super::super::typed_event::{Priority, TypedEvent};
/// Non-blocking forwarder of [`TypedEvent`]s from the connection driver to an
/// external `mpsc` channel.
///
/// Every send goes through `try_send`, so the sink never waits on the
/// receiver. When the channel is full, critical events are held in `pending`
/// and lower-priority events are counted in `drop_stats` and discarded.
pub(in crate::connection) struct DriverEventSink {
/// Sending half of the channel the events are delivered on.
external: mpsc::Sender<TypedEvent>,
/// FIFO backlog of critical events that could not be sent because the
/// channel was full.
pending: VecDeque<TypedEvent>,
/// Maximum number of events `pending` may hold before a further critical
/// event is rejected with `EventOverflow::HardCap`.
hard_cap: usize,
/// Count of dropped lower-priority events not yet reported through a
/// `TypedEvent::QueueOverflow` marker.
drop_stats: DropStats,
}
/// Running tally of events that were discarded instead of delivered.
///
/// The `Default` value (all counters zero, `since` unset) represents
/// "nothing dropped".
#[derive(Default)]
struct DropStats {
    /// Number of discarded events whose priority was resyncable.
    dropped_resyncable: usize,
    /// Number of discarded events whose priority was requeryable.
    dropped_requeryable: usize,
    /// When the first drop of the current tally happened; `None` while the
    /// tally is empty.
    since: Option<std::time::Instant>,
}

impl DropStats {
    /// Returns the total number of discarded events across both categories.
    fn dropped_count(&self) -> usize {
        let Self {
            dropped_resyncable,
            dropped_requeryable,
            ..
        } = self;
        dropped_resyncable + dropped_requeryable
    }
}
/// Pending-buffer limit used when `DriverEventSink::new` is given `None` for
/// `hard_cap`.
const DEFAULT_HARD_CAP: usize = 4096;
/// Failures reported by `DriverEventSink` when an event cannot be accepted.
#[derive(Debug, thiserror::Error)]
pub(in crate::connection::driver) enum EventOverflow {
/// A critical event arrived while the channel was full and the pending
/// buffer already held `hard_cap` events.
#[error("critical event pending buffer exceeded hard cap")]
HardCap,
/// The channel reported `Closed`: the receiving half no longer exists.
#[error("event receiver dropped")]
CallerGone,
}
impl DriverEventSink {
pub(in crate::connection) fn new(
external: mpsc::Sender<TypedEvent>,
hard_cap: Option<usize>,
) -> Self {
Self {
external,
pending: VecDeque::new(),
hard_cap: hard_cap.unwrap_or(DEFAULT_HARD_CAP).max(1),
drop_stats: DropStats::default(),
}
}
pub(in crate::connection::driver) fn emit(
&mut self,
ev: TypedEvent,
) -> Result<(), EventOverflow> {
self.drain_pending()?;
self.try_emit(ev)
}
pub(in crate::connection::driver) fn drain_pending_nonblocking(
&mut self,
) -> Result<(), EventOverflow> {
self.drain_pending()
}
#[allow(clippy::expect_used)] fn drain_pending(&mut self) -> Result<(), EventOverflow> {
while let Some(ev) = self.pending.pop_front() {
match self.external.try_send(ev) {
Ok(()) => {}
Err(mpsc::error::TrySendError::Full(ev)) => {
self.pending.push_front(ev);
return Ok(());
}
Err(mpsc::error::TrySendError::Closed(_)) => {
self.pending.clear();
return Err(EventOverflow::CallerGone);
}
}
}
if self.drop_stats.dropped_count() > 0 {
let stats = std::mem::take(&mut self.drop_stats);
let marker = TypedEvent::QueueOverflow {
dropped_count: stats.dropped_count(),
since: stats.since.expect(
"since is always Some when dropped_count > 0 \
(set by record_drop_requeryable/record_drop_resyncable)",
),
};
match self.external.try_send(marker) {
Ok(()) => {} Err(mpsc::error::TrySendError::Full(_)) => {
self.drop_stats = stats;
}
Err(mpsc::error::TrySendError::Closed(_)) => {
return Err(EventOverflow::CallerGone);
}
}
}
Ok(())
}
fn try_emit(&mut self, ev: TypedEvent) -> Result<(), EventOverflow> {
match self.external.try_send(ev) {
Ok(()) => Ok(()),
Err(mpsc::error::TrySendError::Full(ev)) => match ev.priority() {
Priority::Critical => {
if self.pending.len() >= self.hard_cap {
Err(EventOverflow::HardCap)
} else {
self.pending.push_back(ev);
Ok(())
}
}
Priority::Requeryable => {
self.record_drop_requeryable();
Ok(())
}
Priority::Resyncable => {
self.record_drop_resyncable();
Ok(())
}
},
Err(mpsc::error::TrySendError::Closed(_)) => Err(EventOverflow::CallerGone),
}
}
fn record_drop_requeryable(&mut self) {
if self.drop_stats.since.is_none() {
self.drop_stats.since = Some(std::time::Instant::now());
}
self.drop_stats.dropped_requeryable += 1;
}
fn record_drop_resyncable(&mut self) {
if self.drop_stats.since.is_none() {
self.drop_stats.since = Some(std::time::Instant::now());
}
self.drop_stats.dropped_resyncable += 1;
}
}
// Unit tests are kept in the separate file named by `#[path]` and are only
// compiled for test builds.
#[cfg(test)]
#[path = "event_sink_tests.rs"]
mod tests;