use std::cell::RefCell;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::fmt;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::{Arc, Mutex};

use bytes::Bytes;
use futures::stream::BoxStream;
use rusqlite::{params, Connection, OptionalExtension};
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, mpsc};
use tokio_stream::wrappers::ReceiverStream;

pub type EventId = u64;

pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Topic(String);

impl Topic {
    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
        let value = value.into();
        if value.is_empty() {
            return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
        }
        if !value
            .chars()
            .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
        {
            return Err(LogError::InvalidTopic(format!(
                "topic '{value}' contains unsupported characters"
            )));
        }
        Ok(Self(value))
    }

    pub fn as_str(&self) -> &str {
        &self.0
    }
}

impl fmt::Display for Topic {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt(f)
    }
}

impl FromStr for Topic {
    type Err = LogError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Self::new(s)
    }
}
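
// Illustrative sketch (not part of the test suite): `Topic` accepts only ASCII
// alphanumerics plus '.', '_', and '-', and can also be built via `FromStr`.
//
//     assert!(Topic::new("trigger.inbox").is_ok());
//     assert!(Topic::new("bad topic!").is_err()); // the space and '!' are rejected
//     let parsed: Topic = "agent.state".parse().unwrap();
//     assert_eq!(parsed.as_str(), "agent.state");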

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct ConsumerId(String);

impl ConsumerId {
    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
        let value = value.into();
        if value.trim().is_empty() {
            return Err(LogError::InvalidConsumer(
                "consumer id cannot be empty".to_string(),
            ));
        }
        Ok(Self(value))
    }

    pub fn as_str(&self) -> &str {
        &self.0
    }
}

impl fmt::Display for ConsumerId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt(f)
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventLogBackendKind {
    Memory,
    File,
    Sqlite,
}

impl fmt::Display for EventLogBackendKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Memory => write!(f, "memory"),
            Self::File => write!(f, "file"),
            Self::Sqlite => write!(f, "sqlite"),
        }
    }
}

impl FromStr for EventLogBackendKind {
    type Err = LogError;

    fn from_str(value: &str) -> Result<Self, Self::Err> {
        match value.trim().to_ascii_lowercase().as_str() {
            "memory" => Ok(Self::Memory),
            "file" => Ok(Self::File),
            "sqlite" => Ok(Self::Sqlite),
            other => Err(LogError::Config(format!(
                "unsupported event log backend '{other}'"
            ))),
        }
    }
}
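
// Parsing trims whitespace and is case-insensitive (illustrative):
//
//     assert_eq!(" SQLite ".parse::<EventLogBackendKind>().unwrap(), EventLogBackendKind::Sqlite);
//     assert!("postgres".parse::<EventLogBackendKind>().is_err());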

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LogEvent {
    pub kind: String,
    pub payload: serde_json::Value,
    #[serde(default)]
    pub headers: BTreeMap<String, String>,
    pub occurred_at_ms: i64,
}

impl LogEvent {
    pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
        Self {
            kind: kind.into(),
            payload,
            headers: BTreeMap::new(),
            occurred_at_ms: now_ms(),
        }
    }

    pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
        self.headers = headers;
        self
    }

    pub fn redact_in_place(&mut self, policy: &crate::redact::RedactionPolicy) {
        self.headers = policy.redact_headers(&self.headers);
        policy.redact_json_in_place(&mut self.payload);
    }
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LogEventBytes {
    pub kind: String,
    pub payload: Bytes,
    pub headers: BTreeMap<String, String>,
    pub occurred_at_ms: i64,
}

impl LogEventBytes {
    pub fn payload_json(&self) -> Result<serde_json::Value, LogError> {
        serde_json::from_slice(&self.payload)
            .map_err(|error| LogError::Serde(format!("event log payload parse error: {error}")))
    }

    pub fn into_log_event(self) -> Result<LogEvent, LogError> {
        Ok(LogEvent {
            kind: self.kind,
            payload: serde_json::from_slice(&self.payload).map_err(|error| {
                LogError::Serde(format!("event log payload parse error: {error}"))
            })?,
            headers: self.headers,
            occurred_at_ms: self.occurred_at_ms,
        })
    }
}

impl TryFrom<LogEvent> for LogEventBytes {
    type Error = LogError;

    fn try_from(event: LogEvent) -> Result<Self, Self::Error> {
        let payload = serde_json::to_vec(&event.payload)
            .map_err(|error| LogError::Serde(format!("event log payload encode error: {error}")))?;
        Ok(Self {
            kind: event.kind,
            payload: Bytes::from(payload),
            headers: event.headers,
            occurred_at_ms: event.occurred_at_ms,
        })
    }
}
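
// Sketch of the owned-JSON vs. raw-bytes round trip (illustrative only):
//
//     let event = LogEvent::new("message", serde_json::json!({"text": "hi"}));
//     let bytes: LogEventBytes = event.clone().try_into().unwrap();
//     assert_eq!(bytes.payload_json().unwrap(), event.payload);
//     assert_eq!(bytes.into_log_event().unwrap(), event);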

#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactReport {
    pub removed: usize,
    pub remaining: usize,
    pub latest: Option<EventId>,
    pub checkpointed: bool,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventLogDescription {
    pub backend: EventLogBackendKind,
    pub location: Option<PathBuf>,
    pub size_bytes: Option<u64>,
    pub queue_depth: usize,
}

#[derive(Debug)]
pub enum LogError {
    Config(String),
    InvalidTopic(String),
    InvalidConsumer(String),
    Io(String),
    Serde(String),
    Sqlite(String),
    ConsumerLagged(EventId),
}

impl fmt::Display for LogError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Config(message)
            | Self::InvalidTopic(message)
            | Self::InvalidConsumer(message)
            | Self::Io(message)
            | Self::Serde(message)
            | Self::Sqlite(message) => message.fmt(f),
            Self::ConsumerLagged(last_id) => {
                write!(f, "subscriber lagged behind after event {last_id}")
            }
        }
    }
}

impl std::error::Error for LogError {}

#[allow(async_fn_in_trait)]
pub trait EventLog: Send + Sync {
    fn describe(&self) -> EventLogDescription;

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;

    async fn flush(&self) -> Result<(), LogError>;

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError>;

    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        let events = self.read_range(topic, from, limit).await?;
        events
            .into_iter()
            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
            .collect()
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError>;

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError>;

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
}
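
// A minimal end-to-end sketch of the trait, assuming a tokio runtime and a
// fallible caller (illustrative; it mirrors the patterns in the tests below):
//
//     let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
//     let topic = Topic::new("trigger.inbox")?;
//     let id = log.append(&topic, LogEvent::new("accepted", serde_json::json!({}))).await?;
//     let consumer = ConsumerId::new("worker")?;
//     log.ack(&topic, &consumer, id).await?;
//     assert_eq!(log.consumer_cursor(&topic, &consumer).await?, Some(id));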

#[derive(Clone, Debug)]
pub struct EventLogConfig {
    pub backend: EventLogBackendKind,
    pub file_dir: PathBuf,
    pub sqlite_path: PathBuf,
    pub queue_depth: usize,
}

impl EventLogConfig {
    pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
        let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
            .ok()
            .map(|value| value.parse())
            .transpose()?
            .unwrap_or(EventLogBackendKind::Sqlite);
        let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
            .ok()
            .and_then(|value| value.parse::<usize>().ok())
            .unwrap_or(128)
            .max(1);

        let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
            _ => crate::runtime_paths::event_log_dir(base_dir),
        };
        let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
            _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
        };

        Ok(Self {
            backend,
            file_dir,
            sqlite_path,
            queue_depth,
        })
    }

    pub fn location(&self) -> Option<PathBuf> {
        match self.backend {
            EventLogBackendKind::Memory => None,
            EventLogBackendKind::File => Some(self.file_dir.clone()),
            EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
        }
    }
}
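
// Environment-driven configuration, sketched with example values (the only
// built-in defaults are those in `for_base_dir`: sqlite backend, queue depth 128):
//
//     HARN_EVENT_LOG_BACKEND=file \
//     HARN_EVENT_LOG_DIR=relative/or/absolute/dir \
//     HARN_EVENT_LOG_QUEUE_DEPTH=256 my-binary   # binary name is hypothetical
//
// Relative paths are resolved against `base_dir` via `resolve_path`.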

thread_local! {
    static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
}

pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
    let config = EventLogConfig::for_base_dir(base_dir)?;
    let log = open_event_log(&config)?;
    ACTIVE_EVENT_LOG.with(|slot| {
        *slot.borrow_mut() = Some(log.clone());
    });
    Ok(log)
}

pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
    let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
    ACTIVE_EVENT_LOG.with(|slot| {
        *slot.borrow_mut() = Some(log.clone());
    });
    log
}

pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
    ACTIVE_EVENT_LOG.with(|slot| {
        *slot.borrow_mut() = Some(log.clone());
    });
    log
}

pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
    ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone())
}

pub fn reset_active_event_log() {
    ACTIVE_EVENT_LOG.with(|slot| {
        *slot.borrow_mut() = None;
    });
}
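
// The active log is thread-local, so installs are per thread (convenient for
// tests on a current-thread runtime). Lifecycle sketch:
//
//     let log = install_memory_for_current_thread(8);
//     assert!(active_event_log().is_some());
//     reset_active_event_log();
//     assert!(active_event_log().is_none());
//     drop(log); // callers may still hold their own Arc after the reset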

pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
    let config = EventLogConfig::for_base_dir(base_dir)?;
    let description = match config.backend {
        EventLogBackendKind::Memory => EventLogDescription {
            backend: EventLogBackendKind::Memory,
            location: None,
            size_bytes: None,
            queue_depth: config.queue_depth,
        },
        EventLogBackendKind::File => EventLogDescription {
            backend: EventLogBackendKind::File,
            size_bytes: Some(dir_size_bytes(&config.file_dir)),
            location: Some(config.file_dir),
            queue_depth: config.queue_depth,
        },
        EventLogBackendKind::Sqlite => EventLogDescription {
            backend: EventLogBackendKind::Sqlite,
            size_bytes: Some(sqlite_size_bytes(&config.sqlite_path)),
            location: Some(config.sqlite_path),
            queue_depth: config.queue_depth,
        },
    };
    Ok(description)
}

pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
    match config.backend {
        EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
            config.queue_depth,
        )))),
        EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
            config.file_dir.clone(),
            config.queue_depth,
        )?))),
        EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
            config.sqlite_path.clone(),
            config.queue_depth,
        )?))),
    }
}

pub enum AnyEventLog {
    Memory(MemoryEventLog),
    File(FileEventLog),
    Sqlite(SqliteEventLog),
}

impl AnyEventLog {
    pub async fn topics(&self) -> Result<Vec<Topic>, LogError> {
        match self {
            Self::Memory(log) => log.topics().await,
            Self::File(log) => log.topics(),
            Self::Sqlite(log) => log.topics(),
        }
    }
}

impl EventLog for AnyEventLog {
    fn describe(&self) -> EventLogDescription {
        match self {
            Self::Memory(log) => log.describe(),
            Self::File(log) => log.describe(),
            Self::Sqlite(log) => log.describe(),
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        match self {
            Self::Memory(log) => log.append(topic, event).await,
            Self::File(log) => log.append(topic, event).await,
            Self::Sqlite(log) => log.append(topic, event).await,
        }
    }

    async fn flush(&self) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.flush().await,
            Self::File(log) => log.flush().await,
            Self::Sqlite(log) => log.flush().await,
        }
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        match self {
            Self::Memory(log) => log.read_range(topic, from, limit).await,
            Self::File(log) => log.read_range(topic, from, limit).await,
            Self::Sqlite(log) => log.read_range(topic, from, limit).await,
        }
    }

    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        match self {
            Self::Memory(log) => log.read_range_bytes(topic, from, limit).await,
            Self::File(log) => log.read_range_bytes(topic, from, limit).await,
            Self::Sqlite(log) => log.read_range_bytes(topic, from, limit).await,
        }
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        let (rx, queue_depth) = match self.as_ref() {
            Self::Memory(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::File(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::Sqlite(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
        };
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.ack(topic, consumer, up_to).await,
            Self::File(log) => log.ack(topic, consumer, up_to).await,
            Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
        }
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
            Self::File(log) => log.consumer_cursor(topic, consumer).await,
            Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
        }
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.latest(topic).await,
            Self::File(log) => log.latest(topic).await,
            Self::Sqlite(log) => log.latest(topic).await,
        }
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        match self {
            Self::Memory(log) => log.compact(topic, before).await,
            Self::File(log) => log.compact(topic, before).await,
            Self::Sqlite(log) => log.compact(topic, before).await,
        }
    }
}

#[derive(Default)]
struct BroadcastMap(Mutex<HashMap<String, broadcast::Sender<(EventId, LogEvent)>>>);

impl BroadcastMap {
    fn subscribe(
        &self,
        topic: &Topic,
        capacity: usize,
    ) -> broadcast::Receiver<(EventId, LogEvent)> {
        self.sender(topic, capacity).subscribe()
    }

    fn publish(&self, topic: &Topic, capacity: usize, record: (EventId, LogEvent)) {
        let _ = self.sender(topic, capacity).send(record);
    }

    fn sender(&self, topic: &Topic, capacity: usize) -> broadcast::Sender<(EventId, LogEvent)> {
        let mut map = self.0.lock().expect("event log broadcast map poisoned");
        map.entry(topic.as_str().to_string())
            .or_insert_with(|| broadcast::channel(capacity.max(1)).0)
            .clone()
    }
}
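
// `BroadcastMap` lazily creates one bounded `tokio::sync::broadcast` channel per
// topic; publishing to a topic with no subscribers simply drops the event (the
// `send` error is ignored). Illustrative use, assuming some `map: BroadcastMap`:
//
//     let topic = Topic::new("demo")?;
//     let mut rx = map.subscribe(&topic, 8);
//     map.publish(&topic, 8, (1, LogEvent::new("tick", serde_json::json!({}))));
//     let (id, _event) = rx.recv().await?;
//     assert_eq!(id, 1);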

fn stream_from_broadcast(
    history: Vec<(EventId, LogEvent)>,
    from: Option<EventId>,
    mut live_rx: broadcast::Receiver<(EventId, LogEvent)>,
    queue_depth: usize,
) -> BoxStream<'static, Result<(EventId, LogEvent), LogError>> {
    let (tx, rx) = mpsc::channel(queue_depth.max(1));
    tokio::spawn(async move {
        let mut last_seen = from.unwrap_or(0);
        for (event_id, event) in history {
            last_seen = event_id;
            if tx.send(Ok((event_id, event))).await.is_err() {
                return;
            }
        }

        loop {
            tokio::select! {
                _ = tx.closed() => return,
                received = live_rx.recv() => {
                    match received {
                        Ok((event_id, event)) if event_id > last_seen => {
                            last_seen = event_id;
                            if tx.send(Ok((event_id, event))).await.is_err() {
                                return;
                            }
                        }
                        Ok(_) => {}
                        Err(broadcast::error::RecvError::Closed) => return,
                        Err(broadcast::error::RecvError::Lagged(_)) => {
                            let _ = tx.try_send(Err(LogError::ConsumerLagged(last_seen)));
                            return;
                        }
                    }
                }
            }
        }
    });
    Box::pin(ReceiverStream::new(rx))
}
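
// Subscription semantics in brief: the forwarder replays the `history` snapshot
// first, then switches to the live broadcast, discarding any live event whose id
// is not greater than the last replayed id (this deduplicates the replay/live
// overlap). A lagged broadcast receiver terminates the stream with
// `ConsumerLagged`, after which a caller would typically resubscribe from its
// last acknowledged cursor. Sketch of a consumer loop (`handle` is hypothetical):
//
//     let mut stream = log.clone().subscribe(&topic, cursor).await?;
//     while let Some(item) = stream.next().await {
//         match item {
//             Ok((id, event)) => handle(id, event),
//             Err(LogError::ConsumerLagged(last)) => { /* resubscribe from `last` */ }
//             Err(error) => return Err(error),
//         }
//     }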

#[derive(Default)]
struct MemoryState {
    topics: HashMap<String, VecDeque<(EventId, LogEvent)>>,
    latest: HashMap<String, EventId>,
    consumers: HashMap<(String, String), EventId>,
}

pub struct MemoryEventLog {
    state: tokio::sync::Mutex<MemoryState>,
    broadcasts: BroadcastMap,
    queue_depth: usize,
}

impl MemoryEventLog {
    pub fn new(queue_depth: usize) -> Self {
        Self {
            state: tokio::sync::Mutex::new(MemoryState::default()),
            broadcasts: BroadcastMap::default(),
            queue_depth: queue_depth.max(1),
        }
    }

    async fn topics(&self) -> Result<Vec<Topic>, LogError> {
        let state = self.state.lock().await;
        let mut topics = state
            .topics
            .keys()
            .map(|topic| Topic::new(topic.clone()))
            .collect::<Result<Vec<_>, _>>()?;
        topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
        Ok(topics)
    }
}

impl EventLog for MemoryEventLog {
    fn describe(&self) -> EventLogDescription {
        EventLogDescription {
            backend: EventLogBackendKind::Memory,
            location: None,
            size_bytes: None,
            queue_depth: self.queue_depth,
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        let mut state = self.state.lock().await;
        let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
        let previous_hash = state
            .topics
            .get(topic.as_str())
            .and_then(|events| events.back())
            .map(|(previous_id, previous_event)| {
                crate::provenance::event_record_hash_from_headers(
                    topic.as_str(),
                    *previous_id,
                    previous_event,
                )
            })
            .transpose()?;
        let event = crate::provenance::prepare_event_for_append(
            topic.as_str(),
            event_id,
            previous_hash,
            event,
        )?;
        state.latest.insert(topic.as_str().to_string(), event_id);
        state
            .topics
            .entry(topic.as_str().to_string())
            .or_default()
            .push_back((event_id, event.clone()));
        drop(state);
        self.broadcasts
            .publish(topic, self.queue_depth, (event_id, event));
        Ok(event_id)
    }

    async fn flush(&self) -> Result<(), LogError> {
        Ok(())
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        let from = from.unwrap_or(0);
        let state = self.state.lock().await;
        let events = state
            .topics
            .get(topic.as_str())
            .into_iter()
            .flat_map(|events| events.iter())
            .filter(|(event_id, _)| *event_id > from)
            .take(limit)
            .map(|(event_id, event)| (*event_id, event.clone()))
            .collect();
        Ok(events)
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        let mut state = self.state.lock().await;
        state.consumers.insert(
            (topic.as_str().to_string(), consumer.as_str().to_string()),
            up_to,
        );
        Ok(())
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        let state = self.state.lock().await;
        Ok(state
            .consumers
            .get(&(topic.as_str().to_string(), consumer.as_str().to_string()))
            .copied())
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        let state = self.state.lock().await;
        Ok(state.latest.get(topic.as_str()).copied())
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        let mut state = self.state.lock().await;
        let Some(events) = state.topics.get_mut(topic.as_str()) else {
            return Ok(CompactReport::default());
        };
        let removed = events
            .iter()
            .take_while(|(event_id, _)| *event_id <= before)
            .count();
        for _ in 0..removed {
            events.pop_front();
        }
        Ok(CompactReport {
            removed,
            remaining: events.len(),
            latest: state.latest.get(topic.as_str()).copied(),
            checkpointed: false,
        })
    }
}

#[derive(Serialize, Deserialize)]
struct FileRecord {
    id: EventId,
    event: LogEvent,
}

pub struct FileEventLog {
    root: PathBuf,
    latest_ids: Mutex<HashMap<String, EventId>>,
    write_lock: Mutex<()>,
    broadcasts: BroadcastMap,
    queue_depth: usize,
}

impl FileEventLog {
    pub fn open(root: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
        std::fs::create_dir_all(root.join("topics"))
            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        std::fs::create_dir_all(root.join("consumers"))
            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        Ok(Self {
            root,
            latest_ids: Mutex::new(HashMap::new()),
            write_lock: Mutex::new(()),
            broadcasts: BroadcastMap::default(),
            queue_depth: queue_depth.max(1),
        })
    }

    fn topic_path(&self, topic: &Topic) -> PathBuf {
        self.root
            .join("topics")
            .join(format!("{}.jsonl", topic.as_str()))
    }

    fn consumer_path(&self, topic: &Topic, consumer: &ConsumerId) -> PathBuf {
        self.root.join("consumers").join(format!(
            "{}__{}.json",
            topic.as_str(),
            sanitize_filename(consumer.as_str())
        ))
    }

    fn latest_id_for_topic(&self, topic: &Topic) -> Result<EventId, LogError> {
        if let Some(event_id) = self
            .latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .get(topic.as_str())
            .copied()
        {
            return Ok(event_id);
        }

        let mut latest = 0;
        let path = self.topic_path(topic);
        if path.is_file() {
            for record in read_file_records(&path)? {
                latest = record.id;
            }
        }
        self.latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .insert(topic.as_str().to_string(), latest);
        Ok(latest)
    }

    fn read_range_sync(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        let path = self.topic_path(topic);
        if !path.is_file() {
            return Ok(Vec::new());
        }
        let from = from.unwrap_or(0);
        let mut events = Vec::new();
        for record in read_file_records(&path)? {
            if record.id > from {
                events.push((record.id, record.event));
            }
            if events.len() >= limit {
                break;
            }
        }
        Ok(events)
    }

    fn topics(&self) -> Result<Vec<Topic>, LogError> {
        let topics_dir = self.root.join("topics");
        if !topics_dir.is_dir() {
            return Ok(Vec::new());
        }
        let mut topics = Vec::new();
        for entry in std::fs::read_dir(&topics_dir)
            .map_err(|error| LogError::Io(format!("event log topics read error: {error}")))?
        {
            let entry = entry
                .map_err(|error| LogError::Io(format!("event log topic entry error: {error}")))?;
            let path = entry.path();
            if path.extension().and_then(|ext| ext.to_str()) != Some("jsonl") {
                continue;
            }
            let Some(stem) = path.file_stem().and_then(|stem| stem.to_str()) else {
                continue;
            };
            topics.push(Topic::new(stem.to_string())?);
        }
        topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
        Ok(topics)
    }
}

fn read_file_records(path: &Path) -> Result<Vec<FileRecord>, LogError> {
    let file = std::fs::File::open(path)
        .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
    let mut reader = std::io::BufReader::new(file);
    let mut records = Vec::new();
    let mut line = Vec::new();
    loop {
        line.clear();
        let bytes_read = std::io::BufRead::read_until(&mut reader, b'\n', &mut line)
            .map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
        if bytes_read == 0 {
            break;
        }
        if line.iter().all(u8::is_ascii_whitespace) {
            continue;
        }
        let complete_line = line.ends_with(b"\n");
        match serde_json::from_slice::<FileRecord>(&line) {
            Ok(record) => records.push(record),
            Err(_) if !complete_line => break,
            Err(error) => {
                return Err(LogError::Serde(format!("event log parse error: {error}")));
            }
        }
    }
    Ok(records)
}
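
// On-disk layout sketch for the file backend: one JSONL file per topic under
// `<root>/topics/<topic>.jsonl`, plus one cursor file per (topic, consumer)
// under `<root>/consumers/`. A record line looks roughly like (provenance
// headers added by `prepare_event_for_append` omitted here):
//
//     {"id":1,"event":{"kind":"accepted","payload":{"n":1},"headers":{},"occurred_at_ms":0}}
//
// `read_file_records` tolerates a torn final line (no trailing '\n') by stopping
// there, which is what lets a restart skip a partially written tail.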

impl EventLog for FileEventLog {
    fn describe(&self) -> EventLogDescription {
        EventLogDescription {
            backend: EventLogBackendKind::File,
            location: Some(self.root.clone()),
            size_bytes: Some(dir_size_bytes(&self.root)),
            queue_depth: self.queue_depth,
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        let _guard = self
            .write_lock
            .lock()
            .expect("file event log write lock poisoned");
        let next_id = self.latest_id_for_topic(topic)? + 1;
        let previous_hash = self
            .read_range_sync(topic, None, usize::MAX)?
            .last()
            .map(|(previous_id, previous_event)| {
                crate::provenance::event_record_hash_from_headers(
                    topic.as_str(),
                    *previous_id,
                    previous_event,
                )
            })
            .transpose()?;
        let event = crate::provenance::prepare_event_for_append(
            topic.as_str(),
            next_id,
            previous_hash,
            event,
        )?;
        let record = FileRecord {
            id: next_id,
            event: event.clone(),
        };
        let path = self.topic_path(topic);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        }
        let line = serde_json::to_string(&record)
            .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
        use std::io::Write as _;
        let mut file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)
            .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
        writeln!(file, "{line}")
            .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
        self.latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .insert(topic.as_str().to_string(), next_id);
        self.broadcasts
            .publish(topic, self.queue_depth, (next_id, event));
        Ok(next_id)
    }

    async fn flush(&self) -> Result<(), LogError> {
        sync_tree(&self.root)
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        self.read_range_sync(topic, from, limit)
    }

    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        self.read_range_sync(topic, from, limit)?
            .into_iter()
            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
            .collect()
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
        let history = self.read_range_sync(topic, from, usize::MAX)?;
        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        let path = self.consumer_path(topic, consumer);
        let payload = serde_json::json!({
            "topic": topic.as_str(),
            "consumer_id": consumer.as_str(),
            "cursor": up_to,
            "updated_at_ms": now_ms(),
        });
        write_json_atomically(&path, &payload)
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        let path = self.consumer_path(topic, consumer);
        if !path.is_file() {
            return Ok(None);
        }
        let raw = std::fs::read_to_string(&path)
            .map_err(|error| LogError::Io(format!("event log consumer read error: {error}")))?;
        let payload: serde_json::Value = serde_json::from_str(&raw)
            .map_err(|error| LogError::Serde(format!("event log consumer parse error: {error}")))?;
        let cursor = payload
            .get("cursor")
            .and_then(serde_json::Value::as_u64)
            .ok_or_else(|| {
                LogError::Serde("event log consumer record missing numeric cursor".to_string())
            })?;
        Ok(Some(cursor))
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        let latest = self.latest_id_for_topic(topic)?;
        if latest == 0 {
            Ok(None)
        } else {
            Ok(Some(latest))
        }
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        let _guard = self
            .write_lock
            .lock()
            .expect("file event log write lock poisoned");
        let path = self.topic_path(topic);
        if !path.is_file() {
            return Ok(CompactReport::default());
        }
        let retained = self.read_range_sync(topic, Some(before), usize::MAX)?;
        let removed = self.read_range_sync(topic, None, usize::MAX)?.len() - retained.len();
        if retained.is_empty() {
            let _ = std::fs::remove_file(&path);
        } else {
            crate::atomic_io::atomic_write_with(&path, |writer| {
                use std::io::Write as _;
                for (event_id, event) in &retained {
                    let line = serde_json::to_string(&FileRecord {
                        id: *event_id,
                        event: event.clone(),
                    })
                    .map_err(|error| std::io::Error::other(error.to_string()))?;
                    writeln!(writer, "{line}")?;
                }
                Ok(())
            })
            .map_err(|error| LogError::Io(format!("event log compact finalize error: {error}")))?;
        }
        let latest = retained.last().map(|(event_id, _)| *event_id);
        self.latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .insert(topic.as_str().to_string(), latest.unwrap_or(0));
        Ok(CompactReport {
            removed,
            remaining: retained.len(),
            latest,
            checkpointed: false,
        })
    }
}

pub struct SqliteEventLog {
    path: PathBuf,
    connection: Mutex<Connection>,
    broadcasts: BroadcastMap,
    queue_depth: usize,
}

impl SqliteEventLog {
    pub fn open(path: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        }
        let connection = Connection::open(&path)
            .map_err(|error| LogError::Sqlite(format!("event log open error: {error}")))?;
        connection
            .busy_timeout(std::time::Duration::from_secs(5))
            .map_err(|error| LogError::Sqlite(format!("event log busy-timeout error: {error}")))?;
        connection
            .pragma_update(None, "journal_mode", "WAL")
            .map_err(|error| LogError::Sqlite(format!("event log WAL pragma error: {error}")))?;
        connection
            .pragma_update(None, "synchronous", "NORMAL")
            .map_err(|error| LogError::Sqlite(format!("event log sync pragma error: {error}")))?;
        connection
            .execute_batch(
                "CREATE TABLE IF NOT EXISTS topic_heads (
                    topic TEXT PRIMARY KEY,
                    last_id INTEGER NOT NULL
                );
                CREATE TABLE IF NOT EXISTS events (
                    topic TEXT NOT NULL,
                    event_id INTEGER NOT NULL,
                    kind TEXT NOT NULL,
                    payload BLOB NOT NULL,
                    headers TEXT NOT NULL,
                    occurred_at_ms INTEGER NOT NULL,
                    PRIMARY KEY (topic, event_id)
                );
                CREATE TABLE IF NOT EXISTS consumers (
                    topic TEXT NOT NULL,
                    consumer_id TEXT NOT NULL,
                    cursor INTEGER NOT NULL,
                    updated_at_ms INTEGER NOT NULL,
                    PRIMARY KEY (topic, consumer_id)
                );",
            )
            .map_err(|error| LogError::Sqlite(format!("event log schema error: {error}")))?;
        Ok(Self {
            path,
            connection: Mutex::new(connection),
            broadcasts: BroadcastMap::default(),
            queue_depth: queue_depth.max(1),
        })
    }

    fn topics(&self) -> Result<Vec<Topic>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let mut statement = connection
            .prepare("SELECT DISTINCT topic FROM events ORDER BY topic ASC")
            .map_err(|error| {
                LogError::Sqlite(format!("event log topics prepare error: {error}"))
            })?;
        let rows = statement
            .query_map([], |row| row.get::<_, String>(0))
            .map_err(|error| LogError::Sqlite(format!("event log topics query error: {error}")))?;
        let mut topics = Vec::new();
        for row in rows {
            topics.push(Topic::new(row.map_err(|error| {
                LogError::Sqlite(format!("event log topic row error: {error}"))
            })?)?);
        }
        Ok(topics)
    }
}
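
// Id allocation in the sqlite backend, in brief: `append` runs an IMMEDIATE
// transaction that seeds `topic_heads` with 0 on first use, increments
// `last_id`, and reads it back, so event ids are per-topic, contiguous, and
// start at 1. The equivalent SQL, for reference (topic name is an example):
//
//     INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES ('demo', 0);
//     UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = 'demo';
//     SELECT last_id FROM topic_heads WHERE topic = 'demo'; -- the new event id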

impl EventLog for SqliteEventLog {
    fn describe(&self) -> EventLogDescription {
        EventLogDescription {
            backend: EventLogBackendKind::Sqlite,
            location: Some(self.path.clone()),
            size_bytes: Some(sqlite_size_bytes(&self.path)),
            queue_depth: self.queue_depth,
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        let mut connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let tx = connection
            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
            .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
        tx.execute(
            "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
            params![topic.as_str()],
        )
        .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
        tx.execute(
            "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
            params![topic.as_str()],
        )
        .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
        let event_id = tx
            .query_row(
                "SELECT last_id FROM topic_heads WHERE topic = ?1",
                params![topic.as_str()],
                |row| row.get::<_, i64>(0),
            )
            .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))
            .and_then(sqlite_i64_to_event_id)?;
        let event_id_sql = event_id_to_sqlite_i64(event_id)?;
        let previous = tx
            .query_row(
                "SELECT event_id, kind, payload, headers, occurred_at_ms
                 FROM events
                 WHERE topic = ?1 AND event_id < ?2
                 ORDER BY event_id DESC
                 LIMIT 1",
                params![topic.as_str(), event_id_sql],
                |row| {
                    let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
                    let headers: String = row.get(3)?;
                    Ok((
                        sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?,
                        LogEvent {
                            kind: row.get(1)?,
                            payload: serde_json::from_slice(&payload).map_err(|error| {
                                rusqlite::Error::FromSqlConversionFailure(
                                    payload.len(),
                                    rusqlite::types::Type::Blob,
                                    Box::new(error),
                                )
                            })?,
                            headers: serde_json::from_str(&headers).map_err(|error| {
                                rusqlite::Error::FromSqlConversionFailure(
                                    headers.len(),
                                    rusqlite::types::Type::Text,
                                    Box::new(error),
                                )
                            })?,
                            occurred_at_ms: row.get(4)?,
                        },
                    ))
                },
            )
            .optional()
            .map_err(|error| LogError::Sqlite(format!("event log previous read error: {error}")))?;
        let previous_hash = previous
            .as_ref()
            .map(|(previous_id, previous_event)| {
                crate::provenance::event_record_hash_from_headers(
                    topic.as_str(),
                    *previous_id,
                    previous_event,
                )
            })
            .transpose()?;
        let event = crate::provenance::prepare_event_for_append(
            topic.as_str(),
            event_id,
            previous_hash,
            event,
        )?;
        tx.execute(
            "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
            params![
                topic.as_str(),
                event_id_sql,
                event.kind,
                serde_json::to_vec(&event.payload).map_err(|error| LogError::Serde(format!(
                    "event log payload encode error: {error}"
                )))?,
                serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
                    "event log headers encode error: {error}"
                )))?,
                event.occurred_at_ms
            ],
        )
        .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
        tx.commit()
            .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
        self.broadcasts
            .publish(topic, self.queue_depth, (event_id, event.clone()));
        Ok(event_id)
    }

    async fn flush(&self) -> Result<(), LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        connection
            .execute_batch("PRAGMA wal_checkpoint(FULL);")
            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
        Ok(())
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let mut statement = connection
            .prepare(
                "SELECT event_id, kind, payload, headers, occurred_at_ms
                 FROM events
                 WHERE topic = ?1 AND event_id > ?2
                 ORDER BY event_id ASC
                 LIMIT ?3",
            )
            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
        let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
        let rows = statement
            .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
                let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
                let headers: String = row.get(3)?;
                let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
                Ok((
                    event_id,
                    LogEvent {
                        kind: row.get(1)?,
                        payload: serde_json::from_slice(&payload).map_err(|error| {
                            rusqlite::Error::FromSqlConversionFailure(
                                payload.len(),
                                rusqlite::types::Type::Blob,
                                Box::new(error),
                            )
                        })?,
                        headers: serde_json::from_str(&headers).map_err(|error| {
                            rusqlite::Error::FromSqlConversionFailure(
                                headers.len(),
                                rusqlite::types::Type::Text,
                                Box::new(error),
                            )
                        })?,
                        occurred_at_ms: row.get(4)?,
                    },
                ))
            })
            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
        let mut events = Vec::new();
        for row in rows {
            events.push(
                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
            );
        }
        Ok(events)
    }

    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let mut statement = connection
            .prepare(
                "SELECT event_id, kind, payload, headers, occurred_at_ms
                 FROM events
                 WHERE topic = ?1 AND event_id > ?2
                 ORDER BY event_id ASC
                 LIMIT ?3",
            )
            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
        let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
        let rows = statement
            .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
                let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
                let headers: String = row.get(3)?;
                let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
                Ok((
                    event_id,
                    LogEventBytes {
                        kind: row.get(1)?,
                        payload: Bytes::from(payload),
                        headers: serde_json::from_str(&headers).map_err(|error| {
                            rusqlite::Error::FromSqlConversionFailure(
                                headers.len(),
                                rusqlite::types::Type::Text,
                                Box::new(error),
                            )
                        })?,
                        occurred_at_ms: row.get(4)?,
                    },
                ))
            })
            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
        let mut events = Vec::new();
        for row in rows {
            events.push(
                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
            );
        }
        Ok(events)
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let up_to_sql = event_id_to_sqlite_i64(up_to)?;
        connection
            .execute(
                "INSERT INTO consumers(topic, consumer_id, cursor, updated_at_ms)
                 VALUES (?1, ?2, ?3, ?4)
                 ON CONFLICT(topic, consumer_id)
                 DO UPDATE SET cursor = excluded.cursor, updated_at_ms = excluded.updated_at_ms",
                params![topic.as_str(), consumer.as_str(), up_to_sql, now_ms()],
            )
            .map_err(|error| LogError::Sqlite(format!("event log ack error: {error}")))?;
        Ok(())
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        connection
            .query_row(
                "SELECT cursor FROM consumers WHERE topic = ?1 AND consumer_id = ?2",
                params![topic.as_str(), consumer.as_str()],
                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
            )
            .optional()
            .map_err(|error| LogError::Sqlite(format!("event log consumer cursor error: {error}")))
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        connection
            .query_row(
                "SELECT last_id FROM topic_heads WHERE topic = ?1",
                params![topic.as_str()],
                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
            )
            .optional()
            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let before_sql = event_id_to_sqlite_i64(before)?;
        let removed = connection
            .execute(
                "DELETE FROM events WHERE topic = ?1 AND event_id <= ?2",
                params![topic.as_str(), before_sql],
            )
            .map_err(|error| {
                LogError::Sqlite(format!("event log compact delete error: {error}"))
            })?;
        let remaining = connection
            .query_row(
                "SELECT COUNT(*) FROM events WHERE topic = ?1",
                params![topic.as_str()],
                |row| row.get::<_, i64>(0),
            )
            .map_err(|error| LogError::Sqlite(format!("event log compact count error: {error}")))
            .and_then(sqlite_i64_to_usize)?;
        let latest = connection
            .query_row(
                "SELECT last_id FROM topic_heads WHERE topic = ?1",
                params![topic.as_str()],
                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
            )
            .optional()
            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))?;
        connection
            .execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
        Ok(CompactReport {
            removed,
            remaining,
            latest,
            checkpointed: true,
        })
    }
}

fn resolve_path(base_dir: &Path, value: &str) -> PathBuf {
    let candidate = PathBuf::from(value);
    if candidate.is_absolute() {
        candidate
    } else {
        base_dir.join(candidate)
    }
}

fn write_json_atomically(path: &Path, payload: &serde_json::Value) -> Result<(), LogError> {
    let encoded = serde_json::to_vec_pretty(payload)
        .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
    crate::atomic_io::atomic_write(path, &encoded)
        .map_err(|error| LogError::Io(format!("event log write error: {error}")))
}

fn sanitize_filename(value: &str) -> String {
    sanitize_topic_component(value)
}

pub fn sanitize_topic_component(value: &str) -> String {
    value
        .chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-') {
                ch
            } else {
                '_'
            }
        })
        .collect()
}
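
// Illustrative behavior of the sanitizer (anything outside the topic character
// set maps to '_'):
//
//     assert_eq!(sanitize_topic_component("worker/1"), "worker_1");
//     assert_eq!(sanitize_topic_component("ok.name-2"), "ok.name-2");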

fn dir_size_bytes(path: &Path) -> u64 {
    if !path.exists() {
        return 0;
    }
    let mut total = 0;
    if let Ok(entries) = std::fs::read_dir(path) {
        for entry in entries.flatten() {
            let path = entry.path();
            if path.is_dir() {
                total += dir_size_bytes(&path);
            } else if let Ok(metadata) = entry.metadata() {
                total += metadata.len();
            }
        }
    }
    total
}

fn sqlite_size_bytes(path: &Path) -> u64 {
    let mut total = file_size(path);
    total += file_size(&PathBuf::from(format!("{}-wal", path.display())));
    total += file_size(&PathBuf::from(format!("{}-shm", path.display())));
    total
}

fn file_size(path: &Path) -> u64 {
    std::fs::metadata(path)
        .map(|metadata| metadata.len())
        .unwrap_or(0)
}

fn sync_tree(root: &Path) -> Result<(), LogError> {
    if !root.exists() {
        return Ok(());
    }
    for entry in std::fs::read_dir(root)
        .map_err(|error| LogError::Io(format!("event log read_dir error: {error}")))?
    {
        let entry = entry.map_err(|error| LogError::Io(format!("event log dir error: {error}")))?;
        let path = entry.path();
        if path.is_dir() {
            sync_tree(&path)?;
            continue;
        }
        std::fs::File::open(&path)
            .and_then(|file| file.sync_all())
            .map_err(|error| LogError::Io(format!("event log sync error: {error}")))?;
    }
    Ok(())
}

fn now_ms() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|duration| duration.as_millis() as i64)
        .unwrap_or(0)
}

fn event_id_to_sqlite_i64(event_id: EventId) -> Result<i64, LogError> {
    i64::try_from(event_id)
        .map_err(|_| LogError::Sqlite(format!("event id {event_id} exceeds sqlite INTEGER range")))
}

fn sqlite_i64_to_event_id(value: i64) -> Result<EventId, LogError> {
    u64::try_from(value)
        .map_err(|_| LogError::Sqlite(format!("sqlite event id {value} is negative")))
}

fn sqlite_i64_to_event_id_for_row(value: i64) -> rusqlite::Result<EventId> {
    u64::try_from(value).map_err(|_| {
        rusqlite::Error::FromSqlConversionFailure(
            std::mem::size_of::<i64>(),
            rusqlite::types::Type::Integer,
            "sqlite event id is negative".into(),
        )
    })
}

fn sqlite_json_bytes_for_row(
    row: &rusqlite::Row<'_>,
    index: usize,
    name: &str,
) -> rusqlite::Result<Vec<u8>> {
    let value = row.get_ref(index)?;
    match value {
        rusqlite::types::ValueRef::Text(bytes) | rusqlite::types::ValueRef::Blob(bytes) => {
            Ok(bytes.to_vec())
        }
        other => Err(rusqlite::Error::InvalidColumnType(
            index,
            name.to_string(),
            other.data_type(),
        )),
    }
}

fn sqlite_i64_to_usize(value: i64) -> Result<usize, LogError> {
    usize::try_from(value)
        .map_err(|_| LogError::Sqlite(format!("sqlite count {value} is negative")))
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use rand::{rngs::StdRng, Rng, SeedableRng};

    async fn exercise_basic_backend(log: Arc<AnyEventLog>) {
        let topic = Topic::new("trigger.inbox").unwrap();
        for i in 0..10_000 {
            log.append(
                &topic,
                LogEvent::new("append", serde_json::json!({ "i": i })),
            )
            .await
            .unwrap();
        }
        let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 10_000);
        assert_eq!(events.first().unwrap().0, 1);
        assert_eq!(events.last().unwrap().0, 10_000);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn memory_backend_supports_append_read_subscribe_and_compact() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
        exercise_basic_backend(log.clone()).await;

        let topic = Topic::new("agent.transcript.demo").unwrap();
        let mut stream = log.clone().subscribe(&topic, None).await.unwrap();
        let first = log
            .append(
                &topic,
                LogEvent::new("message", serde_json::json!({"text":"one"})),
            )
            .await
            .unwrap();
        let second = log
            .append(
                &topic,
                LogEvent::new("message", serde_json::json!({"text":"two"})),
            )
            .await
            .unwrap();
        let seen: Vec<_> = stream.by_ref().take(2).collect().await;
        assert_eq!(seen[0].as_ref().unwrap().0, first);
        assert_eq!(seen[1].as_ref().unwrap().0, second);

        log.ack(&topic, &ConsumerId::new("worker").unwrap(), second)
            .await
            .unwrap();
        let compact = log.compact(&topic, first).await.unwrap();
        assert_eq!(compact.removed, 1);
        assert_eq!(compact.remaining, 1);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn file_backend_persists_across_reopen_and_compacts() {
        let dir = tempfile::tempdir().unwrap();
        let topic = Topic::new("trigger.outbox").unwrap();
        let first_log = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
        ));
        first_log
            .append(
                &topic,
                LogEvent::new("dispatch_pending", serde_json::json!({"n":1})),
            )
            .await
            .unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("dispatch_complete", serde_json::json!({"n":2})),
            )
            .await
            .unwrap();
        drop(first_log);

        let reopened = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
        ));
        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 2);
        let compact = reopened.compact(&topic, 1).await.unwrap();
        assert_eq!(compact.removed, 1);
        assert_eq!(
            reopened
                .read_range(&topic, None, usize::MAX)
                .await
                .unwrap()
                .len(),
            1
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn file_backend_skips_torn_tail_on_restart() {
        let dir = tempfile::tempdir().unwrap();
        let topic = Topic::new("trigger.inbox").unwrap();
        let first_log = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("accepted", serde_json::json!({"id": "ok"})),
            )
            .await
            .unwrap();
        drop(first_log);

        let topic_path = dir.path().join("topics").join("trigger.inbox.jsonl");
        use std::io::Write as _;
        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(&topic_path)
            .unwrap();
        write!(file, "{{\"id\":2,\"event\":{{\"kind\":\"partial\"").unwrap();
        drop(file);

        let reopened = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].0, 1);
        assert_eq!(reopened.latest(&topic).await.unwrap(), Some(1));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_backend_persists_and_checkpoints_after_compact() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("daemon.demo.state").unwrap();
        let first_log = Arc::new(AnyEventLog::Sqlite(
            SqliteEventLog::open(path.clone(), 8).unwrap(),
        ));
        first_log
            .append(
                &topic,
                LogEvent::new("state", serde_json::json!({"state":"idle"})),
            )
            .await
            .unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("state", serde_json::json!({"state":"active"})),
            )
            .await
            .unwrap();
        drop(first_log);

        let reopened = Arc::new(AnyEventLog::Sqlite(
            SqliteEventLog::open(path.clone(), 8).unwrap(),
        ));
        assert_eq!(
            reopened
                .read_range(&topic, None, usize::MAX)
                .await
                .unwrap()
                .len(),
            2
        );
        let compact = reopened.compact(&topic, 1).await.unwrap();
        assert!(compact.checkpointed);
        let wal = PathBuf::from(format!("{}-wal", path.display()));
        assert!(file_size(&wal) == 0 || !wal.exists());
    }

    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_bytes_read_preserves_payload_without_value_materialization() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("observability.action_graph").unwrap();
        let log = SqliteEventLog::open(path, 8).unwrap();
        let event_id = log
            .append(
                &topic,
                LogEvent::new(
                    "snapshot",
                    serde_json::json!({"nodes":[{"id":"a"}],"edges":[]}),
                ),
            )
            .await
            .unwrap();

        let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].0, event_id);
        assert_eq!(
            events[0].1.payload_json().unwrap(),
            serde_json::json!({"nodes":[{"id":"a"}],"edges":[]})
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_bytes_read_accepts_legacy_text_payload_rows() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("agent.transcript.legacy").unwrap();
        let log = SqliteEventLog::open(path, 8).unwrap();
        {
            let connection = log.connection.lock().unwrap();
            connection
                .execute(
                    "INSERT INTO topic_heads(topic, last_id) VALUES (?1, 1)",
                    params![topic.as_str()],
                )
                .unwrap();
            connection
                .execute(
                    "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
                     VALUES (?1, 1, 'legacy', ?2, '{}', 1)",
                    params![topic.as_str(), "{\"text\":\"old\"}"],
                )
                .unwrap();
        }

        let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
        assert_eq!(
            events[0].1.payload_json().unwrap(),
            serde_json::json!({"text": "old"})
        );
        assert_eq!(
            log.read_range(&topic, None, 1).await.unwrap()[0].1.kind,
            "legacy"
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn broadcast_forwarder_reports_lag_when_receiver_overflows() {
        let (sender, rx) = broadcast::channel(2);
        for i in 0..10 {
            sender
                .send((i + 1, LogEvent::new("tick", serde_json::json!({"i": i}))))
                .unwrap();
        }
        let mut stream = stream_from_broadcast(Vec::new(), None, rx, 2);

        match stream.next().await {
            Some(Err(LogError::ConsumerLagged(last_seen))) => assert_eq!(last_seen, 0),
            other => panic!("subscriber should surface lag, got {other:?}"),
        }
    }

    #[tokio::test(flavor = "current_thread")]
    async fn broadcast_forwarder_stops_when_consumer_drops_stream() {
        let (sender, rx) = broadcast::channel(2);
        let stream = stream_from_broadcast(Vec::new(), None, rx, 2);
        assert_eq!(sender.receiver_count(), 1);
        drop(stream);

        tokio::time::timeout(std::time::Duration::from_millis(100), async {
            while sender.receiver_count() != 0 {
                tokio::task::yield_now().await;
            }
        })
        .await
        .expect("subscription receiver should close after consumer drop");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn randomized_reader_sequences_stay_monotonic() {
        let log = Arc::new(MemoryEventLog::new(32));
        let topic = Topic::new("fuzz.demo").unwrap();
        let mut readers = vec![
            log.clone().subscribe(&topic, None).await.unwrap(),
            log.clone().subscribe(&topic, Some(5)).await.unwrap(),
            log.clone().subscribe(&topic, Some(10)).await.unwrap(),
        ];
        let mut rng = StdRng::seed_from_u64(7);
        for _ in 0..64 {
            let value = rng.random_range(0..1000);
            log.append(
                &topic,
                LogEvent::new("rand", serde_json::json!({"value": value})),
            )
            .await
            .unwrap();
        }

        let mut sequences = Vec::new();
        for reader in &mut readers {
            let mut ids = Vec::new();
            while let Some(item) = reader.next().await {
                match item {
                    Ok((event_id, _)) => {
                        ids.push(event_id);
                        if ids.len() >= 16 {
                            break;
                        }
                    }
                    Err(LogError::ConsumerLagged(_)) => break,
                    Err(error) => panic!("unexpected subscription error: {error}"),
                }
            }
            sequences.push(ids);
        }

        for ids in sequences {
            assert!(ids.windows(2).all(|pair| pair[0] < pair[1]));
        }
    }
}