//! harn_vm/event_log — event log backends (`mod.rs`).

1use std::cell::RefCell;
2use std::collections::{BTreeMap, HashMap, VecDeque};
3use std::fmt;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::{Arc, Mutex};
7
8use bytes::Bytes;
9use futures::stream::BoxStream;
10use rusqlite::{params, Connection, OptionalExtension};
11use serde::{Deserialize, Serialize};
12use tokio::sync::{broadcast, mpsc};
13use tokio_stream::wrappers::ReceiverStream;
14
/// Monotonically increasing, per-topic event identifier; the first event
/// appended to a topic gets id 1.
pub type EventId = u64;

// Environment variables that override event-log configuration; see
// `EventLogConfig::for_base_dir` for how each is interpreted.
pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";
21
22#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
23pub struct Topic(String);
24
25impl Topic {
26    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
27        let value = value.into();
28        if value.is_empty() {
29            return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
30        }
31        if !value
32            .chars()
33            .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
34        {
35            return Err(LogError::InvalidTopic(format!(
36                "topic '{value}' contains unsupported characters"
37            )));
38        }
39        Ok(Self(value))
40    }
41
42    pub fn as_str(&self) -> &str {
43        &self.0
44    }
45}
46
47impl fmt::Display for Topic {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        self.0.fmt(f)
50    }
51}
52
53impl FromStr for Topic {
54    type Err = LogError;
55
56    fn from_str(s: &str) -> Result<Self, Self::Err> {
57        Self::new(s)
58    }
59}
60
61#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
62pub struct ConsumerId(String);
63
64impl ConsumerId {
65    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
66        let value = value.into();
67        if value.trim().is_empty() {
68            return Err(LogError::InvalidConsumer(
69                "consumer id cannot be empty".to_string(),
70            ));
71        }
72        Ok(Self(value))
73    }
74
75    pub fn as_str(&self) -> &str {
76        &self.0
77    }
78}
79
80impl fmt::Display for ConsumerId {
81    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82        self.0.fmt(f)
83    }
84}
85
86#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
87#[serde(rename_all = "snake_case")]
88pub enum EventLogBackendKind {
89    Memory,
90    File,
91    Sqlite,
92}
93
94impl fmt::Display for EventLogBackendKind {
95    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96        match self {
97            Self::Memory => write!(f, "memory"),
98            Self::File => write!(f, "file"),
99            Self::Sqlite => write!(f, "sqlite"),
100        }
101    }
102}
103
104impl FromStr for EventLogBackendKind {
105    type Err = LogError;
106
107    fn from_str(value: &str) -> Result<Self, Self::Err> {
108        match value.trim().to_ascii_lowercase().as_str() {
109            "memory" => Ok(Self::Memory),
110            "file" => Ok(Self::File),
111            "sqlite" => Ok(Self::Sqlite),
112            other => Err(LogError::Config(format!(
113                "unsupported event log backend '{other}'"
114            ))),
115        }
116    }
117}
118
119#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
120pub struct LogEvent {
121    pub kind: String,
122    pub payload: serde_json::Value,
123    #[serde(default)]
124    pub headers: BTreeMap<String, String>,
125    pub occurred_at_ms: i64,
126}
127
128impl LogEvent {
129    pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
130        Self {
131            kind: kind.into(),
132            payload,
133            headers: BTreeMap::new(),
134            occurred_at_ms: now_ms(),
135        }
136    }
137
138    pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
139        self.headers = headers;
140        self
141    }
142}
143
144/// Serialized event payload form for large read paths.
145///
146/// `payload` contains the original JSON bytes for backends that can expose
147/// them directly. Callers that only need to forward or hash the payload can
148/// avoid materializing a `serde_json::Value`; callers that need structured
149/// access can opt in with `payload_json`.
150#[derive(Clone, Debug, PartialEq, Eq)]
151pub struct LogEventBytes {
152    pub kind: String,
153    pub payload: Bytes,
154    pub headers: BTreeMap<String, String>,
155    pub occurred_at_ms: i64,
156}
157
158impl LogEventBytes {
159    pub fn payload_json(&self) -> Result<serde_json::Value, LogError> {
160        serde_json::from_slice(&self.payload)
161            .map_err(|error| LogError::Serde(format!("event log payload parse error: {error}")))
162    }
163
164    pub fn into_log_event(self) -> Result<LogEvent, LogError> {
165        Ok(LogEvent {
166            kind: self.kind,
167            payload: serde_json::from_slice(&self.payload).map_err(|error| {
168                LogError::Serde(format!("event log payload parse error: {error}"))
169            })?,
170            headers: self.headers,
171            occurred_at_ms: self.occurred_at_ms,
172        })
173    }
174}
175
176impl TryFrom<LogEvent> for LogEventBytes {
177    type Error = LogError;
178
179    fn try_from(event: LogEvent) -> Result<Self, Self::Error> {
180        let payload = serde_json::to_vec(&event.payload)
181            .map_err(|error| LogError::Serde(format!("event log payload encode error: {error}")))?;
182        Ok(Self {
183            kind: event.kind,
184            payload: Bytes::from(payload),
185            headers: event.headers,
186            occurred_at_ms: event.occurred_at_ms,
187        })
188    }
189}
190
/// Outcome of a `compact` call on a topic.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactReport {
    /// Number of events removed.
    pub removed: usize,
    /// Number of events still stored after compaction.
    pub remaining: usize,
    /// Highest id ever assigned to the topic, if any; compaction does not
    /// reset id assignment.
    pub latest: Option<EventId>,
    /// Whether the backend also ran a storage-level checkpoint. The
    /// in-memory backend always reports `false`; other backends may differ.
    pub checkpointed: bool,
}
198
/// Snapshot of a backend's identity and on-disk footprint, as returned by
/// `EventLog::describe` and `describe_for_base_dir`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventLogDescription {
    /// Which backend is in use.
    pub backend: EventLogBackendKind,
    /// Storage path (`None` for the memory backend).
    pub location: Option<PathBuf>,
    /// Approximate size on disk (`None` for the memory backend).
    pub size_bytes: Option<u64>,
    /// Configured subscriber queue depth.
    pub queue_depth: usize,
}
206
/// Errors produced by the event log and its configuration.
#[derive(Debug)]
pub enum LogError {
    /// Invalid configuration (e.g. unsupported backend name).
    Config(String),
    /// Topic name failed validation in `Topic::new`.
    InvalidTopic(String),
    /// Consumer id failed validation in `ConsumerId::new`.
    InvalidConsumer(String),
    /// Filesystem failure (file backend).
    Io(String),
    /// JSON encode/decode failure.
    Serde(String),
    /// SQLite failure (sqlite backend).
    Sqlite(String),
    /// A subscriber fell behind the broadcast buffer; carries the last
    /// event id that was delivered before the gap.
    ConsumerLagged(EventId),
}
217
218impl fmt::Display for LogError {
219    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220        match self {
221            Self::Config(message)
222            | Self::InvalidTopic(message)
223            | Self::InvalidConsumer(message)
224            | Self::Io(message)
225            | Self::Serde(message)
226            | Self::Sqlite(message) => message.fmt(f),
227            Self::ConsumerLagged(last_id) => {
228                write!(f, "subscriber lagged behind after event {last_id}")
229            }
230        }
231    }
232}
233
234impl std::error::Error for LogError {}
235
/// Append-only, topic-partitioned event log.
///
/// `async fn` in the trait is fine here (`allow(async_fn_in_trait)`)
/// because this crate's plumbing stores the concrete `Arc<AnyEventLog>`
/// rather than a trait object.
#[allow(async_fn_in_trait)]
pub trait EventLog: Send + Sync {
    /// Describe the backend (kind, location, size, queue depth).
    fn describe(&self) -> EventLogDescription;

    /// Append `event` to `topic`, returning the id assigned to it.
    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;

    /// Flush any buffered writes.
    async fn flush(&self) -> Result<(), LogError>;

    /// Read events strictly after `from`. `None` starts from the
    /// beginning of the topic.
    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError>;

