use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use crate::types::{EventEnvelope, UnifiedEvent};
/// Error type returned by [`EventLogStore`] operations: any boxed error that is `Send`
/// (note: it is not required to be `Sync`).
pub type EventLogError = Box<dyn std::error::Error + Send>;
/// Predicate applied to each incoming event before it is persisted; events for which
/// it returns `false` are skipped by the flush loop and never reach the store.
type EventFilter = Box<dyn Fn(&UnifiedEvent) -> bool + Send + Sync>;
/// A single event in the form handed to an [`EventLogStore`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistedEvent {
/// Event id; `to_persisted` copies it from the envelope's `event_id`.
pub id: String,
/// Sequence number; `to_persisted` takes it from a counter that starts at 1 for each
/// `start_event_log` call and increments once per persisted event.
pub seq: u64,
/// Timestamp in milliseconds; `to_persisted` copies it from the envelope.
pub timestamp_ms: u64,
/// Set by `to_persisted` to the `agent_id` of `UnifiedEvent::Agent` events and to
/// `None` for `UnifiedEvent::Module` events.
pub member_id: Option<String>,
/// The event payload itself.
pub event: UnifiedEvent,
}
/// Query parameters passed through to [`EventLogStore::query`].
///
/// Nothing in this module interprets these fields; their exact semantics (inclusive
/// vs. exclusive bounds, matching rules) are defined by each store implementation.
/// Unset options and an empty `event_types` are omitted when serialized, and every
/// field falls back to its default when absent on deserialization.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct EventQuery {
/// Lower time bound in milliseconds — presumably compared against
/// `PersistedEvent::timestamp_ms`; confirm against the store implementation.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub since_ms: Option<u64>,
/// Upper time bound in milliseconds (same caveat as `since_ms`).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub until_ms: Option<u64>,
/// Member filter — presumably matched against `PersistedEvent::member_id`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub member_id: Option<String>,
/// NOTE(review): meaning is store-defined; not used anywhere in this module.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub identity: Option<String>,
/// NOTE(review): meaning is store-defined; not used anywhere in this module.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub mob_id: Option<String>,
/// NOTE(review): meaning is store-defined; not used anywhere in this module.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub run_id: Option<String>,
/// NOTE(review): meaning is store-defined; not used anywhere in this module.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub step_id: Option<String>,
/// Event type names to match; presumably an empty list means "no type filter".
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub event_types: Vec<String>,
/// Maximum number of events to return, if set.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub limit: Option<usize>,
/// Cursor on `PersistedEvent::seq` — presumably exclusive ("after"); TODO confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub after_seq: Option<u64>,
}
/// Persistence backend for the event log.
///
/// Methods return boxed `Send` futures rather than using `async fn`, which keeps the
/// trait usable as `Box<dyn EventLogStore>` / `Arc<dyn EventLogStore>`.
pub trait EventLogStore: Send + Sync {
/// Persists `events` in a single call.
///
/// When this returns `Err`, the flush loop keeps the whole batch and resends it on a
/// later flush (bounded by `EVENT_LOG_RETRY_BUFFER_CAP`). NOTE: an implementation that
/// partially wrote a batch before failing will therefore see those events again.
fn append_batch(
&self,
events: Vec<PersistedEvent>,
) -> Pin<Box<dyn Future<Output = Result<(), EventLogError>> + Send + '_>>;
/// Returns persisted events matching `query`; matching rules are implementation-defined.
fn query(
&self,
query: EventQuery,
) -> Pin<Box<dyn Future<Output = Result<Vec<PersistedEvent>, EventLogError>> + Send + '_>>;
}
/// Configuration consumed by `start_event_log`.
pub struct EventLogConfig {
/// Backend the flush loop writes to. `Default` uses a store that discards every batch.
pub store: Box<dyn EventLogStore>,
/// Optional predicate; events for which it returns `false` are not persisted.
pub filter: Option<EventFilter>,
/// Number of buffered events that triggers a flush; `start_event_log` treats 0 as 1.
pub batch_size: usize,
/// Period of the timer that flushes whatever events are pending.
pub flush_interval: Duration,
}
impl Default for EventLogConfig {
    /// A no-op configuration: a store that discards everything, no filter,
    /// batches of 64 events and a one-second flush tick.
    fn default() -> Self {
        let flush_interval = Duration::from_millis(1_000);
        Self {
            flush_interval,
            batch_size: 64,
            filter: None,
            store: Box::new(NullEventLogStore),
        }
    }
}
/// Store that accepts and discards every batch and answers every query with no
/// events. Backs `EventLogConfig::default()`.
struct NullEventLogStore;

