//! Event log for `harn_vm` (`harn_vm/event_log/mod.rs`): append-only,
//! topic-partitioned logs with memory, file (JSONL), and SQLite backends.

1use std::cell::RefCell;
2use std::collections::{BTreeMap, HashMap, VecDeque};
3use std::fmt;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::{Arc, Mutex};
7
8use bytes::Bytes;
9use futures::stream::BoxStream;
10use rusqlite::{params, Connection, OptionalExtension};
11use serde::{Deserialize, Serialize};
12use tokio::sync::{broadcast, mpsc};
13use tokio_stream::wrappers::ReceiverStream;
14
/// Monotonically increasing, per-topic identifier assigned at append time
/// (ids start at 1; 0 means "before the first event").
pub type EventId = u64;

/// Selects the backend implementation: `memory`, `file`, or `sqlite`.
pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
/// Overrides the directory used by the file backend.
pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
/// Overrides the database path used by the SQLite backend.
pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
/// Overrides the subscription channel capacity (invalid values fall back to
/// 128; always clamped to at least 1).
pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";
21
22#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
23pub struct Topic(String);
24
25impl Topic {
26    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
27        let value = value.into();
28        if value.is_empty() {
29            return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
30        }
31        if !value
32            .chars()
33            .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
34        {
35            return Err(LogError::InvalidTopic(format!(
36                "topic '{value}' contains unsupported characters"
37            )));
38        }
39        Ok(Self(value))
40    }
41
42    pub fn as_str(&self) -> &str {
43        &self.0
44    }
45}
46
47impl fmt::Display for Topic {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        self.0.fmt(f)
50    }
51}
52
53impl FromStr for Topic {
54    type Err = LogError;
55
56    fn from_str(s: &str) -> Result<Self, Self::Err> {
57        Self::new(s)
58    }
59}
60
61#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
62pub struct ConsumerId(String);
63
64impl ConsumerId {
65    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
66        let value = value.into();
67        if value.trim().is_empty() {
68            return Err(LogError::InvalidConsumer(
69                "consumer id cannot be empty".to_string(),
70            ));
71        }
72        Ok(Self(value))
73    }
74
75    pub fn as_str(&self) -> &str {
76        &self.0
77    }
78}
79
80impl fmt::Display for ConsumerId {
81    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82        self.0.fmt(f)
83    }
84}
85
86#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
87#[serde(rename_all = "snake_case")]
88pub enum EventLogBackendKind {
89    Memory,
90    File,
91    Sqlite,
92}
93
94impl fmt::Display for EventLogBackendKind {
95    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96        match self {
97            Self::Memory => write!(f, "memory"),
98            Self::File => write!(f, "file"),
99            Self::Sqlite => write!(f, "sqlite"),
100        }
101    }
102}
103
104impl FromStr for EventLogBackendKind {
105    type Err = LogError;
106
107    fn from_str(value: &str) -> Result<Self, Self::Err> {
108        match value.trim().to_ascii_lowercase().as_str() {
109            "memory" => Ok(Self::Memory),
110            "file" => Ok(Self::File),
111            "sqlite" => Ok(Self::Sqlite),
112            other => Err(LogError::Config(format!(
113                "unsupported event log backend '{other}'"
114            ))),
115        }
116    }
117}
118
119#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
120pub struct LogEvent {
121    pub kind: String,
122    pub payload: serde_json::Value,
123    #[serde(default)]
124    pub headers: BTreeMap<String, String>,
125    pub occurred_at_ms: i64,
126}
127
128impl LogEvent {
129    pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
130        Self {
131            kind: kind.into(),
132            payload,
133            headers: BTreeMap::new(),
134            occurred_at_ms: now_ms(),
135        }
136    }
137
138    pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
139        self.headers = headers;
140        self
141    }
142}
143
144/// Serialized event payload form for large read paths.
145///
146/// `payload` contains the original JSON bytes for backends that can expose
147/// them directly. Callers that only need to forward or hash the payload can
148/// avoid materializing a `serde_json::Value`; callers that need structured
149/// access can opt in with `payload_json`.
150#[derive(Clone, Debug, PartialEq, Eq)]
151pub struct LogEventBytes {
152    pub kind: String,
153    pub payload: Bytes,
154    pub headers: BTreeMap<String, String>,
155    pub occurred_at_ms: i64,
156}
157
158impl LogEventBytes {
159    pub fn payload_json(&self) -> Result<serde_json::Value, LogError> {
160        serde_json::from_slice(&self.payload)
161            .map_err(|error| LogError::Serde(format!("event log payload parse error: {error}")))
162    }
163
164    pub fn into_log_event(self) -> Result<LogEvent, LogError> {
165        Ok(LogEvent {
166            kind: self.kind,
167            payload: serde_json::from_slice(&self.payload).map_err(|error| {
168                LogError::Serde(format!("event log payload parse error: {error}"))
169            })?,
170            headers: self.headers,
171            occurred_at_ms: self.occurred_at_ms,
172        })
173    }
174}
175
176impl TryFrom<LogEvent> for LogEventBytes {
177    type Error = LogError;
178
179    fn try_from(event: LogEvent) -> Result<Self, Self::Error> {
180        let payload = serde_json::to_vec(&event.payload)
181            .map_err(|error| LogError::Serde(format!("event log payload encode error: {error}")))?;
182        Ok(Self {
183            kind: event.kind,
184            payload: Bytes::from(payload),
185            headers: event.headers,
186            occurred_at_ms: event.occurred_at_ms,
187        })
188    }
189}
190
/// Summary of a compaction pass over one topic.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactReport {
    /// Events removed by this pass.
    pub removed: usize,
    /// Events still present in the topic afterwards.
    pub remaining: usize,
    /// Highest event id ever assigned for the topic, if any.
    pub latest: Option<EventId>,
    /// Whether the backend also performed a storage-level checkpoint
    /// (always `false` for the memory backend).
    pub checkpointed: bool,
}

/// Snapshot of a backend's identity and storage footprint, for diagnostics.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventLogDescription {
    /// Which backend produced this description.
    pub backend: EventLogBackendKind,
    /// On-disk location; `None` for the memory backend.
    pub location: Option<PathBuf>,
    /// Approximate storage size in bytes; `None` for the memory backend.
    pub size_bytes: Option<u64>,
    /// Capacity used for subscription channels.
    pub queue_depth: usize,
}

/// Errors surfaced by the event log API.
#[derive(Debug)]
pub enum LogError {
    /// Invalid configuration (e.g. unknown backend name).
    Config(String),
    /// Topic name failed validation.
    InvalidTopic(String),
    /// Consumer id failed validation.
    InvalidConsumer(String),
    /// Filesystem error, with context baked into the message.
    Io(String),
    /// Serialization / deserialization failure.
    Serde(String),
    /// SQLite backend failure.
    Sqlite(String),
    /// A subscriber fell behind the broadcast channel; carries the last
    /// event id it is known to have received.
    ConsumerLagged(EventId),
}
217
218impl fmt::Display for LogError {
219    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220        match self {
221            Self::Config(message)
222            | Self::InvalidTopic(message)
223            | Self::InvalidConsumer(message)
224            | Self::Io(message)
225            | Self::Serde(message)
226            | Self::Sqlite(message) => message.fmt(f),
227            Self::ConsumerLagged(last_id) => {
228                write!(f, "subscriber lagged behind after event {last_id}")
229            }
230        }
231    }
232}
233
234impl std::error::Error for LogError {}
235
/// Append-only, topic-partitioned event log shared by all backends.
#[allow(async_fn_in_trait)]
pub trait EventLog: Send + Sync {
    /// Backend kind, storage location/size, and queue depth for diagnostics.
    fn describe(&self) -> EventLogDescription;

    /// Appends `event` to `topic`, returning its newly assigned id.
    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;

    /// Flushes pending writes; a no-op for backends without buffering.
    async fn flush(&self) -> Result<(), LogError>;

    /// Read events strictly after `from`. `None` starts from the
    /// beginning of the topic.
    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError>;

