use std::collections::VecDeque;
use chrono::{DateTime, Utc};
use zero_engine_client::EngineEvent;
/// Maximum number of items retained by a ring built with `EventRing::new()`
/// or `EventRing::default()`.
pub const DEFAULT_CAPACITY: usize = 200;
/// A single slot stored in an `EventRing`: either a recorded engine event or
/// a "lagged" marker.
#[derive(Debug, Clone)]
pub enum RingItem {
/// An engine event together with the timestamp recorded for it.
Event(RingEntry),
/// Marker inserted through `push_lagged` / `push_lagged_at`.
// NOTE(review): presumably signals that the consumer fell behind and events
// were dropped upstream -- confirm against the code that calls `push_lagged`.
Lagged {
/// Timestamp attached to the marker (wall-clock time at push, or the value
/// supplied by the caller of `push_lagged_at`).
ts: DateTime<Utc>,
/// Caller-supplied count stored with the marker.
skipped: u64,
},
}
/// An engine event paired with the timestamp the ring recorded for it.
#[derive(Debug, Clone)]
pub struct RingEntry {
/// Timestamp for the event: the event's own timestamp when it carries one,
/// otherwise the time it was pushed, or whatever the caller passed to
/// `push_event_at`.
pub ts: DateTime<Utc>,
/// The engine event itself, stored unmodified.
pub event: EngineEvent,
}
/// Bounded, oldest-first buffer of `RingItem`s.
///
/// Once the buffer holds `cap` items, each further push evicts the oldest
/// item before appending the new one.
#[derive(Debug)]
pub struct EventRing {
// Items in insertion order: front is the oldest, back is the newest.
items: VecDeque<RingItem>,
// Maximum number of items kept; when this is 0 every push is discarded.
cap: usize,
}
impl Default for EventRing {
    /// Builds an empty ring bounded at [`DEFAULT_CAPACITY`] items.
    fn default() -> Self {
        let cap = DEFAULT_CAPACITY;
        Self::with_capacity(cap)
    }
}
impl EventRing {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn with_capacity(cap: usize) -> Self {
Self {
items: VecDeque::with_capacity(cap),
cap,
}
}
#[must_use]
pub const fn capacity(&self) -> usize {
self.cap
}
#[must_use]
pub fn len(&self) -> usize {
self.items.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.items.is_empty()
}
pub fn push_event(&mut self, event: EngineEvent) {
let ts = event_timestamp(&event).unwrap_or_else(Utc::now);
self.push(RingItem::Event(RingEntry { ts, event }));
}
pub fn push_event_at(&mut self, event: EngineEvent, ts: DateTime<Utc>) {
self.push(RingItem::Event(RingEntry { ts, event }));
}
pub fn push_lagged(&mut self, skipped: u64) {
self.push(RingItem::Lagged {
ts: Utc::now(),
skipped,
});
}
pub fn push_lagged_at(&mut self, skipped: u64, ts: DateTime<Utc>) {
self.push(RingItem::Lagged { ts, skipped });
}
fn push(&mut self, item: RingItem) {
if self.cap == 0 {
return;
}
while self.items.len() >= self.cap {
self.items.pop_front();
}
self.items.push_back(item);
}
pub fn iter(&self) -> impl DoubleEndedIterator<Item = &RingItem> {
self.items.iter()
}
pub fn tail(&self, n: usize) -> impl DoubleEndedIterator<Item = &RingItem> {
let start = self.items.len().saturating_sub(n);
self.items.iter().skip(start)
}
}
/// Returns the timestamp embedded in `evt`, if that variant carries one.
///
/// Only `Heartbeat` and `Unknown` expose a timestamp here; every other variant
/// yields `None`. The match is deliberately exhaustive (no wildcard arm) so
/// that adding a variant to `EngineEvent` forces this function to be revisited.
fn event_timestamp(evt: &EngineEvent) -> Option<DateTime<Utc>> {
    let stamp = match evt {
        EngineEvent::Status(_)
        | EngineEvent::Positions(_)
        | EngineEvent::Risk(_)
        | EngineEvent::Regime(_) => return None,
        EngineEvent::Heartbeat(ts) | EngineEvent::Unknown { ts, .. } => ts,
    };
    Some(*stamp)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a heartbeat event whose timestamp is `sec` seconds (zero
    /// nanoseconds) as interpreted by `DateTime::from_timestamp`.
    fn heartbeat_at(sec: i64) -> EngineEvent {
        let ts = DateTime::<Utc>::from_timestamp(sec, 0).unwrap();
        EngineEvent::Heartbeat(ts)
    }

    /// Whole-second timestamp of a ring item, regardless of variant.
    fn secs_of(item: &RingItem) -> i64 {
        match item {
            RingItem::Event(RingEntry { ts, .. }) | RingItem::Lagged { ts, .. } => ts.timestamp(),
        }
    }

    #[test]
    fn fresh_ring_is_empty_and_reports_capacity() {
        let ring = EventRing::new();
        assert!(ring.is_empty());
        assert_eq!(ring.len(), 0);
        assert_eq!(ring.capacity(), DEFAULT_CAPACITY);
    }

    #[test]
    fn push_increments_len_up_to_capacity() {
        let mut ring = EventRing::with_capacity(4);
        (0..3).map(heartbeat_at).for_each(|ev| ring.push_event(ev));
        assert_eq!(ring.len(), 3);
        assert!(!ring.is_empty());
    }

    #[test]
    fn push_beyond_capacity_drops_oldest() {
        let mut ring = EventRing::with_capacity(3);
        (1..=5).map(heartbeat_at).for_each(|ev| ring.push_event(ev));
        assert_eq!(ring.len(), 3);
        // Only the three newest heartbeats survive, in insertion order.
        let seen: Vec<i64> = ring.iter().map(secs_of).collect();
        assert_eq!(seen, vec![3, 4, 5]);
    }

    #[test]
    fn zero_capacity_accepts_no_items() {
        let mut ring = EventRing::with_capacity(0);
        ring.push_event(heartbeat_at(1));
        ring.push_lagged(42);
        assert!(ring.is_empty());
        assert_eq!(ring.capacity(), 0);
    }

    #[test]
    fn push_lagged_records_marker_with_count() {
        let mut ring = EventRing::with_capacity(2);
        ring.push_event(heartbeat_at(1));
        ring.push_lagged(7);
        assert_eq!(ring.iter().count(), 2);
        let second = ring.iter().nth(1);
        assert!(matches!(second, Some(RingItem::Lagged { skipped: 7, .. })));
    }

    #[test]
    fn tail_clamps_to_ring_len() {
        let mut ring = EventRing::with_capacity(10);
        (1..=3).map(heartbeat_at).for_each(|ev| ring.push_event(ev));
        // Asking for more than is held yields everything.
        assert_eq!(ring.tail(100).count(), 3);
        let newest_two: Vec<i64> = ring.tail(2).map(secs_of).collect();
        assert_eq!(newest_two, vec![2, 3]);
    }

    // NOTE(review): despite its name this test pushes a `Heartbeat`, which
    // carries its own timestamp, so the `Utc::now()` fallback in `push_event`
    // is not exercised. Covering the fallback needs a variant without a
    // timestamp (e.g. `Status`), whose payload type is not visible here.
    #[test]
    fn push_event_without_direct_ts_falls_back_to_now() {
        let mut ring = EventRing::with_capacity(2);
        ring.push_event(EngineEvent::Heartbeat(Utc::now()));
        assert_eq!(ring.len(), 1);
    }
}