// harn_vm/event_log/mod.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, HashMap, VecDeque};
3use std::fmt;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::{Arc, Mutex};
7
8use bytes::Bytes;
9use futures::stream::BoxStream;
10use rusqlite::{params, Connection, OptionalExtension};
11use serde::{Deserialize, Serialize};
12use tokio::sync::{broadcast, mpsc};
13use tokio_stream::wrappers::ReceiverStream;
14
/// Monotonically increasing per-topic event id; ids start at 1 (0 means "before the first event").
pub type EventId = u64;

/// Env var selecting the backend: `memory`, `file`, or `sqlite`.
pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
/// Env var overriding the file backend's directory (relative paths resolve against the base dir).
pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
/// Env var overriding the sqlite database path.
pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
/// Env var overriding the subscriber queue depth (default 128, clamped to >= 1).
pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";
21
/// Validated topic name: non-empty, ASCII alphanumerics plus `.`, `_`, `-`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Topic(String);
24
25impl Topic {
26    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
27        let value = value.into();
28        if value.is_empty() {
29            return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
30        }
31        if !value
32            .chars()
33            .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
34        {
35            return Err(LogError::InvalidTopic(format!(
36                "topic '{value}' contains unsupported characters"
37            )));
38        }
39        Ok(Self(value))
40    }
41
42    pub fn as_str(&self) -> &str {
43        &self.0
44    }
45}
46
47impl fmt::Display for Topic {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        self.0.fmt(f)
50    }
51}
52
impl FromStr for Topic {
    type Err = LogError;

    // Enables `"my.topic".parse::<Topic>()`; same validation as `Topic::new`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Self::new(s)
    }
}
60
/// Identifier for a consumer's acknowledgement cursor. Any non-blank string
/// is accepted; it is sanitized before being embedded in filenames.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct ConsumerId(String);
63
64impl ConsumerId {
65    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
66        let value = value.into();
67        if value.trim().is_empty() {
68            return Err(LogError::InvalidConsumer(
69                "consumer id cannot be empty".to_string(),
70            ));
71        }
72        Ok(Self(value))
73    }
74
75    pub fn as_str(&self) -> &str {
76        &self.0
77    }
78}
79
80impl fmt::Display for ConsumerId {
81    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82        self.0.fmt(f)
83    }
84}
85
/// Which storage backend an event log uses. Serialized in `snake_case`;
/// parsed case-insensitively via `FromStr`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventLogBackendKind {
    Memory, // volatile, per-process
    File,   // JSONL files under a directory
    Sqlite, // single SQLite database file
}
93
94impl fmt::Display for EventLogBackendKind {
95    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96        match self {
97            Self::Memory => write!(f, "memory"),
98            Self::File => write!(f, "file"),
99            Self::Sqlite => write!(f, "sqlite"),
100        }
101    }
102}
103
104impl FromStr for EventLogBackendKind {
105    type Err = LogError;
106
107    fn from_str(value: &str) -> Result<Self, Self::Err> {
108        match value.trim().to_ascii_lowercase().as_str() {
109            "memory" => Ok(Self::Memory),
110            "file" => Ok(Self::File),
111            "sqlite" => Ok(Self::Sqlite),
112            other => Err(LogError::Config(format!(
113                "unsupported event log backend '{other}'"
114            ))),
115        }
116    }
117}
118
/// A single structured event as appended by producers.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LogEvent {
    /// Application-defined event type tag.
    pub kind: String,
    /// Arbitrary JSON payload.
    pub payload: serde_json::Value,
    /// Optional metadata; defaults to empty when absent in serialized form.
    #[serde(default)]
    pub headers: BTreeMap<String, String>,
    /// Wall-clock timestamp in milliseconds (set via `now_ms` by `new`).
    pub occurred_at_ms: i64,
}
127
128impl LogEvent {
129    pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
130        Self {
131            kind: kind.into(),
132            payload,
133            headers: BTreeMap::new(),
134            occurred_at_ms: now_ms(),
135        }
136    }
137
138    pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
139        self.headers = headers;
140        self
141    }
142}
143
/// Serialized event payload form for large read paths.
///
/// `payload` contains the original JSON bytes for backends that can expose
/// them directly. Callers that only need to forward or hash the payload can
/// avoid materializing a `serde_json::Value`; callers that need structured
/// access can opt in with `payload_json`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LogEventBytes {
    /// Application-defined event type tag (same as `LogEvent::kind`).
    pub kind: String,
    /// Raw JSON bytes of the payload.
    pub payload: Bytes,
    /// Optional metadata headers.
    pub headers: BTreeMap<String, String>,
    /// Wall-clock timestamp in milliseconds.
    pub occurred_at_ms: i64,
}
157
158impl LogEventBytes {
159    pub fn payload_json(&self) -> Result<serde_json::Value, LogError> {
160        serde_json::from_slice(&self.payload)
161            .map_err(|error| LogError::Serde(format!("event log payload parse error: {error}")))
162    }
163
164    pub fn into_log_event(self) -> Result<LogEvent, LogError> {
165        Ok(LogEvent {
166            kind: self.kind,
167            payload: serde_json::from_slice(&self.payload).map_err(|error| {
168                LogError::Serde(format!("event log payload parse error: {error}"))
169            })?,
170            headers: self.headers,
171            occurred_at_ms: self.occurred_at_ms,
172        })
173    }
174}
175
176impl TryFrom<LogEvent> for LogEventBytes {
177    type Error = LogError;
178
179    fn try_from(event: LogEvent) -> Result<Self, Self::Error> {
180        let payload = serde_json::to_vec(&event.payload)
181            .map_err(|error| LogError::Serde(format!("event log payload encode error: {error}")))?;
182        Ok(Self {
183            kind: event.kind,
184            payload: Bytes::from(payload),
185            headers: event.headers,
186            occurred_at_ms: event.occurred_at_ms,
187        })
188    }
189}
190
/// Outcome of a `compact` call.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactReport {
    /// Number of events removed.
    pub removed: usize,
    /// Events still retained for the topic.
    pub remaining: usize,
    /// Highest event id ever assigned to the topic, if any.
    pub latest: Option<EventId>,
    /// Whether the backend also checkpointed its storage
    /// (always `false` for the memory backend).
    pub checkpointed: bool,
}
198
/// Snapshot of a backend's identity and sizing, returned by `describe`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventLogDescription {
    /// Which backend is in use.
    pub backend: EventLogBackendKind,
    /// On-disk location (`None` for the memory backend).
    pub location: Option<PathBuf>,
    /// Approximate storage footprint in bytes (`None` for the memory backend).
    pub size_bytes: Option<u64>,
    /// Subscriber queue depth the backend was configured with.
    pub queue_depth: usize,
}
206
/// Errors surfaced by the event log; causes are flattened into strings at
/// construction time.
#[derive(Debug)]
pub enum LogError {
    /// Invalid configuration (e.g. unknown backend name).
    Config(String),
    /// Topic name failed validation in `Topic::new`.
    InvalidTopic(String),
    /// Consumer id failed validation in `ConsumerId::new`.
    InvalidConsumer(String),
    /// Filesystem failure from the file backend.
    Io(String),
    /// JSON encode/parse failure.
    Serde(String),
    /// SQLite failure from the sqlite backend.
    Sqlite(String),
    /// A subscriber fell behind its broadcast queue; carries the last event
    /// id that was delivered before the gap.
    ConsumerLagged(EventId),
}
217
218impl fmt::Display for LogError {
219    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220        match self {
221            Self::Config(message)
222            | Self::InvalidTopic(message)
223            | Self::InvalidConsumer(message)
224            | Self::Io(message)
225            | Self::Serde(message)
226            | Self::Sqlite(message) => message.fmt(f),
227            Self::ConsumerLagged(last_id) => {
228                write!(f, "subscriber lagged behind after event {last_id}")
229            }
230        }
231    }
232}
233
// Marker impl: `Display` supplies the message; no `source` is exposed because
// underlying causes are flattened into strings when the error is built.
impl std::error::Error for LogError {}
235
/// Append-only, topic-partitioned event log with broadcast subscriptions and
/// per-consumer acknowledgement cursors. Event ids increase monotonically
/// within a topic, starting at 1.
#[allow(async_fn_in_trait)]
pub trait EventLog: Send + Sync {
    /// Describes the backend: kind, location, and queue sizing.
    fn describe(&self) -> EventLogDescription;