    /// Like [`Self::read_range`] but returning raw payload bytes. The default
    /// simply re-encodes each structured event via `TryFrom`; backends that
    /// hold raw bytes may override to skip the round trip.
    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        let events = self.read_range(topic, from, limit).await?;
        events
            .into_iter()
            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
            .collect()
    }

    /// `async fn` keeps the ergonomic generic surface; the boxed stream
    /// preserves dyn-dispatch for callers that store `Arc<dyn EventLog>`.
    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;

    /// Records that `consumer` has processed `topic` up to and including
    /// `up_to`.
    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError>;

    /// Last acked id for `consumer` on `topic`, if it ever acked.
    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError>;

    /// Highest event id appended to `topic`, if any.
    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;

    /// Removes events with ids up to and including `before`, reporting what
    /// was removed and what remains.
    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
}
291
/// Resolved configuration for opening an event log.
#[derive(Clone, Debug)]
pub struct EventLogConfig {
    /// Backend to instantiate.
    pub backend: EventLogBackendKind,
    /// Root directory for the file backend.
    pub file_dir: PathBuf,
    /// Database file for the SQLite backend.
    pub sqlite_path: PathBuf,
    /// Subscription channel capacity (clamped to at least 1).
    pub queue_depth: usize,
}
299
300impl EventLogConfig {
301    pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
302        let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
303            .ok()
304            .map(|value| value.parse())
305            .transpose()?
306            .unwrap_or(EventLogBackendKind::Sqlite);
307        let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
308            .ok()
309            .and_then(|value| value.parse::<usize>().ok())
310            .unwrap_or(128)
311            .max(1);
312
313        let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
314            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
315            _ => crate::runtime_paths::event_log_dir(base_dir),
316        };
317        let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
318            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
319            _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
320        };
321
322        Ok(Self {
323            backend,
324            file_dir,
325            sqlite_path,
326            queue_depth,
327        })
328    }
329
330    pub fn location(&self) -> Option<PathBuf> {
331        match self.backend {
332            EventLogBackendKind::Memory => None,
333            EventLogBackendKind::File => Some(self.file_dir.clone()),
334            EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
335        }
336    }
337}
338
339thread_local! {
340    static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
341}
342
343pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
344    let config = EventLogConfig::for_base_dir(base_dir)?;
345    let log = open_event_log(&config)?;
346    ACTIVE_EVENT_LOG.with(|slot| {
347        *slot.borrow_mut() = Some(log.clone());
348    });
349    Ok(log)
350}
351
352pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
353    let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
354    ACTIVE_EVENT_LOG.with(|slot| {
355        *slot.borrow_mut() = Some(log.clone());
356    });
357    log
358}
359
360pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
361    ACTIVE_EVENT_LOG.with(|slot| {
362        *slot.borrow_mut() = Some(log.clone());
363    });
364    log
365}
366
367pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
368    ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone())
369}
370
371pub fn reset_active_event_log() {
372    ACTIVE_EVENT_LOG.with(|slot| {
373        *slot.borrow_mut() = None;
374    });
375}
376
377pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
378    let config = EventLogConfig::for_base_dir(base_dir)?;
379    let description = match config.backend {
380        EventLogBackendKind::Memory => EventLogDescription {
381            backend: EventLogBackendKind::Memory,
382            location: None,
383            size_bytes: None,
384            queue_depth: config.queue_depth,
385        },
386        EventLogBackendKind::File => EventLogDescription {
387            backend: EventLogBackendKind::File,
388            size_bytes: Some(dir_size_bytes(&config.file_dir)),
389            location: Some(config.file_dir),
390            queue_depth: config.queue_depth,
391        },
392        EventLogBackendKind::Sqlite => EventLogDescription {
393            backend: EventLogBackendKind::Sqlite,
394            size_bytes: Some(sqlite_size_bytes(&config.sqlite_path)),
395            location: Some(config.sqlite_path),
396            queue_depth: config.queue_depth,
397        },
398    };
399    Ok(description)
400}
401
402pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
403    match config.backend {
404        EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
405            config.queue_depth,
406        )))),
407        EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
408            config.file_dir.clone(),
409            config.queue_depth,
410        )?))),
411        EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
412            config.sqlite_path.clone(),
413            config.queue_depth,
414        )?))),
415    }
416}
417
/// Concrete backend dispatcher: callers hold a single `Arc<AnyEventLog>`
/// regardless of which backend was configured.
pub enum AnyEventLog {
    Memory(MemoryEventLog),
    File(FileEventLog),
    Sqlite(SqliteEventLog),
}
423
impl AnyEventLog {
    /// Lists every topic known to the active backend, sorted by name.
    pub async fn topics(&self) -> Result<Vec<Topic>, LogError> {
        match self {
            // Only the memory backend's listing is async (tokio mutex); the
            // file and SQLite backends expose synchronous listings.
            Self::Memory(log) => log.topics().await,
            Self::File(log) => log.topics(),
            Self::Sqlite(log) => log.topics(),
        }
    }
}
433
// Pure delegation: every method fans out to the wrapped backend. `subscribe`
// is the one exception — it is re-implemented here (rather than delegated)
// because it needs `Arc<Self>` for the 'static stream.
impl EventLog for AnyEventLog {
    fn describe(&self) -> EventLogDescription {
        match self {
            Self::Memory(log) => log.describe(),
            Self::File(log) => log.describe(),
            Self::Sqlite(log) => log.describe(),
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        match self {
            Self::Memory(log) => log.append(topic, event).await,
            Self::File(log) => log.append(topic, event).await,
            Self::Sqlite(log) => log.append(topic, event).await,
        }
    }

    async fn flush(&self) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.flush().await,
            Self::File(log) => log.flush().await,
            Self::Sqlite(log) => log.flush().await,
        }
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        match self {
            Self::Memory(log) => log.read_range(topic, from, limit).await,
            Self::File(log) => log.read_range(topic, from, limit).await,
            Self::Sqlite(log) => log.read_range(topic, from, limit).await,
        }
    }

    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        match self {
            Self::Memory(log) => log.read_range_bytes(topic, from, limit).await,
            Self::File(log) => log.read_range_bytes(topic, from, limit).await,
            Self::Sqlite(log) => log.read_range_bytes(topic, from, limit).await,
        }
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        // Register the broadcast receiver *before* snapshotting history so no
        // event appended in between is lost; the forwarder drops any event id
        // that appears in both the snapshot and the live feed.
        let (rx, queue_depth) = match self.as_ref() {
            Self::Memory(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::File(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::Sqlite(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
        };
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.ack(topic, consumer, up_to).await,
            Self::File(log) => log.ack(topic, consumer, up_to).await,
            Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
        }
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
            Self::File(log) => log.consumer_cursor(topic, consumer).await,
            Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
        }
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.latest(topic).await,
            Self::File(log) => log.latest(topic).await,
            Self::Sqlite(log) => log.latest(topic).await,
        }
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        match self {
            Self::Memory(log) => log.compact(topic, before).await,
            Self::File(log) => log.compact(topic, before).await,
            Self::Sqlite(log) => log.compact(topic, before).await,
        }
    }
}
549
550#[derive(Default)]
551struct BroadcastMap(Mutex<HashMap<String, broadcast::Sender<(EventId, LogEvent)>>>);
552
553impl BroadcastMap {
554    fn subscribe(
555        &self,
556        topic: &Topic,
557        capacity: usize,
558    ) -> broadcast::Receiver<(EventId, LogEvent)> {
559        self.sender(topic, capacity).subscribe()
560    }
561
562    fn publish(&self, topic: &Topic, capacity: usize, record: (EventId, LogEvent)) {
563        let _ = self.sender(topic, capacity).send(record);
564    }
565
566    fn sender(&self, topic: &Topic, capacity: usize) -> broadcast::Sender<(EventId, LogEvent)> {
567        let mut map = self.0.lock().expect("event log broadcast map poisoned");
568        map.entry(topic.as_str().to_string())
569            .or_insert_with(|| broadcast::channel(capacity.max(1)).0)
570            .clone()
571    }
572}
573
574fn stream_from_broadcast(
575    history: Vec<(EventId, LogEvent)>,
576    from: Option<EventId>,
577    mut live_rx: broadcast::Receiver<(EventId, LogEvent)>,
578    queue_depth: usize,
579) -> BoxStream<'static, Result<(EventId, LogEvent), LogError>> {
580    let (tx, rx) = mpsc::channel(queue_depth.max(1));
581    // Run the subscription forwarder as a tokio task rather than a detached
582    // OS thread. A dedicated thread running under `futures::executor::block_on`
583    // is invisible to the tokio runtime, so tests that use `start_paused = true`
584    // race against auto-advanced timers while the thread catches up in real
585    // time. Spawning on tokio makes the forwarder participate in runtime
586    // scheduling (including paused-time quiescence) and ties its lifetime to
587    // the runtime's shutdown.
588    tokio::spawn(async move {
589        let mut last_seen = from.unwrap_or(0);
590        for (event_id, event) in history {
591            last_seen = event_id;
592            if tx.send(Ok((event_id, event))).await.is_err() {
593                return;
594            }
595        }
596
597        loop {
598            tokio::select! {
599                _ = tx.closed() => return,
600                received = live_rx.recv() => {
601                    match received {
602                        Ok((event_id, event)) if event_id > last_seen => {
603                            last_seen = event_id;
604                            if tx.send(Ok((event_id, event))).await.is_err() {
605                                return;
606                            }
607                        }
608                        Ok(_) => {}
609                        Err(broadcast::error::RecvError::Closed) => return,
610                        Err(broadcast::error::RecvError::Lagged(_)) => {
611                            let _ = tx.try_send(Err(LogError::ConsumerLagged(last_seen)));
612                            return;
613                        }
614                    }
615                }
616            }
617        }
618    });
619    Box::pin(ReceiverStream::new(rx))
620}
621
/// Mutable state behind the memory backend's async mutex.
#[derive(Default)]
struct MemoryState {
    /// Per-topic event buffers in append order.
    topics: HashMap<String, VecDeque<(EventId, LogEvent)>>,
    /// Highest id ever assigned per topic; not reset by compaction, so ids
    /// keep increasing after old events are dropped.
    latest: HashMap<String, EventId>,
    /// Ack cursors keyed by (topic, consumer).
    consumers: HashMap<(String, String), EventId>,
}

