Skip to main content

harn_vm/event_log/
mod.rs

1use std::cell::RefCell;
2use std::collections::BTreeMap;
3use std::fmt;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7
8use bytes::Bytes;
9use futures::stream::BoxStream;
10use serde::{Deserialize, Serialize};
11
12use crate::runtime_limits::RuntimeLimits;
13
14mod file;
15mod memory;
16mod sqlite;
17mod util;
18
19#[cfg(test)]
20mod tests;
21
22pub use file::FileEventLog;
23pub use memory::MemoryEventLog;
24pub use sqlite::SqliteEventLog;
25
/// Identifier of a stored event, as returned by `EventLog::append`.
pub type EventId = u64;

/// Environment variable naming the backend to use; parsed into an
/// `EventLogBackendKind` by `EventLogConfig::for_base_dir`.
pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
/// Environment variable overriding the directory used by the file backend.
pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
/// Environment variable overriding the database path used by the SQLite backend.
pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
/// Environment variable overriding the queue depth handed to the backend.
pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";
32
33#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
34pub struct Topic(String);
35
36impl Topic {
37    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
38        let value = value.into();
39        if value.is_empty() {
40            return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
41        }
42        if !value
43            .chars()
44            .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
45        {
46            return Err(LogError::InvalidTopic(format!(
47                "topic '{value}' contains unsupported characters"
48            )));
49        }
50        Ok(Self(value))
51    }
52
53    pub fn as_str(&self) -> &str {
54        &self.0
55    }
56}
57
58impl fmt::Display for Topic {
59    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60        self.0.fmt(f)
61    }
62}
63
64impl FromStr for Topic {
65    type Err = LogError;
66
67    fn from_str(s: &str) -> Result<Self, Self::Err> {
68        Self::new(s)
69    }
70}
71
72#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
73pub struct ConsumerId(String);
74
75impl ConsumerId {
76    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
77        let value = value.into();
78        if value.trim().is_empty() {
79            return Err(LogError::InvalidConsumer(
80                "consumer id cannot be empty".to_string(),
81            ));
82        }
83        Ok(Self(value))
84    }
85
86    pub fn as_str(&self) -> &str {
87        &self.0
88    }
89}
90
91impl fmt::Display for ConsumerId {
92    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93        self.0.fmt(f)
94    }
95}
96
97#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
98#[serde(rename_all = "snake_case")]
99pub enum EventLogBackendKind {
100    Memory,
101    File,
102    Sqlite,
103}
104
105impl fmt::Display for EventLogBackendKind {
106    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107        match self {
108            Self::Memory => write!(f, "memory"),
109            Self::File => write!(f, "file"),
110            Self::Sqlite => write!(f, "sqlite"),
111        }
112    }
113}
114
115impl FromStr for EventLogBackendKind {
116    type Err = LogError;
117
118    fn from_str(value: &str) -> Result<Self, Self::Err> {
119        match value.trim().to_ascii_lowercase().as_str() {
120            "memory" => Ok(Self::Memory),
121            "file" => Ok(Self::File),
122            "sqlite" => Ok(Self::Sqlite),
123            other => Err(LogError::Config(format!(
124                "unsupported event log backend '{other}'"
125            ))),
126        }
127    }
128}
129
130#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
131pub struct LogEvent {
132    pub kind: String,
133    pub payload: serde_json::Value,
134    #[serde(default)]
135    pub headers: BTreeMap<String, String>,
136    pub occurred_at_ms: i64,
137}
138
139impl LogEvent {
140    pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
141        Self {
142            kind: kind.into(),
143            payload,
144            headers: BTreeMap::new(),
145            occurred_at_ms: util::now_ms(),
146        }
147    }
148
149    pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
150        self.headers = headers;
151        self
152    }
153
154    /// Apply the unified redaction policy to this event's headers and
155    /// payload. Backends are intentionally left unaware of redaction —
156    /// emitters that need scrubbed events call this before append, and
157    /// readers that materialize events for display can apply the policy
158    /// again as defense in depth.
159    pub fn redact_in_place(&mut self, policy: &crate::redact::RedactionPolicy) {
160        self.headers = policy.redact_headers(&self.headers);
161        policy.redact_json_in_place(&mut self.payload);
162    }
163}
164
165/// Serialized event payload form for large read paths.
166///
167/// `payload` contains the original JSON bytes for backends that can expose
168/// them directly. Callers that only need to forward or hash the payload can
169/// avoid materializing a `serde_json::Value`; callers that need structured
170/// access can opt in with `payload_json`.
171#[derive(Clone, Debug, PartialEq, Eq)]
172pub struct LogEventBytes {
173    pub kind: String,
174    pub payload: Bytes,
175    pub headers: BTreeMap<String, String>,
176    pub occurred_at_ms: i64,
177}
178
179impl LogEventBytes {
180    pub fn payload_json(&self) -> Result<serde_json::Value, LogError> {
181        serde_json::from_slice(&self.payload)
182            .map_err(|error| LogError::Serde(format!("event log payload parse error: {error}")))
183    }
184
185    pub fn into_log_event(self) -> Result<LogEvent, LogError> {
186        Ok(LogEvent {
187            kind: self.kind,
188            payload: serde_json::from_slice(&self.payload).map_err(|error| {
189                LogError::Serde(format!("event log payload parse error: {error}"))
190            })?,
191            headers: self.headers,
192            occurred_at_ms: self.occurred_at_ms,
193        })
194    }
195}
196
197impl TryFrom<LogEvent> for LogEventBytes {
198    type Error = LogError;
199
200    fn try_from(event: LogEvent) -> Result<Self, Self::Error> {
201        let payload = serde_json::to_vec(&event.payload)
202            .map_err(|error| LogError::Serde(format!("event log payload encode error: {error}")))?;
203        Ok(Self {
204            kind: event.kind,
205            payload: Bytes::from(payload),
206            headers: event.headers,
207            occurred_at_ms: event.occurred_at_ms,
208        })
209    }
210}
211
/// Result returned by `EventLog::compact`.
// NOTE(review): field meanings below are inferred from their names; the
// backends that populate them are not visible here — confirm.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactReport {
    // Presumably the number of events dropped by the compaction.
    pub removed: usize,
    // Presumably the number of events still stored afterwards.
    pub remaining: usize,
    // Presumably the newest event id in the topic, if any.
    pub latest: Option<EventId>,
    // Presumably whether the backend performed a checkpoint — TODO confirm.
    pub checkpointed: bool,
}
219
/// Result returned by `AnyEventLog::append_idempotent_by_header`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AppendOutcome {
    pub event_id: EventId,
    pub event: LogEvent,
    // Presumably `false` when an existing event with the same header value
    // was found instead of a new one being written — TODO confirm against
    // the backend implementations.
    pub inserted: bool,
}
226
/// Summary of an event log's backend, as produced by `EventLog::describe`
/// and `describe_for_base_dir`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventLogDescription {
    /// Which backend the description refers to.
    pub backend: EventLogBackendKind,
    /// On-disk location; `describe_for_base_dir` leaves this `None` for the
    /// memory backend.
    pub location: Option<PathBuf>,
    /// Storage size in bytes; `describe_for_base_dir` leaves this `None` for
    /// the memory backend.
    pub size_bytes: Option<u64>,
    /// Queue depth the backend is configured with.
    pub queue_depth: usize,
}
234
235#[derive(Debug)]
236pub enum LogError {
237    Config(String),
238    InvalidTopic(String),
239    InvalidConsumer(String),
240    Io(String),
241    Serde(String),
242    Sqlite(String),
243    ConsumerLagged(EventId),
244}
245
246impl fmt::Display for LogError {
247    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248        match self {
249            Self::Config(message)
250            | Self::InvalidTopic(message)
251            | Self::InvalidConsumer(message)
252            | Self::Io(message)
253            | Self::Serde(message)
254            | Self::Sqlite(message) => message.fmt(f),
255            Self::ConsumerLagged(last_id) => {
256                write!(f, "subscriber lagged behind after event {last_id}")
257            }
258        }
259    }
260}
261
262impl std::error::Error for LogError {}
263
264#[allow(async_fn_in_trait)]
265pub trait EventLog: Send + Sync {
266    fn describe(&self) -> EventLogDescription;
267
268    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;
269
270    async fn flush(&self) -> Result<(), LogError>;
271
272    /// Read events strictly after `from`. `None` starts from the
273    /// beginning of the topic.
274    async fn read_range(
275        &self,
276        topic: &Topic,
277        from: Option<EventId>,
278        limit: usize,
279    ) -> Result<Vec<(EventId, LogEvent)>, LogError>;
280
281    async fn read_range_bytes(
282        &self,
283        topic: &Topic,
284        from: Option<EventId>,
285        limit: usize,
286    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
287        let events = self.read_range(topic, from, limit).await?;
288        events
289            .into_iter()
290            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
291            .collect()
292    }
293
294    /// `async fn` keeps the ergonomic generic surface; the boxed stream
295    /// preserves dyn-dispatch for callers that store `Arc<dyn EventLog>`.
296    async fn subscribe(
297        self: Arc<Self>,
298        topic: &Topic,
299        from: Option<EventId>,
300    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;
301
302    async fn ack(
303        &self,
304        topic: &Topic,
305        consumer: &ConsumerId,
306        up_to: EventId,
307    ) -> Result<(), LogError>;
308
309    async fn consumer_cursor(
310        &self,
311        topic: &Topic,
312        consumer: &ConsumerId,
313    ) -> Result<Option<EventId>, LogError>;
314
315    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;
316
317    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
318}
319
320#[derive(Clone, Debug)]
321pub struct EventLogConfig {
322    pub backend: EventLogBackendKind,
323    pub file_dir: PathBuf,
324    pub sqlite_path: PathBuf,
325    pub queue_depth: usize,
326}
327
328impl EventLogConfig {
329    pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
330        let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
331            .ok()
332            .map(|value| value.parse())
333            .transpose()?
334            .unwrap_or(EventLogBackendKind::Sqlite);
335        let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
336            .ok()
337            .and_then(|value| value.parse::<usize>().ok())
338            .unwrap_or(RuntimeLimits::DEFAULT.default_event_log_queue_depth)
339            .max(1);
340
341        let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
342            Ok(value) if !value.trim().is_empty() => util::resolve_path(base_dir, &value),
343            _ => crate::runtime_paths::event_log_dir(base_dir),
344        };
345        let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
346            Ok(value) if !value.trim().is_empty() => util::resolve_path(base_dir, &value),
347            _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
348        };
349
350        Ok(Self {
351            backend,
352            file_dir,
353            sqlite_path,
354            queue_depth,
355        })
356    }
357
358    pub fn location(&self) -> Option<PathBuf> {
359        match self.backend {
360            EventLogBackendKind::Memory => None,
361            EventLogBackendKind::File => Some(self.file_dir.clone()),
362            EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
363        }
364    }
365}
366
thread_local! {
    // Event log currently installed on this thread, if any.
    static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
    // Configuration recorded by `install_lazy_default_for_base_dir`; it is
    // consumed by `active_event_log` the first time a log is requested while
    // none is installed.
    static PENDING_DEFAULT_EVENT_LOG: RefCell<Option<EventLogConfig>> = const { RefCell::new(None) };
}
371
372pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
373    let config = EventLogConfig::for_base_dir(base_dir)?;
374    let log = open_event_log(&config)?;
375    ACTIVE_EVENT_LOG.with(|slot| {
376        *slot.borrow_mut() = Some(log.clone());
377    });
378    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
379        *slot.borrow_mut() = None;
380    });
381    Ok(log)
382}
383
384pub fn install_lazy_default_for_base_dir(base_dir: &Path) -> Result<(), LogError> {
385    let config = EventLogConfig::for_base_dir(base_dir)?;
386    let has_active = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().is_some());
387    if !has_active {
388        PENDING_DEFAULT_EVENT_LOG.with(|slot| {
389            *slot.borrow_mut() = Some(config);
390        });
391    }
392    Ok(())
393}
394
395pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
396    let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
397    ACTIVE_EVENT_LOG.with(|slot| {
398        *slot.borrow_mut() = Some(log.clone());
399    });
400    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
401        *slot.borrow_mut() = None;
402    });
403    log
404}
405
406pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
407    ACTIVE_EVENT_LOG.with(|slot| {
408        *slot.borrow_mut() = Some(log.clone());
409    });
410    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
411        *slot.borrow_mut() = None;
412    });
413    log
414}
415
416pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
417    if let Some(log) = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone()) {
418        return Some(log);
419    }
420
421    let config = PENDING_DEFAULT_EVENT_LOG.with(|slot| slot.borrow_mut().take())?;
422    match open_event_log(&config) {
423        Ok(log) => Some(install_active_event_log(log)),
424        Err(error) => {
425            crate::events::log_warn("event_log.init", &error.to_string());
426            None
427        }
428    }
429}
430
431pub fn reset_active_event_log() {
432    ACTIVE_EVENT_LOG.with(|slot| {
433        *slot.borrow_mut() = None;
434    });
435    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
436        *slot.borrow_mut() = None;
437    });
438}
439
440pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
441    let config = EventLogConfig::for_base_dir(base_dir)?;
442    let description = match config.backend {
443        EventLogBackendKind::Memory => EventLogDescription {
444            backend: EventLogBackendKind::Memory,
445            location: None,
446            size_bytes: None,
447            queue_depth: config.queue_depth,
448        },
449        EventLogBackendKind::File => EventLogDescription {
450            backend: EventLogBackendKind::File,
451            size_bytes: Some(util::dir_size_bytes(&config.file_dir)),
452            location: Some(config.file_dir),
453            queue_depth: config.queue_depth,
454        },
455        EventLogBackendKind::Sqlite => EventLogDescription {
456            backend: EventLogBackendKind::Sqlite,
457            size_bytes: Some(util::sqlite_size_bytes(&config.sqlite_path)),
458            location: Some(config.sqlite_path),
459            queue_depth: config.queue_depth,
460        },
461    };
462    Ok(description)
463}
464
465pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
466    match config.backend {
467        EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
468            config.queue_depth,
469        )))),
470        EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
471            config.file_dir.clone(),
472            config.queue_depth,
473        )?))),
474        EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
475            config.sqlite_path.clone(),
476            config.queue_depth,
477        )?))),
478    }
479}
480
481pub enum AnyEventLog {
482    Memory(MemoryEventLog),
483    File(FileEventLog),
484    Sqlite(SqliteEventLog),
485}
486
487impl AnyEventLog {
488    pub async fn topics(&self) -> Result<Vec<Topic>, LogError> {
489        match self {
490            Self::Memory(log) => log.topics().await,
491            Self::File(log) => log.topics(),
492            Self::Sqlite(log) => log.topics(),
493        }
494    }
495
496    pub async fn append_idempotent_by_header(
497        &self,
498        topic: &Topic,
499        header: &str,
500        value: &str,
501        event: LogEvent,
502    ) -> Result<AppendOutcome, LogError> {
503        if header.trim().is_empty() {
504            return Err(LogError::Config(
505                "idempotent append header cannot be empty".to_string(),
506            ));
507        }
508        match self {
509            Self::Memory(log) => {
510                log.append_idempotent_by_header(topic, header, value, event)
511                    .await
512            }
513            Self::File(log) => log.append_idempotent_by_header(topic, header, value, event),
514            Self::Sqlite(log) => log.append_idempotent_by_header(topic, header, value, event),
515        }
516    }
517}
518
519impl EventLog for AnyEventLog {
520    fn describe(&self) -> EventLogDescription {
521        match self {
522            Self::Memory(log) => log.describe(),
523            Self::File(log) => log.describe(),
524            Self::Sqlite(log) => log.describe(),
525        }
526    }
527
528    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
529        match self {
530            Self::Memory(log) => log.append(topic, event).await,
531            Self::File(log) => log.append(topic, event).await,
532            Self::Sqlite(log) => log.append(topic, event).await,
533        }
534    }
535
536    async fn flush(&self) -> Result<(), LogError> {
537        match self {
538            Self::Memory(log) => log.flush().await,
539            Self::File(log) => log.flush().await,
540            Self::Sqlite(log) => log.flush().await,
541        }
542    }
543
544    async fn read_range(
545        &self,
546        topic: &Topic,
547        from: Option<EventId>,
548        limit: usize,
549    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
550        match self {
551            Self::Memory(log) => log.read_range(topic, from, limit).await,
552            Self::File(log) => log.read_range(topic, from, limit).await,
553            Self::Sqlite(log) => log.read_range(topic, from, limit).await,
554        }
555    }
556
557    async fn read_range_bytes(
558        &self,
559        topic: &Topic,
560        from: Option<EventId>,
561        limit: usize,
562    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
563        match self {
564            Self::Memory(log) => log.read_range_bytes(topic, from, limit).await,
565            Self::File(log) => log.read_range_bytes(topic, from, limit).await,
566            Self::Sqlite(log) => log.read_range_bytes(topic, from, limit).await,
567        }
568    }
569
570    async fn subscribe(
571        self: Arc<Self>,
572        topic: &Topic,
573        from: Option<EventId>,
574    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
575        let (rx, queue_depth) = match self.as_ref() {
576            Self::Memory(log) => (
577                log.broadcasts.subscribe(topic, log.queue_depth),
578                log.queue_depth,
579            ),
580            Self::File(log) => (
581                log.broadcasts.subscribe(topic, log.queue_depth),
582                log.queue_depth,
583            ),
584            Self::Sqlite(log) => (
585                log.broadcasts.subscribe(topic, log.queue_depth),
586                log.queue_depth,
587            ),
588        };
589        let history = self.read_range(topic, from, usize::MAX).await?;
590        Ok(util::stream_from_broadcast(history, from, rx, queue_depth))
591    }
592
593    async fn ack(
594        &self,
595        topic: &Topic,
596        consumer: &ConsumerId,
597        up_to: EventId,
598    ) -> Result<(), LogError> {
599        match self {
600            Self::Memory(log) => log.ack(topic, consumer, up_to).await,
601            Self::File(log) => log.ack(topic, consumer, up_to).await,
602            Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
603        }
604    }
605
606    async fn consumer_cursor(
607        &self,
608        topic: &Topic,
609        consumer: &ConsumerId,
610    ) -> Result<Option<EventId>, LogError> {
611        match self {
612            Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
613            Self::File(log) => log.consumer_cursor(topic, consumer).await,
614            Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
615        }
616    }
617
618    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
619        match self {
620            Self::Memory(log) => log.latest(topic).await,
621            Self::File(log) => log.latest(topic).await,
622            Self::Sqlite(log) => log.latest(topic).await,
623        }
624    }
625
626    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
627        match self {
628            Self::Memory(log) => log.compact(topic, before).await,
629            Self::File(log) => log.compact(topic, before).await,
630            Self::Sqlite(log) => log.compact(topic, before).await,
631        }
632    }
633}
634
/// Returns `value` with every character that is not an ASCII letter, ASCII
/// digit, `.`, `_`, or `-` replaced by a single `_`.
///
/// An empty input yields an empty string.
pub fn sanitize_topic_component(value: &str) -> String {
    // Each replaced character becomes one byte, so the output never exceeds
    // the input's byte length.
    let mut sanitized = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '.' || ch == '_' || ch == '-';
        sanitized.push(if keep { ch } else { '_' });
    }
    sanitized
}