    /// Appends `event` to `topic` and returns its newly assigned id.
    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;

    /// Pushes buffered state to durable storage (no-op for the memory backend).
    async fn flush(&self) -> Result<(), LogError>;

    /// Read events strictly after `from`. `None` starts from the
    /// beginning of the topic.
    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError>;

    /// Byte-payload variant of `read_range`. The default implementation
    /// re-encodes each event's JSON payload; backends may override to hand
    /// out stored bytes directly.
    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        let events = self.read_range(topic, from, limit).await?;
        events
            .into_iter()
            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
            .collect()
    }

    /// Streams history after `from`, then live events as they are appended.
    ///
    /// `async fn` keeps the ergonomic generic surface; the boxed stream
    /// preserves dyn-dispatch for callers that store `Arc<dyn EventLog>`.
    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;

    /// Records that `consumer` has processed `topic` up to and including `up_to`.
    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError>;

    /// Last acknowledged id for (`topic`, `consumer`), if any ack was recorded.
    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError>;

    /// Highest id appended to `topic`, if any events exist.
    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;

    /// Removes events with id up to and including `before`.
    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
}
291
/// Resolved event log configuration (backend choice plus backend locations),
/// built from environment overrides by `for_base_dir`.
#[derive(Clone, Debug)]
pub struct EventLogConfig {
    /// Selected backend.
    pub backend: EventLogBackendKind,
    /// Directory for the file backend.
    pub file_dir: PathBuf,
    /// Database path for the sqlite backend.
    pub sqlite_path: PathBuf,
    /// Subscriber queue depth (>= 1).
    pub queue_depth: usize,
}
299
300impl EventLogConfig {
301    pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
302        let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
303            .ok()
304            .map(|value| value.parse())
305            .transpose()?
306            .unwrap_or(EventLogBackendKind::Sqlite);
307        let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
308            .ok()
309            .and_then(|value| value.parse::<usize>().ok())
310            .unwrap_or(128)
311            .max(1);
312
313        let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
314            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
315            _ => crate::runtime_paths::event_log_dir(base_dir),
316        };
317        let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
318            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
319            _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
320        };
321
322        Ok(Self {
323            backend,
324            file_dir,
325            sqlite_path,
326            queue_depth,
327        })
328    }
329
330    pub fn location(&self) -> Option<PathBuf> {
331        match self.backend {
332            EventLogBackendKind::Memory => None,
333            EventLogBackendKind::File => Some(self.file_dir.clone()),
334            EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
335        }
336    }
337}
338
thread_local! {
    // Per-thread "current" event log; managed by the `install_*`,
    // `active_event_log`, and `reset_active_event_log` functions below.
    static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
}
342
343pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
344    let config = EventLogConfig::for_base_dir(base_dir)?;
345    let log = open_event_log(&config)?;
346    ACTIVE_EVENT_LOG.with(|slot| {
347        *slot.borrow_mut() = Some(log.clone());
348    });
349    Ok(log)
350}
351
352pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
353    let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
354    ACTIVE_EVENT_LOG.with(|slot| {
355        *slot.borrow_mut() = Some(log.clone());
356    });
357    log
358}
359
360pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
361    ACTIVE_EVENT_LOG.with(|slot| {
362        *slot.borrow_mut() = Some(log.clone());
363    });
364    log
365}
366
367pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
368    ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone())
369}
370
371pub fn reset_active_event_log() {
372    ACTIVE_EVENT_LOG.with(|slot| {
373        *slot.borrow_mut() = None;
374    });
375}
376
377pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
378    let config = EventLogConfig::for_base_dir(base_dir)?;
379    let description = match config.backend {
380        EventLogBackendKind::Memory => EventLogDescription {
381            backend: EventLogBackendKind::Memory,
382            location: None,
383            size_bytes: None,
384            queue_depth: config.queue_depth,
385        },
386        EventLogBackendKind::File => EventLogDescription {
387            backend: EventLogBackendKind::File,
388            size_bytes: Some(dir_size_bytes(&config.file_dir)),
389            location: Some(config.file_dir),
390            queue_depth: config.queue_depth,
391        },
392        EventLogBackendKind::Sqlite => EventLogDescription {
393            backend: EventLogBackendKind::Sqlite,
394            size_bytes: Some(sqlite_size_bytes(&config.sqlite_path)),
395            location: Some(config.sqlite_path),
396            queue_depth: config.queue_depth,
397        },
398    };
399    Ok(description)
400}
401
402pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
403    match config.backend {
404        EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
405            config.queue_depth,
406        )))),
407        EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
408            config.file_dir.clone(),
409            config.queue_depth,
410        )?))),
411        EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
412            config.sqlite_path.clone(),
413            config.queue_depth,
414        )?))),
415    }
416}
417
/// Enum wrapper over the concrete backends, giving callers a single storable
/// type (`Arc<AnyEventLog>`) without trait objects.
pub enum AnyEventLog {
    Memory(MemoryEventLog),
    File(FileEventLog),
    Sqlite(SqliteEventLog),
}
423
/// Enum dispatch: every call is forwarded verbatim to the wrapped backend.
impl EventLog for AnyEventLog {
    fn describe(&self) -> EventLogDescription {
        match self {
            Self::Memory(log) => log.describe(),
            Self::File(log) => log.describe(),
            Self::Sqlite(log) => log.describe(),
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        match self {
            Self::Memory(log) => log.append(topic, event).await,
            Self::File(log) => log.append(topic, event).await,
            Self::Sqlite(log) => log.append(topic, event).await,
        }
    }

    async fn flush(&self) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.flush().await,
            Self::File(log) => log.flush().await,
            Self::Sqlite(log) => log.flush().await,
        }
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        match self {
            Self::Memory(log) => log.read_range(topic, from, limit).await,
            Self::File(log) => log.read_range(topic, from, limit).await,
            Self::Sqlite(log) => log.read_range(topic, from, limit).await,
        }
    }

    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        match self {
            Self::Memory(log) => log.read_range_bytes(topic, from, limit).await,
            Self::File(log) => log.read_range_bytes(topic, from, limit).await,
            Self::Sqlite(log) => log.read_range_bytes(topic, from, limit).await,
        }
    }

    // Implemented inline rather than delegating: each backend's `subscribe`
    // consumes an `Arc` of its own concrete type, which an `Arc<AnyEventLog>`
    // cannot provide. The broadcast receiver is taken BEFORE reading history
    // so no event can fall in the gap between the two steps.
    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        let (rx, queue_depth) = match self.as_ref() {
            Self::Memory(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::File(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::Sqlite(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
        };
        // NOTE(review): replays the entire retained history into memory
        // (`usize::MAX`); assumes per-topic history stays modest — confirm
        // for long-lived topics.
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.ack(topic, consumer, up_to).await,
            Self::File(log) => log.ack(topic, consumer, up_to).await,
            Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
        }
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
            Self::File(log) => log.consumer_cursor(topic, consumer).await,
            Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
        }
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.latest(topic).await,
            Self::File(log) => log.latest(topic).await,
            Self::Sqlite(log) => log.latest(topic).await,
        }
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        match self {
            Self::Memory(log) => log.compact(topic, before).await,
            Self::File(log) => log.compact(topic, before).await,
            Self::Sqlite(log) => log.compact(topic, before).await,
        }
    }
}
539
/// Lazily-created, per-topic tokio broadcast channels used to fan out newly
/// appended events to live subscribers; keyed by topic name.
#[derive(Default)]
struct BroadcastMap(Mutex<HashMap<String, broadcast::Sender<(EventId, LogEvent)>>>);
542
543impl BroadcastMap {
544    fn subscribe(
545        &self,
546        topic: &Topic,
547        capacity: usize,
548    ) -> broadcast::Receiver<(EventId, LogEvent)> {
549        self.sender(topic, capacity).subscribe()
550    }
551
552    fn publish(&self, topic: &Topic, capacity: usize, record: (EventId, LogEvent)) {
553        let _ = self.sender(topic, capacity).send(record);
554    }
555
556    fn sender(&self, topic: &Topic, capacity: usize) -> broadcast::Sender<(EventId, LogEvent)> {
557        let mut map = self.0.lock().expect("event log broadcast map poisoned");
558        map.entry(topic.as_str().to_string())
559            .or_insert_with(|| broadcast::channel(capacity.max(1)).0)
560            .clone()
561    }
562}
563
/// Builds the subscription stream: replays `history` first, then forwards
/// live broadcast events whose id is greater than the last id delivered.
///
/// Duplicate suppression relies on event ids increasing monotonically per
/// topic. If the broadcast receiver lags (drops events), the stream ends
/// with `LogError::ConsumerLagged` carrying the last delivered id.
fn stream_from_broadcast(
    history: Vec<(EventId, LogEvent)>,
    from: Option<EventId>,
    mut live_rx: broadcast::Receiver<(EventId, LogEvent)>,
    queue_depth: usize,
) -> BoxStream<'static, Result<(EventId, LogEvent), LogError>> {
    let (tx, rx) = mpsc::channel(queue_depth.max(1));
    // Run the subscription forwarder as a tokio task rather than a detached
    // OS thread. A dedicated thread running under `futures::executor::block_on`
    // is invisible to the tokio runtime, so tests that use `start_paused = true`
    // race against auto-advanced timers while the thread catches up in real
    // time. Spawning on tokio makes the forwarder participate in runtime
    // scheduling (including paused-time quiescence) and ties its lifetime to
    // the runtime's shutdown.
    tokio::spawn(async move {
        // `from` of `None` maps to 0, which is below every real id (ids start at 1).
        let mut last_seen = from.unwrap_or(0);
        for (event_id, event) in history {
            last_seen = event_id;
            // A closed mpsc receiver means the subscriber is gone; stop quietly.
            if tx.send(Ok((event_id, event))).await.is_err() {
                return;
            }
        }

        loop {
            match live_rx.recv().await {
                Ok((event_id, event)) if event_id > last_seen => {
                    last_seen = event_id;
                    if tx.send(Ok((event_id, event))).await.is_err() {
                        return;
                    }
                }
                Ok(_) => {} // id <= last_seen: already delivered via history replay
                Err(broadcast::error::RecvError::Closed) => return,
                Err(broadcast::error::RecvError::Lagged(_)) => {
                    // Best-effort notification: the subscriber queue may be
                    // full or dropped; either way the stream terminates here.
                    let _ = tx.try_send(Err(LogError::ConsumerLagged(last_seen)));
                    return;
                }
            }
        }
    });
    Box::pin(ReceiverStream::new(rx))
}
606
/// Mutable state of the memory backend, guarded by one async mutex.
#[derive(Default)]
struct MemoryState {
    /// Retained events per topic name, ordered by id (compaction pops the front).
    topics: HashMap<String, VecDeque<(EventId, LogEvent)>>,
    /// Highest id ever assigned per topic (survives compaction).
    latest: HashMap<String, EventId>,
    /// Ack cursors keyed by (topic name, consumer id).
    consumers: HashMap<(String, String), EventId>,
}
613
/// Volatile, per-process event log backend.
pub struct MemoryEventLog {
    /// All topic/cursor state, behind a tokio mutex so locks can be held
    /// across awaits safely.
    state: tokio::sync::Mutex<MemoryState>,
    /// Live fan-out channels for subscribers.
    broadcasts: BroadcastMap,
    /// Broadcast channel capacity (>= 1).
    queue_depth: usize,
}
619
620impl MemoryEventLog {
621    pub fn new(queue_depth: usize) -> Self {
622        Self {
623            state: tokio::sync::Mutex::new(MemoryState::default()),
624            broadcasts: BroadcastMap::default(),
625            queue_depth: queue_depth.max(1),
626        }
627    }
628}
629
/// Memory backend: ids are assigned from the per-topic `latest` counter, so
/// they keep increasing even after compaction removes older events.
impl EventLog for MemoryEventLog {
    fn describe(&self) -> EventLogDescription {
        EventLogDescription {
            backend: EventLogBackendKind::Memory,
            location: None,
            size_bytes: None,
            queue_depth: self.queue_depth,
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        let mut state = self.state.lock().await;
        // Next id = highest-ever id + 1 (0 when the topic is new, so ids start at 1).
        let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
        state.latest.insert(topic.as_str().to_string(), event_id);
        state
            .topics
            .entry(topic.as_str().to_string())
            .or_default()
            .push_back((event_id, event.clone()));
        // Release the state lock before fanning out to subscribers.
        drop(state);
        self.broadcasts
            .publish(topic, self.queue_depth, (event_id, event));
        Ok(event_id)
    }

    // Nothing is buffered; memory is always "flushed".
    async fn flush(&self) -> Result<(), LogError> {
        Ok(())
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        // `None` becomes 0, which is below every real id.
        let from = from.unwrap_or(0);
        let state = self.state.lock().await;
        let events = state
            .topics
            .get(topic.as_str())
            .into_iter()
            .flat_map(|events| events.iter())
            .filter(|(event_id, _)| *event_id > from)
            .take(limit)
            .map(|(event_id, event)| (*event_id, event.clone()))
            .collect();
        Ok(events)
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        // Subscribe to the broadcast BEFORE snapshotting history so no event
        // can slip between the two; duplicates are filtered downstream by id.
        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        let mut state = self.state.lock().await;
        // Last writer wins; no monotonicity check on `up_to`.
        state.consumers.insert(
            (topic.as_str().to_string(), consumer.as_str().to_string()),
            up_to,
        );
        Ok(())
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        let state = self.state.lock().await;
        Ok(state
            .consumers
            .get(&(topic.as_str().to_string(), consumer.as_str().to_string()))
            .copied())
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        let state = self.state.lock().await;
        Ok(state.latest.get(topic.as_str()).copied())
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        let mut state = self.state.lock().await;
        let Some(events) = state.topics.get_mut(topic.as_str()) else {
            return Ok(CompactReport::default());
        };
        // Events are stored in id order, so the removable prefix is contiguous.
        let removed = events
            .iter()
            .take_while(|(event_id, _)| *event_id <= before)
            .count();
        for _ in 0..removed {
            events.pop_front();
        }
        Ok(CompactReport {
            removed,
            remaining: events.len(),
            // `latest` is preserved so ids stay monotonic after compaction.
            latest: state.latest.get(topic.as_str()).copied(),
            checkpointed: false,
        })
    }
}
740
/// One JSONL line in a topic file: the assigned id plus the event itself.
#[derive(Serialize, Deserialize)]
struct FileRecord {
    id: EventId,
    event: LogEvent,
}
746
/// File backend: one append-only JSONL file per topic under `root/topics`,
/// plus one JSON cursor file per (topic, consumer) under `root/consumers`.
pub struct FileEventLog {
    /// Backend root directory.
    root: PathBuf,
    /// Cache of the highest id per topic, lazily populated by scanning the
    /// topic file on first use.
    latest_ids: Mutex<HashMap<String, EventId>>,
    /// Serializes appends so id assignment + file write are atomic with
    /// respect to other appenders in this process.
    write_lock: Mutex<()>,
    /// Live fan-out channels for subscribers.
    broadcasts: BroadcastMap,
    /// Broadcast channel capacity (>= 1).
    queue_depth: usize,
}
754
755impl FileEventLog {
756    pub fn open(root: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
757        std::fs::create_dir_all(root.join("topics"))
758            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
759        std::fs::create_dir_all(root.join("consumers"))
760            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
761        Ok(Self {
762            root,
763            latest_ids: Mutex::new(HashMap::new()),
764            write_lock: Mutex::new(()),
765            broadcasts: BroadcastMap::default(),
766            queue_depth: queue_depth.max(1),
767        })
768    }
769
770    fn topic_path(&self, topic: &Topic) -> PathBuf {
771        self.root
772            .join("topics")
773            .join(format!("{}.jsonl", topic.as_str()))
774    }
775
776    fn consumer_path(&self, topic: &Topic, consumer: &ConsumerId) -> PathBuf {
777        self.root.join("consumers").join(format!(
778            "{}__{}.json",
779            topic.as_str(),
780            sanitize_filename(consumer.as_str())
781        ))
782    }
783
784    fn latest_id_for_topic(&self, topic: &Topic) -> Result<EventId, LogError> {
785        if let Some(event_id) = self
786            .latest_ids
787            .lock()
788            .expect("file event log latest ids poisoned")
789            .get(topic.as_str())
790            .copied()
791        {
792            return Ok(event_id);
793        }
794
795        let mut latest = 0;
796        let path = self.topic_path(topic);
797        if path.is_file() {
798            for record in read_file_records(&path)? {
799                latest = record.id;
800            }
801        }
802        self.latest_ids
803            .lock()
804            .expect("file event log latest ids poisoned")
805            .insert(topic.as_str().to_string(), latest);
806        Ok(latest)
807    }
808
809    fn read_range_sync(
810        &self,
811        topic: &Topic,
812        from: Option<EventId>,
813        limit: usize,
814    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
815        let path = self.topic_path(topic);
816        if !path.is_file() {
817            return Ok(Vec::new());
818        }
819        let from = from.unwrap_or(0);
820        let mut events = Vec::new();
821        for record in read_file_records(&path)? {
822            if record.id > from {
823                events.push((record.id, record.event));
824            }
825            if events.len() >= limit {
826                break;
827            }
828        }
829        Ok(events)
830    }
831}
832
/// Reads all complete records from a topic JSONL file.
///
/// Tolerates a truncated final line (e.g. a crash mid-append): a parse
/// failure on a line lacking its trailing newline silently ends the scan,
/// while a parse failure on a complete line is reported as `LogError::Serde`.
fn read_file_records(path: &Path) -> Result<Vec<FileRecord>, LogError> {
    let file = std::fs::File::open(path)
        .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
    let mut reader = std::io::BufReader::new(file);
    let mut records = Vec::new();
    // One reusable line buffer avoids an allocation per record.
    let mut line = Vec::new();
    loop {
        line.clear();
        let bytes_read = std::io::BufRead::read_until(&mut reader, b'\n', &mut line)
            .map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
        if bytes_read == 0 {
            break; // EOF
        }
        // Skip blank lines (`read_until` keeps the trailing newline in `line`).
        if line.iter().all(u8::is_ascii_whitespace) {
            continue;
        }
        let complete_line = line.ends_with(b"\n");
        match serde_json::from_slice::<FileRecord>(&line) {
            Ok(record) => records.push(record),
            // Unparseable AND missing its newline: treat as truncation, keep
            // everything read so far.
            Err(_) if !complete_line => break,
            Err(error) => {
                return Err(LogError::Serde(format!("event log parse error: {error}")));
            }
        }
    }
    Ok(records)
}
860
861impl EventLog for FileEventLog {
862    fn describe(&self) -> EventLogDescription {
863        EventLogDescription {
864            backend: EventLogBackendKind::File,
865            location: Some(self.root.clone()),
866            size_bytes: Some(dir_size_bytes(&self.root)),
867            queue_depth: self.queue_depth,
868        }
869    }
870
871    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
872        let _guard = self
873            .write_lock
874            .lock()
875            .expect("file event log write lock poisoned");
876        let next_id = self.latest_id_for_topic(topic)? + 1;
877        let record = FileRecord {
878            id: next_id,
879            event: event.clone(),
880        };
881        let path = self.topic_path(topic);
882        if let Some(parent) = path.parent() {
883            std::fs::create_dir_all(parent)
884                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
885        }
886        let line = serde_json::to_string(&record)
887            .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
888        use std::io::Write as _;
889        let mut file = std::fs::OpenOptions::new()
890            .create(true)
891            .append(true)
892            .open(&path)
893            .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
894        writeln!(file, "{line}")
895            .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
896        self.latest_ids
897            .lock()
898            .expect("file event log latest ids poisoned")
899            .insert(topic.as_str().to_string(), next_id);
900        self.broadcasts
901            .publish(topic, self.queue_depth, (next_id, event));
902        Ok(next_id)
903    }
904
905    async fn flush(&self) -> Result<(), LogError> {
906        sync_tree(&self.root)
907    }
908
909    async fn read_range(
910        &self,
911        topic: &Topic,
912        from: Option<EventId>,
913        limit: usize,
914    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
915        self.read_range_sync(topic, from, limit)
916    }
917
918    async fn read_range_bytes(
919        &self,
920        topic: &Topic,
921        from: Option<EventId>,
922        limit: usize,
923    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
924        self.read_range_sync(topic, from, limit)?
925            .into_iter()
926            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
927            .collect()
928    }
929
930    async fn subscribe(
931        self: Arc<Self>,
932        topic: &Topic,
933        from: Option<EventId>,
934    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
935        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
936        let history = self.read_range_sync(topic, from, usize::MAX)?;
937        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
938    }
939
940    async fn ack(
941        &self,
942        topic: &Topic,
943        consumer: &ConsumerId,
944        up_to: EventId,
945    ) -> Result<(), LogError> {
946        let path = self.consumer_path(topic, consumer);
947        let payload = serde_json::json!({
948            "topic": topic.as_str(),
949            "consumer_id": consumer.as_str(),
950            "cursor": up_to,
951            "updated_at_ms": now_ms(),
952        });
953        write_json_atomically(&path, &payload)
954    }
955
956    async fn consumer_cursor(
957        &self,
958        topic: &Topic,
959        consumer: &ConsumerId,
960    ) -> Result<Option<EventId>, LogError> {
961        let path = self.consumer_path(topic, consumer);
962        if !path.is_file() {
963            return Ok(None);
964        }
965        let raw = std::fs::read_to_string(&path)
966            .map_err(|error| LogError::Io(format!("event log consumer read error: {error}")))?;
967        let payload: serde_json::Value = serde_json::from_str(&raw)
968            .map_err(|error| LogError::Serde(format!("event log consumer parse error: {error}")))?;
969        let cursor = payload
970            .get("cursor")
971            .and_then(serde_json::Value::as_u64)
972            .ok_or_else(|| {
973                LogError::Serde("event log consumer record missing numeric cursor".to_string())
974            })?;
975        Ok(Some(cursor))
976    }
977
978    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
979        let latest = self.latest_id_for_topic(topic)?;
980        if latest == 0 {
981            Ok(None)
982        } else {
983            Ok(Some(latest))
984        }
985    }
986
    /// Drop all events with id <= `before` from `topic`'s JSONL file.
    ///
    /// The retained suffix is rewritten into a temporary file that is then
    /// renamed over the original, so readers observe either the old or the
    /// new file but never a partial rewrite. When nothing is retained the
    /// topic file is deleted outright.
    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        // Exclude concurrent appends while the file is rewritten.
        let _guard = self
            .write_lock
            .lock()
            .expect("file event log write lock poisoned");
        let path = self.topic_path(topic);
        if !path.is_file() {
            return Ok(CompactReport::default());
        }
        // `read_range_sync(_, Some(before), _)` yields ids strictly greater
        // than `before`, i.e. exactly the records that survive compaction.
        let retained = self.read_range_sync(topic, Some(before), usize::MAX)?;
        // `retained` is a subset of the full read, so this cannot underflow.
        let removed = self.read_range_sync(topic, None, usize::MAX)?.len() - retained.len();
        let tmp = path.with_extension("jsonl.tmp");
        if retained.is_empty() {
            let _ = std::fs::remove_file(&path);
        } else {
            let mut writer =
                std::io::BufWriter::new(std::fs::File::create(&tmp).map_err(|error| {
                    LogError::Io(format!("event log tmp create error: {error}"))
                })?);
            use std::io::Write as _;
            for (event_id, event) in &retained {
                let line = serde_json::to_string(&FileRecord {
                    id: *event_id,
                    event: event.clone(),
                })
                .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
                writeln!(writer, "{line}")
                    .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
            }
            writer
                .flush()
                .map_err(|error| LogError::Io(format!("event log flush error: {error}")))?;
            // Atomic replace of the live topic file.
            std::fs::rename(&tmp, &path).map_err(|error| {
                LogError::Io(format!("event log compact finalize error: {error}"))
            })?;
        }
        let latest = retained.last().map(|(event_id, _)| *event_id);
        // NOTE(review): when everything is compacted away the head cache is
        // reset to 0, so the next append restarts ids at 1 for this topic
        // (and a restart after file removal does the same). Confirm that id
        // reuse after full compaction is acceptable to consumers holding
        // cursors past the removed range.
        self.latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .insert(topic.as_str().to_string(), latest.unwrap_or(0));
        Ok(CompactReport {
            removed,
            remaining: retained.len(),
            latest,
            // The file backend has no WAL to checkpoint.
            checkpointed: false,
        })
    }