    /// Byte-payload variant of `read_range`. The default implementation
    /// re-encodes each payload to JSON bytes; backends that already hold
    /// raw bytes can override it to skip the round trip.
    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        let events = self.read_range(topic, from, limit).await?;
        events
            .into_iter()
            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
            .collect()
    }

    /// Subscribe to `topic`: replay history after `from`, then follow
    /// live appends.
    ///
    /// `async fn` keeps the ergonomic generic surface; the boxed stream
    /// preserves dyn-dispatch for callers that store `Arc<dyn EventLog>`.
    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;

    /// Record that `consumer` has processed `topic` up to `up_to`.
    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError>;

    /// Last acked id for `consumer` on `topic`, if any.
    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError>;

    /// Highest id ever appended to `topic`, if any.
    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;

    /// Remove old events up to `before` (the in-memory backend treats
    /// `before` as inclusive) and report what was removed.
    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
}
291
/// Resolved event-log configuration (backend choice plus per-backend
/// storage paths), normally built via `EventLogConfig::for_base_dir`.
#[derive(Clone, Debug)]
pub struct EventLogConfig {
    /// Selected backend.
    pub backend: EventLogBackendKind,
    /// Root directory used by the file backend.
    pub file_dir: PathBuf,
    /// Database file used by the sqlite backend.
    pub sqlite_path: PathBuf,
    /// Subscriber queue depth (>= 1).
    pub queue_depth: usize,
}
299
300impl EventLogConfig {
301    pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
302        let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
303            .ok()
304            .map(|value| value.parse())
305            .transpose()?
306            .unwrap_or(EventLogBackendKind::Sqlite);
307        let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
308            .ok()
309            .and_then(|value| value.parse::<usize>().ok())
310            .unwrap_or(128)
311            .max(1);
312
313        let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
314            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
315            _ => crate::runtime_paths::event_log_dir(base_dir),
316        };
317        let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
318            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
319            _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
320        };
321
322        Ok(Self {
323            backend,
324            file_dir,
325            sqlite_path,
326            queue_depth,
327        })
328    }
329
330    pub fn location(&self) -> Option<PathBuf> {
331        match self.backend {
332            EventLogBackendKind::Memory => None,
333            EventLogBackendKind::File => Some(self.file_dir.clone()),
334            EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
335        }
336    }
337}
338
thread_local! {
    // Per-thread "current" event log slot, managed by the install/reset
    // helpers below. NOTE(review): thread-local (rather than process-wide)
    // scoping presumably exists so concurrent tests can install isolated
    // logs — confirm against callers.
    static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
}
342
343pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
344    let config = EventLogConfig::for_base_dir(base_dir)?;
345    let log = open_event_log(&config)?;
346    ACTIVE_EVENT_LOG.with(|slot| {
347        *slot.borrow_mut() = Some(log.clone());
348    });
349    Ok(log)
350}
351
352pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
353    let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
354    ACTIVE_EVENT_LOG.with(|slot| {
355        *slot.borrow_mut() = Some(log.clone());
356    });
357    log
358}
359
360pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
361    ACTIVE_EVENT_LOG.with(|slot| {
362        *slot.borrow_mut() = Some(log.clone());
363    });
364    log
365}
366
367pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
368    ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone())
369}
370
371pub fn reset_active_event_log() {
372    ACTIVE_EVENT_LOG.with(|slot| {
373        *slot.borrow_mut() = None;
374    });
375}
376
377pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
378    let config = EventLogConfig::for_base_dir(base_dir)?;
379    let description = match config.backend {
380        EventLogBackendKind::Memory => EventLogDescription {
381            backend: EventLogBackendKind::Memory,
382            location: None,
383            size_bytes: None,
384            queue_depth: config.queue_depth,
385        },
386        EventLogBackendKind::File => EventLogDescription {
387            backend: EventLogBackendKind::File,
388            size_bytes: Some(dir_size_bytes(&config.file_dir)),
389            location: Some(config.file_dir),
390            queue_depth: config.queue_depth,
391        },
392        EventLogBackendKind::Sqlite => EventLogDescription {
393            backend: EventLogBackendKind::Sqlite,
394            size_bytes: Some(sqlite_size_bytes(&config.sqlite_path)),
395            location: Some(config.sqlite_path),
396            queue_depth: config.queue_depth,
397        },
398    };
399    Ok(description)
400}
401
402pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
403    match config.backend {
404        EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
405            config.queue_depth,
406        )))),
407        EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
408            config.file_dir.clone(),
409            config.queue_depth,
410        )?))),
411        EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
412            config.sqlite_path.clone(),
413            config.queue_depth,
414        )?))),
415    }
416}
417
/// Concrete backend holder used for static dispatch: the active log is
/// stored as `Arc<AnyEventLog>` and every `EventLog` call matches on the
/// variant.
pub enum AnyEventLog {
    Memory(MemoryEventLog),
    File(FileEventLog),
    Sqlite(SqliteEventLog),
}
423
impl AnyEventLog {
    /// List the topics known to the underlying backend, sorted by name.
    ///
    /// Only the memory backend's `topics` is async (its state sits behind
    /// an async mutex); the file and sqlite helpers are synchronous.
    pub async fn topics(&self) -> Result<Vec<Topic>, LogError> {
        match self {
            Self::Memory(log) => log.topics().await,
            Self::File(log) => log.topics(),
            Self::Sqlite(log) => log.topics(),
        }
    }
}
433
// Enum dispatch: forward every `EventLog` method to the selected backend.
// `subscribe` is the only method with logic of its own (see below).
impl EventLog for AnyEventLog {
    fn describe(&self) -> EventLogDescription {
        match self {
            Self::Memory(log) => log.describe(),
            Self::File(log) => log.describe(),
            Self::Sqlite(log) => log.describe(),
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        match self {
            Self::Memory(log) => log.append(topic, event).await,
            Self::File(log) => log.append(topic, event).await,
            Self::Sqlite(log) => log.append(topic, event).await,
        }
    }

    async fn flush(&self) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.flush().await,
            Self::File(log) => log.flush().await,
            Self::Sqlite(log) => log.flush().await,
        }
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        match self {
            Self::Memory(log) => log.read_range(topic, from, limit).await,
            Self::File(log) => log.read_range(topic, from, limit).await,
            Self::Sqlite(log) => log.read_range(topic, from, limit).await,
        }
    }

    // Forward explicitly (instead of using the trait default) so backend
    // overrides of `read_range_bytes` are reached through the enum.
    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        match self {
            Self::Memory(log) => log.read_range_bytes(topic, from, limit).await,
            Self::File(log) => log.read_range_bytes(topic, from, limit).await,
            Self::Sqlite(log) => log.read_range_bytes(topic, from, limit).await,
        }
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        // Subscribe to the live broadcast BEFORE reading history so events
        // appended during the read are buffered rather than missed; the
        // forwarder in `stream_from_broadcast` drops the overlap by id.
        let (rx, queue_depth) = match self.as_ref() {
            Self::Memory(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::File(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::Sqlite(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
        };
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.ack(topic, consumer, up_to).await,
            Self::File(log) => log.ack(topic, consumer, up_to).await,
            Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
        }
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
            Self::File(log) => log.consumer_cursor(topic, consumer).await,
            Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
        }
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.latest(topic).await,
            Self::File(log) => log.latest(topic).await,
            Self::Sqlite(log) => log.latest(topic).await,
        }
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        match self {
            Self::Memory(log) => log.compact(topic, before).await,
            Self::File(log) => log.compact(topic, before).await,
            Self::Sqlite(log) => log.compact(topic, before).await,
        }
    }
}
549
550#[derive(Default)]
551struct BroadcastMap(Mutex<HashMap<String, broadcast::Sender<(EventId, LogEvent)>>>);
552
553impl BroadcastMap {
554    fn subscribe(
555        &self,
556        topic: &Topic,
557        capacity: usize,
558    ) -> broadcast::Receiver<(EventId, LogEvent)> {
559        self.sender(topic, capacity).subscribe()
560    }
561
562    fn publish(&self, topic: &Topic, capacity: usize, record: (EventId, LogEvent)) {
563        let _ = self.sender(topic, capacity).send(record);
564    }
565
566    fn sender(&self, topic: &Topic, capacity: usize) -> broadcast::Sender<(EventId, LogEvent)> {
567        let mut map = self.0.lock().expect("event log broadcast map poisoned");
568        map.entry(topic.as_str().to_string())
569            .or_insert_with(|| broadcast::channel(capacity.max(1)).0)
570            .clone()
571    }
572}
573
/// Build a `'static` stream that first replays `history` and then forwards
/// live broadcast events, deduplicating the history/live overlap by id.
///
/// On broadcast lag the stream yields a single `ConsumerLagged` error
/// carrying the last delivered id (so the caller can resubscribe from
/// there) and then ends.
fn stream_from_broadcast(
    history: Vec<(EventId, LogEvent)>,
    from: Option<EventId>,
    mut live_rx: broadcast::Receiver<(EventId, LogEvent)>,
    queue_depth: usize,
) -> BoxStream<'static, Result<(EventId, LogEvent), LogError>> {
    let (tx, rx) = mpsc::channel(queue_depth.max(1));
    // Run the subscription forwarder as a tokio task rather than a detached
    // OS thread. A dedicated thread running under `futures::executor::block_on`
    // is invisible to the tokio runtime, so tests that use `start_paused = true`
    // race against auto-advanced timers while the thread catches up in real
    // time. Spawning on tokio makes the forwarder participate in runtime
    // scheduling (including paused-time quiescence) and ties its lifetime to
    // the runtime's shutdown.
    tokio::spawn(async move {
        // `last_seen` tracks the highest id already delivered; live events
        // at or below it are duplicates of replayed history.
        let mut last_seen = from.unwrap_or(0);
        for (event_id, event) in history {
            last_seen = event_id;
            if tx.send(Ok((event_id, event))).await.is_err() {
                // Receiver dropped; stop forwarding.
                return;
            }
        }

        loop {
            tokio::select! {
                _ = tx.closed() => return,
                received = live_rx.recv() => {
                    match received {
                        Ok((event_id, event)) if event_id > last_seen => {
                            last_seen = event_id;
                            if tx.send(Ok((event_id, event))).await.is_err() {
                                return;
                            }
                        }
                        // Duplicate of an event already replayed from history.
                        Ok(_) => {}
                        Err(broadcast::error::RecvError::Closed) => return,
                        Err(broadcast::error::RecvError::Lagged(_)) => {
                            // Best-effort (`try_send`) since the forwarder is
                            // exiting either way; a full buffer drops the
                            // lag notice rather than blocking.
                            let _ = tx.try_send(Err(LogError::ConsumerLagged(last_seen)));
                            return;
                        }
                    }
                }
            }
        }
    });
    Box::pin(ReceiverStream::new(rx))
}
621
/// Mutable state of the in-memory backend, kept behind one async mutex.
#[derive(Default)]
struct MemoryState {
    // Events per topic, in ascending id order (append pushes to the back).
    topics: HashMap<String, VecDeque<(EventId, LogEvent)>>,
    // Highest id ever assigned per topic; survives compaction.
    latest: HashMap<String, EventId>,
    // Acked cursor keyed by (topic, consumer).
    consumers: HashMap<(String, String), EventId>,
}
628
629pub struct MemoryEventLog {
630    state: tokio::sync::Mutex<MemoryState>,
631    broadcasts: BroadcastMap,
632    queue_depth: usize,
633}
634
635impl MemoryEventLog {
636    pub fn new(queue_depth: usize) -> Self {
637        Self {
638            state: tokio::sync::Mutex::new(MemoryState::default()),
639            broadcasts: BroadcastMap::default(),
640            queue_depth: queue_depth.max(1),
641        }
642    }
643
644    async fn topics(&self) -> Result<Vec<Topic>, LogError> {
645        let state = self.state.lock().await;
646        let mut topics = state
647            .topics
648            .keys()
649            .map(|topic| Topic::new(topic.clone()))
650            .collect::<Result<Vec<_>, _>>()?;
651        topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
652        Ok(topics)
653    }
654}
655
impl EventLog for MemoryEventLog {
    fn describe(&self) -> EventLogDescription {
        // No location or size: nothing is persisted.
        EventLogDescription {
            backend: EventLogBackendKind::Memory,
            location: None,
            size_bytes: None,
            queue_depth: self.queue_depth,
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        let mut state = self.state.lock().await;
        // Ids are per-topic and start at 1.
        let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
        // Hash of the previous record chains events for provenance;
        // `None` for the first event of a topic.
        let previous_hash = state
            .topics
            .get(topic.as_str())
            .and_then(|events| events.back())
            .map(|(previous_id, previous_event)| {
                crate::provenance::event_record_hash_from_headers(
                    topic.as_str(),
                    *previous_id,
                    previous_event,
                )
            })
            .transpose()?;
        // NOTE(review): presumably stamps id/previous-hash provenance
        // headers onto the event before storage — confirm in
        // `crate::provenance::prepare_event_for_append`.
        let event = crate::provenance::prepare_event_for_append(
            topic.as_str(),
            event_id,
            previous_hash,
            event,
        )?;
        state.latest.insert(topic.as_str().to_string(), event_id);
        state
            .topics
            .entry(topic.as_str().to_string())
            .or_default()
            .push_back((event_id, event.clone()));
        // Release the state lock before fan-out so the broadcast happens
        // outside the critical section.
        drop(state);
        self.broadcasts
            .publish(topic, self.queue_depth, (event_id, event));
        Ok(event_id)
    }

    async fn flush(&self) -> Result<(), LogError> {
        // Nothing is buffered: appends are visible as soon as the lock drops.
        Ok(())
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        let from = from.unwrap_or(0);
        let state = self.state.lock().await;
        // Events are stored in ascending id order, so filter + take yields
        // the first `limit` events with id strictly greater than `from`.
        let events = state
            .topics
            .get(topic.as_str())
            .into_iter()
            .flat_map(|events| events.iter())
            .filter(|(event_id, _)| *event_id > from)
            .take(limit)
            .map(|(event_id, event)| (*event_id, event.clone()))
            .collect();
        Ok(events)
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        // Subscribe before reading history so appends that race the read
        // are buffered; `stream_from_broadcast` dedupes the overlap by id.
        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        // Last ack wins; no monotonicity check on `up_to`.
        let mut state = self.state.lock().await;
        state.consumers.insert(
            (topic.as_str().to_string(), consumer.as_str().to_string()),
            up_to,
        );
        Ok(())
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        let state = self.state.lock().await;
        Ok(state
            .consumers
            .get(&(topic.as_str().to_string(), consumer.as_str().to_string()))
            .copied())
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        let state = self.state.lock().await;
        Ok(state.latest.get(topic.as_str()).copied())
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        let mut state = self.state.lock().await;
        let Some(events) = state.topics.get_mut(topic.as_str()) else {
            return Ok(CompactReport::default());
        };
        // Ascending id order means the removable prefix is exactly the
        // events with id <= `before` (inclusive, despite the name).
        let removed = events
            .iter()
            .take_while(|(event_id, _)| *event_id <= before)
            .count();
        for _ in 0..removed {
            events.pop_front();
        }
        Ok(CompactReport {
            removed,
            remaining: events.len(),
            // `latest` is preserved so compaction never resets id assignment.
            latest: state.latest.get(topic.as_str()).copied(),
            checkpointed: false,
        })
    }
}
784
/// One JSONL line in a topic file: the assigned id plus the event.
#[derive(Serialize, Deserialize)]
struct FileRecord {
    id: EventId,
    event: LogEvent,
}
790
/// File-backed event log: one JSONL file per topic under `root/topics/`,
/// one cursor file per (topic, consumer) under `root/consumers/`.
pub struct FileEventLog {
    root: PathBuf,
    // Cache of the highest id per topic, filled lazily by scanning files.
    latest_ids: Mutex<HashMap<String, EventId>>,
    // Serializes appends; held only across synchronous file work.
    write_lock: Mutex<()>,
    broadcasts: BroadcastMap,
    queue_depth: usize,
}
798
799impl FileEventLog {
800    pub fn open(root: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
801        std::fs::create_dir_all(root.join("topics"))
802            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
803        std::fs::create_dir_all(root.join("consumers"))
804            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
805        Ok(Self {
806            root,
807            latest_ids: Mutex::new(HashMap::new()),
808            write_lock: Mutex::new(()),
809            broadcasts: BroadcastMap::default(),
810            queue_depth: queue_depth.max(1),
811        })
812    }
813
814    fn topic_path(&self, topic: &Topic) -> PathBuf {
815        self.root
816            .join("topics")
817            .join(format!("{}.jsonl", topic.as_str()))
818    }
819
820    fn consumer_path(&self, topic: &Topic, consumer: &ConsumerId) -> PathBuf {
821        self.root.join("consumers").join(format!(
822            "{}__{}.json",
823            topic.as_str(),
824            sanitize_filename(consumer.as_str())
825        ))
826    }
827
828    fn latest_id_for_topic(&self, topic: &Topic) -> Result<EventId, LogError> {
829        if let Some(event_id) = self
830            .latest_ids
831            .lock()
832            .expect("file event log latest ids poisoned")
833            .get(topic.as_str())
834            .copied()
835        {
836            return Ok(event_id);
837        }
838
839        let mut latest = 0;
840        let path = self.topic_path(topic);
841        if path.is_file() {
842            for record in read_file_records(&path)? {
843                latest = record.id;
844            }
845        }
846        self.latest_ids
847            .lock()
848            .expect("file event log latest ids poisoned")
849            .insert(topic.as_str().to_string(), latest);
850        Ok(latest)
851    }
852
853    fn read_range_sync(
854        &self,
855        topic: &Topic,
856        from: Option<EventId>,
857        limit: usize,
858    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
859        let path = self.topic_path(topic);
860        if !path.is_file() {
861            return Ok(Vec::new());
862        }
863        let from = from.unwrap_or(0);
864        let mut events = Vec::new();
865        for record in read_file_records(&path)? {
866            if record.id > from {
867                events.push((record.id, record.event));
868            }
869            if events.len() >= limit {
870                break;
871            }
872        }
873        Ok(events)
874    }
875
876    fn topics(&self) -> Result<Vec<Topic>, LogError> {
877        let topics_dir = self.root.join("topics");
878        if !topics_dir.is_dir() {
879            return Ok(Vec::new());
880        }
881        let mut topics = Vec::new();
882        for entry in std::fs::read_dir(&topics_dir)
883            .map_err(|error| LogError::Io(format!("event log topics read error: {error}")))?
884        {
885            let entry = entry
886                .map_err(|error| LogError::Io(format!("event log topic entry error: {error}")))?;
887            let path = entry.path();
888            if path.extension().and_then(|ext| ext.to_str()) != Some("jsonl") {
889                continue;
890            }
891            let Some(stem) = path.file_stem().and_then(|stem| stem.to_str()) else {
892                continue;
893            };
894            topics.push(Topic::new(stem.to_string())?);
895        }
896        topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
897        Ok(topics)
898    }
899}
900
901fn read_file_records(path: &Path) -> Result<Vec<FileRecord>, LogError> {
902    let file = std::fs::File::open(path)
903        .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
904    let mut reader = std::io::BufReader::new(file);
905    let mut records = Vec::new();
906    let mut line = Vec::new();
907    loop {
908        line.clear();
909        let bytes_read = std::io::BufRead::read_until(&mut reader, b'\n', &mut line)
910            .map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
911        if bytes_read == 0 {
912            break;
913        }
914        if line.iter().all(u8::is_ascii_whitespace) {
915            continue;
916        }
917        let complete_line = line.ends_with(b"\n");
918        match serde_json::from_slice::<FileRecord>(&line) {
919            Ok(record) => records.push(record),
920            Err(_) if !complete_line => break,
921            Err(error) => {
922                return Err(LogError::Serde(format!("event log parse error: {error}")));
923            }
924        }
925    }
926    Ok(records)
927}
928
// JSONL-file-backed implementation of the event log contract. The async
// methods contain no `.await`; all work is synchronous file I/O guarded by
// `write_lock` (writers) and delegated to `read_range_sync` (readers).
impl EventLog for FileEventLog {
    /// Report backend kind, on-disk root, recursive size and queue depth.
    fn describe(&self) -> EventLogDescription {
        EventLogDescription {
            backend: EventLogBackendKind::File,
            location: Some(self.root.clone()),
            size_bytes: Some(dir_size_bytes(&self.root)),
            queue_depth: self.queue_depth,
        }
    }

