Skip to main content

harn_vm/event_log/
mod.rs

1use std::cell::RefCell;
2use std::collections::BTreeMap;
3use std::fmt;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7
8use bytes::Bytes;
9use futures::stream::BoxStream;
10use serde::{Deserialize, Serialize};
11
12use crate::runtime_limits::RuntimeLimits;
13
14mod file;
15mod memory;
16mod sqlite;
17mod util;
18
19#[cfg(test)]
20mod tests;
21
22pub use file::FileEventLog;
23pub use memory::MemoryEventLog;
24pub use sqlite::SqliteEventLog;
25
/// Identifier of an event within the log; a plain `u64`.
pub type EventId = u64;

/// Environment variable selecting the backend; parsed as an
/// [`EventLogBackendKind`] by [`EventLogConfig::for_base_dir`].
pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
/// Environment variable overriding the file backend directory.
pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
/// Environment variable overriding the SQLite database path.
pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
/// Environment variable overriding the queue depth (parsed as `usize`).
pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";
32
/// Name of a topic in the event log.
///
/// Values built through [`Topic::new`] are non-empty and consist only of
/// ASCII letters, ASCII digits, `.`, `_` and `-`.
///
/// NOTE(review): the derived `Deserialize` fills the inner `String` directly
/// and does not go through [`Topic::new`] — confirm that unvalidated topics
/// coming from serialized data are acceptable.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Topic(String);
35
36impl Topic {
37    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
38        let value = value.into();
39        if value.is_empty() {
40            return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
41        }
42        if !value
43            .chars()
44            .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
45        {
46            return Err(LogError::InvalidTopic(format!(
47                "topic '{value}' contains unsupported characters"
48            )));
49        }
50        Ok(Self(value))
51    }
52
53    pub fn as_str(&self) -> &str {
54        &self.0
55    }
56}
57
58impl fmt::Display for Topic {
59    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60        self.0.fmt(f)
61    }
62}
63
64impl FromStr for Topic {
65    type Err = LogError;
66
67    fn from_str(s: &str) -> Result<Self, Self::Err> {
68        Self::new(s)
69    }
70}
71
/// Identifier of a consumer, used together with a [`Topic`] by the cursor
/// methods of [`EventLog`].
///
/// NOTE(review): the derived `Deserialize` does not go through
/// [`ConsumerId::new`], so blank ids can be produced from serialized data —
/// confirm this is intended.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct ConsumerId(String);
74
75impl ConsumerId {
76    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
77        let value = value.into();
78        if value.trim().is_empty() {
79            return Err(LogError::InvalidConsumer(
80                "consumer id cannot be empty".to_string(),
81            ));
82        }
83        Ok(Self(value))
84    }
85
86    pub fn as_str(&self) -> &str {
87        &self.0
88    }
89}
90
91impl fmt::Display for ConsumerId {
92    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93        self.0.fmt(f)
94    }
95}
96
/// Storage backend of an event log. Serialized in `snake_case`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventLogBackendKind {
    /// In-process backend ([`MemoryEventLog`]).
    Memory,
    /// Directory-backed backend ([`FileEventLog`]).
    File,
    /// SQLite-backed backend ([`SqliteEventLog`]).
    Sqlite,
    /// Recognised as a backend name, but [`open_event_log`] rejects it with a
    /// configuration error because postgres logs are host-provided.
    Postgres,
}
105
106impl fmt::Display for EventLogBackendKind {
107    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108        match self {
109            Self::Memory => write!(f, "memory"),
110            Self::File => write!(f, "file"),
111            Self::Sqlite => write!(f, "sqlite"),
112            Self::Postgres => write!(f, "postgres"),
113        }
114    }
115}
116
117impl FromStr for EventLogBackendKind {
118    type Err = LogError;
119
120    fn from_str(value: &str) -> Result<Self, Self::Err> {
121        match value.trim().to_ascii_lowercase().as_str() {
122            "memory" => Ok(Self::Memory),
123            "file" => Ok(Self::File),
124            "sqlite" => Ok(Self::Sqlite),
125            "postgres" => Ok(Self::Postgres),
126            other => Err(LogError::Config(format!(
127                "unsupported event log backend '{other}'"
128            ))),
129        }
130    }
131}
132
/// A single event as appended to and read from an event log.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct LogEvent {
    /// Kind label supplied by the emitter.
    pub kind: String,
    /// Structured JSON payload.
    pub payload: serde_json::Value,
    /// String key/value headers; an empty map is used when the field is
    /// missing from serialized input.
    #[serde(default)]
    pub headers: BTreeMap<String, String>,
    /// Timestamp; [`LogEvent::new`] fills it from `util::now_ms()`
    /// (presumably Unix epoch milliseconds — confirm against `util`).
    pub occurred_at_ms: i64,
}
141
142impl LogEvent {
143    pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
144        Self {
145            kind: kind.into(),
146            payload,
147            headers: BTreeMap::new(),
148            occurred_at_ms: util::now_ms(),
149        }
150    }
151
152    pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
153        self.headers = headers;
154        self
155    }
156
157    /// Apply the unified redaction policy to this event's headers and
158    /// payload. Backends are intentionally left unaware of redaction —
159    /// emitters that need scrubbed events call this before append, and
160    /// readers that materialize events for display can apply the policy
161    /// again as defense in depth.
162    pub fn redact_in_place(&mut self, policy: &crate::redact::RedactionPolicy) {
163        self.headers = policy.redact_headers(&self.headers);
164        policy.redact_json_in_place(&mut self.payload);
165    }
166}
167
/// Serialized event payload form for large read paths.
///
/// `payload` contains the original JSON bytes for backends that can expose
/// them directly. Callers that only need to forward or hash the payload can
/// avoid materializing a `serde_json::Value`; callers that need structured
/// access can opt in with `payload_json`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LogEventBytes {
    /// Kind label, as in [`LogEvent::kind`].
    pub kind: String,
    /// JSON-encoded payload bytes.
    pub payload: Bytes,
    /// String key/value headers, as in [`LogEvent::headers`].
    pub headers: BTreeMap<String, String>,
    /// Timestamp, as in [`LogEvent::occurred_at_ms`].
    pub occurred_at_ms: i64,
}
181
182impl LogEventBytes {
183    pub fn payload_json(&self) -> Result<serde_json::Value, LogError> {
184        serde_json::from_slice(&self.payload)
185            .map_err(|error| LogError::Serde(format!("event log payload parse error: {error}")))
186    }
187
188    pub fn into_log_event(self) -> Result<LogEvent, LogError> {
189        Ok(LogEvent {
190            kind: self.kind,
191            payload: serde_json::from_slice(&self.payload).map_err(|error| {
192                LogError::Serde(format!("event log payload parse error: {error}"))
193            })?,
194            headers: self.headers,
195            occurred_at_ms: self.occurred_at_ms,
196        })
197    }
198}
199
200impl TryFrom<LogEvent> for LogEventBytes {
201    type Error = LogError;
202
203    fn try_from(event: LogEvent) -> Result<Self, Self::Error> {
204        let payload = serde_json::to_vec(&event.payload)
205            .map_err(|error| LogError::Serde(format!("event log payload encode error: {error}")))?;
206        Ok(Self {
207            kind: event.kind,
208            payload: Bytes::from(payload),
209            headers: event.headers,
210            occurred_at_ms: event.occurred_at_ms,
211        })
212    }
213}
214
/// Result of [`EventLog::compact`]. Field meanings are defined by the
/// backends, which are not visible from this module; the notes below are
/// read from the names only.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactReport {
    /// Presumably the number of events removed — confirm against backends.
    pub removed: usize,
    /// Presumably the number of events left in the topic — confirm.
    pub remaining: usize,
    /// Presumably the latest event id in the topic after compaction — confirm.
    pub latest: Option<EventId>,
    /// NOTE(review): looks like a flag for a backend checkpoint — confirm.
    pub checkpointed: bool,
}
222
/// Value returned by [`AnyEventLog::append_idempotent_by_header`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AppendOutcome {
    /// Id of the event this outcome refers to.
    pub event_id: EventId,
    /// The event this outcome refers to.
    pub event: LogEvent,
    /// Presumably `false` when an event with the same header value already
    /// existed and nothing new was written — confirm against backends.
    pub inserted: bool,
}
229
/// Summary of an event log, returned by [`EventLog::describe`] and
/// [`describe_for_base_dir`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventLogDescription {
    /// Which backend the log uses.
    pub backend: EventLogBackendKind,
    /// Storage location; [`describe_for_base_dir`] sets it for the file and
    /// SQLite backends only.
    pub location: Option<PathBuf>,
    /// Storage size in bytes; [`describe_for_base_dir`] sets it for the file
    /// and SQLite backends only.
    pub size_bytes: Option<u64>,
    /// Configured queue depth.
    pub queue_depth: usize,
}
237
/// Errors produced by the event log. The string-carrying variants display
/// their message verbatim.
#[derive(Debug)]
pub enum LogError {
    /// Invalid configuration or argument (e.g. unsupported backend name,
    /// blank idempotency header).
    Config(String),
    /// Rejected by [`Topic::new`].
    InvalidTopic(String),
    /// Rejected by [`ConsumerId::new`].
    InvalidConsumer(String),
    /// I/O failure, carried as a message.
    Io(String),
    /// JSON encode/decode failure, carried as a message.
    Serde(String),
    /// SQLite failure, carried as a message.
    Sqlite(String),
    /// A subscriber lagged behind; carries the event id quoted in the
    /// display message ("after event {id}").
    ConsumerLagged(EventId),
}
248
249impl fmt::Display for LogError {
250    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
251        match self {
252            Self::Config(message)
253            | Self::InvalidTopic(message)
254            | Self::InvalidConsumer(message)
255            | Self::Io(message)
256            | Self::Serde(message)
257            | Self::Sqlite(message) => message.fmt(f),
258            Self::ConsumerLagged(last_id) => {
259                write!(f, "subscriber lagged behind after event {last_id}")
260            }
261        }
262    }
263}
264
// Marker impl: `LogError` holds only messages and ids, so the default
// `Error` methods (no `source`) are used.
impl std::error::Error for LogError {}
266
267#[allow(async_fn_in_trait)]
268pub trait EventLog: Send + Sync {
269    fn describe(&self) -> EventLogDescription;
270
271    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;
272
273    async fn flush(&self) -> Result<(), LogError>;
274
275    /// Read events strictly after `from`. `None` starts from the
276    /// beginning of the topic.
277    async fn read_range(
278        &self,
279        topic: &Topic,
280        from: Option<EventId>,
281        limit: usize,
282    ) -> Result<Vec<(EventId, LogEvent)>, LogError>;
283
284    async fn read_range_bytes(
285        &self,
286        topic: &Topic,
287        from: Option<EventId>,
288        limit: usize,
289    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
290        let events = self.read_range(topic, from, limit).await?;
291        events
292            .into_iter()
293            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
294            .collect()
295    }
296
297    /// `async fn` keeps the ergonomic generic surface; the boxed stream
298    /// preserves dyn-dispatch for callers that store `Arc<dyn EventLog>`.
299    async fn subscribe(
300        self: Arc<Self>,
301        topic: &Topic,
302        from: Option<EventId>,
303    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;
304
305    async fn ack(
306        &self,
307        topic: &Topic,
308        consumer: &ConsumerId,
309        up_to: EventId,
310    ) -> Result<(), LogError>;
311
312    async fn consumer_cursor(
313        &self,
314        topic: &Topic,
315        consumer: &ConsumerId,
316    ) -> Result<Option<EventId>, LogError>;
317
318    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;
319
320    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
321}
322
/// Settings used by [`open_event_log`] to construct a backend.
#[derive(Clone, Debug)]
pub struct EventLogConfig {
    /// Backend to open.
    pub backend: EventLogBackendKind,
    /// Directory passed to the file backend.
    pub file_dir: PathBuf,
    /// Database path passed to the SQLite backend.
    pub sqlite_path: PathBuf,
    /// Queue depth passed to whichever backend is opened.
    pub queue_depth: usize,
}
330
331impl EventLogConfig {
332    pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
333        let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
334            .ok()
335            .map(|value| value.parse())
336            .transpose()?
337            .unwrap_or(EventLogBackendKind::Sqlite);
338        let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
339            .ok()
340            .and_then(|value| value.parse::<usize>().ok())
341            .unwrap_or(RuntimeLimits::DEFAULT.default_event_log_queue_depth)
342            .max(1);
343
344        let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
345            Ok(value) if !value.trim().is_empty() => util::resolve_path(base_dir, &value),
346            _ => crate::runtime_paths::event_log_dir(base_dir),
347        };
348        let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
349            Ok(value) if !value.trim().is_empty() => util::resolve_path(base_dir, &value),
350            _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
351        };
352
353        Ok(Self {
354            backend,
355            file_dir,
356            sqlite_path,
357            queue_depth,
358        })
359    }
360
361    pub fn location(&self) -> Option<PathBuf> {
362        match self.backend {
363            EventLogBackendKind::Memory => None,
364            EventLogBackendKind::File => Some(self.file_dir.clone()),
365            EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
366            EventLogBackendKind::Postgres => None,
367        }
368    }
369}
370
thread_local! {
    // Event log installed for the current thread, if any.
    static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
    // Config stored by `install_lazy_default_for_base_dir`; `active_event_log`
    // takes it and opens the log on first use.
    static PENDING_DEFAULT_EVENT_LOG: RefCell<Option<EventLogConfig>> = const { RefCell::new(None) };
}
375
376pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
377    let config = EventLogConfig::for_base_dir(base_dir)?;
378    let log = open_event_log(&config)?;
379    ACTIVE_EVENT_LOG.with(|slot| {
380        *slot.borrow_mut() = Some(log.clone());
381    });
382    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
383        *slot.borrow_mut() = None;
384    });
385    Ok(log)
386}
387
388pub fn install_lazy_default_for_base_dir(base_dir: &Path) -> Result<(), LogError> {
389    let config = EventLogConfig::for_base_dir(base_dir)?;
390    let has_active = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().is_some());
391    if !has_active {
392        PENDING_DEFAULT_EVENT_LOG.with(|slot| {
393            *slot.borrow_mut() = Some(config);
394        });
395    }
396    Ok(())
397}
398
399pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
400    let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
401    ACTIVE_EVENT_LOG.with(|slot| {
402        *slot.borrow_mut() = Some(log.clone());
403    });
404    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
405        *slot.borrow_mut() = None;
406    });
407    log
408}
409
410pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
411    ACTIVE_EVENT_LOG.with(|slot| {
412        *slot.borrow_mut() = Some(log.clone());
413    });
414    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
415        *slot.borrow_mut() = None;
416    });
417    log
418}
419
420pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
421    if let Some(log) = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone()) {
422        return Some(log);
423    }
424
425    let config = PENDING_DEFAULT_EVENT_LOG.with(|slot| slot.borrow_mut().take())?;
426    match open_event_log(&config) {
427        Ok(log) => Some(install_active_event_log(log)),
428        Err(error) => {
429            crate::events::log_warn("event_log.init", &error.to_string());
430            None
431        }
432    }
433}
434
435pub fn reset_active_event_log() {
436    ACTIVE_EVENT_LOG.with(|slot| {
437        *slot.borrow_mut() = None;
438    });
439    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
440        *slot.borrow_mut() = None;
441    });
442}
443
444pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
445    let config = EventLogConfig::for_base_dir(base_dir)?;
446    let description = match config.backend {
447        EventLogBackendKind::Memory => EventLogDescription {
448            backend: EventLogBackendKind::Memory,
449            location: None,
450            size_bytes: None,
451            queue_depth: config.queue_depth,
452        },
453        EventLogBackendKind::File => EventLogDescription {
454            backend: EventLogBackendKind::File,
455            size_bytes: Some(util::dir_size_bytes(&config.file_dir)),
456            location: Some(config.file_dir),
457            queue_depth: config.queue_depth,
458        },
459        EventLogBackendKind::Sqlite => EventLogDescription {
460            backend: EventLogBackendKind::Sqlite,
461            size_bytes: Some(util::sqlite_size_bytes(&config.sqlite_path)),
462            location: Some(config.sqlite_path),
463            queue_depth: config.queue_depth,
464        },
465        EventLogBackendKind::Postgres => EventLogDescription {
466            backend: EventLogBackendKind::Postgres,
467            location: None,
468            size_bytes: None,
469            queue_depth: config.queue_depth,
470        },
471    };
472    Ok(description)
473}
474
475pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
476    match config.backend {
477        EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
478            config.queue_depth,
479        )))),
480        EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
481            config.file_dir.clone(),
482            config.queue_depth,
483        )?))),
484        EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
485            config.sqlite_path.clone(),
486            config.queue_depth,
487        )?))),
488        EventLogBackendKind::Postgres => Err(LogError::Config(
489            "postgres event logs are host-provided; the built-in event log factory supports memory, file, and sqlite"
490                .to_string(),
491        )),
492    }
493}
494
/// One of the built-in backends behind a single concrete type. The
/// [`EventLog`] impl forwards each call to the wrapped backend.
pub enum AnyEventLog {
    /// In-memory backend.
    Memory(MemoryEventLog),
    /// File-backed backend.
    File(FileEventLog),
    /// SQLite-backed backend.
    Sqlite(SqliteEventLog),
}
500
501impl AnyEventLog {
502    pub async fn topics(&self) -> Result<Vec<Topic>, LogError> {
503        match self {
504            Self::Memory(log) => log.topics().await,
505            Self::File(log) => log.topics(),
506            Self::Sqlite(log) => log.topics(),
507        }
508    }
509
510    pub async fn append_idempotent_by_header(
511        &self,
512        topic: &Topic,
513        header: &str,
514        value: &str,
515        event: LogEvent,
516    ) -> Result<AppendOutcome, LogError> {
517        if header.trim().is_empty() {
518            return Err(LogError::Config(
519                "idempotent append header cannot be empty".to_string(),
520            ));
521        }
522        match self {
523            Self::Memory(log) => {
524                log.append_idempotent_by_header(topic, header, value, event)
525                    .await
526            }
527            Self::File(log) => log.append_idempotent_by_header(topic, header, value, event),
528            Self::Sqlite(log) => log.append_idempotent_by_header(topic, header, value, event),
529        }
530    }
531
532    /// Read the event previously appended under `(header, value)`, the read
533    /// counterpart of [`Self::append_idempotent_by_header`]. On the SQLite
534    /// backend this is an indexed JOIN; the memory/file dev backends scan.
535    pub async fn read_idempotent_by_header(
536        &self,
537        topic: &Topic,
538        header: &str,
539        value: &str,
540    ) -> Result<Option<(EventId, LogEvent)>, LogError> {
541        if header.trim().is_empty() {
542            return Err(LogError::Config(
543                "idempotent read header cannot be empty".to_string(),
544            ));
545        }
546        match self {
547            Self::Memory(log) => log.read_idempotent_by_header(topic, header, value).await,
548            Self::File(log) => log.read_idempotent_by_header(topic, header, value),
549            Self::Sqlite(log) => log.read_idempotent_by_header(topic, header, value),
550        }
551    }
552}
553
554impl EventLog for AnyEventLog {
555    fn describe(&self) -> EventLogDescription {
556        match self {
557            Self::Memory(log) => log.describe(),
558            Self::File(log) => log.describe(),
559            Self::Sqlite(log) => log.describe(),
560        }
561    }
562
563    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
564        match self {
565            Self::Memory(log) => log.append(topic, event).await,
566            Self::File(log) => log.append(topic, event).await,
567            Self::Sqlite(log) => log.append(topic, event).await,
568        }
569    }
570
571    async fn flush(&self) -> Result<(), LogError> {
572        match self {
573            Self::Memory(log) => log.flush().await,
574            Self::File(log) => log.flush().await,
575            Self::Sqlite(log) => log.flush().await,
576        }
577    }
578
579    async fn read_range(
580        &self,
581        topic: &Topic,
582        from: Option<EventId>,
583        limit: usize,
584    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
585        match self {
586            Self::Memory(log) => log.read_range(topic, from, limit).await,
587            Self::File(log) => log.read_range(topic, from, limit).await,
588            Self::Sqlite(log) => log.read_range(topic, from, limit).await,
589        }
590    }
591
592    async fn read_range_bytes(
593        &self,
594        topic: &Topic,
595        from: Option<EventId>,
596        limit: usize,
597    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
598        match self {
599            Self::Memory(log) => log.read_range_bytes(topic, from, limit).await,
600            Self::File(log) => log.read_range_bytes(topic, from, limit).await,
601            Self::Sqlite(log) => log.read_range_bytes(topic, from, limit).await,
602        }
603    }
604
605    async fn subscribe(
606        self: Arc<Self>,
607        topic: &Topic,
608        from: Option<EventId>,
609    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
610        let (rx, queue_depth) = match self.as_ref() {
611            Self::Memory(log) => (
612                log.broadcasts.subscribe(topic, log.queue_depth),
613                log.queue_depth,
614            ),
615            Self::File(log) => (
616                log.broadcasts.subscribe(topic, log.queue_depth),
617                log.queue_depth,
618            ),
619            Self::Sqlite(log) => (
620                log.broadcasts.subscribe(topic, log.queue_depth),
621                log.queue_depth,
622            ),
623        };
624        let history = self.read_range(topic, from, usize::MAX).await?;
625        Ok(util::stream_from_broadcast(history, from, rx, queue_depth))
626    }
627
628    async fn ack(
629        &self,
630        topic: &Topic,
631        consumer: &ConsumerId,
632        up_to: EventId,
633    ) -> Result<(), LogError> {
634        match self {
635            Self::Memory(log) => log.ack(topic, consumer, up_to).await,
636            Self::File(log) => log.ack(topic, consumer, up_to).await,
637            Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
638        }
639    }
640
641    async fn consumer_cursor(
642        &self,
643        topic: &Topic,
644        consumer: &ConsumerId,
645    ) -> Result<Option<EventId>, LogError> {
646        match self {
647            Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
648            Self::File(log) => log.consumer_cursor(topic, consumer).await,
649            Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
650        }
651    }
652
653    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
654        match self {
655            Self::Memory(log) => log.latest(topic).await,
656            Self::File(log) => log.latest(topic).await,
657            Self::Sqlite(log) => log.latest(topic).await,
658        }
659    }
660
661    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
662        match self {
663            Self::Memory(log) => log.compact(topic, before).await,
664            Self::File(log) => log.compact(topic, before).await,
665            Self::Sqlite(log) => log.compact(topic, before).await,
666        }
667    }
668}
669
/// Maps `value` onto the character set accepted by [`Topic::new`].
///
/// ASCII letters, ASCII digits, `.`, `_` and `-` are kept; every other
/// character (including each non-ASCII character) becomes a single `_`.
/// An empty input yields an empty string.
pub fn sanitize_topic_component(value: &str) -> String {
    let mut sanitized = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '.' || ch == '_' || ch == '-';
        sanitized.push(if keep { ch } else { '_' });
    }
    sanitized
}