1use std::cell::RefCell;
2use std::collections::{BTreeMap, HashMap, VecDeque};
3use std::fmt;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::{Arc, Mutex};
7
8use bytes::Bytes;
9use futures::stream::BoxStream;
10use rusqlite::{params, Connection, OptionalExtension};
11use serde::{Deserialize, Serialize};
12use tokio::sync::{broadcast, mpsc};
13use tokio_stream::wrappers::ReceiverStream;
14
15use crate::runtime_limits::RuntimeLimits;
16
/// Identifier of an event within a topic.
pub type EventId = u64;

/// Environment variable selecting the backend (parsed as an `EventLogBackendKind`).
pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
/// Environment variable overriding the directory used by the file backend.
pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
/// Environment variable overriding the database path used by the SQLite backend.
pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
/// Environment variable overriding the configured queue depth.
pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";
23
24#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
25pub struct Topic(String);
26
27impl Topic {
28 pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
29 let value = value.into();
30 if value.is_empty() {
31 return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
32 }
33 if !value
34 .chars()
35 .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
36 {
37 return Err(LogError::InvalidTopic(format!(
38 "topic '{value}' contains unsupported characters"
39 )));
40 }
41 Ok(Self(value))
42 }
43
44 pub fn as_str(&self) -> &str {
45 &self.0
46 }
47}
48
49impl fmt::Display for Topic {
50 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51 self.0.fmt(f)
52 }
53}
54
55impl FromStr for Topic {
56 type Err = LogError;
57
58 fn from_str(s: &str) -> Result<Self, Self::Err> {
59 Self::new(s)
60 }
61}
62
63#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
64pub struct ConsumerId(String);
65
66impl ConsumerId {
67 pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
68 let value = value.into();
69 if value.trim().is_empty() {
70 return Err(LogError::InvalidConsumer(
71 "consumer id cannot be empty".to_string(),
72 ));
73 }
74 Ok(Self(value))
75 }
76
77 pub fn as_str(&self) -> &str {
78 &self.0
79 }
80}
81
82impl fmt::Display for ConsumerId {
83 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84 self.0.fmt(f)
85 }
86}
87
88#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
89#[serde(rename_all = "snake_case")]
90pub enum EventLogBackendKind {
91 Memory,
92 File,
93 Sqlite,
94}
95
96impl fmt::Display for EventLogBackendKind {
97 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98 match self {
99 Self::Memory => write!(f, "memory"),
100 Self::File => write!(f, "file"),
101 Self::Sqlite => write!(f, "sqlite"),
102 }
103 }
104}
105
106impl FromStr for EventLogBackendKind {
107 type Err = LogError;
108
109 fn from_str(value: &str) -> Result<Self, Self::Err> {
110 match value.trim().to_ascii_lowercase().as_str() {
111 "memory" => Ok(Self::Memory),
112 "file" => Ok(Self::File),
113 "sqlite" => Ok(Self::Sqlite),
114 other => Err(LogError::Config(format!(
115 "unsupported event log backend '{other}'"
116 ))),
117 }
118 }
119}
120
121#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
122pub struct LogEvent {
123 pub kind: String,
124 pub payload: serde_json::Value,
125 #[serde(default)]
126 pub headers: BTreeMap<String, String>,
127 pub occurred_at_ms: i64,
128}
129
130impl LogEvent {
131 pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
132 Self {
133 kind: kind.into(),
134 payload,
135 headers: BTreeMap::new(),
136 occurred_at_ms: now_ms(),
137 }
138 }
139
140 pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
141 self.headers = headers;
142 self
143 }
144
145 pub fn redact_in_place(&mut self, policy: &crate::redact::RedactionPolicy) {
151 self.headers = policy.redact_headers(&self.headers);
152 policy.redact_json_in_place(&mut self.payload);
153 }
154}
155
156#[derive(Clone, Debug, PartialEq, Eq)]
163pub struct LogEventBytes {
164 pub kind: String,
165 pub payload: Bytes,
166 pub headers: BTreeMap<String, String>,
167 pub occurred_at_ms: i64,
168}
169
170impl LogEventBytes {
171 pub fn payload_json(&self) -> Result<serde_json::Value, LogError> {
172 serde_json::from_slice(&self.payload)
173 .map_err(|error| LogError::Serde(format!("event log payload parse error: {error}")))
174 }
175
176 pub fn into_log_event(self) -> Result<LogEvent, LogError> {
177 Ok(LogEvent {
178 kind: self.kind,
179 payload: serde_json::from_slice(&self.payload).map_err(|error| {
180 LogError::Serde(format!("event log payload parse error: {error}"))
181 })?,
182 headers: self.headers,
183 occurred_at_ms: self.occurred_at_ms,
184 })
185 }
186}
187
188impl TryFrom<LogEvent> for LogEventBytes {
189 type Error = LogError;
190
191 fn try_from(event: LogEvent) -> Result<Self, Self::Error> {
192 let payload = serde_json::to_vec(&event.payload)
193 .map_err(|error| LogError::Serde(format!("event log payload encode error: {error}")))?;
194 Ok(Self {
195 kind: event.kind,
196 payload: Bytes::from(payload),
197 headers: event.headers,
198 occurred_at_ms: event.occurred_at_ms,
199 })
200 }
201}
202
203#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
204pub struct CompactReport {
205 pub removed: usize,
206 pub remaining: usize,
207 pub latest: Option<EventId>,
208 pub checkpointed: bool,
209}
210
211#[derive(Clone, Debug, PartialEq)]
212pub struct AppendOutcome {
213 pub event_id: EventId,
214 pub event: LogEvent,
215 pub inserted: bool,
216}
217
218#[derive(Clone, Debug, PartialEq, Eq)]
219pub struct EventLogDescription {
220 pub backend: EventLogBackendKind,
221 pub location: Option<PathBuf>,
222 pub size_bytes: Option<u64>,
223 pub queue_depth: usize,
224}
225
226#[derive(Debug)]
227pub enum LogError {
228 Config(String),
229 InvalidTopic(String),
230 InvalidConsumer(String),
231 Io(String),
232 Serde(String),
233 Sqlite(String),
234 ConsumerLagged(EventId),
235}
236
237impl fmt::Display for LogError {
238 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
239 match self {
240 Self::Config(message)
241 | Self::InvalidTopic(message)
242 | Self::InvalidConsumer(message)
243 | Self::Io(message)
244 | Self::Serde(message)
245 | Self::Sqlite(message) => message.fmt(f),
246 Self::ConsumerLagged(last_id) => {
247 write!(f, "subscriber lagged behind after event {last_id}")
248 }
249 }
250 }
251}
252
253impl std::error::Error for LogError {}
254
255#[allow(async_fn_in_trait)]
256pub trait EventLog: Send + Sync {
257 fn describe(&self) -> EventLogDescription;
258
259 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;
260
261 async fn flush(&self) -> Result<(), LogError>;
262
263 async fn read_range(
266 &self,
267 topic: &Topic,
268 from: Option<EventId>,
269 limit: usize,
270 ) -> Result<Vec<(EventId, LogEvent)>, LogError>;
271
272 async fn read_range_bytes(
273 &self,
274 topic: &Topic,
275 from: Option<EventId>,
276 limit: usize,
277 ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
278 let events = self.read_range(topic, from, limit).await?;
279 events
280 .into_iter()
281 .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
282 .collect()
283 }
284
285 async fn subscribe(
288 self: Arc<Self>,
289 topic: &Topic,
290 from: Option<EventId>,
291 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;
292
293 async fn ack(
294 &self,
295 topic: &Topic,
296 consumer: &ConsumerId,
297 up_to: EventId,
298 ) -> Result<(), LogError>;
299
300 async fn consumer_cursor(
301 &self,
302 topic: &Topic,
303 consumer: &ConsumerId,
304 ) -> Result<Option<EventId>, LogError>;
305
306 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;
307
308 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
309}
310
311#[derive(Clone, Debug)]
312pub struct EventLogConfig {
313 pub backend: EventLogBackendKind,
314 pub file_dir: PathBuf,
315 pub sqlite_path: PathBuf,
316 pub queue_depth: usize,
317}
318
319impl EventLogConfig {
320 pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
321 let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
322 .ok()
323 .map(|value| value.parse())
324 .transpose()?
325 .unwrap_or(EventLogBackendKind::Sqlite);
326 let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
327 .ok()
328 .and_then(|value| value.parse::<usize>().ok())
329 .unwrap_or(RuntimeLimits::DEFAULT.default_event_log_queue_depth)
330 .max(1);
331
332 let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
333 Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
334 _ => crate::runtime_paths::event_log_dir(base_dir),
335 };
336 let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
337 Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
338 _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
339 };
340
341 Ok(Self {
342 backend,
343 file_dir,
344 sqlite_path,
345 queue_depth,
346 })
347 }
348
349 pub fn location(&self) -> Option<PathBuf> {
350 match self.backend {
351 EventLogBackendKind::Memory => None,
352 EventLogBackendKind::File => Some(self.file_dir.clone()),
353 EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
354 }
355 }
356}
357
358thread_local! {
359 static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
360 static PENDING_DEFAULT_EVENT_LOG: RefCell<Option<EventLogConfig>> = const { RefCell::new(None) };
361}
362
363pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
364 let config = EventLogConfig::for_base_dir(base_dir)?;
365 let log = open_event_log(&config)?;
366 ACTIVE_EVENT_LOG.with(|slot| {
367 *slot.borrow_mut() = Some(log.clone());
368 });
369 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
370 *slot.borrow_mut() = None;
371 });
372 Ok(log)
373}
374
375pub fn install_lazy_default_for_base_dir(base_dir: &Path) -> Result<(), LogError> {
376 let config = EventLogConfig::for_base_dir(base_dir)?;
377 let has_active = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().is_some());
378 if !has_active {
379 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
380 *slot.borrow_mut() = Some(config);
381 });
382 }
383 Ok(())
384}
385
386pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
387 let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
388 ACTIVE_EVENT_LOG.with(|slot| {
389 *slot.borrow_mut() = Some(log.clone());
390 });
391 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
392 *slot.borrow_mut() = None;
393 });
394 log
395}
396
397pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
398 ACTIVE_EVENT_LOG.with(|slot| {
399 *slot.borrow_mut() = Some(log.clone());
400 });
401 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
402 *slot.borrow_mut() = None;
403 });
404 log
405}
406
407pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
408 if let Some(log) = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone()) {
409 return Some(log);
410 }
411
412 let config = PENDING_DEFAULT_EVENT_LOG.with(|slot| slot.borrow_mut().take())?;
413 match open_event_log(&config) {
414 Ok(log) => Some(install_active_event_log(log)),
415 Err(error) => {
416 crate::events::log_warn("event_log.init", &error.to_string());
417 None
418 }
419 }
420}
421
422pub fn reset_active_event_log() {
423 ACTIVE_EVENT_LOG.with(|slot| {
424 *slot.borrow_mut() = None;
425 });
426 PENDING_DEFAULT_EVENT_LOG.with(|slot| {
427 *slot.borrow_mut() = None;
428 });
429}
430
431pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
432 let config = EventLogConfig::for_base_dir(base_dir)?;
433 let description = match config.backend {
434 EventLogBackendKind::Memory => EventLogDescription {
435 backend: EventLogBackendKind::Memory,
436 location: None,
437 size_bytes: None,
438 queue_depth: config.queue_depth,
439 },
440 EventLogBackendKind::File => EventLogDescription {
441 backend: EventLogBackendKind::File,
442 size_bytes: Some(dir_size_bytes(&config.file_dir)),
443 location: Some(config.file_dir),
444 queue_depth: config.queue_depth,
445 },
446 EventLogBackendKind::Sqlite => EventLogDescription {
447 backend: EventLogBackendKind::Sqlite,
448 size_bytes: Some(sqlite_size_bytes(&config.sqlite_path)),
449 location: Some(config.sqlite_path),
450 queue_depth: config.queue_depth,
451 },
452 };
453 Ok(description)
454}
455
456pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
457 match config.backend {
458 EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
459 config.queue_depth,
460 )))),
461 EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
462 config.file_dir.clone(),
463 config.queue_depth,
464 )?))),
465 EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
466 config.sqlite_path.clone(),
467 config.queue_depth,
468 )?))),
469 }
470}
471
472pub enum AnyEventLog {
473 Memory(MemoryEventLog),
474 File(FileEventLog),
475 Sqlite(SqliteEventLog),
476}
477
478impl AnyEventLog {
479 pub async fn topics(&self) -> Result<Vec<Topic>, LogError> {
480 match self {
481 Self::Memory(log) => log.topics().await,
482 Self::File(log) => log.topics(),
483 Self::Sqlite(log) => log.topics(),
484 }
485 }
486
487 pub async fn append_idempotent_by_header(
488 &self,
489 topic: &Topic,
490 header: &str,
491 value: &str,
492 event: LogEvent,
493 ) -> Result<AppendOutcome, LogError> {
494 if header.trim().is_empty() {
495 return Err(LogError::Config(
496 "idempotent append header cannot be empty".to_string(),
497 ));
498 }
499 match self {
500 Self::Memory(log) => {
501 log.append_idempotent_by_header(topic, header, value, event)
502 .await
503 }
504 Self::File(log) => log.append_idempotent_by_header(topic, header, value, event),
505 Self::Sqlite(log) => log.append_idempotent_by_header(topic, header, value, event),
506 }
507 }
508}
509
510impl EventLog for AnyEventLog {
511 fn describe(&self) -> EventLogDescription {
512 match self {
513 Self::Memory(log) => log.describe(),
514 Self::File(log) => log.describe(),
515 Self::Sqlite(log) => log.describe(),
516 }
517 }
518
519 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
520 match self {
521 Self::Memory(log) => log.append(topic, event).await,
522 Self::File(log) => log.append(topic, event).await,
523 Self::Sqlite(log) => log.append(topic, event).await,
524 }
525 }
526
527 async fn flush(&self) -> Result<(), LogError> {
528 match self {
529 Self::Memory(log) => log.flush().await,
530 Self::File(log) => log.flush().await,
531 Self::Sqlite(log) => log.flush().await,
532 }
533 }
534
535 async fn read_range(
536 &self,
537 topic: &Topic,
538 from: Option<EventId>,
539 limit: usize,
540 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
541 match self {
542 Self::Memory(log) => log.read_range(topic, from, limit).await,
543 Self::File(log) => log.read_range(topic, from, limit).await,
544 Self::Sqlite(log) => log.read_range(topic, from, limit).await,
545 }
546 }
547
548 async fn read_range_bytes(
549 &self,
550 topic: &Topic,
551 from: Option<EventId>,
552 limit: usize,
553 ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
554 match self {
555 Self::Memory(log) => log.read_range_bytes(topic, from, limit).await,
556 Self::File(log) => log.read_range_bytes(topic, from, limit).await,
557 Self::Sqlite(log) => log.read_range_bytes(topic, from, limit).await,
558 }
559 }
560
561 async fn subscribe(
562 self: Arc<Self>,
563 topic: &Topic,
564 from: Option<EventId>,
565 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
566 let (rx, queue_depth) = match self.as_ref() {
567 Self::Memory(log) => (
568 log.broadcasts.subscribe(topic, log.queue_depth),
569 log.queue_depth,
570 ),
571 Self::File(log) => (
572 log.broadcasts.subscribe(topic, log.queue_depth),
573 log.queue_depth,
574 ),
575 Self::Sqlite(log) => (
576 log.broadcasts.subscribe(topic, log.queue_depth),
577 log.queue_depth,
578 ),
579 };
580 let history = self.read_range(topic, from, usize::MAX).await?;
581 Ok(stream_from_broadcast(history, from, rx, queue_depth))
582 }
583
584 async fn ack(
585 &self,
586 topic: &Topic,
587 consumer: &ConsumerId,
588 up_to: EventId,
589 ) -> Result<(), LogError> {
590 match self {
591 Self::Memory(log) => log.ack(topic, consumer, up_to).await,
592 Self::File(log) => log.ack(topic, consumer, up_to).await,
593 Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
594 }
595 }
596
597 async fn consumer_cursor(
598 &self,
599 topic: &Topic,
600 consumer: &ConsumerId,
601 ) -> Result<Option<EventId>, LogError> {
602 match self {
603 Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
604 Self::File(log) => log.consumer_cursor(topic, consumer).await,
605 Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
606 }
607 }
608
609 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
610 match self {
611 Self::Memory(log) => log.latest(topic).await,
612 Self::File(log) => log.latest(topic).await,
613 Self::Sqlite(log) => log.latest(topic).await,
614 }
615 }
616
617 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
618 match self {
619 Self::Memory(log) => log.compact(topic, before).await,
620 Self::File(log) => log.compact(topic, before).await,
621 Self::Sqlite(log) => log.compact(topic, before).await,
622 }
623 }
624}
625
626#[derive(Default)]
627struct BroadcastMap(Mutex<HashMap<String, broadcast::Sender<(EventId, LogEvent)>>>);
628
629impl BroadcastMap {
630 fn subscribe(
631 &self,
632 topic: &Topic,
633 capacity: usize,
634 ) -> broadcast::Receiver<(EventId, LogEvent)> {
635 self.sender(topic, capacity).subscribe()
636 }
637
638 fn publish(&self, topic: &Topic, capacity: usize, record: (EventId, LogEvent)) {
639 let _ = self.sender(topic, capacity).send(record);
640 }
641
642 fn sender(&self, topic: &Topic, capacity: usize) -> broadcast::Sender<(EventId, LogEvent)> {
643 let mut map = self.0.lock().expect("event log broadcast map poisoned");
644 map.entry(topic.as_str().to_string())
645 .or_insert_with(|| broadcast::channel(capacity.max(1)).0)
646 .clone()
647 }
648}
649
650fn stream_from_broadcast(
651 history: Vec<(EventId, LogEvent)>,
652 from: Option<EventId>,
653 mut live_rx: broadcast::Receiver<(EventId, LogEvent)>,
654 queue_depth: usize,
655) -> BoxStream<'static, Result<(EventId, LogEvent), LogError>> {
656 let (tx, rx) = mpsc::channel(queue_depth.max(1));
657 tokio::spawn(async move {
665 let mut last_seen = from.unwrap_or(0);
666 for (event_id, event) in history {
667 last_seen = event_id;
668 if tx.send(Ok((event_id, event))).await.is_err() {
669 return;
670 }
671 }
672
673 loop {
674 tokio::select! {
675 _ = tx.closed() => return,
676 received = live_rx.recv() => {
677 match received {
678 Ok((event_id, event)) if event_id > last_seen => {
679 last_seen = event_id;
680 if tx.send(Ok((event_id, event))).await.is_err() {
681 return;
682 }
683 }
684 Ok(_) => {}
685 Err(broadcast::error::RecvError::Closed) => return,
686 Err(broadcast::error::RecvError::Lagged(_)) => {
687 let _ = tx.try_send(Err(LogError::ConsumerLagged(last_seen)));
688 return;
689 }
690 }
691 }
692 }
693 }
694 });
695 Box::pin(ReceiverStream::new(rx))
696}
697
698fn prepare_event_after(
699 topic: &Topic,
700 event_id: EventId,
701 previous: Option<(EventId, &LogEvent)>,
702 event: LogEvent,
703) -> Result<LogEvent, LogError> {
704 let previous_hash = previous
705 .map(|(previous_id, previous_event)| {
706 crate::provenance::event_record_hash_from_headers(
707 topic.as_str(),
708 previous_id,
709 previous_event,
710 )
711 })
712 .transpose()?;
713 crate::provenance::prepare_event_for_append(topic.as_str(), event_id, previous_hash, event)
714}
715
716#[derive(Default)]
717struct MemoryState {
718 topics: HashMap<String, VecDeque<(EventId, LogEvent)>>,
719 latest: HashMap<String, EventId>,
720 consumers: HashMap<(String, String), EventId>,
721}
722
723pub struct MemoryEventLog {
724 state: Mutex<MemoryState>,
725 broadcasts: BroadcastMap,
726 queue_depth: usize,
727}
728
729impl MemoryEventLog {
730 pub fn new(queue_depth: usize) -> Self {
731 Self {
732 state: Mutex::new(MemoryState::default()),
733 broadcasts: BroadcastMap::default(),
734 queue_depth: queue_depth.max(1),
735 }
736 }
737
738 fn state(&self) -> Result<std::sync::MutexGuard<'_, MemoryState>, LogError> {
739 self.state
740 .lock()
741 .map_err(|_| LogError::Io("memory event log state poisoned".to_string()))
742 }
743
744 async fn topics(&self) -> Result<Vec<Topic>, LogError> {
745 let state = self.state()?;
746 let mut topics = state
747 .topics
748 .keys()
749 .map(|topic| Topic::new(topic.clone()))
750 .collect::<Result<Vec<_>, _>>()?;
751 topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
752 Ok(topics)
753 }
754
755 async fn append_idempotent_by_header(
756 &self,
757 topic: &Topic,
758 header: &str,
759 value: &str,
760 event: LogEvent,
761 ) -> Result<AppendOutcome, LogError> {
762 let mut state = self.state()?;
763 if let Some((event_id, existing)) = state
764 .topics
765 .get(topic.as_str())
766 .into_iter()
767 .flat_map(|events| events.iter())
768 .find(|(_, event)| {
769 event
770 .headers
771 .get(header)
772 .is_some_and(|found| found == value)
773 })
774 {
775 return Ok(AppendOutcome {
776 event_id: *event_id,
777 event: existing.clone(),
778 inserted: false,
779 });
780 }
781
782 let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
783 let previous = state
784 .topics
785 .get(topic.as_str())
786 .and_then(|events| events.back())
787 .map(|(previous_id, previous_event)| (*previous_id, previous_event));
788 let event = prepare_event_after(topic, event_id, previous, event)?;
789 state.latest.insert(topic.as_str().to_string(), event_id);
790 state
791 .topics
792 .entry(topic.as_str().to_string())
793 .or_default()
794 .push_back((event_id, event.clone()));
795 drop(state);
796 self.broadcasts
797 .publish(topic, self.queue_depth, (event_id, event.clone()));
798 Ok(AppendOutcome {
799 event_id,
800 event,
801 inserted: true,
802 })
803 }
804}
805
806impl EventLog for MemoryEventLog {
807 fn describe(&self) -> EventLogDescription {
808 EventLogDescription {
809 backend: EventLogBackendKind::Memory,
810 location: None,
811 size_bytes: None,
812 queue_depth: self.queue_depth,
813 }
814 }
815
816 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
817 let mut state = self.state()?;
818 let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
819 let previous = state
820 .topics
821 .get(topic.as_str())
822 .and_then(|events| events.back())
823 .map(|(previous_id, previous_event)| (*previous_id, previous_event));
824 let event = prepare_event_after(topic, event_id, previous, event)?;
825 state.latest.insert(topic.as_str().to_string(), event_id);
826 state
827 .topics
828 .entry(topic.as_str().to_string())
829 .or_default()
830 .push_back((event_id, event.clone()));
831 drop(state);
832 self.broadcasts
833 .publish(topic, self.queue_depth, (event_id, event));
834 Ok(event_id)
835 }
836
837 async fn flush(&self) -> Result<(), LogError> {
838 Ok(())
839 }
840
841 async fn read_range(
842 &self,
843 topic: &Topic,
844 from: Option<EventId>,
845 limit: usize,
846 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
847 let from = from.unwrap_or(0);
848 let state = self.state()?;
849 let events = state
850 .topics
851 .get(topic.as_str())
852 .into_iter()
853 .flat_map(|events| events.iter())
854 .filter(|(event_id, _)| *event_id > from)
855 .take(limit)
856 .map(|(event_id, event)| (*event_id, event.clone()))
857 .collect();
858 Ok(events)
859 }
860
861 async fn subscribe(
862 self: Arc<Self>,
863 topic: &Topic,
864 from: Option<EventId>,
865 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
866 let rx = self.broadcasts.subscribe(topic, self.queue_depth);
867 let history = self.read_range(topic, from, usize::MAX).await?;
868 Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
869 }
870
871 async fn ack(
872 &self,
873 topic: &Topic,
874 consumer: &ConsumerId,
875 up_to: EventId,
876 ) -> Result<(), LogError> {
877 let mut state = self.state()?;
878 state.consumers.insert(
879 (topic.as_str().to_string(), consumer.as_str().to_string()),
880 up_to,
881 );
882 Ok(())
883 }
884
885 async fn consumer_cursor(
886 &self,
887 topic: &Topic,
888 consumer: &ConsumerId,
889 ) -> Result<Option<EventId>, LogError> {
890 let state = self.state()?;
891 Ok(state
892 .consumers
893 .get(&(topic.as_str().to_string(), consumer.as_str().to_string()))
894 .copied())
895 }
896
897 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
898 let state = self.state()?;
899 Ok(state.latest.get(topic.as_str()).copied())
900 }
901
902 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
903 let mut state = self.state()?;
904 let Some(events) = state.topics.get_mut(topic.as_str()) else {
905 return Ok(CompactReport::default());
906 };
907 let removed = events
908 .iter()
909 .take_while(|(event_id, _)| *event_id <= before)
910 .count();
911 for _ in 0..removed {
912 events.pop_front();
913 }
914 Ok(CompactReport {
915 removed,
916 remaining: events.len(),
917 latest: state.latest.get(topic.as_str()).copied(),
918 checkpointed: false,
919 })
920 }
921}
922
923#[derive(Serialize, Deserialize)]
924struct FileRecord {
925 id: EventId,
926 event: LogEvent,
927}
928
929pub struct FileEventLog {
930 root: PathBuf,
931 latest_ids: Mutex<HashMap<String, EventId>>,
932 write_lock: Mutex<()>,
933 broadcasts: BroadcastMap,
934 queue_depth: usize,
935}
936
937impl FileEventLog {
938 pub fn open(root: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
939 std::fs::create_dir_all(root.join("topics"))
940 .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
941 std::fs::create_dir_all(root.join("consumers"))
942 .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
943 Ok(Self {
944 root,
945 latest_ids: Mutex::new(HashMap::new()),
946 write_lock: Mutex::new(()),
947 broadcasts: BroadcastMap::default(),
948 queue_depth: queue_depth.max(1),
949 })
950 }
951
952 fn topic_path(&self, topic: &Topic) -> PathBuf {
953 self.root
954 .join("topics")
955 .join(format!("{}.jsonl", topic.as_str()))
956 }
957
958 fn consumer_path(&self, topic: &Topic, consumer: &ConsumerId) -> PathBuf {
959 self.root.join("consumers").join(format!(
960 "{}__{}.json",
961 topic.as_str(),
962 sanitize_filename(consumer.as_str())
963 ))
964 }
965
966 fn latest_id_for_topic(&self, topic: &Topic) -> Result<EventId, LogError> {
967 if let Some(event_id) = self
968 .latest_ids
969 .lock()
970 .expect("file event log latest ids poisoned")
971 .get(topic.as_str())
972 .copied()
973 {
974 return Ok(event_id);
975 }
976
977 let mut latest = 0;
978 let path = self.topic_path(topic);
979 if path.is_file() {
980 for record in read_file_records(&path)? {
981 latest = record.id;
982 }
983 }
984 self.latest_ids
985 .lock()
986 .expect("file event log latest ids poisoned")
987 .insert(topic.as_str().to_string(), latest);
988 Ok(latest)
989 }
990
991 fn read_range_sync(
992 &self,
993 topic: &Topic,
994 from: Option<EventId>,
995 limit: usize,
996 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
997 let path = self.topic_path(topic);
998 if !path.is_file() {
999 return Ok(Vec::new());
1000 }
1001 let from = from.unwrap_or(0);
1002 let mut events = Vec::new();
1003 for record in read_file_records(&path)? {
1004 if record.id > from {
1005 events.push((record.id, record.event));
1006 }
1007 if events.len() >= limit {
1008 break;
1009 }
1010 }
1011 Ok(events)
1012 }
1013
1014 fn topics(&self) -> Result<Vec<Topic>, LogError> {
1015 let topics_dir = self.root.join("topics");
1016 if !topics_dir.is_dir() {
1017 return Ok(Vec::new());
1018 }
1019 let mut topics = Vec::new();
1020 for entry in std::fs::read_dir(&topics_dir)
1021 .map_err(|error| LogError::Io(format!("event log topics read error: {error}")))?
1022 {
1023 let entry = entry
1024 .map_err(|error| LogError::Io(format!("event log topic entry error: {error}")))?;
1025 let path = entry.path();
1026 if path.extension().and_then(|ext| ext.to_str()) != Some("jsonl") {
1027 continue;
1028 }
1029 let Some(stem) = path.file_stem().and_then(|stem| stem.to_str()) else {
1030 continue;
1031 };
1032 topics.push(Topic::new(stem.to_string())?);
1033 }
1034 topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
1035 Ok(topics)
1036 }
1037
1038 fn append_idempotent_by_header(
1039 &self,
1040 topic: &Topic,
1041 header: &str,
1042 value: &str,
1043 event: LogEvent,
1044 ) -> Result<AppendOutcome, LogError> {
1045 let _guard = self
1046 .write_lock
1047 .lock()
1048 .expect("file event log write lock poisoned");
1049 let existing_events = self.read_range_sync(topic, None, usize::MAX)?;
1050 if let Some((event_id, existing)) = existing_events.iter().find(|(_, event)| {
1051 event
1052 .headers
1053 .get(header)
1054 .is_some_and(|found| found == value)
1055 }) {
1056 return Ok(AppendOutcome {
1057 event_id: *event_id,
1058 event: existing.clone(),
1059 inserted: false,
1060 });
1061 }
1062
1063 let next_id = self.latest_id_for_topic(topic)? + 1;
1064 let previous = existing_events
1065 .last()
1066 .map(|(previous_id, previous_event)| (*previous_id, previous_event));
1067 let event = prepare_event_after(topic, next_id, previous, event)?;
1068 self.append_record_locked(topic, next_id, event)
1069 }
1070
1071 fn append_record_locked(
1072 &self,
1073 topic: &Topic,
1074 event_id: EventId,
1075 event: LogEvent,
1076 ) -> Result<AppendOutcome, LogError> {
1077 let record = FileRecord {
1078 id: event_id,
1079 event: event.clone(),
1080 };
1081 let path = self.topic_path(topic);
1082 if let Some(parent) = path.parent() {
1083 std::fs::create_dir_all(parent)
1084 .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
1085 }
1086 let line = serde_json::to_string(&record)
1087 .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
1088 use std::io::Write as _;
1089 let mut file = std::fs::OpenOptions::new()
1090 .create(true)
1091 .append(true)
1092 .open(&path)
1093 .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
1094 writeln!(file, "{line}")
1095 .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
1096 self.latest_ids
1097 .lock()
1098 .expect("file event log latest ids poisoned")
1099 .insert(topic.as_str().to_string(), event_id);
1100 self.broadcasts
1101 .publish(topic, self.queue_depth, (event_id, event.clone()));
1102 Ok(AppendOutcome {
1103 event_id,
1104 event,
1105 inserted: true,
1106 })
1107 }
1108}
1109
1110fn read_file_records(path: &Path) -> Result<Vec<FileRecord>, LogError> {
1111 let file = std::fs::File::open(path)
1112 .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
1113 let mut reader = std::io::BufReader::new(file);
1114 let mut records = Vec::new();
1115 let mut line = Vec::new();
1116 loop {
1117 line.clear();
1118 let bytes_read = std::io::BufRead::read_until(&mut reader, b'\n', &mut line)
1119 .map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
1120 if bytes_read == 0 {
1121 break;
1122 }
1123 if line.iter().all(u8::is_ascii_whitespace) {
1124 continue;
1125 }
1126 let complete_line = line.ends_with(b"\n");
1127 match serde_json::from_slice::<FileRecord>(&line) {
1128 Ok(record) => records.push(record),
1129 Err(_) if !complete_line => break,
1130 Err(error) => {
1131 return Err(LogError::Serde(format!("event log parse error: {error}")));
1132 }
1133 }
1134 }
1135 Ok(records)
1136}
1137
1138impl EventLog for FileEventLog {
1139 fn describe(&self) -> EventLogDescription {
1140 EventLogDescription {
1141 backend: EventLogBackendKind::File,
1142 location: Some(self.root.clone()),
1143 size_bytes: Some(dir_size_bytes(&self.root)),
1144 queue_depth: self.queue_depth,
1145 }
1146 }
1147
1148 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
1149 let _guard = self
1150 .write_lock
1151 .lock()
1152 .expect("file event log write lock poisoned");
1153 let next_id = self.latest_id_for_topic(topic)? + 1;
1154 let existing_events = self.read_range_sync(topic, None, usize::MAX)?;
1155 let previous = existing_events
1156 .last()
1157 .map(|(previous_id, previous_event)| (*previous_id, previous_event));
1158 let event = prepare_event_after(topic, next_id, previous, event)?;
1159 self.append_record_locked(topic, next_id, event)
1160 .map(|outcome| outcome.event_id)
1161 }
1162
1163 async fn flush(&self) -> Result<(), LogError> {
1164 sync_tree(&self.root)
1165 }
1166
1167 async fn read_range(
1168 &self,
1169 topic: &Topic,
1170 from: Option<EventId>,
1171 limit: usize,
1172 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
1173 self.read_range_sync(topic, from, limit)
1174 }
1175
1176 async fn read_range_bytes(
1177 &self,
1178 topic: &Topic,
1179 from: Option<EventId>,
1180 limit: usize,
1181 ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
1182 self.read_range_sync(topic, from, limit)?
1183 .into_iter()
1184 .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
1185 .collect()
1186 }
1187
1188 async fn subscribe(
1189 self: Arc<Self>,
1190 topic: &Topic,
1191 from: Option<EventId>,
1192 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
1193 let rx = self.broadcasts.subscribe(topic, self.queue_depth);
1194 let history = self.read_range_sync(topic, from, usize::MAX)?;
1195 Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
1196 }
1197
1198 async fn ack(
1199 &self,
1200 topic: &Topic,
1201 consumer: &ConsumerId,
1202 up_to: EventId,
1203 ) -> Result<(), LogError> {
1204 let path = self.consumer_path(topic, consumer);
1205 let payload = serde_json::json!({
1206 "topic": topic.as_str(),
1207 "consumer_id": consumer.as_str(),
1208 "cursor": up_to,
1209 "updated_at_ms": now_ms(),
1210 });
1211 write_json_atomically(&path, &payload)
1212 }
1213
1214 async fn consumer_cursor(
1215 &self,
1216 topic: &Topic,
1217 consumer: &ConsumerId,
1218 ) -> Result<Option<EventId>, LogError> {
1219 let path = self.consumer_path(topic, consumer);
1220 if !path.is_file() {
1221 return Ok(None);
1222 }
1223 let raw = std::fs::read_to_string(&path)
1224 .map_err(|error| LogError::Io(format!("event log consumer read error: {error}")))?;
1225 let payload: serde_json::Value = serde_json::from_str(&raw)
1226 .map_err(|error| LogError::Serde(format!("event log consumer parse error: {error}")))?;
1227 let cursor = payload
1228 .get("cursor")
1229 .and_then(serde_json::Value::as_u64)
1230 .ok_or_else(|| {
1231 LogError::Serde("event log consumer record missing numeric cursor".to_string())
1232 })?;
1233 Ok(Some(cursor))
1234 }
1235
1236 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
1237 let latest = self.latest_id_for_topic(topic)?;
1238 if latest == 0 {
1239 Ok(None)
1240 } else {
1241 Ok(Some(latest))
1242 }
1243 }
1244
1245 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
1246 let _guard = self
1247 .write_lock
1248 .lock()
1249 .expect("file event log write lock poisoned");
1250 let path = self.topic_path(topic);
1251 if !path.is_file() {
1252 return Ok(CompactReport::default());
1253 }
1254 let retained = self.read_range_sync(topic, Some(before), usize::MAX)?;
1255 let removed = self.read_range_sync(topic, None, usize::MAX)?.len() - retained.len();
1256 if retained.is_empty() {
1257 let _ = std::fs::remove_file(&path);
1258 } else {
1259 crate::atomic_io::atomic_write_with(&path, |writer| {
1260 use std::io::Write as _;
1261 for (event_id, event) in &retained {
1262 let line = serde_json::to_string(&FileRecord {
1263 id: *event_id,
1264 event: event.clone(),
1265 })
1266 .map_err(|error| std::io::Error::other(error.to_string()))?;
1267 writeln!(writer, "{line}")?;
1268 }
1269 Ok(())
1270 })
1271 .map_err(|error| LogError::Io(format!("event log compact finalize error: {error}")))?;
1272 }
1273 let latest = retained.last().map(|(event_id, _)| *event_id);
1274 self.latest_ids
1275 .lock()
1276 .expect("file event log latest ids poisoned")
1277 .insert(topic.as_str().to_string(), latest.unwrap_or(0));
1278 Ok(CompactReport {
1279 removed,
1280 remaining: retained.len(),
1281 latest,
1282 checkpointed: false,
1283 })
1284 }
1285}
1286
1287pub struct SqliteEventLog {
1288 path: PathBuf,
1289 connection: Mutex<Connection>,
1290 broadcasts: BroadcastMap,
1291 queue_depth: usize,
1292}
1293
1294impl SqliteEventLog {
1295 pub fn open(path: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
1296 if let Some(parent) = path.parent() {
1297 std::fs::create_dir_all(parent)
1298 .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
1299 }
1300 let connection = Connection::open(&path)
1301 .map_err(|error| LogError::Sqlite(format!("event log open error: {error}")))?;
1302 connection
1309 .busy_timeout(std::time::Duration::from_secs(5))
1310 .map_err(|error| LogError::Sqlite(format!("event log busy-timeout error: {error}")))?;
1311 connection
1312 .pragma_update(None, "journal_mode", "WAL")
1313 .map_err(|error| LogError::Sqlite(format!("event log WAL pragma error: {error}")))?;
1314 connection
1315 .pragma_update(None, "synchronous", "NORMAL")
1316 .map_err(|error| LogError::Sqlite(format!("event log sync pragma error: {error}")))?;
1317 connection
1318 .execute_batch(
1319 "CREATE TABLE IF NOT EXISTS topic_heads (
1320 topic TEXT PRIMARY KEY,
1321 last_id INTEGER NOT NULL
1322 );
1323 CREATE TABLE IF NOT EXISTS events (
1324 topic TEXT NOT NULL,
1325 event_id INTEGER NOT NULL,
1326 kind TEXT NOT NULL,
1327 payload BLOB NOT NULL,
1328 headers TEXT NOT NULL,
1329 occurred_at_ms INTEGER NOT NULL,
1330 PRIMARY KEY (topic, event_id)
1331 );
1332 CREATE TABLE IF NOT EXISTS consumers (
1333 topic TEXT NOT NULL,
1334 consumer_id TEXT NOT NULL,
1335 cursor INTEGER NOT NULL,
1336 updated_at_ms INTEGER NOT NULL,
1337 PRIMARY KEY (topic, consumer_id)
1338 );
1339 CREATE TABLE IF NOT EXISTS event_idempotency_keys (
1340 topic TEXT NOT NULL,
1341 key TEXT NOT NULL,
1342 value TEXT NOT NULL,
1343 event_id INTEGER NOT NULL,
1344 PRIMARY KEY (topic, key, value),
1345 FOREIGN KEY (topic, event_id) REFERENCES events(topic, event_id)
1346 );",
1347 )
1348 .map_err(|error| LogError::Sqlite(format!("event log schema error: {error}")))?;
1349 Ok(Self {
1350 path,
1351 connection: Mutex::new(connection),
1352 broadcasts: BroadcastMap::default(),
1353 queue_depth: queue_depth.max(1),
1354 })
1355 }
1356
1357 fn topics(&self) -> Result<Vec<Topic>, LogError> {
1358 let connection = self
1359 .connection
1360 .lock()
1361 .expect("sqlite event log connection poisoned");
1362 let mut statement = connection
1363 .prepare("SELECT DISTINCT topic FROM events ORDER BY topic ASC")
1364 .map_err(|error| {
1365 LogError::Sqlite(format!("event log topics prepare error: {error}"))
1366 })?;
1367 let rows = statement
1368 .query_map([], |row| row.get::<_, String>(0))
1369 .map_err(|error| LogError::Sqlite(format!("event log topics query error: {error}")))?;
1370 let mut topics = Vec::new();
1371 for row in rows {
1372 topics.push(Topic::new(row.map_err(|error| {
1373 LogError::Sqlite(format!("event log topic row error: {error}"))
1374 })?)?);
1375 }
1376 Ok(topics)
1377 }
1378
1379 fn append_idempotent_by_header(
1380 &self,
1381 topic: &Topic,
1382 header: &str,
1383 value: &str,
1384 event: LogEvent,
1385 ) -> Result<AppendOutcome, LogError> {
1386 let mut connection = self
1387 .connection
1388 .lock()
1389 .expect("sqlite event log connection poisoned");
1390 let tx = connection
1391 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
1392 .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
1393
1394 let existing = tx
1395 .query_row(
1396 "SELECT e.event_id, e.kind, e.payload, e.headers, e.occurred_at_ms
1397 FROM event_idempotency_keys k
1398 JOIN events e ON e.topic = k.topic AND e.event_id = k.event_id
1399 WHERE k.topic = ?1 AND k.key = ?2 AND k.value = ?3",
1400 params![topic.as_str(), header, value],
1401 |row| {
1402 let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1403 let headers: String = row.get(3)?;
1404 Ok((
1405 sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?,
1406 LogEvent {
1407 kind: row.get(1)?,
1408 payload: serde_json::from_slice(&payload).map_err(|error| {
1409 rusqlite::Error::FromSqlConversionFailure(
1410 payload.len(),
1411 rusqlite::types::Type::Blob,
1412 Box::new(error),
1413 )
1414 })?,
1415 headers: serde_json::from_str(&headers).map_err(|error| {
1416 rusqlite::Error::FromSqlConversionFailure(
1417 headers.len(),
1418 rusqlite::types::Type::Text,
1419 Box::new(error),
1420 )
1421 })?,
1422 occurred_at_ms: row.get(4)?,
1423 },
1424 ))
1425 },
1426 )
1427 .optional()
1428 .map_err(|error| {
1429 LogError::Sqlite(format!("event log idempotency read error: {error}"))
1430 })?;
1431
1432 if let Some((event_id, event)) = existing {
1433 return Ok(AppendOutcome {
1434 event_id,
1435 event,
1436 inserted: false,
1437 });
1438 }
1439
1440 tx.execute(
1441 "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
1442 params![topic.as_str()],
1443 )
1444 .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
1445 tx.execute(
1446 "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
1447 params![topic.as_str()],
1448 )
1449 .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
1450 let event_id = tx
1451 .query_row(
1452 "SELECT last_id FROM topic_heads WHERE topic = ?1",
1453 params![topic.as_str()],
1454 |row| row.get::<_, i64>(0),
1455 )
1456 .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))
1457 .and_then(sqlite_i64_to_event_id)?;
1458 let event_id_sql = event_id_to_sqlite_i64(event_id)?;
1459 let previous = tx
1460 .query_row(
1461 "SELECT event_id, kind, payload, headers, occurred_at_ms
1462 FROM events
1463 WHERE topic = ?1 AND event_id < ?2
1464 ORDER BY event_id DESC
1465 LIMIT 1",
1466 params![topic.as_str(), event_id_sql],
1467 |row| {
1468 let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1469 let headers: String = row.get(3)?;
1470 Ok((
1471 sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?,
1472 LogEvent {
1473 kind: row.get(1)?,
1474 payload: serde_json::from_slice(&payload).map_err(|error| {
1475 rusqlite::Error::FromSqlConversionFailure(
1476 payload.len(),
1477 rusqlite::types::Type::Blob,
1478 Box::new(error),
1479 )
1480 })?,
1481 headers: serde_json::from_str(&headers).map_err(|error| {
1482 rusqlite::Error::FromSqlConversionFailure(
1483 headers.len(),
1484 rusqlite::types::Type::Text,
1485 Box::new(error),
1486 )
1487 })?,
1488 occurred_at_ms: row.get(4)?,
1489 },
1490 ))
1491 },
1492 )
1493 .optional()
1494 .map_err(|error| LogError::Sqlite(format!("event log previous read error: {error}")))?;
1495 let event = prepare_event_after(
1496 topic,
1497 event_id,
1498 previous
1499 .as_ref()
1500 .map(|(previous_id, previous_event)| (*previous_id, previous_event)),
1501 event,
1502 )?;
1503 tx.execute(
1504 "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
1505 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1506 params![
1507 topic.as_str(),
1508 event_id_sql,
1509 event.kind,
1510 serde_json::to_vec(&event.payload).map_err(|error| LogError::Serde(format!(
1511 "event log payload encode error: {error}"
1512 )))?,
1513 serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
1514 "event log headers encode error: {error}"
1515 )))?,
1516 event.occurred_at_ms
1517 ],
1518 )
1519 .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
1520 tx.execute(
1521 "INSERT INTO event_idempotency_keys(topic, key, value, event_id)
1522 VALUES (?1, ?2, ?3, ?4)",
1523 params![topic.as_str(), header, value, event_id_sql],
1524 )
1525 .map_err(|error| {
1526 LogError::Sqlite(format!("event log idempotency insert error: {error}"))
1527 })?;
1528 tx.commit()
1529 .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
1530 self.broadcasts
1531 .publish(topic, self.queue_depth, (event_id, event.clone()));
1532 Ok(AppendOutcome {
1533 event_id,
1534 event,
1535 inserted: true,
1536 })
1537 }
1538}
1539
1540impl EventLog for SqliteEventLog {
1541 fn describe(&self) -> EventLogDescription {
1542 EventLogDescription {
1543 backend: EventLogBackendKind::Sqlite,
1544 location: Some(self.path.clone()),
1545 size_bytes: Some(sqlite_size_bytes(&self.path)),
1546 queue_depth: self.queue_depth,
1547 }
1548 }
1549
1550 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
1551 let mut connection = self
1552 .connection
1553 .lock()
1554 .expect("sqlite event log connection poisoned");
1555 let tx = connection
1556 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
1557 .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
1558 tx.execute(
1559 "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
1560 params![topic.as_str()],
1561 )
1562 .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
1563 tx.execute(
1564 "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
1565 params![topic.as_str()],
1566 )
1567 .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
1568 let event_id = tx
1569 .query_row(
1570 "SELECT last_id FROM topic_heads WHERE topic = ?1",
1571 params![topic.as_str()],
1572 |row| row.get::<_, i64>(0),
1573 )
1574 .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))
1575 .and_then(sqlite_i64_to_event_id)?;
1576 let event_id_sql = event_id_to_sqlite_i64(event_id)?;
1577 let previous = tx
1578 .query_row(
1579 "SELECT event_id, kind, payload, headers, occurred_at_ms
1580 FROM events
1581 WHERE topic = ?1 AND event_id < ?2
1582 ORDER BY event_id DESC
1583 LIMIT 1",
1584 params![topic.as_str(), event_id_sql],
1585 |row| {
1586 let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1587 let headers: String = row.get(3)?;
1588 Ok((
1589 sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?,
1590 LogEvent {
1591 kind: row.get(1)?,
1592 payload: serde_json::from_slice(&payload).map_err(|error| {
1593 rusqlite::Error::FromSqlConversionFailure(
1594 payload.len(),
1595 rusqlite::types::Type::Blob,
1596 Box::new(error),
1597 )
1598 })?,
1599 headers: serde_json::from_str(&headers).map_err(|error| {
1600 rusqlite::Error::FromSqlConversionFailure(
1601 headers.len(),
1602 rusqlite::types::Type::Text,
1603 Box::new(error),
1604 )
1605 })?,
1606 occurred_at_ms: row.get(4)?,
1607 },
1608 ))
1609 },
1610 )
1611 .optional()
1612 .map_err(|error| LogError::Sqlite(format!("event log previous read error: {error}")))?;
1613 let event = prepare_event_after(
1614 topic,
1615 event_id,
1616 previous
1617 .as_ref()
1618 .map(|(previous_id, previous_event)| (*previous_id, previous_event)),
1619 event,
1620 )?;
1621 tx.execute(
1622 "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
1623 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1624 params![
1625 topic.as_str(),
1626 event_id_sql,
1627 event.kind,
1628 serde_json::to_vec(&event.payload).map_err(|error| LogError::Serde(format!(
1629 "event log payload encode error: {error}"
1630 )))?,
1631 serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
1632 "event log headers encode error: {error}"
1633 )))?,
1634 event.occurred_at_ms
1635 ],
1636 )
1637 .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
1638 tx.commit()
1639 .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
1640 self.broadcasts
1641 .publish(topic, self.queue_depth, (event_id, event.clone()));
1642 Ok(event_id)
1643 }
1644
1645 async fn flush(&self) -> Result<(), LogError> {
1646 let connection = self
1647 .connection
1648 .lock()
1649 .expect("sqlite event log connection poisoned");
1650 connection
1651 .execute_batch("PRAGMA wal_checkpoint(FULL);")
1652 .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
1653 Ok(())
1654 }
1655
1656 async fn read_range(
1657 &self,
1658 topic: &Topic,
1659 from: Option<EventId>,
1660 limit: usize,
1661 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
1662 let connection = self
1663 .connection
1664 .lock()
1665 .expect("sqlite event log connection poisoned");
1666 let mut statement = connection
1667 .prepare(
1668 "SELECT event_id, kind, payload, headers, occurred_at_ms
1669 FROM events
1670 WHERE topic = ?1 AND event_id > ?2
1671 ORDER BY event_id ASC
1672 LIMIT ?3",
1673 )
1674 .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
1675 let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
1676 let rows = statement
1677 .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
1678 let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1679 let headers: String = row.get(3)?;
1680 let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
1681 Ok((
1682 event_id,
1683 LogEvent {
1684 kind: row.get(1)?,
1685 payload: serde_json::from_slice(&payload).map_err(|error| {
1686 rusqlite::Error::FromSqlConversionFailure(
1687 payload.len(),
1688 rusqlite::types::Type::Blob,
1689 Box::new(error),
1690 )
1691 })?,
1692 headers: serde_json::from_str(&headers).map_err(|error| {
1693 rusqlite::Error::FromSqlConversionFailure(
1694 headers.len(),
1695 rusqlite::types::Type::Text,
1696 Box::new(error),
1697 )
1698 })?,
1699 occurred_at_ms: row.get(4)?,
1700 },
1701 ))
1702 })
1703 .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
1704 let mut events = Vec::new();
1705 for row in rows {
1706 events.push(
1707 row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
1708 );
1709 }
1710 Ok(events)
1711 }
1712
1713 async fn read_range_bytes(
1714 &self,
1715 topic: &Topic,
1716 from: Option<EventId>,
1717 limit: usize,
1718 ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
1719 let connection = self
1720 .connection
1721 .lock()
1722 .expect("sqlite event log connection poisoned");
1723 let mut statement = connection
1724 .prepare(
1725 "SELECT event_id, kind, payload, headers, occurred_at_ms
1726 FROM events
1727 WHERE topic = ?1 AND event_id > ?2
1728 ORDER BY event_id ASC
1729 LIMIT ?3",
1730 )
1731 .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
1732 let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
1733 let rows = statement
1734 .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
1735 let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1736 let headers: String = row.get(3)?;
1737 let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
1738 Ok((
1739 event_id,
1740 LogEventBytes {
1741 kind: row.get(1)?,
1742 payload: Bytes::from(payload),
1743 headers: serde_json::from_str(&headers).map_err(|error| {
1744 rusqlite::Error::FromSqlConversionFailure(
1745 headers.len(),
1746 rusqlite::types::Type::Text,
1747 Box::new(error),
1748 )
1749 })?,
1750 occurred_at_ms: row.get(4)?,
1751 },
1752 ))
1753 })
1754 .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
1755 let mut events = Vec::new();
1756 for row in rows {
1757 events.push(
1758 row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
1759 );
1760 }
1761 Ok(events)
1762 }
1763
1764 async fn subscribe(
1765 self: Arc<Self>,
1766 topic: &Topic,
1767 from: Option<EventId>,
1768 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
1769 let rx = self.broadcasts.subscribe(topic, self.queue_depth);
1770 let history = self.read_range(topic, from, usize::MAX).await?;
1771 Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
1772 }
1773
1774 async fn ack(
1775 &self,
1776 topic: &Topic,
1777 consumer: &ConsumerId,
1778 up_to: EventId,
1779 ) -> Result<(), LogError> {
1780 let connection = self
1781 .connection
1782 .lock()
1783 .expect("sqlite event log connection poisoned");
1784 let up_to_sql = event_id_to_sqlite_i64(up_to)?;
1785 connection
1786 .execute(
1787 "INSERT INTO consumers(topic, consumer_id, cursor, updated_at_ms)
1788 VALUES (?1, ?2, ?3, ?4)
1789 ON CONFLICT(topic, consumer_id)
1790 DO UPDATE SET cursor = excluded.cursor, updated_at_ms = excluded.updated_at_ms",
1791 params![topic.as_str(), consumer.as_str(), up_to_sql, now_ms()],
1792 )
1793 .map_err(|error| LogError::Sqlite(format!("event log ack error: {error}")))?;
1794 Ok(())
1795 }
1796
1797 async fn consumer_cursor(
1798 &self,
1799 topic: &Topic,
1800 consumer: &ConsumerId,
1801 ) -> Result<Option<EventId>, LogError> {
1802 let connection = self
1803 .connection
1804 .lock()
1805 .expect("sqlite event log connection poisoned");
1806 connection
1807 .query_row(
1808 "SELECT cursor FROM consumers WHERE topic = ?1 AND consumer_id = ?2",
1809 params![topic.as_str(), consumer.as_str()],
1810 |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1811 )
1812 .optional()
1813 .map_err(|error| LogError::Sqlite(format!("event log consumer cursor error: {error}")))
1814 }
1815
1816 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
1817 let connection = self
1818 .connection
1819 .lock()
1820 .expect("sqlite event log connection poisoned");
1821 connection
1822 .query_row(
1823 "SELECT last_id FROM topic_heads WHERE topic = ?1",
1824 params![topic.as_str()],
1825 |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1826 )
1827 .optional()
1828 .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))
1829 }
1830
1831 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
1832 let connection = self
1833 .connection
1834 .lock()
1835 .expect("sqlite event log connection poisoned");
1836 let before_sql = event_id_to_sqlite_i64(before)?;
1837 connection
1838 .execute(
1839 "DELETE FROM event_idempotency_keys WHERE topic = ?1 AND event_id <= ?2",
1840 params![topic.as_str(), before_sql],
1841 )
1842 .map_err(|error| {
1843 LogError::Sqlite(format!("event log idempotency compact error: {error}"))
1844 })?;
1845 let removed = connection
1846 .execute(
1847 "DELETE FROM events WHERE topic = ?1 AND event_id <= ?2",
1848 params![topic.as_str(), before_sql],
1849 )
1850 .map_err(|error| {
1851 LogError::Sqlite(format!("event log compact delete error: {error}"))
1852 })?;
1853 let remaining = connection
1854 .query_row(
1855 "SELECT COUNT(*) FROM events WHERE topic = ?1",
1856 params![topic.as_str()],
1857 |row| row.get::<_, i64>(0),
1858 )
1859 .map_err(|error| LogError::Sqlite(format!("event log compact count error: {error}")))
1860 .and_then(sqlite_i64_to_usize)?;
1861 let latest = connection
1862 .query_row(
1863 "SELECT last_id FROM topic_heads WHERE topic = ?1",
1864 params![topic.as_str()],
1865 |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1866 )
1867 .optional()
1868 .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))?;
1869 connection
1870 .execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
1871 .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
1872 Ok(CompactReport {
1873 removed,
1874 remaining,
1875 latest,
1876 checkpointed: true,
1877 })
1878 }
1879}
1880
/// Interprets `value` as a path: absolute values are returned unchanged,
/// relative values are joined onto `base_dir`.
fn resolve_path(base_dir: &Path, value: &str) -> PathBuf {
    let requested = Path::new(value);
    if !requested.is_absolute() {
        return base_dir.join(requested);
    }
    requested.to_path_buf()
}
1889
1890fn write_json_atomically(path: &Path, payload: &serde_json::Value) -> Result<(), LogError> {
1891 let encoded = serde_json::to_vec_pretty(payload)
1892 .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
1893 crate::atomic_io::atomic_write(path, &encoded)
1894 .map_err(|error| LogError::Io(format!("event log write error: {error}")))
1895}
1896
1897fn sanitize_filename(value: &str) -> String {
1898 sanitize_topic_component(value)
1899}
1900
/// Returns `value` with every character other than ASCII letters, ASCII
/// digits, `.`, `_` and `-` replaced by `_`.
///
/// The result has the same number of characters as the input.
pub fn sanitize_topic_component(value: &str) -> String {
    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-');
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}
1913
/// Sums the sizes of all files below `path`, descending into directories.
///
/// Best effort: a missing or unreadable directory counts as 0, and entries
/// whose metadata cannot be read are skipped.
fn dir_size_bytes(path: &Path) -> u64 {
    // A missing directory fails `read_dir` as well, so it also yields 0.
    let Ok(listing) = std::fs::read_dir(path) else {
        return 0;
    };
    listing
        .flatten()
        .map(|entry| {
            let child = entry.path();
            if child.is_dir() {
                dir_size_bytes(&child)
            } else {
                entry.metadata().map(|metadata| metadata.len()).unwrap_or(0)
            }
        })
        .sum()
}
1931
1932fn sqlite_size_bytes(path: &Path) -> u64 {
1933 let mut total = file_size(path);
1934 total += file_size(&PathBuf::from(format!("{}-wal", path.display())));
1935 total += file_size(&PathBuf::from(format!("{}-shm", path.display())));
1936 total
1937}
1938
/// Returns the length in bytes reported by the metadata of `path`, or 0 when
/// the metadata cannot be read (for example because the file is missing).
fn file_size(path: &Path) -> u64 {
    match std::fs::metadata(path) {
        Ok(metadata) => metadata.len(),
        Err(_) => 0,
    }
}
1944
1945fn sync_tree(root: &Path) -> Result<(), LogError> {
1946 if !root.exists() {
1947 return Ok(());
1948 }
1949 for entry in std::fs::read_dir(root)
1950 .map_err(|error| LogError::Io(format!("event log read_dir error: {error}")))?
1951 {
1952 let entry = entry.map_err(|error| LogError::Io(format!("event log dir error: {error}")))?;
1953 let path = entry.path();
1954 if path.is_dir() {
1955 sync_tree(&path)?;
1956 continue;
1957 }
1958 std::fs::File::open(&path)
1959 .and_then(|file| file.sync_all())
1960 .map_err(|error| LogError::Io(format!("event log sync error: {error}")))?;
1961 }
1962 Ok(())
1963}
1964
/// Returns the current wall-clock time as milliseconds since the Unix epoch,
/// or 0 when the system clock reports a time before the epoch.
fn now_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
1971
1972fn event_id_to_sqlite_i64(event_id: EventId) -> Result<i64, LogError> {
1973 i64::try_from(event_id)
1974 .map_err(|_| LogError::Sqlite(format!("event id {event_id} exceeds sqlite INTEGER range")))
1975}
1976
1977fn sqlite_i64_to_event_id(value: i64) -> Result<EventId, LogError> {
1978 u64::try_from(value)
1979 .map_err(|_| LogError::Sqlite(format!("sqlite event id {value} is negative")))
1980}
1981
1982fn sqlite_i64_to_event_id_for_row(value: i64) -> rusqlite::Result<EventId> {
1983 u64::try_from(value).map_err(|_| {
1984 rusqlite::Error::FromSqlConversionFailure(
1985 std::mem::size_of::<i64>(),
1986 rusqlite::types::Type::Integer,
1987 "sqlite event id is negative".into(),
1988 )
1989 })
1990}
1991
1992fn sqlite_json_bytes_for_row(
1993 row: &rusqlite::Row<'_>,
1994 index: usize,
1995 name: &str,
1996) -> rusqlite::Result<Vec<u8>> {
1997 let value = row.get_ref(index)?;
1998 match value {
1999 rusqlite::types::ValueRef::Text(bytes) | rusqlite::types::ValueRef::Blob(bytes) => {
2000 Ok(bytes.to_vec())
2001 }
2002 other => Err(rusqlite::Error::InvalidColumnType(
2003 index,
2004 name.to_string(),
2005 other.data_type(),
2006 )),
2007 }
2008}
2009
2010fn sqlite_i64_to_usize(value: i64) -> Result<usize, LogError> {
2011 usize::try_from(value)
2012 .map_err(|_| LogError::Sqlite(format!("sqlite count {value} is negative")))
2013}
2014
2015#[cfg(test)]
2016mod tests {
2017 use super::*;
2018 use futures::StreamExt;
2019 use rand::{rngs::StdRng, RngExt, SeedableRng};
2020
/// Installing the lazy default only records a pending configuration; the
/// log itself is opened by the first `active_event_log()` call.
#[test]
fn lazy_default_event_log_opens_on_first_access() {
    let active_is_set = || ACTIVE_EVENT_LOG.with(|slot| slot.borrow().is_some());
    let pending_is_set = || PENDING_DEFAULT_EVENT_LOG.with(|slot| slot.borrow().is_some());

    reset_active_event_log();
    let base = tempfile::tempdir().unwrap();

    install_lazy_default_for_base_dir(base.path()).unwrap();
    assert!(!active_is_set());
    assert!(pending_is_set());

    let _opened = active_event_log().expect("lazy event log should open on demand");
    assert!(active_is_set());
    assert!(!pending_is_set());
    reset_active_event_log();
}
2035
2036 async fn exercise_basic_backend(log: Arc<AnyEventLog>) {
2037 let topic = Topic::new("trigger.inbox").unwrap();
2038 for i in 0..10_000 {
2039 log.append(
2040 &topic,
2041 LogEvent::new("append", serde_json::json!({ "i": i })),
2042 )
2043 .await
2044 .unwrap();
2045 }
2046 let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
2047 assert_eq!(events.len(), 10_000);
2048 assert_eq!(events.first().unwrap().0, 1);
2049 assert_eq!(events.last().unwrap().0, 10_000);
2050 }
2051
2052 async fn exercise_idempotent_append(log: Arc<AnyEventLog>) {
2053 let topic = Topic::new("channel.tenant.default.pr").unwrap();
2054 let mut first_headers = BTreeMap::new();
2055 first_headers.insert("harn.channel.id".to_string(), "event-1".to_string());
2056 let first = log
2057 .append_idempotent_by_header(
2058 &topic,
2059 "harn.channel.id",
2060 "event-1",
2061 LogEvent::new("channel.emit", serde_json::json!({"n": 1}))
2062 .with_headers(first_headers),
2063 )
2064 .await
2065 .unwrap();
2066 assert!(first.inserted);
2067 assert_eq!(first.event_id, 1);
2068
2069 let mut duplicate_headers = BTreeMap::new();
2070 duplicate_headers.insert("harn.channel.id".to_string(), "event-1".to_string());
2071 let duplicate = log
2072 .append_idempotent_by_header(
2073 &topic,
2074 "harn.channel.id",
2075 "event-1",
2076 LogEvent::new("channel.emit", serde_json::json!({"n": 2}))
2077 .with_headers(duplicate_headers),
2078 )
2079 .await
2080 .unwrap();
2081 assert!(!duplicate.inserted);
2082 assert_eq!(duplicate.event_id, first.event_id);
2083 assert_eq!(duplicate.event.payload, serde_json::json!({"n": 1}));
2084
2085 let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
2086 assert_eq!(events.len(), 1);
2087 assert_eq!(events[0].0, first.event_id);
2088 }
2089
2090 #[tokio::test(flavor = "current_thread")]
2091 async fn memory_backend_supports_append_read_subscribe_and_compact() {
2092 let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
2093 exercise_basic_backend(log.clone()).await;
2094
2095 let topic = Topic::new("agent.transcript.demo").unwrap();
2096 let mut stream = log.clone().subscribe(&topic, None).await.unwrap();
2097 let first = log
2098 .append(
2099 &topic,
2100 LogEvent::new("message", serde_json::json!({"text":"one"})),
2101 )
2102 .await
2103 .unwrap();
2104 let second = log
2105 .append(
2106 &topic,
2107 LogEvent::new("message", serde_json::json!({"text":"two"})),
2108 )
2109 .await
2110 .unwrap();
2111 let seen: Vec<_> = stream.by_ref().take(2).collect().await;
2112 assert_eq!(seen[0].as_ref().unwrap().0, first);
2113 assert_eq!(seen[1].as_ref().unwrap().0, second);
2114
2115 log.ack(&topic, &ConsumerId::new("worker").unwrap(), second)
2116 .await
2117 .unwrap();
2118 let compact = log.compact(&topic, first).await.unwrap();
2119 assert_eq!(compact.removed, 1);
2120 assert_eq!(compact.remaining, 1);
2121 }
2122
2123 #[tokio::test(flavor = "current_thread")]
2124 async fn memory_backend_idempotent_append_returns_original_event() {
2125 let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
2126 exercise_idempotent_append(log).await;
2127 }
2128
2129 #[tokio::test(flavor = "current_thread")]
2130 async fn file_backend_persists_across_reopen_and_compacts() {
2131 let dir = tempfile::tempdir().unwrap();
2132 let topic = Topic::new("trigger.outbox").unwrap();
2133 let first_log = Arc::new(AnyEventLog::File(
2134 FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
2135 ));
2136 first_log
2137 .append(
2138 &topic,
2139 LogEvent::new("dispatch_pending", serde_json::json!({"n":1})),
2140 )
2141 .await
2142 .unwrap();
2143 first_log
2144 .append(
2145 &topic,
2146 LogEvent::new("dispatch_complete", serde_json::json!({"n":2})),
2147 )
2148 .await
2149 .unwrap();
2150 drop(first_log);
2151
2152 let reopened = Arc::new(AnyEventLog::File(
2153 FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
2154 ));
2155 let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
2156 assert_eq!(events.len(), 2);
2157 let compact = reopened.compact(&topic, 1).await.unwrap();
2158 assert_eq!(compact.removed, 1);
2159 assert_eq!(
2160 reopened
2161 .read_range(&topic, None, usize::MAX)
2162 .await
2163 .unwrap()
2164 .len(),
2165 1
2166 );
2167 }
2168
2169 #[tokio::test(flavor = "current_thread")]
2170 async fn file_backend_skips_torn_tail_on_restart() {
2171 let dir = tempfile::tempdir().unwrap();
2172 let topic = Topic::new("trigger.inbox").unwrap();
2173 let first_log = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
2174 first_log
2175 .append(
2176 &topic,
2177 LogEvent::new("accepted", serde_json::json!({"id": "ok"})),
2178 )
2179 .await
2180 .unwrap();
2181 drop(first_log);
2182
2183 let topic_path = dir.path().join("topics").join("trigger.inbox.jsonl");
2184 use std::io::Write as _;
2185 let mut file = std::fs::OpenOptions::new()
2186 .append(true)
2187 .open(&topic_path)
2188 .unwrap();
2189 write!(file, "{{\"id\":2,\"event\":{{\"kind\":\"partial\"").unwrap();
2190 drop(file);
2191
2192 let reopened = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
2193 let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
2194 assert_eq!(events.len(), 1);
2195 assert_eq!(events[0].0, 1);
2196 assert_eq!(reopened.latest(&topic).await.unwrap(), Some(1));
2197 }
2198
2199 #[tokio::test(flavor = "current_thread")]
2200 async fn file_backend_idempotent_append_returns_original_event() {
2201 let dir = tempfile::tempdir().unwrap();
2202 let log = Arc::new(AnyEventLog::File(
2203 FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
2204 ));
2205 exercise_idempotent_append(log).await;
2206 }
2207
2208 #[tokio::test(flavor = "current_thread")]
2209 async fn sqlite_backend_persists_and_checkpoints_after_compact() {
2210 let dir = tempfile::tempdir().unwrap();
2211 let path = dir.path().join("events.sqlite");
2212 let topic = Topic::new("daemon.demo.state").unwrap();
2213 let first_log = Arc::new(AnyEventLog::Sqlite(
2214 SqliteEventLog::open(path.clone(), 8).unwrap(),
2215 ));
2216 first_log
2217 .append(
2218 &topic,
2219 LogEvent::new("state", serde_json::json!({"state":"idle"})),
2220 )
2221 .await
2222 .unwrap();
2223 first_log
2224 .append(
2225 &topic,
2226 LogEvent::new("state", serde_json::json!({"state":"active"})),
2227 )
2228 .await
2229 .unwrap();
2230 drop(first_log);
2231
2232 let reopened = Arc::new(AnyEventLog::Sqlite(
2233 SqliteEventLog::open(path.clone(), 8).unwrap(),
2234 ));
2235 assert_eq!(
2236 reopened
2237 .read_range(&topic, None, usize::MAX)
2238 .await
2239 .unwrap()
2240 .len(),
2241 2
2242 );
2243 let compact = reopened.compact(&topic, 1).await.unwrap();
2244 assert!(compact.checkpointed);
2245 let wal = PathBuf::from(format!("{}-wal", path.display()));
2246 assert!(file_size(&wal) == 0 || !wal.exists());
2247 }
2248
2249 #[tokio::test(flavor = "current_thread")]
2250 async fn sqlite_backend_idempotent_append_returns_original_event() {
2251 let dir = tempfile::tempdir().unwrap();
2252 let path = dir.path().join("events.sqlite");
2253 let log = Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(path, 8).unwrap()));
2254 exercise_idempotent_append(log).await;
2255 }
2256
2257 #[tokio::test(flavor = "current_thread")]
2258 async fn sqlite_backend_compacts_idempotency_keys_with_events() {
2259 let dir = tempfile::tempdir().unwrap();
2260 let path = dir.path().join("events.sqlite");
2261 let log = Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(path, 8).unwrap()));
2262 let topic = Topic::new("channel.tenant.default.compacted").unwrap();
2263 let mut headers = BTreeMap::new();
2264 headers.insert("harn.channel.id".to_string(), "event-1".to_string());
2265 let first = log
2266 .append_idempotent_by_header(
2267 &topic,
2268 "harn.channel.id",
2269 "event-1",
2270 LogEvent::new("channel.emit", serde_json::json!({"n": 1})).with_headers(headers),
2271 )
2272 .await
2273 .unwrap();
2274 log.compact(&topic, first.event_id).await.unwrap();
2275
2276 let mut replacement_headers = BTreeMap::new();
2277 replacement_headers.insert("harn.channel.id".to_string(), "event-1".to_string());
2278 let replacement = log
2279 .append_idempotent_by_header(
2280 &topic,
2281 "harn.channel.id",
2282 "event-1",
2283 LogEvent::new("channel.emit", serde_json::json!({"n": 2}))
2284 .with_headers(replacement_headers),
2285 )
2286 .await
2287 .unwrap();
2288 assert!(replacement.inserted);
2289 assert!(replacement.event_id > first.event_id);
2290 assert_eq!(replacement.event.payload, serde_json::json!({"n": 2}));
2291 }
2292
2293 #[tokio::test(flavor = "current_thread")]
2294 async fn sqlite_bytes_read_preserves_payload_without_value_materialization() {
2295 let dir = tempfile::tempdir().unwrap();
2296 let path = dir.path().join("events.sqlite");
2297 let topic = Topic::new("observability.action_graph").unwrap();
2298 let log = SqliteEventLog::open(path, 8).unwrap();
2299 let event_id = log
2300 .append(
2301 &topic,
2302 LogEvent::new(
2303 "snapshot",
2304 serde_json::json!({"nodes":[{"id":"a"}],"edges":[]}),
2305 ),
2306 )
2307 .await
2308 .unwrap();
2309
2310 let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
2311 assert_eq!(events.len(), 1);
2312 assert_eq!(events[0].0, event_id);
2313 assert_eq!(
2314 events[0].1.payload_json().unwrap(),
2315 serde_json::json!({"nodes":[{"id":"a"}],"edges":[]})
2316 );
2317 }
2318
2319 #[tokio::test(flavor = "current_thread")]
2320 async fn sqlite_bytes_read_accepts_legacy_text_payload_rows() {
2321 let dir = tempfile::tempdir().unwrap();
2322 let path = dir.path().join("events.sqlite");
2323 let topic = Topic::new("agent.transcript.legacy").unwrap();
2324 let log = SqliteEventLog::open(path, 8).unwrap();
2325 {
2326 let connection = log.connection.lock().unwrap();
2327 connection
2328 .execute(
2329 "INSERT INTO topic_heads(topic, last_id) VALUES (?1, 1)",
2330 params![topic.as_str()],
2331 )
2332 .unwrap();
2333 connection
2334 .execute(
2335 "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
2336 VALUES (?1, 1, 'legacy', ?2, '{}', 1)",
2337 params![topic.as_str(), "{\"text\":\"old\"}"],
2338 )
2339 .unwrap();
2340 }
2341
2342 let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
2343 assert_eq!(
2344 events[0].1.payload_json().unwrap(),
2345 serde_json::json!({"text": "old"})
2346 );
2347 assert_eq!(
2348 log.read_range(&topic, None, 1).await.unwrap()[0].1.kind,
2349 "legacy"
2350 );
2351 }
2352
2353 #[tokio::test(flavor = "current_thread")]
2354 async fn broadcast_forwarder_reports_lag_when_receiver_overflows() {
2355 let (sender, rx) = broadcast::channel(2);
2356 for i in 0..10 {
2357 sender
2358 .send((i + 1, LogEvent::new("tick", serde_json::json!({"i": i}))))
2359 .unwrap();
2360 }
2361 let mut stream = stream_from_broadcast(Vec::new(), None, rx, 2);
2362
2363 match stream.next().await {
2364 Some(Err(LogError::ConsumerLagged(last_seen))) => assert_eq!(last_seen, 0),
2365 other => panic!("subscriber should surface lag, got {other:?}"),
2366 }
2367 }
2368
2369 #[tokio::test(flavor = "current_thread")]
2370 async fn broadcast_forwarder_stops_when_consumer_drops_stream() {
2371 let (sender, rx) = broadcast::channel(2);
2372 let stream = stream_from_broadcast(Vec::new(), None, rx, 2);
2373 assert_eq!(sender.receiver_count(), 1);
2374 drop(stream);
2375
2376 tokio::time::timeout(std::time::Duration::from_millis(100), async {
2377 while sender.receiver_count() != 0 {
2378 tokio::task::yield_now().await;
2379 }
2380 })
2381 .await
2382 .expect("subscription receiver should close after consumer drop");
2383 }
2384
2385 #[tokio::test(flavor = "current_thread")]
2386 async fn randomized_reader_sequences_stay_monotonic() {
2387 let log = Arc::new(MemoryEventLog::new(32));
2388 let topic = Topic::new("fuzz.demo").unwrap();
2389 let mut readers = vec![
2390 log.clone().subscribe(&topic, None).await.unwrap(),
2391 log.clone().subscribe(&topic, Some(5)).await.unwrap(),
2392 log.clone().subscribe(&topic, Some(10)).await.unwrap(),
2393 ];
2394 let mut rng = StdRng::seed_from_u64(7);
2395 for _ in 0..64 {
2396 let value = rng.random_range(0..1000);
2397 log.append(
2398 &topic,
2399 LogEvent::new("rand", serde_json::json!({"value": value})),
2400 )
2401 .await
2402 .unwrap();
2403 }
2404
2405 let mut sequences = Vec::new();
2406 for reader in &mut readers {
2407 let mut ids = Vec::new();
2408 while let Some(item) = reader.next().await {
2409 match item {
2410 Ok((event_id, _)) => {
2411 ids.push(event_id);
2412 if ids.len() >= 16 {
2413 break;
2414 }
2415 }
2416 Err(LogError::ConsumerLagged(_)) => break,
2417 Err(error) => panic!("unexpected subscription error: {error}"),
2418 }
2419 }
2420 sequences.push(ids);
2421 }
2422
2423 for ids in sequences {
2424 assert!(ids.windows(2).all(|pair| pair[0] < pair[1]));
2425 }
2426 }
2427}