impl EventLogStore for NullEventLogStore {
    fn append_batch(
        &self,
        _events: Vec<PersistedEvent>,
    ) -> Pin<Box<dyn Future<Output = Result<(), EventLogError>> + Send + '_>> {
        // The batch is simply dropped; the returned future is already complete.
        Box::pin(std::future::ready(Ok::<(), EventLogError>(())))
    }

    fn query(
        &self,
        _query: EventQuery,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<PersistedEvent>, EventLogError>> + Send + '_>> {
        Box::pin(std::future::ready(
            Ok::<Vec<PersistedEvent>, EventLogError>(Vec::new()),
        ))
    }
}
/// Handle returned by `start_event_log`: feeds envelopes to the background flush
/// task and exposes the shared store (e.g. for queries).
pub(crate) struct EventLogHandle {
store: Arc<dyn EventLogStore>,
/// Bounded channel into the flush task; capacity is chosen by `start_event_log`.
ingress_tx: mpsc::Sender<EventEnvelope<UnifiedEvent>>,
}
impl EventLogHandle {
pub fn store(&self) -> std::sync::Arc<dyn EventLogStore> {
self.store.clone()
}
pub fn ingest(&self, event: EventEnvelope<UnifiedEvent>) {
let _ = self.ingress_tx.try_send(event);
}
}
/// Maximum number of events kept in memory after failed flushes; when exceeded,
/// `enforce_retry_cap` drops the oldest events.
const EVENT_LOG_RETRY_BUFFER_CAP: usize = 4096;
pub(crate) fn start_event_log(
config: EventLogConfig,
error_hook: Option<super::ErrorHook>,
) -> EventLogHandle {
let store: Arc<dyn EventLogStore> = Arc::from(config.store);
let seq = Arc::new(AtomicU64::new(1));
let batch_size = config.batch_size.max(1);
let channel_capacity = (batch_size * 4).max(4);
let (ingress_tx, ingress_rx) = mpsc::channel(channel_capacity);
let handle = EventLogHandle {
store: store.clone(),
ingress_tx,
};
tokio::spawn(run_flush_loop(
ingress_rx,
store,
seq,
config.filter,
batch_size,
config.flush_interval,
error_hook,
));
handle
}
/// Body of the background flush task spawned by `start_event_log`.
///
/// Drains `rx`, skips events rejected by `filter`, assigns sequence numbers via
/// `to_persisted`, and hands accumulated events to `flush_batch`. A flush is attempted:
/// - once `batch_size` new events have accumulated since the previous attempt,
/// - on every `flush_interval` tick while events are pending,
/// - one last time when the channel closes (all senders dropped); events that fail
///   this final flush are not retried.
///
/// `flush_interval` is clamped to at least 1 ms because `tokio::time::interval` panics
/// on a zero period. That panic would kill this task and close the channel, after which
/// `EventLogHandle::ingest` silently discards every event.
async fn run_flush_loop(
    mut rx: mpsc::Receiver<EventEnvelope<UnifiedEvent>>,
    store: Arc<dyn EventLogStore>,
    seq: Arc<AtomicU64>,
    filter: Option<EventFilter>,
    batch_size: usize,
    flush_interval: Duration,
    error_hook: Option<super::ErrorHook>,
) {
    let mut batch: Vec<PersistedEvent> = Vec::with_capacity(batch_size);
    // Length at which the next event-triggered flush fires. `flush_batch` keeps the
    // events of a failed flush in `batch`, so a fixed `batch.len() >= batch_size` check
    // would retry a failing store -- cloning the whole retained buffer and spawning an
    // error-hook task -- on every single incoming event. Re-arming relative to what was
    // retained limits retries to one per `batch_size` new events, plus one per tick,
    // and bounds `batch` at roughly EVENT_LOG_RETRY_BUFFER_CAP + batch_size.
    let mut flush_at = batch_size;
    let mut interval = tokio::time::interval(flush_interval.max(Duration::from_millis(1)));
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
    loop {
        tokio::select! {
            maybe_event = rx.recv() => {
                match maybe_event {
                    Some(envelope) => {
                        if let Some(ref f) = filter
                            && !f(&envelope.event)
                        {
                            continue;
                        }
                        batch.push(to_persisted(&seq, &envelope));
                        if batch.len() >= flush_at {
                            flush_batch(&store, &mut batch, &error_hook).await;
                            // Empty after success -> back to `batch_size`.
                            flush_at = batch.len().saturating_add(batch_size);
                        }
                    }
                    None => {
                        // All senders are gone: one final best-effort flush.
                        if !batch.is_empty() {
                            flush_batch(&store, &mut batch, &error_hook).await;
                        }
                        break;
                    }
                }
            }
            _ = interval.tick() => {
                if !batch.is_empty() {
                    flush_batch(&store, &mut batch, &error_hook).await;
                    flush_at = batch.len().saturating_add(batch_size);
                }
            }
        }
    }
}
/// Trims `batch` from the front so it holds at most `EVENT_LOG_RETRY_BUFFER_CAP`
/// events, keeping the newest ones.
///
/// Returns how many events were removed (0 when the batch was already within the cap).
fn enforce_retry_cap(batch: &mut Vec<PersistedEvent>) -> usize {
    let excess = batch.len().saturating_sub(EVENT_LOG_RETRY_BUFFER_CAP);
    if excess > 0 {
        // Events are appended in arrival order, so the front holds the oldest.
        batch.drain(..excess);
    }
    excess
}
/// Converts an ingress envelope into a `PersistedEvent`, consuming the next value of
/// the `seq` counter.
///
/// `member_id` is the agent id for agent events and `None` for module events.
fn to_persisted(seq: &AtomicU64, envelope: &EventEnvelope<UnifiedEvent>) -> PersistedEvent {
    let event = envelope.event.clone();
    // Exhaustive on purpose: a new `UnifiedEvent` variant must decide its member id here.
    let member_id = match &event {
        UnifiedEvent::Agent { agent_id, .. } => Some(agent_id.clone()),
        UnifiedEvent::Module(_) => None,
    };
    PersistedEvent {
        // Relaxed suffices: only the single flush task draws from this counter.
        seq: seq.fetch_add(1, Ordering::Relaxed),
        id: envelope.event_id.clone(),
        timestamp_ms: envelope.timestamp_ms,
        member_id,
        event,
    }
}
async fn flush_batch(
store: &Arc<dyn EventLogStore>,
batch: &mut Vec<PersistedEvent>,
error_hook: &Option<super::ErrorHook>,
) {
let events = std::mem::take(batch);
if let Err(err) = store.append_batch(events.clone()).await {
let mut restored = events;
restored.append(batch); let dropped = enforce_retry_cap(&mut restored);
*batch = restored;
if let Some(hook) = error_hook {
let hook = hook.clone();
let msg = if dropped > 0 {
format!(
"event log flush failed: {err}; dropped {dropped} oldest events to bound the retry buffer at {EVENT_LOG_RETRY_BUFFER_CAP}"
)
} else {
format!("event log flush failed: {err}; will retry")
};
tokio::spawn(async move {
let () = hook(super::types::ErrorEvent::EventLogFlushFailure { error: msg }).await;
});
}
}
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
use super::*;
use std::sync::Mutex;
// Test store: the first `failures_remaining` calls to `append_batch` fail with a
// "transient" error; later calls succeed and record their events in `persisted`.
// `attempts` counts every `append_batch` call, whether it failed or not.
struct FlakyStore {
failures_remaining: Mutex<usize>,
persisted: Mutex<Vec<PersistedEvent>>,
attempts: Mutex<usize>,
}
impl EventLogStore for FlakyStore {
fn append_batch(
&self,
events: Vec<PersistedEvent>,
) -> Pin<Box<dyn Future<Output = Result<(), EventLogError>> + Send + '_>> {
Box::pin(async move {
*self.attempts.lock().expect("attempts") += 1;
let mut left = self.failures_remaining.lock().expect("failures");
if *left > 0 {
*left -= 1;
// Minimal local error type whose Display is "transient".
#[derive(Debug)]
struct Transient;
impl std::fmt::Display for Transient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "transient")
}
}
impl std::error::Error for Transient {}
return Err(Box::new(Transient) as Box<dyn std::error::Error + Send>);
}
self.persisted.lock().expect("persisted").extend(events);
Ok(())
})
}
// Not exercised by these tests; always returns no events.
fn query(
&self,
_query: EventQuery,
) -> Pin<Box<dyn Future<Output = Result<Vec<PersistedEvent>, EventLogError>> + Send + '_>>
{
Box::pin(async { Ok(Vec::new()) })
}
}
// Builds a module event with the given id; `seq` and `timestamp_ms` are zero because
// `flush_batch` and `enforce_retry_cap` never read them.
fn sample_event(id: &str) -> PersistedEvent {
PersistedEvent {
id: id.to_string(),
seq: 0,
timestamp_ms: 0,
member_id: None,
event: UnifiedEvent::Module(crate::types::ModuleEvent {
module: "test-module".into(),
event_type: "x".into(),
payload: serde_json::Value::Null,
}),
}
}
// Two failures followed by a success: the batch must survive each failed flush intact
// and be persisted exactly once on the third attempt.
#[tokio::test]
async fn flush_failure_retries_instead_of_dropping_events() {
let flaky = Arc::new(FlakyStore {
failures_remaining: Mutex::new(2),
persisted: Mutex::new(Vec::new()),
attempts: Mutex::new(0),
});
let store: Arc<dyn EventLogStore> = flaky.clone();
let mut batch = vec![sample_event("a"), sample_event("b")];
flush_batch(&store, &mut batch, &None).await;
assert_eq!(batch.len(), 2, "events must be retained on flush failure");
flush_batch(&store, &mut batch, &None).await;
assert_eq!(batch.len(), 2);
flush_batch(&store, &mut batch, &None).await;
assert!(batch.is_empty(), "batch must drain on successful flush");
assert_eq!(*flaky.attempts.lock().expect("attempts"), 3);
assert_eq!(flaky.persisted.lock().expect("persisted").len(), 2);
}
// A buffer 100 events over the cap loses exactly its 100 oldest entries, so the
// survivor at the front is "evt-100".
#[test]
fn enforce_retry_cap_drops_oldest() {
let mut batch: Vec<PersistedEvent> = (0..(EVENT_LOG_RETRY_BUFFER_CAP + 100))
.map(|i| sample_event(&format!("evt-{i}")))
.collect();
let dropped = enforce_retry_cap(&mut batch);
assert_eq!(dropped, 100);
assert_eq!(batch.len(), EVENT_LOG_RETRY_BUFFER_CAP);
assert_eq!(batch.first().expect("first").id, "evt-100");
}
}