//! harn_vm/event_log/mod.rs — topic-partitioned event log with memory,
//! file (JSONL), and sqlite backends.

1use std::cell::RefCell;
2use std::collections::{BTreeMap, HashMap, VecDeque};
3use std::fmt;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::{Arc, Mutex};
7
8use bytes::Bytes;
9use futures::stream::BoxStream;
10use rusqlite::{params, Connection, OptionalExtension};
11use serde::{Deserialize, Serialize};
12use tokio::sync::{broadcast, mpsc};
13use tokio_stream::wrappers::ReceiverStream;
14
/// Per-topic event identifier; assigned sequentially at append time and
/// never reused (compaction does not reset the counter).
pub type EventId = u64;

/// Env var selecting the backend: `memory`, `file`, or `sqlite`.
pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
/// Env var overriding the file backend's directory.
pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
/// Env var overriding the sqlite database path.
pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
/// Env var overriding the subscription queue depth (clamped to >= 1).
pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";
21
22#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
23pub struct Topic(String);
24
25impl Topic {
26    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
27        let value = value.into();
28        if value.is_empty() {
29            return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
30        }
31        if !value
32            .chars()
33            .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
34        {
35            return Err(LogError::InvalidTopic(format!(
36                "topic '{value}' contains unsupported characters"
37            )));
38        }
39        Ok(Self(value))
40    }
41
42    pub fn as_str(&self) -> &str {
43        &self.0
44    }
45}
46
47impl fmt::Display for Topic {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        self.0.fmt(f)
50    }
51}
52
53impl FromStr for Topic {
54    type Err = LogError;
55
56    fn from_str(s: &str) -> Result<Self, Self::Err> {
57        Self::new(s)
58    }
59}
60
61#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
62pub struct ConsumerId(String);
63
64impl ConsumerId {
65    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
66        let value = value.into();
67        if value.trim().is_empty() {
68            return Err(LogError::InvalidConsumer(
69                "consumer id cannot be empty".to_string(),
70            ));
71        }
72        Ok(Self(value))
73    }
74
75    pub fn as_str(&self) -> &str {
76        &self.0
77    }
78}
79
80impl fmt::Display for ConsumerId {
81    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82        self.0.fmt(f)
83    }
84}
85
86#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
87#[serde(rename_all = "snake_case")]
88pub enum EventLogBackendKind {
89    Memory,
90    File,
91    Sqlite,
92}
93
94impl fmt::Display for EventLogBackendKind {
95    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96        match self {
97            Self::Memory => write!(f, "memory"),
98            Self::File => write!(f, "file"),
99            Self::Sqlite => write!(f, "sqlite"),
100        }
101    }
102}
103
104impl FromStr for EventLogBackendKind {
105    type Err = LogError;
106
107    fn from_str(value: &str) -> Result<Self, Self::Err> {
108        match value.trim().to_ascii_lowercase().as_str() {
109            "memory" => Ok(Self::Memory),
110            "file" => Ok(Self::File),
111            "sqlite" => Ok(Self::Sqlite),
112            other => Err(LogError::Config(format!(
113                "unsupported event log backend '{other}'"
114            ))),
115        }
116    }
117}
118
/// A single structured event as appended to and read from the log.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LogEvent {
    /// Application-defined event type tag.
    pub kind: String,
    /// Arbitrary JSON body.
    pub payload: serde_json::Value,
    /// Optional string metadata; defaults to empty when absent from the
    /// serialized form.
    #[serde(default)]
    pub headers: BTreeMap<String, String>,
    /// Creation timestamp in milliseconds, assigned via `now_ms()` by
    /// `LogEvent::new`.
    pub occurred_at_ms: i64,
}
127
128impl LogEvent {
129    pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
130        Self {
131            kind: kind.into(),
132            payload,
133            headers: BTreeMap::new(),
134            occurred_at_ms: now_ms(),
135        }
136    }
137
138    pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
139        self.headers = headers;
140        self
141    }
142
143    /// Apply the unified redaction policy to this event's headers and
144    /// payload. Backends are intentionally left unaware of redaction —
145    /// emitters that need scrubbed events call this before append, and
146    /// readers that materialize events for display can apply the policy
147    /// again as defense in depth.
148    pub fn redact_in_place(&mut self, policy: &crate::redact::RedactionPolicy) {
149        self.headers = policy.redact_headers(&self.headers);
150        policy.redact_json_in_place(&mut self.payload);
151    }
152}
153
/// Serialized event payload form for large read paths.
///
/// `payload` contains the original JSON bytes for backends that can expose
/// them directly. Callers that only need to forward or hash the payload can
/// avoid materializing a `serde_json::Value`; callers that need structured
/// access can opt in with `payload_json`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LogEventBytes {
    /// Application-defined event type tag (same as `LogEvent::kind`).
    pub kind: String,
    /// Raw JSON bytes of the payload, not yet parsed.
    pub payload: Bytes,
    /// Optional string metadata.
    pub headers: BTreeMap<String, String>,
    /// Creation timestamp in milliseconds.
    pub occurred_at_ms: i64,
}
167
168impl LogEventBytes {
169    pub fn payload_json(&self) -> Result<serde_json::Value, LogError> {
170        serde_json::from_slice(&self.payload)
171            .map_err(|error| LogError::Serde(format!("event log payload parse error: {error}")))
172    }
173
174    pub fn into_log_event(self) -> Result<LogEvent, LogError> {
175        Ok(LogEvent {
176            kind: self.kind,
177            payload: serde_json::from_slice(&self.payload).map_err(|error| {
178                LogError::Serde(format!("event log payload parse error: {error}"))
179            })?,
180            headers: self.headers,
181            occurred_at_ms: self.occurred_at_ms,
182        })
183    }
184}
185
186impl TryFrom<LogEvent> for LogEventBytes {
187    type Error = LogError;
188
189    fn try_from(event: LogEvent) -> Result<Self, Self::Error> {
190        let payload = serde_json::to_vec(&event.payload)
191            .map_err(|error| LogError::Serde(format!("event log payload encode error: {error}")))?;
192        Ok(Self {
193            kind: event.kind,
194            payload: Bytes::from(payload),
195            headers: event.headers,
196            occurred_at_ms: event.occurred_at_ms,
197        })
198    }
199}
200
/// Result of a `compact` call.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactReport {
    /// Events removed by this compaction.
    pub removed: usize,
    /// Events still stored for the topic afterwards.
    pub remaining: usize,
    /// Highest id ever assigned to the topic, if any.
    pub latest: Option<EventId>,
    /// Whether the backend also checkpointed its persistent store.
    pub checkpointed: bool,
}

/// Snapshot of a log's backend kind, storage location, and sizing.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventLogDescription {
    pub backend: EventLogBackendKind,
    /// On-disk path; `None` for the memory backend.
    pub location: Option<PathBuf>,
    /// Measured on-disk size; `None` for the memory backend.
    pub size_bytes: Option<u64>,
    /// Subscription queue depth in effect.
    pub queue_depth: usize,
}
216
/// Errors produced by event log configuration, validation, and backends.
#[derive(Debug)]
pub enum LogError {
    /// Bad configuration (env vars, backend selection).
    Config(String),
    /// Topic name failed validation.
    InvalidTopic(String),
    /// Consumer id failed validation.
    InvalidConsumer(String),
    /// I/O failure (file backend, directory scans).
    Io(String),
    /// JSON (de)serialization failure.
    Serde(String),
    /// SQLite backend failure.
    Sqlite(String),
    /// A subscriber fell behind the broadcast channel; carries the last
    /// event id it did receive.
    ConsumerLagged(EventId),
}
227
228impl fmt::Display for LogError {
229    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
230        match self {
231            Self::Config(message)
232            | Self::InvalidTopic(message)
233            | Self::InvalidConsumer(message)
234            | Self::Io(message)
235            | Self::Serde(message)
236            | Self::Sqlite(message) => message.fmt(f),
237            Self::ConsumerLagged(last_id) => {
238                write!(f, "subscriber lagged behind after event {last_id}")
239            }
240        }
241    }
242}
243
244impl std::error::Error for LogError {}
245
/// Append-only, topic-partitioned event log with history replay and live
/// subscription.
#[allow(async_fn_in_trait)]
pub trait EventLog: Send + Sync {
    /// Backend kind, storage location (if any), size, and queue depth.
    fn describe(&self) -> EventLogDescription;

