1use std::cell::RefCell;
2use std::collections::{BTreeMap, HashMap, VecDeque};
3use std::fmt;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::{Arc, Mutex};
7
8use bytes::Bytes;
9use futures::stream::BoxStream;
10use rusqlite::{params, Connection, OptionalExtension};
11use serde::{Deserialize, Serialize};
12use tokio::sync::{broadcast, mpsc};
13use tokio_stream::wrappers::ReceiverStream;
14
/// Numeric identifier of an event. The backends in this file allocate ids per
/// topic as "latest id + 1", so the first event of a topic gets id 1.
pub type EventId = u64;

/// Environment variable naming the backend; parsed into an
/// `EventLogBackendKind` by `EventLogConfig::for_base_dir`.
pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
/// Environment variable overriding the directory used by the file backend.
pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
/// Environment variable overriding the database path used by the SQLite backend.
pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
/// Environment variable overriding the queue depth (default 128, minimum 1).
pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";
21
/// Validated name of an event stream.
///
/// `Topic::new` only accepts non-empty names made of ASCII letters, digits,
/// `.`, `_` and `-`.
// NOTE(review): the derived `Deserialize` builds the newtype directly and so
// appears to bypass the validation in `Topic::new` — confirm that is intended.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Topic(String);
24
25impl Topic {
26 pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
27 let value = value.into();
28 if value.is_empty() {
29 return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
30 }
31 if !value
32 .chars()
33 .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
34 {
35 return Err(LogError::InvalidTopic(format!(
36 "topic '{value}' contains unsupported characters"
37 )));
38 }
39 Ok(Self(value))
40 }
41
42 pub fn as_str(&self) -> &str {
43 &self.0
44 }
45}
46
47impl fmt::Display for Topic {
48 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49 self.0.fmt(f)
50 }
51}
52
53impl FromStr for Topic {
54 type Err = LogError;
55
56 fn from_str(s: &str) -> Result<Self, Self::Err> {
57 Self::new(s)
58 }
59}
60
/// Name of a consumer whose read position is tracked per topic.
///
/// `ConsumerId::new` rejects empty or whitespace-only values.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct ConsumerId(String);
63
64impl ConsumerId {
65 pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
66 let value = value.into();
67 if value.trim().is_empty() {
68 return Err(LogError::InvalidConsumer(
69 "consumer id cannot be empty".to_string(),
70 ));
71 }
72 Ok(Self(value))
73 }
74
75 pub fn as_str(&self) -> &str {
76 &self.0
77 }
78}
79
80impl fmt::Display for ConsumerId {
81 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82 self.0.fmt(f)
83 }
84}
85
/// Storage backend selector; serialized in `snake_case` (`memory`, `file`, `sqlite`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventLogBackendKind {
    /// Process-local storage (`MemoryEventLog`).
    Memory,
    /// Files under a root directory (`FileEventLog`).
    File,
    /// SQLite database (`SqliteEventLog`).
    Sqlite,
}
93
94impl fmt::Display for EventLogBackendKind {
95 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96 match self {
97 Self::Memory => write!(f, "memory"),
98 Self::File => write!(f, "file"),
99 Self::Sqlite => write!(f, "sqlite"),
100 }
101 }
102}
103
104impl FromStr for EventLogBackendKind {
105 type Err = LogError;
106
107 fn from_str(value: &str) -> Result<Self, Self::Err> {
108 match value.trim().to_ascii_lowercase().as_str() {
109 "memory" => Ok(Self::Memory),
110 "file" => Ok(Self::File),
111 "sqlite" => Ok(Self::Sqlite),
112 other => Err(LogError::Config(format!(
113 "unsupported event log backend '{other}'"
114 ))),
115 }
116 }
117}
118
/// A single event as appended to and read back from the log.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LogEvent {
    /// Caller-supplied event type label.
    pub kind: String,
    /// Event body as a JSON value.
    pub payload: serde_json::Value,
    /// String key/value metadata; deserializes to an empty map when absent.
    #[serde(default)]
    pub headers: BTreeMap<String, String>,
    /// Timestamp filled from `now_ms()` by `LogEvent::new`.
    // NOTE(review): presumably Unix-epoch milliseconds — confirm against `now_ms`.
    pub occurred_at_ms: i64,
}
127
128impl LogEvent {
129 pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
130 Self {
131 kind: kind.into(),
132 payload,
133 headers: BTreeMap::new(),
134 occurred_at_ms: now_ms(),
135 }
136 }
137
138 pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
139 self.headers = headers;
140 self
141 }
142
143 pub fn redact_in_place(&mut self, policy: &crate::redact::RedactionPolicy) {
149 self.headers = policy.redact_headers(&self.headers);
150 policy.redact_json_in_place(&mut self.payload);
151 }
152}
153
/// Variant of `LogEvent` whose payload is kept as raw bytes instead of a
/// parsed JSON value; `payload_json` / `into_log_event` parse it as JSON.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LogEventBytes {
    /// Caller-supplied event type label.
    pub kind: String,
    /// Encoded payload; expected to hold JSON text.
    pub payload: Bytes,
    /// String key/value metadata.
    pub headers: BTreeMap<String, String>,
    /// Timestamp carried over from the originating event.
    pub occurred_at_ms: i64,
}
167
168impl LogEventBytes {
169 pub fn payload_json(&self) -> Result<serde_json::Value, LogError> {
170 serde_json::from_slice(&self.payload)
171 .map_err(|error| LogError::Serde(format!("event log payload parse error: {error}")))
172 }
173
174 pub fn into_log_event(self) -> Result<LogEvent, LogError> {
175 Ok(LogEvent {
176 kind: self.kind,
177 payload: serde_json::from_slice(&self.payload).map_err(|error| {
178 LogError::Serde(format!("event log payload parse error: {error}"))
179 })?,
180 headers: self.headers,
181 occurred_at_ms: self.occurred_at_ms,
182 })
183 }
184}
185
186impl TryFrom<LogEvent> for LogEventBytes {
187 type Error = LogError;
188
189 fn try_from(event: LogEvent) -> Result<Self, Self::Error> {
190 let payload = serde_json::to_vec(&event.payload)
191 .map_err(|error| LogError::Serde(format!("event log payload encode error: {error}")))?;
192 Ok(Self {
193 kind: event.kind,
194 payload: Bytes::from(payload),
195 headers: event.headers,
196 occurred_at_ms: event.occurred_at_ms,
197 })
198 }
199}
200
/// Summary returned by `EventLog::compact`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactReport {
    /// Number of events removed by the compaction.
    pub removed: usize,
    /// Number of events still stored for the topic afterwards.
    pub remaining: usize,
    /// Latest event id known for the topic, if any.
    pub latest: Option<EventId>,
    /// Backend-specific flag; the in-memory backend always reports `false`.
    pub checkpointed: bool,
}
208
/// Result of an idempotent append (`append_idempotent_by_header`).
#[derive(Clone, Debug, PartialEq)]
pub struct AppendOutcome {
    /// Id of the stored event (the pre-existing one when `inserted` is false).
    pub event_id: EventId,
    /// The stored event as it exists in the log.
    pub event: LogEvent,
    /// `true` when a new event was written, `false` when a matching event
    /// already existed and was returned instead.
    pub inserted: bool,
}
215
/// Static description of a configured or open event log.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventLogDescription {
    /// Which backend is in use.
    pub backend: EventLogBackendKind,
    /// Directory (file backend) or database path (SQLite); `None` for memory.
    pub location: Option<PathBuf>,
    /// On-disk size as reported by the size helpers; `None` for memory.
    pub size_bytes: Option<u64>,
    /// Capacity used for subscriber channels.
    pub queue_depth: usize,
}
223
/// Errors produced by the event log. The string variants carry a complete,
/// human-readable message that `Display` prints verbatim.
#[derive(Debug)]
pub enum LogError {
    /// Invalid configuration or argument (e.g. unknown backend name).
    Config(String),
    /// Topic name failed validation.
    InvalidTopic(String),
    /// Consumer id failed validation.
    InvalidConsumer(String),
    /// Filesystem failure, or a poisoned in-memory state lock.
    Io(String),
    /// JSON encode/decode failure.
    Serde(String),
    /// SQLite backend failure.
    Sqlite(String),
    /// A subscriber fell behind the live broadcast; carries the id of the
    /// last event it was sent.
    ConsumerLagged(EventId),
}
234
235impl fmt::Display for LogError {
236 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
237 match self {
238 Self::Config(message)
239 | Self::InvalidTopic(message)
240 | Self::InvalidConsumer(message)
241 | Self::Io(message)
242 | Self::Serde(message)
243 | Self::Sqlite(message) => message.fmt(f),
244 Self::ConsumerLagged(last_id) => {
245 write!(f, "subscriber lagged behind after event {last_id}")
246 }
247 }
248 }
249}
250
/// `LogError` only carries messages and ids, so the default `Error` methods
/// (no `source`) are used as-is.
impl std::error::Error for LogError {}
252
253#[allow(async_fn_in_trait)]
254pub trait EventLog: Send + Sync {
255 fn describe(&self) -> EventLogDescription;
256
257 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;
258
259 async fn flush(&self) -> Result<(), LogError>;
260
261 async fn read_range(
264 &self,
265 topic: &Topic,
266 from: Option<EventId>,
267 limit: usize,
268 ) -> Result<Vec<(EventId, LogEvent)>, LogError>;
269
270 async fn read_range_bytes(
271 &self,
272 topic: &Topic,
273 from: Option<EventId>,
274 limit: usize,
275 ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
276 let events = self.read_range(topic, from, limit).await?;
277 events
278 .into_iter()
279 .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
280 .collect()
281 }
282
283 async fn subscribe(
286 self: Arc<Self>,
287 topic: &Topic,
288 from: Option<EventId>,
289 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;
290
291 async fn ack(
292 &self,
293 topic: &Topic,
294 consumer: &ConsumerId,
295 up_to: EventId,
296 ) -> Result<(), LogError>;
297
298 async fn consumer_cursor(
299 &self,
300 topic: &Topic,
301 consumer: &ConsumerId,
302 ) -> Result<Option<EventId>, LogError>;
303
304 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;
305
306 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
307}
308
/// Resolved settings needed to open an event log.
#[derive(Clone, Debug)]
pub struct EventLogConfig {
    /// Backend to open.
    pub backend: EventLogBackendKind,
    /// Root directory for the file backend.
    pub file_dir: PathBuf,
    /// Database path for the SQLite backend.
    pub sqlite_path: PathBuf,
    /// Capacity used for subscriber channels (at least 1 when built by
    /// `for_base_dir`).
    pub queue_depth: usize,
}
316
317impl EventLogConfig {
318 pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
319 let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
320 .ok()
321 .map(|value| value.parse())
322 .transpose()?
323 .unwrap_or(EventLogBackendKind::Sqlite);
324 let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
325 .ok()
326 .and_then(|value| value.parse::<usize>().ok())
327 .unwrap_or(128)
328 .max(1);
329
330 let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
331 Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
332 _ => crate::runtime_paths::event_log_dir(base_dir),
333 };
334 let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
335 Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
336 _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
337 };
338
339 Ok(Self {
340 backend,
341 file_dir,
342 sqlite_path,
343 queue_depth,
344 })
345 }
346
347 pub fn location(&self) -> Option<PathBuf> {
348 match self.backend {
349 EventLogBackendKind::Memory => None,
350 EventLogBackendKind::File => Some(self.file_dir.clone()),
351 EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
352 }
353 }
354}
355
thread_local! {
    // Event log currently installed for this thread, if any.
    static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
    // Configuration recorded by `install_lazy_default_for_base_dir`; it is
    // taken and opened by `active_event_log` the first time no log is active.
    static PENDING_DEFAULT_EVENT_LOG: RefCell<Option<EventLogConfig>> = const { RefCell::new(None) };
}
360
361pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
362 let config = EventLogConfig::for_base_dir(base_dir)?;
363 let log = open_event_log(&config)?;
364 ACTIVE_EVENT_LOG.with(|slot| {
365 *slot.borrow_mut() = Some(log.clone());
366 });
367 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
368 *slot.borrow_mut() = None;
369 });
370 Ok(log)
371}
372
373pub fn install_lazy_default_for_base_dir(base_dir: &Path) -> Result<(), LogError> {
374 let config = EventLogConfig::for_base_dir(base_dir)?;
375 let has_active = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().is_some());
376 if !has_active {
377 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
378 *slot.borrow_mut() = Some(config);
379 });
380 }
381 Ok(())
382}
383
384pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
385 let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
386 ACTIVE_EVENT_LOG.with(|slot| {
387 *slot.borrow_mut() = Some(log.clone());
388 });
389 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
390 *slot.borrow_mut() = None;
391 });
392 log
393}
394
395pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
396 ACTIVE_EVENT_LOG.with(|slot| {
397 *slot.borrow_mut() = Some(log.clone());
398 });
399 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
400 *slot.borrow_mut() = None;
401 });
402 log
403}
404
405pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
406 if let Some(log) = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone()) {
407 return Some(log);
408 }
409
410 let config = PENDING_DEFAULT_EVENT_LOG.with(|slot| slot.borrow_mut().take())?;
411 match open_event_log(&config) {
412 Ok(log) => Some(install_active_event_log(log)),
413 Err(error) => {
414 crate::events::log_warn("event_log.init", &error.to_string());
415 None
416 }
417 }
418}
419
420pub fn reset_active_event_log() {
421 ACTIVE_EVENT_LOG.with(|slot| {
422 *slot.borrow_mut() = None;
423 });
424 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
425 *slot.borrow_mut() = None;
426 });
427}
428
429pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
430 let config = EventLogConfig::for_base_dir(base_dir)?;
431 let description = match config.backend {
432 EventLogBackendKind::Memory => EventLogDescription {
433 backend: EventLogBackendKind::Memory,
434 location: None,
435 size_bytes: None,
436 queue_depth: config.queue_depth,
437 },
438 EventLogBackendKind::File => EventLogDescription {
439 backend: EventLogBackendKind::File,
440 size_bytes: Some(dir_size_bytes(&config.file_dir)),
441 location: Some(config.file_dir),
442 queue_depth: config.queue_depth,
443 },
444 EventLogBackendKind::Sqlite => EventLogDescription {
445 backend: EventLogBackendKind::Sqlite,
446 size_bytes: Some(sqlite_size_bytes(&config.sqlite_path)),
447 location: Some(config.sqlite_path),
448 queue_depth: config.queue_depth,
449 },
450 };
451 Ok(description)
452}
453
454pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
455 match config.backend {
456 EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
457 config.queue_depth,
458 )))),
459 EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
460 config.file_dir.clone(),
461 config.queue_depth,
462 )?))),
463 EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
464 config.sqlite_path.clone(),
465 config.queue_depth,
466 )?))),
467 }
468}
469
/// Closed set of event log backends; implements `EventLog` by forwarding each
/// call to the wrapped backend.
pub enum AnyEventLog {
    /// In-memory backend.
    Memory(MemoryEventLog),
    /// File-backed backend.
    File(FileEventLog),
    /// SQLite-backed backend.
    Sqlite(SqliteEventLog),
}
475
476impl AnyEventLog {
477 pub async fn topics(&self) -> Result<Vec<Topic>, LogError> {
478 match self {
479 Self::Memory(log) => log.topics().await,
480 Self::File(log) => log.topics(),
481 Self::Sqlite(log) => log.topics(),
482 }
483 }
484
485 pub async fn append_idempotent_by_header(
486 &self,
487 topic: &Topic,
488 header: &str,
489 value: &str,
490 event: LogEvent,
491 ) -> Result<AppendOutcome, LogError> {
492 if header.trim().is_empty() {
493 return Err(LogError::Config(
494 "idempotent append header cannot be empty".to_string(),
495 ));
496 }
497 match self {
498 Self::Memory(log) => {
499 log.append_idempotent_by_header(topic, header, value, event)
500 .await
501 }
502 Self::File(log) => log.append_idempotent_by_header(topic, header, value, event),
503 Self::Sqlite(log) => log.append_idempotent_by_header(topic, header, value, event),
504 }
505 }
506}
507
508impl EventLog for AnyEventLog {
509 fn describe(&self) -> EventLogDescription {
510 match self {
511 Self::Memory(log) => log.describe(),
512 Self::File(log) => log.describe(),
513 Self::Sqlite(log) => log.describe(),
514 }
515 }
516
517 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
518 match self {
519 Self::Memory(log) => log.append(topic, event).await,
520 Self::File(log) => log.append(topic, event).await,
521 Self::Sqlite(log) => log.append(topic, event).await,
522 }
523 }
524
525 async fn flush(&self) -> Result<(), LogError> {
526 match self {
527 Self::Memory(log) => log.flush().await,
528 Self::File(log) => log.flush().await,
529 Self::Sqlite(log) => log.flush().await,
530 }
531 }
532
533 async fn read_range(
534 &self,
535 topic: &Topic,
536 from: Option<EventId>,
537 limit: usize,
538 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
539 match self {
540 Self::Memory(log) => log.read_range(topic, from, limit).await,
541 Self::File(log) => log.read_range(topic, from, limit).await,
542 Self::Sqlite(log) => log.read_range(topic, from, limit).await,
543 }
544 }
545
546 async fn read_range_bytes(
547 &self,
548 topic: &Topic,
549 from: Option<EventId>,
550 limit: usize,
551 ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
552 match self {
553 Self::Memory(log) => log.read_range_bytes(topic, from, limit).await,
554 Self::File(log) => log.read_range_bytes(topic, from, limit).await,
555 Self::Sqlite(log) => log.read_range_bytes(topic, from, limit).await,
556 }
557 }
558
559 async fn subscribe(
560 self: Arc<Self>,
561 topic: &Topic,
562 from: Option<EventId>,
563 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
564 let (rx, queue_depth) = match self.as_ref() {
565 Self::Memory(log) => (
566 log.broadcasts.subscribe(topic, log.queue_depth),
567 log.queue_depth,
568 ),
569 Self::File(log) => (
570 log.broadcasts.subscribe(topic, log.queue_depth),
571 log.queue_depth,
572 ),
573 Self::Sqlite(log) => (
574 log.broadcasts.subscribe(topic, log.queue_depth),
575 log.queue_depth,
576 ),
577 };
578 let history = self.read_range(topic, from, usize::MAX).await?;
579 Ok(stream_from_broadcast(history, from, rx, queue_depth))
580 }
581
582 async fn ack(
583 &self,
584 topic: &Topic,
585 consumer: &ConsumerId,
586 up_to: EventId,
587 ) -> Result<(), LogError> {
588 match self {
589 Self::Memory(log) => log.ack(topic, consumer, up_to).await,
590 Self::File(log) => log.ack(topic, consumer, up_to).await,
591 Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
592 }
593 }
594
595 async fn consumer_cursor(
596 &self,
597 topic: &Topic,
598 consumer: &ConsumerId,
599 ) -> Result<Option<EventId>, LogError> {
600 match self {
601 Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
602 Self::File(log) => log.consumer_cursor(topic, consumer).await,
603 Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
604 }
605 }
606
607 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
608 match self {
609 Self::Memory(log) => log.latest(topic).await,
610 Self::File(log) => log.latest(topic).await,
611 Self::Sqlite(log) => log.latest(topic).await,
612 }
613 }
614
615 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
616 match self {
617 Self::Memory(log) => log.compact(topic, before).await,
618 Self::File(log) => log.compact(topic, before).await,
619 Self::Sqlite(log) => log.compact(topic, before).await,
620 }
621 }
622}
623
/// Lazily populated map from topic name to the broadcast sender used to fan
/// out newly appended events to subscribers of that topic.
#[derive(Default)]
struct BroadcastMap(Mutex<HashMap<String, broadcast::Sender<(EventId, LogEvent)>>>);
626
627impl BroadcastMap {
628 fn subscribe(
629 &self,
630 topic: &Topic,
631 capacity: usize,
632 ) -> broadcast::Receiver<(EventId, LogEvent)> {
633 self.sender(topic, capacity).subscribe()
634 }
635
636 fn publish(&self, topic: &Topic, capacity: usize, record: (EventId, LogEvent)) {
637 let _ = self.sender(topic, capacity).send(record);
638 }
639
640 fn sender(&self, topic: &Topic, capacity: usize) -> broadcast::Sender<(EventId, LogEvent)> {
641 let mut map = self.0.lock().expect("event log broadcast map poisoned");
642 map.entry(topic.as_str().to_string())
643 .or_insert_with(|| broadcast::channel(capacity.max(1)).0)
644 .clone()
645 }
646}
647
648fn stream_from_broadcast(
649 history: Vec<(EventId, LogEvent)>,
650 from: Option<EventId>,
651 mut live_rx: broadcast::Receiver<(EventId, LogEvent)>,
652 queue_depth: usize,
653) -> BoxStream<'static, Result<(EventId, LogEvent), LogError>> {
654 let (tx, rx) = mpsc::channel(queue_depth.max(1));
655 tokio::spawn(async move {
663 let mut last_seen = from.unwrap_or(0);
664 for (event_id, event) in history {
665 last_seen = event_id;
666 if tx.send(Ok((event_id, event))).await.is_err() {
667 return;
668 }
669 }
670
671 loop {
672 tokio::select! {
673 _ = tx.closed() => return,
674 received = live_rx.recv() => {
675 match received {
676 Ok((event_id, event)) if event_id > last_seen => {
677 last_seen = event_id;
678 if tx.send(Ok((event_id, event))).await.is_err() {
679 return;
680 }
681 }
682 Ok(_) => {}
683 Err(broadcast::error::RecvError::Closed) => return,
684 Err(broadcast::error::RecvError::Lagged(_)) => {
685 let _ = tx.try_send(Err(LogError::ConsumerLagged(last_seen)));
686 return;
687 }
688 }
689 }
690 }
691 }
692 });
693 Box::pin(ReceiverStream::new(rx))
694}
695
696fn prepare_event_after(
697 topic: &Topic,
698 event_id: EventId,
699 previous: Option<(EventId, &LogEvent)>,
700 event: LogEvent,
701) -> Result<LogEvent, LogError> {
702 let previous_hash = previous
703 .map(|(previous_id, previous_event)| {
704 crate::provenance::event_record_hash_from_headers(
705 topic.as_str(),
706 previous_id,
707 previous_event,
708 )
709 })
710 .transpose()?;
711 crate::provenance::prepare_event_for_append(topic.as_str(), event_id, previous_hash, event)
712}
713
/// Mutable state of `MemoryEventLog`, guarded by a single mutex.
#[derive(Default)]
struct MemoryState {
    /// Stored events per topic name, in append order.
    topics: HashMap<String, VecDeque<(EventId, LogEvent)>>,
    /// Highest id assigned per topic; kept even after compaction removes events.
    latest: HashMap<String, EventId>,
    /// Cursor per `(topic, consumer)` pair as last written by `ack`.
    consumers: HashMap<(String, String), EventId>,
}
720
/// Event log that keeps everything in process memory.
pub struct MemoryEventLog {
    /// Events, latest ids and consumer cursors.
    state: Mutex<MemoryState>,
    /// Per-topic channels feeding live subscribers.
    broadcasts: BroadcastMap,
    /// Capacity used for subscriber channels (at least 1).
    queue_depth: usize,
}
726
727impl MemoryEventLog {
728 pub fn new(queue_depth: usize) -> Self {
729 Self {
730 state: Mutex::new(MemoryState::default()),
731 broadcasts: BroadcastMap::default(),
732 queue_depth: queue_depth.max(1),
733 }
734 }
735
736 fn state(&self) -> Result<std::sync::MutexGuard<'_, MemoryState>, LogError> {
737 self.state
738 .lock()
739 .map_err(|_| LogError::Io("memory event log state poisoned".to_string()))
740 }
741
742 async fn topics(&self) -> Result<Vec<Topic>, LogError> {
743 let state = self.state()?;
744 let mut topics = state
745 .topics
746 .keys()
747 .map(|topic| Topic::new(topic.clone()))
748 .collect::<Result<Vec<_>, _>>()?;
749 topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
750 Ok(topics)
751 }
752
753 async fn append_idempotent_by_header(
754 &self,
755 topic: &Topic,
756 header: &str,
757 value: &str,
758 event: LogEvent,
759 ) -> Result<AppendOutcome, LogError> {
760 let mut state = self.state()?;
761 if let Some((event_id, existing)) = state
762 .topics
763 .get(topic.as_str())
764 .into_iter()
765 .flat_map(|events| events.iter())
766 .find(|(_, event)| {
767 event
768 .headers
769 .get(header)
770 .is_some_and(|found| found == value)
771 })
772 {
773 return Ok(AppendOutcome {
774 event_id: *event_id,
775 event: existing.clone(),
776 inserted: false,
777 });
778 }
779
780 let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
781 let previous = state
782 .topics
783 .get(topic.as_str())
784 .and_then(|events| events.back())
785 .map(|(previous_id, previous_event)| (*previous_id, previous_event));
786 let event = prepare_event_after(topic, event_id, previous, event)?;
787 state.latest.insert(topic.as_str().to_string(), event_id);
788 state
789 .topics
790 .entry(topic.as_str().to_string())
791 .or_default()
792 .push_back((event_id, event.clone()));
793 drop(state);
794 self.broadcasts
795 .publish(topic, self.queue_depth, (event_id, event.clone()));
796 Ok(AppendOutcome {
797 event_id,
798 event,
799 inserted: true,
800 })
801 }
802}
803
804impl EventLog for MemoryEventLog {
805 fn describe(&self) -> EventLogDescription {
806 EventLogDescription {
807 backend: EventLogBackendKind::Memory,
808 location: None,
809 size_bytes: None,
810 queue_depth: self.queue_depth,
811 }
812 }
813
814 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
815 let mut state = self.state()?;
816 let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
817 let previous = state
818 .topics
819 .get(topic.as_str())
820 .and_then(|events| events.back())
821 .map(|(previous_id, previous_event)| (*previous_id, previous_event));
822 let event = prepare_event_after(topic, event_id, previous, event)?;
823 state.latest.insert(topic.as_str().to_string(), event_id);
824 state
825 .topics
826 .entry(topic.as_str().to_string())
827 .or_default()
828 .push_back((event_id, event.clone()));
829 drop(state);
830 self.broadcasts
831 .publish(topic, self.queue_depth, (event_id, event));
832 Ok(event_id)
833 }
834
835 async fn flush(&self) -> Result<(), LogError> {
836 Ok(())
837 }
838
839 async fn read_range(
840 &self,
841 topic: &Topic,
842 from: Option<EventId>,
843 limit: usize,
844 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
845 let from = from.unwrap_or(0);
846 let state = self.state()?;
847 let events = state
848 .topics
849 .get(topic.as_str())
850 .into_iter()
851 .flat_map(|events| events.iter())
852 .filter(|(event_id, _)| *event_id > from)
853 .take(limit)
854 .map(|(event_id, event)| (*event_id, event.clone()))
855 .collect();
856 Ok(events)
857 }
858
859 async fn subscribe(
860 self: Arc<Self>,
861 topic: &Topic,
862 from: Option<EventId>,
863 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
864 let rx = self.broadcasts.subscribe(topic, self.queue_depth);
865 let history = self.read_range(topic, from, usize::MAX).await?;
866 Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
867 }
868
869 async fn ack(
870 &self,
871 topic: &Topic,
872 consumer: &ConsumerId,
873 up_to: EventId,
874 ) -> Result<(), LogError> {
875 let mut state = self.state()?;
876 state.consumers.insert(
877 (topic.as_str().to_string(), consumer.as_str().to_string()),
878 up_to,
879 );
880 Ok(())
881 }
882
883 async fn consumer_cursor(
884 &self,
885 topic: &Topic,
886 consumer: &ConsumerId,
887 ) -> Result<Option<EventId>, LogError> {
888 let state = self.state()?;
889 Ok(state
890 .consumers
891 .get(&(topic.as_str().to_string(), consumer.as_str().to_string()))
892 .copied())
893 }
894
895 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
896 let state = self.state()?;
897 Ok(state.latest.get(topic.as_str()).copied())
898 }
899
900 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
901 let mut state = self.state()?;
902 let Some(events) = state.topics.get_mut(topic.as_str()) else {
903 return Ok(CompactReport::default());
904 };
905 let removed = events
906 .iter()
907 .take_while(|(event_id, _)| *event_id <= before)
908 .count();
909 for _ in 0..removed {
910 events.pop_front();
911 }
912 Ok(CompactReport {
913 removed,
914 remaining: events.len(),
915 latest: state.latest.get(topic.as_str()).copied(),
916 checkpointed: false,
917 })
918 }
919}
920
/// One stored record: the file backend writes each as a single JSON line in
/// the topic's `.jsonl` file.
#[derive(Serialize, Deserialize)]
struct FileRecord {
    /// Id assigned to the event at append time.
    id: EventId,
    /// The event as stored.
    event: LogEvent,
}
926
/// Event log stored as files under `root`: one `topics/<topic>.jsonl` file
/// per topic and one `consumers/<topic>__<consumer>.json` file per cursor.
pub struct FileEventLog {
    /// Root directory of the log.
    root: PathBuf,
    /// Cache of the latest id per topic, filled on first use from the file.
    latest_ids: Mutex<HashMap<String, EventId>>,
    /// Held for the duration of every append made through this instance.
    write_lock: Mutex<()>,
    /// Per-topic channels feeding live subscribers.
    broadcasts: BroadcastMap,
    /// Capacity used for subscriber channels (at least 1).
    queue_depth: usize,
}
934
935impl FileEventLog {
936 pub fn open(root: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
937 std::fs::create_dir_all(root.join("topics"))
938 .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
939 std::fs::create_dir_all(root.join("consumers"))
940 .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
941 Ok(Self {
942 root,
943 latest_ids: Mutex::new(HashMap::new()),
944 write_lock: Mutex::new(()),
945 broadcasts: BroadcastMap::default(),
946 queue_depth: queue_depth.max(1),
947 })
948 }
949
950 fn topic_path(&self, topic: &Topic) -> PathBuf {
951 self.root
952 .join("topics")
953 .join(format!("{}.jsonl", topic.as_str()))
954 }
955
956 fn consumer_path(&self, topic: &Topic, consumer: &ConsumerId) -> PathBuf {
957 self.root.join("consumers").join(format!(
958 "{}__{}.json",
959 topic.as_str(),
960 sanitize_filename(consumer.as_str())
961 ))
962 }
963
964 fn latest_id_for_topic(&self, topic: &Topic) -> Result<EventId, LogError> {
965 if let Some(event_id) = self
966 .latest_ids
967 .lock()
968 .expect("file event log latest ids poisoned")
969 .get(topic.as_str())
970 .copied()
971 {
972 return Ok(event_id);
973 }
974
975 let mut latest = 0;
976 let path = self.topic_path(topic);
977 if path.is_file() {
978 for record in read_file_records(&path)? {
979 latest = record.id;
980 }
981 }
982 self.latest_ids
983 .lock()
984 .expect("file event log latest ids poisoned")
985 .insert(topic.as_str().to_string(), latest);
986 Ok(latest)
987 }
988
989 fn read_range_sync(
990 &self,
991 topic: &Topic,
992 from: Option<EventId>,
993 limit: usize,
994 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
995 let path = self.topic_path(topic);
996 if !path.is_file() {
997 return Ok(Vec::new());
998 }
999 let from = from.unwrap_or(0);
1000 let mut events = Vec::new();
1001 for record in read_file_records(&path)? {
1002 if record.id > from {
1003 events.push((record.id, record.event));
1004 }
1005 if events.len() >= limit {
1006 break;
1007 }
1008 }
1009 Ok(events)
1010 }
1011
1012 fn topics(&self) -> Result<Vec<Topic>, LogError> {
1013 let topics_dir = self.root.join("topics");
1014 if !topics_dir.is_dir() {
1015 return Ok(Vec::new());
1016 }
1017 let mut topics = Vec::new();
1018 for entry in std::fs::read_dir(&topics_dir)
1019 .map_err(|error| LogError::Io(format!("event log topics read error: {error}")))?
1020 {
1021 let entry = entry
1022 .map_err(|error| LogError::Io(format!("event log topic entry error: {error}")))?;
1023 let path = entry.path();
1024 if path.extension().and_then(|ext| ext.to_str()) != Some("jsonl") {
1025 continue;
1026 }
1027 let Some(stem) = path.file_stem().and_then(|stem| stem.to_str()) else {
1028 continue;
1029 };
1030 topics.push(Topic::new(stem.to_string())?);
1031 }
1032 topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
1033 Ok(topics)
1034 }
1035
1036 fn append_idempotent_by_header(
1037 &self,
1038 topic: &Topic,
1039 header: &str,
1040 value: &str,
1041 event: LogEvent,
1042 ) -> Result<AppendOutcome, LogError> {
1043 let _guard = self
1044 .write_lock
1045 .lock()
1046 .expect("file event log write lock poisoned");
1047 let existing_events = self.read_range_sync(topic, None, usize::MAX)?;
1048 if let Some((event_id, existing)) = existing_events.iter().find(|(_, event)| {
1049 event
1050 .headers
1051 .get(header)
1052 .is_some_and(|found| found == value)
1053 }) {
1054 return Ok(AppendOutcome {
1055 event_id: *event_id,
1056 event: existing.clone(),
1057 inserted: false,
1058 });
1059 }
1060
1061 let next_id = self.latest_id_for_topic(topic)? + 1;
1062 let previous = existing_events
1063 .last()
1064 .map(|(previous_id, previous_event)| (*previous_id, previous_event));
1065 let event = prepare_event_after(topic, next_id, previous, event)?;
1066 self.append_record_locked(topic, next_id, event)
1067 }
1068
1069 fn append_record_locked(
1070 &self,
1071 topic: &Topic,
1072 event_id: EventId,
1073 event: LogEvent,
1074 ) -> Result<AppendOutcome, LogError> {
1075 let record = FileRecord {
1076 id: event_id,
1077 event: event.clone(),
1078 };
1079 let path = self.topic_path(topic);
1080 if let Some(parent) = path.parent() {
1081 std::fs::create_dir_all(parent)
1082 .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
1083 }
1084 let line = serde_json::to_string(&record)
1085 .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
1086 use std::io::Write as _;
1087 let mut file = std::fs::OpenOptions::new()
1088 .create(true)
1089 .append(true)
1090 .open(&path)
1091 .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
1092 writeln!(file, "{line}")
1093 .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
1094 self.latest_ids
1095 .lock()
1096 .expect("file event log latest ids poisoned")
1097 .insert(topic.as_str().to_string(), event_id);
1098 self.broadcasts
1099 .publish(topic, self.queue_depth, (event_id, event.clone()));
1100 Ok(AppendOutcome {
1101 event_id,
1102 event,
1103 inserted: true,
1104 })
1105 }
1106}
1107
1108fn read_file_records(path: &Path) -> Result<Vec<FileRecord>, LogError> {
1109 let file = std::fs::File::open(path)
1110 .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
1111 let mut reader = std::io::BufReader::new(file);
1112 let mut records = Vec::new();
1113 let mut line = Vec::new();
1114 loop {
1115 line.clear();
1116 let bytes_read = std::io::BufRead::read_until(&mut reader, b'\n', &mut line)
1117 .map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
1118 if bytes_read == 0 {
1119 break;
1120 }
1121 if line.iter().all(u8::is_ascii_whitespace) {
1122 continue;
1123 }
1124 let complete_line = line.ends_with(b"\n");
1125 match serde_json::from_slice::<FileRecord>(&line) {
1126 Ok(record) => records.push(record),
1127 Err(_) if !complete_line => break,
1128 Err(error) => {
1129 return Err(LogError::Serde(format!("event log parse error: {error}")));
1130 }
1131 }
1132 }
1133 Ok(records)
1134}
1135
1136impl EventLog for FileEventLog {
1137 fn describe(&self) -> EventLogDescription {
1138 EventLogDescription {
1139 backend: EventLogBackendKind::File,
1140 location: Some(self.root.clone()),
1141 size_bytes: Some(dir_size_bytes(&self.root)),
1142 queue_depth: self.queue_depth,
1143 }
1144 }
1145
1146 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
1147 let _guard = self
1148 .write_lock
1149 .lock()
1150 .expect("file event log write lock poisoned");
1151 let next_id = self.latest_id_for_topic(topic)? + 1;
1152 let existing_events = self.read_range_sync(topic, None, usize::MAX)?;
1153 let previous = existing_events
1154 .last()
1155 .map(|(previous_id, previous_event)| (*previous_id, previous_event));
1156 let event = prepare_event_after(topic, next_id, previous, event)?;
1157 self.append_record_locked(topic, next_id, event)
1158 .map(|outcome| outcome.event_id)
1159 }
1160
1161 async fn flush(&self) -> Result<(), LogError> {
1162 sync_tree(&self.root)
1163 }
1164
1165 async fn read_range(
1166 &self,
1167 topic: &Topic,
1168 from: Option<EventId>,
1169 limit: usize,
1170 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
1171 self.read_range_sync(topic, from, limit)
1172 }
1173
1174 async fn read_range_bytes(
1175 &self,
1176 topic: &Topic,
1177 from: Option<EventId>,
1178 limit: usize,
1179 ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
1180 self.read_range_sync(topic, from, limit)?
1181 .into_iter()
1182 .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
1183 .collect()
1184 }
1185
1186 async fn subscribe(
1187 self: Arc<Self>,
1188 topic: &Topic,
1189 from: Option<EventId>,
1190 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
1191 let rx = self.broadcasts.subscribe(topic, self.queue_depth);
1192 let history = self.read_range_sync(topic, from, usize::MAX)?;
1193 Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
1194 }
1195
1196 async fn ack(
1197 &self,
1198 topic: &Topic,
1199 consumer: &ConsumerId,
1200 up_to: EventId,
1201 ) -> Result<(), LogError> {
1202 let path = self.consumer_path(topic, consumer);
1203 let payload = serde_json::json!({
1204 "topic": topic.as_str(),
1205 "consumer_id": consumer.as_str(),
1206 "cursor": up_to,
1207 "updated_at_ms": now_ms(),
1208 });
1209 write_json_atomically(&path, &payload)
1210 }
1211
1212 async fn consumer_cursor(
1213 &self,
1214 topic: &Topic,
1215 consumer: &ConsumerId,
1216 ) -> Result<Option<EventId>, LogError> {
1217 let path = self.consumer_path(topic, consumer);
1218 if !path.is_file() {
1219 return Ok(None);
1220 }
1221 let raw = std::fs::read_to_string(&path)
1222 .map_err(|error| LogError::Io(format!("event log consumer read error: {error}")))?;
1223 let payload: serde_json::Value = serde_json::from_str(&raw)
1224 .map_err(|error| LogError::Serde(format!("event log consumer parse error: {error}")))?;
1225 let cursor = payload
1226 .get("cursor")
1227 .and_then(serde_json::Value::as_u64)
1228 .ok_or_else(|| {
1229 LogError::Serde("event log consumer record missing numeric cursor".to_string())
1230 })?;
1231 Ok(Some(cursor))
1232 }
1233
1234 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
1235 let latest = self.latest_id_for_topic(topic)?;
1236 if latest == 0 {
1237 Ok(None)
1238 } else {
1239 Ok(Some(latest))
1240 }
1241 }
1242
1243 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
1244 let _guard = self
1245 .write_lock
1246 .lock()
1247 .expect("file event log write lock poisoned");
1248 let path = self.topic_path(topic);
1249 if !path.is_file() {
1250 return Ok(CompactReport::default());
1251 }
1252 let retained = self.read_range_sync(topic, Some(before), usize::MAX)?;
1253 let removed = self.read_range_sync(topic, None, usize::MAX)?.len() - retained.len();
1254 if retained.is_empty() {
1255 let _ = std::fs::remove_file(&path);
1256 } else {
1257 crate::atomic_io::atomic_write_with(&path, |writer| {
1258 use std::io::Write as _;
1259 for (event_id, event) in &retained {
1260 let line = serde_json::to_string(&FileRecord {
1261 id: *event_id,
1262 event: event.clone(),
1263 })
1264 .map_err(|error| std::io::Error::other(error.to_string()))?;
1265 writeln!(writer, "{line}")?;
1266 }
1267 Ok(())
1268 })
1269 .map_err(|error| LogError::Io(format!("event log compact finalize error: {error}")))?;
1270 }
1271 let latest = retained.last().map(|(event_id, _)| *event_id);
1272 self.latest_ids
1273 .lock()
1274 .expect("file event log latest ids poisoned")
1275 .insert(topic.as_str().to_string(), latest.unwrap_or(0));
1276 Ok(CompactReport {
1277 removed,
1278 remaining: retained.len(),
1279 latest,
1280 checkpointed: false,
1281 })
1282 }
1283}
1284
1285pub struct SqliteEventLog {
1286 path: PathBuf,
1287 connection: Mutex<Connection>,
1288 broadcasts: BroadcastMap,
1289 queue_depth: usize,
1290}
1291
1292impl SqliteEventLog {
1293 pub fn open(path: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
1294 if let Some(parent) = path.parent() {
1295 std::fs::create_dir_all(parent)
1296 .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
1297 }
1298 let connection = Connection::open(&path)
1299 .map_err(|error| LogError::Sqlite(format!("event log open error: {error}")))?;
1300 connection
1307 .busy_timeout(std::time::Duration::from_secs(5))
1308 .map_err(|error| LogError::Sqlite(format!("event log busy-timeout error: {error}")))?;
1309 connection
1310 .pragma_update(None, "journal_mode", "WAL")
1311 .map_err(|error| LogError::Sqlite(format!("event log WAL pragma error: {error}")))?;
1312 connection
1313 .pragma_update(None, "synchronous", "NORMAL")
1314 .map_err(|error| LogError::Sqlite(format!("event log sync pragma error: {error}")))?;
1315 connection
1316 .execute_batch(
1317 "CREATE TABLE IF NOT EXISTS topic_heads (
1318 topic TEXT PRIMARY KEY,
1319 last_id INTEGER NOT NULL
1320 );
1321 CREATE TABLE IF NOT EXISTS events (
1322 topic TEXT NOT NULL,
1323 event_id INTEGER NOT NULL,
1324 kind TEXT NOT NULL,
1325 payload BLOB NOT NULL,
1326 headers TEXT NOT NULL,
1327 occurred_at_ms INTEGER NOT NULL,
1328 PRIMARY KEY (topic, event_id)
1329 );
1330 CREATE TABLE IF NOT EXISTS consumers (
1331 topic TEXT NOT NULL,
1332 consumer_id TEXT NOT NULL,
1333 cursor INTEGER NOT NULL,
1334 updated_at_ms INTEGER NOT NULL,
1335 PRIMARY KEY (topic, consumer_id)
1336 );
1337 CREATE TABLE IF NOT EXISTS event_idempotency_keys (
1338 topic TEXT NOT NULL,
1339 key TEXT NOT NULL,
1340 value TEXT NOT NULL,
1341 event_id INTEGER NOT NULL,
1342 PRIMARY KEY (topic, key, value),
1343 FOREIGN KEY (topic, event_id) REFERENCES events(topic, event_id)
1344 );",
1345 )
1346 .map_err(|error| LogError::Sqlite(format!("event log schema error: {error}")))?;
1347 Ok(Self {
1348 path,
1349 connection: Mutex::new(connection),
1350 broadcasts: BroadcastMap::default(),
1351 queue_depth: queue_depth.max(1),
1352 })
1353 }
1354
1355 fn topics(&self) -> Result<Vec<Topic>, LogError> {
1356 let connection = self
1357 .connection
1358 .lock()
1359 .expect("sqlite event log connection poisoned");
1360 let mut statement = connection
1361 .prepare("SELECT DISTINCT topic FROM events ORDER BY topic ASC")
1362 .map_err(|error| {
1363 LogError::Sqlite(format!("event log topics prepare error: {error}"))
1364 })?;
1365 let rows = statement
1366 .query_map([], |row| row.get::<_, String>(0))
1367 .map_err(|error| LogError::Sqlite(format!("event log topics query error: {error}")))?;
1368 let mut topics = Vec::new();
1369 for row in rows {
1370 topics.push(Topic::new(row.map_err(|error| {
1371 LogError::Sqlite(format!("event log topic row error: {error}"))
1372 })?)?);
1373 }
1374 Ok(topics)
1375 }
1376
1377 fn append_idempotent_by_header(
1378 &self,
1379 topic: &Topic,
1380 header: &str,
1381 value: &str,
1382 event: LogEvent,
1383 ) -> Result<AppendOutcome, LogError> {
1384 let mut connection = self
1385 .connection
1386 .lock()
1387 .expect("sqlite event log connection poisoned");
1388 let tx = connection
1389 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
1390 .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
1391
1392 let existing = tx
1393 .query_row(
1394 "SELECT e.event_id, e.kind, e.payload, e.headers, e.occurred_at_ms
1395 FROM event_idempotency_keys k
1396 JOIN events e ON e.topic = k.topic AND e.event_id = k.event_id
1397 WHERE k.topic = ?1 AND k.key = ?2 AND k.value = ?3",
1398 params![topic.as_str(), header, value],
1399 |row| {
1400 let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1401 let headers: String = row.get(3)?;
1402 Ok((
1403 sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?,
1404 LogEvent {
1405 kind: row.get(1)?,
1406 payload: serde_json::from_slice(&payload).map_err(|error| {
1407 rusqlite::Error::FromSqlConversionFailure(
1408 payload.len(),
1409 rusqlite::types::Type::Blob,
1410 Box::new(error),
1411 )
1412 })?,
1413 headers: serde_json::from_str(&headers).map_err(|error| {
1414 rusqlite::Error::FromSqlConversionFailure(
1415 headers.len(),
1416 rusqlite::types::Type::Text,
1417 Box::new(error),
1418 )
1419 })?,
1420 occurred_at_ms: row.get(4)?,
1421 },
1422 ))
1423 },
1424 )
1425 .optional()
1426 .map_err(|error| {
1427 LogError::Sqlite(format!("event log idempotency read error: {error}"))
1428 })?;
1429
1430 if let Some((event_id, event)) = existing {
1431 return Ok(AppendOutcome {
1432 event_id,
1433 event,
1434 inserted: false,
1435 });
1436 }
1437
1438 tx.execute(
1439 "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
1440 params![topic.as_str()],
1441 )
1442 .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
1443 tx.execute(
1444 "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
1445 params![topic.as_str()],
1446 )
1447 .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
1448 let event_id = tx
1449 .query_row(
1450 "SELECT last_id FROM topic_heads WHERE topic = ?1",
1451 params![topic.as_str()],
1452 |row| row.get::<_, i64>(0),
1453 )
1454 .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))
1455 .and_then(sqlite_i64_to_event_id)?;
1456 let event_id_sql = event_id_to_sqlite_i64(event_id)?;
1457 let previous = tx
1458 .query_row(
1459 "SELECT event_id, kind, payload, headers, occurred_at_ms
1460 FROM events
1461 WHERE topic = ?1 AND event_id < ?2
1462 ORDER BY event_id DESC
1463 LIMIT 1",
1464 params![topic.as_str(), event_id_sql],
1465 |row| {
1466 let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1467 let headers: String = row.get(3)?;
1468 Ok((
1469 sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?,
1470 LogEvent {
1471 kind: row.get(1)?,
1472 payload: serde_json::from_slice(&payload).map_err(|error| {
1473 rusqlite::Error::FromSqlConversionFailure(
1474 payload.len(),
1475 rusqlite::types::Type::Blob,
1476 Box::new(error),
1477 )
1478 })?,
1479 headers: serde_json::from_str(&headers).map_err(|error| {
1480 rusqlite::Error::FromSqlConversionFailure(
1481 headers.len(),
1482 rusqlite::types::Type::Text,
1483 Box::new(error),
1484 )
1485 })?,
1486 occurred_at_ms: row.get(4)?,
1487 },
1488 ))
1489 },
1490 )
1491 .optional()
1492 .map_err(|error| LogError::Sqlite(format!("event log previous read error: {error}")))?;
1493 let event = prepare_event_after(
1494 topic,
1495 event_id,
1496 previous
1497 .as_ref()
1498 .map(|(previous_id, previous_event)| (*previous_id, previous_event)),
1499 event,
1500 )?;
1501 tx.execute(
1502 "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
1503 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1504 params![
1505 topic.as_str(),
1506 event_id_sql,
1507 event.kind,
1508 serde_json::to_vec(&event.payload).map_err(|error| LogError::Serde(format!(
1509 "event log payload encode error: {error}"
1510 )))?,
1511 serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
1512 "event log headers encode error: {error}"
1513 )))?,
1514 event.occurred_at_ms
1515 ],
1516 )
1517 .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
1518 tx.execute(
1519 "INSERT INTO event_idempotency_keys(topic, key, value, event_id)
1520 VALUES (?1, ?2, ?3, ?4)",
1521 params![topic.as_str(), header, value, event_id_sql],
1522 )
1523 .map_err(|error| {
1524 LogError::Sqlite(format!("event log idempotency insert error: {error}"))
1525 })?;
1526 tx.commit()
1527 .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
1528 self.broadcasts
1529 .publish(topic, self.queue_depth, (event_id, event.clone()));
1530 Ok(AppendOutcome {
1531 event_id,
1532 event,
1533 inserted: true,
1534 })
1535 }
1536}
1537
1538impl EventLog for SqliteEventLog {
1539 fn describe(&self) -> EventLogDescription {
1540 EventLogDescription {
1541 backend: EventLogBackendKind::Sqlite,
1542 location: Some(self.path.clone()),
1543 size_bytes: Some(sqlite_size_bytes(&self.path)),
1544 queue_depth: self.queue_depth,
1545 }
1546 }
1547
1548 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
1549 let mut connection = self
1550 .connection
1551 .lock()
1552 .expect("sqlite event log connection poisoned");
1553 let tx = connection
1554 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
1555 .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
1556 tx.execute(
1557 "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
1558 params![topic.as_str()],
1559 )
1560 .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
1561 tx.execute(
1562 "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
1563 params![topic.as_str()],
1564 )
1565 .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
1566 let event_id = tx
1567 .query_row(
1568 "SELECT last_id FROM topic_heads WHERE topic = ?1",
1569 params![topic.as_str()],
1570 |row| row.get::<_, i64>(0),
1571 )
1572 .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))
1573 .and_then(sqlite_i64_to_event_id)?;
1574 let event_id_sql = event_id_to_sqlite_i64(event_id)?;
1575 let previous = tx
1576 .query_row(
1577 "SELECT event_id, kind, payload, headers, occurred_at_ms
1578 FROM events
1579 WHERE topic = ?1 AND event_id < ?2
1580 ORDER BY event_id DESC
1581 LIMIT 1",
1582 params![topic.as_str(), event_id_sql],
1583 |row| {
1584 let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1585 let headers: String = row.get(3)?;
1586 Ok((
1587 sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?,
1588 LogEvent {
1589 kind: row.get(1)?,
1590 payload: serde_json::from_slice(&payload).map_err(|error| {
1591 rusqlite::Error::FromSqlConversionFailure(
1592 payload.len(),
1593 rusqlite::types::Type::Blob,
1594 Box::new(error),
1595 )
1596 })?,
1597 headers: serde_json::from_str(&headers).map_err(|error| {
1598 rusqlite::Error::FromSqlConversionFailure(
1599 headers.len(),
1600 rusqlite::types::Type::Text,
1601 Box::new(error),
1602 )
1603 })?,
1604 occurred_at_ms: row.get(4)?,
1605 },
1606 ))
1607 },
1608 )
1609 .optional()
1610 .map_err(|error| LogError::Sqlite(format!("event log previous read error: {error}")))?;
1611 let event = prepare_event_after(
1612 topic,
1613 event_id,
1614 previous
1615 .as_ref()
1616 .map(|(previous_id, previous_event)| (*previous_id, previous_event)),
1617 event,
1618 )?;
1619 tx.execute(
1620 "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
1621 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1622 params![
1623 topic.as_str(),
1624 event_id_sql,
1625 event.kind,
1626 serde_json::to_vec(&event.payload).map_err(|error| LogError::Serde(format!(
1627 "event log payload encode error: {error}"
1628 )))?,
1629 serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
1630 "event log headers encode error: {error}"
1631 )))?,
1632 event.occurred_at_ms
1633 ],
1634 )
1635 .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
1636 tx.commit()
1637 .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
1638 self.broadcasts
1639 .publish(topic, self.queue_depth, (event_id, event.clone()));
1640 Ok(event_id)
1641 }
1642
1643 async fn flush(&self) -> Result<(), LogError> {
1644 let connection = self
1645 .connection
1646 .lock()
1647 .expect("sqlite event log connection poisoned");
1648 connection
1649 .execute_batch("PRAGMA wal_checkpoint(FULL);")
1650 .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
1651 Ok(())
1652 }
1653
1654 async fn read_range(
1655 &self,
1656 topic: &Topic,
1657 from: Option<EventId>,
1658 limit: usize,
1659 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
1660 let connection = self
1661 .connection
1662 .lock()
1663 .expect("sqlite event log connection poisoned");
1664 let mut statement = connection
1665 .prepare(
1666 "SELECT event_id, kind, payload, headers, occurred_at_ms
1667 FROM events
1668 WHERE topic = ?1 AND event_id > ?2
1669 ORDER BY event_id ASC
1670 LIMIT ?3",
1671 )
1672 .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
1673 let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
1674 let rows = statement
1675 .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
1676 let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1677 let headers: String = row.get(3)?;
1678 let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
1679 Ok((
1680 event_id,
1681 LogEvent {
1682 kind: row.get(1)?,
1683 payload: serde_json::from_slice(&payload).map_err(|error| {
1684 rusqlite::Error::FromSqlConversionFailure(
1685 payload.len(),
1686 rusqlite::types::Type::Blob,
1687 Box::new(error),
1688 )
1689 })?,
1690 headers: serde_json::from_str(&headers).map_err(|error| {
1691 rusqlite::Error::FromSqlConversionFailure(
1692 headers.len(),
1693 rusqlite::types::Type::Text,
1694 Box::new(error),
1695 )
1696 })?,
1697 occurred_at_ms: row.get(4)?,
1698 },
1699 ))
1700 })
1701 .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
1702 let mut events = Vec::new();
1703 for row in rows {
1704 events.push(
1705 row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
1706 );
1707 }
1708 Ok(events)
1709 }
1710
1711 async fn read_range_bytes(
1712 &self,
1713 topic: &Topic,
1714 from: Option<EventId>,
1715 limit: usize,
1716 ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
1717 let connection = self
1718 .connection
1719 .lock()
1720 .expect("sqlite event log connection poisoned");
1721 let mut statement = connection
1722 .prepare(
1723 "SELECT event_id, kind, payload, headers, occurred_at_ms
1724 FROM events
1725 WHERE topic = ?1 AND event_id > ?2
1726 ORDER BY event_id ASC
1727 LIMIT ?3",
1728 )
1729 .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
1730 let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
1731 let rows = statement
1732 .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
1733 let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1734 let headers: String = row.get(3)?;
1735 let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
1736 Ok((
1737 event_id,
1738 LogEventBytes {
1739 kind: row.get(1)?,
1740 payload: Bytes::from(payload),
1741 headers: serde_json::from_str(&headers).map_err(|error| {
1742 rusqlite::Error::FromSqlConversionFailure(
1743 headers.len(),
1744 rusqlite::types::Type::Text,
1745 Box::new(error),
1746 )
1747 })?,
1748 occurred_at_ms: row.get(4)?,
1749 },
1750 ))
1751 })
1752 .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
1753 let mut events = Vec::new();
1754 for row in rows {
1755 events.push(
1756 row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
1757 );
1758 }
1759 Ok(events)
1760 }
1761
1762 async fn subscribe(
1763 self: Arc<Self>,
1764 topic: &Topic,
1765 from: Option<EventId>,
1766 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
1767 let rx = self.broadcasts.subscribe(topic, self.queue_depth);
1768 let history = self.read_range(topic, from, usize::MAX).await?;
1769 Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
1770 }
1771
1772 async fn ack(
1773 &self,
1774 topic: &Topic,
1775 consumer: &ConsumerId,
1776 up_to: EventId,
1777 ) -> Result<(), LogError> {
1778 let connection = self
1779 .connection
1780 .lock()
1781 .expect("sqlite event log connection poisoned");
1782 let up_to_sql = event_id_to_sqlite_i64(up_to)?;
1783 connection
1784 .execute(
1785 "INSERT INTO consumers(topic, consumer_id, cursor, updated_at_ms)
1786 VALUES (?1, ?2, ?3, ?4)
1787 ON CONFLICT(topic, consumer_id)
1788 DO UPDATE SET cursor = excluded.cursor, updated_at_ms = excluded.updated_at_ms",
1789 params![topic.as_str(), consumer.as_str(), up_to_sql, now_ms()],
1790 )
1791 .map_err(|error| LogError::Sqlite(format!("event log ack error: {error}")))?;
1792 Ok(())
1793 }
1794
1795 async fn consumer_cursor(
1796 &self,
1797 topic: &Topic,
1798 consumer: &ConsumerId,
1799 ) -> Result<Option<EventId>, LogError> {
1800 let connection = self
1801 .connection
1802 .lock()
1803 .expect("sqlite event log connection poisoned");
1804 connection
1805 .query_row(
1806 "SELECT cursor FROM consumers WHERE topic = ?1 AND consumer_id = ?2",
1807 params![topic.as_str(), consumer.as_str()],
1808 |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1809 )
1810 .optional()
1811 .map_err(|error| LogError::Sqlite(format!("event log consumer cursor error: {error}")))
1812 }
1813
1814 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
1815 let connection = self
1816 .connection
1817 .lock()
1818 .expect("sqlite event log connection poisoned");
1819 connection
1820 .query_row(
1821 "SELECT last_id FROM topic_heads WHERE topic = ?1",
1822 params![topic.as_str()],
1823 |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1824 )
1825 .optional()
1826 .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))
1827 }
1828
1829 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
1830 let connection = self
1831 .connection
1832 .lock()
1833 .expect("sqlite event log connection poisoned");
1834 let before_sql = event_id_to_sqlite_i64(before)?;
1835 connection
1836 .execute(
1837 "DELETE FROM event_idempotency_keys WHERE topic = ?1 AND event_id <= ?2",
1838 params![topic.as_str(), before_sql],
1839 )
1840 .map_err(|error| {
1841 LogError::Sqlite(format!("event log idempotency compact error: {error}"))
1842 })?;
1843 let removed = connection
1844 .execute(
1845 "DELETE FROM events WHERE topic = ?1 AND event_id <= ?2",
1846 params![topic.as_str(), before_sql],
1847 )
1848 .map_err(|error| {
1849 LogError::Sqlite(format!("event log compact delete error: {error}"))
1850 })?;
1851 let remaining = connection
1852 .query_row(
1853 "SELECT COUNT(*) FROM events WHERE topic = ?1",
1854 params![topic.as_str()],
1855 |row| row.get::<_, i64>(0),
1856 )
1857 .map_err(|error| LogError::Sqlite(format!("event log compact count error: {error}")))
1858 .and_then(sqlite_i64_to_usize)?;
1859 let latest = connection
1860 .query_row(
1861 "SELECT last_id FROM topic_heads WHERE topic = ?1",
1862 params![topic.as_str()],
1863 |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1864 )
1865 .optional()
1866 .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))?;
1867 connection
1868 .execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
1869 .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
1870 Ok(CompactReport {
1871 removed,
1872 remaining,
1873 latest,
1874 checkpointed: true,
1875 })
1876 }
1877}
1878
/// Resolves `value` against `base_dir`.
///
/// An absolute `value` is returned unchanged; anything else is appended to `base_dir`.
fn resolve_path(base_dir: &Path, value: &str) -> PathBuf {
    // `Path::join` already replaces the base when the joined path is absolute, so no explicit
    // `is_absolute` branch is needed.
    base_dir.join(value)
}
1887
1888fn write_json_atomically(path: &Path, payload: &serde_json::Value) -> Result<(), LogError> {
1889 let encoded = serde_json::to_vec_pretty(payload)
1890 .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
1891 crate::atomic_io::atomic_write(path, &encoded)
1892 .map_err(|error| LogError::Io(format!("event log write error: {error}")))
1893}
1894
1895fn sanitize_filename(value: &str) -> String {
1896 sanitize_topic_component(value)
1897}
1898
/// Replaces every character of `value` that is not an ASCII letter, ASCII digit, `.`, `_`, or
/// `-` with `_`.
///
/// Each replaced character yields exactly one `_`, so the number of characters is preserved.
pub fn sanitize_topic_component(value: &str) -> String {
    let mut sanitized = String::with_capacity(value.len());
    for ch in value.chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '.' || ch == '_' || ch == '-';
        sanitized.push(if allowed { ch } else { '_' });
    }
    sanitized
}
1911
/// Sums the sizes of all entries beneath `path`, recursing into real subdirectories.
///
/// Returns `0` when `path` does not exist or cannot be listed; entries whose metadata cannot
/// be read are skipped. Symbolic links are never followed: a link contributes only its own
/// size, so a linked directory is not counted twice and a link cycle cannot cause runaway
/// recursion.
fn dir_size_bytes(path: &Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(path) else {
        return 0;
    };
    entries
        .flatten()
        .map(|entry| match entry.file_type() {
            // `DirEntry::file_type` does not traverse symlinks, unlike `Path::is_dir`.
            Ok(kind) if kind.is_dir() => dir_size_bytes(&entry.path()),
            _ => entry.metadata().map_or(0, |metadata| metadata.len()),
        })
        .sum()
}
1929
1930fn sqlite_size_bytes(path: &Path) -> u64 {
1931 let mut total = file_size(path);
1932 total += file_size(&PathBuf::from(format!("{}-wal", path.display())));
1933 total += file_size(&PathBuf::from(format!("{}-shm", path.display())));
1934 total
1935}
1936
/// Returns the length in bytes reported by the metadata of `path`, or `0` when the metadata
/// cannot be read (for example because the file does not exist).
fn file_size(path: &Path) -> u64 {
    match std::fs::metadata(path) {
        Ok(metadata) => metadata.len(),
        Err(_) => 0,
    }
}
1942
1943fn sync_tree(root: &Path) -> Result<(), LogError> {
1944 if !root.exists() {
1945 return Ok(());
1946 }
1947 for entry in std::fs::read_dir(root)
1948 .map_err(|error| LogError::Io(format!("event log read_dir error: {error}")))?
1949 {
1950 let entry = entry.map_err(|error| LogError::Io(format!("event log dir error: {error}")))?;
1951 let path = entry.path();
1952 if path.is_dir() {
1953 sync_tree(&path)?;
1954 continue;
1955 }
1956 std::fs::File::open(&path)
1957 .and_then(|file| file.sync_all())
1958 .map_err(|error| LogError::Io(format!("event log sync error: {error}")))?;
1959 }
1960 Ok(())
1961}
1962
/// Returns the current wall-clock time as milliseconds since the Unix epoch, or `0` when the
/// system clock reports a time before the epoch.
fn now_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
1969
1970fn event_id_to_sqlite_i64(event_id: EventId) -> Result<i64, LogError> {
1971 i64::try_from(event_id)
1972 .map_err(|_| LogError::Sqlite(format!("event id {event_id} exceeds sqlite INTEGER range")))
1973}
1974
1975fn sqlite_i64_to_event_id(value: i64) -> Result<EventId, LogError> {
1976 u64::try_from(value)
1977 .map_err(|_| LogError::Sqlite(format!("sqlite event id {value} is negative")))
1978}
1979
1980fn sqlite_i64_to_event_id_for_row(value: i64) -> rusqlite::Result<EventId> {
1981 u64::try_from(value).map_err(|_| {
1982 rusqlite::Error::FromSqlConversionFailure(
1983 std::mem::size_of::<i64>(),
1984 rusqlite::types::Type::Integer,
1985 "sqlite event id is negative".into(),
1986 )
1987 })
1988}
1989
1990fn sqlite_json_bytes_for_row(
1991 row: &rusqlite::Row<'_>,
1992 index: usize,
1993 name: &str,
1994) -> rusqlite::Result<Vec<u8>> {
1995 let value = row.get_ref(index)?;
1996 match value {
1997 rusqlite::types::ValueRef::Text(bytes) | rusqlite::types::ValueRef::Blob(bytes) => {
1998 Ok(bytes.to_vec())
1999 }
2000 other => Err(rusqlite::Error::InvalidColumnType(
2001 index,
2002 name.to_string(),
2003 other.data_type(),
2004 )),
2005 }
2006}
2007
2008fn sqlite_i64_to_usize(value: i64) -> Result<usize, LogError> {
2009 usize::try_from(value)
2010 .map_err(|_| LogError::Sqlite(format!("sqlite count {value} is negative")))
2011}
2012
2013#[cfg(test)]
2014mod tests {
2015 use super::*;
2016 use futures::StreamExt;
2017 use rand::{rngs::StdRng, RngExt, SeedableRng};
2018
2019 #[test]
2020 fn lazy_default_event_log_opens_on_first_access() {
2021 reset_active_event_log();
2022 let dir = tempfile::tempdir().unwrap();
2023
2024 install_lazy_default_for_base_dir(dir.path()).unwrap();
2025 assert!(ACTIVE_EVENT_LOG.with(|slot| slot.borrow().is_none()));
2026 assert!(PENDING_DEFAULT_EVENT_LOG.with(|slot| slot.borrow().is_some()));
2027
2028 let _log = active_event_log().expect("lazy event log should open on demand");
2029 assert!(ACTIVE_EVENT_LOG.with(|slot| slot.borrow().is_some()));
2030 assert!(PENDING_DEFAULT_EVENT_LOG.with(|slot| slot.borrow().is_none()));
2031 reset_active_event_log();
2032 }
2033
2034 async fn exercise_basic_backend(log: Arc<AnyEventLog>) {
2035 let topic = Topic::new("trigger.inbox").unwrap();
2036 for i in 0..10_000 {
2037 log.append(
2038 &topic,
2039 LogEvent::new("append", serde_json::json!({ "i": i })),
2040 )
2041 .await
2042 .unwrap();
2043 }
2044 let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
2045 assert_eq!(events.len(), 10_000);
2046 assert_eq!(events.first().unwrap().0, 1);
2047 assert_eq!(events.last().unwrap().0, 10_000);
2048 }
2049
2050 async fn exercise_idempotent_append(log: Arc<AnyEventLog>) {
2051 let topic = Topic::new("channel.tenant.default.pr").unwrap();
2052 let mut first_headers = BTreeMap::new();
2053 first_headers.insert("harn.channel.id".to_string(), "event-1".to_string());
2054 let first = log
2055 .append_idempotent_by_header(
2056 &topic,
2057 "harn.channel.id",
2058 "event-1",
2059 LogEvent::new("channel.emit", serde_json::json!({"n": 1}))
2060 .with_headers(first_headers),
2061 )
2062 .await
2063 .unwrap();
2064 assert!(first.inserted);
2065 assert_eq!(first.event_id, 1);
2066
2067 let mut duplicate_headers = BTreeMap::new();
2068 duplicate_headers.insert("harn.channel.id".to_string(), "event-1".to_string());
2069 let duplicate = log
2070 .append_idempotent_by_header(
2071 &topic,
2072 "harn.channel.id",
2073 "event-1",
2074 LogEvent::new("channel.emit", serde_json::json!({"n": 2}))
2075 .with_headers(duplicate_headers),
2076 )
2077 .await
2078 .unwrap();
2079 assert!(!duplicate.inserted);
2080 assert_eq!(duplicate.event_id, first.event_id);
2081 assert_eq!(duplicate.event.payload, serde_json::json!({"n": 1}));
2082
2083 let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
2084 assert_eq!(events.len(), 1);
2085 assert_eq!(events[0].0, first.event_id);
2086 }
2087
2088 #[tokio::test(flavor = "current_thread")]
2089 async fn memory_backend_supports_append_read_subscribe_and_compact() {
2090 let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
2091 exercise_basic_backend(log.clone()).await;
2092
2093 let topic = Topic::new("agent.transcript.demo").unwrap();
2094 let mut stream = log.clone().subscribe(&topic, None).await.unwrap();
2095 let first = log
2096 .append(
2097 &topic,
2098 LogEvent::new("message", serde_json::json!({"text":"one"})),
2099 )
2100 .await
2101 .unwrap();
2102 let second = log
2103 .append(
2104 &topic,
2105 LogEvent::new("message", serde_json::json!({"text":"two"})),
2106 )
2107 .await
2108 .unwrap();
2109 let seen: Vec<_> = stream.by_ref().take(2).collect().await;
2110 assert_eq!(seen[0].as_ref().unwrap().0, first);
2111 assert_eq!(seen[1].as_ref().unwrap().0, second);
2112
2113 log.ack(&topic, &ConsumerId::new("worker").unwrap(), second)
2114 .await
2115 .unwrap();
2116 let compact = log.compact(&topic, first).await.unwrap();
2117 assert_eq!(compact.removed, 1);
2118 assert_eq!(compact.remaining, 1);
2119 }
2120
2121 #[tokio::test(flavor = "current_thread")]
2122 async fn memory_backend_idempotent_append_returns_original_event() {
2123 let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
2124 exercise_idempotent_append(log).await;
2125 }
2126
2127 #[tokio::test(flavor = "current_thread")]
2128 async fn file_backend_persists_across_reopen_and_compacts() {
2129 let dir = tempfile::tempdir().unwrap();
2130 let topic = Topic::new("trigger.outbox").unwrap();
2131 let first_log = Arc::new(AnyEventLog::File(
2132 FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
2133 ));
2134 first_log
2135 .append(
2136 &topic,
2137 LogEvent::new("dispatch_pending", serde_json::json!({"n":1})),
2138 )
2139 .await
2140 .unwrap();
2141 first_log
2142 .append(
2143 &topic,
2144 LogEvent::new("dispatch_complete", serde_json::json!({"n":2})),
2145 )
2146 .await
2147 .unwrap();
2148 drop(first_log);
2149
2150 let reopened = Arc::new(AnyEventLog::File(
2151 FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
2152 ));
2153 let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
2154 assert_eq!(events.len(), 2);
2155 let compact = reopened.compact(&topic, 1).await.unwrap();
2156 assert_eq!(compact.removed, 1);
2157 assert_eq!(
2158 reopened
2159 .read_range(&topic, None, usize::MAX)
2160 .await
2161 .unwrap()
2162 .len(),
2163 1
2164 );
2165 }
2166
2167 #[tokio::test(flavor = "current_thread")]
2168 async fn file_backend_skips_torn_tail_on_restart() {
2169 let dir = tempfile::tempdir().unwrap();
2170 let topic = Topic::new("trigger.inbox").unwrap();
2171 let first_log = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
2172 first_log
2173 .append(
2174 &topic,
2175 LogEvent::new("accepted", serde_json::json!({"id": "ok"})),
2176 )
2177 .await
2178 .unwrap();
2179 drop(first_log);
2180
2181 let topic_path = dir.path().join("topics").join("trigger.inbox.jsonl");
2182 use std::io::Write as _;
2183 let mut file = std::fs::OpenOptions::new()
2184 .append(true)
2185 .open(&topic_path)
2186 .unwrap();
2187 write!(file, "{{\"id\":2,\"event\":{{\"kind\":\"partial\"").unwrap();
2188 drop(file);
2189
2190 let reopened = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
2191 let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
2192 assert_eq!(events.len(), 1);
2193 assert_eq!(events[0].0, 1);
2194 assert_eq!(reopened.latest(&topic).await.unwrap(), Some(1));
2195 }
2196
2197 #[tokio::test(flavor = "current_thread")]
2198 async fn file_backend_idempotent_append_returns_original_event() {
2199 let dir = tempfile::tempdir().unwrap();
2200 let log = Arc::new(AnyEventLog::File(
2201 FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
2202 ));
2203 exercise_idempotent_append(log).await;
2204 }
2205
2206 #[tokio::test(flavor = "current_thread")]
2207 async fn sqlite_backend_persists_and_checkpoints_after_compact() {
2208 let dir = tempfile::tempdir().unwrap();
2209 let path = dir.path().join("events.sqlite");
2210 let topic = Topic::new("daemon.demo.state").unwrap();
2211 let first_log = Arc::new(AnyEventLog::Sqlite(
2212 SqliteEventLog::open(path.clone(), 8).unwrap(),
2213 ));
2214 first_log
2215 .append(
2216 &topic,
2217 LogEvent::new("state", serde_json::json!({"state":"idle"})),
2218 )
2219 .await
2220 .unwrap();
2221 first_log
2222 .append(
2223 &topic,
2224 LogEvent::new("state", serde_json::json!({"state":"active"})),
2225 )
2226 .await
2227 .unwrap();
2228 drop(first_log);
2229
2230 let reopened = Arc::new(AnyEventLog::Sqlite(
2231 SqliteEventLog::open(path.clone(), 8).unwrap(),
2232 ));
2233 assert_eq!(
2234 reopened
2235 .read_range(&topic, None, usize::MAX)
2236 .await
2237 .unwrap()
2238 .len(),
2239 2
2240 );
2241 let compact = reopened.compact(&topic, 1).await.unwrap();
2242 assert!(compact.checkpointed);
2243 let wal = PathBuf::from(format!("{}-wal", path.display()));
2244 assert!(file_size(&wal) == 0 || !wal.exists());
2245 }
2246
2247 #[tokio::test(flavor = "current_thread")]
2248 async fn sqlite_backend_idempotent_append_returns_original_event() {
2249 let dir = tempfile::tempdir().unwrap();
2250 let path = dir.path().join("events.sqlite");
2251 let log = Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(path, 8).unwrap()));
2252 exercise_idempotent_append(log).await;
2253 }
2254
2255 #[tokio::test(flavor = "current_thread")]
2256 async fn sqlite_backend_compacts_idempotency_keys_with_events() {
2257 let dir = tempfile::tempdir().unwrap();
2258 let path = dir.path().join("events.sqlite");
2259 let log = Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(path, 8).unwrap()));
2260 let topic = Topic::new("channel.tenant.default.compacted").unwrap();
2261 let mut headers = BTreeMap::new();
2262 headers.insert("harn.channel.id".to_string(), "event-1".to_string());
2263 let first = log
2264 .append_idempotent_by_header(
2265 &topic,
2266 "harn.channel.id",
2267 "event-1",
2268 LogEvent::new("channel.emit", serde_json::json!({"n": 1})).with_headers(headers),
2269 )
2270 .await
2271 .unwrap();
2272 log.compact(&topic, first.event_id).await.unwrap();
2273
2274 let mut replacement_headers = BTreeMap::new();
2275 replacement_headers.insert("harn.channel.id".to_string(), "event-1".to_string());
2276 let replacement = log
2277 .append_idempotent_by_header(
2278 &topic,
2279 "harn.channel.id",
2280 "event-1",
2281 LogEvent::new("channel.emit", serde_json::json!({"n": 2}))
2282 .with_headers(replacement_headers),
2283 )
2284 .await
2285 .unwrap();
2286 assert!(replacement.inserted);
2287 assert!(replacement.event_id > first.event_id);
2288 assert_eq!(replacement.event.payload, serde_json::json!({"n": 2}));
2289 }
2290
2291 #[tokio::test(flavor = "current_thread")]
2292 async fn sqlite_bytes_read_preserves_payload_without_value_materialization() {
2293 let dir = tempfile::tempdir().unwrap();
2294 let path = dir.path().join("events.sqlite");
2295 let topic = Topic::new("observability.action_graph").unwrap();
2296 let log = SqliteEventLog::open(path, 8).unwrap();
2297 let event_id = log
2298 .append(
2299 &topic,
2300 LogEvent::new(
2301 "snapshot",
2302 serde_json::json!({"nodes":[{"id":"a"}],"edges":[]}),
2303 ),
2304 )
2305 .await
2306 .unwrap();
2307
2308 let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
2309 assert_eq!(events.len(), 1);
2310 assert_eq!(events[0].0, event_id);
2311 assert_eq!(
2312 events[0].1.payload_json().unwrap(),
2313 serde_json::json!({"nodes":[{"id":"a"}],"edges":[]})
2314 );
2315 }
2316
2317 #[tokio::test(flavor = "current_thread")]
2318 async fn sqlite_bytes_read_accepts_legacy_text_payload_rows() {
2319 let dir = tempfile::tempdir().unwrap();
2320 let path = dir.path().join("events.sqlite");
2321 let topic = Topic::new("agent.transcript.legacy").unwrap();
2322 let log = SqliteEventLog::open(path, 8).unwrap();
2323 {
2324 let connection = log.connection.lock().unwrap();
2325 connection
2326 .execute(
2327 "INSERT INTO topic_heads(topic, last_id) VALUES (?1, 1)",
2328 params![topic.as_str()],
2329 )
2330 .unwrap();
2331 connection
2332 .execute(
2333 "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
2334 VALUES (?1, 1, 'legacy', ?2, '{}', 1)",
2335 params![topic.as_str(), "{\"text\":\"old\"}"],
2336 )
2337 .unwrap();
2338 }
2339
2340 let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
2341 assert_eq!(
2342 events[0].1.payload_json().unwrap(),
2343 serde_json::json!({"text": "old"})
2344 );
2345 assert_eq!(
2346 log.read_range(&topic, None, 1).await.unwrap()[0].1.kind,
2347 "legacy"
2348 );
2349 }
2350
2351 #[tokio::test(flavor = "current_thread")]
2352 async fn broadcast_forwarder_reports_lag_when_receiver_overflows() {
2353 let (sender, rx) = broadcast::channel(2);
2354 for i in 0..10 {
2355 sender
2356 .send((i + 1, LogEvent::new("tick", serde_json::json!({"i": i}))))
2357 .unwrap();
2358 }
2359 let mut stream = stream_from_broadcast(Vec::new(), None, rx, 2);
2360
2361 match stream.next().await {
2362 Some(Err(LogError::ConsumerLagged(last_seen))) => assert_eq!(last_seen, 0),
2363 other => panic!("subscriber should surface lag, got {other:?}"),
2364 }
2365 }
2366
2367 #[tokio::test(flavor = "current_thread")]
2368 async fn broadcast_forwarder_stops_when_consumer_drops_stream() {
2369 let (sender, rx) = broadcast::channel(2);
2370 let stream = stream_from_broadcast(Vec::new(), None, rx, 2);
2371 assert_eq!(sender.receiver_count(), 1);
2372 drop(stream);
2373
2374 tokio::time::timeout(std::time::Duration::from_millis(100), async {
2375 while sender.receiver_count() != 0 {
2376 tokio::task::yield_now().await;
2377 }
2378 })
2379 .await
2380 .expect("subscription receiver should close after consumer drop");
2381 }
2382
2383 #[tokio::test(flavor = "current_thread")]
2384 async fn randomized_reader_sequences_stay_monotonic() {
2385 let log = Arc::new(MemoryEventLog::new(32));
2386 let topic = Topic::new("fuzz.demo").unwrap();
2387 let mut readers = vec![
2388 log.clone().subscribe(&topic, None).await.unwrap(),
2389 log.clone().subscribe(&topic, Some(5)).await.unwrap(),
2390 log.clone().subscribe(&topic, Some(10)).await.unwrap(),
2391 ];
2392 let mut rng = StdRng::seed_from_u64(7);
2393 for _ in 0..64 {
2394 let value = rng.random_range(0..1000);
2395 log.append(
2396 &topic,
2397 LogEvent::new("rand", serde_json::json!({"value": value})),
2398 )
2399 .await
2400 .unwrap();
2401 }
2402
2403 let mut sequences = Vec::new();
2404 for reader in &mut readers {
2405 let mut ids = Vec::new();
2406 while let Some(item) = reader.next().await {
2407 match item {
2408 Ok((event_id, _)) => {
2409 ids.push(event_id);
2410 if ids.len() >= 16 {
2411 break;
2412 }
2413 }
2414 Err(LogError::ConsumerLagged(_)) => break,
2415 Err(error) => panic!("unexpected subscription error: {error}"),
2416 }
2417 }
2418 sequences.push(ids);
2419 }
2420
2421 for ids in sequences {
2422 assert!(ids.windows(2).all(|pair| pair[0] < pair[1]));
2423 }
2424 }
2425}