1use std::cell::RefCell;
2use std::collections::BTreeMap;
3use std::fmt;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7
8use bytes::Bytes;
9use futures::stream::BoxStream;
10use serde::{Deserialize, Serialize};
11
12use crate::runtime_limits::RuntimeLimits;
13
14mod file;
15mod memory;
16mod sqlite;
17mod util;
18
19#[cfg(test)]
20pub(crate) use util::pin_test_occurred_at_ms;
21
22#[cfg(test)]
23mod tests;
24
25pub use file::FileEventLog;
26pub use memory::MemoryEventLog;
27pub use sqlite::SqliteEventLog;
28
/// Identifier assigned to an event when it is appended to a topic
/// (returned by [`EventLog::append`]).
pub type EventId = u64;

/// Environment variable selecting the backend: `memory`, `file`, `sqlite`,
/// or `postgres` (case-insensitive).
pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
/// Environment variable overriding the file backend's directory.
pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
/// Environment variable overriding the SQLite backend's database path.
pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
/// Environment variable overriding the queue depth handed to backends.
pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";
35
36#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
37pub struct Topic(String);
38
39impl Topic {
40 pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
41 let value = value.into();
42 if value.is_empty() {
43 return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
44 }
45 if !value
46 .chars()
47 .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
48 {
49 return Err(LogError::InvalidTopic(format!(
50 "topic '{value}' contains unsupported characters"
51 )));
52 }
53 Ok(Self(value))
54 }
55
56 pub fn as_str(&self) -> &str {
57 &self.0
58 }
59}
60
61impl fmt::Display for Topic {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 self.0.fmt(f)
64 }
65}
66
67impl FromStr for Topic {
68 type Err = LogError;
69
70 fn from_str(s: &str) -> Result<Self, Self::Err> {
71 Self::new(s)
72 }
73}
74
75#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
76pub struct ConsumerId(String);
77
78impl ConsumerId {
79 pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
80 let value = value.into();
81 if value.trim().is_empty() {
82 return Err(LogError::InvalidConsumer(
83 "consumer id cannot be empty".to_string(),
84 ));
85 }
86 Ok(Self(value))
87 }
88
89 pub fn as_str(&self) -> &str {
90 &self.0
91 }
92}
93
94impl fmt::Display for ConsumerId {
95 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96 self.0.fmt(f)
97 }
98}
99
100#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
101#[serde(rename_all = "snake_case")]
102pub enum EventLogBackendKind {
103 Memory,
104 File,
105 Sqlite,
106 Postgres,
107}
108
109impl fmt::Display for EventLogBackendKind {
110 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111 match self {
112 Self::Memory => write!(f, "memory"),
113 Self::File => write!(f, "file"),
114 Self::Sqlite => write!(f, "sqlite"),
115 Self::Postgres => write!(f, "postgres"),
116 }
117 }
118}
119
120impl FromStr for EventLogBackendKind {
121 type Err = LogError;
122
123 fn from_str(value: &str) -> Result<Self, Self::Err> {
124 match value.trim().to_ascii_lowercase().as_str() {
125 "memory" => Ok(Self::Memory),
126 "file" => Ok(Self::File),
127 "sqlite" => Ok(Self::Sqlite),
128 "postgres" => Ok(Self::Postgres),
129 other => Err(LogError::Config(format!(
130 "unsupported event log backend '{other}'"
131 ))),
132 }
133 }
134}
135
136#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
137pub struct LogEvent {
138 pub kind: String,
139 pub payload: serde_json::Value,
140 #[serde(default)]
141 pub headers: BTreeMap<String, String>,
142 pub occurred_at_ms: i64,
143}
144
145impl LogEvent {
146 pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
147 Self {
148 kind: kind.into(),
149 payload,
150 headers: BTreeMap::new(),
151 occurred_at_ms: util::now_ms(),
152 }
153 }
154
155 pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
156 self.headers = headers;
157 self
158 }
159
160 pub fn redact_in_place(&mut self, policy: &crate::redact::RedactionPolicy) {
166 self.headers = policy.redact_headers(&self.headers);
167 policy.redact_json_in_place(&mut self.payload);
168 }
169}
170
171#[derive(Clone, Debug, PartialEq, Eq)]
178pub struct LogEventBytes {
179 pub kind: String,
180 pub payload: Bytes,
181 pub headers: BTreeMap<String, String>,
182 pub occurred_at_ms: i64,
183}
184
185impl LogEventBytes {
186 pub fn payload_json(&self) -> Result<serde_json::Value, LogError> {
187 serde_json::from_slice(&self.payload)
188 .map_err(|error| LogError::Serde(format!("event log payload parse error: {error}")))
189 }
190
191 pub fn into_log_event(self) -> Result<LogEvent, LogError> {
192 Ok(LogEvent {
193 kind: self.kind,
194 payload: serde_json::from_slice(&self.payload).map_err(|error| {
195 LogError::Serde(format!("event log payload parse error: {error}"))
196 })?,
197 headers: self.headers,
198 occurred_at_ms: self.occurred_at_ms,
199 })
200 }
201}
202
203impl TryFrom<LogEvent> for LogEventBytes {
204 type Error = LogError;
205
206 fn try_from(event: LogEvent) -> Result<Self, Self::Error> {
207 let payload = serde_json::to_vec(&event.payload)
208 .map_err(|error| LogError::Serde(format!("event log payload encode error: {error}")))?;
209 Ok(Self {
210 kind: event.kind,
211 payload: Bytes::from(payload),
212 headers: event.headers,
213 occurred_at_ms: event.occurred_at_ms,
214 })
215 }
216}
217
218#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
219pub struct CompactReport {
220 pub removed: usize,
221 pub remaining: usize,
222 pub latest: Option<EventId>,
223 pub checkpointed: bool,
224}
225
226#[derive(Clone, Debug, PartialEq, Eq)]
227pub struct AppendOutcome {
228 pub event_id: EventId,
229 pub event: LogEvent,
230 pub inserted: bool,
231}
232
233#[derive(Clone, Debug, PartialEq, Eq)]
234pub struct EventLogDescription {
235 pub backend: EventLogBackendKind,
236 pub location: Option<PathBuf>,
237 pub size_bytes: Option<u64>,
238 pub queue_depth: usize,
239}
240
241#[derive(Debug)]
242pub enum LogError {
243 Config(String),
244 InvalidTopic(String),
245 InvalidConsumer(String),
246 Io(String),
247 Serde(String),
248 Sqlite(String),
249 ConsumerLagged(EventId),
250}
251
252impl fmt::Display for LogError {
253 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
254 match self {
255 Self::Config(message)
256 | Self::InvalidTopic(message)
257 | Self::InvalidConsumer(message)
258 | Self::Io(message)
259 | Self::Serde(message)
260 | Self::Sqlite(message) => message.fmt(f),
261 Self::ConsumerLagged(last_id) => {
262 write!(f, "subscriber lagged behind after event {last_id}")
263 }
264 }
265 }
266}
267
268impl std::error::Error for LogError {}
269
270#[allow(async_fn_in_trait)]
271pub trait EventLog: Send + Sync {
272 fn describe(&self) -> EventLogDescription;
273
274 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;
275
276 async fn flush(&self) -> Result<(), LogError>;
277
278 async fn read_range(
281 &self,
282 topic: &Topic,
283 from: Option<EventId>,
284 limit: usize,
285 ) -> Result<Vec<(EventId, LogEvent)>, LogError>;
286
287 async fn read_range_bytes(
288 &self,
289 topic: &Topic,
290 from: Option<EventId>,
291 limit: usize,
292 ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
293 let events = self.read_range(topic, from, limit).await?;
294 events
295 .into_iter()
296 .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
297 .collect()
298 }
299
300 async fn subscribe(
303 self: Arc<Self>,
304 topic: &Topic,
305 from: Option<EventId>,
306 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;
307
308 async fn ack(
309 &self,
310 topic: &Topic,
311 consumer: &ConsumerId,
312 up_to: EventId,
313 ) -> Result<(), LogError>;
314
315 async fn consumer_cursor(
316 &self,
317 topic: &Topic,
318 consumer: &ConsumerId,
319 ) -> Result<Option<EventId>, LogError>;
320
321 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;
322
323 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
324}
325
326#[derive(Clone, Debug)]
327pub struct EventLogConfig {
328 pub backend: EventLogBackendKind,
329 pub file_dir: PathBuf,
330 pub sqlite_path: PathBuf,
331 pub queue_depth: usize,
332}
333
334impl EventLogConfig {
335 pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
336 let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
337 .ok()
338 .map(|value| value.parse())
339 .transpose()?
340 .unwrap_or(EventLogBackendKind::Sqlite);
341 let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
342 .ok()
343 .and_then(|value| value.parse::<usize>().ok())
344 .unwrap_or(RuntimeLimits::DEFAULT.default_event_log_queue_depth)
345 .max(1);
346
347 let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
348 Ok(value) if !value.trim().is_empty() => util::resolve_path(base_dir, &value),
349 _ => crate::runtime_paths::event_log_dir(base_dir),
350 };
351 let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
352 Ok(value) if !value.trim().is_empty() => util::resolve_path(base_dir, &value),
353 _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
354 };
355
356 Ok(Self {
357 backend,
358 file_dir,
359 sqlite_path,
360 queue_depth,
361 })
362 }
363
364 pub fn location(&self) -> Option<PathBuf> {
365 match self.backend {
366 EventLogBackendKind::Memory => None,
367 EventLogBackendKind::File => Some(self.file_dir.clone()),
368 EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
369 EventLogBackendKind::Postgres => None,
370 }
371 }
372}
373
374thread_local! {
375 static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
376 static PENDING_DEFAULT_EVENT_LOG: RefCell<Option<EventLogConfig>> = const { RefCell::new(None) };
377}
378
379pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
380 let config = EventLogConfig::for_base_dir(base_dir)?;
381 let log = open_event_log(&config)?;
382 ACTIVE_EVENT_LOG.with(|slot| {
383 *slot.borrow_mut() = Some(log.clone());
384 });
385 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
386 *slot.borrow_mut() = None;
387 });
388 Ok(log)
389}
390
391pub fn install_lazy_default_for_base_dir(base_dir: &Path) -> Result<(), LogError> {
392 let config = EventLogConfig::for_base_dir(base_dir)?;
393 let has_active = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().is_some());
394 if !has_active {
395 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
396 *slot.borrow_mut() = Some(config);
397 });
398 }
399 Ok(())
400}
401
402pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
403 let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
404 ACTIVE_EVENT_LOG.with(|slot| {
405 *slot.borrow_mut() = Some(log.clone());
406 });
407 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
408 *slot.borrow_mut() = None;
409 });
410 log
411}
412
413pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
414 ACTIVE_EVENT_LOG.with(|slot| {
415 *slot.borrow_mut() = Some(log.clone());
416 });
417 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
418 *slot.borrow_mut() = None;
419 });
420 log
421}
422
423pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
424 if let Some(log) = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone()) {
425 return Some(log);
426 }
427
428 let config = PENDING_DEFAULT_EVENT_LOG.with(|slot| slot.borrow_mut().take())?;
429 match open_event_log(&config) {
430 Ok(log) => Some(install_active_event_log(log)),
431 Err(error) => {
432 crate::events::log_warn("event_log.init", &error.to_string());
433 None
434 }
435 }
436}
437
438pub fn reset_active_event_log() {
439 ACTIVE_EVENT_LOG.with(|slot| {
440 *slot.borrow_mut() = None;
441 });
442 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
443 *slot.borrow_mut() = None;
444 });
445}
446
447pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
448 let config = EventLogConfig::for_base_dir(base_dir)?;
449 let description = match config.backend {
450 EventLogBackendKind::Memory => EventLogDescription {
451 backend: EventLogBackendKind::Memory,
452 location: None,
453 size_bytes: None,
454 queue_depth: config.queue_depth,
455 },
456 EventLogBackendKind::File => EventLogDescription {
457 backend: EventLogBackendKind::File,
458 size_bytes: Some(util::dir_size_bytes(&config.file_dir)),
459 location: Some(config.file_dir),
460 queue_depth: config.queue_depth,
461 },
462 EventLogBackendKind::Sqlite => EventLogDescription {
463 backend: EventLogBackendKind::Sqlite,
464 size_bytes: Some(util::sqlite_size_bytes(&config.sqlite_path)),
465 location: Some(config.sqlite_path),
466 queue_depth: config.queue_depth,
467 },
468 EventLogBackendKind::Postgres => EventLogDescription {
469 backend: EventLogBackendKind::Postgres,
470 location: None,
471 size_bytes: None,
472 queue_depth: config.queue_depth,
473 },
474 };
475 Ok(description)
476}
477
478pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
479 match config.backend {
480 EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
481 config.queue_depth,
482 )))),
483 EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
484 config.file_dir.clone(),
485 config.queue_depth,
486 )?))),
487 EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
488 config.sqlite_path.clone(),
489 config.queue_depth,
490 )?))),
491 EventLogBackendKind::Postgres => Err(LogError::Config(
492 "postgres event logs are host-provided; the built-in event log factory supports memory, file, and sqlite"
493 .to_string(),
494 )),
495 }
496}
497
498pub enum AnyEventLog {
499 Memory(MemoryEventLog),
500 File(FileEventLog),
501 Sqlite(SqliteEventLog),
502}
503
504impl AnyEventLog {
505 pub async fn topics(&self) -> Result<Vec<Topic>, LogError> {
506 match self {
507 Self::Memory(log) => log.topics().await,
508 Self::File(log) => log.topics(),
509 Self::Sqlite(log) => log.topics(),
510 }
511 }
512
513 pub async fn append_idempotent_by_header(
514 &self,
515 topic: &Topic,
516 header: &str,
517 value: &str,
518 event: LogEvent,
519 ) -> Result<AppendOutcome, LogError> {
520 if header.trim().is_empty() {
521 return Err(LogError::Config(
522 "idempotent append header cannot be empty".to_string(),
523 ));
524 }
525 match self {
526 Self::Memory(log) => {
527 log.append_idempotent_by_header(topic, header, value, event)
528 .await
529 }
530 Self::File(log) => log.append_idempotent_by_header(topic, header, value, event),
531 Self::Sqlite(log) => log.append_idempotent_by_header(topic, header, value, event),
532 }
533 }
534
535 pub async fn read_idempotent_by_header(
539 &self,
540 topic: &Topic,
541 header: &str,
542 value: &str,
543 ) -> Result<Option<(EventId, LogEvent)>, LogError> {
544 if header.trim().is_empty() {
545 return Err(LogError::Config(
546 "idempotent read header cannot be empty".to_string(),
547 ));
548 }
549 match self {
550 Self::Memory(log) => log.read_idempotent_by_header(topic, header, value).await,
551 Self::File(log) => log.read_idempotent_by_header(topic, header, value),
552 Self::Sqlite(log) => log.read_idempotent_by_header(topic, header, value),
553 }
554 }
555}
556
557impl EventLog for AnyEventLog {
558 fn describe(&self) -> EventLogDescription {
559 match self {
560 Self::Memory(log) => log.describe(),
561 Self::File(log) => log.describe(),
562 Self::Sqlite(log) => log.describe(),
563 }
564 }
565
566 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
567 match self {
568 Self::Memory(log) => log.append(topic, event).await,
569 Self::File(log) => log.append(topic, event).await,
570 Self::Sqlite(log) => log.append(topic, event).await,
571 }
572 }
573
574 async fn flush(&self) -> Result<(), LogError> {
575 match self {
576 Self::Memory(log) => log.flush().await,
577 Self::File(log) => log.flush().await,
578 Self::Sqlite(log) => log.flush().await,
579 }
580 }
581
582 async fn read_range(
583 &self,
584 topic: &Topic,
585 from: Option<EventId>,
586 limit: usize,
587 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
588 match self {
589 Self::Memory(log) => log.read_range(topic, from, limit).await,
590 Self::File(log) => log.read_range(topic, from, limit).await,
591 Self::Sqlite(log) => log.read_range(topic, from, limit).await,
592 }
593 }
594
595 async fn read_range_bytes(
596 &self,
597 topic: &Topic,
598 from: Option<EventId>,
599 limit: usize,
600 ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
601 match self {
602 Self::Memory(log) => log.read_range_bytes(topic, from, limit).await,
603 Self::File(log) => log.read_range_bytes(topic, from, limit).await,
604 Self::Sqlite(log) => log.read_range_bytes(topic, from, limit).await,
605 }
606 }
607
608 async fn subscribe(
609 self: Arc<Self>,
610 topic: &Topic,
611 from: Option<EventId>,
612 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
613 let (rx, queue_depth) = match self.as_ref() {
614 Self::Memory(log) => (
615 log.broadcasts.subscribe(topic, log.queue_depth),
616 log.queue_depth,
617 ),
618 Self::File(log) => (
619 log.broadcasts.subscribe(topic, log.queue_depth),
620 log.queue_depth,
621 ),
622 Self::Sqlite(log) => (
623 log.broadcasts.subscribe(topic, log.queue_depth),
624 log.queue_depth,
625 ),
626 };
627 let history = self.read_range(topic, from, usize::MAX).await?;
628 Ok(util::stream_from_broadcast(history, from, rx, queue_depth))
629 }
630
631 async fn ack(
632 &self,
633 topic: &Topic,
634 consumer: &ConsumerId,
635 up_to: EventId,
636 ) -> Result<(), LogError> {
637 match self {
638 Self::Memory(log) => log.ack(topic, consumer, up_to).await,
639 Self::File(log) => log.ack(topic, consumer, up_to).await,
640 Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
641 }
642 }
643
644 async fn consumer_cursor(
645 &self,
646 topic: &Topic,
647 consumer: &ConsumerId,
648 ) -> Result<Option<EventId>, LogError> {
649 match self {
650 Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
651 Self::File(log) => log.consumer_cursor(topic, consumer).await,
652 Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
653 }
654 }
655
656 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
657 match self {
658 Self::Memory(log) => log.latest(topic).await,
659 Self::File(log) => log.latest(topic).await,
660 Self::Sqlite(log) => log.latest(topic).await,
661 }
662 }
663
664 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
665 match self {
666 Self::Memory(log) => log.compact(topic, before).await,
667 Self::File(log) => log.compact(topic, before).await,
668 Self::Sqlite(log) => log.compact(topic, before).await,
669 }
670 }
671}
672
/// Replaces every character that is not allowed in a topic name (anything
/// other than ASCII alphanumerics, `.`, `_`, `-`) with `_`, one `_` per char.
///
/// The result may be empty when `value` is empty. Callers must not assume
/// the output is itself a valid [`Topic`].
pub fn sanitize_topic_component(value: &str) -> String {
    let mut sanitized = String::with_capacity(value.len());
    for ch in value.chars() {
        let allowed = ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-');
        sanitized.push(if allowed { ch } else { '_' });
    }
    sanitized
}