    /// Append `event` to `topic`, returning the id it was assigned.
    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;

    /// Make previously appended events durable.
    async fn flush(&self) -> Result<(), LogError>;

    /// Read events strictly after `from`. `None` starts from the
    /// beginning of the topic.
    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError>;

    /// Like `read_range`, but yielding payloads as raw JSON bytes. This
    /// default re-encodes structured events; backends that hold raw bytes
    /// natively may override it.
    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        let events = self.read_range(topic, from, limit).await?;
        events
            .into_iter()
            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
            .collect()
    }

    /// `async fn` keeps the ergonomic generic surface; the boxed stream
    /// preserves dyn-dispatch for callers that store `Arc<dyn EventLog>`.
    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;

    /// Record `consumer`'s cursor on `topic` at `up_to`.
    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError>;

    /// Last acked cursor for (`topic`, `consumer`), if any.
    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError>;

    /// Highest id ever appended to `topic` (`None` if nothing appended).
    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;

    /// Remove events with id at or before `before`; see `CompactReport`.
    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
}
301
/// Resolved configuration for opening an event log; see
/// `EventLogConfig::for_base_dir` for how env overrides are applied.
#[derive(Clone, Debug)]
pub struct EventLogConfig {
    /// Selected backend (default: sqlite).
    pub backend: EventLogBackendKind,
    /// Directory used by the file backend.
    pub file_dir: PathBuf,
    /// Database path used by the sqlite backend.
    pub sqlite_path: PathBuf,
    /// Subscription queue depth, always >= 1.
    pub queue_depth: usize,
}
309
310impl EventLogConfig {
311    pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
312        let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
313            .ok()
314            .map(|value| value.parse())
315            .transpose()?
316            .unwrap_or(EventLogBackendKind::Sqlite);
317        let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
318            .ok()
319            .and_then(|value| value.parse::<usize>().ok())
320            .unwrap_or(128)
321            .max(1);
322
323        let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
324            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
325            _ => crate::runtime_paths::event_log_dir(base_dir),
326        };
327        let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
328            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
329            _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
330        };
331
332        Ok(Self {
333            backend,
334            file_dir,
335            sqlite_path,
336            queue_depth,
337        })
338    }
339
340    pub fn location(&self) -> Option<PathBuf> {
341        match self.backend {
342            EventLogBackendKind::Memory => None,
343            EventLogBackendKind::File => Some(self.file_dir.clone()),
344            EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
345        }
346    }
347}
348
thread_local! {
    // Per-thread "current" event log: set by the `install_*` helpers below
    // and read back through `active_event_log`.
    static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
}
352
353pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
354    let config = EventLogConfig::for_base_dir(base_dir)?;
355    let log = open_event_log(&config)?;
356    ACTIVE_EVENT_LOG.with(|slot| {
357        *slot.borrow_mut() = Some(log.clone());
358    });
359    Ok(log)
360}
361
362pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
363    let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
364    ACTIVE_EVENT_LOG.with(|slot| {
365        *slot.borrow_mut() = Some(log.clone());
366    });
367    log
368}
369
370pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
371    ACTIVE_EVENT_LOG.with(|slot| {
372        *slot.borrow_mut() = Some(log.clone());
373    });
374    log
375}
376
377pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
378    ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone())
379}
380
381pub fn reset_active_event_log() {
382    ACTIVE_EVENT_LOG.with(|slot| {
383        *slot.borrow_mut() = None;
384    });
385}
386
387pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
388    let config = EventLogConfig::for_base_dir(base_dir)?;
389    let description = match config.backend {
390        EventLogBackendKind::Memory => EventLogDescription {
391            backend: EventLogBackendKind::Memory,
392            location: None,
393            size_bytes: None,
394            queue_depth: config.queue_depth,
395        },
396        EventLogBackendKind::File => EventLogDescription {
397            backend: EventLogBackendKind::File,
398            size_bytes: Some(dir_size_bytes(&config.file_dir)),
399            location: Some(config.file_dir),
400            queue_depth: config.queue_depth,
401        },
402        EventLogBackendKind::Sqlite => EventLogDescription {
403            backend: EventLogBackendKind::Sqlite,
404            size_bytes: Some(sqlite_size_bytes(&config.sqlite_path)),
405            location: Some(config.sqlite_path),
406            queue_depth: config.queue_depth,
407        },
408    };
409    Ok(description)
410}
411
412pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
413    match config.backend {
414        EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
415            config.queue_depth,
416        )))),
417        EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
418            config.file_dir.clone(),
419            config.queue_depth,
420        )?))),
421        EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
422            config.sqlite_path.clone(),
423            config.queue_depth,
424        )?))),
425    }
426}
427
/// An event log backed by any one of the supported backends.
pub enum AnyEventLog {
    Memory(MemoryEventLog),
    File(FileEventLog),
    Sqlite(SqliteEventLog),
}
433
impl AnyEventLog {
    /// Sorted list of known topics, delegated to the wrapped backend.
    /// (The memory backend's listing is async; the others are sync.)
    pub async fn topics(&self) -> Result<Vec<Topic>, LogError> {
        match self {
            Self::Memory(log) => log.topics().await,
            Self::File(log) => log.topics(),
            Self::Sqlite(log) => log.topics(),
        }
    }
}
443
// Dispatcher impl: every method forwards verbatim to the wrapped backend.
// `subscribe` is implemented here so history replay and broadcast wiring
// share one code path across all three backends.
impl EventLog for AnyEventLog {
    fn describe(&self) -> EventLogDescription {
        match self {
            Self::Memory(log) => log.describe(),
            Self::File(log) => log.describe(),
            Self::Sqlite(log) => log.describe(),
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        match self {
            Self::Memory(log) => log.append(topic, event).await,
            Self::File(log) => log.append(topic, event).await,
            Self::Sqlite(log) => log.append(topic, event).await,
        }
    }

    async fn flush(&self) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.flush().await,
            Self::File(log) => log.flush().await,
            Self::Sqlite(log) => log.flush().await,
        }
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        match self {
            Self::Memory(log) => log.read_range(topic, from, limit).await,
            Self::File(log) => log.read_range(topic, from, limit).await,
            Self::Sqlite(log) => log.read_range(topic, from, limit).await,
        }
    }

    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        match self {
            Self::Memory(log) => log.read_range_bytes(topic, from, limit).await,
            Self::File(log) => log.read_range_bytes(topic, from, limit).await,
            Self::Sqlite(log) => log.read_range_bytes(topic, from, limit).await,
        }
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        // Register with the broadcast channel *before* reading history so
        // no event can slip between the snapshot and the live feed; any
        // overlap is deduplicated by event id in `stream_from_broadcast`.
        let (rx, queue_depth) = match self.as_ref() {
            Self::Memory(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::File(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::Sqlite(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
        };
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.ack(topic, consumer, up_to).await,
            Self::File(log) => log.ack(topic, consumer, up_to).await,
            Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
        }
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
            Self::File(log) => log.consumer_cursor(topic, consumer).await,
            Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
        }
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.latest(topic).await,
            Self::File(log) => log.latest(topic).await,
            Self::Sqlite(log) => log.latest(topic).await,
        }
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        match self {
            Self::Memory(log) => log.compact(topic, before).await,
            Self::File(log) => log.compact(topic, before).await,
            Self::Sqlite(log) => log.compact(topic, before).await,
        }
    }
}
559
560#[derive(Default)]
561struct BroadcastMap(Mutex<HashMap<String, broadcast::Sender<(EventId, LogEvent)>>>);
562
563impl BroadcastMap {
564    fn subscribe(
565        &self,
566        topic: &Topic,
567        capacity: usize,
568    ) -> broadcast::Receiver<(EventId, LogEvent)> {
569        self.sender(topic, capacity).subscribe()
570    }
571
572    fn publish(&self, topic: &Topic, capacity: usize, record: (EventId, LogEvent)) {
573        let _ = self.sender(topic, capacity).send(record);
574    }
575
576    fn sender(&self, topic: &Topic, capacity: usize) -> broadcast::Sender<(EventId, LogEvent)> {
577        let mut map = self.0.lock().expect("event log broadcast map poisoned");
578        map.entry(topic.as_str().to_string())
579            .or_insert_with(|| broadcast::channel(capacity.max(1)).0)
580            .clone()
581    }
582}
583
/// Bridge a history snapshot plus a live broadcast receiver into one
/// ordered stream, deduplicating the overlap between the two by event id.
fn stream_from_broadcast(
    history: Vec<(EventId, LogEvent)>,
    from: Option<EventId>,
    mut live_rx: broadcast::Receiver<(EventId, LogEvent)>,
    queue_depth: usize,
) -> BoxStream<'static, Result<(EventId, LogEvent), LogError>> {
    let (tx, rx) = mpsc::channel(queue_depth.max(1));
    // Run the subscription forwarder as a tokio task rather than a detached
    // OS thread. A dedicated thread running under `futures::executor::block_on`
    // is invisible to the tokio runtime, so tests that use `start_paused = true`
    // race against auto-advanced timers while the thread catches up in real
    // time. Spawning on tokio makes the forwarder participate in runtime
    // scheduling (including paused-time quiescence) and ties its lifetime to
    // the runtime's shutdown.
    tokio::spawn(async move {
        // Drain the snapshot first, tracking the newest id delivered so the
        // live feed can skip anything already replayed.
        let mut last_seen = from.unwrap_or(0);
        for (event_id, event) in history {
            last_seen = event_id;
            if tx.send(Ok((event_id, event))).await.is_err() {
                return;
            }
        }

        loop {
            tokio::select! {
                // Consumer dropped the stream: stop forwarding.
                _ = tx.closed() => return,
                received = live_rx.recv() => {
                    match received {
                        Ok((event_id, event)) if event_id > last_seen => {
                            last_seen = event_id;
                            if tx.send(Ok((event_id, event))).await.is_err() {
                                return;
                            }
                        }
                        // Id at or below `last_seen`: already delivered via
                        // the history snapshot.
                        Ok(_) => {}
                        Err(broadcast::error::RecvError::Closed) => return,
                        Err(broadcast::error::RecvError::Lagged(_)) => {
                            // Surface the lag as a terminal error, best
                            // effort (the consumer may already be gone).
                            let _ = tx.try_send(Err(LogError::ConsumerLagged(last_seen)));
                            return;
                        }
                    }
                }
            }
        }
    });
    Box::pin(ReceiverStream::new(rx))
}
631
// In-memory tables, guarded as a unit by `MemoryEventLog::state`.
#[derive(Default)]
struct MemoryState {
    // Per-topic event buffers, oldest first (ascending event id).
    topics: HashMap<String, VecDeque<(EventId, LogEvent)>>,
    // Highest id ever assigned per topic; survives compaction so ids are
    // never reused.
    latest: HashMap<String, EventId>,
    // Ack cursors keyed by (topic, consumer).
    consumers: HashMap<(String, String), EventId>,
}

