1use std::cell::RefCell;
2use std::collections::BTreeMap;
3use std::fmt;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7
8use bytes::Bytes;
9use futures::stream::BoxStream;
10use serde::{Deserialize, Serialize};
11
12use crate::runtime_limits::RuntimeLimits;
13
14mod file;
15mod memory;
16mod sqlite;
17mod util;
18
19#[cfg(test)]
20mod tests;
21
22pub use file::FileEventLog;
23pub use memory::MemoryEventLog;
24pub use sqlite::SqliteEventLog;
25
/// Identifier assigned to an event when it is appended to a topic.
pub type EventId = u64;

/// Environment variable selecting the backend (`memory`, `file` or `sqlite`); read by
/// `EventLogConfig::for_base_dir`.
pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
/// Environment variable overriding the file backend's directory (resolved against the base dir).
pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
/// Environment variable overriding the SQLite database path (resolved against the base dir).
pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
/// Environment variable overriding the queue depth handed to the backends.
pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";
32
/// Name of an event log topic.
///
/// Values built through [`Topic::new`] (or `str::parse`) are non-empty and contain only ASCII
/// letters, digits, `.`, `_` and `-`.
///
/// NOTE(review): the derived `Deserialize` does not go through `Topic::new`, so a deserialized
/// `Topic` may hold any string. Consider `#[serde(try_from = "String")]` if persisted data
/// allows it.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Topic(String);
35
36impl Topic {
37 pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
38 let value = value.into();
39 if value.is_empty() {
40 return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
41 }
42 if !value
43 .chars()
44 .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
45 {
46 return Err(LogError::InvalidTopic(format!(
47 "topic '{value}' contains unsupported characters"
48 )));
49 }
50 Ok(Self(value))
51 }
52
53 pub fn as_str(&self) -> &str {
54 &self.0
55 }
56}
57
58impl fmt::Display for Topic {
59 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60 self.0.fmt(f)
61 }
62}
63
64impl FromStr for Topic {
65 type Err = LogError;
66
67 fn from_str(s: &str) -> Result<Self, Self::Err> {
68 Self::new(s)
69 }
70}
71
/// Identifier of a consumer whose acknowledgement cursor is tracked per topic.
///
/// Values built through [`ConsumerId::new`] are not empty or whitespace-only.
///
/// NOTE(review): the derived `Deserialize` does not go through `ConsumerId::new`, so a
/// deserialized id may be blank.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct ConsumerId(String);
74
75impl ConsumerId {
76 pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
77 let value = value.into();
78 if value.trim().is_empty() {
79 return Err(LogError::InvalidConsumer(
80 "consumer id cannot be empty".to_string(),
81 ));
82 }
83 Ok(Self(value))
84 }
85
86 pub fn as_str(&self) -> &str {
87 &self.0
88 }
89}
90
91impl fmt::Display for ConsumerId {
92 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93 self.0.fmt(f)
94 }
95}
96
/// Storage backend used by the event log.
///
/// Serialized in snake_case (`memory`, `file`, `sqlite`), which are also the names written by
/// its `Display` impl and accepted (case-insensitively) by its `FromStr` impl.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventLogBackendKind {
    /// In-process storage; has no on-disk location.
    Memory,
    /// Storage under a directory (`EventLogConfig::file_dir`).
    File,
    /// Storage in an SQLite database (`EventLogConfig::sqlite_path`).
    Sqlite,
}
104
105impl fmt::Display for EventLogBackendKind {
106 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107 match self {
108 Self::Memory => write!(f, "memory"),
109 Self::File => write!(f, "file"),
110 Self::Sqlite => write!(f, "sqlite"),
111 }
112 }
113}
114
115impl FromStr for EventLogBackendKind {
116 type Err = LogError;
117
118 fn from_str(value: &str) -> Result<Self, Self::Err> {
119 match value.trim().to_ascii_lowercase().as_str() {
120 "memory" => Ok(Self::Memory),
121 "file" => Ok(Self::File),
122 "sqlite" => Ok(Self::Sqlite),
123 other => Err(LogError::Config(format!(
124 "unsupported event log backend '{other}'"
125 ))),
126 }
127 }
128}
129
/// A single event stored in a topic: a kind tag, a JSON payload, string headers and an
/// occurrence timestamp.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct LogEvent {
    /// Free-form event kind.
    pub kind: String,
    /// Event payload as a parsed JSON value.
    pub payload: serde_json::Value,
    /// String headers; a missing `headers` field deserializes to an empty map.
    #[serde(default)]
    pub headers: BTreeMap<String, String>,
    /// Timestamp taken from `util::now_ms()` by [`LogEvent::new`] — presumably Unix epoch
    /// milliseconds; TODO confirm.
    pub occurred_at_ms: i64,
}
138
139impl LogEvent {
140 pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
141 Self {
142 kind: kind.into(),
143 payload,
144 headers: BTreeMap::new(),
145 occurred_at_ms: util::now_ms(),
146 }
147 }
148
149 pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
150 self.headers = headers;
151 self
152 }
153
154 pub fn redact_in_place(&mut self, policy: &crate::redact::RedactionPolicy) {
160 self.headers = policy.redact_headers(&self.headers);
161 policy.redact_json_in_place(&mut self.payload);
162 }
163}
164
/// A [`LogEvent`] whose payload is kept as serialized JSON bytes instead of a parsed value.
///
/// Convert with [`LogEventBytes::into_log_event`] or `LogEventBytes::try_from(LogEvent)`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LogEventBytes {
    /// Event kind, as in [`LogEvent::kind`].
    pub kind: String,
    /// JSON-encoded payload bytes.
    pub payload: Bytes,
    /// Event headers, as in [`LogEvent::headers`].
    pub headers: BTreeMap<String, String>,
    /// Occurrence timestamp, as in [`LogEvent::occurred_at_ms`].
    pub occurred_at_ms: i64,
}
178
179impl LogEventBytes {
180 pub fn payload_json(&self) -> Result<serde_json::Value, LogError> {
181 serde_json::from_slice(&self.payload)
182 .map_err(|error| LogError::Serde(format!("event log payload parse error: {error}")))
183 }
184
185 pub fn into_log_event(self) -> Result<LogEvent, LogError> {
186 Ok(LogEvent {
187 kind: self.kind,
188 payload: serde_json::from_slice(&self.payload).map_err(|error| {
189 LogError::Serde(format!("event log payload parse error: {error}"))
190 })?,
191 headers: self.headers,
192 occurred_at_ms: self.occurred_at_ms,
193 })
194 }
195}
196
197impl TryFrom<LogEvent> for LogEventBytes {
198 type Error = LogError;
199
200 fn try_from(event: LogEvent) -> Result<Self, Self::Error> {
201 let payload = serde_json::to_vec(&event.payload)
202 .map_err(|error| LogError::Serde(format!("event log payload encode error: {error}")))?;
203 Ok(Self {
204 kind: event.kind,
205 payload: Bytes::from(payload),
206 headers: event.headers,
207 occurred_at_ms: event.occurred_at_ms,
208 })
209 }
210}
211
/// Outcome of [`EventLog::compact`].
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactReport {
    /// Presumably the number of events removed by this compaction.
    pub removed: usize,
    /// Presumably the number of events left in the topic afterwards.
    pub remaining: usize,
    /// Latest event id of the topic, if any.
    pub latest: Option<EventId>,
    /// Whether the backend performed a checkpoint — backend-defined; TODO confirm meaning.
    pub checkpointed: bool,
}
219
/// Outcome of [`AnyEventLog::append_idempotent_by_header`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AppendOutcome {
    /// Id of the event now stored for the idempotency key.
    pub event_id: EventId,
    /// The event stored under `event_id`.
    pub event: LogEvent,
    /// Presumably `false` when an event with the same header value already existed and nothing
    /// new was appended — TODO confirm against the backends.
    pub inserted: bool,
}
226
/// Summary of an event log, returned by [`EventLog::describe`] and [`describe_for_base_dir`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventLogDescription {
    /// Backend kind.
    pub backend: EventLogBackendKind,
    /// Storage location (`describe_for_base_dir` reports `None` for the in-memory backend).
    pub location: Option<PathBuf>,
    /// Storage size in bytes (`describe_for_base_dir` reports `None` for the in-memory backend).
    pub size_bytes: Option<u64>,
    /// Configured queue depth.
    pub queue_depth: usize,
}
234
/// Errors produced by event log configuration, validation and backends.
///
/// Most variants carry only a rendered message; the underlying error value is not retained.
#[derive(Debug)]
pub enum LogError {
    /// Invalid or unsupported configuration (e.g. an unknown backend name).
    Config(String),
    /// A topic name was rejected by [`Topic::new`].
    InvalidTopic(String),
    /// A consumer id was rejected by [`ConsumerId::new`].
    InvalidConsumer(String),
    /// I/O failure message.
    Io(String),
    /// JSON encoding/decoding failure message.
    Serde(String),
    /// SQLite failure message.
    Sqlite(String),
    /// A subscriber lagged behind; carries the event id after which the lag was detected.
    ConsumerLagged(EventId),
}
245
246impl fmt::Display for LogError {
247 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248 match self {
249 Self::Config(message)
250 | Self::InvalidTopic(message)
251 | Self::InvalidConsumer(message)
252 | Self::Io(message)
253 | Self::Serde(message)
254 | Self::Sqlite(message) => message.fmt(f),
255 Self::ConsumerLagged(last_id) => {
256 write!(f, "subscriber lagged behind after event {last_id}")
257 }
258 }
259 }
260}
261
// No `source()` override: every variant stores only a message or an id, not an inner error.
impl std::error::Error for LogError {}
263
/// Common interface of the event log backends: an append-only log partitioned by [`Topic`],
/// with per-consumer acknowledgement cursors.
///
/// NOTE(review): whether the `from` cursor is inclusive or exclusive, and exactly which events
/// `compact` removes relative to `before`, is backend-defined and not visible here — confirm
/// against the backend implementations.
// `async fn` in a public trait triggers the `async_fn_in_trait` lint because callers cannot add
// bounds (such as `Send`) to the returned futures; the lint is silenced deliberately.
#[allow(async_fn_in_trait)]
pub trait EventLog: Send + Sync {
    /// Reports the backend kind, location, size and queue depth of this log.
    fn describe(&self) -> EventLogDescription;

