use std::cell::RefCell;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::fmt;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::{Arc, Mutex};

use bytes::Bytes;
use futures::stream::BoxStream;
use rusqlite::{params, Connection, OptionalExtension};
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, mpsc};
use tokio_stream::wrappers::ReceiverStream;

pub type EventId = u64;

pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";
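/// A validated topic name. Topics are restricted to ASCII alphanumerics plus
/// `.`, `_`, and `-` so the same string can safely double as a file stem in
/// the file backend.
///
/// A minimal construction sketch (topic names are illustrative):
///
/// ```ignore
/// let topic = Topic::new("agent.transcript.demo")?;
/// let parsed: Topic = "trigger.inbox".parse()?;
/// assert!(Topic::new("spaces are rejected").is_err());
/// ```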
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Topic(String);

impl Topic {
    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
        let value = value.into();
        if value.is_empty() {
            return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
        }
        if !value
            .chars()
            .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
        {
            return Err(LogError::InvalidTopic(format!(
                "topic '{value}' contains unsupported characters"
            )));
        }
        Ok(Self(value))
    }

    pub fn as_str(&self) -> &str {
        &self.0
    }
}

impl fmt::Display for Topic {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt(f)
    }
}

impl FromStr for Topic {
    type Err = LogError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Self::new(s)
    }
}

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct ConsumerId(String);

impl ConsumerId {
    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
        let value = value.into();
        if value.trim().is_empty() {
            return Err(LogError::InvalidConsumer(
                "consumer id cannot be empty".to_string(),
            ));
        }
        Ok(Self(value))
    }

    pub fn as_str(&self) -> &str {
        &self.0
    }
}

impl fmt::Display for ConsumerId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt(f)
    }
}

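/// Storage backend selector. The strings accepted by `FromStr` ("memory",
/// "file", "sqlite") are the values `HARN_EVENT_LOG_BACKEND` understands;
/// parsing trims whitespace and is case-insensitive:
///
/// ```ignore
/// let kind: EventLogBackendKind = " SQLite ".parse()?;
/// assert_eq!(kind, EventLogBackendKind::Sqlite);
/// ```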
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventLogBackendKind {
    Memory,
    File,
    Sqlite,
}

impl fmt::Display for EventLogBackendKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Memory => write!(f, "memory"),
            Self::File => write!(f, "file"),
            Self::Sqlite => write!(f, "sqlite"),
        }
    }
}

impl FromStr for EventLogBackendKind {
    type Err = LogError;

    fn from_str(value: &str) -> Result<Self, Self::Err> {
        match value.trim().to_ascii_lowercase().as_str() {
            "memory" => Ok(Self::Memory),
            "file" => Ok(Self::File),
            "sqlite" => Ok(Self::Sqlite),
            other => Err(LogError::Config(format!(
                "unsupported event log backend '{other}'"
            ))),
        }
    }
}

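/// A single event as supplied by producers: a `kind` tag, a JSON payload,
/// optional string headers, and a wall-clock timestamp in milliseconds. The
/// backends run each event through `crate::provenance::prepare_event_for_append`
/// before storing it.
///
/// A minimal construction sketch (the header name is illustrative):
///
/// ```ignore
/// use std::collections::BTreeMap;
///
/// let mut headers = BTreeMap::new();
/// headers.insert("source".to_string(), "example".to_string());
/// let event = LogEvent::new("message", serde_json::json!({"text": "hi"}))
///     .with_headers(headers);
/// ```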
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LogEvent {
    pub kind: String,
    pub payload: serde_json::Value,
    #[serde(default)]
    pub headers: BTreeMap<String, String>,
    pub occurred_at_ms: i64,
}

impl LogEvent {
    pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
        Self {
            kind: kind.into(),
            payload,
            headers: BTreeMap::new(),
            occurred_at_ms: now_ms(),
        }
    }

    pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
        self.headers = headers;
        self
    }
}

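/// A `LogEvent` whose payload is kept as raw JSON bytes, so callers can
/// forward payloads without materializing a `serde_json::Value`;
/// `payload_json` parses lazily on demand:
///
/// ```ignore
/// let bytes: LogEventBytes = event.try_into()?;
/// let value = bytes.payload_json()?;
/// ```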
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LogEventBytes {
    pub kind: String,
    pub payload: Bytes,
    pub headers: BTreeMap<String, String>,
    pub occurred_at_ms: i64,
}

impl LogEventBytes {
    pub fn payload_json(&self) -> Result<serde_json::Value, LogError> {
        serde_json::from_slice(&self.payload)
            .map_err(|error| LogError::Serde(format!("event log payload parse error: {error}")))
    }

    pub fn into_log_event(self) -> Result<LogEvent, LogError> {
        Ok(LogEvent {
            kind: self.kind,
            payload: serde_json::from_slice(&self.payload).map_err(|error| {
                LogError::Serde(format!("event log payload parse error: {error}"))
            })?,
            headers: self.headers,
            occurred_at_ms: self.occurred_at_ms,
        })
    }
}

impl TryFrom<LogEvent> for LogEventBytes {
    type Error = LogError;

    fn try_from(event: LogEvent) -> Result<Self, Self::Error> {
        let payload = serde_json::to_vec(&event.payload)
            .map_err(|error| LogError::Serde(format!("event log payload encode error: {error}")))?;
        Ok(Self {
            kind: event.kind,
            payload: Bytes::from(payload),
            headers: event.headers,
            occurred_at_ms: event.occurred_at_ms,
        })
    }
}

#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactReport {
    pub removed: usize,
    pub remaining: usize,
    pub latest: Option<EventId>,
    pub checkpointed: bool,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventLogDescription {
    pub backend: EventLogBackendKind,
    pub location: Option<PathBuf>,
    pub size_bytes: Option<u64>,
    pub queue_depth: usize,
}

#[derive(Debug)]
pub enum LogError {
    Config(String),
    InvalidTopic(String),
    InvalidConsumer(String),
    Io(String),
    Serde(String),
    Sqlite(String),
    ConsumerLagged(EventId),
}

impl fmt::Display for LogError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Config(message)
            | Self::InvalidTopic(message)
            | Self::InvalidConsumer(message)
            | Self::Io(message)
            | Self::Serde(message)
            | Self::Sqlite(message) => message.fmt(f),
            Self::ConsumerLagged(last_id) => {
                write!(f, "subscriber lagged behind after event {last_id}")
            }
        }
    }
}

impl std::error::Error for LogError {}

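/// The storage-agnostic event log interface: per-topic append-only streams
/// whose ids start at 1 and increase monotonically, range reads, live
/// subscriptions that replay history before tailing, per-consumer ack
/// cursors, and compaction.
///
/// A minimal end-to-end sketch against the in-memory backend (assumes a
/// tokio runtime):
///
/// ```ignore
/// use std::sync::Arc;
/// use futures::StreamExt;
///
/// let log = Arc::new(MemoryEventLog::new(16));
/// let topic = Topic::new("demo.topic")?;
/// let id = log
///     .append(&topic, LogEvent::new("created", serde_json::json!({"n": 1})))
///     .await?;
/// let mut stream = log.clone().subscribe(&topic, None).await?;
/// let (first_id, _event) = stream.next().await.unwrap()?;
/// assert_eq!(first_id, id);
/// ```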
#[allow(async_fn_in_trait)]
pub trait EventLog: Send + Sync {
    fn describe(&self) -> EventLogDescription;

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;

    async fn flush(&self) -> Result<(), LogError>;

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError>;

    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        let events = self.read_range(topic, from, limit).await?;
        events
            .into_iter()
            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
            .collect()
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError>;

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError>;

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
}

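/// Resolved event log configuration. `for_base_dir` reads the
/// `HARN_EVENT_LOG_*` environment variables, defaulting to the sqlite backend
/// with a queue depth of 128; relative paths from the environment are
/// resolved against `base_dir`.
///
/// A wiring sketch (the base directory is illustrative):
///
/// ```ignore
/// let config = EventLogConfig::for_base_dir(Path::new("/tmp/harn-demo"))?;
/// let log = open_event_log(&config)?;
/// println!("backend: {}", config.backend);
/// ```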
#[derive(Clone, Debug)]
pub struct EventLogConfig {
    pub backend: EventLogBackendKind,
    pub file_dir: PathBuf,
    pub sqlite_path: PathBuf,
    pub queue_depth: usize,
}

impl EventLogConfig {
    pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
        let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
            .ok()
            .map(|value| value.parse())
            .transpose()?
            .unwrap_or(EventLogBackendKind::Sqlite);
        let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
            .ok()
            .and_then(|value| value.parse::<usize>().ok())
            .unwrap_or(128)
            .max(1);

        let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
            _ => crate::runtime_paths::event_log_dir(base_dir),
        };
        let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
            _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
        };

        Ok(Self {
            backend,
            file_dir,
            sqlite_path,
            queue_depth,
        })
    }

    pub fn location(&self) -> Option<PathBuf> {
        match self.backend {
            EventLogBackendKind::Memory => None,
            EventLogBackendKind::File => Some(self.file_dir.clone()),
            EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
        }
    }
}

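// Each thread can carry its own "active" event log: the daemon installs the
// configured backend at startup, while tests install an isolated in-memory
// log. A minimal sketch of the test-side pattern:
//
//     let log = install_memory_for_current_thread(8);
//     assert!(active_event_log().is_some());
//     reset_active_event_log();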
thread_local! {
    static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
}

pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
    let config = EventLogConfig::for_base_dir(base_dir)?;
    let log = open_event_log(&config)?;
    ACTIVE_EVENT_LOG.with(|slot| {
        *slot.borrow_mut() = Some(log.clone());
    });
    Ok(log)
}

pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
    let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
    ACTIVE_EVENT_LOG.with(|slot| {
        *slot.borrow_mut() = Some(log.clone());
    });
    log
}

pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
    ACTIVE_EVENT_LOG.with(|slot| {
        *slot.borrow_mut() = Some(log.clone());
    });
    log
}

pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
    ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone())
}

pub fn reset_active_event_log() {
    ACTIVE_EVENT_LOG.with(|slot| {
        *slot.borrow_mut() = None;
    });
}

pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
    let config = EventLogConfig::for_base_dir(base_dir)?;
    let description = match config.backend {
        EventLogBackendKind::Memory => EventLogDescription {
            backend: EventLogBackendKind::Memory,
            location: None,
            size_bytes: None,
            queue_depth: config.queue_depth,
        },
        EventLogBackendKind::File => EventLogDescription {
            backend: EventLogBackendKind::File,
            size_bytes: Some(dir_size_bytes(&config.file_dir)),
            location: Some(config.file_dir),
            queue_depth: config.queue_depth,
        },
        EventLogBackendKind::Sqlite => EventLogDescription {
            backend: EventLogBackendKind::Sqlite,
            size_bytes: Some(sqlite_size_bytes(&config.sqlite_path)),
            location: Some(config.sqlite_path),
            queue_depth: config.queue_depth,
        },
    };
    Ok(description)
}

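/// Opens the backend selected by `config.backend`. A sketch of constructing a
/// memory-backed log without touching the environment (field values are
/// illustrative; the path fields are ignored by the memory backend):
///
/// ```ignore
/// let config = EventLogConfig {
///     backend: EventLogBackendKind::Memory,
///     file_dir: PathBuf::from("unused"),
///     sqlite_path: PathBuf::from("unused"),
///     queue_depth: 8,
/// };
/// let log = open_event_log(&config)?;
/// assert_eq!(log.describe().backend, EventLogBackendKind::Memory);
/// ```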
pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
    match config.backend {
        EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
            config.queue_depth,
        )))),
        EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
            config.file_dir.clone(),
            config.queue_depth,
        )?))),
        EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
            config.sqlite_path.clone(),
            config.queue_depth,
        )?))),
    }
}

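/// Backend-erased event log. This is an enum rather than `Box<dyn EventLog>`
/// because the trait uses `async fn` and is therefore not object-safe;
/// matching on the variant keeps dispatch static while callers hold a single
/// `Arc<AnyEventLog>`.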
pub enum AnyEventLog {
    Memory(MemoryEventLog),
    File(FileEventLog),
    Sqlite(SqliteEventLog),
}

impl AnyEventLog {
    pub async fn topics(&self) -> Result<Vec<Topic>, LogError> {
        match self {
            Self::Memory(log) => log.topics().await,
            Self::File(log) => log.topics(),
            Self::Sqlite(log) => log.topics(),
        }
    }
}

impl EventLog for AnyEventLog {
    fn describe(&self) -> EventLogDescription {
        match self {
            Self::Memory(log) => log.describe(),
            Self::File(log) => log.describe(),
            Self::Sqlite(log) => log.describe(),
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        match self {
            Self::Memory(log) => log.append(topic, event).await,
            Self::File(log) => log.append(topic, event).await,
            Self::Sqlite(log) => log.append(topic, event).await,
        }
    }

    async fn flush(&self) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.flush().await,
            Self::File(log) => log.flush().await,
            Self::Sqlite(log) => log.flush().await,
        }
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        match self {
            Self::Memory(log) => log.read_range(topic, from, limit).await,
            Self::File(log) => log.read_range(topic, from, limit).await,
            Self::Sqlite(log) => log.read_range(topic, from, limit).await,
        }
    }

    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        match self {
            Self::Memory(log) => log.read_range_bytes(topic, from, limit).await,
            Self::File(log) => log.read_range_bytes(topic, from, limit).await,
            Self::Sqlite(log) => log.read_range_bytes(topic, from, limit).await,
        }
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        let (rx, queue_depth) = match self.as_ref() {
            Self::Memory(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::File(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::Sqlite(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
        };
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.ack(topic, consumer, up_to).await,
            Self::File(log) => log.ack(topic, consumer, up_to).await,
            Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
        }
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
            Self::File(log) => log.consumer_cursor(topic, consumer).await,
            Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
        }
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.latest(topic).await,
            Self::File(log) => log.latest(topic).await,
            Self::Sqlite(log) => log.latest(topic).await,
        }
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        match self {
            Self::Memory(log) => log.compact(topic, before).await,
            Self::File(log) => log.compact(topic, before).await,
            Self::Sqlite(log) => log.compact(topic, before).await,
        }
    }
}

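/// Lazily created per-topic `tokio::sync::broadcast` channels that fan
/// appended events out to live subscribers. Senders are created on first use
/// and kept for the process lifetime, so publishing to a topic with no
/// subscribers is a cheap no-op (the send error is deliberately ignored).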
#[derive(Default)]
struct BroadcastMap(Mutex<HashMap<String, broadcast::Sender<(EventId, LogEvent)>>>);

impl BroadcastMap {
    fn subscribe(
        &self,
        topic: &Topic,
        capacity: usize,
    ) -> broadcast::Receiver<(EventId, LogEvent)> {
        self.sender(topic, capacity).subscribe()
    }

    fn publish(&self, topic: &Topic, capacity: usize, record: (EventId, LogEvent)) {
        let _ = self.sender(topic, capacity).send(record);
    }

    fn sender(&self, topic: &Topic, capacity: usize) -> broadcast::Sender<(EventId, LogEvent)> {
        let mut map = self.0.lock().expect("event log broadcast map poisoned");
        map.entry(topic.as_str().to_string())
            .or_insert_with(|| broadcast::channel(capacity.max(1)).0)
            .clone()
    }
}

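/// Bridges a history snapshot plus a live broadcast receiver into one ordered
/// stream: history is replayed first, and live events with ids at or below
/// the replay high-water mark are dropped so events captured by both sides
/// are not delivered twice. If the broadcast receiver overflows, the stream
/// yields `LogError::ConsumerLagged` once and ends. A consumer-side sketch
/// (`handle` is a placeholder):
///
/// ```ignore
/// while let Some(item) = stream.next().await {
///     match item {
///         Ok((id, event)) => handle(id, event),
///         Err(LogError::ConsumerLagged(last)) => {
///             // resubscribe from `last` to continue without gaps
///             break;
///         }
///         Err(error) => return Err(error),
///     }
/// }
/// ```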
fn stream_from_broadcast(
    history: Vec<(EventId, LogEvent)>,
    from: Option<EventId>,
    mut live_rx: broadcast::Receiver<(EventId, LogEvent)>,
    queue_depth: usize,
) -> BoxStream<'static, Result<(EventId, LogEvent), LogError>> {
    let (tx, rx) = mpsc::channel(queue_depth.max(1));
    tokio::spawn(async move {
        // Replay the history snapshot first. `last_seen` tracks the replay
        // high-water mark so live events already covered by history are
        // dropped instead of being delivered twice.
        let mut last_seen = from.unwrap_or(0);
        for (event_id, event) in history {
            last_seen = event_id;
            if tx.send(Ok((event_id, event))).await.is_err() {
                return;
            }
        }

        loop {
            tokio::select! {
                _ = tx.closed() => return,
                received = live_rx.recv() => {
                    match received {
                        Ok((event_id, event)) if event_id > last_seen => {
                            last_seen = event_id;
                            if tx.send(Ok((event_id, event))).await.is_err() {
                                return;
                            }
                        }
                        Ok(_) => {}
                        Err(broadcast::error::RecvError::Closed) => return,
                        Err(broadcast::error::RecvError::Lagged(_)) => {
                            // Surface overflow as a terminal error so the
                            // consumer knows to resubscribe from `last_seen`.
                            let _ = tx.try_send(Err(LogError::ConsumerLagged(last_seen)));
                            return;
                        }
                    }
                }
            }
        }
    });
    Box::pin(ReceiverStream::new(rx))
}

#[derive(Default)]
struct MemoryState {
    topics: HashMap<String, VecDeque<(EventId, LogEvent)>>,
    latest: HashMap<String, EventId>,
    consumers: HashMap<(String, String), EventId>,
}

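/// Volatile backend: events live in per-topic `VecDeque`s behind a tokio
/// mutex and vanish with the process. Intended for tests and for callers
/// that only need live fan-out rather than durability.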
pub struct MemoryEventLog {
    state: tokio::sync::Mutex<MemoryState>,
    broadcasts: BroadcastMap,
    queue_depth: usize,
}

impl MemoryEventLog {
    pub fn new(queue_depth: usize) -> Self {
        Self {
            state: tokio::sync::Mutex::new(MemoryState::default()),
            broadcasts: BroadcastMap::default(),
            queue_depth: queue_depth.max(1),
        }
    }

    async fn topics(&self) -> Result<Vec<Topic>, LogError> {
        let state = self.state.lock().await;
        let mut topics = state
            .topics
            .keys()
            .map(|topic| Topic::new(topic.clone()))
            .collect::<Result<Vec<_>, _>>()?;
        topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
        Ok(topics)
    }
}

impl EventLog for MemoryEventLog {
    fn describe(&self) -> EventLogDescription {
        EventLogDescription {
            backend: EventLogBackendKind::Memory,
            location: None,
            size_bytes: None,
            queue_depth: self.queue_depth,
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        let mut state = self.state.lock().await;
        let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
        let previous_hash = state
            .topics
            .get(topic.as_str())
            .and_then(|events| events.back())
            .map(|(previous_id, previous_event)| {
                crate::provenance::event_record_hash_from_headers(
                    topic.as_str(),
                    *previous_id,
                    previous_event,
                )
            })
            .transpose()?;
        let event = crate::provenance::prepare_event_for_append(
            topic.as_str(),
            event_id,
            previous_hash,
            event,
        )?;
        state.latest.insert(topic.as_str().to_string(), event_id);
        state
            .topics
            .entry(topic.as_str().to_string())
            .or_default()
            .push_back((event_id, event.clone()));
        drop(state);
        self.broadcasts
            .publish(topic, self.queue_depth, (event_id, event));
        Ok(event_id)
    }

    async fn flush(&self) -> Result<(), LogError> {
        Ok(())
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        let from = from.unwrap_or(0);
        let state = self.state.lock().await;
        let events = state
            .topics
            .get(topic.as_str())
            .into_iter()
            .flat_map(|events| events.iter())
            .filter(|(event_id, _)| *event_id > from)
            .take(limit)
            .map(|(event_id, event)| (*event_id, event.clone()))
            .collect();
        Ok(events)
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        let mut state = self.state.lock().await;
        state.consumers.insert(
            (topic.as_str().to_string(), consumer.as_str().to_string()),
            up_to,
        );
        Ok(())
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        let state = self.state.lock().await;
        Ok(state
            .consumers
            .get(&(topic.as_str().to_string(), consumer.as_str().to_string()))
            .copied())
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        let state = self.state.lock().await;
        Ok(state.latest.get(topic.as_str()).copied())
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        let mut state = self.state.lock().await;
        let Some(events) = state.topics.get_mut(topic.as_str()) else {
            return Ok(CompactReport::default());
        };
        let removed = events
            .iter()
            .take_while(|(event_id, _)| *event_id <= before)
            .count();
        for _ in 0..removed {
            events.pop_front();
        }
        Ok(CompactReport {
            removed,
            remaining: events.len(),
            latest: state.latest.get(topic.as_str()).copied(),
            checkpointed: false,
        })
    }
}

#[derive(Serialize, Deserialize)]
struct FileRecord {
    id: EventId,
    event: LogEvent,
}

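/// File backend: one JSON-lines file per topic plus one JSON cursor file per
/// consumer. Appends are serialized by `write_lock`, and the latest id per
/// topic is cached in `latest_ids`. The on-disk layout looks roughly like:
///
/// ```text
/// <root>/topics/trigger.inbox.jsonl
/// <root>/consumers/trigger.inbox__worker.json
/// ```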
pub struct FileEventLog {
    root: PathBuf,
    latest_ids: Mutex<HashMap<String, EventId>>,
    write_lock: Mutex<()>,
    broadcasts: BroadcastMap,
    queue_depth: usize,
}

impl FileEventLog {
    pub fn open(root: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
        std::fs::create_dir_all(root.join("topics"))
            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        std::fs::create_dir_all(root.join("consumers"))
            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        Ok(Self {
            root,
            latest_ids: Mutex::new(HashMap::new()),
            write_lock: Mutex::new(()),
            broadcasts: BroadcastMap::default(),
            queue_depth: queue_depth.max(1),
        })
    }

    fn topic_path(&self, topic: &Topic) -> PathBuf {
        self.root
            .join("topics")
            .join(format!("{}.jsonl", topic.as_str()))
    }

    fn consumer_path(&self, topic: &Topic, consumer: &ConsumerId) -> PathBuf {
        self.root.join("consumers").join(format!(
            "{}__{}.json",
            topic.as_str(),
            sanitize_filename(consumer.as_str())
        ))
    }

    fn latest_id_for_topic(&self, topic: &Topic) -> Result<EventId, LogError> {
        if let Some(event_id) = self
            .latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .get(topic.as_str())
            .copied()
        {
            return Ok(event_id);
        }

        let mut latest = 0;
        let path = self.topic_path(topic);
        if path.is_file() {
            for record in read_file_records(&path)? {
                latest = record.id;
            }
        }
        self.latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .insert(topic.as_str().to_string(), latest);
        Ok(latest)
    }

    fn read_range_sync(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        let path = self.topic_path(topic);
        if !path.is_file() {
            return Ok(Vec::new());
        }
        let from = from.unwrap_or(0);
        let mut events = Vec::new();
        for record in read_file_records(&path)? {
            if record.id > from {
                events.push((record.id, record.event));
            }
            if events.len() >= limit {
                break;
            }
        }
        Ok(events)
    }

    fn topics(&self) -> Result<Vec<Topic>, LogError> {
        let topics_dir = self.root.join("topics");
        if !topics_dir.is_dir() {
            return Ok(Vec::new());
        }
        let mut topics = Vec::new();
        for entry in std::fs::read_dir(&topics_dir)
            .map_err(|error| LogError::Io(format!("event log topics read error: {error}")))?
        {
            let entry = entry
                .map_err(|error| LogError::Io(format!("event log topic entry error: {error}")))?;
            let path = entry.path();
            if path.extension().and_then(|ext| ext.to_str()) != Some("jsonl") {
                continue;
            }
            let Some(stem) = path.file_stem().and_then(|stem| stem.to_str()) else {
                continue;
            };
            topics.push(Topic::new(stem.to_string())?);
        }
        topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
        Ok(topics)
    }
}

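/// Reads every record in a topic file while tolerating a torn tail: a final
/// line that is both unparseable and unterminated (no trailing newline) is
/// treated as an interrupted append and skipped, whereas a malformed line
/// followed by more data is still a hard error.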
fn read_file_records(path: &Path) -> Result<Vec<FileRecord>, LogError> {
    let file = std::fs::File::open(path)
        .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
    let mut reader = std::io::BufReader::new(file);
    let mut records = Vec::new();
    let mut line = Vec::new();
    loop {
        line.clear();
        let bytes_read = std::io::BufRead::read_until(&mut reader, b'\n', &mut line)
            .map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
        if bytes_read == 0 {
            break;
        }
        if line.iter().all(u8::is_ascii_whitespace) {
            continue;
        }
        let complete_line = line.ends_with(b"\n");
        match serde_json::from_slice::<FileRecord>(&line) {
            Ok(record) => records.push(record),
            Err(_) if !complete_line => break,
            Err(error) => {
                return Err(LogError::Serde(format!("event log parse error: {error}")));
            }
        }
    }
    Ok(records)
}

impl EventLog for FileEventLog {
    fn describe(&self) -> EventLogDescription {
        EventLogDescription {
            backend: EventLogBackendKind::File,
            location: Some(self.root.clone()),
            size_bytes: Some(dir_size_bytes(&self.root)),
            queue_depth: self.queue_depth,
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        let _guard = self
            .write_lock
            .lock()
            .expect("file event log write lock poisoned");
        let next_id = self.latest_id_for_topic(topic)? + 1;
        let previous_hash = self
            .read_range_sync(topic, None, usize::MAX)?
            .last()
            .map(|(previous_id, previous_event)| {
                crate::provenance::event_record_hash_from_headers(
                    topic.as_str(),
                    *previous_id,
                    previous_event,
                )
            })
            .transpose()?;
        let event = crate::provenance::prepare_event_for_append(
            topic.as_str(),
            next_id,
            previous_hash,
            event,
        )?;
        let record = FileRecord {
            id: next_id,
            event: event.clone(),
        };
        let path = self.topic_path(topic);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        }
        let line = serde_json::to_string(&record)
            .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
        use std::io::Write as _;
        let mut file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)
            .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
        writeln!(file, "{line}")
            .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
        self.latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .insert(topic.as_str().to_string(), next_id);
        self.broadcasts
            .publish(topic, self.queue_depth, (next_id, event));
        Ok(next_id)
    }

    async fn flush(&self) -> Result<(), LogError> {
        sync_tree(&self.root)
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        self.read_range_sync(topic, from, limit)
    }

    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        self.read_range_sync(topic, from, limit)?
            .into_iter()
            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
            .collect()
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
        let history = self.read_range_sync(topic, from, usize::MAX)?;
        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        let path = self.consumer_path(topic, consumer);
        let payload = serde_json::json!({
            "topic": topic.as_str(),
            "consumer_id": consumer.as_str(),
            "cursor": up_to,
            "updated_at_ms": now_ms(),
        });
        write_json_atomically(&path, &payload)
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        let path = self.consumer_path(topic, consumer);
        if !path.is_file() {
            return Ok(None);
        }
        let raw = std::fs::read_to_string(&path)
            .map_err(|error| LogError::Io(format!("event log consumer read error: {error}")))?;
        let payload: serde_json::Value = serde_json::from_str(&raw)
            .map_err(|error| LogError::Serde(format!("event log consumer parse error: {error}")))?;
        let cursor = payload
            .get("cursor")
            .and_then(serde_json::Value::as_u64)
            .ok_or_else(|| {
                LogError::Serde("event log consumer record missing numeric cursor".to_string())
            })?;
        Ok(Some(cursor))
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        let latest = self.latest_id_for_topic(topic)?;
        if latest == 0 {
            Ok(None)
        } else {
            Ok(Some(latest))
        }
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        let _guard = self
            .write_lock
            .lock()
            .expect("file event log write lock poisoned");
        let path = self.topic_path(topic);
        if !path.is_file() {
            return Ok(CompactReport::default());
        }
        let retained = self.read_range_sync(topic, Some(before), usize::MAX)?;
        let removed = self.read_range_sync(topic, None, usize::MAX)?.len() - retained.len();
        if retained.is_empty() {
            let _ = std::fs::remove_file(&path);
        } else {
            crate::atomic_io::atomic_write_with(&path, |writer| {
                use std::io::Write as _;
                for (event_id, event) in &retained {
                    let line = serde_json::to_string(&FileRecord {
                        id: *event_id,
                        event: event.clone(),
                    })
                    .map_err(|error| std::io::Error::other(error.to_string()))?;
                    writeln!(writer, "{line}")?;
                }
                Ok(())
            })
            .map_err(|error| LogError::Io(format!("event log compact finalize error: {error}")))?;
        }
        let latest = retained.last().map(|(event_id, _)| *event_id);
        self.latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .insert(topic.as_str().to_string(), latest.unwrap_or(0));
        Ok(CompactReport {
            removed,
            remaining: retained.len(),
            latest,
            checkpointed: false,
        })
    }
}

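/// SQLite backend: one WAL-mode database holding `topic_heads` (per-topic id
/// allocation), `events`, and `consumers` tables. Each append runs in a
/// single IMMEDIATE transaction so the id bump, previous-record hash read,
/// and insert are atomic. A minimal sketch of opening one (the path is
/// illustrative):
///
/// ```ignore
/// let log = SqliteEventLog::open(PathBuf::from("/tmp/events.sqlite"), 8)?;
/// assert_eq!(log.describe().backend, EventLogBackendKind::Sqlite);
/// ```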
pub struct SqliteEventLog {
    path: PathBuf,
    connection: Mutex<Connection>,
    broadcasts: BroadcastMap,
    queue_depth: usize,
}

impl SqliteEventLog {
    pub fn open(path: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        }
        let connection = Connection::open(&path)
            .map_err(|error| LogError::Sqlite(format!("event log open error: {error}")))?;
        connection
            .busy_timeout(std::time::Duration::from_secs(5))
            .map_err(|error| LogError::Sqlite(format!("event log busy-timeout error: {error}")))?;
        connection
            .pragma_update(None, "journal_mode", "WAL")
            .map_err(|error| LogError::Sqlite(format!("event log WAL pragma error: {error}")))?;
        connection
            .pragma_update(None, "synchronous", "NORMAL")
            .map_err(|error| LogError::Sqlite(format!("event log sync pragma error: {error}")))?;
        connection
            .execute_batch(
                "CREATE TABLE IF NOT EXISTS topic_heads (
                    topic TEXT PRIMARY KEY,
                    last_id INTEGER NOT NULL
                );
                CREATE TABLE IF NOT EXISTS events (
                    topic TEXT NOT NULL,
                    event_id INTEGER NOT NULL,
                    kind TEXT NOT NULL,
                    payload BLOB NOT NULL,
                    headers TEXT NOT NULL,
                    occurred_at_ms INTEGER NOT NULL,
                    PRIMARY KEY (topic, event_id)
                );
                CREATE TABLE IF NOT EXISTS consumers (
                    topic TEXT NOT NULL,
                    consumer_id TEXT NOT NULL,
                    cursor INTEGER NOT NULL,
                    updated_at_ms INTEGER NOT NULL,
                    PRIMARY KEY (topic, consumer_id)
                );",
            )
            .map_err(|error| LogError::Sqlite(format!("event log schema error: {error}")))?;
        Ok(Self {
            path,
            connection: Mutex::new(connection),
            broadcasts: BroadcastMap::default(),
            queue_depth: queue_depth.max(1),
        })
    }

    fn topics(&self) -> Result<Vec<Topic>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let mut statement = connection
            .prepare("SELECT DISTINCT topic FROM events ORDER BY topic ASC")
            .map_err(|error| {
                LogError::Sqlite(format!("event log topics prepare error: {error}"))
            })?;
        let rows = statement
            .query_map([], |row| row.get::<_, String>(0))
            .map_err(|error| LogError::Sqlite(format!("event log topics query error: {error}")))?;
        let mut topics = Vec::new();
        for row in rows {
            topics.push(Topic::new(row.map_err(|error| {
                LogError::Sqlite(format!("event log topic row error: {error}"))
            })?)?);
        }
        Ok(topics)
    }
}

impl EventLog for SqliteEventLog {
    fn describe(&self) -> EventLogDescription {
        EventLogDescription {
            backend: EventLogBackendKind::Sqlite,
            location: Some(self.path.clone()),
            size_bytes: Some(sqlite_size_bytes(&self.path)),
            queue_depth: self.queue_depth,
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        let mut connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let tx = connection
            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
            .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
        tx.execute(
            "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
            params![topic.as_str()],
        )
        .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
        tx.execute(
            "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
            params![topic.as_str()],
        )
        .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
        let event_id = tx
            .query_row(
                "SELECT last_id FROM topic_heads WHERE topic = ?1",
                params![topic.as_str()],
                |row| row.get::<_, i64>(0),
            )
            .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))
            .and_then(sqlite_i64_to_event_id)?;
        let event_id_sql = event_id_to_sqlite_i64(event_id)?;
        let previous = tx
            .query_row(
                "SELECT event_id, kind, payload, headers, occurred_at_ms
                 FROM events
                 WHERE topic = ?1 AND event_id < ?2
                 ORDER BY event_id DESC
                 LIMIT 1",
                params![topic.as_str(), event_id_sql],
                |row| {
                    let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
                    let headers: String = row.get(3)?;
                    Ok((
                        sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?,
                        LogEvent {
                            kind: row.get(1)?,
                            payload: serde_json::from_slice(&payload).map_err(|error| {
                                rusqlite::Error::FromSqlConversionFailure(
                                    payload.len(),
                                    rusqlite::types::Type::Blob,
                                    Box::new(error),
                                )
                            })?,
                            headers: serde_json::from_str(&headers).map_err(|error| {
                                rusqlite::Error::FromSqlConversionFailure(
                                    headers.len(),
                                    rusqlite::types::Type::Text,
                                    Box::new(error),
                                )
                            })?,
                            occurred_at_ms: row.get(4)?,
                        },
                    ))
                },
            )
            .optional()
            .map_err(|error| LogError::Sqlite(format!("event log previous read error: {error}")))?;
        let previous_hash = previous
            .as_ref()
            .map(|(previous_id, previous_event)| {
                crate::provenance::event_record_hash_from_headers(
                    topic.as_str(),
                    *previous_id,
                    previous_event,
                )
            })
            .transpose()?;
        let event = crate::provenance::prepare_event_for_append(
            topic.as_str(),
            event_id,
            previous_hash,
            event,
        )?;
        tx.execute(
            "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
            params![
                topic.as_str(),
                event_id_sql,
                event.kind,
                serde_json::to_vec(&event.payload).map_err(|error| LogError::Serde(format!(
                    "event log payload encode error: {error}"
                )))?,
                serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
                    "event log headers encode error: {error}"
                )))?,
                event.occurred_at_ms
            ],
        )
        .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
        tx.commit()
            .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
        self.broadcasts
            .publish(topic, self.queue_depth, (event_id, event.clone()));
        Ok(event_id)
    }

    async fn flush(&self) -> Result<(), LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        connection
            .execute_batch("PRAGMA wal_checkpoint(FULL);")
            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
        Ok(())
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let mut statement = connection
            .prepare(
                "SELECT event_id, kind, payload, headers, occurred_at_ms
                 FROM events
                 WHERE topic = ?1 AND event_id > ?2
                 ORDER BY event_id ASC
                 LIMIT ?3",
            )
            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
        let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
        let rows = statement
            .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
                let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
                let headers: String = row.get(3)?;
                let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
                Ok((
                    event_id,
                    LogEvent {
                        kind: row.get(1)?,
                        payload: serde_json::from_slice(&payload).map_err(|error| {
                            rusqlite::Error::FromSqlConversionFailure(
                                payload.len(),
                                rusqlite::types::Type::Blob,
                                Box::new(error),
                            )
                        })?,
                        headers: serde_json::from_str(&headers).map_err(|error| {
                            rusqlite::Error::FromSqlConversionFailure(
                                headers.len(),
                                rusqlite::types::Type::Text,
                                Box::new(error),
                            )
                        })?,
                        occurred_at_ms: row.get(4)?,
                    },
                ))
            })
            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
        let mut events = Vec::new();
        for row in rows {
            events.push(
                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
            );
        }
        Ok(events)
    }

    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let mut statement = connection
            .prepare(
                "SELECT event_id, kind, payload, headers, occurred_at_ms
                 FROM events
                 WHERE topic = ?1 AND event_id > ?2
                 ORDER BY event_id ASC
                 LIMIT ?3",
            )
            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
        let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
        let rows = statement
            .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
                let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
                let headers: String = row.get(3)?;
                let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
                Ok((
                    event_id,
                    LogEventBytes {
                        kind: row.get(1)?,
                        payload: Bytes::from(payload),
                        headers: serde_json::from_str(&headers).map_err(|error| {
                            rusqlite::Error::FromSqlConversionFailure(
                                headers.len(),
                                rusqlite::types::Type::Text,
                                Box::new(error),
                            )
                        })?,
                        occurred_at_ms: row.get(4)?,
                    },
                ))
            })
            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
        let mut events = Vec::new();
        for row in rows {
            events.push(
                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
            );
        }
        Ok(events)
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let up_to_sql = event_id_to_sqlite_i64(up_to)?;
        connection
            .execute(
                "INSERT INTO consumers(topic, consumer_id, cursor, updated_at_ms)
                 VALUES (?1, ?2, ?3, ?4)
                 ON CONFLICT(topic, consumer_id)
                 DO UPDATE SET cursor = excluded.cursor, updated_at_ms = excluded.updated_at_ms",
                params![topic.as_str(), consumer.as_str(), up_to_sql, now_ms()],
            )
            .map_err(|error| LogError::Sqlite(format!("event log ack error: {error}")))?;
        Ok(())
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        connection
            .query_row(
                "SELECT cursor FROM consumers WHERE topic = ?1 AND consumer_id = ?2",
                params![topic.as_str(), consumer.as_str()],
                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
            )
            .optional()
            .map_err(|error| LogError::Sqlite(format!("event log consumer cursor error: {error}")))
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        connection
            .query_row(
                "SELECT last_id FROM topic_heads WHERE topic = ?1",
                params![topic.as_str()],
                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
            )
            .optional()
            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let before_sql = event_id_to_sqlite_i64(before)?;
        let removed = connection
            .execute(
                "DELETE FROM events WHERE topic = ?1 AND event_id <= ?2",
                params![topic.as_str(), before_sql],
            )
            .map_err(|error| {
                LogError::Sqlite(format!("event log compact delete error: {error}"))
            })?;
        let remaining = connection
            .query_row(
                "SELECT COUNT(*) FROM events WHERE topic = ?1",
                params![topic.as_str()],
                |row| row.get::<_, i64>(0),
            )
            .map_err(|error| LogError::Sqlite(format!("event log compact count error: {error}")))
            .and_then(sqlite_i64_to_usize)?;
        let latest = connection
            .query_row(
                "SELECT last_id FROM topic_heads WHERE topic = ?1",
                params![topic.as_str()],
                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
            )
            .optional()
            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))?;
        connection
            .execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
        Ok(CompactReport {
            removed,
            remaining,
            latest,
            checkpointed: true,
        })
    }
}

fn resolve_path(base_dir: &Path, value: &str) -> PathBuf {
    let candidate = PathBuf::from(value);
    if candidate.is_absolute() {
        candidate
    } else {
        base_dir.join(candidate)
    }
}

fn write_json_atomically(path: &Path, payload: &serde_json::Value) -> Result<(), LogError> {
    let encoded = serde_json::to_vec_pretty(payload)
        .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
    crate::atomic_io::atomic_write(path, &encoded)
        .map_err(|error| LogError::Io(format!("event log write error: {error}")))
}

fn sanitize_filename(value: &str) -> String {
    sanitize_topic_component(value)
}

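/// Maps an arbitrary string onto the filename-safe alphabet used for topics,
/// replacing every other character with `_`:
///
/// ```ignore
/// assert_eq!(sanitize_topic_component("worker#7"), "worker_7");
/// ```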
pub fn sanitize_topic_component(value: &str) -> String {
    value
        .chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-') {
                ch
            } else {
                '_'
            }
        })
        .collect()
}

fn dir_size_bytes(path: &Path) -> u64 {
    if !path.exists() {
        return 0;
    }
    let mut total = 0;
    if let Ok(entries) = std::fs::read_dir(path) {
        for entry in entries.flatten() {
            let path = entry.path();
            if path.is_dir() {
                total += dir_size_bytes(&path);
            } else if let Ok(metadata) = entry.metadata() {
                total += metadata.len();
            }
        }
    }
    total
}

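/// Total database footprint including the `-wal` and `-shm` sidecar files;
/// this is the figure `describe` reports for the sqlite backend.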
fn sqlite_size_bytes(path: &Path) -> u64 {
    let mut total = file_size(path);
    total += file_size(&PathBuf::from(format!("{}-wal", path.display())));
    total += file_size(&PathBuf::from(format!("{}-shm", path.display())));
    total
}

fn file_size(path: &Path) -> u64 {
    std::fs::metadata(path)
        .map(|metadata| metadata.len())
        .unwrap_or(0)
}

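/// Best-effort durability for the file backend's `flush`: recursively opens
/// and `sync_all`s every file under `root`.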
fn sync_tree(root: &Path) -> Result<(), LogError> {
    if !root.exists() {
        return Ok(());
    }
    for entry in std::fs::read_dir(root)
        .map_err(|error| LogError::Io(format!("event log read_dir error: {error}")))?
    {
        let entry = entry.map_err(|error| LogError::Io(format!("event log dir error: {error}")))?;
        let path = entry.path();
        if path.is_dir() {
            sync_tree(&path)?;
            continue;
        }
        std::fs::File::open(&path)
            .and_then(|file| file.sync_all())
            .map_err(|error| LogError::Io(format!("event log sync error: {error}")))?;
    }
    Ok(())
}

fn now_ms() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|duration| duration.as_millis() as i64)
        .unwrap_or(0)
}

fn event_id_to_sqlite_i64(event_id: EventId) -> Result<i64, LogError> {
    i64::try_from(event_id)
        .map_err(|_| LogError::Sqlite(format!("event id {event_id} exceeds sqlite INTEGER range")))
}

fn sqlite_i64_to_event_id(value: i64) -> Result<EventId, LogError> {
    u64::try_from(value)
        .map_err(|_| LogError::Sqlite(format!("sqlite event id {value} is negative")))
}

fn sqlite_i64_to_event_id_for_row(value: i64) -> rusqlite::Result<EventId> {
    u64::try_from(value).map_err(|_| {
        rusqlite::Error::FromSqlConversionFailure(
            std::mem::size_of::<i64>(),
            rusqlite::types::Type::Integer,
            "sqlite event id is negative".into(),
        )
    })
}

fn sqlite_json_bytes_for_row(
    row: &rusqlite::Row<'_>,
    index: usize,
    name: &str,
) -> rusqlite::Result<Vec<u8>> {
    let value = row.get_ref(index)?;
    match value {
        rusqlite::types::ValueRef::Text(bytes) | rusqlite::types::ValueRef::Blob(bytes) => {
            Ok(bytes.to_vec())
        }
        other => Err(rusqlite::Error::InvalidColumnType(
            index,
            name.to_string(),
            other.data_type(),
        )),
    }
}

fn sqlite_i64_to_usize(value: i64) -> Result<usize, LogError> {
    usize::try_from(value)
        .map_err(|_| LogError::Sqlite(format!("sqlite count {value} is negative")))
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    // `Rng` (not `RngExt`) is the rand trait that provides `random_range`.
    use rand::{rngs::StdRng, Rng, SeedableRng};

    async fn exercise_basic_backend(log: Arc<AnyEventLog>) {
        let topic = Topic::new("trigger.inbox").unwrap();
        for i in 0..10_000 {
            log.append(
                &topic,
                LogEvent::new("append", serde_json::json!({ "i": i })),
            )
            .await
            .unwrap();
        }
        let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 10_000);
        assert_eq!(events.first().unwrap().0, 1);
        assert_eq!(events.last().unwrap().0, 10_000);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn memory_backend_supports_append_read_subscribe_and_compact() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
        exercise_basic_backend(log.clone()).await;

        let topic = Topic::new("agent.transcript.demo").unwrap();
        let mut stream = log.clone().subscribe(&topic, None).await.unwrap();
        let first = log
            .append(
                &topic,
                LogEvent::new("message", serde_json::json!({"text":"one"})),
            )
            .await
            .unwrap();
        let second = log
            .append(
                &topic,
                LogEvent::new("message", serde_json::json!({"text":"two"})),
            )
            .await
            .unwrap();
        let seen: Vec<_> = stream.by_ref().take(2).collect().await;
        assert_eq!(seen[0].as_ref().unwrap().0, first);
        assert_eq!(seen[1].as_ref().unwrap().0, second);

        log.ack(&topic, &ConsumerId::new("worker").unwrap(), second)
            .await
            .unwrap();
        let compact = log.compact(&topic, first).await.unwrap();
        assert_eq!(compact.removed, 1);
        assert_eq!(compact.remaining, 1);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn file_backend_persists_across_reopen_and_compacts() {
        let dir = tempfile::tempdir().unwrap();
        let topic = Topic::new("trigger.outbox").unwrap();
        let first_log = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
        ));
        first_log
            .append(
                &topic,
                LogEvent::new("dispatch_pending", serde_json::json!({"n":1})),
            )
            .await
            .unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("dispatch_complete", serde_json::json!({"n":2})),
            )
            .await
            .unwrap();
        drop(first_log);

        let reopened = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
        ));
        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 2);
        let compact = reopened.compact(&topic, 1).await.unwrap();
        assert_eq!(compact.removed, 1);
        assert_eq!(
            reopened
                .read_range(&topic, None, usize::MAX)
                .await
                .unwrap()
                .len(),
            1
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn file_backend_skips_torn_tail_on_restart() {
        let dir = tempfile::tempdir().unwrap();
        let topic = Topic::new("trigger.inbox").unwrap();
        let first_log = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("accepted", serde_json::json!({"id": "ok"})),
            )
            .await
            .unwrap();
        drop(first_log);

        let topic_path = dir.path().join("topics").join("trigger.inbox.jsonl");
        use std::io::Write as _;
        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(&topic_path)
            .unwrap();
        write!(file, "{{\"id\":2,\"event\":{{\"kind\":\"partial\"").unwrap();
        drop(file);

        let reopened = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].0, 1);
        assert_eq!(reopened.latest(&topic).await.unwrap(), Some(1));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_backend_persists_and_checkpoints_after_compact() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("daemon.demo.state").unwrap();
        let first_log = Arc::new(AnyEventLog::Sqlite(
            SqliteEventLog::open(path.clone(), 8).unwrap(),
        ));
        first_log
            .append(
                &topic,
                LogEvent::new("state", serde_json::json!({"state":"idle"})),
            )
            .await
            .unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("state", serde_json::json!({"state":"active"})),
            )
            .await
            .unwrap();
        drop(first_log);

        let reopened = Arc::new(AnyEventLog::Sqlite(
            SqliteEventLog::open(path.clone(), 8).unwrap(),
        ));
        assert_eq!(
            reopened
                .read_range(&topic, None, usize::MAX)
                .await
                .unwrap()
                .len(),
            2
        );
        let compact = reopened.compact(&topic, 1).await.unwrap();
        assert!(compact.checkpointed);
        let wal = PathBuf::from(format!("{}-wal", path.display()));
        assert!(file_size(&wal) == 0 || !wal.exists());
    }

    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_bytes_read_preserves_payload_without_value_materialization() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("observability.action_graph").unwrap();
        let log = SqliteEventLog::open(path, 8).unwrap();
        let event_id = log
            .append(
                &topic,
                LogEvent::new(
                    "snapshot",
                    serde_json::json!({"nodes":[{"id":"a"}],"edges":[]}),
                ),
            )
            .await
            .unwrap();

        let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].0, event_id);
        assert_eq!(
            events[0].1.payload_json().unwrap(),
            serde_json::json!({"nodes":[{"id":"a"}],"edges":[]})
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_bytes_read_accepts_legacy_text_payload_rows() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("agent.transcript.legacy").unwrap();
        let log = SqliteEventLog::open(path, 8).unwrap();
        {
            let connection = log.connection.lock().unwrap();
            connection
                .execute(
                    "INSERT INTO topic_heads(topic, last_id) VALUES (?1, 1)",
                    params![topic.as_str()],
                )
                .unwrap();
            connection
                .execute(
                    "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
                     VALUES (?1, 1, 'legacy', ?2, '{}', 1)",
                    params![topic.as_str(), "{\"text\":\"old\"}"],
                )
                .unwrap();
        }

        let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
        assert_eq!(
            events[0].1.payload_json().unwrap(),
            serde_json::json!({"text": "old"})
        );
        assert_eq!(
            log.read_range(&topic, None, 1).await.unwrap()[0].1.kind,
            "legacy"
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn broadcast_forwarder_reports_lag_when_receiver_overflows() {
        let (sender, rx) = broadcast::channel(2);
        for i in 0..10 {
            sender
                .send((i + 1, LogEvent::new("tick", serde_json::json!({"i": i}))))
                .unwrap();
        }
        let mut stream = stream_from_broadcast(Vec::new(), None, rx, 2);

        match stream.next().await {
            Some(Err(LogError::ConsumerLagged(last_seen))) => assert_eq!(last_seen, 0),
            other => panic!("subscriber should surface lag, got {other:?}"),
        }
    }

    #[tokio::test(flavor = "current_thread")]
    async fn broadcast_forwarder_stops_when_consumer_drops_stream() {
        let (sender, rx) = broadcast::channel(2);
        let stream = stream_from_broadcast(Vec::new(), None, rx, 2);
        assert_eq!(sender.receiver_count(), 1);
        drop(stream);

        tokio::time::timeout(std::time::Duration::from_millis(100), async {
            while sender.receiver_count() != 0 {
                tokio::task::yield_now().await;
            }
        })
        .await
        .expect("subscription receiver should close after consumer drop");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn randomized_reader_sequences_stay_monotonic() {
        let log = Arc::new(MemoryEventLog::new(32));
        let topic = Topic::new("fuzz.demo").unwrap();
        let mut readers = vec![
            log.clone().subscribe(&topic, None).await.unwrap(),
            log.clone().subscribe(&topic, Some(5)).await.unwrap(),
            log.clone().subscribe(&topic, Some(10)).await.unwrap(),
        ];
        let mut rng = StdRng::seed_from_u64(7);
        for _ in 0..64 {
            let value = rng.random_range(0..1000);
            log.append(
                &topic,
                LogEvent::new("rand", serde_json::json!({"value": value})),
            )
            .await
            .unwrap();
        }

        let mut sequences = Vec::new();
        for reader in &mut readers {
            let mut ids = Vec::new();
            while let Some(item) = reader.next().await {
                match item {
                    Ok((event_id, _)) => {
                        ids.push(event_id);
                        if ids.len() >= 16 {
                            break;
                        }
                    }
                    Err(LogError::ConsumerLagged(_)) => break,
                    Err(error) => panic!("unexpected subscription error: {error}"),
                }
            }
            sequences.push(ids);
        }

        for ids in sequences {
            assert!(ids.windows(2).all(|pair| pair[0] < pair[1]));
        }
    }
}