/// In-process, non-durable backend.
pub struct MemoryEventLog {
    state: tokio::sync::Mutex<MemoryState>,
    broadcasts: BroadcastMap,
    // Capacity used for broadcast/mpsc subscription channels (>= 1).
    queue_depth: usize,
}
634
635impl MemoryEventLog {
636    pub fn new(queue_depth: usize) -> Self {
637        Self {
638            state: tokio::sync::Mutex::new(MemoryState::default()),
639            broadcasts: BroadcastMap::default(),
640            queue_depth: queue_depth.max(1),
641        }
642    }
643
644    async fn topics(&self) -> Result<Vec<Topic>, LogError> {
645        let state = self.state.lock().await;
646        let mut topics = state
647            .topics
648            .keys()
649            .map(|topic| Topic::new(topic.clone()))
650            .collect::<Result<Vec<_>, _>>()?;
651        topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
652        Ok(topics)
653    }
654}
655
impl EventLog for MemoryEventLog {
    fn describe(&self) -> EventLogDescription {
        EventLogDescription {
            backend: EventLogBackendKind::Memory,
            location: None,
            size_bytes: None,
            queue_depth: self.queue_depth,
        }
    }

    /// Assigns the next id, chains the provenance hash off the previous
    /// record, stores the event, and then broadcasts it to live subscribers.
    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        let mut state = self.state.lock().await;
        // Ids come from `latest`, not the buffer, so compaction never causes
        // id reuse.
        let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
        // Hash of the most recent surviving record, if any, to chain into the
        // new record's headers.
        let previous_hash = state
            .topics
            .get(topic.as_str())
            .and_then(|events| events.back())
            .map(|(previous_id, previous_event)| {
                crate::provenance::event_record_hash_from_headers(
                    topic.as_str(),
                    *previous_id,
                    previous_event,
                )
            })
            .transpose()?;
        let event = crate::provenance::prepare_event_for_append(
            topic.as_str(),
            event_id,
            previous_hash,
            event,
        )?;
        state.latest.insert(topic.as_str().to_string(), event_id);
        state
            .topics
            .entry(topic.as_str().to_string())
            .or_default()
            .push_back((event_id, event.clone()));
        // Release the state lock before publishing so subscribers reacting to
        // the broadcast can immediately take the lock themselves.
        drop(state);
        self.broadcasts
            .publish(topic, self.queue_depth, (event_id, event));
        Ok(event_id)
    }

    /// Nothing is buffered, so flushing is a no-op.
    async fn flush(&self) -> Result<(), LogError> {
        Ok(())
    }

    /// Events with id strictly greater than `from`, capped at `limit`.
    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        let from = from.unwrap_or(0);
        let state = self.state.lock().await;
        let events = state
            .topics
            .get(topic.as_str())
            .into_iter()
            .flat_map(|events| events.iter())
            .filter(|(event_id, _)| *event_id > from)
            .take(limit)
            .map(|(event_id, event)| (*event_id, event.clone()))
            .collect();
        Ok(events)
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        // Subscribe before snapshotting history so no append is missed; the
        // forwarder dedupes ids that show up in both.
        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        let mut state = self.state.lock().await;
        state.consumers.insert(
            (topic.as_str().to_string(), consumer.as_str().to_string()),
            up_to,
        );
        Ok(())
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        let state = self.state.lock().await;
        Ok(state
            .consumers
            .get(&(topic.as_str().to_string(), consumer.as_str().to_string()))
            .copied())
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        let state = self.state.lock().await;
        Ok(state.latest.get(topic.as_str()).copied())
    }

    /// Drops the leading run of events with id <= `before`. Ids are appended
    /// in increasing order, so `take_while` from the front is sufficient.
    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        let mut state = self.state.lock().await;
        let Some(events) = state.topics.get_mut(topic.as_str()) else {
            return Ok(CompactReport::default());
        };
        let removed = events
            .iter()
            .take_while(|(event_id, _)| *event_id <= before)
            .count();
        for _ in 0..removed {
            events.pop_front();
        }
        Ok(CompactReport {
            removed,
            remaining: events.len(),
            // `latest` is left untouched so future appends keep increasing ids.
            latest: state.latest.get(topic.as_str()).copied(),
            checkpointed: false,
        })
    }
}
784
/// One JSONL line in a topic file: the assigned id plus the event body.
#[derive(Serialize, Deserialize)]
struct FileRecord {
    id: EventId,
    event: LogEvent,
}

/// Durable backend storing each topic as a JSONL file under `<root>/topics`
/// and consumer cursors under `<root>/consumers`.
pub struct FileEventLog {
    root: PathBuf,
    /// Cache of the highest known id per topic, filled lazily from disk.
    latest_ids: Mutex<HashMap<String, EventId>>,
    /// Serializes appends so id assignment and the file write stay atomic.
    write_lock: Mutex<()>,
    broadcasts: BroadcastMap,
    // Capacity used for subscription channels (>= 1).
    queue_depth: usize,
}
798
impl FileEventLog {
    /// Opens (creating if needed) the `topics/` and `consumers/` directories
    /// under `root`; `queue_depth` is clamped to at least 1.
    pub fn open(root: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
        std::fs::create_dir_all(root.join("topics"))
            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        std::fs::create_dir_all(root.join("consumers"))
            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        Ok(Self {
            root,
            latest_ids: Mutex::new(HashMap::new()),
            write_lock: Mutex::new(()),
            broadcasts: BroadcastMap::default(),
            queue_depth: queue_depth.max(1),
        })
    }