    /// Append one event to `topic`: assign the next sequential id (ids start
    /// at 1), chain a provenance hash to the previous record, persist one
    /// JSON line, then broadcast to live subscribers.
    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        // Serialize writers so id assignment + file append are atomic with
        // respect to concurrent append/compact on this instance.
        let _guard = self
            .write_lock
            .lock()
            .expect("file event log write lock poisoned");
        let next_id = self.latest_id_for_topic(topic)? + 1;
        // NOTE(review): this re-reads the entire topic file on every append
        // just to hash the last record — O(n) per append, O(n^2) for a bulk
        // load. Consider caching the tail hash next to `latest_ids`.
        let previous_hash = self
            .read_range_sync(topic, None, usize::MAX)?
            .last()
            .map(|(previous_id, previous_event)| {
                crate::provenance::event_record_hash_from_headers(
                    topic.as_str(),
                    *previous_id,
                    previous_event,
                )
            })
            .transpose()?;
        // Stamps the id and previous-hash provenance headers into the event.
        let event = crate::provenance::prepare_event_for_append(
            topic.as_str(),
            next_id,
            previous_hash,
            event,
        )?;
        let record = FileRecord {
            id: next_id,
            event: event.clone(),
        };
        let path = self.topic_path(topic);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        }
        let line = serde_json::to_string(&record)
            .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
        use std::io::Write as _;
        // One JSON record per line, appended. `read_file_records` tolerates a
        // torn final line, so a crash mid-write loses at most this record.
        let mut file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)
            .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
        writeln!(file, "{line}")
            .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
        // Keep the in-memory head cache in step with what was just written.
        self.latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .insert(topic.as_str().to_string(), next_id);
        // Fan the new record out to live subscribers.
        self.broadcasts
            .publish(topic, self.queue_depth, (next_id, event));
        Ok(next_id)
    }

    /// fsync every file under the log root.
    async fn flush(&self) -> Result<(), LogError> {
        sync_tree(&self.root)
    }

    /// Read up to `limit` events with id strictly greater than `from`.
    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        self.read_range_sync(topic, from, limit)
    }

    /// Like `read_range`, converting each event to its raw-bytes form.
    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        self.read_range_sync(topic, from, limit)?
            .into_iter()
            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
            .collect()
    }

    /// Stream history after `from`, then live events. The broadcast receiver
    /// is created *before* reading history so appends racing with this call
    /// are not missed; overlap between the two presumably gets de-duplicated
    /// inside `stream_from_broadcast` — not visible here.
    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
        let history = self.read_range_sync(topic, from, usize::MAX)?;
        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
    }

    /// Persist a consumer cursor as a small JSON file, atomically.
    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        let path = self.consumer_path(topic, consumer);
        let payload = serde_json::json!({
            "topic": topic.as_str(),
            "consumer_id": consumer.as_str(),
            "cursor": up_to,
            "updated_at_ms": now_ms(),
        });
        write_json_atomically(&path, &payload)
    }

    /// Load the cursor written by `ack`; `None` if this consumer never acked.
    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        let path = self.consumer_path(topic, consumer);
        if !path.is_file() {
            return Ok(None);
        }
        let raw = std::fs::read_to_string(&path)
            .map_err(|error| LogError::Io(format!("event log consumer read error: {error}")))?;
        let payload: serde_json::Value = serde_json::from_str(&raw)
            .map_err(|error| LogError::Serde(format!("event log consumer parse error: {error}")))?;
        let cursor = payload
            .get("cursor")
            .and_then(serde_json::Value::as_u64)
            .ok_or_else(|| {
                LogError::Serde("event log consumer record missing numeric cursor".to_string())
            })?;
        Ok(Some(cursor))
    }

    /// Highest assigned id for `topic`; `None` when nothing was ever
    /// appended (0 is the "empty" sentinel from `latest_id_for_topic`).
    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        let latest = self.latest_id_for_topic(topic)?;
        if latest == 0 {
            Ok(None)
        } else {
            Ok(Some(latest))
        }
    }

    /// Drop all events with id <= `before`, rewriting the topic file
    /// atomically (or deleting it when nothing survives).
    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        let _guard = self
            .write_lock
            .lock()
            .expect("file event log write lock poisoned");
        let path = self.topic_path(topic);
        if !path.is_file() {
            return Ok(CompactReport::default());
        }
        // `Some(before)` yields ids strictly greater than `before`, i.e. the
        // survivors; the second full read only computes the removed count.
        let retained = self.read_range_sync(topic, Some(before), usize::MAX)?;
        let removed = self.read_range_sync(topic, None, usize::MAX)?.len() - retained.len();
        if retained.is_empty() {
            // Best-effort delete: a missing file already reads as empty.
            let _ = std::fs::remove_file(&path);
        } else {
            crate::atomic_io::atomic_write_with(&path, |writer| {
                use std::io::Write as _;
                for (event_id, event) in &retained {
                    let line = serde_json::to_string(&FileRecord {
                        id: *event_id,
                        event: event.clone(),
                    })
                    .map_err(|error| std::io::Error::other(error.to_string()))?;
                    writeln!(writer, "{line}")?;
                }
                Ok(())
            })
            .map_err(|error| LogError::Io(format!("event log compact finalize error: {error}")))?;
        }
        let latest = retained.last().map(|(event_id, _)| *event_id);
        // NOTE(review): when everything compacts away this caches head = 0,
        // so the next append restarts ids at 1 — unlike the sqlite backend,
        // whose `topic_heads` row survives compaction. Confirm intentional.
        self.latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .insert(topic.as_str().to_string(), latest.unwrap_or(0));
        Ok(CompactReport {
            removed,
            remaining: retained.len(),
            latest,
            checkpointed: false,
        })
    }
}
1113
/// Event log backed by a single SQLite database file.
pub struct SqliteEventLog {
    // Database file location; WAL/SHM sidecars live next to it.
    path: PathBuf,
    // One shared connection; the Mutex serializes every statement.
    connection: Mutex<Connection>,
    // Per-topic broadcast channels feeding live `subscribe` streams.
    broadcasts: BroadcastMap,
    // Broadcast channel capacity (clamped to >= 1 in `open`).
    queue_depth: usize,
}
1120
impl SqliteEventLog {
    /// Open (or create) the database at `path`, enable WAL journaling with
    /// NORMAL synchronous mode, and create the schema if missing.
    ///
    /// # Errors
    /// `LogError::Io` if the parent directory cannot be created;
    /// `LogError::Sqlite` for any connection, pragma or schema failure.
    pub fn open(path: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        }
        let connection = Connection::open(&path)
            .map_err(|error| LogError::Sqlite(format!("event log open error: {error}")))?;
        // Set busy_timeout BEFORE the WAL pragma so SQLite waits out transient
        // SQLITE_BUSY from a previous test's connection that hasn't finished
        // dropping yet (parallel `cargo test` on the same process, distinct
        // paths, still contends on SQLite's own global mutex under WAL-mode
        // promotion). Without this, `journal_mode = WAL` fails fast with
        // "database is locked" instead of retrying.
        connection
            .busy_timeout(std::time::Duration::from_secs(5))
            .map_err(|error| LogError::Sqlite(format!("event log busy-timeout error: {error}")))?;
        connection
            .pragma_update(None, "journal_mode", "WAL")
            .map_err(|error| LogError::Sqlite(format!("event log WAL pragma error: {error}")))?;
        connection
            .pragma_update(None, "synchronous", "NORMAL")
            .map_err(|error| LogError::Sqlite(format!("event log sync pragma error: {error}")))?;
        // Schema: per-topic id counter, event records, consumer cursors.
        // `topic_heads` survives compaction so ids keep increasing.
        connection
            .execute_batch(
                "CREATE TABLE IF NOT EXISTS topic_heads (
                    topic TEXT PRIMARY KEY,
                    last_id INTEGER NOT NULL
                );
                CREATE TABLE IF NOT EXISTS events (
                    topic TEXT NOT NULL,
                    event_id INTEGER NOT NULL,
                    kind TEXT NOT NULL,
                    payload BLOB NOT NULL,
                    headers TEXT NOT NULL,
                    occurred_at_ms INTEGER NOT NULL,
                    PRIMARY KEY (topic, event_id)
                );
                CREATE TABLE IF NOT EXISTS consumers (
                    topic TEXT NOT NULL,
                    consumer_id TEXT NOT NULL,
                    cursor INTEGER NOT NULL,
                    updated_at_ms INTEGER NOT NULL,
                    PRIMARY KEY (topic, consumer_id)
                );",
            )
            .map_err(|error| LogError::Sqlite(format!("event log schema error: {error}")))?;
        Ok(Self {
            path,
            connection: Mutex::new(connection),
            broadcasts: BroadcastMap::default(),
            queue_depth: queue_depth.max(1),
        })
    }

    /// Distinct topics present in `events`, ascending. A topic whose events
    /// were all compacted away no longer appears here even though its
    /// `topic_heads` row survives.
    fn topics(&self) -> Result<Vec<Topic>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let mut statement = connection
            .prepare("SELECT DISTINCT topic FROM events ORDER BY topic ASC")
            .map_err(|error| {
                LogError::Sqlite(format!("event log topics prepare error: {error}"))
            })?;
        let rows = statement
            .query_map([], |row| row.get::<_, String>(0))
            .map_err(|error| LogError::Sqlite(format!("event log topics query error: {error}")))?;
        let mut topics = Vec::new();
        for row in rows {
            topics.push(Topic::new(row.map_err(|error| {
                LogError::Sqlite(format!("event log topic row error: {error}"))
            })?)?);
        }
        Ok(topics)
    }
}
1198
// SQLite-backed implementation of the event log contract. Every method
// acquires the single connection Mutex, so all calls on one instance are
// serialized; the async `fn`s do not await while holding the guard.
impl EventLog for SqliteEventLog {
    /// Report backend kind, database path, db+WAL+SHM size and queue depth.
    fn describe(&self) -> EventLogDescription {
        EventLogDescription {
            backend: EventLogBackendKind::Sqlite,
            location: Some(self.path.clone()),
            size_bytes: Some(sqlite_size_bytes(&self.path)),
            queue_depth: self.queue_depth,
        }
    }