1035}
1036
/// Event log backed by a single SQLite database file.
pub struct SqliteEventLog {
    // Database file location; also used for on-disk size reporting.
    path: PathBuf,
    // Single shared connection; every operation serializes on this mutex.
    connection: Mutex<Connection>,
    // Per-topic fan-out channels for live subscribers.
    broadcasts: BroadcastMap,
    // Capacity for subscriber channels (clamped to >= 1 in `open`).
    queue_depth: usize,
}
1043
1044impl SqliteEventLog {
1045    pub fn open(path: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
1046        if let Some(parent) = path.parent() {
1047            std::fs::create_dir_all(parent)
1048                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
1049        }
1050        let connection = Connection::open(&path)
1051            .map_err(|error| LogError::Sqlite(format!("event log open error: {error}")))?;
1052        // Set busy_timeout BEFORE the WAL pragma so SQLite waits out transient
1053        // SQLITE_BUSY from a previous test's connection that hasn't finished
1054        // dropping yet (parallel `cargo test` on the same process, distinct
1055        // paths, still contends on SQLite's own global mutex under WAL-mode
1056        // promotion). Without this, `journal_mode = WAL` fails fast with
1057        // "database is locked" instead of retrying.
1058        connection
1059            .busy_timeout(std::time::Duration::from_secs(5))
1060            .map_err(|error| LogError::Sqlite(format!("event log busy-timeout error: {error}")))?;
1061        connection
1062            .pragma_update(None, "journal_mode", "WAL")
1063            .map_err(|error| LogError::Sqlite(format!("event log WAL pragma error: {error}")))?;
1064        connection
1065            .pragma_update(None, "synchronous", "NORMAL")
1066            .map_err(|error| LogError::Sqlite(format!("event log sync pragma error: {error}")))?;
1067        connection
1068            .execute_batch(
1069                "CREATE TABLE IF NOT EXISTS topic_heads (
1070                    topic TEXT PRIMARY KEY,
1071                    last_id INTEGER NOT NULL
1072                );
1073                CREATE TABLE IF NOT EXISTS events (
1074                    topic TEXT NOT NULL,
1075                    event_id INTEGER NOT NULL,
1076                    kind TEXT NOT NULL,
1077                    payload BLOB NOT NULL,
1078                    headers TEXT NOT NULL,
1079                    occurred_at_ms INTEGER NOT NULL,
1080                    PRIMARY KEY (topic, event_id)
1081                );
1082                CREATE TABLE IF NOT EXISTS consumers (
1083                    topic TEXT NOT NULL,
1084                    consumer_id TEXT NOT NULL,
1085                    cursor INTEGER NOT NULL,
1086                    updated_at_ms INTEGER NOT NULL,
1087                    PRIMARY KEY (topic, consumer_id)
1088                );",
1089            )
1090            .map_err(|error| LogError::Sqlite(format!("event log schema error: {error}")))?;
1091        Ok(Self {
1092            path,
1093            connection: Mutex::new(connection),
1094            broadcasts: BroadcastMap::default(),
1095            queue_depth: queue_depth.max(1),
1096        })
1097    }
1098}
1099
1100impl EventLog for SqliteEventLog {
1101    fn describe(&self) -> EventLogDescription {
1102        EventLogDescription {
1103            backend: EventLogBackendKind::Sqlite,
1104            location: Some(self.path.clone()),
1105            size_bytes: Some(sqlite_size_bytes(&self.path)),
1106            queue_depth: self.queue_depth,
1107        }
1108    }
1109
1110    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
1111        let mut connection = self
1112            .connection
1113            .lock()
1114            .expect("sqlite event log connection poisoned");
1115        let tx = connection
1116            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
1117            .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
1118        tx.execute(
1119            "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
1120            params![topic.as_str()],
1121        )
1122        .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
1123        tx.execute(
1124            "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
1125            params![topic.as_str()],
1126        )
1127        .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
1128        let event_id = tx
1129            .query_row(
1130                "SELECT last_id FROM topic_heads WHERE topic = ?1",
1131                params![topic.as_str()],
1132                |row| row.get::<_, i64>(0),
1133            )
1134            .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))
1135            .and_then(sqlite_i64_to_event_id)?;
1136        let event_id_sql = event_id_to_sqlite_i64(event_id)?;
1137        tx.execute(
1138            "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
1139             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1140            params![
1141                topic.as_str(),
1142                event_id_sql,
1143                event.kind,
1144                serde_json::to_vec(&event.payload).map_err(|error| LogError::Serde(format!(
1145                    "event log payload encode error: {error}"
1146                )))?,
1147                serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
1148                    "event log headers encode error: {error}"
1149                )))?,
1150                event.occurred_at_ms
1151            ],
1152        )
1153        .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
1154        tx.commit()
1155            .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
1156        self.broadcasts
1157            .publish(topic, self.queue_depth, (event_id, event.clone()));
1158        Ok(event_id)
1159    }
1160
1161    async fn flush(&self) -> Result<(), LogError> {
1162        let connection = self
1163            .connection
1164            .lock()
1165            .expect("sqlite event log connection poisoned");
1166        connection
1167            .execute_batch("PRAGMA wal_checkpoint(FULL);")
1168            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
1169        Ok(())
1170    }
1171
1172    async fn read_range(
1173        &self,
1174        topic: &Topic,
1175        from: Option<EventId>,
1176        limit: usize,
1177    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
1178        let connection = self
1179            .connection
1180            .lock()
1181            .expect("sqlite event log connection poisoned");
1182        let mut statement = connection
1183            .prepare(
1184                "SELECT event_id, kind, payload, headers, occurred_at_ms
1185                 FROM events
1186                 WHERE topic = ?1 AND event_id > ?2
1187                 ORDER BY event_id ASC
1188                 LIMIT ?3",
1189            )
1190            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
1191        let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
1192        let rows = statement
1193            .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
1194                let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1195                let headers: String = row.get(3)?;
1196                let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
1197                Ok((
1198                    event_id,
1199                    LogEvent {
1200                        kind: row.get(1)?,
1201                        payload: serde_json::from_slice(&payload).map_err(|error| {
1202                            rusqlite::Error::FromSqlConversionFailure(
1203                                payload.len(),
1204                                rusqlite::types::Type::Blob,
1205                                Box::new(error),
1206                            )
1207                        })?,
1208                        headers: serde_json::from_str(&headers).map_err(|error| {
1209                            rusqlite::Error::FromSqlConversionFailure(
1210                                headers.len(),
1211                                rusqlite::types::Type::Text,
1212                                Box::new(error),
1213                            )
1214                        })?,
1215                        occurred_at_ms: row.get(4)?,
1216                    },
1217                ))
1218            })
1219            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
1220        let mut events = Vec::new();
1221        for row in rows {
1222            events.push(
1223                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
1224            );
1225        }
1226        Ok(events)
1227    }
1228
1229    async fn read_range_bytes(
1230        &self,
1231        topic: &Topic,
1232        from: Option<EventId>,
1233        limit: usize,
1234    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
1235        let connection = self
1236            .connection
1237            .lock()
1238            .expect("sqlite event log connection poisoned");
1239        let mut statement = connection
1240            .prepare(
1241                "SELECT event_id, kind, payload, headers, occurred_at_ms
1242                 FROM events
1243                 WHERE topic = ?1 AND event_id > ?2
1244                 ORDER BY event_id ASC
1245                 LIMIT ?3",
1246            )
1247            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
1248        let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
1249        let rows = statement
1250            .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
1251                let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1252                let headers: String = row.get(3)?;
1253                let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
1254                Ok((
1255                    event_id,
1256                    LogEventBytes {
1257                        kind: row.get(1)?,
1258                        payload: Bytes::from(payload),
1259                        headers: serde_json::from_str(&headers).map_err(|error| {
1260                            rusqlite::Error::FromSqlConversionFailure(
1261                                headers.len(),
1262                                rusqlite::types::Type::Text,
1263                                Box::new(error),
1264                            )
1265                        })?,
1266                        occurred_at_ms: row.get(4)?,
1267                    },
1268                ))
1269            })
1270            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
1271        let mut events = Vec::new();
1272        for row in rows {
1273            events.push(
1274                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
1275            );
1276        }
1277        Ok(events)
1278    }
1279
1280    async fn subscribe(
1281        self: Arc<Self>,
1282        topic: &Topic,
1283        from: Option<EventId>,
1284    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
1285        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
1286        let history = self.read_range(topic, from, usize::MAX).await?;
1287        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
1288    }
1289
1290    async fn ack(
1291        &self,
1292        topic: &Topic,
1293        consumer: &ConsumerId,
1294        up_to: EventId,
1295    ) -> Result<(), LogError> {
1296        let connection = self
1297            .connection
1298            .lock()
1299            .expect("sqlite event log connection poisoned");
1300        let up_to_sql = event_id_to_sqlite_i64(up_to)?;
1301        connection
1302            .execute(
1303                "INSERT INTO consumers(topic, consumer_id, cursor, updated_at_ms)
1304                 VALUES (?1, ?2, ?3, ?4)
1305                 ON CONFLICT(topic, consumer_id)
1306                 DO UPDATE SET cursor = excluded.cursor, updated_at_ms = excluded.updated_at_ms",
1307                params![topic.as_str(), consumer.as_str(), up_to_sql, now_ms()],
1308            )
1309            .map_err(|error| LogError::Sqlite(format!("event log ack error: {error}")))?;
1310        Ok(())
1311    }
1312
1313    async fn consumer_cursor(
1314        &self,
1315        topic: &Topic,
1316        consumer: &ConsumerId,
1317    ) -> Result<Option<EventId>, LogError> {
1318        let connection = self
1319            .connection
1320            .lock()
1321            .expect("sqlite event log connection poisoned");
1322        connection
1323            .query_row(
1324                "SELECT cursor FROM consumers WHERE topic = ?1 AND consumer_id = ?2",
1325                params![topic.as_str(), consumer.as_str()],
1326                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1327            )
1328            .optional()
1329            .map_err(|error| LogError::Sqlite(format!("event log consumer cursor error: {error}")))
1330    }
1331
1332    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
1333        let connection = self
1334            .connection
1335            .lock()
1336            .expect("sqlite event log connection poisoned");
1337        connection
1338            .query_row(
1339                "SELECT last_id FROM topic_heads WHERE topic = ?1",
1340                params![topic.as_str()],
1341                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1342            )
1343            .optional()
1344            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))
1345    }
1346
1347    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
1348        let connection = self
1349            .connection
1350            .lock()
1351            .expect("sqlite event log connection poisoned");
1352        let before_sql = event_id_to_sqlite_i64(before)?;
1353        let removed = connection
1354            .execute(
1355                "DELETE FROM events WHERE topic = ?1 AND event_id <= ?2",
1356                params![topic.as_str(), before_sql],
1357            )
1358            .map_err(|error| {
1359                LogError::Sqlite(format!("event log compact delete error: {error}"))
1360            })?;
1361        let remaining = connection
1362            .query_row(
1363                "SELECT COUNT(*) FROM events WHERE topic = ?1",
1364                params![topic.as_str()],
1365                |row| row.get::<_, i64>(0),
1366            )
1367            .map_err(|error| LogError::Sqlite(format!("event log compact count error: {error}")))
1368            .and_then(sqlite_i64_to_usize)?;
1369        let latest = connection
1370            .query_row(
1371                "SELECT last_id FROM topic_heads WHERE topic = ?1",
1372                params![topic.as_str()],
1373                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1374            )
1375            .optional()
1376            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))?;
1377        connection
1378            .execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
1379            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
1380        Ok(CompactReport {
1381            removed,
1382            remaining,
1383            latest,
1384            checkpointed: true,
1385        })
1386    }
1387}
1388
/// Interpret `value` as a path: absolute values are used verbatim,
/// relative values are resolved underneath `base_dir`.
fn resolve_path(base_dir: &Path, value: &str) -> PathBuf {
    let raw = Path::new(value);
    if raw.is_absolute() {
        raw.to_path_buf()
    } else {
        base_dir.join(raw)
    }
}
1397
1398fn write_json_atomically(path: &Path, payload: &serde_json::Value) -> Result<(), LogError> {
1399    if let Some(parent) = path.parent() {
1400        std::fs::create_dir_all(parent)
1401            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
1402    }
1403    let tmp = path.with_extension("tmp");
1404    let encoded = serde_json::to_vec_pretty(payload)
1405        .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
1406    std::fs::write(&tmp, encoded)
1407        .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
1408    std::fs::rename(&tmp, path)
1409        .map_err(|error| LogError::Io(format!("event log rename error: {error}")))?;
1410    Ok(())
1411}
1412
/// Sanitize `value` for use as a file name; delegates to the shared
/// topic-component sanitizer.
fn sanitize_filename(value: &str) -> String {
    sanitize_topic_component(value)
}
1416
/// Replace every character outside `[A-Za-z0-9._-]` with `_`, yielding a
/// string safe to embed in a file name.
pub fn sanitize_topic_component(value: &str) -> String {
    let mut sanitized = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-');
        sanitized.push(if keep { ch } else { '_' });
    }
    sanitized
}
1429
/// Total size in bytes of all regular files under `path`, recursing into
/// subdirectories. A missing root or unreadable entry contributes zero.
fn dir_size_bytes(path: &Path) -> u64 {
    let mut total = 0;
    // `read_dir` on a missing/unreadable path errs, which we treat as 0.
    if let Ok(entries) = std::fs::read_dir(path) {
        for entry in entries.flatten() {
            let child = entry.path();
            total += if child.is_dir() {
                dir_size_bytes(&child)
            } else {
                entry.metadata().map(|metadata| metadata.len()).unwrap_or(0)
            };
        }
    }
    total
}
1447
1448fn sqlite_size_bytes(path: &Path) -> u64 {
1449    let mut total = file_size(path);
1450    total += file_size(&PathBuf::from(format!("{}-wal", path.display())));
1451    total += file_size(&PathBuf::from(format!("{}-shm", path.display())));
1452    total
1453}
1454
/// Size of one file in bytes; 0 when the file is missing or its metadata
/// cannot be read.
fn file_size(path: &Path) -> u64 {
    match std::fs::metadata(path) {
        Ok(metadata) => metadata.len(),
        Err(_) => 0,
    }
}
1460
1461fn sync_tree(root: &Path) -> Result<(), LogError> {
1462    if !root.exists() {
1463        return Ok(());
1464    }
1465    for entry in std::fs::read_dir(root)
1466        .map_err(|error| LogError::Io(format!("event log read_dir error: {error}")))?
1467    {
1468        let entry = entry.map_err(|error| LogError::Io(format!("event log dir error: {error}")))?;
1469        let path = entry.path();
1470        if path.is_dir() {
1471            sync_tree(&path)?;
1472            continue;
1473        }
1474        std::fs::File::open(&path)
1475            .and_then(|file| file.sync_all())
1476            .map_err(|error| LogError::Io(format!("event log sync error: {error}")))?;
1477    }
1478    Ok(())
1479}
1480
/// Milliseconds since the Unix epoch; 0 if the system clock reads earlier
/// than the epoch.
fn now_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(duration) => duration.as_millis() as i64,
        Err(_) => 0,
    }
}
1487
1488fn event_id_to_sqlite_i64(event_id: EventId) -> Result<i64, LogError> {
1489    i64::try_from(event_id)
1490        .map_err(|_| LogError::Sqlite(format!("event id {event_id} exceeds sqlite INTEGER range")))
1491}
1492
1493fn sqlite_i64_to_event_id(value: i64) -> Result<EventId, LogError> {
1494    u64::try_from(value)
1495        .map_err(|_| LogError::Sqlite(format!("sqlite event id {value} is negative")))
1496}
1497
1498fn sqlite_i64_to_event_id_for_row(value: i64) -> rusqlite::Result<EventId> {
1499    u64::try_from(value).map_err(|_| {
1500        rusqlite::Error::FromSqlConversionFailure(
1501            std::mem::size_of::<i64>(),
1502            rusqlite::types::Type::Integer,
1503            "sqlite event id is negative".into(),
1504        )
1505    })
1506}
1507
1508fn sqlite_json_bytes_for_row(
1509    row: &rusqlite::Row<'_>,
1510    index: usize,
1511    name: &str,
1512) -> rusqlite::Result<Vec<u8>> {
1513    let value = row.get_ref(index)?;
1514    match value {
1515        rusqlite::types::ValueRef::Text(bytes) | rusqlite::types::ValueRef::Blob(bytes) => {
1516            Ok(bytes.to_vec())
1517        }
1518        other => Err(rusqlite::Error::InvalidColumnType(
1519            index,
1520            name.to_string(),
1521            other.data_type(),
1522        )),
1523    }
1524}
1525
1526fn sqlite_i64_to_usize(value: i64) -> Result<usize, LogError> {
1527    usize::try_from(value)
1528        .map_err(|_| LogError::Sqlite(format!("sqlite count {value} is negative")))
1529}
1530
1531#[cfg(test)]
1532mod tests {
1533    use super::*;
1534    use futures::StreamExt;
1535    use rand::{rngs::StdRng, RngExt, SeedableRng};
1536
1537    async fn exercise_basic_backend(log: Arc<AnyEventLog>) {
1538        let topic = Topic::new("trigger.inbox").unwrap();
1539        for i in 0..10_000 {
1540            log.append(
1541                &topic,
1542                LogEvent::new("append", serde_json::json!({ "i": i })),
1543            )
1544            .await
1545            .unwrap();
1546        }
1547        let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
1548        assert_eq!(events.len(), 10_000);
1549        assert_eq!(events.first().unwrap().0, 1);
1550        assert_eq!(events.last().unwrap().0, 10_000);
1551    }
1552
1553    #[tokio::test(flavor = "current_thread")]
1554    async fn memory_backend_supports_append_read_subscribe_and_compact() {
1555        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
1556        exercise_basic_backend(log.clone()).await;
1557
1558        let topic = Topic::new("agent.transcript.demo").unwrap();
1559        let mut stream = log.clone().subscribe(&topic, None).await.unwrap();
1560        let first = log
1561            .append(
1562                &topic,
1563                LogEvent::new("message", serde_json::json!({"text":"one"})),
1564            )
1565            .await
1566            .unwrap();
1567        let second = log
1568            .append(
1569                &topic,
1570                LogEvent::new("message", serde_json::json!({"text":"two"})),
1571            )
1572            .await
1573            .unwrap();
1574        let seen: Vec<_> = stream.by_ref().take(2).collect().await;
1575        assert_eq!(seen[0].as_ref().unwrap().0, first);
1576        assert_eq!(seen[1].as_ref().unwrap().0, second);
1577
1578        log.ack(&topic, &ConsumerId::new("worker").unwrap(), second)
1579            .await
1580            .unwrap();
1581        let compact = log.compact(&topic, first).await.unwrap();
1582        assert_eq!(compact.removed, 1);
1583        assert_eq!(compact.remaining, 1);
1584    }
1585
1586    #[tokio::test(flavor = "current_thread")]
1587    async fn file_backend_persists_across_reopen_and_compacts() {
1588        let dir = tempfile::tempdir().unwrap();
1589        let topic = Topic::new("trigger.outbox").unwrap();
1590        let first_log = Arc::new(AnyEventLog::File(
1591            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
1592        ));
1593        first_log
1594            .append(
1595                &topic,
1596                LogEvent::new("dispatch_pending", serde_json::json!({"n":1})),
1597            )
1598            .await
1599            .unwrap();
1600        first_log
1601            .append(
1602                &topic,
1603                LogEvent::new("dispatch_complete", serde_json::json!({"n":2})),
1604            )
1605            .await
1606            .unwrap();
1607        drop(first_log);
1608
1609        let reopened = Arc::new(AnyEventLog::File(
1610            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
1611        ));
1612        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
1613        assert_eq!(events.len(), 2);
1614        let compact = reopened.compact(&topic, 1).await.unwrap();
1615        assert_eq!(compact.removed, 1);
1616        assert_eq!(
1617            reopened
1618                .read_range(&topic, None, usize::MAX)
1619                .await
1620                .unwrap()
1621                .len(),
1622            1
1623        );
1624    }
1625
1626    #[tokio::test(flavor = "current_thread")]
1627    async fn file_backend_skips_torn_tail_on_restart() {
1628        let dir = tempfile::tempdir().unwrap();
1629        let topic = Topic::new("trigger.inbox").unwrap();
1630        let first_log = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
1631        first_log
1632            .append(
1633                &topic,
1634                LogEvent::new("accepted", serde_json::json!({"id": "ok"})),
1635            )
1636            .await
1637            .unwrap();
1638        drop(first_log);
1639
1640        let topic_path = dir.path().join("topics").join("trigger.inbox.jsonl");
1641        use std::io::Write as _;
1642        let mut file = std::fs::OpenOptions::new()
1643            .append(true)
1644            .open(&topic_path)
1645            .unwrap();
1646        write!(file, "{{\"id\":2,\"event\":{{\"kind\":\"partial\"").unwrap();
1647        drop(file);
1648
1649        let reopened = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
1650        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
1651        assert_eq!(events.len(), 1);
1652        assert_eq!(events[0].0, 1);
1653        assert_eq!(reopened.latest(&topic).await.unwrap(), Some(1));
1654    }
1655
1656    #[tokio::test(flavor = "current_thread")]
1657    async fn sqlite_backend_persists_and_checkpoints_after_compact() {
1658        let dir = tempfile::tempdir().unwrap();
1659        let path = dir.path().join("events.sqlite");
1660        let topic = Topic::new("daemon.demo.state").unwrap();
1661        let first_log = Arc::new(AnyEventLog::Sqlite(
1662            SqliteEventLog::open(path.clone(), 8).unwrap(),
1663        ));
1664        first_log
1665            .append(
1666                &topic,
1667                LogEvent::new("state", serde_json::json!({"state":"idle"})),
1668            )
1669            .await
1670            .unwrap();
1671        first_log
1672            .append(
1673                &topic,
1674                LogEvent::new("state", serde_json::json!({"state":"active"})),
1675            )
1676            .await
1677            .unwrap();
1678        drop(first_log);
1679
1680        let reopened = Arc::new(AnyEventLog::Sqlite(
1681            SqliteEventLog::open(path.clone(), 8).unwrap(),
1682        ));
1683        assert_eq!(
1684            reopened
1685                .read_range(&topic, None, usize::MAX)
1686                .await
1687                .unwrap()
1688                .len(),
1689            2
1690        );
1691        let compact = reopened.compact(&topic, 1).await.unwrap();
1692        assert!(compact.checkpointed);
1693        let wal = PathBuf::from(format!("{}-wal", path.display()));
1694        assert!(file_size(&wal) == 0 || !wal.exists());
1695    }
1696
1697    #[tokio::test(flavor = "current_thread")]
1698    async fn sqlite_bytes_read_preserves_payload_without_value_materialization() {
1699        let dir = tempfile::tempdir().unwrap();
1700        let path = dir.path().join("events.sqlite");
1701        let topic = Topic::new("observability.action_graph").unwrap();
1702        let log = SqliteEventLog::open(path, 8).unwrap();
1703        let event_id = log
1704            .append(
1705                &topic,
1706                LogEvent::new(
1707                    "snapshot",
1708                    serde_json::json!({"nodes":[{"id":"a"}],"edges":[]}),
1709                ),
1710            )
1711            .await
1712            .unwrap();
1713
1714        let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
1715        assert_eq!(events.len(), 1);
1716        assert_eq!(events[0].0, event_id);
1717        assert_eq!(
1718            events[0].1.payload_json().unwrap(),
1719            serde_json::json!({"nodes":[{"id":"a"}],"edges":[]})
1720        );
1721    }
1722
1723    #[tokio::test(flavor = "current_thread")]
1724    async fn sqlite_bytes_read_accepts_legacy_text_payload_rows() {
1725        let dir = tempfile::tempdir().unwrap();
1726        let path = dir.path().join("events.sqlite");
1727        let topic = Topic::new("agent.transcript.legacy").unwrap();
1728        let log = SqliteEventLog::open(path, 8).unwrap();
1729        {
1730            let connection = log.connection.lock().unwrap();
1731            connection
1732                .execute(
1733                    "INSERT INTO topic_heads(topic, last_id) VALUES (?1, 1)",
1734                    params![topic.as_str()],
1735                )
1736                .unwrap();
1737            connection
1738                .execute(
1739                    "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
1740                     VALUES (?1, 1, 'legacy', ?2, '{}', 1)",
1741                    params![topic.as_str(), "{\"text\":\"old\"}"],
1742                )
1743                .unwrap();
1744        }
1745
1746        let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
1747        assert_eq!(
1748            events[0].1.payload_json().unwrap(),
1749            serde_json::json!({"text": "old"})
1750        );
1751        assert_eq!(
1752            log.read_range(&topic, None, 1).await.unwrap()[0].1.kind,
1753            "legacy"
1754        );
1755    }
1756
1757    #[tokio::test(flavor = "current_thread")]
1758    async fn broadcast_forwarder_reports_lag_when_receiver_overflows() {
1759        let (sender, rx) = broadcast::channel(2);
1760        for i in 0..10 {
1761            sender
1762                .send((i + 1, LogEvent::new("tick", serde_json::json!({"i": i}))))
1763                .unwrap();
1764        }
1765        let mut stream = stream_from_broadcast(Vec::new(), None, rx, 2);
1766
1767        match stream.next().await {
1768            Some(Err(LogError::ConsumerLagged(last_seen))) => assert_eq!(last_seen, 0),
1769            other => panic!("subscriber should surface lag, got {other:?}"),
1770        }
1771    }
1772
1773    #[tokio::test(flavor = "current_thread")]
1774    async fn randomized_reader_sequences_stay_monotonic() {
1775        let log = Arc::new(MemoryEventLog::new(32));
1776        let topic = Topic::new("fuzz.demo").unwrap();
1777        let mut readers = vec![
1778            log.clone().subscribe(&topic, None).await.unwrap(),
1779            log.clone().subscribe(&topic, Some(5)).await.unwrap(),
1780            log.clone().subscribe(&topic, Some(10)).await.unwrap(),
1781        ];
1782        let mut rng = StdRng::seed_from_u64(7);
1783        for _ in 0..64 {
1784            let value = rng.random_range(0..1000);
1785            log.append(
1786                &topic,
1787                LogEvent::new("rand", serde_json::json!({"value": value})),
1788            )
1789            .await
1790            .unwrap();
1791        }
1792
1793        let mut sequences = Vec::new();
1794        for reader in &mut readers {
1795            let mut ids = Vec::new();
1796            while let Some(item) = reader.next().await {
1797                match item {
1798                    Ok((event_id, _)) => {
1799                        ids.push(event_id);
1800                        if ids.len() >= 16 {
1801                            break;
1802                        }
1803                    }
1804                    Err(LogError::ConsumerLagged(_)) => break,
1805                    Err(error) => panic!("unexpected subscription error: {error}"),
1806                }
1807            }
1808            sequences.push(ids);
1809        }
1810
1811        for ids in sequences {
1812            assert!(ids.windows(2).all(|pair| pair[0] < pair[1]));
1813        }
1814    }
1815}