    /// `<root>/topics/<topic>.jsonl` — topic names are already restricted to
    /// filename-safe characters by `Topic::new`.
    fn topic_path(&self, topic: &Topic) -> PathBuf {
        self.root
            .join("topics")
            .join(format!("{}.jsonl", topic.as_str()))
    }

    /// `<root>/consumers/<topic>__<consumer>.json`; the consumer id is
    /// sanitized because, unlike topics, it is not validated for filenames.
    fn consumer_path(&self, topic: &Topic, consumer: &ConsumerId) -> PathBuf {
        self.root.join("consumers").join(format!(
            "{}__{}.json",
            topic.as_str(),
            sanitize_filename(consumer.as_str())
        ))
    }

    /// Highest id recorded for `topic`. First touch scans the whole topic
    /// file and caches the result; later calls hit the cache only.
    fn latest_id_for_topic(&self, topic: &Topic) -> Result<EventId, LogError> {
        if let Some(event_id) = self
            .latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .get(topic.as_str())
            .copied()
        {
            return Ok(event_id);
        }

        let mut latest = 0;
        let path = self.topic_path(topic);
        if path.is_file() {
            // Records are appended in id order, so the last record wins.
            for record in read_file_records(&path)? {
                latest = record.id;
            }
        }
        self.latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .insert(topic.as_str().to_string(), latest);
        Ok(latest)
    }

    /// Synchronous core of `read_range`: ids strictly greater than `from`,
    /// capped at `limit`. A missing topic file reads as empty.
    fn read_range_sync(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        let path = self.topic_path(topic);
        if !path.is_file() {
            return Ok(Vec::new());
        }
        let from = from.unwrap_or(0);
        let mut events = Vec::new();
        for record in read_file_records(&path)? {
            if record.id > from {
                events.push((record.id, record.event));
            }
            if events.len() >= limit {
                break;
            }
        }
        Ok(events)
    }