    /// Appends `event` to `topic` and returns the id assigned to it.
    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;

    /// Flushes buffered writes (backend-defined; may be a no-op for some backends — TODO
    /// confirm).
    async fn flush(&self) -> Result<(), LogError>;

    /// Reads at most `limit` events of `topic` starting at cursor `from` (`None` presumably
    /// means the start of the topic).
    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError>;

    /// Same as [`EventLog::read_range`], but with payloads as serialized JSON bytes.
    ///
    /// The default implementation reads parsed events and re-encodes each payload through
    /// `TryFrom<LogEvent> for LogEventBytes`; a backend holding raw bytes can override it to
    /// skip that round trip. The first encoding failure ([`LogError::Serde`]) aborts the read.
    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        let events = self.read_range(topic, from, limit).await?;
        events
            .into_iter()
            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
            .collect()
    }

    /// Returns a `'static` stream of `topic`'s events starting at `from` and continuing with
    /// newly appended events (as `AnyEventLog` does via the backend broadcasts).
    ///
    /// Takes `Arc<Self>`, presumably so implementations may keep the log alive inside the
    /// stream.
    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;

    /// Records that `consumer` has processed `topic` up to event `up_to`.
    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError>;

    /// Returns the cursor stored for `consumer` on `topic` (presumably the last value passed to
    /// [`EventLog::ack`]), or `None` if nothing is recorded.
    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError>;

    /// Returns the id of the newest event in `topic`, or `None` (presumably when it is empty).
    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;

    /// Compacts `topic` relative to the `before` id and reports the outcome.
    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
}
319
/// Resolved event log settings, typically produced by [`EventLogConfig::for_base_dir`].
///
/// Both paths are always resolved; only the one matching `backend` is used to open the log.
#[derive(Clone, Debug)]
pub struct EventLogConfig {
    /// Backend to open.
    pub backend: EventLogBackendKind,
    /// Directory used by the file backend.
    pub file_dir: PathBuf,
    /// Database path used by the SQLite backend.
    pub sqlite_path: PathBuf,
    /// Queue depth passed to the backend constructors (at least 1 when built by
    /// `for_base_dir`).
    pub queue_depth: usize,
}
327
328impl EventLogConfig {
329 pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
330 let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
331 .ok()
332 .map(|value| value.parse())
333 .transpose()?
334 .unwrap_or(EventLogBackendKind::Sqlite);
335 let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
336 .ok()
337 .and_then(|value| value.parse::<usize>().ok())
338 .unwrap_or(RuntimeLimits::DEFAULT.default_event_log_queue_depth)
339 .max(1);
340
341 let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
342 Ok(value) if !value.trim().is_empty() => util::resolve_path(base_dir, &value),
343 _ => crate::runtime_paths::event_log_dir(base_dir),
344 };
345 let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
346 Ok(value) if !value.trim().is_empty() => util::resolve_path(base_dir, &value),
347 _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
348 };
349
350 Ok(Self {
351 backend,
352 file_dir,
353 sqlite_path,
354 queue_depth,
355 })
356 }
357
358 pub fn location(&self) -> Option<PathBuf> {
359 match self.backend {
360 EventLogBackendKind::Memory => None,
361 EventLogBackendKind::File => Some(self.file_dir.clone()),
362 EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
363 }
364 }
365}
366
// Per-thread state: each thread has its own active log and pending configuration, so
// installing a log on one thread does not affect any other thread.
thread_local! {
    /// The event log installed for the current thread, if any.
    static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
    /// Configuration opened on first use by `active_event_log` when no log is active.
    static PENDING_DEFAULT_EVENT_LOG: RefCell<Option<EventLogConfig>> = const { RefCell::new(None) };
}
371
372pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
373 let config = EventLogConfig::for_base_dir(base_dir)?;
374 let log = open_event_log(&config)?;
375 ACTIVE_EVENT_LOG.with(|slot| {
376 *slot.borrow_mut() = Some(log.clone());
377 });
378 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
379 *slot.borrow_mut() = None;
380 });
381 Ok(log)
382}
383
384pub fn install_lazy_default_for_base_dir(base_dir: &Path) -> Result<(), LogError> {
385 let config = EventLogConfig::for_base_dir(base_dir)?;
386 let has_active = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().is_some());
387 if !has_active {
388 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
389 *slot.borrow_mut() = Some(config);
390 });
391 }
392 Ok(())
393}
394
395pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
396 let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
397 ACTIVE_EVENT_LOG.with(|slot| {
398 *slot.borrow_mut() = Some(log.clone());
399 });
400 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
401 *slot.borrow_mut() = None;
402 });
403 log
404}
405
406pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
407 ACTIVE_EVENT_LOG.with(|slot| {
408 *slot.borrow_mut() = Some(log.clone());
409 });
410 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
411 *slot.borrow_mut() = None;
412 });
413 log
414}
415
416pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
417 if let Some(log) = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone()) {
418 return Some(log);
419 }
420
421 let config = PENDING_DEFAULT_EVENT_LOG.with(|slot| slot.borrow_mut().take())?;
422 match open_event_log(&config) {
423 Ok(log) => Some(install_active_event_log(log)),
424 Err(error) => {
425 crate::events::log_warn("event_log.init", &error.to_string());
426 None
427 }
428 }
429}
430
431pub fn reset_active_event_log() {
432 ACTIVE_EVENT_LOG.with(|slot| {
433 *slot.borrow_mut() = None;
434 });
435 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
436 *slot.borrow_mut() = None;
437 });
438}
439
440pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
441 let config = EventLogConfig::for_base_dir(base_dir)?;
442 let description = match config.backend {
443 EventLogBackendKind::Memory => EventLogDescription {
444 backend: EventLogBackendKind::Memory,
445 location: None,
446 size_bytes: None,
447 queue_depth: config.queue_depth,
448 },
449 EventLogBackendKind::File => EventLogDescription {
450 backend: EventLogBackendKind::File,
451 size_bytes: Some(util::dir_size_bytes(&config.file_dir)),
452 location: Some(config.file_dir),
453 queue_depth: config.queue_depth,
454 },
455 EventLogBackendKind::Sqlite => EventLogDescription {
456 backend: EventLogBackendKind::Sqlite,
457 size_bytes: Some(util::sqlite_size_bytes(&config.sqlite_path)),
458 location: Some(config.sqlite_path),
459 queue_depth: config.queue_depth,
460 },
461 };
462 Ok(description)
463}
464
465pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
466 match config.backend {
467 EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
468 config.queue_depth,
469 )))),
470 EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
471 config.file_dir.clone(),
472 config.queue_depth,
473 )?))),
474 EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
475 config.sqlite_path.clone(),
476 config.queue_depth,
477 )?))),
478 }
479}
480
/// An event log backed by one of the concrete backends.
///
/// `EventLog` has `async fn` methods, so it cannot be used as `dyn EventLog`. This enum
/// provides runtime backend selection through `match` dispatch instead.
pub enum AnyEventLog {
    /// In-memory backend.
    Memory(MemoryEventLog),
    /// File backend.
    File(FileEventLog),
    /// SQLite backend.
    Sqlite(SqliteEventLog),
}
486
487impl AnyEventLog {
488 pub async fn topics(&self) -> Result<Vec<Topic>, LogError> {
489 match self {
490 Self::Memory(log) => log.topics().await,
491 Self::File(log) => log.topics(),
492 Self::Sqlite(log) => log.topics(),
493 }
494 }
495
496 pub async fn append_idempotent_by_header(
497 &self,
498 topic: &Topic,
499 header: &str,
500 value: &str,
501 event: LogEvent,
502 ) -> Result<AppendOutcome, LogError> {
503 if header.trim().is_empty() {
504 return Err(LogError::Config(
505 "idempotent append header cannot be empty".to_string(),
506 ));
507 }
508 match self {
509 Self::Memory(log) => {
510 log.append_idempotent_by_header(topic, header, value, event)
511 .await
512 }
513 Self::File(log) => log.append_idempotent_by_header(topic, header, value, event),
514 Self::Sqlite(log) => log.append_idempotent_by_header(topic, header, value, event),
515 }
516 }
517}
518
519impl EventLog for AnyEventLog {
520 fn describe(&self) -> EventLogDescription {
521 match self {
522 Self::Memory(log) => log.describe(),
523 Self::File(log) => log.describe(),
524 Self::Sqlite(log) => log.describe(),
525 }
526 }
527
528 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
529 match self {
530 Self::Memory(log) => log.append(topic, event).await,
531 Self::File(log) => log.append(topic, event).await,
532 Self::Sqlite(log) => log.append(topic, event).await,
533 }
534 }
535
536 async fn flush(&self) -> Result<(), LogError> {
537 match self {
538 Self::Memory(log) => log.flush().await,
539 Self::File(log) => log.flush().await,
540 Self::Sqlite(log) => log.flush().await,
541 }
542 }
543
544 async fn read_range(
545 &self,
546 topic: &Topic,
547 from: Option<EventId>,
548 limit: usize,
549 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
550 match self {
551 Self::Memory(log) => log.read_range(topic, from, limit).await,
552 Self::File(log) => log.read_range(topic, from, limit).await,
553 Self::Sqlite(log) => log.read_range(topic, from, limit).await,
554 }
555 }
556
557 async fn read_range_bytes(
558 &self,
559 topic: &Topic,
560 from: Option<EventId>,
561 limit: usize,
562 ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
563 match self {
564 Self::Memory(log) => log.read_range_bytes(topic, from, limit).await,
565 Self::File(log) => log.read_range_bytes(topic, from, limit).await,
566 Self::Sqlite(log) => log.read_range_bytes(topic, from, limit).await,
567 }
568 }
569
570 async fn subscribe(
571 self: Arc<Self>,
572 topic: &Topic,
573 from: Option<EventId>,
574 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
575 let (rx, queue_depth) = match self.as_ref() {
576 Self::Memory(log) => (
577 log.broadcasts.subscribe(topic, log.queue_depth),
578 log.queue_depth,
579 ),
580 Self::File(log) => (
581 log.broadcasts.subscribe(topic, log.queue_depth),
582 log.queue_depth,
583 ),
584 Self::Sqlite(log) => (
585 log.broadcasts.subscribe(topic, log.queue_depth),
586 log.queue_depth,
587 ),
588 };
589 let history = self.read_range(topic, from, usize::MAX).await?;
590 Ok(util::stream_from_broadcast(history, from, rx, queue_depth))
591 }
592
593 async fn ack(
594 &self,
595 topic: &Topic,
596 consumer: &ConsumerId,
597 up_to: EventId,
598 ) -> Result<(), LogError> {
599 match self {
600 Self::Memory(log) => log.ack(topic, consumer, up_to).await,
601 Self::File(log) => log.ack(topic, consumer, up_to).await,
602 Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
603 }
604 }
605
606 async fn consumer_cursor(
607 &self,
608 topic: &Topic,
609 consumer: &ConsumerId,
610 ) -> Result<Option<EventId>, LogError> {
611 match self {
612 Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
613 Self::File(log) => log.consumer_cursor(topic, consumer).await,
614 Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
615 }
616 }
617
618 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
619 match self {
620 Self::Memory(log) => log.latest(topic).await,
621 Self::File(log) => log.latest(topic).await,
622 Self::Sqlite(log) => log.latest(topic).await,
623 }
624 }
625
626 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
627 match self {
628 Self::Memory(log) => log.compact(topic, before).await,
629 Self::File(log) => log.compact(topic, before).await,
630 Self::Sqlite(log) => log.compact(topic, before).await,
631 }
632 }
633}
634
/// Maps `value` onto the topic alphabet.
///
/// Every character other than an ASCII letter, ASCII digit, `.`, `_` or `-` becomes `_`; a
/// multi-byte character yields a single `_`. The result may be empty when `value` is empty,
/// in which case it is not a valid [`Topic`] on its own.
pub fn sanitize_topic_component(value: &str) -> String {
    let mut sanitized = String::with_capacity(value.len());
    for ch in value.chars() {
        let allowed = ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-');
        sanitized.push(if allowed { ch } else { '_' });
    }
    sanitized
}