/// Volatile backend: all state lives in process memory.
pub struct MemoryEventLog {
    state: tokio::sync::Mutex<MemoryState>,
    broadcasts: BroadcastMap,
    // Always >= 1; used for both broadcast and forwarder channel capacity.
    queue_depth: usize,
}
644
645impl MemoryEventLog {
646    pub fn new(queue_depth: usize) -> Self {
647        Self {
648            state: tokio::sync::Mutex::new(MemoryState::default()),
649            broadcasts: BroadcastMap::default(),
650            queue_depth: queue_depth.max(1),
651        }
652    }
653
654    async fn topics(&self) -> Result<Vec<Topic>, LogError> {
655        let state = self.state.lock().await;
656        let mut topics = state
657            .topics
658            .keys()
659            .map(|topic| Topic::new(topic.clone()))
660            .collect::<Result<Vec<_>, _>>()?;
661        topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
662        Ok(topics)
663    }
664}
665
impl EventLog for MemoryEventLog {
    fn describe(&self) -> EventLogDescription {
        // No on-disk footprint to report for the memory backend.
        EventLogDescription {
            backend: EventLogBackendKind::Memory,
            location: None,
            size_bytes: None,
            queue_depth: self.queue_depth,
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        let mut state = self.state.lock().await;
        // Ids are per-topic and monotonic; `latest` survives compaction,
        // so ids are never reused.
        let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
        // Hash-chain the new record to its predecessor for provenance.
        let previous_hash = state
            .topics
            .get(topic.as_str())
            .and_then(|events| events.back())
            .map(|(previous_id, previous_event)| {
                crate::provenance::event_record_hash_from_headers(
                    topic.as_str(),
                    *previous_id,
                    previous_event,
                )
            })
            .transpose()?;
        let event = crate::provenance::prepare_event_for_append(
            topic.as_str(),
            event_id,
            previous_hash,
            event,
        )?;
        state.latest.insert(topic.as_str().to_string(), event_id);
        state
            .topics
            .entry(topic.as_str().to_string())
            .or_default()
            .push_back((event_id, event.clone()));
        // Publish outside the state lock so subscribers woken by the
        // broadcast never contend with this append.
        drop(state);
        self.broadcasts
            .publish(topic, self.queue_depth, (event_id, event));
        Ok(event_id)
    }

    /// Nothing to persist: memory appends are immediately visible.
    async fn flush(&self) -> Result<(), LogError> {
        Ok(())
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        let from = from.unwrap_or(0);
        let state = self.state.lock().await;
        // Events are stored in ascending id order, so filter + take yields
        // the first `limit` events strictly after `from`.
        let events = state
            .topics
            .get(topic.as_str())
            .into_iter()
            .flat_map(|events| events.iter())
            .filter(|(event_id, _)| *event_id > from)
            .take(limit)
            .map(|(event_id, event)| (*event_id, event.clone()))
            .collect();
        Ok(events)
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        // Broadcast registration happens before the history snapshot; the
        // forwarder deduplicates any overlap by event id.
        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        // Last write wins; cursors are not validated against `latest`.
        let mut state = self.state.lock().await;
        state.consumers.insert(
            (topic.as_str().to_string(), consumer.as_str().to_string()),
            up_to,
        );
        Ok(())
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        let state = self.state.lock().await;
        Ok(state
            .consumers
            .get(&(topic.as_str().to_string(), consumer.as_str().to_string()))
            .copied())
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        let state = self.state.lock().await;
        Ok(state.latest.get(topic.as_str()).copied())
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        let mut state = self.state.lock().await;
        let Some(events) = state.topics.get_mut(topic.as_str()) else {
            return Ok(CompactReport::default());
        };
        // Events are in ascending id order, so everything eligible for
        // removal sits at the front of the deque.
        let removed = events
            .iter()
            .take_while(|(event_id, _)| *event_id <= before)
            .count();
        for _ in 0..removed {
            events.pop_front();
        }
        Ok(CompactReport {
            removed,
            remaining: events.len(),
            latest: state.latest.get(topic.as_str()).copied(),
            // No persistent store to checkpoint.
            checkpointed: false,
        })
    }
}
794
// One JSONL line in a topic file: the assigned id plus the event itself.
#[derive(Serialize, Deserialize)]
struct FileRecord {
    id: EventId,
    event: LogEvent,
}

/// Durable backend storing each topic as an append-only JSONL file under
/// `<root>/topics/`, with consumer cursors under `<root>/consumers/`.
pub struct FileEventLog {
    // Root directory containing `topics/` and `consumers/`.
    root: PathBuf,
    // Cached highest id per topic, filled lazily from disk.
    latest_ids: Mutex<HashMap<String, EventId>>,
    // Serializes appends so id assignment and the file write happen
    // atomically with respect to other appends (see `append`).
    write_lock: Mutex<()>,
    broadcasts: BroadcastMap,
    // Always >= 1; broadcast/forwarder channel capacity.
    queue_depth: usize,
}
808
809impl FileEventLog {
810    pub fn open(root: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
811        std::fs::create_dir_all(root.join("topics"))
812            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
813        std::fs::create_dir_all(root.join("consumers"))
814            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
815        Ok(Self {
816            root,
817            latest_ids: Mutex::new(HashMap::new()),
818            write_lock: Mutex::new(()),
819            broadcasts: BroadcastMap::default(),
820            queue_depth: queue_depth.max(1),
821        })
822    }
823
824    fn topic_path(&self, topic: &Topic) -> PathBuf {
825        self.root
826            .join("topics")
827            .join(format!("{}.jsonl", topic.as_str()))
828    }
829
830    fn consumer_path(&self, topic: &Topic, consumer: &ConsumerId) -> PathBuf {
831        self.root.join("consumers").join(format!(
832            "{}__{}.json",
833            topic.as_str(),
834            sanitize_filename(consumer.as_str())
835        ))
836    }
837
838    fn latest_id_for_topic(&self, topic: &Topic) -> Result<EventId, LogError> {
839        if let Some(event_id) = self
840            .latest_ids
841            .lock()
842            .expect("file event log latest ids poisoned")
843            .get(topic.as_str())
844            .copied()
845        {
846            return Ok(event_id);
847        }
848
849        let mut latest = 0;
850        let path = self.topic_path(topic);
851        if path.is_file() {
852            for record in read_file_records(&path)? {
853                latest = record.id;
854            }
855        }
856        self.latest_ids
857            .lock()
858            .expect("file event log latest ids poisoned")
859            .insert(topic.as_str().to_string(), latest);
860        Ok(latest)
861    }
862
863    fn read_range_sync(
864        &self,
865        topic: &Topic,
866        from: Option<EventId>,
867        limit: usize,
868    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
869        let path = self.topic_path(topic);
870        if !path.is_file() {
871            return Ok(Vec::new());
872        }
873        let from = from.unwrap_or(0);
874        let mut events = Vec::new();
875        for record in read_file_records(&path)? {
876            if record.id > from {
877                events.push((record.id, record.event));
878            }
879            if events.len() >= limit {
880                break;
881            }
882        }
883        Ok(events)
884    }
885
886    fn topics(&self) -> Result<Vec<Topic>, LogError> {
887        let topics_dir = self.root.join("topics");
888        if !topics_dir.is_dir() {
889            return Ok(Vec::new());
890        }
891        let mut topics = Vec::new();
892        for entry in std::fs::read_dir(&topics_dir)
893            .map_err(|error| LogError::Io(format!("event log topics read error: {error}")))?
894        {
895            let entry = entry
896                .map_err(|error| LogError::Io(format!("event log topic entry error: {error}")))?;
897            let path = entry.path();
898            if path.extension().and_then(|ext| ext.to_str()) != Some("jsonl") {
899                continue;
900            }
901            let Some(stem) = path.file_stem().and_then(|stem| stem.to_str()) else {
902                continue;
903            };
904            topics.push(Topic::new(stem.to_string())?);
905        }
906        topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
907        Ok(topics)
908    }
909}
910
/// Parse a topic JSONL file into its records.
///
/// Blank lines are skipped. A trailing line with no terminating newline
/// that fails to parse is treated as a torn partial write (e.g. a crash
/// mid-append) and silently dropped; a malformed *complete* line is a
/// hard `LogError::Serde` error.
fn read_file_records(path: &Path) -> Result<Vec<FileRecord>, LogError> {
    let file = std::fs::File::open(path)
        .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
    let mut reader = std::io::BufReader::new(file);
    let mut records = Vec::new();
    let mut line = Vec::new();
    loop {
        line.clear();
        let bytes_read = std::io::BufRead::read_until(&mut reader, b'\n', &mut line)
            .map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
        if bytes_read == 0 {
            // EOF.
            break;
        }
        if line.iter().all(u8::is_ascii_whitespace) {
            continue;
        }
        let complete_line = line.ends_with(b"\n");
        match serde_json::from_slice::<FileRecord>(&line) {
            Ok(record) => records.push(record),
            // Unparseable tail without a newline: assume a torn write and
            // stop here rather than failing the whole read.
            Err(_) if !complete_line => break,
            Err(error) => {
                return Err(LogError::Serde(format!("event log parse error: {error}")));
            }
        }
    }
    Ok(records)
}
938
impl EventLog for FileEventLog {
    /// Backend metadata: file-backed, rooted at `self.root`, sized by a
    /// recursive walk of the on-disk tree.
    fn describe(&self) -> EventLogDescription {
        EventLogDescription {
            backend: EventLogBackendKind::File,
            location: Some(self.root.clone()),
            size_bytes: Some(dir_size_bytes(&self.root)),
            queue_depth: self.queue_depth,
        }
    }