    /// Lists topics by scanning `<root>/topics` for `*.jsonl` stems, sorted
    /// by name. A missing directory reads as empty.
    fn topics(&self) -> Result<Vec<Topic>, LogError> {
        let topics_dir = self.root.join("topics");
        if !topics_dir.is_dir() {
            return Ok(Vec::new());
        }
        let mut topics = Vec::new();
        for entry in std::fs::read_dir(&topics_dir)
            .map_err(|error| LogError::Io(format!("event log topics read error: {error}")))?
        {
            let entry = entry
                .map_err(|error| LogError::Io(format!("event log topic entry error: {error}")))?;
            let path = entry.path();
            if path.extension().and_then(|ext| ext.to_str()) != Some("jsonl") {
                continue;
            }
            let Some(stem) = path.file_stem().and_then(|stem| stem.to_str()) else {
                continue;
            };
            topics.push(Topic::new(stem.to_string())?);
        }
        topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
        Ok(topics)
    }
}
900
901fn read_file_records(path: &Path) -> Result<Vec<FileRecord>, LogError> {
902    let file = std::fs::File::open(path)
903        .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
904    let mut reader = std::io::BufReader::new(file);
905    let mut records = Vec::new();
906    let mut line = Vec::new();
907    loop {
908        line.clear();
909        let bytes_read = std::io::BufRead::read_until(&mut reader, b'\n', &mut line)
910            .map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
911        if bytes_read == 0 {
912            break;
913        }
914        if line.iter().all(u8::is_ascii_whitespace) {
915            continue;
916        }
917        let complete_line = line.ends_with(b"\n");
918        match serde_json::from_slice::<FileRecord>(&line) {
919            Ok(record) => records.push(record),
920            Err(_) if !complete_line => break,
921            Err(error) => {
922                return Err(LogError::Serde(format!("event log parse error: {error}")));
923            }
924        }
925    }
926    Ok(records)
927}
928
929impl EventLog for FileEventLog {
930    fn describe(&self) -> EventLogDescription {
931        EventLogDescription {
932            backend: EventLogBackendKind::File,
933            location: Some(self.root.clone()),
934            size_bytes: Some(dir_size_bytes(&self.root)),
935            queue_depth: self.queue_depth,
936        }
937    }
938
939    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
940        let _guard = self
941            .write_lock
942            .lock()
943            .expect("file event log write lock poisoned");
944        let next_id = self.latest_id_for_topic(topic)? + 1;
945        let previous_hash = self
946            .read_range_sync(topic, None, usize::MAX)?
947            .last()
948            .map(|(previous_id, previous_event)| {
949                crate::provenance::event_record_hash_from_headers(
950                    topic.as_str(),
951                    *previous_id,
952                    previous_event,
953                )
954            })
955            .transpose()?;
956        let event = crate::provenance::prepare_event_for_append(
957            topic.as_str(),
958            next_id,
959            previous_hash,
960            event,
961        )?;
962        let record = FileRecord {
963            id: next_id,
964            event: event.clone(),
965        };
966        let path = self.topic_path(topic);
967        if let Some(parent) = path.parent() {
968            std::fs::create_dir_all(parent)
969                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
970        }
971        let line = serde_json::to_string(&record)
972            .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
973        use std::io::Write as _;
974        let mut file = std::fs::OpenOptions::new()
975            .create(true)
976            .append(true)
977            .open(&path)
978            .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
979        writeln!(file, "{line}")
980            .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
981        self.latest_ids
982            .lock()
983            .expect("file event log latest ids poisoned")
984            .insert(topic.as_str().to_string(), next_id);
985        self.broadcasts
986            .publish(topic, self.queue_depth, (next_id, event));
987        Ok(next_id)
988    }
989
990    async fn flush(&self) -> Result<(), LogError> {
991        sync_tree(&self.root)
992    }
993
994    async fn read_range(
995        &self,
996        topic: &Topic,
997        from: Option<EventId>,
998        limit: usize,
999    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
1000        self.read_range_sync(topic, from, limit)
1001    }
1002
1003    async fn read_range_bytes(
1004        &self,
1005        topic: &Topic,
1006        from: Option<EventId>,
1007        limit: usize,
1008    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
1009        self.read_range_sync(topic, from, limit)?
1010            .into_iter()
1011            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
1012            .collect()
1013    }
1014
1015    async fn subscribe(
1016        self: Arc<Self>,
1017        topic: &Topic,
1018        from: Option<EventId>,
1019    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
1020        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
1021        let history = self.read_range_sync(topic, from, usize::MAX)?;
1022        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
1023    }
1024
1025    async fn ack(
1026        &self,
1027        topic: &Topic,
1028        consumer: &ConsumerId,
1029        up_to: EventId,
1030    ) -> Result<(), LogError> {
1031        let path = self.consumer_path(topic, consumer);
1032        let payload = serde_json::json!({
1033            "topic": topic.as_str(),
1034            "consumer_id": consumer.as_str(),
1035            "cursor": up_to,
1036            "updated_at_ms": now_ms(),
1037        });
1038        write_json_atomically(&path, &payload)
1039    }
1040
1041    async fn consumer_cursor(
1042        &self,
1043        topic: &Topic,
1044        consumer: &ConsumerId,
1045    ) -> Result<Option<EventId>, LogError> {
1046        let path = self.consumer_path(topic, consumer);
1047        if !path.is_file() {
1048            return Ok(None);
1049        }
1050        let raw = std::fs::read_to_string(&path)
1051            .map_err(|error| LogError::Io(format!("event log consumer read error: {error}")))?;
1052        let payload: serde_json::Value = serde_json::from_str(&raw)
1053            .map_err(|error| LogError::Serde(format!("event log consumer parse error: {error}")))?;
1054        let cursor = payload
1055            .get("cursor")
1056            .and_then(serde_json::Value::as_u64)
1057            .ok_or_else(|| {
1058                LogError::Serde("event log consumer record missing numeric cursor".to_string())
1059            })?;
1060        Ok(Some(cursor))
1061    }
1062
1063    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
1064        let latest = self.latest_id_for_topic(topic)?;
1065        if latest == 0 {
1066            Ok(None)
1067        } else {
1068            Ok(Some(latest))
1069        }
1070    }
1071
1072    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
1073        let _guard = self
1074            .write_lock
1075            .lock()
1076            .expect("file event log write lock poisoned");
1077        let path = self.topic_path(topic);
1078        if !path.is_file() {
1079            return Ok(CompactReport::default());
1080        }
1081        let retained = self.read_range_sync(topic, Some(before), usize::MAX)?;
1082        let removed = self.read_range_sync(topic, None, usize::MAX)?.len() - retained.len();
1083        let tmp = path.with_extension("jsonl.tmp");
1084        if retained.is_empty() {
1085            let _ = std::fs::remove_file(&path);
1086        } else {
1087            let mut writer =
1088                std::io::BufWriter::new(std::fs::File::create(&tmp).map_err(|error| {
1089                    LogError::Io(format!("event log tmp create error: {error}"))
1090                })?);
1091            use std::io::Write as _;
1092            for (event_id, event) in &retained {
1093                let line = serde_json::to_string(&FileRecord {
1094                    id: *event_id,
1095                    event: event.clone(),
1096                })
1097                .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
1098                writeln!(writer, "{line}")
1099                    .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
1100            }
1101            writer
1102                .flush()
1103                .map_err(|error| LogError::Io(format!("event log flush error: {error}")))?;
1104            std::fs::rename(&tmp, &path).map_err(|error| {
1105                LogError::Io(format!("event log compact finalize error: {error}"))
1106            })?;
1107        }
1108        let latest = retained.last().map(|(event_id, _)| *event_id);
1109        self.latest_ids
1110            .lock()
1111            .expect("file event log latest ids poisoned")
1112            .insert(topic.as_str().to_string(), latest.unwrap_or(0));
1113        Ok(CompactReport {
1114            removed,
1115            remaining: retained.len(),
1116            latest,
1117            checkpointed: false,
1118        })
1119    }
1120}
1121
/// SQLite-backed event log: one row in `events` per record, with per-topic
/// monotonically increasing ids tracked in the `topic_heads` table.
pub struct SqliteEventLog {
    // Database file location; kept for `describe()` size reporting.
    path: PathBuf,
    // Single shared connection; all access serializes on this mutex.
    connection: Mutex<Connection>,
    // In-process fan-out for live `subscribe` streams.
    broadcasts: BroadcastMap,
    // Capacity used for broadcast channels (clamped to >= 1 in `open`).
    queue_depth: usize,
}
1128
1129impl SqliteEventLog {
1130    pub fn open(path: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
1131        if let Some(parent) = path.parent() {
1132            std::fs::create_dir_all(parent)
1133                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
1134        }
1135        let connection = Connection::open(&path)
1136            .map_err(|error| LogError::Sqlite(format!("event log open error: {error}")))?;
1137        // Set busy_timeout BEFORE the WAL pragma so SQLite waits out transient
1138        // SQLITE_BUSY from a previous test's connection that hasn't finished
1139        // dropping yet (parallel `cargo test` on the same process, distinct
1140        // paths, still contends on SQLite's own global mutex under WAL-mode
1141        // promotion). Without this, `journal_mode = WAL` fails fast with
1142        // "database is locked" instead of retrying.
1143        connection
1144            .busy_timeout(std::time::Duration::from_secs(5))
1145            .map_err(|error| LogError::Sqlite(format!("event log busy-timeout error: {error}")))?;
1146        connection
1147            .pragma_update(None, "journal_mode", "WAL")
1148            .map_err(|error| LogError::Sqlite(format!("event log WAL pragma error: {error}")))?;
1149        connection
1150            .pragma_update(None, "synchronous", "NORMAL")
1151            .map_err(|error| LogError::Sqlite(format!("event log sync pragma error: {error}")))?;
1152        connection
1153            .execute_batch(
1154                "CREATE TABLE IF NOT EXISTS topic_heads (
1155                    topic TEXT PRIMARY KEY,
1156                    last_id INTEGER NOT NULL
1157                );
1158                CREATE TABLE IF NOT EXISTS events (
1159                    topic TEXT NOT NULL,
1160                    event_id INTEGER NOT NULL,
1161                    kind TEXT NOT NULL,
1162                    payload BLOB NOT NULL,
1163                    headers TEXT NOT NULL,
1164                    occurred_at_ms INTEGER NOT NULL,
1165                    PRIMARY KEY (topic, event_id)
1166                );
1167                CREATE TABLE IF NOT EXISTS consumers (
1168                    topic TEXT NOT NULL,
1169                    consumer_id TEXT NOT NULL,
1170                    cursor INTEGER NOT NULL,
1171                    updated_at_ms INTEGER NOT NULL,
1172                    PRIMARY KEY (topic, consumer_id)
1173                );",
1174            )
1175            .map_err(|error| LogError::Sqlite(format!("event log schema error: {error}")))?;
1176        Ok(Self {
1177            path,
1178            connection: Mutex::new(connection),
1179            broadcasts: BroadcastMap::default(),
1180            queue_depth: queue_depth.max(1),
1181        })
1182    }
1183
1184    fn topics(&self) -> Result<Vec<Topic>, LogError> {
1185        let connection = self
1186            .connection
1187            .lock()
1188            .expect("sqlite event log connection poisoned");
1189        let mut statement = connection
1190            .prepare("SELECT DISTINCT topic FROM events ORDER BY topic ASC")
1191            .map_err(|error| {
1192                LogError::Sqlite(format!("event log topics prepare error: {error}"))
1193            })?;
1194        let rows = statement
1195            .query_map([], |row| row.get::<_, String>(0))
1196            .map_err(|error| LogError::Sqlite(format!("event log topics query error: {error}")))?;
1197        let mut topics = Vec::new();
1198        for row in rows {
1199            topics.push(Topic::new(row.map_err(|error| {
1200                LogError::Sqlite(format!("event log topic row error: {error}"))
1201            })?)?);
1202        }
1203        Ok(topics)
1204    }
1205}
1206
1207impl EventLog for SqliteEventLog {
1208    fn describe(&self) -> EventLogDescription {
1209        EventLogDescription {
1210            backend: EventLogBackendKind::Sqlite,
1211            location: Some(self.path.clone()),
1212            size_bytes: Some(sqlite_size_bytes(&self.path)),
1213            queue_depth: self.queue_depth,
1214        }
1215    }
1216
1217    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
1218        let mut connection = self
1219            .connection
1220            .lock()
1221            .expect("sqlite event log connection poisoned");
1222        let tx = connection
1223            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
1224            .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
1225        tx.execute(
1226            "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
1227            params![topic.as_str()],
1228        )
1229        .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
1230        tx.execute(
1231            "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
1232            params![topic.as_str()],
1233        )
1234        .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
1235        let event_id = tx
1236            .query_row(
1237                "SELECT last_id FROM topic_heads WHERE topic = ?1",
1238                params![topic.as_str()],
1239                |row| row.get::<_, i64>(0),
1240            )
1241            .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))
1242            .and_then(sqlite_i64_to_event_id)?;
1243        let event_id_sql = event_id_to_sqlite_i64(event_id)?;
1244        let previous = tx
1245            .query_row(
1246                "SELECT event_id, kind, payload, headers, occurred_at_ms
1247                 FROM events
1248                 WHERE topic = ?1 AND event_id < ?2
1249                 ORDER BY event_id DESC
1250                 LIMIT 1",
1251                params![topic.as_str(), event_id_sql],
1252                |row| {
1253                    let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1254                    let headers: String = row.get(3)?;
1255                    Ok((
1256                        sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?,
1257                        LogEvent {
1258                            kind: row.get(1)?,
1259                            payload: serde_json::from_slice(&payload).map_err(|error| {
1260                                rusqlite::Error::FromSqlConversionFailure(
1261                                    payload.len(),
1262                                    rusqlite::types::Type::Blob,
1263                                    Box::new(error),
1264                                )
1265                            })?,
1266                            headers: serde_json::from_str(&headers).map_err(|error| {
1267                                rusqlite::Error::FromSqlConversionFailure(
1268                                    headers.len(),
1269                                    rusqlite::types::Type::Text,
1270                                    Box::new(error),
1271                                )
1272                            })?,
1273                            occurred_at_ms: row.get(4)?,
1274                        },
1275                    ))
1276                },
1277            )
1278            .optional()
1279            .map_err(|error| LogError::Sqlite(format!("event log previous read error: {error}")))?;
1280        let previous_hash = previous
1281            .as_ref()
1282            .map(|(previous_id, previous_event)| {
1283                crate::provenance::event_record_hash_from_headers(
1284                    topic.as_str(),
1285                    *previous_id,
1286                    previous_event,
1287                )
1288            })
1289            .transpose()?;
1290        let event = crate::provenance::prepare_event_for_append(
1291            topic.as_str(),
1292            event_id,
1293            previous_hash,
1294            event,
1295        )?;
1296        tx.execute(
1297            "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
1298             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1299            params![
1300                topic.as_str(),
1301                event_id_sql,
1302                event.kind,
1303                serde_json::to_vec(&event.payload).map_err(|error| LogError::Serde(format!(
1304                    "event log payload encode error: {error}"
1305                )))?,
1306                serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
1307                    "event log headers encode error: {error}"
1308                )))?,
1309                event.occurred_at_ms
1310            ],
1311        )
1312        .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
1313        tx.commit()
1314            .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
1315        self.broadcasts
1316            .publish(topic, self.queue_depth, (event_id, event.clone()));
1317        Ok(event_id)
1318    }
1319
1320    async fn flush(&self) -> Result<(), LogError> {
1321        let connection = self
1322            .connection
1323            .lock()
1324            .expect("sqlite event log connection poisoned");
1325        connection
1326            .execute_batch("PRAGMA wal_checkpoint(FULL);")
1327            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
1328        Ok(())
1329    }
1330
1331    async fn read_range(
1332        &self,
1333        topic: &Topic,
1334        from: Option<EventId>,
1335        limit: usize,
1336    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
1337        let connection = self
1338            .connection
1339            .lock()
1340            .expect("sqlite event log connection poisoned");
1341        let mut statement = connection
1342            .prepare(
1343                "SELECT event_id, kind, payload, headers, occurred_at_ms
1344                 FROM events
1345                 WHERE topic = ?1 AND event_id > ?2
1346                 ORDER BY event_id ASC
1347                 LIMIT ?3",
1348            )
1349            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
1350        let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
1351        let rows = statement
1352            .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
1353                let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1354                let headers: String = row.get(3)?;
1355                let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
1356                Ok((
1357                    event_id,
1358                    LogEvent {
1359                        kind: row.get(1)?,
1360                        payload: serde_json::from_slice(&payload).map_err(|error| {
1361                            rusqlite::Error::FromSqlConversionFailure(
1362                                payload.len(),
1363                                rusqlite::types::Type::Blob,
1364                                Box::new(error),
1365                            )
1366                        })?,
1367                        headers: serde_json::from_str(&headers).map_err(|error| {
1368                            rusqlite::Error::FromSqlConversionFailure(
1369                                headers.len(),
1370                                rusqlite::types::Type::Text,
1371                                Box::new(error),
1372                            )
1373                        })?,
1374                        occurred_at_ms: row.get(4)?,
1375                    },
1376                ))
1377            })
1378            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
1379        let mut events = Vec::new();
1380        for row in rows {
1381            events.push(
1382                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
1383            );
1384        }
1385        Ok(events)
1386    }
1387
1388    async fn read_range_bytes(
1389        &self,
1390        topic: &Topic,
1391        from: Option<EventId>,
1392        limit: usize,
1393    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
1394        let connection = self
1395            .connection
1396            .lock()
1397            .expect("sqlite event log connection poisoned");
1398        let mut statement = connection
1399            .prepare(
1400                "SELECT event_id, kind, payload, headers, occurred_at_ms
1401                 FROM events
1402                 WHERE topic = ?1 AND event_id > ?2
1403                 ORDER BY event_id ASC
1404                 LIMIT ?3",
1405            )
1406            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
1407        let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
1408        let rows = statement
1409            .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
1410                let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1411                let headers: String = row.get(3)?;
1412                let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
1413                Ok((
1414                    event_id,
1415                    LogEventBytes {
1416                        kind: row.get(1)?,
1417                        payload: Bytes::from(payload),
1418                        headers: serde_json::from_str(&headers).map_err(|error| {
1419                            rusqlite::Error::FromSqlConversionFailure(
1420                                headers.len(),
1421                                rusqlite::types::Type::Text,
1422                                Box::new(error),
1423                            )
1424                        })?,
1425                        occurred_at_ms: row.get(4)?,
1426                    },
1427                ))
1428            })
1429            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
1430        let mut events = Vec::new();
1431        for row in rows {
1432            events.push(
1433                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
1434            );
1435        }
1436        Ok(events)
1437    }
1438
1439    async fn subscribe(
1440        self: Arc<Self>,
1441        topic: &Topic,
1442        from: Option<EventId>,
1443    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
1444        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
1445        let history = self.read_range(topic, from, usize::MAX).await?;
1446        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
1447    }
1448
1449    async fn ack(
1450        &self,
1451        topic: &Topic,
1452        consumer: &ConsumerId,
1453        up_to: EventId,
1454    ) -> Result<(), LogError> {
1455        let connection = self
1456            .connection
1457            .lock()
1458            .expect("sqlite event log connection poisoned");
1459        let up_to_sql = event_id_to_sqlite_i64(up_to)?;
1460        connection
1461            .execute(
1462                "INSERT INTO consumers(topic, consumer_id, cursor, updated_at_ms)
1463                 VALUES (?1, ?2, ?3, ?4)
1464                 ON CONFLICT(topic, consumer_id)
1465                 DO UPDATE SET cursor = excluded.cursor, updated_at_ms = excluded.updated_at_ms",
1466                params![topic.as_str(), consumer.as_str(), up_to_sql, now_ms()],
1467            )
1468            .map_err(|error| LogError::Sqlite(format!("event log ack error: {error}")))?;
1469        Ok(())
1470    }
1471
1472    async fn consumer_cursor(
1473        &self,
1474        topic: &Topic,
1475        consumer: &ConsumerId,
1476    ) -> Result<Option<EventId>, LogError> {
1477        let connection = self
1478            .connection
1479            .lock()
1480            .expect("sqlite event log connection poisoned");
1481        connection
1482            .query_row(
1483                "SELECT cursor FROM consumers WHERE topic = ?1 AND consumer_id = ?2",
1484                params![topic.as_str(), consumer.as_str()],
1485                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1486            )
1487            .optional()
1488            .map_err(|error| LogError::Sqlite(format!("event log consumer cursor error: {error}")))
1489    }
1490
1491    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
1492        let connection = self
1493            .connection
1494            .lock()
1495            .expect("sqlite event log connection poisoned");
1496        connection
1497            .query_row(
1498                "SELECT last_id FROM topic_heads WHERE topic = ?1",
1499                params![topic.as_str()],
1500                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1501            )
1502            .optional()
1503            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))
1504    }
1505
1506    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
1507        let connection = self
1508            .connection
1509            .lock()
1510            .expect("sqlite event log connection poisoned");
1511        let before_sql = event_id_to_sqlite_i64(before)?;
1512        let removed = connection
1513            .execute(
1514                "DELETE FROM events WHERE topic = ?1 AND event_id <= ?2",
1515                params![topic.as_str(), before_sql],
1516            )
1517            .map_err(|error| {
1518                LogError::Sqlite(format!("event log compact delete error: {error}"))
1519            })?;
1520        let remaining = connection
1521            .query_row(
1522                "SELECT COUNT(*) FROM events WHERE topic = ?1",
1523                params![topic.as_str()],
1524                |row| row.get::<_, i64>(0),
1525            )
1526            .map_err(|error| LogError::Sqlite(format!("event log compact count error: {error}")))
1527            .and_then(sqlite_i64_to_usize)?;
1528        let latest = connection
1529            .query_row(
1530                "SELECT last_id FROM topic_heads WHERE topic = ?1",
1531                params![topic.as_str()],
1532                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1533            )
1534            .optional()
1535            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))?;
1536        connection
1537            .execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
1538            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
1539        Ok(CompactReport {
1540            removed,
1541            remaining,
1542            latest,
1543            checkpointed: true,
1544        })
1545    }
1546}
1547
/// Resolve `value` to a path: absolute inputs win as-is, relative inputs are
/// anchored under `base_dir`.
fn resolve_path(base_dir: &Path, value: &str) -> PathBuf {
    let candidate = Path::new(value);
    if candidate.is_absolute() {
        candidate.to_path_buf()
    } else {
        base_dir.join(candidate)
    }
}
1556
1557fn write_json_atomically(path: &Path, payload: &serde_json::Value) -> Result<(), LogError> {
1558    if let Some(parent) = path.parent() {
1559        std::fs::create_dir_all(parent)
1560            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
1561    }
1562    let tmp = path.with_extension("tmp");
1563    let encoded = serde_json::to_vec_pretty(payload)
1564        .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
1565    std::fs::write(&tmp, encoded)
1566        .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
1567    std::fs::rename(&tmp, path)
1568        .map_err(|error| LogError::Io(format!("event log rename error: {error}")))?;
1569    Ok(())
1570}
1571
// On-disk filenames use the same safe character set as topic components;
// this alias exists purely for call-site readability.
fn sanitize_filename(value: &str) -> String {
    sanitize_topic_component(value)
}
1575
/// Map every character outside the safe set (ASCII alphanumerics plus
/// `.`, `_`, `-`) to `_`, yielding a filesystem-safe component.
pub fn sanitize_topic_component(value: &str) -> String {
    value
        .chars()
        .map(|ch| match ch {
            'a'..='z' | 'A'..='Z' | '0'..='9' | '.' | '_' | '-' => ch,
            _ => '_',
        })
        .collect()
}
1588
/// Recursively sum the sizes of all regular files under `path`.
/// Missing or unreadable directories contribute 0 instead of erroring.
fn dir_size_bytes(path: &Path) -> u64 {
    if !path.exists() {
        return 0;
    }
    std::fs::read_dir(path)
        .map(|entries| {
            entries
                .flatten()
                .map(|entry| {
                    let child = entry.path();
                    if child.is_dir() {
                        dir_size_bytes(&child)
                    } else {
                        entry.metadata().map(|metadata| metadata.len()).unwrap_or(0)
                    }
                })
                .sum()
        })
        .unwrap_or(0)
}
1606
1607fn sqlite_size_bytes(path: &Path) -> u64 {
1608    let mut total = file_size(path);
1609    total += file_size(&PathBuf::from(format!("{}-wal", path.display())));
1610    total += file_size(&PathBuf::from(format!("{}-shm", path.display())));
1611    total
1612}
1613
/// Size of a single file in bytes; 0 when the file is missing or unreadable.
fn file_size(path: &Path) -> u64 {
    match std::fs::metadata(path) {
        Ok(metadata) => metadata.len(),
        Err(_) => 0,
    }
}
1619
1620fn sync_tree(root: &Path) -> Result<(), LogError> {
1621    if !root.exists() {
1622        return Ok(());
1623    }
1624    for entry in std::fs::read_dir(root)
1625        .map_err(|error| LogError::Io(format!("event log read_dir error: {error}")))?
1626    {
1627        let entry = entry.map_err(|error| LogError::Io(format!("event log dir error: {error}")))?;
1628        let path = entry.path();
1629        if path.is_dir() {
1630            sync_tree(&path)?;
1631            continue;
1632        }
1633        std::fs::File::open(&path)
1634            .and_then(|file| file.sync_all())
1635            .map_err(|error| LogError::Io(format!("event log sync error: {error}")))?;
1636    }
1637    Ok(())
1638}
1639
/// Wall-clock milliseconds since the Unix epoch; 0 if the system clock sits
/// before the epoch (never panics).
fn now_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(duration) => duration.as_millis() as i64,
        Err(_) => 0,
    }
}
1646
1647fn event_id_to_sqlite_i64(event_id: EventId) -> Result<i64, LogError> {
1648    i64::try_from(event_id)
1649        .map_err(|_| LogError::Sqlite(format!("event id {event_id} exceeds sqlite INTEGER range")))
1650}
1651
1652fn sqlite_i64_to_event_id(value: i64) -> Result<EventId, LogError> {
1653    u64::try_from(value)
1654        .map_err(|_| LogError::Sqlite(format!("sqlite event id {value} is negative")))
1655}
1656
1657fn sqlite_i64_to_event_id_for_row(value: i64) -> rusqlite::Result<EventId> {
1658    u64::try_from(value).map_err(|_| {
1659        rusqlite::Error::FromSqlConversionFailure(
1660            std::mem::size_of::<i64>(),
1661            rusqlite::types::Type::Integer,
1662            "sqlite event id is negative".into(),
1663        )
1664    })
1665}
1666
1667fn sqlite_json_bytes_for_row(
1668    row: &rusqlite::Row<'_>,
1669    index: usize,
1670    name: &str,
1671) -> rusqlite::Result<Vec<u8>> {
1672    let value = row.get_ref(index)?;
1673    match value {
1674        rusqlite::types::ValueRef::Text(bytes) | rusqlite::types::ValueRef::Blob(bytes) => {
1675            Ok(bytes.to_vec())
1676        }
1677        other => Err(rusqlite::Error::InvalidColumnType(
1678            index,
1679            name.to_string(),
1680            other.data_type(),
1681        )),
1682    }
1683}
1684
1685fn sqlite_i64_to_usize(value: i64) -> Result<usize, LogError> {
1686    usize::try_from(value)
1687        .map_err(|_| LogError::Sqlite(format!("sqlite count {value} is negative")))
1688}
1689
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    // FIX: `rand` does not export a `RngExt` trait; `random_range` (used below)
    // is provided by the `Rng` trait in rand 0.9+, so import that instead.
    use rand::{rngs::StdRng, Rng, SeedableRng};

    /// Appends 10k events to one topic and checks that ids are assigned
    /// densely starting at 1 and that an unbounded range read returns them all.
    async fn exercise_basic_backend(log: Arc<AnyEventLog>) {
        let topic = Topic::new("trigger.inbox").unwrap();
        for i in 0..10_000 {
            log.append(
                &topic,
                LogEvent::new("append", serde_json::json!({ "i": i })),
            )
            .await
            .unwrap();
        }
        let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 10_000);
        assert_eq!(events.first().unwrap().0, 1);
        assert_eq!(events.last().unwrap().0, 10_000);
    }

    // In-memory backend: append/read plus live subscription delivery, ack
    // bookkeeping, and compaction counts.
    #[tokio::test(flavor = "current_thread")]
    async fn memory_backend_supports_append_read_subscribe_and_compact() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
        exercise_basic_backend(log.clone()).await;

        let topic = Topic::new("agent.transcript.demo").unwrap();
        let mut stream = log.clone().subscribe(&topic, None).await.unwrap();
        let first = log
            .append(
                &topic,
                LogEvent::new("message", serde_json::json!({"text":"one"})),
            )
            .await
            .unwrap();
        let second = log
            .append(
                &topic,
                LogEvent::new("message", serde_json::json!({"text":"two"})),
            )
            .await
            .unwrap();
        // The subscriber created before the appends must see both, in order.
        let seen: Vec<_> = stream.by_ref().take(2).collect().await;
        assert_eq!(seen[0].as_ref().unwrap().0, first);
        assert_eq!(seen[1].as_ref().unwrap().0, second);

        log.ack(&topic, &ConsumerId::new("worker").unwrap(), second)
            .await
            .unwrap();
        // Compacting up to `first` removes one event and leaves one behind.
        let compact = log.compact(&topic, first).await.unwrap();
        assert_eq!(compact.removed, 1);
        assert_eq!(compact.remaining, 1);
    }

    // File backend: events written by one instance are visible after reopening
    // the same directory, and compaction removes persisted rows.
    #[tokio::test(flavor = "current_thread")]
    async fn file_backend_persists_across_reopen_and_compacts() {
        let dir = tempfile::tempdir().unwrap();
        let topic = Topic::new("trigger.outbox").unwrap();
        let first_log = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
        ));
        first_log
            .append(
                &topic,
                LogEvent::new("dispatch_pending", serde_json::json!({"n":1})),
            )
            .await
            .unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("dispatch_complete", serde_json::json!({"n":2})),
            )
            .await
            .unwrap();
        drop(first_log);

        let reopened = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
        ));
        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 2);
        let compact = reopened.compact(&topic, 1).await.unwrap();
        assert_eq!(compact.removed, 1);
        assert_eq!(
            reopened
                .read_range(&topic, None, usize::MAX)
                .await
                .unwrap()
                .len(),
            1
        );
    }

    // Crash-consistency: a truncated (torn) JSON line appended to the topic
    // file is ignored on restart instead of corrupting the log.
    #[tokio::test(flavor = "current_thread")]
    async fn file_backend_skips_torn_tail_on_restart() {
        let dir = tempfile::tempdir().unwrap();
        let topic = Topic::new("trigger.inbox").unwrap();
        let first_log = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("accepted", serde_json::json!({"id": "ok"})),
            )
            .await
            .unwrap();
        drop(first_log);

        // Simulate a crash mid-write by appending an unterminated JSON object.
        let topic_path = dir.path().join("topics").join("trigger.inbox.jsonl");
        use std::io::Write as _;
        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(&topic_path)
            .unwrap();
        write!(file, "{{\"id\":2,\"event\":{{\"kind\":\"partial\"").unwrap();
        drop(file);

        let reopened = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].0, 1);
        assert_eq!(reopened.latest(&topic).await.unwrap(), Some(1));
    }

    // Sqlite backend: durability across reopen, and compaction triggering a
    // WAL checkpoint (the -wal sidecar is emptied or removed).
    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_backend_persists_and_checkpoints_after_compact() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("daemon.demo.state").unwrap();
        let first_log = Arc::new(AnyEventLog::Sqlite(
            SqliteEventLog::open(path.clone(), 8).unwrap(),
        ));
        first_log
            .append(
                &topic,
                LogEvent::new("state", serde_json::json!({"state":"idle"})),
            )
            .await
            .unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("state", serde_json::json!({"state":"active"})),
            )
            .await
            .unwrap();
        drop(first_log);

        let reopened = Arc::new(AnyEventLog::Sqlite(
            SqliteEventLog::open(path.clone(), 8).unwrap(),
        ));
        assert_eq!(
            reopened
                .read_range(&topic, None, usize::MAX)
                .await
                .unwrap()
                .len(),
            2
        );
        let compact = reopened.compact(&topic, 1).await.unwrap();
        assert!(compact.checkpointed);
        // SQLite names the WAL sidecar by appending "-wal" to the db filename.
        let wal = PathBuf::from(format!("{}-wal", path.display()));
        assert!(file_size(&wal) == 0 || !wal.exists());
    }

    // The bytes-oriented read path must round-trip the payload without
    // materializing it as a serde value first.
    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_bytes_read_preserves_payload_without_value_materialization() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("observability.action_graph").unwrap();
        let log = SqliteEventLog::open(path, 8).unwrap();
        let event_id = log
            .append(
                &topic,
                LogEvent::new(
                    "snapshot",
                    serde_json::json!({"nodes":[{"id":"a"}],"edges":[]}),
                ),
            )
            .await
            .unwrap();

        let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].0, event_id);
        assert_eq!(
            events[0].1.payload_json().unwrap(),
            serde_json::json!({"nodes":[{"id":"a"}],"edges":[]})
        );
    }

    // Rows written by older versions stored payloads as TEXT; both read paths
    // must still accept them. The row is inserted directly via SQL to mimic
    // the legacy on-disk format.
    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_bytes_read_accepts_legacy_text_payload_rows() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("agent.transcript.legacy").unwrap();
        let log = SqliteEventLog::open(path, 8).unwrap();
        {
            let connection = log.connection.lock().unwrap();
            connection
                .execute(
                    "INSERT INTO topic_heads(topic, last_id) VALUES (?1, 1)",
                    params![topic.as_str()],
                )
                .unwrap();
            connection
                .execute(
                    "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
                     VALUES (?1, 1, 'legacy', ?2, '{}', 1)",
                    params![topic.as_str(), "{\"text\":\"old\"}"],
                )
                .unwrap();
        }

        let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
        assert_eq!(
            events[0].1.payload_json().unwrap(),
            serde_json::json!({"text": "old"})
        );
        assert_eq!(
            log.read_range(&topic, None, 1).await.unwrap()[0].1.kind,
            "legacy"
        );
    }

    // A broadcast receiver that falls behind must surface a ConsumerLagged
    // error (with the last id the consumer saw) instead of silently skipping.
    #[tokio::test(flavor = "current_thread")]
    async fn broadcast_forwarder_reports_lag_when_receiver_overflows() {
        let (sender, rx) = broadcast::channel(2);
        for i in 0..10 {
            sender
                .send((i + 1, LogEvent::new("tick", serde_json::json!({"i": i}))))
                .unwrap();
        }
        let mut stream = stream_from_broadcast(Vec::new(), None, rx, 2);

        match stream.next().await {
            Some(Err(LogError::ConsumerLagged(last_seen))) => assert_eq!(last_seen, 0),
            other => panic!("subscriber should surface lag, got {other:?}"),
        }
    }

    // Dropping the consumer stream must tear down the forwarder task so the
    // broadcast sender's receiver count returns to zero (no leaked receivers).
    #[tokio::test(flavor = "current_thread")]
    async fn broadcast_forwarder_stops_when_consumer_drops_stream() {
        let (sender, rx) = broadcast::channel(2);
        let stream = stream_from_broadcast(Vec::new(), None, rx, 2);
        assert_eq!(sender.receiver_count(), 1);
        drop(stream);

        tokio::time::timeout(std::time::Duration::from_millis(100), async {
            while sender.receiver_count() != 0 {
                tokio::task::yield_now().await;
            }
        })
        .await
        .expect("subscription receiver should close after consumer drop");
    }

    // Fuzz-ish check: readers started at different offsets always observe
    // strictly increasing event ids, even when they lag and get cut off.
    #[tokio::test(flavor = "current_thread")]
    async fn randomized_reader_sequences_stay_monotonic() {
        let log = Arc::new(MemoryEventLog::new(32));
        let topic = Topic::new("fuzz.demo").unwrap();
        let mut readers = vec![
            log.clone().subscribe(&topic, None).await.unwrap(),
            log.clone().subscribe(&topic, Some(5)).await.unwrap(),
            log.clone().subscribe(&topic, Some(10)).await.unwrap(),
        ];
        let mut rng = StdRng::seed_from_u64(7);
        for _ in 0..64 {
            let value = rng.random_range(0..1000);
            log.append(
                &topic,
                LogEvent::new("rand", serde_json::json!({"value": value})),
            )
            .await
            .unwrap();
        }

        let mut sequences = Vec::new();
        for reader in &mut readers {
            let mut ids = Vec::new();
            while let Some(item) = reader.next().await {
                match item {
                    Ok((event_id, _)) => {
                        ids.push(event_id);
                        if ids.len() >= 16 {
                            break;
                        }
                    }
                    Err(LogError::ConsumerLagged(_)) => break,
                    Err(error) => panic!("unexpected subscription error: {error}"),
                }
            }
            sequences.push(ids);
        }

        for ids in sequences {
            assert!(ids.windows(2).all(|pair| pair[0] < pair[1]));
        }
    }
}