    /// Append one event inside a single IMMEDIATE transaction: bump the
    /// per-topic head counter, fetch the previous record to chain the
    /// provenance hash, insert, commit, then broadcast to subscribers.
    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        let mut connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        // IMMEDIATE takes the write lock up front, failing fast instead of
        // upgrading mid-transaction.
        let tx = connection
            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
            .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
        // Ensure a head row exists (starts at 0), then increment it.
        tx.execute(
            "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
            params![topic.as_str()],
        )
        .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
        tx.execute(
            "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
            params![topic.as_str()],
        )
        .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
        let event_id = tx
            .query_row(
                "SELECT last_id FROM topic_heads WHERE topic = ?1",
                params![topic.as_str()],
                |row| row.get::<_, i64>(0),
            )
            .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))
            .and_then(sqlite_i64_to_event_id)?;
        let event_id_sql = event_id_to_sqlite_i64(event_id)?;
        // Most recent surviving record before this id, if any, for hash
        // chaining. May be None after compaction or on a fresh topic.
        let previous = tx
            .query_row(
                "SELECT event_id, kind, payload, headers, occurred_at_ms
                 FROM events
                 WHERE topic = ?1 AND event_id < ?2
                 ORDER BY event_id DESC
                 LIMIT 1",
                params![topic.as_str(), event_id_sql],
                |row| {
                    let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
                    let headers: String = row.get(3)?;
                    Ok((
                        sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?,
                        LogEvent {
                            kind: row.get(1)?,
                            payload: serde_json::from_slice(&payload).map_err(|error| {
                                rusqlite::Error::FromSqlConversionFailure(
                                    payload.len(),
                                    rusqlite::types::Type::Blob,
                                    Box::new(error),
                                )
                            })?,
                            headers: serde_json::from_str(&headers).map_err(|error| {
                                rusqlite::Error::FromSqlConversionFailure(
                                    headers.len(),
                                    rusqlite::types::Type::Text,
                                    Box::new(error),
                                )
                            })?,
                            occurred_at_ms: row.get(4)?,
                        },
                    ))
                },
            )
            .optional()
            .map_err(|error| LogError::Sqlite(format!("event log previous read error: {error}")))?;
        let previous_hash = previous
            .as_ref()
            .map(|(previous_id, previous_event)| {
                crate::provenance::event_record_hash_from_headers(
                    topic.as_str(),
                    *previous_id,
                    previous_event,
                )
            })
            .transpose()?;
        // Stamps the id and previous-hash provenance headers into the event.
        let event = crate::provenance::prepare_event_for_append(
            topic.as_str(),
            event_id,
            previous_hash,
            event,
        )?;
        // Payload stored as JSON bytes (BLOB), headers as JSON text.
        tx.execute(
            "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
            params![
                topic.as_str(),
                event_id_sql,
                event.kind,
                serde_json::to_vec(&event.payload).map_err(|error| LogError::Serde(format!(
                    "event log payload encode error: {error}"
                )))?,
                serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
                    "event log headers encode error: {error}"
                )))?,
                event.occurred_at_ms
            ],
        )
        .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
        tx.commit()
            .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
        // Only broadcast once the record is durable.
        self.broadcasts
            .publish(topic, self.queue_depth, (event_id, event.clone()));
        Ok(event_id)
    }

    /// Force a full WAL checkpoint so the main db file is up to date.
    async fn flush(&self) -> Result<(), LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        connection
            .execute_batch("PRAGMA wal_checkpoint(FULL);")
            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
        Ok(())
    }

    /// Read up to `limit` events with id strictly greater than `from`.
    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let mut statement = connection
            .prepare(
                "SELECT event_id, kind, payload, headers, occurred_at_ms
                 FROM events
                 WHERE topic = ?1 AND event_id > ?2
                 ORDER BY event_id ASC
                 LIMIT ?3",
            )
            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
        let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
        // NOTE(review): `limit as i64` wraps `usize::MAX` to -1, which SQLite
        // treats as "no limit" — that is what `subscribe` relies on, but any
        // limit above i64::MAX silently becomes unbounded too. Confirm ok.
        let rows = statement
            .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
                let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
                let headers: String = row.get(3)?;
                let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
                Ok((
                    event_id,
                    LogEvent {
                        kind: row.get(1)?,
                        payload: serde_json::from_slice(&payload).map_err(|error| {
                            rusqlite::Error::FromSqlConversionFailure(
                                payload.len(),
                                rusqlite::types::Type::Blob,
                                Box::new(error),
                            )
                        })?,
                        headers: serde_json::from_str(&headers).map_err(|error| {
                            rusqlite::Error::FromSqlConversionFailure(
                                headers.len(),
                                rusqlite::types::Type::Text,
                                Box::new(error),
                            )
                        })?,
                        occurred_at_ms: row.get(4)?,
                    },
                ))
            })
            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
        let mut events = Vec::new();
        for row in rows {
            events.push(
                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
            );
        }
        Ok(events)
    }

    /// Like `read_range`, but keeps the payload as raw bytes instead of
    /// decoding it into a JSON value.
    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let mut statement = connection
            .prepare(
                "SELECT event_id, kind, payload, headers, occurred_at_ms
                 FROM events
                 WHERE topic = ?1 AND event_id > ?2
                 ORDER BY event_id ASC
                 LIMIT ?3",
            )
            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
        let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
        // Same negative-LIMIT caveat as `read_range` applies here.
        let rows = statement
            .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
                let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
                let headers: String = row.get(3)?;
                let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
                Ok((
                    event_id,
                    LogEventBytes {
                        kind: row.get(1)?,
                        payload: Bytes::from(payload),
                        headers: serde_json::from_str(&headers).map_err(|error| {
                            rusqlite::Error::FromSqlConversionFailure(
                                headers.len(),
                                rusqlite::types::Type::Text,
                                Box::new(error),
                            )
                        })?,
                        occurred_at_ms: row.get(4)?,
                    },
                ))
            })
            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
        let mut events = Vec::new();
        for row in rows {
            events.push(
                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
            );
        }
        Ok(events)
    }

    /// Stream history after `from`, then live events. Subscribing to the
    /// broadcast before reading history avoids missing concurrent appends;
    /// overlap is presumably resolved in `stream_from_broadcast`.
    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
    }

    /// Upsert the consumer's cursor for `topic`.
    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let up_to_sql = event_id_to_sqlite_i64(up_to)?;
        connection
            .execute(
                "INSERT INTO consumers(topic, consumer_id, cursor, updated_at_ms)
                 VALUES (?1, ?2, ?3, ?4)
                 ON CONFLICT(topic, consumer_id)
                 DO UPDATE SET cursor = excluded.cursor, updated_at_ms = excluded.updated_at_ms",
                params![topic.as_str(), consumer.as_str(), up_to_sql, now_ms()],
            )
            .map_err(|error| LogError::Sqlite(format!("event log ack error: {error}")))?;
        Ok(())
    }

    /// Load the cursor written by `ack`; `None` if this consumer never acked.
    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        connection
            .query_row(
                "SELECT cursor FROM consumers WHERE topic = ?1 AND consumer_id = ?2",
                params![topic.as_str(), consumer.as_str()],
                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
            )
            .optional()
            .map_err(|error| LogError::Sqlite(format!("event log consumer cursor error: {error}")))
    }

    /// Highest id ever assigned for `topic` (from `topic_heads`), or `None`
    /// if nothing was ever appended. Unlike the file backend, this survives
    /// compaction of all events.
    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        connection
            .query_row(
                "SELECT last_id FROM topic_heads WHERE topic = ?1",
                params![topic.as_str()],
                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
            )
            .optional()
            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))
    }

    /// Delete all events with id <= `before`, report counts and the surviving
    /// head, then truncate the WAL. `topic_heads` is left untouched so ids
    /// keep increasing after compaction. The connection Mutex is held for the
    /// whole sequence, so these statements cannot interleave with `append`
    /// on this instance (no explicit transaction is used).
    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let before_sql = event_id_to_sqlite_i64(before)?;
        let removed = connection
            .execute(
                "DELETE FROM events WHERE topic = ?1 AND event_id <= ?2",
                params![topic.as_str(), before_sql],
            )
            .map_err(|error| {
                LogError::Sqlite(format!("event log compact delete error: {error}"))
            })?;
        let remaining = connection
            .query_row(
                "SELECT COUNT(*) FROM events WHERE topic = ?1",
                params![topic.as_str()],
                |row| row.get::<_, i64>(0),
            )
            .map_err(|error| LogError::Sqlite(format!("event log compact count error: {error}")))
            .and_then(sqlite_i64_to_usize)?;
        let latest = connection
            .query_row(
                "SELECT last_id FROM topic_heads WHERE topic = ?1",
                params![topic.as_str()],
                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
            )
            .optional()
            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))?;
        // Reclaim WAL space after the bulk delete.
        connection
            .execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
        Ok(CompactReport {
            removed,
            remaining,
            latest,
            checkpointed: true,
        })
    }
}
1539
/// Interpret `value` as a filesystem path: absolute values are used as-is,
/// relative values are anchored under `base_dir`.
fn resolve_path(base_dir: &Path, value: &str) -> PathBuf {
    match PathBuf::from(value) {
        candidate if candidate.is_absolute() => candidate,
        candidate => base_dir.join(candidate),
    }
}
1548
/// Encode `payload` as pretty-printed JSON and write it to `path` via the
/// crate's `atomic_io::atomic_write` helper (atomicity guarantees live
/// there). Used by the file backend to persist consumer-cursor records.
fn write_json_atomically(path: &Path, payload: &serde_json::Value) -> Result<(), LogError> {
    let encoded = serde_json::to_vec_pretty(payload)
        .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
    crate::atomic_io::atomic_write(path, &encoded)
        .map_err(|error| LogError::Io(format!("event log write error: {error}")))
}
1555
/// File-name sanitizer; currently just delegates to
/// `sanitize_topic_component` (same character whitelist).
fn sanitize_filename(value: &str) -> String {
    sanitize_topic_component(value)
}
1559
/// Replace every character outside `[A-Za-z0-9._-]` with `'_'` so the value
/// is safe to embed in an on-disk file name. This mirrors the character set
/// accepted by `Topic::new`.
pub fn sanitize_topic_component(value: &str) -> String {
    let allowed = |ch: char| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-');
    value
        .chars()
        .map(|ch| if allowed(ch) { ch } else { '_' })
        .collect()
}
1572
/// Recursively sum the sizes of all regular files under `path`.
/// Missing or unreadable directories/entries contribute 0 — this is a
/// best-effort metric used only for `describe`.
fn dir_size_bytes(path: &Path) -> u64 {
    if !path.exists() {
        return 0;
    }
    let Ok(entries) = std::fs::read_dir(path) else {
        return 0;
    };
    entries
        .flatten()
        .map(|entry| {
            let child = entry.path();
            if child.is_dir() {
                dir_size_bytes(&child)
            } else {
                entry.metadata().map(|metadata| metadata.len()).unwrap_or(0)
            }
        })
        .sum()
}
1590
1591fn sqlite_size_bytes(path: &Path) -> u64 {
1592    let mut total = file_size(path);
1593    total += file_size(&PathBuf::from(format!("{}-wal", path.display())));
1594    total += file_size(&PathBuf::from(format!("{}-shm", path.display())));
1595    total
1596}
1597
/// Size of the file at `path` in bytes; 0 if it is missing or unreadable.
fn file_size(path: &Path) -> u64 {
    match std::fs::metadata(path) {
        Ok(metadata) => metadata.len(),
        Err(_) => 0,
    }
}
1603
1604fn sync_tree(root: &Path) -> Result<(), LogError> {
1605    if !root.exists() {
1606        return Ok(());
1607    }
1608    for entry in std::fs::read_dir(root)
1609        .map_err(|error| LogError::Io(format!("event log read_dir error: {error}")))?
1610    {
1611        let entry = entry.map_err(|error| LogError::Io(format!("event log dir error: {error}")))?;
1612        let path = entry.path();
1613        if path.is_dir() {
1614            sync_tree(&path)?;
1615            continue;
1616        }
1617        std::fs::File::open(&path)
1618            .and_then(|file| file.sync_all())
1619            .map_err(|error| LogError::Io(format!("event log sync error: {error}")))?;
1620    }
1621    Ok(())
1622}
1623
/// Current wall-clock time as milliseconds since the Unix epoch, or 0 if
/// the system clock reports a time before the epoch.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(duration) => duration.as_millis() as i64,
        Err(_) => 0,
    }
}
1630
1631fn event_id_to_sqlite_i64(event_id: EventId) -> Result<i64, LogError> {
1632    i64::try_from(event_id)
1633        .map_err(|_| LogError::Sqlite(format!("event id {event_id} exceeds sqlite INTEGER range")))
1634}
1635
1636fn sqlite_i64_to_event_id(value: i64) -> Result<EventId, LogError> {
1637    u64::try_from(value)
1638        .map_err(|_| LogError::Sqlite(format!("sqlite event id {value} is negative")))
1639}
1640
1641fn sqlite_i64_to_event_id_for_row(value: i64) -> rusqlite::Result<EventId> {
1642    u64::try_from(value).map_err(|_| {
1643        rusqlite::Error::FromSqlConversionFailure(
1644            std::mem::size_of::<i64>(),
1645            rusqlite::types::Type::Integer,
1646            "sqlite event id is negative".into(),
1647        )
1648    })
1649}
1650
1651fn sqlite_json_bytes_for_row(
1652    row: &rusqlite::Row<'_>,
1653    index: usize,
1654    name: &str,
1655) -> rusqlite::Result<Vec<u8>> {
1656    let value = row.get_ref(index)?;
1657    match value {
1658        rusqlite::types::ValueRef::Text(bytes) | rusqlite::types::ValueRef::Blob(bytes) => {
1659            Ok(bytes.to_vec())
1660        }
1661        other => Err(rusqlite::Error::InvalidColumnType(
1662            index,
1663            name.to_string(),
1664            other.data_type(),
1665        )),
1666    }
1667}
1668
1669fn sqlite_i64_to_usize(value: i64) -> Result<usize, LogError> {
1670    usize::try_from(value)
1671        .map_err(|_| LogError::Sqlite(format!("sqlite count {value} is negative")))
1672}
1673
1674#[cfg(test)]
1675mod tests {
1676    use super::*;
1677    use futures::StreamExt;
1678    use rand::{rngs::StdRng, RngExt, SeedableRng};
1679
    /// Shared smoke test: append 10k events to one topic and verify that
    /// ids are dense, 1-based and returned in order by `read_range`.
    async fn exercise_basic_backend(log: Arc<AnyEventLog>) {
        let topic = Topic::new("trigger.inbox").unwrap();
        for i in 0..10_000 {
            log.append(
                &topic,
                LogEvent::new("append", serde_json::json!({ "i": i })),
            )
            .await
            .unwrap();
        }
        let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 10_000);
        // Ids are assigned sequentially starting at 1.
        assert_eq!(events.first().unwrap().0, 1);
        assert_eq!(events.last().unwrap().0, 10_000);
    }
