1use std::cell::RefCell;
2use std::collections::BTreeMap;
3use std::fmt;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7
8use bytes::Bytes;
9use futures::stream::BoxStream;
10use serde::{Deserialize, Serialize};
11
12use crate::runtime_limits::RuntimeLimits;
13
14mod file;
15mod memory;
16mod sqlite;
17mod util;
18
19#[cfg(test)]
20pub(crate) use util::pin_test_occurred_at_ms;
21
22#[cfg(test)]
23mod tests;
24
25pub use file::FileEventLog;
26pub use memory::MemoryEventLog;
27pub use sqlite::SqliteEventLog;
28
/// Numeric identifier of an event, as returned by `EventLog::append`.
pub type EventId = u64;

/// Topic name used for LLM transcript events.
// NOTE(review): presumably the topic agent LLM transcripts are appended to — confirm against callers.
pub const HARN_LLM_TRANSCRIPT_TOPIC: &str = "agent.transcript.llm";

/// Environment variable selecting the backend; read by `EventLogConfig::for_base_dir`
/// and parsed as an `EventLogBackendKind`.
pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
/// Environment variable overriding `EventLogConfig::file_dir`.
pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
/// Environment variable overriding `EventLogConfig::sqlite_path`.
pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
/// Environment variable overriding `EventLogConfig::queue_depth`.
pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";
40
41#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
42pub struct Topic(String);
43
44impl Topic {
45 pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
46 let value = value.into();
47 if value.is_empty() {
48 return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
49 }
50 if !value
51 .chars()
52 .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
53 {
54 return Err(LogError::InvalidTopic(format!(
55 "topic '{value}' contains unsupported characters"
56 )));
57 }
58 Ok(Self(value))
59 }
60
61 pub fn as_str(&self) -> &str {
62 &self.0
63 }
64}
65
66impl fmt::Display for Topic {
67 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68 self.0.fmt(f)
69 }
70}
71
72impl FromStr for Topic {
73 type Err = LogError;
74
75 fn from_str(s: &str) -> Result<Self, Self::Err> {
76 Self::new(s)
77 }
78}
79
80#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
81pub struct ConsumerId(String);
82
83impl ConsumerId {
84 pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
85 let value = value.into();
86 if value.trim().is_empty() {
87 return Err(LogError::InvalidConsumer(
88 "consumer id cannot be empty".to_string(),
89 ));
90 }
91 Ok(Self(value))
92 }
93
94 pub fn as_str(&self) -> &str {
95 &self.0
96 }
97}
98
99impl fmt::Display for ConsumerId {
100 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101 self.0.fmt(f)
102 }
103}
104
/// Kind of storage backing an event log.
///
/// Serialized with serde in `snake_case` (`memory`, `file`, `sqlite`, `postgres`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventLogBackendKind {
    /// In-process log (`MemoryEventLog`).
    Memory,
    /// Log stored under a directory (`FileEventLog`).
    File,
    /// Log stored in a SQLite database file (`SqliteEventLog`).
    Sqlite,
    /// Postgres-backed log; `open_event_log` cannot construct it and reports it
    /// as host-provided.
    Postgres,
}
113
114impl fmt::Display for EventLogBackendKind {
115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116 match self {
117 Self::Memory => write!(f, "memory"),
118 Self::File => write!(f, "file"),
119 Self::Sqlite => write!(f, "sqlite"),
120 Self::Postgres => write!(f, "postgres"),
121 }
122 }
123}
124
125impl FromStr for EventLogBackendKind {
126 type Err = LogError;
127
128 fn from_str(value: &str) -> Result<Self, Self::Err> {
129 match value.trim().to_ascii_lowercase().as_str() {
130 "memory" => Ok(Self::Memory),
131 "file" => Ok(Self::File),
132 "sqlite" => Ok(Self::Sqlite),
133 "postgres" => Ok(Self::Postgres),
134 other => Err(LogError::Config(format!(
135 "unsupported event log backend '{other}'"
136 ))),
137 }
138 }
139}
140
/// A single event stored in (or about to be appended to) an event log.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct LogEvent {
    /// Caller-chosen event kind.
    pub kind: String,
    /// JSON payload of the event.
    pub payload: serde_json::Value,
    /// String headers attached to the event; defaults to empty when absent
    /// from serialized input.
    #[serde(default)]
    pub headers: BTreeMap<String, String>,
    /// Timestamp taken from `util::now_ms()` when built through `LogEvent::new`.
    // NOTE(review): presumably milliseconds since the Unix epoch — confirm in `util`.
    pub occurred_at_ms: i64,
}
149
150impl LogEvent {
151 pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
152 Self {
153 kind: kind.into(),
154 payload,
155 headers: BTreeMap::new(),
156 occurred_at_ms: util::now_ms(),
157 }
158 }
159
160 pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
161 self.headers = headers;
162 self
163 }
164
165 pub fn redact_in_place(&mut self, policy: &crate::redact::RedactionPolicy) {
171 self.headers = policy.redact_headers(&self.headers);
172 policy.redact_json_in_place(&mut self.payload);
173 }
174}
175
/// Variant of `LogEvent` whose payload is kept as raw bytes instead of a
/// parsed JSON value.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LogEventBytes {
    /// Caller-chosen event kind.
    pub kind: String,
    /// Payload bytes; `payload_json` / `into_log_event` parse them as JSON.
    pub payload: Bytes,
    /// String headers attached to the event.
    pub headers: BTreeMap<String, String>,
    /// Event timestamp, carried over unchanged from/to `LogEvent::occurred_at_ms`.
    pub occurred_at_ms: i64,
}
189
190impl LogEventBytes {
191 pub fn payload_json(&self) -> Result<serde_json::Value, LogError> {
192 serde_json::from_slice(&self.payload)
193 .map_err(|error| LogError::Serde(format!("event log payload parse error: {error}")))
194 }
195
196 pub fn into_log_event(self) -> Result<LogEvent, LogError> {
197 Ok(LogEvent {
198 kind: self.kind,
199 payload: serde_json::from_slice(&self.payload).map_err(|error| {
200 LogError::Serde(format!("event log payload parse error: {error}"))
201 })?,
202 headers: self.headers,
203 occurred_at_ms: self.occurred_at_ms,
204 })
205 }
206}
207
208impl TryFrom<LogEvent> for LogEventBytes {
209 type Error = LogError;
210
211 fn try_from(event: LogEvent) -> Result<Self, Self::Error> {
212 let payload = serde_json::to_vec(&event.payload)
213 .map_err(|error| LogError::Serde(format!("event log payload encode error: {error}")))?;
214 Ok(Self {
215 kind: event.kind,
216 payload: Bytes::from(payload),
217 headers: event.headers,
218 occurred_at_ms: event.occurred_at_ms,
219 })
220 }
221}
222
/// Result of `EventLog::compact`.
// NOTE(review): field meanings below are inferred from their names; the
// backends that populate them are not visible here — confirm before relying on them.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactReport {
    /// Presumably the number of events removed by the compaction.
    pub removed: usize,
    /// Presumably the number of events left in the topic afterwards.
    pub remaining: usize,
    /// Presumably the newest event id in the topic, if any.
    pub latest: Option<EventId>,
    /// Presumably whether the backend also checkpointed its storage.
    pub checkpointed: bool,
}
230
/// Result of `AnyEventLog::append_idempotent_by_header`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AppendOutcome {
    /// Id of the event this outcome refers to.
    pub event_id: EventId,
    /// The event this outcome refers to.
    pub event: LogEvent,
    /// NOTE(review): presumably `true` when a new event was written and `false`
    /// when an existing event with the same header value was found — confirm
    /// in the backends.
    pub inserted: bool,
}
237
/// Summary of an event log, as returned by `EventLog::describe` and
/// `describe_for_base_dir`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventLogDescription {
    /// Which backend the log uses.
    pub backend: EventLogBackendKind,
    /// Directory (file backend) or database path (SQLite backend);
    /// `describe_for_base_dir` leaves it `None` for the other backends.
    pub location: Option<PathBuf>,
    /// Storage size in bytes; `describe_for_base_dir` fills it for the file and
    /// SQLite backends and leaves it `None` otherwise.
    pub size_bytes: Option<u64>,
    /// Configured queue depth.
    pub queue_depth: usize,
}
245
/// Errors produced by the event log module.
///
/// Every variant except `ConsumerLagged` carries a ready-to-display message.
#[derive(Debug)]
pub enum LogError {
    /// Invalid configuration or argument (e.g. an unsupported backend name or
    /// an empty idempotency header).
    Config(String),
    /// Rejected topic name; see `Topic::new`.
    InvalidTopic(String),
    /// Rejected consumer id; see `ConsumerId::new`.
    InvalidConsumer(String),
    /// I/O failure (raised by code outside this file).
    Io(String),
    /// Payload encode/parse failure.
    Serde(String),
    /// SQLite failure (raised by code outside this file).
    Sqlite(String),
    /// A subscriber fell behind; carries the id reported in the message
    /// "subscriber lagged behind after event {id}".
    ConsumerLagged(EventId),
}
256
257impl fmt::Display for LogError {
258 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
259 match self {
260 Self::Config(message)
261 | Self::InvalidTopic(message)
262 | Self::InvalidConsumer(message)
263 | Self::Io(message)
264 | Self::Serde(message)
265 | Self::Sqlite(message) => message.fmt(f),
266 Self::ConsumerLagged(last_id) => {
267 write!(f, "subscriber lagged behind after event {last_id}")
268 }
269 }
270 }
271}
272
273impl std::error::Error for LogError {}
274
275#[allow(async_fn_in_trait)]
276pub trait EventLog: Send + Sync {
277 fn describe(&self) -> EventLogDescription;
278
279 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;
280
281 async fn flush(&self) -> Result<(), LogError>;
282
283 async fn read_range(
286 &self,
287 topic: &Topic,
288 from: Option<EventId>,
289 limit: usize,
290 ) -> Result<Vec<(EventId, LogEvent)>, LogError>;
291
292 async fn read_range_bytes(
293 &self,
294 topic: &Topic,
295 from: Option<EventId>,
296 limit: usize,
297 ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
298 let events = self.read_range(topic, from, limit).await?;
299 events
300 .into_iter()
301 .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
302 .collect()
303 }
304
305 async fn subscribe(
308 self: Arc<Self>,
309 topic: &Topic,
310 from: Option<EventId>,
311 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;
312
313 async fn ack(
314 &self,
315 topic: &Topic,
316 consumer: &ConsumerId,
317 up_to: EventId,
318 ) -> Result<(), LogError>;
319
320 async fn consumer_cursor(
321 &self,
322 topic: &Topic,
323 consumer: &ConsumerId,
324 ) -> Result<Option<EventId>, LogError>;
325
326 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;
327
328 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
329}
330
/// Settings needed to open an event log; see `open_event_log`.
#[derive(Clone, Debug)]
pub struct EventLogConfig {
    /// Backend to open.
    pub backend: EventLogBackendKind,
    /// Directory handed to the file backend.
    pub file_dir: PathBuf,
    /// Database path handed to the SQLite backend.
    pub sqlite_path: PathBuf,
    /// Queue depth handed to whichever backend is opened.
    pub queue_depth: usize,
}
338
339impl EventLogConfig {
340 pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
341 let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
342 .ok()
343 .map(|value| value.parse())
344 .transpose()?
345 .unwrap_or(EventLogBackendKind::Sqlite);
346 let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
347 .ok()
348 .and_then(|value| value.parse::<usize>().ok())
349 .unwrap_or(RuntimeLimits::DEFAULT.default_event_log_queue_depth)
350 .max(1);
351
352 let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
353 Ok(value) if !value.trim().is_empty() => util::resolve_path(base_dir, &value),
354 _ => crate::runtime_paths::event_log_dir(base_dir),
355 };
356 let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
357 Ok(value) if !value.trim().is_empty() => util::resolve_path(base_dir, &value),
358 _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
359 };
360
361 Ok(Self {
362 backend,
363 file_dir,
364 sqlite_path,
365 queue_depth,
366 })
367 }
368
369 pub fn location(&self) -> Option<PathBuf> {
370 match self.backend {
371 EventLogBackendKind::Memory => None,
372 EventLogBackendKind::File => Some(self.file_dir.clone()),
373 EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
374 EventLogBackendKind::Postgres => None,
375 }
376 }
377}
378
// Per-thread event log state used by the `install_*`, `active_event_log` and
// `reset_active_event_log` functions.
thread_local! {
    // Log currently installed on this thread, if any.
    static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
    // Configuration stored by `install_lazy_default_for_base_dir`; it is taken
    // and opened by `active_event_log` when no log is installed yet.
    static PENDING_DEFAULT_EVENT_LOG: RefCell<Option<EventLogConfig>> = const { RefCell::new(None) };
}
383
384pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
385 let config = EventLogConfig::for_base_dir(base_dir)?;
386 let log = open_event_log(&config)?;
387 ACTIVE_EVENT_LOG.with(|slot| {
388 *slot.borrow_mut() = Some(log.clone());
389 });
390 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
391 *slot.borrow_mut() = None;
392 });
393 Ok(log)
394}
395
396pub fn install_lazy_default_for_base_dir(base_dir: &Path) -> Result<(), LogError> {
397 let config = EventLogConfig::for_base_dir(base_dir)?;
398 let has_active = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().is_some());
399 if !has_active {
400 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
401 *slot.borrow_mut() = Some(config);
402 });
403 }
404 Ok(())
405}
406
407pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
408 let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
409 ACTIVE_EVENT_LOG.with(|slot| {
410 *slot.borrow_mut() = Some(log.clone());
411 });
412 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
413 *slot.borrow_mut() = None;
414 });
415 log
416}
417
418pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
419 ACTIVE_EVENT_LOG.with(|slot| {
420 *slot.borrow_mut() = Some(log.clone());
421 });
422 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
423 *slot.borrow_mut() = None;
424 });
425 log
426}
427
428pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
429 if let Some(log) = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone()) {
430 return Some(log);
431 }
432
433 let config = PENDING_DEFAULT_EVENT_LOG.with(|slot| slot.borrow_mut().take())?;
434 match open_event_log(&config) {
435 Ok(log) => Some(install_active_event_log(log)),
436 Err(error) => {
437 crate::events::log_warn("event_log.init", &error.to_string());
438 None
439 }
440 }
441}
442
443pub fn reset_active_event_log() {
444 ACTIVE_EVENT_LOG.with(|slot| {
445 *slot.borrow_mut() = None;
446 });
447 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
448 *slot.borrow_mut() = None;
449 });
450}
451
452pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
453 let config = EventLogConfig::for_base_dir(base_dir)?;
454 let description = match config.backend {
455 EventLogBackendKind::Memory => EventLogDescription {
456 backend: EventLogBackendKind::Memory,
457 location: None,
458 size_bytes: None,
459 queue_depth: config.queue_depth,
460 },
461 EventLogBackendKind::File => EventLogDescription {
462 backend: EventLogBackendKind::File,
463 size_bytes: Some(util::dir_size_bytes(&config.file_dir)),
464 location: Some(config.file_dir),
465 queue_depth: config.queue_depth,
466 },
467 EventLogBackendKind::Sqlite => EventLogDescription {
468 backend: EventLogBackendKind::Sqlite,
469 size_bytes: Some(util::sqlite_size_bytes(&config.sqlite_path)),
470 location: Some(config.sqlite_path),
471 queue_depth: config.queue_depth,
472 },
473 EventLogBackendKind::Postgres => EventLogDescription {
474 backend: EventLogBackendKind::Postgres,
475 location: None,
476 size_bytes: None,
477 queue_depth: config.queue_depth,
478 },
479 };
480 Ok(description)
481}
482
483pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
484 match config.backend {
485 EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
486 config.queue_depth,
487 )))),
488 EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
489 config.file_dir.clone(),
490 config.queue_depth,
491 )?))),
492 EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
493 config.sqlite_path.clone(),
494 config.queue_depth,
495 )?))),
496 EventLogBackendKind::Postgres => Err(LogError::Config(
497 "postgres event logs are host-provided; the built-in event log factory supports memory, file, and sqlite"
498 .to_string(),
499 )),
500 }
501}
502
/// Event log backed by one of the built-in backends.
///
/// There is no Postgres variant: `open_event_log` rejects that backend kind.
pub enum AnyEventLog {
    /// In-memory backend.
    Memory(MemoryEventLog),
    /// File-based backend.
    File(FileEventLog),
    /// SQLite-based backend.
    Sqlite(SqliteEventLog),
}
508
509impl AnyEventLog {
510 pub async fn topics(&self) -> Result<Vec<Topic>, LogError> {
511 match self {
512 Self::Memory(log) => log.topics().await,
513 Self::File(log) => log.topics(),
514 Self::Sqlite(log) => log.topics(),
515 }
516 }
517
518 pub async fn append_idempotent_by_header(
519 &self,
520 topic: &Topic,
521 header: &str,
522 value: &str,
523 event: LogEvent,
524 ) -> Result<AppendOutcome, LogError> {
525 if header.trim().is_empty() {
526 return Err(LogError::Config(
527 "idempotent append header cannot be empty".to_string(),
528 ));
529 }
530 match self {
531 Self::Memory(log) => {
532 log.append_idempotent_by_header(topic, header, value, event)
533 .await
534 }
535 Self::File(log) => log.append_idempotent_by_header(topic, header, value, event),
536 Self::Sqlite(log) => log.append_idempotent_by_header(topic, header, value, event),
537 }
538 }
539
540 pub async fn read_idempotent_by_header(
544 &self,
545 topic: &Topic,
546 header: &str,
547 value: &str,
548 ) -> Result<Option<(EventId, LogEvent)>, LogError> {
549 if header.trim().is_empty() {
550 return Err(LogError::Config(
551 "idempotent read header cannot be empty".to_string(),
552 ));
553 }
554 match self {
555 Self::Memory(log) => log.read_idempotent_by_header(topic, header, value).await,
556 Self::File(log) => log.read_idempotent_by_header(topic, header, value),
557 Self::Sqlite(log) => log.read_idempotent_by_header(topic, header, value),
558 }
559 }
560}
561
562impl EventLog for AnyEventLog {
563 fn describe(&self) -> EventLogDescription {
564 match self {
565 Self::Memory(log) => log.describe(),
566 Self::File(log) => log.describe(),
567 Self::Sqlite(log) => log.describe(),
568 }
569 }
570
571 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
572 match self {
573 Self::Memory(log) => log.append(topic, event).await,
574 Self::File(log) => log.append(topic, event).await,
575 Self::Sqlite(log) => log.append(topic, event).await,
576 }
577 }
578
579 async fn flush(&self) -> Result<(), LogError> {
580 match self {
581 Self::Memory(log) => log.flush().await,
582 Self::File(log) => log.flush().await,
583 Self::Sqlite(log) => log.flush().await,
584 }
585 }
586
587 async fn read_range(
588 &self,
589 topic: &Topic,
590 from: Option<EventId>,
591 limit: usize,
592 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
593 match self {
594 Self::Memory(log) => log.read_range(topic, from, limit).await,
595 Self::File(log) => log.read_range(topic, from, limit).await,
596 Self::Sqlite(log) => log.read_range(topic, from, limit).await,
597 }
598 }
599
600 async fn read_range_bytes(
601 &self,
602 topic: &Topic,
603 from: Option<EventId>,
604 limit: usize,
605 ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
606 match self {
607 Self::Memory(log) => log.read_range_bytes(topic, from, limit).await,
608 Self::File(log) => log.read_range_bytes(topic, from, limit).await,
609 Self::Sqlite(log) => log.read_range_bytes(topic, from, limit).await,
610 }
611 }
612
613 async fn subscribe(
614 self: Arc<Self>,
615 topic: &Topic,
616 from: Option<EventId>,
617 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
618 let (rx, queue_depth) = match self.as_ref() {
619 Self::Memory(log) => (
620 log.broadcasts.subscribe(topic, log.queue_depth),
621 log.queue_depth,
622 ),
623 Self::File(log) => (
624 log.broadcasts.subscribe(topic, log.queue_depth),
625 log.queue_depth,
626 ),
627 Self::Sqlite(log) => (
628 log.broadcasts.subscribe(topic, log.queue_depth),
629 log.queue_depth,
630 ),
631 };
632 let history = self.read_range(topic, from, usize::MAX).await?;
633 Ok(util::stream_from_broadcast(history, from, rx, queue_depth))
634 }
635
636 async fn ack(
637 &self,
638 topic: &Topic,
639 consumer: &ConsumerId,
640 up_to: EventId,
641 ) -> Result<(), LogError> {
642 match self {
643 Self::Memory(log) => log.ack(topic, consumer, up_to).await,
644 Self::File(log) => log.ack(topic, consumer, up_to).await,
645 Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
646 }
647 }
648
649 async fn consumer_cursor(
650 &self,
651 topic: &Topic,
652 consumer: &ConsumerId,
653 ) -> Result<Option<EventId>, LogError> {
654 match self {
655 Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
656 Self::File(log) => log.consumer_cursor(topic, consumer).await,
657 Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
658 }
659 }
660
661 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
662 match self {
663 Self::Memory(log) => log.latest(topic).await,
664 Self::File(log) => log.latest(topic).await,
665 Self::Sqlite(log) => log.latest(topic).await,
666 }
667 }
668
669 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
670 match self {
671 Self::Memory(log) => log.compact(topic, before).await,
672 Self::File(log) => log.compact(topic, before).await,
673 Self::Sqlite(log) => log.compact(topic, before).await,
674 }
675 }
676}
677
/// Replaces every character that is not an ASCII letter, ASCII digit, `.`,
/// `_` or `-` with `_`.
///
/// Replacement is per character, so a multi-byte character becomes a single
/// `_`. An empty input yields an empty string.
pub fn sanitize_topic_component(value: &str) -> String {
    let mut sanitized = String::with_capacity(value.len());
    for ch in value.chars() {
        let allowed = ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-');
        sanitized.push(if allowed { ch } else { '_' });
    }
    sanitized
}