    /// Append `event` to `topic`: allocate the next sequential id, chain the
    /// provenance hash to the previous record, persist as one JSON line, and
    /// fan the event out to live subscribers.
    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        // Single writer at a time so id allocation and the hash chain cannot
        // interleave.
        let _guard = self
            .write_lock
            .lock()
            .expect("file event log write lock poisoned");
        let next_id = self.latest_id_for_topic(topic)? + 1;
        // NOTE(review): this re-reads the entire topic file on every append
        // just to fetch the last record for hash chaining — O(n) per append.
        // Caching the tail record next to `latest_ids` would avoid it.
        let previous_hash = self
            .read_range_sync(topic, None, usize::MAX)?
            .last()
            .map(|(previous_id, previous_event)| {
                crate::provenance::event_record_hash_from_headers(
                    topic.as_str(),
                    *previous_id,
                    previous_event,
                )
            })
            .transpose()?;
        let event = crate::provenance::prepare_event_for_append(
            topic.as_str(),
            next_id,
            previous_hash,
            event,
        )?;
        let record = FileRecord {
            id: next_id,
            event: event.clone(),
        };
        let path = self.topic_path(topic);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        }
        let line = serde_json::to_string(&record)
            .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
        use std::io::Write as _;
        let mut file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)
            .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
        writeln!(file, "{line}")
            .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
        self.latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .insert(topic.as_str().to_string(), next_id);
        // Publish only after the record reached the file, so subscribers never
        // see an event that was not written (durability still needs flush()).
        self.broadcasts
            .publish(topic, self.queue_depth, (next_id, event));
        Ok(next_id)
    }

    /// fsync every file under the log root.
    async fn flush(&self) -> Result<(), LogError> {
        sync_tree(&self.root)
    }

    /// Events with id strictly greater than `from` (default 0), capped at
    /// `limit`.
    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        self.read_range_sync(topic, from, limit)
    }

    /// Same range semantics as `read_range`, but each event is converted to
    /// its raw-bytes form via `TryInto`.
    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        self.read_range_sync(topic, from, limit)?
            .into_iter()
            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
            .collect()
    }

    /// Live subscription: replay stored history after `from`, then continue
    /// with new events from the broadcast channel.
    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        // Subscribe before snapshotting history so no event can fall between
        // the replay and the live feed. NOTE(review): de-duplication of the
        // overlap is delegated to `stream_from_broadcast` — confirm it filters
        // by id.
        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
        let history = self.read_range_sync(topic, from, usize::MAX)?;
        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
    }

    /// Persist `consumer`'s cursor for `topic` as a small JSON document,
    /// written atomically.
    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        let path = self.consumer_path(topic, consumer);
        let payload = serde_json::json!({
            "topic": topic.as_str(),
            "consumer_id": consumer.as_str(),
            "cursor": up_to,
            "updated_at_ms": now_ms(),
        });
        write_json_atomically(&path, &payload)
    }

    /// Read back the cursor stored by `ack`, or `None` if the consumer never
    /// acked this topic.
    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        let path = self.consumer_path(topic, consumer);
        if !path.is_file() {
            return Ok(None);
        }
        let raw = std::fs::read_to_string(&path)
            .map_err(|error| LogError::Io(format!("event log consumer read error: {error}")))?;
        let payload: serde_json::Value = serde_json::from_str(&raw)
            .map_err(|error| LogError::Serde(format!("event log consumer parse error: {error}")))?;
        let cursor = payload
            .get("cursor")
            .and_then(serde_json::Value::as_u64)
            .ok_or_else(|| {
                LogError::Serde("event log consumer record missing numeric cursor".to_string())
            })?;
        Ok(Some(cursor))
    }

    /// Highest id assigned on `topic`; `None` when nothing was ever written
    /// (id 0 is the "empty" sentinel).
    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        let latest = self.latest_id_for_topic(topic)?;
        if latest == 0 {
            Ok(None)
        } else {
            Ok(Some(latest))
        }
    }

    /// Drop every event with id <= `before`, rewriting the topic file
    /// atomically (or deleting it when nothing remains).
    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        let _guard = self
            .write_lock
            .lock()
            .expect("file event log write lock poisoned");
        let path = self.topic_path(topic);
        if !path.is_file() {
            return Ok(CompactReport::default());
        }
        // `retained` = ids > before; `removed` = total - retained.
        let retained = self.read_range_sync(topic, Some(before), usize::MAX)?;
        let removed = self.read_range_sync(topic, None, usize::MAX)?.len() - retained.len();
        if retained.is_empty() {
            let _ = std::fs::remove_file(&path);
        } else {
            crate::atomic_io::atomic_write_with(&path, |writer| {
                use std::io::Write as _;
                for (event_id, event) in &retained {
                    let line = serde_json::to_string(&FileRecord {
                        id: *event_id,
                        event: event.clone(),
                    })
                    .map_err(|error| std::io::Error::other(error.to_string()))?;
                    writeln!(writer, "{line}")?;
                }
                Ok(())
            })
            .map_err(|error| LogError::Io(format!("event log compact finalize error: {error}")))?;
        }
        let latest = retained.last().map(|(event_id, _)| *event_id);
        // NOTE(review): when everything is compacted away this resets the
        // cached latest id to 0, so the next append restarts at id 1 — unlike
        // the SQLite backend, whose `topic_heads` keeps ids monotonic across
        // compaction. Confirm whether id reuse is intended here.
        self.latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .insert(topic.as_str().to_string(), latest.unwrap_or(0));
        Ok(CompactReport {
            removed,
            remaining: retained.len(),
            latest,
            checkpointed: false,
        })
    }
}
1123
/// Event log backed by a single SQLite database file.
pub struct SqliteEventLog {
    /// Database file path; WAL/SHM sidecars live next to it.
    path: PathBuf,
    /// Single shared connection; all access is serialized through this mutex.
    connection: Mutex<Connection>,
    /// Per-topic broadcast channels feeding live subscribers.
    broadcasts: BroadcastMap,
    /// Capacity for subscriber channels (clamped to at least 1 in `open`).
    queue_depth: usize,
}
1130
impl SqliteEventLog {
    /// Open (or create) the SQLite database at `path` and ensure the schema
    /// exists: `topic_heads` (monotonic per-topic id counters), `events`, and
    /// `consumers` (ack cursors). The parent directory is created if missing;
    /// `queue_depth` is clamped to at least 1.
    ///
    /// # Errors
    /// `LogError::Io` for directory creation failures, `LogError::Sqlite` for
    /// connection, pragma, or schema errors.
    pub fn open(path: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        }
        let connection = Connection::open(&path)
            .map_err(|error| LogError::Sqlite(format!("event log open error: {error}")))?;
        // Set busy_timeout BEFORE the WAL pragma so SQLite waits out transient
        // SQLITE_BUSY from a previous test's connection that hasn't finished
        // dropping yet (parallel `cargo test` on the same process, distinct
        // paths, still contends on SQLite's own global mutex under WAL-mode
        // promotion). Without this, `journal_mode = WAL` fails fast with
        // "database is locked" instead of retrying.
        connection
            .busy_timeout(std::time::Duration::from_secs(5))
            .map_err(|error| LogError::Sqlite(format!("event log busy-timeout error: {error}")))?;
        connection
            .pragma_update(None, "journal_mode", "WAL")
            .map_err(|error| LogError::Sqlite(format!("event log WAL pragma error: {error}")))?;
        // NORMAL sync under WAL trades a little durability on power loss for
        // much cheaper commits.
        connection
            .pragma_update(None, "synchronous", "NORMAL")
            .map_err(|error| LogError::Sqlite(format!("event log sync pragma error: {error}")))?;
        connection
            .execute_batch(
                "CREATE TABLE IF NOT EXISTS topic_heads (
                    topic TEXT PRIMARY KEY,
                    last_id INTEGER NOT NULL
                );
                CREATE TABLE IF NOT EXISTS events (
                    topic TEXT NOT NULL,
                    event_id INTEGER NOT NULL,
                    kind TEXT NOT NULL,
                    payload BLOB NOT NULL,
                    headers TEXT NOT NULL,
                    occurred_at_ms INTEGER NOT NULL,
                    PRIMARY KEY (topic, event_id)
                );
                CREATE TABLE IF NOT EXISTS consumers (
                    topic TEXT NOT NULL,
                    consumer_id TEXT NOT NULL,
                    cursor INTEGER NOT NULL,
                    updated_at_ms INTEGER NOT NULL,
                    PRIMARY KEY (topic, consumer_id)
                );",
            )
            .map_err(|error| LogError::Sqlite(format!("event log schema error: {error}")))?;
        Ok(Self {
            path,
            connection: Mutex::new(connection),
            broadcasts: BroadcastMap::default(),
            queue_depth: queue_depth.max(1),
        })
    }

    /// Distinct topics that currently have rows in `events`, ascending.
    /// (Topics whose events were all compacted away will not appear, even if
    /// `topic_heads` still tracks them.)
    fn topics(&self) -> Result<Vec<Topic>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let mut statement = connection
            .prepare("SELECT DISTINCT topic FROM events ORDER BY topic ASC")
            .map_err(|error| {
                LogError::Sqlite(format!("event log topics prepare error: {error}"))
            })?;
        let rows = statement
            .query_map([], |row| row.get::<_, String>(0))
            .map_err(|error| LogError::Sqlite(format!("event log topics query error: {error}")))?;
        let mut topics = Vec::new();
        for row in rows {
            topics.push(Topic::new(row.map_err(|error| {
                LogError::Sqlite(format!("event log topic row error: {error}"))
            })?)?);
        }
        Ok(topics)
    }
}
1208
impl EventLog for SqliteEventLog {
    /// Backend metadata: sqlite-backed, sized including WAL/SHM sidecars.
    fn describe(&self) -> EventLogDescription {
        EventLogDescription {
            backend: EventLogBackendKind::Sqlite,
            location: Some(self.path.clone()),
            size_bytes: Some(sqlite_size_bytes(&self.path)),
            queue_depth: self.queue_depth,
        }
    }

