Skip to main content

harn_vm/event_log/
mod.rs

1use std::cell::RefCell;
2use std::collections::BTreeMap;
3use std::fmt;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7
8use bytes::Bytes;
9use futures::stream::BoxStream;
10use serde::{Deserialize, Serialize};
11
12use crate::runtime_limits::RuntimeLimits;
13
14mod file;
15mod memory;
16mod sqlite;
17mod util;
18
19#[cfg(test)]
20pub(crate) use util::pin_test_occurred_at_ms;
21
22#[cfg(test)]
23mod tests;
24
25pub use file::FileEventLog;
26pub use memory::MemoryEventLog;
27pub use sqlite::SqliteEventLog;
28
/// Identifier of an event within a topic. `EventLog::read_range` takes it
/// as an exclusive cursor ("events strictly after `from`").
29pub type EventId = u64;
30
31/// Topic the LLM observability writer appends `provider_call_response`
32/// (and sibling transcript) events to. Read by `harn portal` and
33/// `harn usage`; kept here so every reader/writer shares one literal.
34pub const HARN_LLM_TRANSCRIPT_TOPIC: &str = "agent.transcript.llm";
35
/// Environment variable naming the backend. `EventLogConfig::for_base_dir`
/// parses it as an `EventLogBackendKind` and uses SQLite when it is unset.
36pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
/// Environment variable overriding the file backend directory; a blank
/// value is ignored and the path is resolved against the base directory.
37pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
/// Environment variable overriding the SQLite database path; a blank value
/// is ignored and the path is resolved against the base directory.
38pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
/// Environment variable overriding the queue depth. Values that do not
/// parse as `usize` fall back to the runtime default; the result is
/// clamped to at least 1.
39pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";
40
41#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
42pub struct Topic(String);
43
44impl Topic {
45    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
46        let value = value.into();
47        if value.is_empty() {
48            return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
49        }
50        if !value
51            .chars()
52            .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
53        {
54            return Err(LogError::InvalidTopic(format!(
55                "topic '{value}' contains unsupported characters"
56            )));
57        }
58        Ok(Self(value))
59    }
60
61    pub fn as_str(&self) -> &str {
62        &self.0
63    }
64}
65
66impl fmt::Display for Topic {
67    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68        self.0.fmt(f)
69    }
70}
71
72impl FromStr for Topic {
73    type Err = LogError;
74
75    fn from_str(s: &str) -> Result<Self, Self::Err> {
76        Self::new(s)
77    }
78}
79
80#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
81pub struct ConsumerId(String);
82
83impl ConsumerId {
84    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
85        let value = value.into();
86        if value.trim().is_empty() {
87            return Err(LogError::InvalidConsumer(
88                "consumer id cannot be empty".to_string(),
89            ));
90        }
91        Ok(Self(value))
92    }
93
94    pub fn as_str(&self) -> &str {
95        &self.0
96    }
97}
98
99impl fmt::Display for ConsumerId {
100    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101        self.0.fmt(f)
102    }
103}
104
105#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
106#[serde(rename_all = "snake_case")]
107pub enum EventLogBackendKind {
108    Memory,
109    File,
110    Sqlite,
111    Postgres,
112}
113
114impl fmt::Display for EventLogBackendKind {
115    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116        match self {
117            Self::Memory => write!(f, "memory"),
118            Self::File => write!(f, "file"),
119            Self::Sqlite => write!(f, "sqlite"),
120            Self::Postgres => write!(f, "postgres"),
121        }
122    }
123}
124
125impl FromStr for EventLogBackendKind {
126    type Err = LogError;
127
128    fn from_str(value: &str) -> Result<Self, Self::Err> {
129        match value.trim().to_ascii_lowercase().as_str() {
130            "memory" => Ok(Self::Memory),
131            "file" => Ok(Self::File),
132            "sqlite" => Ok(Self::Sqlite),
133            "postgres" => Ok(Self::Postgres),
134            other => Err(LogError::Config(format!(
135                "unsupported event log backend '{other}'"
136            ))),
137        }
138    }
139}
140
141#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
142pub struct LogEvent {
143    pub kind: String,
144    pub payload: serde_json::Value,
145    #[serde(default)]
146    pub headers: BTreeMap<String, String>,
147    pub occurred_at_ms: i64,
148}
149
150impl LogEvent {
151    pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
152        Self {
153            kind: kind.into(),
154            payload,
155            headers: BTreeMap::new(),
156            occurred_at_ms: util::now_ms(),
157        }
158    }
159
160    pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
161        self.headers = headers;
162        self
163    }
164
165    /// Apply the unified redaction policy to this event's headers and
166    /// payload. Backends are intentionally left unaware of redaction —
167    /// emitters that need scrubbed events call this before append, and
168    /// readers that materialize events for display can apply the policy
169    /// again as defense in depth.
170    pub fn redact_in_place(&mut self, policy: &crate::redact::RedactionPolicy) {
171        self.headers = policy.redact_headers(&self.headers);
172        policy.redact_json_in_place(&mut self.payload);
173    }
174}
175
176/// Serialized event payload form for large read paths.
177///
178/// `payload` contains the original JSON bytes for backends that can expose
179/// them directly. Callers that only need to forward or hash the payload can
180/// avoid materializing a `serde_json::Value`; callers that need structured
181/// access can opt in with `payload_json`.
182#[derive(Clone, Debug, PartialEq, Eq)]
183pub struct LogEventBytes {
184    pub kind: String,
185    pub payload: Bytes,
186    pub headers: BTreeMap<String, String>,
187    pub occurred_at_ms: i64,
188}
189
190impl LogEventBytes {
191    pub fn payload_json(&self) -> Result<serde_json::Value, LogError> {
192        serde_json::from_slice(&self.payload)
193            .map_err(|error| LogError::Serde(format!("event log payload parse error: {error}")))
194    }
195
196    pub fn into_log_event(self) -> Result<LogEvent, LogError> {
197        Ok(LogEvent {
198            kind: self.kind,
199            payload: serde_json::from_slice(&self.payload).map_err(|error| {
200                LogError::Serde(format!("event log payload parse error: {error}"))
201            })?,
202            headers: self.headers,
203            occurred_at_ms: self.occurred_at_ms,
204        })
205    }
206}
207
208impl TryFrom<LogEvent> for LogEventBytes {
209    type Error = LogError;
210
211    fn try_from(event: LogEvent) -> Result<Self, Self::Error> {
212        let payload = serde_json::to_vec(&event.payload)
213            .map_err(|error| LogError::Serde(format!("event log payload encode error: {error}")))?;
214        Ok(Self {
215            kind: event.kind,
216            payload: Bytes::from(payload),
217            headers: event.headers,
218            occurred_at_ms: event.occurred_at_ms,
219        })
220    }
221}
222
/// Result of [`EventLog::compact`] for one topic.
///
/// NOTE(review): the backends that fill this in are not visible from this
/// file. Presumably `removed`/`remaining` are event counts, `latest` is the
/// newest event id left in the topic, and `checkpointed` reports whether
/// the backend also checkpointed its storage — confirm against the
/// backend implementations.
223#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
224pub struct CompactReport {
225    pub removed: usize,
226    pub remaining: usize,
227    pub latest: Option<EventId>,
228    pub checkpointed: bool,
229}
230
/// Result of `AnyEventLog::append_idempotent_by_header`: the event id, the
/// event itself, and an `inserted` flag.
///
/// NOTE(review): presumably `inserted` is `false` when an event with the
/// same header value already existed and was returned instead of a new
/// append — confirm against the backend implementations.
231#[derive(Clone, Debug, PartialEq, Eq)]
232pub struct AppendOutcome {
233    pub event_id: EventId,
234    pub event: LogEvent,
235    pub inserted: bool,
236}
237
/// Static description of an event log, as returned by `EventLog::describe`
/// and `describe_for_base_dir`.
///
/// `location` and `size_bytes` are `None` for backends without an on-disk
/// location (`describe_for_base_dir` leaves them unset for the memory and
/// postgres backends).
238#[derive(Clone, Debug, PartialEq, Eq)]
239pub struct EventLogDescription {
240    pub backend: EventLogBackendKind,
241    pub location: Option<PathBuf>,
242    pub size_bytes: Option<u64>,
243    pub queue_depth: usize,
244}
245
246#[derive(Debug)]
247pub enum LogError {
248    Config(String),
249    InvalidTopic(String),
250    InvalidConsumer(String),
251    Io(String),
252    Serde(String),
253    Sqlite(String),
254    ConsumerLagged(EventId),
255}
256
257impl fmt::Display for LogError {
258    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
259        match self {
260            Self::Config(message)
261            | Self::InvalidTopic(message)
262            | Self::InvalidConsumer(message)
263            | Self::Io(message)
264            | Self::Serde(message)
265            | Self::Sqlite(message) => message.fmt(f),
266            Self::ConsumerLagged(last_id) => {
267                write!(f, "subscriber lagged behind after event {last_id}")
268            }
269        }
270    }
271}
272
273impl std::error::Error for LogError {}
274
275#[allow(async_fn_in_trait)]
276pub trait EventLog: Send + Sync {
277    fn describe(&self) -> EventLogDescription;
278
279    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;
280
281    async fn flush(&self) -> Result<(), LogError>;
282
283    /// Read events strictly after `from`. `None` starts from the
284    /// beginning of the topic.
285    async fn read_range(
286        &self,
287        topic: &Topic,
288        from: Option<EventId>,
289        limit: usize,
290    ) -> Result<Vec<(EventId, LogEvent)>, LogError>;
291
292    async fn read_range_bytes(
293        &self,
294        topic: &Topic,
295        from: Option<EventId>,
296        limit: usize,
297    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
298        let events = self.read_range(topic, from, limit).await?;
299        events
300            .into_iter()
301            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
302            .collect()
303    }
304
305    /// `async fn` keeps the ergonomic generic surface; the boxed stream
306    /// preserves dyn-dispatch for callers that store `Arc<dyn EventLog>`.
307    async fn subscribe(
308        self: Arc<Self>,
309        topic: &Topic,
310        from: Option<EventId>,
311    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;
312
313    async fn ack(
314        &self,
315        topic: &Topic,
316        consumer: &ConsumerId,
317        up_to: EventId,
318    ) -> Result<(), LogError>;
319
320    async fn consumer_cursor(
321        &self,
322        topic: &Topic,
323        consumer: &ConsumerId,
324    ) -> Result<Option<EventId>, LogError>;
325
326    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;
327
328    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
329}
330
331#[derive(Clone, Debug)]
332pub struct EventLogConfig {
333    pub backend: EventLogBackendKind,
334    pub file_dir: PathBuf,
335    pub sqlite_path: PathBuf,
336    pub queue_depth: usize,
337}
338
339impl EventLogConfig {
340    pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
341        let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
342            .ok()
343            .map(|value| value.parse())
344            .transpose()?
345            .unwrap_or(EventLogBackendKind::Sqlite);
346        let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
347            .ok()
348            .and_then(|value| value.parse::<usize>().ok())
349            .unwrap_or(RuntimeLimits::DEFAULT.default_event_log_queue_depth)
350            .max(1);
351
352        let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
353            Ok(value) if !value.trim().is_empty() => util::resolve_path(base_dir, &value),
354            _ => crate::runtime_paths::event_log_dir(base_dir),
355        };
356        let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
357            Ok(value) if !value.trim().is_empty() => util::resolve_path(base_dir, &value),
358            _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
359        };
360
361        Ok(Self {
362            backend,
363            file_dir,
364            sqlite_path,
365            queue_depth,
366        })
367    }
368
369    pub fn location(&self) -> Option<PathBuf> {
370        match self.backend {
371            EventLogBackendKind::Memory => None,
372            EventLogBackendKind::File => Some(self.file_dir.clone()),
373            EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
374            EventLogBackendKind::Postgres => None,
375        }
376    }
377}
378
// Per-thread event-log slots used by the `install_*`, `active_event_log`
// and `reset_active_event_log` functions below.
379thread_local! {
    // The log currently installed on this thread, if any.
380    static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
    // Configuration stashed by `install_lazy_default_for_base_dir`; consumed
    // by the first `active_event_log` call that finds no active log.
381    static PENDING_DEFAULT_EVENT_LOG: RefCell<Option<EventLogConfig>> = const { RefCell::new(None) };
382}
383
384pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
385    let config = EventLogConfig::for_base_dir(base_dir)?;
386    let log = open_event_log(&config)?;
387    ACTIVE_EVENT_LOG.with(|slot| {
388        *slot.borrow_mut() = Some(log.clone());
389    });
390    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
391        *slot.borrow_mut() = None;
392    });
393    Ok(log)
394}
395
396pub fn install_lazy_default_for_base_dir(base_dir: &Path) -> Result<(), LogError> {
397    let config = EventLogConfig::for_base_dir(base_dir)?;
398    let has_active = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().is_some());
399    if !has_active {
400        PENDING_DEFAULT_EVENT_LOG.with(|slot| {
401            *slot.borrow_mut() = Some(config);
402        });
403    }
404    Ok(())
405}
406
407pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
408    let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
409    ACTIVE_EVENT_LOG.with(|slot| {
410        *slot.borrow_mut() = Some(log.clone());
411    });
412    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
413        *slot.borrow_mut() = None;
414    });
415    log
416}
417
418pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
419    ACTIVE_EVENT_LOG.with(|slot| {
420        *slot.borrow_mut() = Some(log.clone());
421    });
422    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
423        *slot.borrow_mut() = None;
424    });
425    log
426}
427
428pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
429    if let Some(log) = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone()) {
430        return Some(log);
431    }
432
433    let config = PENDING_DEFAULT_EVENT_LOG.with(|slot| slot.borrow_mut().take())?;
434    match open_event_log(&config) {
435        Ok(log) => Some(install_active_event_log(log)),
436        Err(error) => {
437            crate::events::log_warn("event_log.init", &error.to_string());
438            None
439        }
440    }
441}
442
443pub fn reset_active_event_log() {
444    ACTIVE_EVENT_LOG.with(|slot| {
445        *slot.borrow_mut() = None;
446    });
447    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
448        *slot.borrow_mut() = None;
449    });
450}
451
452pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
453    let config = EventLogConfig::for_base_dir(base_dir)?;
454    let description = match config.backend {
455        EventLogBackendKind::Memory => EventLogDescription {
456            backend: EventLogBackendKind::Memory,
457            location: None,
458            size_bytes: None,
459            queue_depth: config.queue_depth,
460        },
461        EventLogBackendKind::File => EventLogDescription {
462            backend: EventLogBackendKind::File,
463            size_bytes: Some(util::dir_size_bytes(&config.file_dir)),
464            location: Some(config.file_dir),
465            queue_depth: config.queue_depth,
466        },
467        EventLogBackendKind::Sqlite => EventLogDescription {
468            backend: EventLogBackendKind::Sqlite,
469            size_bytes: Some(util::sqlite_size_bytes(&config.sqlite_path)),
470            location: Some(config.sqlite_path),
471            queue_depth: config.queue_depth,
472        },
473        EventLogBackendKind::Postgres => EventLogDescription {
474            backend: EventLogBackendKind::Postgres,
475            location: None,
476            size_bytes: None,
477            queue_depth: config.queue_depth,
478        },
479    };
480    Ok(description)
481}
482
483pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
484    match config.backend {
485        EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
486            config.queue_depth,
487        )))),
488        EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
489            config.file_dir.clone(),
490            config.queue_depth,
491        )?))),
492        EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
493            config.sqlite_path.clone(),
494            config.queue_depth,
495        )?))),
496        EventLogBackendKind::Postgres => Err(LogError::Config(
497            "postgres event logs are host-provided; the built-in event log factory supports memory, file, and sqlite"
498                .to_string(),
499        )),
500    }
501}
502
503pub enum AnyEventLog {
504    Memory(MemoryEventLog),
505    File(FileEventLog),
506    Sqlite(SqliteEventLog),
507}
508
509impl AnyEventLog {
510    pub async fn topics(&self) -> Result<Vec<Topic>, LogError> {
511        match self {
512            Self::Memory(log) => log.topics().await,
513            Self::File(log) => log.topics(),
514            Self::Sqlite(log) => log.topics(),
515        }
516    }
517
518    pub async fn append_idempotent_by_header(
519        &self,
520        topic: &Topic,
521        header: &str,
522        value: &str,
523        event: LogEvent,
524    ) -> Result<AppendOutcome, LogError> {
525        if header.trim().is_empty() {
526            return Err(LogError::Config(
527                "idempotent append header cannot be empty".to_string(),
528            ));
529        }
530        match self {
531            Self::Memory(log) => {
532                log.append_idempotent_by_header(topic, header, value, event)
533                    .await
534            }
535            Self::File(log) => log.append_idempotent_by_header(topic, header, value, event),
536            Self::Sqlite(log) => log.append_idempotent_by_header(topic, header, value, event),
537        }
538    }
539
540    /// Read the event previously appended under `(header, value)`, the read
541    /// counterpart of [`Self::append_idempotent_by_header`]. On the SQLite
542    /// backend this is an indexed JOIN; the memory/file dev backends scan.
543    pub async fn read_idempotent_by_header(
544        &self,
545        topic: &Topic,
546        header: &str,
547        value: &str,
548    ) -> Result<Option<(EventId, LogEvent)>, LogError> {
549        if header.trim().is_empty() {
550            return Err(LogError::Config(
551                "idempotent read header cannot be empty".to_string(),
552            ));
553        }
554        match self {
555            Self::Memory(log) => log.read_idempotent_by_header(topic, header, value).await,
556            Self::File(log) => log.read_idempotent_by_header(topic, header, value),
557            Self::Sqlite(log) => log.read_idempotent_by_header(topic, header, value),
558        }
559    }
560}
561
562impl EventLog for AnyEventLog {
563    fn describe(&self) -> EventLogDescription {
564        match self {
565            Self::Memory(log) => log.describe(),
566            Self::File(log) => log.describe(),
567            Self::Sqlite(log) => log.describe(),
568        }
569    }
570
571    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
572        match self {
573            Self::Memory(log) => log.append(topic, event).await,
574            Self::File(log) => log.append(topic, event).await,
575            Self::Sqlite(log) => log.append(topic, event).await,
576        }
577    }
578
579    async fn flush(&self) -> Result<(), LogError> {
580        match self {
581            Self::Memory(log) => log.flush().await,
582            Self::File(log) => log.flush().await,
583            Self::Sqlite(log) => log.flush().await,
584        }
585    }
586
587    async fn read_range(
588        &self,
589        topic: &Topic,
590        from: Option<EventId>,
591        limit: usize,
592    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
593        match self {
594            Self::Memory(log) => log.read_range(topic, from, limit).await,
595            Self::File(log) => log.read_range(topic, from, limit).await,
596            Self::Sqlite(log) => log.read_range(topic, from, limit).await,
597        }
598    }
599
600    async fn read_range_bytes(
601        &self,
602        topic: &Topic,
603        from: Option<EventId>,
604        limit: usize,
605    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
606        match self {
607            Self::Memory(log) => log.read_range_bytes(topic, from, limit).await,
608            Self::File(log) => log.read_range_bytes(topic, from, limit).await,
609            Self::Sqlite(log) => log.read_range_bytes(topic, from, limit).await,
610        }
611    }
612
613    async fn subscribe(
614        self: Arc<Self>,
615        topic: &Topic,
616        from: Option<EventId>,
617    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
618        let (rx, queue_depth) = match self.as_ref() {
619            Self::Memory(log) => (
620                log.broadcasts.subscribe(topic, log.queue_depth),
621                log.queue_depth,
622            ),
623            Self::File(log) => (
624                log.broadcasts.subscribe(topic, log.queue_depth),
625                log.queue_depth,
626            ),
627            Self::Sqlite(log) => (
628                log.broadcasts.subscribe(topic, log.queue_depth),
629                log.queue_depth,
630            ),
631        };
632        let history = self.read_range(topic, from, usize::MAX).await?;
633        Ok(util::stream_from_broadcast(history, from, rx, queue_depth))
634    }
635
636    async fn ack(
637        &self,
638        topic: &Topic,
639        consumer: &ConsumerId,
640        up_to: EventId,
641    ) -> Result<(), LogError> {
642        match self {
643            Self::Memory(log) => log.ack(topic, consumer, up_to).await,
644            Self::File(log) => log.ack(topic, consumer, up_to).await,
645            Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
646        }
647    }
648
649    async fn consumer_cursor(
650        &self,
651        topic: &Topic,
652        consumer: &ConsumerId,
653    ) -> Result<Option<EventId>, LogError> {
654        match self {
655            Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
656            Self::File(log) => log.consumer_cursor(topic, consumer).await,
657            Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
658        }
659    }
660
661    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
662        match self {
663            Self::Memory(log) => log.latest(topic).await,
664            Self::File(log) => log.latest(topic).await,
665            Self::Sqlite(log) => log.latest(topic).await,
666        }
667    }
668
669    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
670        match self {
671            Self::Memory(log) => log.compact(topic, before).await,
672            Self::File(log) => log.compact(topic, before).await,
673            Self::Sqlite(log) => log.compact(topic, before).await,
674        }
675    }
676}
677
/// Maps `value` onto the topic alphabet: ASCII letters, ASCII digits, `.`,
/// `_` and `-` are kept; every other character becomes a single `_`.
///
/// Empty input yields an empty string, so the result is not by itself
/// guaranteed to be a valid `Topic`.
pub fn sanitize_topic_component(value: &str) -> String {
    let mut sanitized = String::with_capacity(value.len());
    for ch in value.chars() {
        let allowed = ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-');
        sanitized.push(if allowed { ch } else { '_' });
    }
    sanitized
}