#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::time::Instant;
use tokio::sync::mpsc;
use super::*;
use crate::connection::typed_event::TypedEvent;
fn make_sink(channel_cap: usize, hard_cap: usize) -> (DriverEventSink, mpsc::Receiver<TypedEvent>) {
let (tx, rx) = mpsc::channel(channel_cap);
let sink = DriverEventSink::new(tx, Some(hard_cap));
(sink, rx)
}
fn resyncable_event(n: u32) -> TypedEvent {
TypedEvent::Exists(n)
}
fn critical_event(msg: &str) -> TypedEvent {
TypedEvent::Alert(msg.to_owned())
}
fn requeryable_event() -> TypedEvent {
TypedEvent::CapabilityChange(vec![])
}
fn drain_rx(rx: &mut mpsc::Receiver<TypedEvent>) -> Vec<TypedEvent> {
let mut events = Vec::new();
while let Ok(ev) = rx.try_recv() {
events.push(ev);
}
events
}
#[test]
fn basic_emit_with_capacity() {
let (mut sink, mut rx) = make_sink(8, 16);
sink.emit(resyncable_event(1)).unwrap();
let events = drain_rx(&mut rx);
assert_eq!(events.len(), 1);
assert!(matches!(events[0], TypedEvent::Exists(1)));
}
#[test]
fn resyncable_dropped_when_full() {
let (mut sink, _rx) = make_sink(1, 16);
sink.emit(resyncable_event(1)).unwrap();
sink.emit(resyncable_event(2)).unwrap();
}
#[test]
fn requeryable_dropped_when_full() {
let (mut sink, _rx) = make_sink(1, 16);
sink.emit(resyncable_event(1)).unwrap();
sink.emit(requeryable_event()).unwrap();
}
#[test]
fn critical_buffered_when_full() {
let (mut sink, mut rx) = make_sink(1, 16);
sink.emit(resyncable_event(1)).unwrap();
sink.emit(critical_event("alert1")).unwrap();
assert_eq!(sink.pending.len(), 1);
drain_rx(&mut rx);
sink.drain_pending_nonblocking().unwrap();
let events = drain_rx(&mut rx);
assert_eq!(events.len(), 1);
assert!(matches!(&events[0], TypedEvent::Alert(msg) if msg == "alert1"));
}
#[test]
fn overflow_marker_emitted_after_drops_and_drain() {
let (mut sink, mut rx) = make_sink(1, 16);
sink.emit(resyncable_event(1)).unwrap();
sink.emit(resyncable_event(2)).unwrap();
sink.emit(resyncable_event(3)).unwrap();
sink.emit(resyncable_event(4)).unwrap();
drain_rx(&mut rx);
sink.drain_pending_nonblocking().unwrap();
let events = drain_rx(&mut rx);
assert_eq!(events.len(), 1);
match &events[0] {
TypedEvent::QueueOverflow { dropped_count, .. } => {
assert_eq!(*dropped_count, 3);
}
other => panic!("expected QueueOverflow, got {other:?}"),
}
}
#[test]
fn cumulative_counts_mixed_drops() {
let (mut sink, mut rx) = make_sink(1, 16);
sink.emit(resyncable_event(1)).unwrap();
sink.emit(resyncable_event(2)).unwrap();
sink.emit(requeryable_event()).unwrap();
sink.emit(resyncable_event(3)).unwrap();
drain_rx(&mut rx);
sink.drain_pending_nonblocking().unwrap();
let events = drain_rx(&mut rx);
assert_eq!(events.len(), 1);
match &events[0] {
TypedEvent::QueueOverflow { dropped_count, .. } => {
assert_eq!(*dropped_count, 3); }
other => panic!("expected QueueOverflow, got {other:?}"),
}
}
#[test]
fn since_timestamp_is_from_first_drop() {
let (mut sink, mut rx) = make_sink(1, 16);
sink.emit(resyncable_event(1)).unwrap();
let before = Instant::now();
sink.emit(resyncable_event(2)).unwrap(); let after_first = Instant::now();
sink.emit(resyncable_event(3)).unwrap();
drain_rx(&mut rx);
sink.drain_pending_nonblocking().unwrap();
let events = drain_rx(&mut rx);
assert_eq!(events.len(), 1);
match &events[0] {
TypedEvent::QueueOverflow { since, .. } => {
assert!(*since >= before);
assert!(*since <= after_first);
}
other => panic!("expected QueueOverflow, got {other:?}"),
}
}
#[test]
fn no_marker_when_no_drops() {
let (mut sink, mut rx) = make_sink(8, 16);
sink.emit(resyncable_event(1)).unwrap();
sink.emit(resyncable_event(2)).unwrap();
drain_rx(&mut rx);
sink.drain_pending_nonblocking().unwrap();
let events = drain_rx(&mut rx);
assert!(events.is_empty(), "no marker expected, got {events:?}");
}
#[test]
fn stats_reset_after_marker_delivered() {
let (mut sink, mut rx) = make_sink(2, 16);
sink.emit(resyncable_event(1)).unwrap();
sink.emit(resyncable_event(2)).unwrap();
sink.emit(resyncable_event(10)).unwrap();
sink.emit(resyncable_event(11)).unwrap();
drain_rx(&mut rx);
sink.drain_pending_nonblocking().unwrap();
let events = drain_rx(&mut rx);
assert_eq!(events.len(), 1);
match &events[0] {
TypedEvent::QueueOverflow { dropped_count, .. } => {
assert_eq!(*dropped_count, 2);
}
other => panic!("expected QueueOverflow window 1, got {other:?}"),
}
sink.emit(resyncable_event(20)).unwrap();
sink.emit(resyncable_event(21)).unwrap();
sink.emit(resyncable_event(22)).unwrap();
drain_rx(&mut rx);
sink.drain_pending_nonblocking().unwrap();
let events = drain_rx(&mut rx);
assert_eq!(events.len(), 1);
match &events[0] {
TypedEvent::QueueOverflow { dropped_count, .. } => {
assert_eq!(*dropped_count, 1);
}
other => panic!("expected QueueOverflow window 2, got {other:?}"),
}
}
#[test]
fn critical_events_drain_before_marker() {
let (mut sink, mut rx) = make_sink(2, 16);
sink.emit(resyncable_event(1)).unwrap();
sink.emit(resyncable_event(99)).unwrap();
sink.emit(critical_event("alert-first")).unwrap();
sink.emit(resyncable_event(2)).unwrap();
drain_rx(&mut rx);
sink.drain_pending_nonblocking().unwrap();
let events = drain_rx(&mut rx);
assert!(
events.len() >= 2,
"expected at least 2 events, got {events:?}"
);
assert!(
matches!(&events[0], TypedEvent::Alert(msg) if msg == "alert-first"),
"first event should be alert, got {:?}",
events[0]
);
assert!(
matches!(
&events[1],
TypedEvent::QueueOverflow {
dropped_count: 1,
..
}
),
"second event should be QueueOverflow with count=1, got {:?}",
events[1]
);
}
#[test]
fn marker_does_not_amplify_under_sustained_backpressure() {
let (mut sink, mut rx) = make_sink(1, 16);
sink.emit(resyncable_event(1)).unwrap();
sink.emit(resyncable_event(2)).unwrap();
sink.emit(resyncable_event(3)).unwrap();
sink.drain_pending_nonblocking().unwrap();
sink.drain_pending_nonblocking().unwrap();
sink.drain_pending_nonblocking().unwrap();
drain_rx(&mut rx);
sink.drain_pending_nonblocking().unwrap();
let events = drain_rx(&mut rx);
let markers: Vec<_> = events
.iter()
.filter(|e| matches!(e, TypedEvent::QueueOverflow { .. }))
.collect();
assert_eq!(
markers.len(),
1,
"expected exactly 1 marker, got {markers:?}"
);
match markers[0] {
TypedEvent::QueueOverflow { dropped_count, .. } => {
assert_eq!(*dropped_count, 2);
}
_ => unreachable!(),
}
}
#[test]
fn hard_cap_fires_for_critical_overflow() {
let (mut sink, _rx) = make_sink(1, 2);
sink.emit(resyncable_event(1)).unwrap();
sink.emit(critical_event("c1")).unwrap();
sink.emit(critical_event("c2")).unwrap();
let result = sink.emit(critical_event("c3"));
assert!(
matches!(result, Err(EventOverflow::HardCap)),
"expected HardCap, got {result:?}"
);
}
#[test]
fn caller_gone_when_receiver_dropped() {
let (mut sink, rx) = make_sink(1, 16);
drop(rx);
let result = sink.emit(resyncable_event(1));
assert!(
matches!(result, Err(EventOverflow::CallerGone)),
"expected CallerGone, got {result:?}"
);
}
#[test]
fn caller_gone_during_drain_pending() {
let (mut sink, mut rx) = make_sink(1, 16);
sink.emit(resyncable_event(1)).unwrap();
sink.emit(critical_event("c1")).unwrap();
drain_rx(&mut rx);
drop(rx);
let result = sink.drain_pending_nonblocking();
assert!(
matches!(result, Err(EventOverflow::CallerGone)),
"expected CallerGone, got {result:?}"
);
}
#[test]
fn marker_retried_on_next_drain_after_full() {
let (mut sink, mut rx) = make_sink(1, 16);
sink.emit(resyncable_event(1)).unwrap();
sink.emit(resyncable_event(2)).unwrap();
sink.drain_pending_nonblocking().unwrap();
assert!(
sink.drop_stats.dropped_count() > 0,
"stats should be preserved when marker can't be sent"
);
drain_rx(&mut rx);
sink.drain_pending_nonblocking().unwrap();
let events = drain_rx(&mut rx);
assert_eq!(events.len(), 1);
assert!(
matches!(
&events[0],
TypedEvent::QueueOverflow {
dropped_count: 1,
..
}
),
"expected QueueOverflow with count=1, got {:?}",
events[0]
);
}
#[test]
fn drops_accumulate_within_one_window() {
let (mut sink, mut rx) = make_sink(1, 16);
sink.emit(resyncable_event(1)).unwrap();
for i in 2..=6 {
sink.emit(resyncable_event(i)).unwrap();
}
drain_rx(&mut rx);
sink.drain_pending_nonblocking().unwrap();
let events = drain_rx(&mut rx);
assert_eq!(events.len(), 1);
match &events[0] {
TypedEvent::QueueOverflow { dropped_count, .. } => {
assert_eq!(*dropped_count, 5);
}
other => panic!("expected QueueOverflow, got {other:?}"),
}
}
#[test]
fn no_double_marker_from_drain_then_emit() {
let (mut sink, mut rx) = make_sink(2, 16);
sink.emit(resyncable_event(1)).unwrap();
sink.emit(resyncable_event(2)).unwrap();
sink.emit(resyncable_event(3)).unwrap();
rx.try_recv().unwrap();
sink.drain_pending_nonblocking().unwrap();
rx.try_recv().unwrap();
sink.emit(resyncable_event(4)).unwrap();
let events = drain_rx(&mut rx);
let markers: Vec<_> = events
.iter()
.filter(|e| matches!(e, TypedEvent::QueueOverflow { .. }))
.collect();
assert_eq!(
markers.len(),
1,
"expected exactly 1 marker total, got {markers:?}"
);
}
#[test]
fn drain_pending_noop_when_nothing_to_do() {
let (mut sink, mut rx) = make_sink(8, 16);
sink.drain_pending_nonblocking().unwrap();
let events = drain_rx(&mut rx);
assert!(events.is_empty());
}
#[test]
fn caller_gone_during_marker_send() {
let (mut sink, mut rx) = make_sink(1, 16);
sink.emit(resyncable_event(1)).unwrap();
sink.emit(resyncable_event(2)).unwrap();
drain_rx(&mut rx);
drop(rx);
let result = sink.drain_pending_nonblocking();
assert!(
matches!(result, Err(EventOverflow::CallerGone)),
"expected CallerGone during marker send, got {result:?}"
);
}
#[test]
fn drops_accumulate_while_marker_is_deferred() {
let (mut sink, mut rx) = make_sink(1, 16);
sink.emit(resyncable_event(1)).unwrap();
sink.emit(resyncable_event(2)).unwrap();
sink.emit(resyncable_event(3)).unwrap();
sink.drain_pending_nonblocking().unwrap();
assert_eq!(
sink.drop_stats.dropped_count(),
2,
"stats should be preserved"
);
sink.emit(resyncable_event(4)).unwrap();
sink.emit(resyncable_event(5)).unwrap();
sink.emit(resyncable_event(6)).unwrap();
assert_eq!(
sink.drop_stats.dropped_count(),
5,
"new drops should accumulate"
);
drain_rx(&mut rx);
sink.drain_pending_nonblocking().unwrap();
let events = drain_rx(&mut rx);
assert_eq!(events.len(), 1);
match &events[0] {
TypedEvent::QueueOverflow { dropped_count, .. } => {
assert_eq!(*dropped_count, 5, "marker should have the combined count");
}
other => panic!("expected QueueOverflow, got {other:?}"),
}
}