1use std::cell::RefCell;
2use std::collections::BTreeMap;
3use std::fmt;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7
8use bytes::Bytes;
9use futures::stream::BoxStream;
10use serde::{Deserialize, Serialize};
11
12use crate::runtime_limits::RuntimeLimits;
13
14mod file;
15mod memory;
16mod sqlite;
17mod util;
18
19#[cfg(test)]
20mod tests;
21
22pub use file::FileEventLog;
23pub use memory::MemoryEventLog;
24pub use sqlite::SqliteEventLog;
25
/// Identifier assigned to an event within a topic.
pub type EventId = u64;

/// Environment variable selecting the backend (`memory`, `file`, `sqlite`, or `postgres`);
/// read by `EventLogConfig::for_base_dir`.
pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
/// Environment variable overriding the directory used by the file backend.
pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
/// Environment variable overriding the database path used by the sqlite backend.
pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
/// Environment variable overriding the queue depth handed to the backends.
pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";
32
33#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
34pub struct Topic(String);
35
36impl Topic {
37 pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
38 let value = value.into();
39 if value.is_empty() {
40 return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
41 }
42 if !value
43 .chars()
44 .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
45 {
46 return Err(LogError::InvalidTopic(format!(
47 "topic '{value}' contains unsupported characters"
48 )));
49 }
50 Ok(Self(value))
51 }
52
53 pub fn as_str(&self) -> &str {
54 &self.0
55 }
56}
57
58impl fmt::Display for Topic {
59 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60 self.0.fmt(f)
61 }
62}
63
64impl FromStr for Topic {
65 type Err = LogError;
66
67 fn from_str(s: &str) -> Result<Self, Self::Err> {
68 Self::new(s)
69 }
70}
71
72#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
73pub struct ConsumerId(String);
74
75impl ConsumerId {
76 pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
77 let value = value.into();
78 if value.trim().is_empty() {
79 return Err(LogError::InvalidConsumer(
80 "consumer id cannot be empty".to_string(),
81 ));
82 }
83 Ok(Self(value))
84 }
85
86 pub fn as_str(&self) -> &str {
87 &self.0
88 }
89}
90
91impl fmt::Display for ConsumerId {
92 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93 self.0.fmt(f)
94 }
95}
96
/// The storage backends an event log can be configured with.
///
/// Serialized in snake_case (`"memory"`, `"file"`, `"sqlite"`, `"postgres"`).
/// `Postgres` can be named in configuration, but `open_event_log` refuses to build it:
/// postgres logs are host-provided.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventLogBackendKind {
    /// Events are held by `MemoryEventLog`; no on-disk location.
    Memory,
    /// Events are stored by `FileEventLog` under `EventLogConfig::file_dir`.
    File,
    /// Events are stored by `SqliteEventLog` at `EventLogConfig::sqlite_path`.
    Sqlite,
    /// Host-provided backend; not constructible by the built-in factory.
    Postgres,
}
105
106impl fmt::Display for EventLogBackendKind {
107 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108 match self {
109 Self::Memory => write!(f, "memory"),
110 Self::File => write!(f, "file"),
111 Self::Sqlite => write!(f, "sqlite"),
112 Self::Postgres => write!(f, "postgres"),
113 }
114 }
115}
116
117impl FromStr for EventLogBackendKind {
118 type Err = LogError;
119
120 fn from_str(value: &str) -> Result<Self, Self::Err> {
121 match value.trim().to_ascii_lowercase().as_str() {
122 "memory" => Ok(Self::Memory),
123 "file" => Ok(Self::File),
124 "sqlite" => Ok(Self::Sqlite),
125 "postgres" => Ok(Self::Postgres),
126 other => Err(LogError::Config(format!(
127 "unsupported event log backend '{other}'"
128 ))),
129 }
130 }
131}
132
/// A single event as stored in and returned from an event log, with a decoded JSON payload.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct LogEvent {
    /// Caller-defined event kind.
    pub kind: String,
    /// Arbitrary JSON payload.
    pub payload: serde_json::Value,
    /// String headers, kept sorted by key. Missing in serialized input means "no headers".
    #[serde(default)]
    pub headers: BTreeMap<String, String>,
    /// Occurrence timestamp in milliseconds; `LogEvent::new` fills it from `util::now_ms()`.
    // NOTE(review): presumably milliseconds since the Unix epoch — confirm in util::now_ms.
    pub occurred_at_ms: i64,
}
141
142impl LogEvent {
143 pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
144 Self {
145 kind: kind.into(),
146 payload,
147 headers: BTreeMap::new(),
148 occurred_at_ms: util::now_ms(),
149 }
150 }
151
152 pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
153 self.headers = headers;
154 self
155 }
156
157 pub fn redact_in_place(&mut self, policy: &crate::redact::RedactionPolicy) {
163 self.headers = policy.redact_headers(&self.headers);
164 policy.redact_json_in_place(&mut self.payload);
165 }
166}
167
/// A [`LogEvent`] whose payload is kept as encoded JSON bytes instead of a decoded value.
///
/// Convert from a `LogEvent` with `TryFrom`, and back with `into_log_event` (which
/// parses the payload with `serde_json::from_slice`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LogEventBytes {
    /// Caller-defined event kind.
    pub kind: String,
    /// JSON-encoded payload bytes.
    pub payload: Bytes,
    /// String headers, kept sorted by key.
    pub headers: BTreeMap<String, String>,
    /// Occurrence timestamp in milliseconds, copied from the source event.
    pub occurred_at_ms: i64,
}
181
182impl LogEventBytes {
183 pub fn payload_json(&self) -> Result<serde_json::Value, LogError> {
184 serde_json::from_slice(&self.payload)
185 .map_err(|error| LogError::Serde(format!("event log payload parse error: {error}")))
186 }
187
188 pub fn into_log_event(self) -> Result<LogEvent, LogError> {
189 Ok(LogEvent {
190 kind: self.kind,
191 payload: serde_json::from_slice(&self.payload).map_err(|error| {
192 LogError::Serde(format!("event log payload parse error: {error}"))
193 })?,
194 headers: self.headers,
195 occurred_at_ms: self.occurred_at_ms,
196 })
197 }
198}
199
200impl TryFrom<LogEvent> for LogEventBytes {
201 type Error = LogError;
202
203 fn try_from(event: LogEvent) -> Result<Self, Self::Error> {
204 let payload = serde_json::to_vec(&event.payload)
205 .map_err(|error| LogError::Serde(format!("event log payload encode error: {error}")))?;
206 Ok(Self {
207 kind: event.kind,
208 payload: Bytes::from(payload),
209 headers: event.headers,
210 occurred_at_ms: event.occurred_at_ms,
211 })
212 }
213}
214
/// Summary returned by `EventLog::compact`.
///
/// Field values are filled in by each backend; the meanings below are inferred from
/// the field names.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactReport {
    /// Presumably the number of events removed by the compaction.
    pub removed: usize,
    /// Presumably the number of events left in the topic afterwards.
    pub remaining: usize,
    /// Presumably the newest event id in the topic after compaction, if any.
    pub latest: Option<EventId>,
    // NOTE(review): backend-defined flag, presumably "storage was checkpointed" — confirm.
    pub checkpointed: bool,
}
222
/// Result of `AnyEventLog::append_idempotent_by_header`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AppendOutcome {
    /// Id of the event the append resolved to.
    pub event_id: EventId,
    /// The event stored under `event_id`.
    pub event: LogEvent,
    // NOTE(review): presumably `false` when an event with the same header value already
    // existed and was returned instead of appending — confirm in the backends.
    pub inserted: bool,
}
229
/// Human-facing description of an event log, as produced by `EventLog::describe`
/// and `describe_for_base_dir`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventLogDescription {
    /// Backend kind in use.
    pub backend: EventLogBackendKind,
    /// On-disk location (directory for file, database path for sqlite); `None` for
    /// memory and postgres.
    pub location: Option<PathBuf>,
    /// Size of the on-disk data in bytes, when the backend has any.
    pub size_bytes: Option<u64>,
    /// Queue depth the backend was configured with.
    pub queue_depth: usize,
}
237
/// Errors produced by event log operations.
///
/// Every variant except `ConsumerLagged` carries a preformatted, human-readable
/// message that `Display` prints verbatim.
#[derive(Debug)]
pub enum LogError {
    /// Invalid or unsupported configuration, e.g. an unknown backend name or a blank
    /// idempotency header.
    Config(String),
    /// A topic name failed validation in `Topic::new`.
    InvalidTopic(String),
    /// A consumer id failed validation in `ConsumerId::new`.
    InvalidConsumer(String),
    /// An I/O failure, carried as a message string.
    Io(String),
    /// A JSON encode/decode failure, carried as a message string.
    Serde(String),
    /// A SQLite failure, carried as a message string.
    Sqlite(String),
    /// A subscriber fell behind; carries the id of the event after which the lag
    /// occurred (see the `Display` message).
    ConsumerLagged(EventId),
}
248
249impl fmt::Display for LogError {
250 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
251 match self {
252 Self::Config(message)
253 | Self::InvalidTopic(message)
254 | Self::InvalidConsumer(message)
255 | Self::Io(message)
256 | Self::Serde(message)
257 | Self::Sqlite(message) => message.fmt(f),
258 Self::ConsumerLagged(last_id) => {
259 write!(f, "subscriber lagged behind after event {last_id}")
260 }
261 }
262 }
263}
264
// Marker impl: relies on the trait's default `source()` (returns `None`), since every
// underlying cause is already flattened into the variant's message string.
impl std::error::Error for LogError {}
266
267#[allow(async_fn_in_trait)]
268pub trait EventLog: Send + Sync {
269 fn describe(&self) -> EventLogDescription;
270
271 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;
272
273 async fn flush(&self) -> Result<(), LogError>;
274
275 async fn read_range(
278 &self,
279 topic: &Topic,
280 from: Option<EventId>,
281 limit: usize,
282 ) -> Result<Vec<(EventId, LogEvent)>, LogError>;
283
284 async fn read_range_bytes(
285 &self,
286 topic: &Topic,
287 from: Option<EventId>,
288 limit: usize,
289 ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
290 let events = self.read_range(topic, from, limit).await?;
291 events
292 .into_iter()
293 .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
294 .collect()
295 }
296
297 async fn subscribe(
300 self: Arc<Self>,
301 topic: &Topic,
302 from: Option<EventId>,
303 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;
304
305 async fn ack(
306 &self,
307 topic: &Topic,
308 consumer: &ConsumerId,
309 up_to: EventId,
310 ) -> Result<(), LogError>;
311
312 async fn consumer_cursor(
313 &self,
314 topic: &Topic,
315 consumer: &ConsumerId,
316 ) -> Result<Option<EventId>, LogError>;
317
318 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;
319
320 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
321}
322
/// Resolved settings used by `open_event_log` to construct a backend.
///
/// Normally built by `EventLogConfig::for_base_dir`, which applies environment overrides.
#[derive(Clone, Debug)]
pub struct EventLogConfig {
    /// Which backend to open.
    pub backend: EventLogBackendKind,
    /// Directory used by the file backend.
    pub file_dir: PathBuf,
    /// Database path used by the sqlite backend.
    pub sqlite_path: PathBuf,
    /// Queue depth passed to the backend constructor; `for_base_dir` clamps it to at least 1.
    pub queue_depth: usize,
}
330
331impl EventLogConfig {
332 pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
333 let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
334 .ok()
335 .map(|value| value.parse())
336 .transpose()?
337 .unwrap_or(EventLogBackendKind::Sqlite);
338 let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
339 .ok()
340 .and_then(|value| value.parse::<usize>().ok())
341 .unwrap_or(RuntimeLimits::DEFAULT.default_event_log_queue_depth)
342 .max(1);
343
344 let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
345 Ok(value) if !value.trim().is_empty() => util::resolve_path(base_dir, &value),
346 _ => crate::runtime_paths::event_log_dir(base_dir),
347 };
348 let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
349 Ok(value) if !value.trim().is_empty() => util::resolve_path(base_dir, &value),
350 _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
351 };
352
353 Ok(Self {
354 backend,
355 file_dir,
356 sqlite_path,
357 queue_depth,
358 })
359 }
360
361 pub fn location(&self) -> Option<PathBuf> {
362 match self.backend {
363 EventLogBackendKind::Memory => None,
364 EventLogBackendKind::File => Some(self.file_dir.clone()),
365 EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
366 EventLogBackendKind::Postgres => None,
367 }
368 }
369}
370
// Per-thread event log state; every thread starts with both slots empty.
thread_local! {
    // The log installed for this thread, handed out by `active_event_log`.
    static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
    // Config recorded by `install_lazy_default_for_base_dir`. `active_event_log` takes
    // it and opens it on first use when no log is active. Every eager install clears it.
    static PENDING_DEFAULT_EVENT_LOG: RefCell<Option<EventLogConfig>> = const { RefCell::new(None) };
}
375
376pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
377 let config = EventLogConfig::for_base_dir(base_dir)?;
378 let log = open_event_log(&config)?;
379 ACTIVE_EVENT_LOG.with(|slot| {
380 *slot.borrow_mut() = Some(log.clone());
381 });
382 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
383 *slot.borrow_mut() = None;
384 });
385 Ok(log)
386}
387
388pub fn install_lazy_default_for_base_dir(base_dir: &Path) -> Result<(), LogError> {
389 let config = EventLogConfig::for_base_dir(base_dir)?;
390 let has_active = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().is_some());
391 if !has_active {
392 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
393 *slot.borrow_mut() = Some(config);
394 });
395 }
396 Ok(())
397}
398
399pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
400 let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
401 ACTIVE_EVENT_LOG.with(|slot| {
402 *slot.borrow_mut() = Some(log.clone());
403 });
404 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
405 *slot.borrow_mut() = None;
406 });
407 log
408}
409
410pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
411 ACTIVE_EVENT_LOG.with(|slot| {
412 *slot.borrow_mut() = Some(log.clone());
413 });
414 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
415 *slot.borrow_mut() = None;
416 });
417 log
418}
419
420pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
421 if let Some(log) = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone()) {
422 return Some(log);
423 }
424
425 let config = PENDING_DEFAULT_EVENT_LOG.with(|slot| slot.borrow_mut().take())?;
426 match open_event_log(&config) {
427 Ok(log) => Some(install_active_event_log(log)),
428 Err(error) => {
429 crate::events::log_warn("event_log.init", &error.to_string());
430 None
431 }
432 }
433}
434
435pub fn reset_active_event_log() {
436 ACTIVE_EVENT_LOG.with(|slot| {
437 *slot.borrow_mut() = None;
438 });
439 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
440 *slot.borrow_mut() = None;
441 });
442}
443
444pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
445 let config = EventLogConfig::for_base_dir(base_dir)?;
446 let description = match config.backend {
447 EventLogBackendKind::Memory => EventLogDescription {
448 backend: EventLogBackendKind::Memory,
449 location: None,
450 size_bytes: None,
451 queue_depth: config.queue_depth,
452 },
453 EventLogBackendKind::File => EventLogDescription {
454 backend: EventLogBackendKind::File,
455 size_bytes: Some(util::dir_size_bytes(&config.file_dir)),
456 location: Some(config.file_dir),
457 queue_depth: config.queue_depth,
458 },
459 EventLogBackendKind::Sqlite => EventLogDescription {
460 backend: EventLogBackendKind::Sqlite,
461 size_bytes: Some(util::sqlite_size_bytes(&config.sqlite_path)),
462 location: Some(config.sqlite_path),
463 queue_depth: config.queue_depth,
464 },
465 EventLogBackendKind::Postgres => EventLogDescription {
466 backend: EventLogBackendKind::Postgres,
467 location: None,
468 size_bytes: None,
469 queue_depth: config.queue_depth,
470 },
471 };
472 Ok(description)
473}
474
475pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
476 match config.backend {
477 EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
478 config.queue_depth,
479 )))),
480 EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
481 config.file_dir.clone(),
482 config.queue_depth,
483 )?))),
484 EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
485 config.sqlite_path.clone(),
486 config.queue_depth,
487 )?))),
488 EventLogBackendKind::Postgres => Err(LogError::Config(
489 "postgres event logs are host-provided; the built-in event log factory supports memory, file, and sqlite"
490 .to_string(),
491 )),
492 }
493}
494
/// Closed set of the built-in event log backends, dispatched by `match`.
///
/// There is no postgres variant: `open_event_log` reports postgres logs as
/// host-provided.
pub enum AnyEventLog {
    /// In-memory backend.
    Memory(MemoryEventLog),
    /// File-backed backend.
    File(FileEventLog),
    /// SQLite-backed backend.
    Sqlite(SqliteEventLog),
}
500
501impl AnyEventLog {
502 pub async fn topics(&self) -> Result<Vec<Topic>, LogError> {
503 match self {
504 Self::Memory(log) => log.topics().await,
505 Self::File(log) => log.topics(),
506 Self::Sqlite(log) => log.topics(),
507 }
508 }
509
510 pub async fn append_idempotent_by_header(
511 &self,
512 topic: &Topic,
513 header: &str,
514 value: &str,
515 event: LogEvent,
516 ) -> Result<AppendOutcome, LogError> {
517 if header.trim().is_empty() {
518 return Err(LogError::Config(
519 "idempotent append header cannot be empty".to_string(),
520 ));
521 }
522 match self {
523 Self::Memory(log) => {
524 log.append_idempotent_by_header(topic, header, value, event)
525 .await
526 }
527 Self::File(log) => log.append_idempotent_by_header(topic, header, value, event),
528 Self::Sqlite(log) => log.append_idempotent_by_header(topic, header, value, event),
529 }
530 }
531
532 pub async fn read_idempotent_by_header(
536 &self,
537 topic: &Topic,
538 header: &str,
539 value: &str,
540 ) -> Result<Option<(EventId, LogEvent)>, LogError> {
541 if header.trim().is_empty() {
542 return Err(LogError::Config(
543 "idempotent read header cannot be empty".to_string(),
544 ));
545 }
546 match self {
547 Self::Memory(log) => log.read_idempotent_by_header(topic, header, value).await,
548 Self::File(log) => log.read_idempotent_by_header(topic, header, value),
549 Self::Sqlite(log) => log.read_idempotent_by_header(topic, header, value),
550 }
551 }
552}
553
// `EventLog` for the backend enum: each method forwards to the matching variant's
// implementation. `subscribe` is the exception and is assembled here.
impl EventLog for AnyEventLog {
    fn describe(&self) -> EventLogDescription {
        match self {
            Self::Memory(log) => log.describe(),
            Self::File(log) => log.describe(),
            Self::Sqlite(log) => log.describe(),
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        match self {
            Self::Memory(log) => log.append(topic, event).await,
            Self::File(log) => log.append(topic, event).await,
            Self::Sqlite(log) => log.append(topic, event).await,
        }
    }

    async fn flush(&self) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.flush().await,
            Self::File(log) => log.flush().await,
            Self::Sqlite(log) => log.flush().await,
        }
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        match self {
            Self::Memory(log) => log.read_range(topic, from, limit).await,
            Self::File(log) => log.read_range(topic, from, limit).await,
            Self::Sqlite(log) => log.read_range(topic, from, limit).await,
        }
    }

    // Forwards to each backend's own `read_range_bytes` (its override, if it has one)
    // instead of using the trait default on the enum.
    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        match self {
            Self::Memory(log) => log.read_range_bytes(topic, from, limit).await,
            Self::File(log) => log.read_range_bytes(topic, from, limit).await,
            Self::Sqlite(log) => log.read_range_bytes(topic, from, limit).await,
        }
    }

    // Builds the subscription stream from stored history plus the live broadcast.
    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        // The live receiver is registered BEFORE history is read. Events appended while
        // `read_range` runs therefore reach `rx` instead of being missed. Keep this order.
        // NOTE(review): such an event can appear in both `history` and `rx`; presumably
        // `util::stream_from_broadcast` de-duplicates — confirm there.
        let (rx, queue_depth) = match self.as_ref() {
            Self::Memory(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::File(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::Sqlite(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
        };
        // `usize::MAX` asks for the full history from `from` onward (no page limit).
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(util::stream_from_broadcast(history, from, rx, queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.ack(topic, consumer, up_to).await,
            Self::File(log) => log.ack(topic, consumer, up_to).await,
            Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
        }
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
            Self::File(log) => log.consumer_cursor(topic, consumer).await,
            Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
        }
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.latest(topic).await,
            Self::File(log) => log.latest(topic).await,
            Self::Sqlite(log) => log.latest(topic).await,
        }
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        match self {
            Self::Memory(log) => log.compact(topic, before).await,
            Self::File(log) => log.compact(topic, before).await,
            Self::Sqlite(log) => log.compact(topic, before).await,
        }
    }
}
669
/// Makes `value` safe to use inside a topic name.
///
/// Every character outside the topic alphabet (ASCII letters, ASCII digits, `.`, `_`,
/// `-`) is replaced by `_`, one `_` per `char`, so a multi-byte character becomes a
/// single `_`. An empty input yields an empty string, which `Topic::new` rejects on
/// its own.
pub fn sanitize_topic_component(value: &str) -> String {
    let mut sanitized = String::with_capacity(value.len());
    for ch in value.chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '.' || ch == '_' || ch == '-';
        sanitized.push(if allowed { ch } else { '_' });
    }
    sanitized
}