Skip to main content

harn_vm/event_log/
mod.rs

1use std::cell::RefCell;
2use std::collections::BTreeMap;
3use std::fmt;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7
8use bytes::Bytes;
9use futures::stream::BoxStream;
10use serde::{Deserialize, Serialize};
11
12use crate::runtime_limits::RuntimeLimits;
13
14mod file;
15mod memory;
16mod sqlite;
17mod util;
18
19#[cfg(test)]
20pub(crate) use util::pin_test_occurred_at_ms;
21
22#[cfg(test)]
23mod tests;
24
25pub use file::FileEventLog;
26pub use memory::MemoryEventLog;
27pub use sqlite::SqliteEventLog;
28
/// Identifier returned by [`EventLog::append`]; read cursors (`from`), acks, and
/// compaction bounds are all expressed in it.
pub type EventId = u64;

/// Environment variable selecting the backend (`memory`, `file`, `sqlite`, or
/// `postgres`, case-insensitive). `EventLogConfig::for_base_dir` uses sqlite when unset.
pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
/// Environment variable overriding the file backend's directory.
pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
/// Environment variable overriding the sqlite backend's database path.
pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
/// Environment variable overriding the subscriber queue depth (clamped to at least 1).
pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";
35
36#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
37pub struct Topic(String);
38
39impl Topic {
40    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
41        let value = value.into();
42        if value.is_empty() {
43            return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
44        }
45        if !value
46            .chars()
47            .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
48        {
49            return Err(LogError::InvalidTopic(format!(
50                "topic '{value}' contains unsupported characters"
51            )));
52        }
53        Ok(Self(value))
54    }
55
56    pub fn as_str(&self) -> &str {
57        &self.0
58    }
59}
60
61impl fmt::Display for Topic {
62    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63        self.0.fmt(f)
64    }
65}
66
67impl FromStr for Topic {
68    type Err = LogError;
69
70    fn from_str(s: &str) -> Result<Self, Self::Err> {
71        Self::new(s)
72    }
73}
74
75#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
76pub struct ConsumerId(String);
77
78impl ConsumerId {
79    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
80        let value = value.into();
81        if value.trim().is_empty() {
82            return Err(LogError::InvalidConsumer(
83                "consumer id cannot be empty".to_string(),
84            ));
85        }
86        Ok(Self(value))
87    }
88
89    pub fn as_str(&self) -> &str {
90        &self.0
91    }
92}
93
94impl fmt::Display for ConsumerId {
95    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96        self.0.fmt(f)
97    }
98}
99
100#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
101#[serde(rename_all = "snake_case")]
102pub enum EventLogBackendKind {
103    Memory,
104    File,
105    Sqlite,
106    Postgres,
107}
108
109impl fmt::Display for EventLogBackendKind {
110    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111        match self {
112            Self::Memory => write!(f, "memory"),
113            Self::File => write!(f, "file"),
114            Self::Sqlite => write!(f, "sqlite"),
115            Self::Postgres => write!(f, "postgres"),
116        }
117    }
118}
119
120impl FromStr for EventLogBackendKind {
121    type Err = LogError;
122
123    fn from_str(value: &str) -> Result<Self, Self::Err> {
124        match value.trim().to_ascii_lowercase().as_str() {
125            "memory" => Ok(Self::Memory),
126            "file" => Ok(Self::File),
127            "sqlite" => Ok(Self::Sqlite),
128            "postgres" => Ok(Self::Postgres),
129            other => Err(LogError::Config(format!(
130                "unsupported event log backend '{other}'"
131            ))),
132        }
133    }
134}
135
136#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
137pub struct LogEvent {
138    pub kind: String,
139    pub payload: serde_json::Value,
140    #[serde(default)]
141    pub headers: BTreeMap<String, String>,
142    pub occurred_at_ms: i64,
143}
144
145impl LogEvent {
146    pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
147        Self {
148            kind: kind.into(),
149            payload,
150            headers: BTreeMap::new(),
151            occurred_at_ms: util::now_ms(),
152        }
153    }
154
155    pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
156        self.headers = headers;
157        self
158    }
159
160    /// Apply the unified redaction policy to this event's headers and
161    /// payload. Backends are intentionally left unaware of redaction —
162    /// emitters that need scrubbed events call this before append, and
163    /// readers that materialize events for display can apply the policy
164    /// again as defense in depth.
165    pub fn redact_in_place(&mut self, policy: &crate::redact::RedactionPolicy) {
166        self.headers = policy.redact_headers(&self.headers);
167        policy.redact_json_in_place(&mut self.payload);
168    }
169}
170
171/// Serialized event payload form for large read paths.
172///
173/// `payload` contains the original JSON bytes for backends that can expose
174/// them directly. Callers that only need to forward or hash the payload can
175/// avoid materializing a `serde_json::Value`; callers that need structured
176/// access can opt in with `payload_json`.
177#[derive(Clone, Debug, PartialEq, Eq)]
178pub struct LogEventBytes {
179    pub kind: String,
180    pub payload: Bytes,
181    pub headers: BTreeMap<String, String>,
182    pub occurred_at_ms: i64,
183}
184
185impl LogEventBytes {
186    pub fn payload_json(&self) -> Result<serde_json::Value, LogError> {
187        serde_json::from_slice(&self.payload)
188            .map_err(|error| LogError::Serde(format!("event log payload parse error: {error}")))
189    }
190
191    pub fn into_log_event(self) -> Result<LogEvent, LogError> {
192        Ok(LogEvent {
193            kind: self.kind,
194            payload: serde_json::from_slice(&self.payload).map_err(|error| {
195                LogError::Serde(format!("event log payload parse error: {error}"))
196            })?,
197            headers: self.headers,
198            occurred_at_ms: self.occurred_at_ms,
199        })
200    }
201}
202
203impl TryFrom<LogEvent> for LogEventBytes {
204    type Error = LogError;
205
206    fn try_from(event: LogEvent) -> Result<Self, Self::Error> {
207        let payload = serde_json::to_vec(&event.payload)
208            .map_err(|error| LogError::Serde(format!("event log payload encode error: {error}")))?;
209        Ok(Self {
210            kind: event.kind,
211            payload: Bytes::from(payload),
212            headers: event.headers,
213            occurred_at_ms: event.occurred_at_ms,
214        })
215    }
216}
217
218#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
219pub struct CompactReport {
220    pub removed: usize,
221    pub remaining: usize,
222    pub latest: Option<EventId>,
223    pub checkpointed: bool,
224}
225
226#[derive(Clone, Debug, PartialEq, Eq)]
227pub struct AppendOutcome {
228    pub event_id: EventId,
229    pub event: LogEvent,
230    pub inserted: bool,
231}
232
233#[derive(Clone, Debug, PartialEq, Eq)]
234pub struct EventLogDescription {
235    pub backend: EventLogBackendKind,
236    pub location: Option<PathBuf>,
237    pub size_bytes: Option<u64>,
238    pub queue_depth: usize,
239}
240
241#[derive(Debug)]
242pub enum LogError {
243    Config(String),
244    InvalidTopic(String),
245    InvalidConsumer(String),
246    Io(String),
247    Serde(String),
248    Sqlite(String),
249    ConsumerLagged(EventId),
250}
251
252impl fmt::Display for LogError {
253    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
254        match self {
255            Self::Config(message)
256            | Self::InvalidTopic(message)
257            | Self::InvalidConsumer(message)
258            | Self::Io(message)
259            | Self::Serde(message)
260            | Self::Sqlite(message) => message.fmt(f),
261            Self::ConsumerLagged(last_id) => {
262                write!(f, "subscriber lagged behind after event {last_id}")
263            }
264        }
265    }
266}
267
268impl std::error::Error for LogError {}
269
270#[allow(async_fn_in_trait)]
271pub trait EventLog: Send + Sync {
272    fn describe(&self) -> EventLogDescription;
273
274    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;
275
276    async fn flush(&self) -> Result<(), LogError>;
277
278    /// Read events strictly after `from`. `None` starts from the
279    /// beginning of the topic.
280    async fn read_range(
281        &self,
282        topic: &Topic,
283        from: Option<EventId>,
284        limit: usize,
285    ) -> Result<Vec<(EventId, LogEvent)>, LogError>;
286
287    async fn read_range_bytes(
288        &self,
289        topic: &Topic,
290        from: Option<EventId>,
291        limit: usize,
292    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
293        let events = self.read_range(topic, from, limit).await?;
294        events
295            .into_iter()
296            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
297            .collect()
298    }
299
300    /// `async fn` keeps the ergonomic generic surface; the boxed stream
301    /// preserves dyn-dispatch for callers that store `Arc<dyn EventLog>`.
302    async fn subscribe(
303        self: Arc<Self>,
304        topic: &Topic,
305        from: Option<EventId>,
306    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;
307
308    async fn ack(
309        &self,
310        topic: &Topic,
311        consumer: &ConsumerId,
312        up_to: EventId,
313    ) -> Result<(), LogError>;
314
315    async fn consumer_cursor(
316        &self,
317        topic: &Topic,
318        consumer: &ConsumerId,
319    ) -> Result<Option<EventId>, LogError>;
320
321    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;
322
323    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
324}
325
326#[derive(Clone, Debug)]
327pub struct EventLogConfig {
328    pub backend: EventLogBackendKind,
329    pub file_dir: PathBuf,
330    pub sqlite_path: PathBuf,
331    pub queue_depth: usize,
332}
333
334impl EventLogConfig {
335    pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
336        let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
337            .ok()
338            .map(|value| value.parse())
339            .transpose()?
340            .unwrap_or(EventLogBackendKind::Sqlite);
341        let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
342            .ok()
343            .and_then(|value| value.parse::<usize>().ok())
344            .unwrap_or(RuntimeLimits::DEFAULT.default_event_log_queue_depth)
345            .max(1);
346
347        let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
348            Ok(value) if !value.trim().is_empty() => util::resolve_path(base_dir, &value),
349            _ => crate::runtime_paths::event_log_dir(base_dir),
350        };
351        let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
352            Ok(value) if !value.trim().is_empty() => util::resolve_path(base_dir, &value),
353            _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
354        };
355
356        Ok(Self {
357            backend,
358            file_dir,
359            sqlite_path,
360            queue_depth,
361        })
362    }
363
364    pub fn location(&self) -> Option<PathBuf> {
365        match self.backend {
366            EventLogBackendKind::Memory => None,
367            EventLogBackendKind::File => Some(self.file_dir.clone()),
368            EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
369            EventLogBackendKind::Postgres => None,
370        }
371    }
372}
373
374thread_local! {
375    static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
376    static PENDING_DEFAULT_EVENT_LOG: RefCell<Option<EventLogConfig>> = const { RefCell::new(None) };
377}
378
379pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
380    let config = EventLogConfig::for_base_dir(base_dir)?;
381    let log = open_event_log(&config)?;
382    ACTIVE_EVENT_LOG.with(|slot| {
383        *slot.borrow_mut() = Some(log.clone());
384    });
385    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
386        *slot.borrow_mut() = None;
387    });
388    Ok(log)
389}
390
391pub fn install_lazy_default_for_base_dir(base_dir: &Path) -> Result<(), LogError> {
392    let config = EventLogConfig::for_base_dir(base_dir)?;
393    let has_active = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().is_some());
394    if !has_active {
395        PENDING_DEFAULT_EVENT_LOG.with(|slot| {
396            *slot.borrow_mut() = Some(config);
397        });
398    }
399    Ok(())
400}
401
402pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
403    let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
404    ACTIVE_EVENT_LOG.with(|slot| {
405        *slot.borrow_mut() = Some(log.clone());
406    });
407    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
408        *slot.borrow_mut() = None;
409    });
410    log
411}
412
413pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
414    ACTIVE_EVENT_LOG.with(|slot| {
415        *slot.borrow_mut() = Some(log.clone());
416    });
417    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
418        *slot.borrow_mut() = None;
419    });
420    log
421}
422
423pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
424    if let Some(log) = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone()) {
425        return Some(log);
426    }
427
428    let config = PENDING_DEFAULT_EVENT_LOG.with(|slot| slot.borrow_mut().take())?;
429    match open_event_log(&config) {
430        Ok(log) => Some(install_active_event_log(log)),
431        Err(error) => {
432            crate::events::log_warn("event_log.init", &error.to_string());
433            None
434        }
435    }
436}
437
438pub fn reset_active_event_log() {
439    ACTIVE_EVENT_LOG.with(|slot| {
440        *slot.borrow_mut() = None;
441    });
442    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
443        *slot.borrow_mut() = None;
444    });
445}
446
447pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
448    let config = EventLogConfig::for_base_dir(base_dir)?;
449    let description = match config.backend {
450        EventLogBackendKind::Memory => EventLogDescription {
451            backend: EventLogBackendKind::Memory,
452            location: None,
453            size_bytes: None,
454            queue_depth: config.queue_depth,
455        },
456        EventLogBackendKind::File => EventLogDescription {
457            backend: EventLogBackendKind::File,
458            size_bytes: Some(util::dir_size_bytes(&config.file_dir)),
459            location: Some(config.file_dir),
460            queue_depth: config.queue_depth,
461        },
462        EventLogBackendKind::Sqlite => EventLogDescription {
463            backend: EventLogBackendKind::Sqlite,
464            size_bytes: Some(util::sqlite_size_bytes(&config.sqlite_path)),
465            location: Some(config.sqlite_path),
466            queue_depth: config.queue_depth,
467        },
468        EventLogBackendKind::Postgres => EventLogDescription {
469            backend: EventLogBackendKind::Postgres,
470            location: None,
471            size_bytes: None,
472            queue_depth: config.queue_depth,
473        },
474    };
475    Ok(description)
476}
477
478pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
479    match config.backend {
480        EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
481            config.queue_depth,
482        )))),
483        EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
484            config.file_dir.clone(),
485            config.queue_depth,
486        )?))),
487        EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
488            config.sqlite_path.clone(),
489            config.queue_depth,
490        )?))),
491        EventLogBackendKind::Postgres => Err(LogError::Config(
492            "postgres event logs are host-provided; the built-in event log factory supports memory, file, and sqlite"
493                .to_string(),
494        )),
495    }
496}
497
498pub enum AnyEventLog {
499    Memory(MemoryEventLog),
500    File(FileEventLog),
501    Sqlite(SqliteEventLog),
502}
503
504impl AnyEventLog {
505    pub async fn topics(&self) -> Result<Vec<Topic>, LogError> {
506        match self {
507            Self::Memory(log) => log.topics().await,
508            Self::File(log) => log.topics(),
509            Self::Sqlite(log) => log.topics(),
510        }
511    }
512
513    pub async fn append_idempotent_by_header(
514        &self,
515        topic: &Topic,
516        header: &str,
517        value: &str,
518        event: LogEvent,
519    ) -> Result<AppendOutcome, LogError> {
520        if header.trim().is_empty() {
521            return Err(LogError::Config(
522                "idempotent append header cannot be empty".to_string(),
523            ));
524        }
525        match self {
526            Self::Memory(log) => {
527                log.append_idempotent_by_header(topic, header, value, event)
528                    .await
529            }
530            Self::File(log) => log.append_idempotent_by_header(topic, header, value, event),
531            Self::Sqlite(log) => log.append_idempotent_by_header(topic, header, value, event),
532        }
533    }
534
535    /// Read the event previously appended under `(header, value)`, the read
536    /// counterpart of [`Self::append_idempotent_by_header`]. On the SQLite
537    /// backend this is an indexed JOIN; the memory/file dev backends scan.
538    pub async fn read_idempotent_by_header(
539        &self,
540        topic: &Topic,
541        header: &str,
542        value: &str,
543    ) -> Result<Option<(EventId, LogEvent)>, LogError> {
544        if header.trim().is_empty() {
545            return Err(LogError::Config(
546                "idempotent read header cannot be empty".to_string(),
547            ));
548        }
549        match self {
550            Self::Memory(log) => log.read_idempotent_by_header(topic, header, value).await,
551            Self::File(log) => log.read_idempotent_by_header(topic, header, value),
552            Self::Sqlite(log) => log.read_idempotent_by_header(topic, header, value),
553        }
554    }
555}
556
557impl EventLog for AnyEventLog {
558    fn describe(&self) -> EventLogDescription {
559        match self {
560            Self::Memory(log) => log.describe(),
561            Self::File(log) => log.describe(),
562            Self::Sqlite(log) => log.describe(),
563        }
564    }
565
566    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
567        match self {
568            Self::Memory(log) => log.append(topic, event).await,
569            Self::File(log) => log.append(topic, event).await,
570            Self::Sqlite(log) => log.append(topic, event).await,
571        }
572    }
573
574    async fn flush(&self) -> Result<(), LogError> {
575        match self {
576            Self::Memory(log) => log.flush().await,
577            Self::File(log) => log.flush().await,
578            Self::Sqlite(log) => log.flush().await,
579        }
580    }
581
582    async fn read_range(
583        &self,
584        topic: &Topic,
585        from: Option<EventId>,
586        limit: usize,
587    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
588        match self {
589            Self::Memory(log) => log.read_range(topic, from, limit).await,
590            Self::File(log) => log.read_range(topic, from, limit).await,
591            Self::Sqlite(log) => log.read_range(topic, from, limit).await,
592        }
593    }
594
595    async fn read_range_bytes(
596        &self,
597        topic: &Topic,
598        from: Option<EventId>,
599        limit: usize,
600    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
601        match self {
602            Self::Memory(log) => log.read_range_bytes(topic, from, limit).await,
603            Self::File(log) => log.read_range_bytes(topic, from, limit).await,
604            Self::Sqlite(log) => log.read_range_bytes(topic, from, limit).await,
605        }
606    }
607
608    async fn subscribe(
609        self: Arc<Self>,
610        topic: &Topic,
611        from: Option<EventId>,
612    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
613        let (rx, queue_depth) = match self.as_ref() {
614            Self::Memory(log) => (
615                log.broadcasts.subscribe(topic, log.queue_depth),
616                log.queue_depth,
617            ),
618            Self::File(log) => (
619                log.broadcasts.subscribe(topic, log.queue_depth),
620                log.queue_depth,
621            ),
622            Self::Sqlite(log) => (
623                log.broadcasts.subscribe(topic, log.queue_depth),
624                log.queue_depth,
625            ),
626        };
627        let history = self.read_range(topic, from, usize::MAX).await?;
628        Ok(util::stream_from_broadcast(history, from, rx, queue_depth))
629    }
630
631    async fn ack(
632        &self,
633        topic: &Topic,
634        consumer: &ConsumerId,
635        up_to: EventId,
636    ) -> Result<(), LogError> {
637        match self {
638            Self::Memory(log) => log.ack(topic, consumer, up_to).await,
639            Self::File(log) => log.ack(topic, consumer, up_to).await,
640            Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
641        }
642    }
643
644    async fn consumer_cursor(
645        &self,
646        topic: &Topic,
647        consumer: &ConsumerId,
648    ) -> Result<Option<EventId>, LogError> {
649        match self {
650            Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
651            Self::File(log) => log.consumer_cursor(topic, consumer).await,
652            Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
653        }
654    }
655
656    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
657        match self {
658            Self::Memory(log) => log.latest(topic).await,
659            Self::File(log) => log.latest(topic).await,
660            Self::Sqlite(log) => log.latest(topic).await,
661        }
662    }
663
664    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
665        match self {
666            Self::Memory(log) => log.compact(topic, before).await,
667            Self::File(log) => log.compact(topic, before).await,
668            Self::Sqlite(log) => log.compact(topic, before).await,
669        }
670    }
671}
672
/// Map `value` onto the character set `Topic::new` accepts.
///
/// ASCII letters, digits, `.`, `_`, and `-` are kept. Every other `char`,
/// including each non-ASCII code point, becomes a single `_`. The output can
/// still be empty (for empty input) or be `.`/`..`, so callers building a full
/// topic should still go through `Topic::new`.
pub fn sanitize_topic_component(value: &str) -> String {
    let mut sanitized = String::with_capacity(value.len());
    for ch in value.chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '.' || ch == '_' || ch == '-';
        sanitized.push(if allowed { ch } else { '_' });
    }
    sanitized
}