1695
1696    #[tokio::test(flavor = "current_thread")]
1697    async fn memory_backend_supports_append_read_subscribe_and_compact() {
1698        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
1699        exercise_basic_backend(log.clone()).await;
1700
1701        let topic = Topic::new("agent.transcript.demo").unwrap();
1702        let mut stream = log.clone().subscribe(&topic, None).await.unwrap();
1703        let first = log
1704            .append(
1705                &topic,
1706                LogEvent::new("message", serde_json::json!({"text":"one"})),
1707            )
1708            .await
1709            .unwrap();
1710        let second = log
1711            .append(
1712                &topic,
1713                LogEvent::new("message", serde_json::json!({"text":"two"})),
1714            )
1715            .await
1716            .unwrap();
1717        let seen: Vec<_> = stream.by_ref().take(2).collect().await;
1718        assert_eq!(seen[0].as_ref().unwrap().0, first);
1719        assert_eq!(seen[1].as_ref().unwrap().0, second);
1720
1721        log.ack(&topic, &ConsumerId::new("worker").unwrap(), second)
1722            .await
1723            .unwrap();
1724        let compact = log.compact(&topic, first).await.unwrap();
1725        assert_eq!(compact.removed, 1);
1726        assert_eq!(compact.remaining, 1);
1727    }
1728
1729    #[tokio::test(flavor = "current_thread")]
1730    async fn file_backend_persists_across_reopen_and_compacts() {
1731        let dir = tempfile::tempdir().unwrap();
1732        let topic = Topic::new("trigger.outbox").unwrap();
1733        let first_log = Arc::new(AnyEventLog::File(
1734            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
1735        ));
1736        first_log
1737            .append(
1738                &topic,
1739                LogEvent::new("dispatch_pending", serde_json::json!({"n":1})),
1740            )
1741            .await
1742            .unwrap();
1743        first_log
1744            .append(
1745                &topic,
1746                LogEvent::new("dispatch_complete", serde_json::json!({"n":2})),
1747            )
1748            .await
1749            .unwrap();
1750        drop(first_log);
1751
1752        let reopened = Arc::new(AnyEventLog::File(
1753            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
1754        ));
1755        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
1756        assert_eq!(events.len(), 2);
1757        let compact = reopened.compact(&topic, 1).await.unwrap();
1758        assert_eq!(compact.removed, 1);
1759        assert_eq!(
1760            reopened
1761                .read_range(&topic, None, usize::MAX)
1762                .await
1763                .unwrap()
1764                .len(),
1765            1
1766        );
1767    }
1768
1769    #[tokio::test(flavor = "current_thread")]
1770    async fn file_backend_skips_torn_tail_on_restart() {
1771        let dir = tempfile::tempdir().unwrap();
1772        let topic = Topic::new("trigger.inbox").unwrap();
1773        let first_log = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
1774        first_log
1775            .append(
1776                &topic,
1777                LogEvent::new("accepted", serde_json::json!({"id": "ok"})),
1778            )
1779            .await
1780            .unwrap();
1781        drop(first_log);
1782
1783        let topic_path = dir.path().join("topics").join("trigger.inbox.jsonl");
1784        use std::io::Write as _;
1785        let mut file = std::fs::OpenOptions::new()
1786            .append(true)
1787            .open(&topic_path)
1788            .unwrap();
1789        write!(file, "{{\"id\":2,\"event\":{{\"kind\":\"partial\"").unwrap();
1790        drop(file);
1791
1792        let reopened = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
1793        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
1794        assert_eq!(events.len(), 1);
1795        assert_eq!(events[0].0, 1);
1796        assert_eq!(reopened.latest(&topic).await.unwrap(), Some(1));
1797    }
1798
1799    #[tokio::test(flavor = "current_thread")]
1800    async fn sqlite_backend_persists_and_checkpoints_after_compact() {
1801        let dir = tempfile::tempdir().unwrap();
1802        let path = dir.path().join("events.sqlite");
1803        let topic = Topic::new("daemon.demo.state").unwrap();
1804        let first_log = Arc::new(AnyEventLog::Sqlite(
1805            SqliteEventLog::open(path.clone(), 8).unwrap(),
1806        ));
1807        first_log
1808            .append(
1809                &topic,
1810                LogEvent::new("state", serde_json::json!({"state":"idle"})),
1811            )
1812            .await
1813            .unwrap();
1814        first_log
1815            .append(
1816                &topic,
1817                LogEvent::new("state", serde_json::json!({"state":"active"})),
1818            )
1819            .await
1820            .unwrap();
1821        drop(first_log);
1822
1823        let reopened = Arc::new(AnyEventLog::Sqlite(
1824            SqliteEventLog::open(path.clone(), 8).unwrap(),
1825        ));
1826        assert_eq!(
1827            reopened
1828                .read_range(&topic, None, usize::MAX)
1829                .await
1830                .unwrap()
1831                .len(),
1832            2
1833        );
1834        let compact = reopened.compact(&topic, 1).await.unwrap();
1835        assert!(compact.checkpointed);
1836        let wal = PathBuf::from(format!("{}-wal", path.display()));
1837        assert!(file_size(&wal) == 0 || !wal.exists());
1838    }
1839
1840    #[tokio::test(flavor = "current_thread")]
1841    async fn sqlite_bytes_read_preserves_payload_without_value_materialization() {
1842        let dir = tempfile::tempdir().unwrap();
1843        let path = dir.path().join("events.sqlite");
1844        let topic = Topic::new("observability.action_graph").unwrap();
1845        let log = SqliteEventLog::open(path, 8).unwrap();
1846        let event_id = log
1847            .append(
1848                &topic,
1849                LogEvent::new(
1850                    "snapshot",
1851                    serde_json::json!({"nodes":[{"id":"a"}],"edges":[]}),
1852                ),
1853            )
1854            .await
1855            .unwrap();
1856
1857        let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
1858        assert_eq!(events.len(), 1);
1859        assert_eq!(events[0].0, event_id);
1860        assert_eq!(
1861            events[0].1.payload_json().unwrap(),
1862            serde_json::json!({"nodes":[{"id":"a"}],"edges":[]})
1863        );
1864    }
1865
1866    #[tokio::test(flavor = "current_thread")]
1867    async fn sqlite_bytes_read_accepts_legacy_text_payload_rows() {
1868        let dir = tempfile::tempdir().unwrap();
1869        let path = dir.path().join("events.sqlite");
1870        let topic = Topic::new("agent.transcript.legacy").unwrap();
1871        let log = SqliteEventLog::open(path, 8).unwrap();
1872        {
1873            let connection = log.connection.lock().unwrap();
1874            connection
1875                .execute(
1876                    "INSERT INTO topic_heads(topic, last_id) VALUES (?1, 1)",
1877                    params![topic.as_str()],
1878                )
1879                .unwrap();
1880            connection
1881                .execute(
1882                    "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
1883                     VALUES (?1, 1, 'legacy', ?2, '{}', 1)",
1884                    params![topic.as_str(), "{\"text\":\"old\"}"],
1885                )
1886                .unwrap();
1887        }
1888
1889        let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
1890        assert_eq!(
1891            events[0].1.payload_json().unwrap(),
1892            serde_json::json!({"text": "old"})
1893        );
1894        assert_eq!(
1895            log.read_range(&topic, None, 1).await.unwrap()[0].1.kind,
1896            "legacy"
1897        );
1898    }
1899
1900    #[tokio::test(flavor = "current_thread")]
1901    async fn broadcast_forwarder_reports_lag_when_receiver_overflows() {
1902        let (sender, rx) = broadcast::channel(2);
1903        for i in 0..10 {
1904            sender
1905                .send((i + 1, LogEvent::new("tick", serde_json::json!({"i": i}))))
1906                .unwrap();
1907        }
1908        let mut stream = stream_from_broadcast(Vec::new(), None, rx, 2);
1909
1910        match stream.next().await {
1911            Some(Err(LogError::ConsumerLagged(last_seen))) => assert_eq!(last_seen, 0),
1912            other => panic!("subscriber should surface lag, got {other:?}"),
1913        }
1914    }
1915
1916    #[tokio::test(flavor = "current_thread")]
1917    async fn broadcast_forwarder_stops_when_consumer_drops_stream() {
1918        let (sender, rx) = broadcast::channel(2);
1919        let stream = stream_from_broadcast(Vec::new(), None, rx, 2);
1920        assert_eq!(sender.receiver_count(), 1);
1921        drop(stream);
1922
1923        tokio::time::timeout(std::time::Duration::from_millis(100), async {
1924            while sender.receiver_count() != 0 {
1925                tokio::task::yield_now().await;
1926            }
1927        })
1928        .await
1929        .expect("subscription receiver should close after consumer drop");
1930    }
1931
1932    #[tokio::test(flavor = "current_thread")]
1933    async fn randomized_reader_sequences_stay_monotonic() {
1934        let log = Arc::new(MemoryEventLog::new(32));
1935        let topic = Topic::new("fuzz.demo").unwrap();
1936        let mut readers = vec![
1937            log.clone().subscribe(&topic, None).await.unwrap(),
1938            log.clone().subscribe(&topic, Some(5)).await.unwrap(),
1939            log.clone().subscribe(&topic, Some(10)).await.unwrap(),
1940        ];
1941        let mut rng = StdRng::seed_from_u64(7);
1942        for _ in 0..64 {
1943            let value = rng.random_range(0..1000);
1944            log.append(
1945                &topic,
1946                LogEvent::new("rand", serde_json::json!({"value": value})),
1947            )
1948            .await
1949            .unwrap();
1950        }
1951
1952        let mut sequences = Vec::new();
1953        for reader in &mut readers {
1954            let mut ids = Vec::new();
1955            while let Some(item) = reader.next().await {
1956                match item {
1957                    Ok((event_id, _)) => {
1958                        ids.push(event_id);
1959                        if ids.len() >= 16 {
1960                            break;
1961                        }
1962                    }
1963                    Err(LogError::ConsumerLagged(_)) => break,
1964                    Err(error) => panic!("unexpected subscription error: {error}"),
1965                }
1966            }
1967            sequences.push(ids);
1968        }
1969
1970        for ids in sequences {
1971            assert!(ids.windows(2).all(|pair| pair[0] < pair[1]));
1972        }
1973    }
1974}