    /// Append `event` to `topic` inside one IMMEDIATE transaction: advance
    /// the per-topic head counter, hash-chain to the newest stored record,
    /// insert the prepared event, then fan out to live subscribers.
    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        let mut connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        // IMMEDIATE takes the write lock up front so the head increment and
        // the insert cannot race another writer.
        let tx = connection
            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
            .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
        // Ensure the head row exists, then advance it. `last_id` survives
        // compaction, so event ids are never reused.
        tx.execute(
            "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
            params![topic.as_str()],
        )
        .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
        tx.execute(
            "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
            params![topic.as_str()],
        )
        .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
        let event_id = tx
            .query_row(
                "SELECT last_id FROM topic_heads WHERE topic = ?1",
                params![topic.as_str()],
                |row| row.get::<_, i64>(0),
            )
            .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))
            .and_then(sqlite_i64_to_event_id)?;
        let event_id_sql = event_id_to_sqlite_i64(event_id)?;
        // Newest surviving record before this id, used as the hash-chain
        // predecessor (may be None after compaction or for a fresh topic).
        let previous = tx
            .query_row(
                "SELECT event_id, kind, payload, headers, occurred_at_ms
                 FROM events
                 WHERE topic = ?1 AND event_id < ?2
                 ORDER BY event_id DESC
                 LIMIT 1",
                params![topic.as_str(), event_id_sql],
                |row| {
                    let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
                    let headers: String = row.get(3)?;
                    Ok((
                        sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?,
                        LogEvent {
                            kind: row.get(1)?,
                            payload: serde_json::from_slice(&payload).map_err(|error| {
                                rusqlite::Error::FromSqlConversionFailure(
                                    payload.len(),
                                    rusqlite::types::Type::Blob,
                                    Box::new(error),
                                )
                            })?,
                            headers: serde_json::from_str(&headers).map_err(|error| {
                                rusqlite::Error::FromSqlConversionFailure(
                                    headers.len(),
                                    rusqlite::types::Type::Text,
                                    Box::new(error),
                                )
                            })?,
                            occurred_at_ms: row.get(4)?,
                        },
                    ))
                },
            )
            .optional()
            .map_err(|error| LogError::Sqlite(format!("event log previous read error: {error}")))?;
        let previous_hash = previous
            .as_ref()
            .map(|(previous_id, previous_event)| {
                crate::provenance::event_record_hash_from_headers(
                    topic.as_str(),
                    *previous_id,
                    previous_event,
                )
            })
            .transpose()?;
        let event = crate::provenance::prepare_event_for_append(
            topic.as_str(),
            event_id,
            previous_hash,
            event,
        )?;
        tx.execute(
            "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
            params![
                topic.as_str(),
                event_id_sql,
                event.kind,
                serde_json::to_vec(&event.payload).map_err(|error| LogError::Serde(format!(
                    "event log payload encode error: {error}"
                )))?,
                serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
                    "event log headers encode error: {error}"
                )))?,
                event.occurred_at_ms
            ],
        )
        .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
        tx.commit()
            .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
        // Publish only after the transaction committed.
        self.broadcasts
            .publish(topic, self.queue_depth, (event_id, event.clone()));
        Ok(event_id)
    }

    /// Checkpoint the WAL so all committed data lands in the main database
    /// file.
    async fn flush(&self) -> Result<(), LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        connection
            .execute_batch("PRAGMA wal_checkpoint(FULL);")
            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
        Ok(())
    }

    /// Events with id strictly greater than `from` (default 0), ascending,
    /// capped at `limit`.
    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let mut statement = connection
            .prepare(
                "SELECT event_id, kind, payload, headers, occurred_at_ms
                 FROM events
                 WHERE topic = ?1 AND event_id > ?2
                 ORDER BY event_id ASC
                 LIMIT ?3",
            )
            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
        let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
        // NOTE(review): `limit as i64` wraps `usize::MAX` (used by
        // `subscribe`) to -1, which SQLite's LIMIT treats as "no limit".
        // Convenient, but worth making explicit if intentional.
        let rows = statement
            .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
                let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
                let headers: String = row.get(3)?;
                let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
                Ok((
                    event_id,
                    LogEvent {
                        kind: row.get(1)?,
                        payload: serde_json::from_slice(&payload).map_err(|error| {
                            rusqlite::Error::FromSqlConversionFailure(
                                payload.len(),
                                rusqlite::types::Type::Blob,
                                Box::new(error),
                            )
                        })?,
                        headers: serde_json::from_str(&headers).map_err(|error| {
                            rusqlite::Error::FromSqlConversionFailure(
                                headers.len(),
                                rusqlite::types::Type::Text,
                                Box::new(error),
                            )
                        })?,
                        occurred_at_ms: row.get(4)?,
                    },
                ))
            })
            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
        let mut events = Vec::new();
        for row in rows {
            events.push(
                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
            );
        }
        Ok(events)
    }

    /// Same range semantics as `read_range`, but the payload column is kept
    /// as raw bytes instead of being decoded into JSON.
    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let mut statement = connection
            .prepare(
                "SELECT event_id, kind, payload, headers, occurred_at_ms
                 FROM events
                 WHERE topic = ?1 AND event_id > ?2
                 ORDER BY event_id ASC
                 LIMIT ?3",
            )
            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
        let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
        let rows = statement
            .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
                let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
                let headers: String = row.get(3)?;
                let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
                Ok((
                    event_id,
                    LogEventBytes {
                        kind: row.get(1)?,
                        payload: Bytes::from(payload),
                        headers: serde_json::from_str(&headers).map_err(|error| {
                            rusqlite::Error::FromSqlConversionFailure(
                                headers.len(),
                                rusqlite::types::Type::Text,
                                Box::new(error),
                            )
                        })?,
                        occurred_at_ms: row.get(4)?,
                    },
                ))
            })
            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
        let mut events = Vec::new();
        for row in rows {
            events.push(
                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
            );
        }
        Ok(events)
    }

    /// Live subscription: replay stored history after `from`, then continue
    /// with new events from the broadcast channel. The channel is subscribed
    /// before the history snapshot so nothing falls in between.
    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
    }

    /// Upsert `consumer`'s cursor for `topic` (last-writer-wins).
    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let up_to_sql = event_id_to_sqlite_i64(up_to)?;
        connection
            .execute(
                "INSERT INTO consumers(topic, consumer_id, cursor, updated_at_ms)
                 VALUES (?1, ?2, ?3, ?4)
                 ON CONFLICT(topic, consumer_id)
                 DO UPDATE SET cursor = excluded.cursor, updated_at_ms = excluded.updated_at_ms",
                params![topic.as_str(), consumer.as_str(), up_to_sql, now_ms()],
            )
            .map_err(|error| LogError::Sqlite(format!("event log ack error: {error}")))?;
        Ok(())
    }

    /// Cursor stored by `ack`, or `None` if the consumer never acked.
    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        connection
            .query_row(
                "SELECT cursor FROM consumers WHERE topic = ?1 AND consumer_id = ?2",
                params![topic.as_str(), consumer.as_str()],
                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
            )
            .optional()
            .map_err(|error| LogError::Sqlite(format!("event log consumer cursor error: {error}")))
    }

    /// Highest id ever assigned on `topic`, from `topic_heads`. This stays
    /// valid (monotonic) even after compaction deletes the event rows.
    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        connection
            .query_row(
                "SELECT last_id FROM topic_heads WHERE topic = ?1",
                params![topic.as_str()],
                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
            )
            .optional()
            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))
    }

    /// Delete every event with id <= `before`, leaving `topic_heads` intact
    /// so id allocation continues monotonically, then truncate the WAL to
    /// reclaim disk space.
    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let before_sql = event_id_to_sqlite_i64(before)?;
        let removed = connection
            .execute(
                "DELETE FROM events WHERE topic = ?1 AND event_id <= ?2",
                params![topic.as_str(), before_sql],
            )
            .map_err(|error| {
                LogError::Sqlite(format!("event log compact delete error: {error}"))
            })?;
        let remaining = connection
            .query_row(
                "SELECT COUNT(*) FROM events WHERE topic = ?1",
                params![topic.as_str()],
                |row| row.get::<_, i64>(0),
            )
            .map_err(|error| LogError::Sqlite(format!("event log compact count error: {error}")))
            .and_then(sqlite_i64_to_usize)?;
        // Reported `latest` is the head counter, not the newest surviving row.
        let latest = connection
            .query_row(
                "SELECT last_id FROM topic_heads WHERE topic = ?1",
                params![topic.as_str()],
                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
            )
            .optional()
            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))?;
        connection
            .execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
        Ok(CompactReport {
            removed,
            remaining,
            latest,
            checkpointed: true,
        })
    }
}
1549
/// Interpret `value` as a path, resolving it against `base_dir` when it is
/// relative; absolute paths are returned untouched.
fn resolve_path(base_dir: &Path, value: &str) -> PathBuf {
    let candidate = Path::new(value);
    if candidate.is_absolute() {
        candidate.to_path_buf()
    } else {
        base_dir.join(candidate)
    }
}
1558
1559fn write_json_atomically(path: &Path, payload: &serde_json::Value) -> Result<(), LogError> {
1560    let encoded = serde_json::to_vec_pretty(payload)
1561        .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
1562    crate::atomic_io::atomic_write(path, &encoded)
1563        .map_err(|error| LogError::Io(format!("event log write error: {error}")))
1564}
1565
/// Sanitize an arbitrary string for use as a file name; currently an alias
/// of `sanitize_topic_component` (same allowed character set).
fn sanitize_filename(value: &str) -> String {
    sanitize_topic_component(value)
}
1569
/// Replace every character outside `[A-Za-z0-9._-]` with `_`, producing a
/// string safe to use as a path component.
pub fn sanitize_topic_component(value: &str) -> String {
    let mut sanitized = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '.' || ch == '_' || ch == '-';
        sanitized.push(if keep { ch } else { '_' });
    }
    sanitized
}
1582
/// Best-effort recursive size in bytes of the tree rooted at `path`.
/// Missing or unreadable entries contribute zero instead of erroring;
/// directory entries themselves are not counted, only file lengths.
fn dir_size_bytes(path: &Path) -> u64 {
    let entries = match std::fs::read_dir(path) {
        Ok(entries) => entries,
        Err(_) => return 0,
    };
    entries
        .flatten()
        .map(|entry| {
            let child = entry.path();
            if child.is_dir() {
                dir_size_bytes(&child)
            } else {
                entry.metadata().map(|metadata| metadata.len()).unwrap_or(0)
            }
        })
        .sum()
}
1600
1601fn sqlite_size_bytes(path: &Path) -> u64 {
1602    let mut total = file_size(path);
1603    total += file_size(&PathBuf::from(format!("{}-wal", path.display())));
1604    total += file_size(&PathBuf::from(format!("{}-shm", path.display())));
1605    total
1606}
1607
/// Size of the file at `path` in bytes, or 0 when it cannot be stat'ed
/// (missing file, permission error, ...).
fn file_size(path: &Path) -> u64 {
    match std::fs::metadata(path) {
        Ok(metadata) => metadata.len(),
        Err(_) => 0,
    }
}
1613
1614fn sync_tree(root: &Path) -> Result<(), LogError> {
1615    if !root.exists() {
1616        return Ok(());
1617    }
1618    for entry in std::fs::read_dir(root)
1619        .map_err(|error| LogError::Io(format!("event log read_dir error: {error}")))?
1620    {
1621        let entry = entry.map_err(|error| LogError::Io(format!("event log dir error: {error}")))?;
1622        let path = entry.path();
1623        if path.is_dir() {
1624            sync_tree(&path)?;
1625            continue;
1626        }
1627        std::fs::File::open(&path)
1628            .and_then(|file| file.sync_all())
1629            .map_err(|error| LogError::Io(format!("event log sync error: {error}")))?;
1630    }
1631    Ok(())
1632}
1633
/// Milliseconds since the Unix epoch as `i64`, falling back to 0 if the
/// system clock reads as earlier than the epoch.
fn now_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(duration) => duration.as_millis() as i64,
        Err(_) => 0,
    }
}
1640
1641fn event_id_to_sqlite_i64(event_id: EventId) -> Result<i64, LogError> {
1642    i64::try_from(event_id)
1643        .map_err(|_| LogError::Sqlite(format!("event id {event_id} exceeds sqlite INTEGER range")))
1644}
1645
1646fn sqlite_i64_to_event_id(value: i64) -> Result<EventId, LogError> {
1647    u64::try_from(value)
1648        .map_err(|_| LogError::Sqlite(format!("sqlite event id {value} is negative")))
1649}
1650
1651fn sqlite_i64_to_event_id_for_row(value: i64) -> rusqlite::Result<EventId> {
1652    u64::try_from(value).map_err(|_| {
1653        rusqlite::Error::FromSqlConversionFailure(
1654            std::mem::size_of::<i64>(),
1655            rusqlite::types::Type::Integer,
1656            "sqlite event id is negative".into(),
1657        )
1658    })
1659}
1660
1661fn sqlite_json_bytes_for_row(
1662    row: &rusqlite::Row<'_>,
1663    index: usize,
1664    name: &str,
1665) -> rusqlite::Result<Vec<u8>> {
1666    let value = row.get_ref(index)?;
1667    match value {
1668        rusqlite::types::ValueRef::Text(bytes) | rusqlite::types::ValueRef::Blob(bytes) => {
1669            Ok(bytes.to_vec())
1670        }
1671        other => Err(rusqlite::Error::InvalidColumnType(
1672            index,
1673            name.to_string(),
1674            other.data_type(),
1675        )),
1676    }
1677}
1678
1679fn sqlite_i64_to_usize(value: i64) -> Result<usize, LogError> {
1680    usize::try_from(value)
1681        .map_err(|_| LogError::Sqlite(format!("sqlite count {value} is negative")))
1682}
1683
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    // Fix: the published `rand` crate has no `RngExt` trait. `random_range`
    // (used below) is provided by `rand::Rng` in rand 0.9, so the previous
    // `use rand::{rngs::StdRng, RngExt, SeedableRng};` failed to resolve.
    use rand::{rngs::StdRng, Rng, SeedableRng};

    /// Appends 10_000 events to one topic and verifies an unbounded
    /// `read_range` returns all of them with contiguous ids starting at 1.
    async fn exercise_basic_backend(log: Arc<AnyEventLog>) {
        let topic = Topic::new("trigger.inbox").unwrap();
        for i in 0..10_000 {
            log.append(
                &topic,
                LogEvent::new("append", serde_json::json!({ "i": i })),
            )
            .await
            .unwrap();
        }
        let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 10_000);
        assert_eq!(events.first().unwrap().0, 1);
        assert_eq!(events.last().unwrap().0, 10_000);
    }

    /// Memory backend: bulk append/read, live subscription delivery order,
    /// and compaction accounting (removed/remaining) after an ack.
    #[tokio::test(flavor = "current_thread")]
    async fn memory_backend_supports_append_read_subscribe_and_compact() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
        exercise_basic_backend(log.clone()).await;

        let topic = Topic::new("agent.transcript.demo").unwrap();
        let mut stream = log.clone().subscribe(&topic, None).await.unwrap();
        let first = log
            .append(
                &topic,
                LogEvent::new("message", serde_json::json!({"text":"one"})),
            )
            .await
            .unwrap();
        let second = log
            .append(
                &topic,
                LogEvent::new("message", serde_json::json!({"text":"two"})),
            )
            .await
            .unwrap();
        // The subscriber must observe both appends in id order.
        let seen: Vec<_> = stream.by_ref().take(2).collect().await;
        assert_eq!(seen[0].as_ref().unwrap().0, first);
        assert_eq!(seen[1].as_ref().unwrap().0, second);

        log.ack(&topic, &ConsumerId::new("worker").unwrap(), second)
            .await
            .unwrap();
        let compact = log.compact(&topic, first).await.unwrap();
        assert_eq!(compact.removed, 1);
        assert_eq!(compact.remaining, 1);
    }

    /// File backend: events written by one instance are visible after a
    /// reopen, and compaction drops exactly the requested prefix.
    #[tokio::test(flavor = "current_thread")]
    async fn file_backend_persists_across_reopen_and_compacts() {
        let dir = tempfile::tempdir().unwrap();
        let topic = Topic::new("trigger.outbox").unwrap();
        let first_log = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
        ));
        first_log
            .append(
                &topic,
                LogEvent::new("dispatch_pending", serde_json::json!({"n":1})),
            )
            .await
            .unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("dispatch_complete", serde_json::json!({"n":2})),
            )
            .await
            .unwrap();
        drop(first_log);

        let reopened = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
        ));
        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 2);
        let compact = reopened.compact(&topic, 1).await.unwrap();
        assert_eq!(compact.removed, 1);
        assert_eq!(
            reopened
                .read_range(&topic, None, usize::MAX)
                .await
                .unwrap()
                .len(),
            1
        );
    }

    /// File backend crash-safety: a torn (truncated) trailing JSONL record is
    /// ignored on reopen and does not shadow the last durable event.
    #[tokio::test(flavor = "current_thread")]
    async fn file_backend_skips_torn_tail_on_restart() {
        let dir = tempfile::tempdir().unwrap();
        let topic = Topic::new("trigger.inbox").unwrap();
        let first_log = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("accepted", serde_json::json!({"id": "ok"})),
            )
            .await
            .unwrap();
        drop(first_log);

        // Simulate a crash mid-write: append an incomplete JSON record to the
        // topic's on-disk file directly.
        let topic_path = dir.path().join("topics").join("trigger.inbox.jsonl");
        use std::io::Write as _;
        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(&topic_path)
            .unwrap();
        write!(file, "{{\"id\":2,\"event\":{{\"kind\":\"partial\"").unwrap();
        drop(file);

        let reopened = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].0, 1);
        assert_eq!(reopened.latest(&topic).await.unwrap(), Some(1));
    }

    /// SQLite backend: data persists across reopen, and `compact` reports a
    /// WAL checkpoint (leaving the -wal file empty or removed).
    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_backend_persists_and_checkpoints_after_compact() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("daemon.demo.state").unwrap();
        let first_log = Arc::new(AnyEventLog::Sqlite(
            SqliteEventLog::open(path.clone(), 8).unwrap(),
        ));
        first_log
            .append(
                &topic,
                LogEvent::new("state", serde_json::json!({"state":"idle"})),
            )
            .await
            .unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("state", serde_json::json!({"state":"active"})),
            )
            .await
            .unwrap();
        drop(first_log);

        let reopened = Arc::new(AnyEventLog::Sqlite(
            SqliteEventLog::open(path.clone(), 8).unwrap(),
        ));
        assert_eq!(
            reopened
                .read_range(&topic, None, usize::MAX)
                .await
                .unwrap()
                .len(),
            2
        );
        let compact = reopened.compact(&topic, 1).await.unwrap();
        assert!(compact.checkpointed);
        let wal = PathBuf::from(format!("{}-wal", path.display()));
        assert!(file_size(&wal) == 0 || !wal.exists());
    }

    /// SQLite bytes-path read: `read_range_bytes` round-trips the payload
    /// without going through value materialization.
    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_bytes_read_preserves_payload_without_value_materialization() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("observability.action_graph").unwrap();
        let log = SqliteEventLog::open(path, 8).unwrap();
        let event_id = log
            .append(
                &topic,
                LogEvent::new(
                    "snapshot",
                    serde_json::json!({"nodes":[{"id":"a"}],"edges":[]}),
                ),
            )
            .await
            .unwrap();

        let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].0, event_id);
        assert_eq!(
            events[0].1.payload_json().unwrap(),
            serde_json::json!({"nodes":[{"id":"a"}],"edges":[]})
        );
    }

    /// SQLite backward compatibility: rows whose payload column was written
    /// as TEXT (pre-BLOB schema) are still readable via both read paths.
    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_bytes_read_accepts_legacy_text_payload_rows() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("agent.transcript.legacy").unwrap();
        let log = SqliteEventLog::open(path, 8).unwrap();
        {
            // Insert a legacy-shaped row directly, bypassing `append`.
            let connection = log.connection.lock().unwrap();
            connection
                .execute(
                    "INSERT INTO topic_heads(topic, last_id) VALUES (?1, 1)",
                    params![topic.as_str()],
                )
                .unwrap();
            connection
                .execute(
                    "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
                     VALUES (?1, 1, 'legacy', ?2, '{}', 1)",
                    params![topic.as_str(), "{\"text\":\"old\"}"],
                )
                .unwrap();
        }

        let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
        assert_eq!(
            events[0].1.payload_json().unwrap(),
            serde_json::json!({"text": "old"})
        );
        assert_eq!(
            log.read_range(&topic, None, 1).await.unwrap()[0].1.kind,
            "legacy"
        );
    }

    /// A slow subscriber whose broadcast buffer overflows gets a
    /// `ConsumerLagged` error carrying the last id it actually saw (0 here,
    /// since it never received anything).
    #[tokio::test(flavor = "current_thread")]
    async fn broadcast_forwarder_reports_lag_when_receiver_overflows() {
        let (sender, rx) = broadcast::channel(2);
        for i in 0..10 {
            sender
                .send((i + 1, LogEvent::new("tick", serde_json::json!({"i": i}))))
                .unwrap();
        }
        let mut stream = stream_from_broadcast(Vec::new(), None, rx, 2);

        match stream.next().await {
            Some(Err(LogError::ConsumerLagged(last_seen))) => assert_eq!(last_seen, 0),
            other => panic!("subscriber should surface lag, got {other:?}"),
        }
    }

    /// Dropping the consumer-side stream must tear down the forwarder task so
    /// the broadcast sender's receiver count returns to zero.
    #[tokio::test(flavor = "current_thread")]
    async fn broadcast_forwarder_stops_when_consumer_drops_stream() {
        let (sender, rx) = broadcast::channel(2);
        let stream = stream_from_broadcast(Vec::new(), None, rx, 2);
        assert_eq!(sender.receiver_count(), 1);
        drop(stream);

        tokio::time::timeout(std::time::Duration::from_millis(100), async {
            while sender.receiver_count() != 0 {
                tokio::task::yield_now().await;
            }
        })
        .await
        .expect("subscription receiver should close after consumer drop");
    }

    /// Fuzz-ish check: readers started from different offsets always observe
    /// strictly increasing event ids, even under randomized appends.
    #[tokio::test(flavor = "current_thread")]
    async fn randomized_reader_sequences_stay_monotonic() {
        let log = Arc::new(MemoryEventLog::new(32));
        let topic = Topic::new("fuzz.demo").unwrap();
        let mut readers = vec![
            log.clone().subscribe(&topic, None).await.unwrap(),
            log.clone().subscribe(&topic, Some(5)).await.unwrap(),
            log.clone().subscribe(&topic, Some(10)).await.unwrap(),
        ];
        // Seeded RNG keeps the test deterministic across runs.
        let mut rng = StdRng::seed_from_u64(7);
        for _ in 0..64 {
            let value = rng.random_range(0..1000);
            log.append(
                &topic,
                LogEvent::new("rand", serde_json::json!({"value": value})),
            )
            .await
            .unwrap();
        }

        let mut sequences = Vec::new();
        for reader in &mut readers {
            let mut ids = Vec::new();
            while let Some(item) = reader.next().await {
                match item {
                    Ok((event_id, _)) => {
                        ids.push(event_id);
                        if ids.len() >= 16 {
                            break;
                        }
                    }
                    // Lag just truncates this reader's sequence; monotonicity
                    // of what it did see is still asserted below.
                    Err(LogError::ConsumerLagged(_)) => break,
                    Err(error) => panic!("unexpected subscription error: {error}"),
                }
            }
            sequences.push(ids);
        }

        for ids in sequences {
            assert!(ids.windows(2).all(|pair| pair[0] < pair[1]));
        }
    }
}
1984}