use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use chrono::{DateTime, Utc};
/// Maximum number of events an `EventBuffer` stores at any one time.
///
/// Once the buffer holds this many records, further pushes are not stored and
/// are tallied in the buffer's drop counter instead.
const MAX_BUFFERED_EVENTS: usize = 50_000;
/// A single buffered event: a timestamp plus the string labels describing it.
///
/// All fields are owned (or `'static`), so a record can be cloned and moved
/// freely between the producer and whoever drains the buffer.
#[derive(Debug, Clone)]
pub(super) struct EventRecord {
/// UTC timestamp attached to the event (presumably when it occurred — confirm
/// against callers).
pub(super) at: DateTime<Utc>,
/// Free-form kind label. NOTE(review): likely the job kind; the exact meaning
/// is not visible in this file.
pub(super) kind: String,
/// Name of the queue the event relates to.
pub(super) queue_name: String,
/// Identifier of the related job, if there is one.
pub(super) job_id: Option<String>,
/// Static label for the type of event; the tests in this file use values such
/// as `"enqueued"`, `"started"`, `"completed"` and `"failed"`.
pub(super) event_type: &'static str,
}
impl EventRecord {
    /// Assembles a record from borrowed or convertible inputs.
    ///
    /// `kind` and `queue_name` accept anything convertible into a `String`;
    /// `job_id` is copied into an owned `String` when present. `at` and
    /// `event_type` are stored unchanged.
    pub(super) fn new(
        at: DateTime<Utc>,
        kind: impl Into<String>,
        queue_name: impl Into<String>,
        job_id: Option<&str>,
        event_type: &'static str,
    ) -> Self {
        // Convert in declaration order so any side effects of the `Into`
        // implementations happen in the same sequence as the fields.
        let kind: String = kind.into();
        let queue_name: String = queue_name.into();
        let job_id: Option<String> = job_id.map(String::from);

        Self {
            at,
            kind,
            queue_name,
            job_id,
            event_type,
        }
    }
}
/// A size-capped, shared buffer of `EventRecord`s.
///
/// All methods take `&self`: the event list sits behind a `Mutex` and the drop
/// counter is an atomic. `Default` yields an empty buffer with a zero drop
/// count.
#[derive(Debug, Default)]
pub(super) struct EventBuffer {
/// Records accumulated since the last drain, in push order.
events: Mutex<Vec<EventRecord>>,
/// Number of pushes rejected because the buffer was already at
/// `MAX_BUFFERED_EVENTS`; reset to zero by each drain.
dropped: AtomicU64,
}
impl EventBuffer {
    /// Locks the event list, recovering the guard if the mutex is poisoned.
    ///
    /// The list is only ever modified through `Vec::push`, `Vec::extend` and
    /// `mem::take`, none of which leave it half-updated if a panic unwinds
    /// through the critical section. Treating a poisoned lock as usable is
    /// therefore safe, and it keeps one panicking producer from silently
    /// disabling event collection for everyone else.
    fn lock_events(&self) -> std::sync::MutexGuard<'_, Vec<EventRecord>> {
        self.events
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Appends one event, or counts it as dropped when the buffer is full.
    ///
    /// The buffer never holds more than `MAX_BUFFERED_EVENTS` records.
    pub(super) fn push(&self, ev: EventRecord) {
        let mut events = self.lock_events();
        if events.len() < MAX_BUFFERED_EVENTS {
            events.push(ev);
        } else {
            self.dropped.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Appends a batch of events under a single lock acquisition.
    ///
    /// Events are stored in iteration order until the buffer reaches
    /// `MAX_BUFFERED_EVENTS`; every remaining item is consumed and counted as
    /// dropped.
    pub(super) fn push_all(&self, evs: impl IntoIterator<Item = EventRecord>) {
        // `fuse` guarantees the iterator is never polled again after its
        // first `None`, matching the semantics of a plain `for` loop.
        let mut incoming = evs.into_iter().fuse();

        let mut events = self.lock_events();
        let room = MAX_BUFFERED_EVENTS.saturating_sub(events.len());
        events.extend(incoming.by_ref().take(room));
        // Release the lock before walking the overflow: counting needs no
        // access to the list, and a panic in the caller's iterator from here
        // on cannot poison the mutex.
        drop(events);

        let overflow = incoming.fold(0u64, |count, _| count.saturating_add(1));
        if overflow > 0 {
            self.dropped.fetch_add(overflow, Ordering::Relaxed);
        }
    }

    /// Takes every buffered event and the drop count, resetting both.
    ///
    /// Returns `(events, dropped)`, where `events` are in push order and
    /// `dropped` is the number of rejected pushes since the previous drain.
    /// The list and the counter are not read in one atomic step, so a drop
    /// recorded concurrently may be reported by the next drain instead.
    pub(super) fn drain(&self) -> (Vec<EventRecord>, u64) {
        // The temporary guard is released at the end of this statement.
        let events = std::mem::take(&mut *self.lock_events());
        let dropped = self.dropped.swap(0, Ordering::Relaxed);
        (events, dropped)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a throwaway record in which only `event_type` varies.
    fn ev(event_type: &'static str) -> EventRecord {
        let now = Utc::now();
        EventRecord::new(now, "k", "q", Some("id"), event_type)
    }

    /// Pushes `count` `"started"` records into `buf`, one call per record.
    fn push_started(buf: &EventBuffer, count: usize) {
        (0..count).for_each(|_| buf.push(ev("started")));
    }

    #[test]
    fn push_then_drain_returns_events_and_empties() {
        let buf = EventBuffer::default();
        for label in ["enqueued", "completed"] {
            buf.push(ev(label));
        }

        let (drained, dropped) = buf.drain();
        assert_eq!(drained.len(), 2);
        assert_eq!(dropped, 0);

        // A second drain must find nothing left behind.
        let (leftover, _) = buf.drain();
        assert!(leftover.is_empty(), "drain leaves the buffer empty");
    }

    #[test]
    fn push_caps_and_counts_drops() {
        let buf = EventBuffer::default();

        // Fill exactly to the cap, then overshoot by seven.
        push_started(&buf, MAX_BUFFERED_EVENTS);
        push_started(&buf, 7);

        let (drained, dropped) = buf.drain();
        assert_eq!(drained.len(), MAX_BUFFERED_EVENTS);
        assert_eq!(dropped, 7, "past-cap pushes count as drops");

        // After a drain the buffer has room again and the counter is reset.
        buf.push(ev("failed"));
        let (_, dropped_after) = buf.drain();
        assert_eq!(dropped_after, 0);
    }
}