use std::cell::RefCell;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::fmt;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::{Arc, Mutex};

use bytes::Bytes;
use futures::stream::BoxStream;
use rusqlite::{params, Connection, OptionalExtension};
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, mpsc};
use tokio_stream::wrappers::ReceiverStream;

pub type EventId = u64;

pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";

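/// Validated topic name: non-empty, ASCII alphanumerics plus `.`, `_`, and `-`.
///
/// A minimal construction sketch (hypothetical topic strings; assumes a
/// surrounding function that returns `Result<_, LogError>`):
///
/// ```ignore
/// let inbox: Topic = "trigger.inbox".parse()?;
/// assert_eq!(inbox.as_str(), "trigger.inbox");
/// assert!(Topic::new("no spaces").is_err());
/// ```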
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Topic(String);

impl Topic {
    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
        let value = value.into();
        if value.is_empty() {
            return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
        }
        if !value
            .chars()
            .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
        {
            return Err(LogError::InvalidTopic(format!(
                "topic '{value}' contains unsupported characters"
            )));
        }
        Ok(Self(value))
    }

    pub fn as_str(&self) -> &str {
        &self.0
    }
}

impl fmt::Display for Topic {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt(f)
    }
}

impl FromStr for Topic {
    type Err = LogError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Self::new(s)
    }
}

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct ConsumerId(String);

impl ConsumerId {
    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
        let value = value.into();
        if value.trim().is_empty() {
            return Err(LogError::InvalidConsumer(
                "consumer id cannot be empty".to_string(),
            ));
        }
        Ok(Self(value))
    }

    pub fn as_str(&self) -> &str {
        &self.0
    }
}

impl fmt::Display for ConsumerId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt(f)
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventLogBackendKind {
    Memory,
    File,
    Sqlite,
}

impl fmt::Display for EventLogBackendKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Memory => write!(f, "memory"),
            Self::File => write!(f, "file"),
            Self::Sqlite => write!(f, "sqlite"),
        }
    }
}

impl FromStr for EventLogBackendKind {
    type Err = LogError;

    fn from_str(value: &str) -> Result<Self, Self::Err> {
        match value.trim().to_ascii_lowercase().as_str() {
            "memory" => Ok(Self::Memory),
            "file" => Ok(Self::File),
            "sqlite" => Ok(Self::Sqlite),
            other => Err(LogError::Config(format!(
                "unsupported event log backend '{other}'"
            ))),
        }
    }
}

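/// A single log entry as appended by producers.
///
/// A minimal construction sketch (hypothetical kind, payload, and header):
///
/// ```ignore
/// let event = LogEvent::new("dispatch_pending", serde_json::json!({ "n": 1 }))
///     .with_headers(BTreeMap::from([("source".to_string(), "demo".to_string())]));
/// ```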
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LogEvent {
    pub kind: String,
    pub payload: serde_json::Value,
    #[serde(default)]
    pub headers: BTreeMap<String, String>,
    pub occurred_at_ms: i64,
}

impl LogEvent {
    pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
        Self {
            kind: kind.into(),
            payload,
            headers: BTreeMap::new(),
            occurred_at_ms: now_ms(),
        }
    }

    pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
        self.headers = headers;
        self
    }
}

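/// A log entry whose payload is kept as raw JSON bytes, so readers that only
/// forward or persist the payload can skip `serde_json::Value` materialization.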
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LogEventBytes {
    pub kind: String,
    pub payload: Bytes,
    pub headers: BTreeMap<String, String>,
    pub occurred_at_ms: i64,
}

impl LogEventBytes {
    pub fn payload_json(&self) -> Result<serde_json::Value, LogError> {
        serde_json::from_slice(&self.payload)
            .map_err(|error| LogError::Serde(format!("event log payload parse error: {error}")))
    }

    pub fn into_log_event(self) -> Result<LogEvent, LogError> {
        Ok(LogEvent {
            kind: self.kind,
            payload: serde_json::from_slice(&self.payload).map_err(|error| {
                LogError::Serde(format!("event log payload parse error: {error}"))
            })?,
            headers: self.headers,
            occurred_at_ms: self.occurred_at_ms,
        })
    }
}

impl TryFrom<LogEvent> for LogEventBytes {
    type Error = LogError;

    fn try_from(event: LogEvent) -> Result<Self, Self::Error> {
        let payload = serde_json::to_vec(&event.payload)
            .map_err(|error| LogError::Serde(format!("event log payload encode error: {error}")))?;
        Ok(Self {
            kind: event.kind,
            payload: Bytes::from(payload),
            headers: event.headers,
            occurred_at_ms: event.occurred_at_ms,
        })
    }
}

#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactReport {
    pub removed: usize,
    pub remaining: usize,
    pub latest: Option<EventId>,
    pub checkpointed: bool,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventLogDescription {
    pub backend: EventLogBackendKind,
    pub location: Option<PathBuf>,
    pub size_bytes: Option<u64>,
    pub queue_depth: usize,
}

#[derive(Debug)]
pub enum LogError {
    Config(String),
    InvalidTopic(String),
    InvalidConsumer(String),
    Io(String),
    Serde(String),
    Sqlite(String),
    ConsumerLagged(EventId),
}

impl fmt::Display for LogError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Config(message)
            | Self::InvalidTopic(message)
            | Self::InvalidConsumer(message)
            | Self::Io(message)
            | Self::Serde(message)
            | Self::Sqlite(message) => message.fmt(f),
            Self::ConsumerLagged(last_id) => {
                write!(f, "subscriber lagged behind after event {last_id}")
            }
        }
    }
}

impl std::error::Error for LogError {}

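/// Append-only, per-topic event log with replay, live subscription, consumer
/// cursors, and compaction.
///
/// A minimal producer/consumer sketch (hypothetical topic and consumer names;
/// assumes `log` is an `Arc` over an implementation, inside an async context
/// that returns `Result<_, LogError>`):
///
/// ```ignore
/// let topic = Topic::new("trigger.inbox")?;
/// let id = log.append(&topic, LogEvent::new("accepted", serde_json::json!({}))).await?;
/// let mut stream = log.clone().subscribe(&topic, None).await?;
/// log.ack(&topic, &ConsumerId::new("worker")?, id).await?;
/// ```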
#[allow(async_fn_in_trait)]
pub trait EventLog: Send + Sync {
    fn describe(&self) -> EventLogDescription;

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;

    async fn flush(&self) -> Result<(), LogError>;

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError>;

    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        let events = self.read_range(topic, from, limit).await?;
        events
            .into_iter()
            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
            .collect()
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError>;

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError>;

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
}

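/// Backend selection and paths resolved from the `HARN_EVENT_LOG_*`
/// environment variables, falling back to SQLite with a queue depth of 128.
///
/// A minimal resolution sketch (hypothetical base directory):
///
/// ```ignore
/// // With HARN_EVENT_LOG_BACKEND=file exported in the environment:
/// let config = EventLogConfig::for_base_dir(Path::new("/tmp/harn"))?;
/// assert_eq!(config.backend, EventLogBackendKind::File);
/// ```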
#[derive(Clone, Debug)]
pub struct EventLogConfig {
    pub backend: EventLogBackendKind,
    pub file_dir: PathBuf,
    pub sqlite_path: PathBuf,
    pub queue_depth: usize,
}

impl EventLogConfig {
    pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
        let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
            .ok()
            .map(|value| value.parse())
            .transpose()?
            .unwrap_or(EventLogBackendKind::Sqlite);
        let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
            .ok()
            .and_then(|value| value.parse::<usize>().ok())
            .unwrap_or(128)
            .max(1);

        let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
            _ => crate::runtime_paths::event_log_dir(base_dir),
        };
        let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
            _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
        };

        Ok(Self {
            backend,
            file_dir,
            sqlite_path,
            queue_depth,
        })
    }

    pub fn location(&self) -> Option<PathBuf> {
        match self.backend {
            EventLogBackendKind::Memory => None,
            EventLogBackendKind::File => Some(self.file_dir.clone()),
            EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
        }
    }
}

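// The active log is tracked per thread, so tests and single-threaded runtimes
// can swap backends without global coordination. A minimal test-setup sketch:
//
//     let log = install_memory_for_current_thread(8);
//     // ... exercise code that calls active_event_log() ...
//     reset_active_event_log();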
thread_local! {
    static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
}

pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
    let config = EventLogConfig::for_base_dir(base_dir)?;
    let log = open_event_log(&config)?;
    ACTIVE_EVENT_LOG.with(|slot| {
        *slot.borrow_mut() = Some(log.clone());
    });
    Ok(log)
}

pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
    let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
    ACTIVE_EVENT_LOG.with(|slot| {
        *slot.borrow_mut() = Some(log.clone());
    });
    log
}

pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
    ACTIVE_EVENT_LOG.with(|slot| {
        *slot.borrow_mut() = Some(log.clone());
    });
    log
}

pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
    ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone())
}

pub fn reset_active_event_log() {
    ACTIVE_EVENT_LOG.with(|slot| {
        *slot.borrow_mut() = None;
    });
}

pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
    let config = EventLogConfig::for_base_dir(base_dir)?;
    let description = match config.backend {
        EventLogBackendKind::Memory => EventLogDescription {
            backend: EventLogBackendKind::Memory,
            location: None,
            size_bytes: None,
            queue_depth: config.queue_depth,
        },
        EventLogBackendKind::File => EventLogDescription {
            backend: EventLogBackendKind::File,
            size_bytes: Some(dir_size_bytes(&config.file_dir)),
            location: Some(config.file_dir),
            queue_depth: config.queue_depth,
        },
        EventLogBackendKind::Sqlite => EventLogDescription {
            backend: EventLogBackendKind::Sqlite,
            size_bytes: Some(sqlite_size_bytes(&config.sqlite_path)),
            location: Some(config.sqlite_path),
            queue_depth: config.queue_depth,
        },
    };
    Ok(description)
}

pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
    match config.backend {
        EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
            config.queue_depth,
        )))),
        EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
            config.file_dir.clone(),
            config.queue_depth,
        )?))),
        EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
            config.sqlite_path.clone(),
            config.queue_depth,
        )?))),
    }
}

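/// Enum dispatch over the three backends, so callers can hold one concrete
/// type behind `Arc` regardless of which backend the config selected.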
pub enum AnyEventLog {
    Memory(MemoryEventLog),
    File(FileEventLog),
    Sqlite(SqliteEventLog),
}

impl AnyEventLog {
    pub async fn topics(&self) -> Result<Vec<Topic>, LogError> {
        match self {
            Self::Memory(log) => log.topics().await,
            Self::File(log) => log.topics(),
            Self::Sqlite(log) => log.topics(),
        }
    }
}

impl EventLog for AnyEventLog {
    fn describe(&self) -> EventLogDescription {
        match self {
            Self::Memory(log) => log.describe(),
            Self::File(log) => log.describe(),
            Self::Sqlite(log) => log.describe(),
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        match self {
            Self::Memory(log) => log.append(topic, event).await,
            Self::File(log) => log.append(topic, event).await,
            Self::Sqlite(log) => log.append(topic, event).await,
        }
    }

    async fn flush(&self) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.flush().await,
            Self::File(log) => log.flush().await,
            Self::Sqlite(log) => log.flush().await,
        }
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        match self {
            Self::Memory(log) => log.read_range(topic, from, limit).await,
            Self::File(log) => log.read_range(topic, from, limit).await,
            Self::Sqlite(log) => log.read_range(topic, from, limit).await,
        }
    }

    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        match self {
            Self::Memory(log) => log.read_range_bytes(topic, from, limit).await,
            Self::File(log) => log.read_range_bytes(topic, from, limit).await,
            Self::Sqlite(log) => log.read_range_bytes(topic, from, limit).await,
        }
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        // Take the live receiver before reading history so no event can fall
        // in the gap; `stream_from_broadcast` filters ids already replayed.
        let (rx, queue_depth) = match self.as_ref() {
            Self::Memory(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::File(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::Sqlite(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
        };
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.ack(topic, consumer, up_to).await,
            Self::File(log) => log.ack(topic, consumer, up_to).await,
            Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
        }
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
            Self::File(log) => log.consumer_cursor(topic, consumer).await,
            Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
        }
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.latest(topic).await,
            Self::File(log) => log.latest(topic).await,
            Self::Sqlite(log) => log.latest(topic).await,
        }
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        match self {
            Self::Memory(log) => log.compact(topic, before).await,
            Self::File(log) => log.compact(topic, before).await,
            Self::Sqlite(log) => log.compact(topic, before).await,
        }
    }
}

#[derive(Default)]
struct BroadcastMap(Mutex<HashMap<String, broadcast::Sender<(EventId, LogEvent)>>>);

impl BroadcastMap {
    fn subscribe(
        &self,
        topic: &Topic,
        capacity: usize,
    ) -> broadcast::Receiver<(EventId, LogEvent)> {
        self.sender(topic, capacity).subscribe()
    }

    fn publish(&self, topic: &Topic, capacity: usize, record: (EventId, LogEvent)) {
        let _ = self.sender(topic, capacity).send(record);
    }

    fn sender(&self, topic: &Topic, capacity: usize) -> broadcast::Sender<(EventId, LogEvent)> {
        let mut map = self.0.lock().expect("event log broadcast map poisoned");
        map.entry(topic.as_str().to_string())
            .or_insert_with(|| broadcast::channel(capacity.max(1)).0)
            .clone()
    }
}

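// Bridges replayed history and the live broadcast feed into one ordered
// stream. A minimal consumption sketch (hypothetical `stream` binding; a lag
// ends the stream after a final `ConsumerLagged` error, so a caller would
// typically resubscribe from its last acked id):
//
//     while let Some(item) = stream.next().await {
//         match item {
//             Ok((id, event)) => { /* handle, ack periodically */ }
//             Err(LogError::ConsumerLagged(last)) => { /* resubscribe from last */ }
//             Err(other) => return Err(other),
//         }
//     }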
fn stream_from_broadcast(
    history: Vec<(EventId, LogEvent)>,
    from: Option<EventId>,
    mut live_rx: broadcast::Receiver<(EventId, LogEvent)>,
    queue_depth: usize,
) -> BoxStream<'static, Result<(EventId, LogEvent), LogError>> {
    let (tx, rx) = mpsc::channel(queue_depth.max(1));
    tokio::spawn(async move {
        // Replay history first, tracking the highest id delivered so far.
        let mut last_seen = from.unwrap_or(0);
        for (event_id, event) in history {
            last_seen = event_id;
            if tx.send(Ok((event_id, event))).await.is_err() {
                return;
            }
        }

        loop {
            tokio::select! {
                _ = tx.closed() => return,
                received = live_rx.recv() => {
                    match received {
                        Ok((event_id, event)) if event_id > last_seen => {
                            last_seen = event_id;
                            if tx.send(Ok((event_id, event))).await.is_err() {
                                return;
                            }
                        }
                        // Already delivered during replay; skip the duplicate.
                        Ok(_) => {}
                        Err(broadcast::error::RecvError::Closed) => return,
                        Err(broadcast::error::RecvError::Lagged(_)) => {
                            // The broadcast buffer overflowed; surface the lag
                            // once and end the stream.
                            let _ = tx.try_send(Err(LogError::ConsumerLagged(last_seen)));
                            return;
                        }
                    }
                }
            }
        }
    });
    Box::pin(ReceiverStream::new(rx))
}

#[derive(Default)]
struct MemoryState {
    topics: HashMap<String, VecDeque<(EventId, LogEvent)>>,
    latest: HashMap<String, EventId>,
    consumers: HashMap<(String, String), EventId>,
}

pub struct MemoryEventLog {
    state: tokio::sync::Mutex<MemoryState>,
    broadcasts: BroadcastMap,
    queue_depth: usize,
}

impl MemoryEventLog {
    pub fn new(queue_depth: usize) -> Self {
        Self {
            state: tokio::sync::Mutex::new(MemoryState::default()),
            broadcasts: BroadcastMap::default(),
            queue_depth: queue_depth.max(1),
        }
    }

    async fn topics(&self) -> Result<Vec<Topic>, LogError> {
        let state = self.state.lock().await;
        let mut topics = state
            .topics
            .keys()
            .map(|topic| Topic::new(topic.clone()))
            .collect::<Result<Vec<_>, _>>()?;
        topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
        Ok(topics)
    }
}

impl EventLog for MemoryEventLog {
    fn describe(&self) -> EventLogDescription {
        EventLogDescription {
            backend: EventLogBackendKind::Memory,
            location: None,
            size_bytes: None,
            queue_depth: self.queue_depth,
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        let mut state = self.state.lock().await;
        let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
        let previous_hash = state
            .topics
            .get(topic.as_str())
            .and_then(|events| events.back())
            .map(|(previous_id, previous_event)| {
                crate::provenance::event_record_hash_from_headers(
                    topic.as_str(),
                    *previous_id,
                    previous_event,
                )
            })
            .transpose()?;
        let event = crate::provenance::prepare_event_for_append(
            topic.as_str(),
            event_id,
            previous_hash,
            event,
        )?;
        state.latest.insert(topic.as_str().to_string(), event_id);
        state
            .topics
            .entry(topic.as_str().to_string())
            .or_default()
            .push_back((event_id, event.clone()));
        drop(state);
        self.broadcasts
            .publish(topic, self.queue_depth, (event_id, event));
        Ok(event_id)
    }

    async fn flush(&self) -> Result<(), LogError> {
        Ok(())
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        let from = from.unwrap_or(0);
        let state = self.state.lock().await;
        let events = state
            .topics
            .get(topic.as_str())
            .into_iter()
            .flat_map(|events| events.iter())
            .filter(|(event_id, _)| *event_id > from)
            .take(limit)
            .map(|(event_id, event)| (*event_id, event.clone()))
            .collect();
        Ok(events)
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        let mut state = self.state.lock().await;
        state.consumers.insert(
            (topic.as_str().to_string(), consumer.as_str().to_string()),
            up_to,
        );
        Ok(())
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        let state = self.state.lock().await;
        Ok(state
            .consumers
            .get(&(topic.as_str().to_string(), consumer.as_str().to_string()))
            .copied())
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        let state = self.state.lock().await;
        Ok(state.latest.get(topic.as_str()).copied())
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        let mut state = self.state.lock().await;
        let Some(events) = state.topics.get_mut(topic.as_str()) else {
            return Ok(CompactReport::default());
        };
        let removed = events
            .iter()
            .take_while(|(event_id, _)| *event_id <= before)
            .count();
        for _ in 0..removed {
            events.pop_front();
        }
        Ok(CompactReport {
            removed,
            remaining: events.len(),
            latest: state.latest.get(topic.as_str()).copied(),
            checkpointed: false,
        })
    }
}

#[derive(Serialize, Deserialize)]
struct FileRecord {
    id: EventId,
    event: LogEvent,
}

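/// JSONL-backed log. Layout on disk: `topics/<topic>.jsonl` holds one
/// `FileRecord` per line; `consumers/<topic>__<consumer>.json` (consumer id
/// sanitized) holds acked cursors.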
pub struct FileEventLog {
    root: PathBuf,
    latest_ids: Mutex<HashMap<String, EventId>>,
    write_lock: Mutex<()>,
    broadcasts: BroadcastMap,
    queue_depth: usize,
}

impl FileEventLog {
    pub fn open(root: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
        std::fs::create_dir_all(root.join("topics"))
            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        std::fs::create_dir_all(root.join("consumers"))
            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        Ok(Self {
            root,
            latest_ids: Mutex::new(HashMap::new()),
            write_lock: Mutex::new(()),
            broadcasts: BroadcastMap::default(),
            queue_depth: queue_depth.max(1),
        })
    }

    fn topic_path(&self, topic: &Topic) -> PathBuf {
        self.root
            .join("topics")
            .join(format!("{}.jsonl", topic.as_str()))
    }

    fn consumer_path(&self, topic: &Topic, consumer: &ConsumerId) -> PathBuf {
        self.root.join("consumers").join(format!(
            "{}__{}.json",
            topic.as_str(),
            sanitize_filename(consumer.as_str())
        ))
    }

    fn latest_id_for_topic(&self, topic: &Topic) -> Result<EventId, LogError> {
        if let Some(event_id) = self
            .latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .get(topic.as_str())
            .copied()
        {
            return Ok(event_id);
        }

        let mut latest = 0;
        let path = self.topic_path(topic);
        if path.is_file() {
            for record in read_file_records(&path)? {
                latest = record.id;
            }
        }
        self.latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .insert(topic.as_str().to_string(), latest);
        Ok(latest)
    }

    fn read_range_sync(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        let path = self.topic_path(topic);
        if !path.is_file() {
            return Ok(Vec::new());
        }
        let from = from.unwrap_or(0);
        let mut events = Vec::new();
        for record in read_file_records(&path)? {
            if record.id > from {
                events.push((record.id, record.event));
            }
            if events.len() >= limit {
                break;
            }
        }
        Ok(events)
    }

    fn topics(&self) -> Result<Vec<Topic>, LogError> {
        let topics_dir = self.root.join("topics");
        if !topics_dir.is_dir() {
            return Ok(Vec::new());
        }
        let mut topics = Vec::new();
        for entry in std::fs::read_dir(&topics_dir)
            .map_err(|error| LogError::Io(format!("event log topics read error: {error}")))?
        {
            let entry = entry
                .map_err(|error| LogError::Io(format!("event log topic entry error: {error}")))?;
            let path = entry.path();
            if path.extension().and_then(|ext| ext.to_str()) != Some("jsonl") {
                continue;
            }
            let Some(stem) = path.file_stem().and_then(|stem| stem.to_str()) else {
                continue;
            };
            topics.push(Topic::new(stem.to_string())?);
        }
        topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
        Ok(topics)
    }
}

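// Reads every complete JSONL record. A torn final line (one with no trailing
// newline, e.g. left by a crash mid-append) is skipped rather than reported as
// corruption, so restarts recover cleanly.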
fn read_file_records(path: &Path) -> Result<Vec<FileRecord>, LogError> {
    let file = std::fs::File::open(path)
        .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
    let mut reader = std::io::BufReader::new(file);
    let mut records = Vec::new();
    let mut line = Vec::new();
    loop {
        line.clear();
        let bytes_read = std::io::BufRead::read_until(&mut reader, b'\n', &mut line)
            .map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
        if bytes_read == 0 {
            break;
        }
        if line.iter().all(u8::is_ascii_whitespace) {
            continue;
        }
        let complete_line = line.ends_with(b"\n");
        match serde_json::from_slice::<FileRecord>(&line) {
            Ok(record) => records.push(record),
            Err(_) if !complete_line => break,
            Err(error) => {
                return Err(LogError::Serde(format!("event log parse error: {error}")));
            }
        }
    }
    Ok(records)
}

impl EventLog for FileEventLog {
    fn describe(&self) -> EventLogDescription {
        EventLogDescription {
            backend: EventLogBackendKind::File,
            location: Some(self.root.clone()),
            size_bytes: Some(dir_size_bytes(&self.root)),
            queue_depth: self.queue_depth,
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        let _guard = self
            .write_lock
            .lock()
            .expect("file event log write lock poisoned");
        let next_id = self.latest_id_for_topic(topic)? + 1;
        let previous_hash = self
            .read_range_sync(topic, None, usize::MAX)?
            .last()
            .map(|(previous_id, previous_event)| {
                crate::provenance::event_record_hash_from_headers(
                    topic.as_str(),
                    *previous_id,
                    previous_event,
                )
            })
            .transpose()?;
        let event = crate::provenance::prepare_event_for_append(
            topic.as_str(),
            next_id,
            previous_hash,
            event,
        )?;
        let record = FileRecord {
            id: next_id,
            event: event.clone(),
        };
        let path = self.topic_path(topic);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        }
        let line = serde_json::to_string(&record)
            .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
        use std::io::Write as _;
        let mut file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)
            .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
        writeln!(file, "{line}")
            .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
        self.latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .insert(topic.as_str().to_string(), next_id);
        self.broadcasts
            .publish(topic, self.queue_depth, (next_id, event));
        Ok(next_id)
    }

    async fn flush(&self) -> Result<(), LogError> {
        sync_tree(&self.root)
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        self.read_range_sync(topic, from, limit)
    }

    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        self.read_range_sync(topic, from, limit)?
            .into_iter()
            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
            .collect()
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
        let history = self.read_range_sync(topic, from, usize::MAX)?;
        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        let path = self.consumer_path(topic, consumer);
        let payload = serde_json::json!({
            "topic": topic.as_str(),
            "consumer_id": consumer.as_str(),
            "cursor": up_to,
            "updated_at_ms": now_ms(),
        });
        write_json_atomically(&path, &payload)
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        let path = self.consumer_path(topic, consumer);
        if !path.is_file() {
            return Ok(None);
        }
        let raw = std::fs::read_to_string(&path)
            .map_err(|error| LogError::Io(format!("event log consumer read error: {error}")))?;
        let payload: serde_json::Value = serde_json::from_str(&raw)
            .map_err(|error| LogError::Serde(format!("event log consumer parse error: {error}")))?;
        let cursor = payload
            .get("cursor")
            .and_then(serde_json::Value::as_u64)
            .ok_or_else(|| {
                LogError::Serde("event log consumer record missing numeric cursor".to_string())
            })?;
        Ok(Some(cursor))
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        let latest = self.latest_id_for_topic(topic)?;
        if latest == 0 {
            Ok(None)
        } else {
            Ok(Some(latest))
        }
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        let _guard = self
            .write_lock
            .lock()
            .expect("file event log write lock poisoned");
        let path = self.topic_path(topic);
        if !path.is_file() {
            return Ok(CompactReport::default());
        }
        let retained = self.read_range_sync(topic, Some(before), usize::MAX)?;
        let removed = self.read_range_sync(topic, None, usize::MAX)?.len() - retained.len();
        let tmp = path.with_extension("jsonl.tmp");
        if retained.is_empty() {
            let _ = std::fs::remove_file(&path);
        } else {
            let mut writer =
                std::io::BufWriter::new(std::fs::File::create(&tmp).map_err(|error| {
                    LogError::Io(format!("event log tmp create error: {error}"))
                })?);
            use std::io::Write as _;
            for (event_id, event) in &retained {
                let line = serde_json::to_string(&FileRecord {
                    id: *event_id,
                    event: event.clone(),
                })
                .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
                writeln!(writer, "{line}")
                    .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
            }
            writer
                .flush()
                .map_err(|error| LogError::Io(format!("event log flush error: {error}")))?;
            std::fs::rename(&tmp, &path).map_err(|error| {
                LogError::Io(format!("event log compact finalize error: {error}"))
            })?;
        }
        let latest = retained.last().map(|(event_id, _)| *event_id);
        self.latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .insert(topic.as_str().to_string(), latest.unwrap_or(0));
        Ok(CompactReport {
            removed,
            remaining: retained.len(),
            latest,
            checkpointed: false,
        })
    }
}

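/// SQLite-backed log: one shared connection behind a `Mutex`, opened in WAL
/// mode with `synchronous = NORMAL` and a five second busy timeout.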
pub struct SqliteEventLog {
    path: PathBuf,
    connection: Mutex<Connection>,
    broadcasts: BroadcastMap,
    queue_depth: usize,
}

impl SqliteEventLog {
    pub fn open(path: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        }
        let connection = Connection::open(&path)
            .map_err(|error| LogError::Sqlite(format!("event log open error: {error}")))?;
        connection
            .busy_timeout(std::time::Duration::from_secs(5))
            .map_err(|error| LogError::Sqlite(format!("event log busy-timeout error: {error}")))?;
        connection
            .pragma_update(None, "journal_mode", "WAL")
            .map_err(|error| LogError::Sqlite(format!("event log WAL pragma error: {error}")))?;
        connection
            .pragma_update(None, "synchronous", "NORMAL")
            .map_err(|error| LogError::Sqlite(format!("event log sync pragma error: {error}")))?;
        connection
            .execute_batch(
                "CREATE TABLE IF NOT EXISTS topic_heads (
                    topic TEXT PRIMARY KEY,
                    last_id INTEGER NOT NULL
                );
                CREATE TABLE IF NOT EXISTS events (
                    topic TEXT NOT NULL,
                    event_id INTEGER NOT NULL,
                    kind TEXT NOT NULL,
                    payload BLOB NOT NULL,
                    headers TEXT NOT NULL,
                    occurred_at_ms INTEGER NOT NULL,
                    PRIMARY KEY (topic, event_id)
                );
                CREATE TABLE IF NOT EXISTS consumers (
                    topic TEXT NOT NULL,
                    consumer_id TEXT NOT NULL,
                    cursor INTEGER NOT NULL,
                    updated_at_ms INTEGER NOT NULL,
                    PRIMARY KEY (topic, consumer_id)
                );",
            )
            .map_err(|error| LogError::Sqlite(format!("event log schema error: {error}")))?;
        Ok(Self {
            path,
            connection: Mutex::new(connection),
            broadcasts: BroadcastMap::default(),
            queue_depth: queue_depth.max(1),
        })
    }

    fn topics(&self) -> Result<Vec<Topic>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let mut statement = connection
            .prepare("SELECT DISTINCT topic FROM events ORDER BY topic ASC")
            .map_err(|error| {
                LogError::Sqlite(format!("event log topics prepare error: {error}"))
            })?;
        let rows = statement
            .query_map([], |row| row.get::<_, String>(0))
            .map_err(|error| LogError::Sqlite(format!("event log topics query error: {error}")))?;
        let mut topics = Vec::new();
        for row in rows {
            topics.push(Topic::new(row.map_err(|error| {
                LogError::Sqlite(format!("event log topic row error: {error}"))
            })?)?);
        }
        Ok(topics)
    }
}

impl EventLog for SqliteEventLog {
    fn describe(&self) -> EventLogDescription {
        EventLogDescription {
            backend: EventLogBackendKind::Sqlite,
            location: Some(self.path.clone()),
            size_bytes: Some(sqlite_size_bytes(&self.path)),
            queue_depth: self.queue_depth,
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        let mut connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        // An IMMEDIATE transaction takes the write lock up front, so the head
        // increment, previous-record read, and insert happen atomically.
        let tx = connection
            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
            .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
        tx.execute(
            "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
            params![topic.as_str()],
        )
        .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
        tx.execute(
            "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
            params![topic.as_str()],
        )
        .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
        let event_id = tx
            .query_row(
                "SELECT last_id FROM topic_heads WHERE topic = ?1",
                params![topic.as_str()],
                |row| row.get::<_, i64>(0),
            )
            .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))
            .and_then(sqlite_i64_to_event_id)?;
        let event_id_sql = event_id_to_sqlite_i64(event_id)?;
        let previous = tx
            .query_row(
                "SELECT event_id, kind, payload, headers, occurred_at_ms
                 FROM events
                 WHERE topic = ?1 AND event_id < ?2
                 ORDER BY event_id DESC
                 LIMIT 1",
                params![topic.as_str(), event_id_sql],
                |row| {
                    let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
                    let headers: String = row.get(3)?;
                    Ok((
                        sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?,
                        LogEvent {
                            kind: row.get(1)?,
                            payload: serde_json::from_slice(&payload).map_err(|error| {
                                rusqlite::Error::FromSqlConversionFailure(
                                    payload.len(),
                                    rusqlite::types::Type::Blob,
                                    Box::new(error),
                                )
                            })?,
                            headers: serde_json::from_str(&headers).map_err(|error| {
                                rusqlite::Error::FromSqlConversionFailure(
                                    headers.len(),
                                    rusqlite::types::Type::Text,
                                    Box::new(error),
                                )
                            })?,
                            occurred_at_ms: row.get(4)?,
                        },
                    ))
                },
            )
            .optional()
            .map_err(|error| LogError::Sqlite(format!("event log previous read error: {error}")))?;
        let previous_hash = previous
            .as_ref()
            .map(|(previous_id, previous_event)| {
                crate::provenance::event_record_hash_from_headers(
                    topic.as_str(),
                    *previous_id,
                    previous_event,
                )
            })
            .transpose()?;
        let event = crate::provenance::prepare_event_for_append(
            topic.as_str(),
            event_id,
            previous_hash,
            event,
        )?;
        tx.execute(
            "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
            params![
                topic.as_str(),
                event_id_sql,
                event.kind,
                serde_json::to_vec(&event.payload).map_err(|error| LogError::Serde(format!(
                    "event log payload encode error: {error}"
                )))?,
                serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
                    "event log headers encode error: {error}"
                )))?,
                event.occurred_at_ms
            ],
        )
        .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
        tx.commit()
            .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
        self.broadcasts
            .publish(topic, self.queue_depth, (event_id, event.clone()));
        Ok(event_id)
    }

    async fn flush(&self) -> Result<(), LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        connection
            .execute_batch("PRAGMA wal_checkpoint(FULL);")
            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
        Ok(())
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let mut statement = connection
            .prepare(
                "SELECT event_id, kind, payload, headers, occurred_at_ms
                 FROM events
                 WHERE topic = ?1 AND event_id > ?2
                 ORDER BY event_id ASC
                 LIMIT ?3",
            )
            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
        let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
        // Note: `usize::MAX as i64` wraps to -1, which SQLite treats as
        // "no limit".
        let rows = statement
            .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
                let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
                let headers: String = row.get(3)?;
                let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
                Ok((
                    event_id,
                    LogEvent {
                        kind: row.get(1)?,
                        payload: serde_json::from_slice(&payload).map_err(|error| {
                            rusqlite::Error::FromSqlConversionFailure(
                                payload.len(),
                                rusqlite::types::Type::Blob,
                                Box::new(error),
                            )
                        })?,
                        headers: serde_json::from_str(&headers).map_err(|error| {
                            rusqlite::Error::FromSqlConversionFailure(
                                headers.len(),
                                rusqlite::types::Type::Text,
                                Box::new(error),
                            )
                        })?,
                        occurred_at_ms: row.get(4)?,
                    },
                ))
            })
            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
        let mut events = Vec::new();
        for row in rows {
            events.push(
                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
            );
        }
        Ok(events)
    }

    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let mut statement = connection
            .prepare(
                "SELECT event_id, kind, payload, headers, occurred_at_ms
                 FROM events
                 WHERE topic = ?1 AND event_id > ?2
                 ORDER BY event_id ASC
                 LIMIT ?3",
            )
            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
        let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
        let rows = statement
            .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
                let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
                let headers: String = row.get(3)?;
                let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
                Ok((
                    event_id,
                    LogEventBytes {
                        kind: row.get(1)?,
                        payload: Bytes::from(payload),
                        headers: serde_json::from_str(&headers).map_err(|error| {
                            rusqlite::Error::FromSqlConversionFailure(
                                headers.len(),
                                rusqlite::types::Type::Text,
                                Box::new(error),
                            )
                        })?,
                        occurred_at_ms: row.get(4)?,
                    },
                ))
            })
            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
        let mut events = Vec::new();
        for row in rows {
            events.push(
                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
            );
        }
        Ok(events)
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let up_to_sql = event_id_to_sqlite_i64(up_to)?;
        connection
            .execute(
                "INSERT INTO consumers(topic, consumer_id, cursor, updated_at_ms)
                 VALUES (?1, ?2, ?3, ?4)
                 ON CONFLICT(topic, consumer_id)
                 DO UPDATE SET cursor = excluded.cursor, updated_at_ms = excluded.updated_at_ms",
                params![topic.as_str(), consumer.as_str(), up_to_sql, now_ms()],
            )
            .map_err(|error| LogError::Sqlite(format!("event log ack error: {error}")))?;
        Ok(())
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        connection
            .query_row(
                "SELECT cursor FROM consumers WHERE topic = ?1 AND consumer_id = ?2",
                params![topic.as_str(), consumer.as_str()],
                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
            )
            .optional()
            .map_err(|error| LogError::Sqlite(format!("event log consumer cursor error: {error}")))
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        connection
            .query_row(
                "SELECT last_id FROM topic_heads WHERE topic = ?1",
                params![topic.as_str()],
                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
            )
            .optional()
            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let before_sql = event_id_to_sqlite_i64(before)?;
        let removed = connection
            .execute(
                "DELETE FROM events WHERE topic = ?1 AND event_id <= ?2",
                params![topic.as_str(), before_sql],
            )
            .map_err(|error| {
                LogError::Sqlite(format!("event log compact delete error: {error}"))
            })?;
        let remaining = connection
            .query_row(
                "SELECT COUNT(*) FROM events WHERE topic = ?1",
                params![topic.as_str()],
                |row| row.get::<_, i64>(0),
            )
            .map_err(|error| LogError::Sqlite(format!("event log compact count error: {error}")))
            .and_then(sqlite_i64_to_usize)?;
        let latest = connection
            .query_row(
                "SELECT last_id FROM topic_heads WHERE topic = ?1",
                params![topic.as_str()],
                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
            )
            .optional()
            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))?;
        connection
            .execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
        Ok(CompactReport {
            removed,
            remaining,
            latest,
            checkpointed: true,
        })
    }
}

fn resolve_path(base_dir: &Path, value: &str) -> PathBuf {
    let candidate = PathBuf::from(value);
    if candidate.is_absolute() {
        candidate
    } else {
        base_dir.join(candidate)
    }
}

fn write_json_atomically(path: &Path, payload: &serde_json::Value) -> Result<(), LogError> {
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)
            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
    }
    let tmp = path.with_extension("tmp");
    let encoded = serde_json::to_vec_pretty(payload)
        .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
    std::fs::write(&tmp, encoded)
        .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
    std::fs::rename(&tmp, path)
        .map_err(|error| LogError::Io(format!("event log rename error: {error}")))?;
    Ok(())
}

fn sanitize_filename(value: &str) -> String {
    sanitize_topic_component(value)
}

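/// Maps every character outside `[A-Za-z0-9._-]` to `_`,
/// e.g. `"worker 1/a"` becomes `"worker_1_a"`.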
pub fn sanitize_topic_component(value: &str) -> String {
    value
        .chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-') {
                ch
            } else {
                '_'
            }
        })
        .collect()
}

fn dir_size_bytes(path: &Path) -> u64 {
    if !path.exists() {
        return 0;
    }
    let mut total = 0;
    if let Ok(entries) = std::fs::read_dir(path) {
        for entry in entries.flatten() {
            let path = entry.path();
            if path.is_dir() {
                total += dir_size_bytes(&path);
            } else if let Ok(metadata) = entry.metadata() {
                total += metadata.len();
            }
        }
    }
    total
}

fn sqlite_size_bytes(path: &Path) -> u64 {
    let mut total = file_size(path);
    total += file_size(&PathBuf::from(format!("{}-wal", path.display())));
    total += file_size(&PathBuf::from(format!("{}-shm", path.display())));
    total
}

fn file_size(path: &Path) -> u64 {
    std::fs::metadata(path)
        .map(|metadata| metadata.len())
        .unwrap_or(0)
}

fn sync_tree(root: &Path) -> Result<(), LogError> {
    if !root.exists() {
        return Ok(());
    }
    for entry in std::fs::read_dir(root)
        .map_err(|error| LogError::Io(format!("event log read_dir error: {error}")))?
    {
        let entry = entry.map_err(|error| LogError::Io(format!("event log dir error: {error}")))?;
        let path = entry.path();
        if path.is_dir() {
            sync_tree(&path)?;
            continue;
        }
        std::fs::File::open(&path)
            .and_then(|file| file.sync_all())
            .map_err(|error| LogError::Io(format!("event log sync error: {error}")))?;
    }
    Ok(())
}

fn now_ms() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|duration| duration.as_millis() as i64)
        .unwrap_or(0)
}

fn event_id_to_sqlite_i64(event_id: EventId) -> Result<i64, LogError> {
    i64::try_from(event_id)
        .map_err(|_| LogError::Sqlite(format!("event id {event_id} exceeds sqlite INTEGER range")))
}

fn sqlite_i64_to_event_id(value: i64) -> Result<EventId, LogError> {
    u64::try_from(value)
        .map_err(|_| LogError::Sqlite(format!("sqlite event id {value} is negative")))
}

fn sqlite_i64_to_event_id_for_row(value: i64) -> rusqlite::Result<EventId> {
    u64::try_from(value).map_err(|_| {
        rusqlite::Error::FromSqlConversionFailure(
            std::mem::size_of::<i64>(),
            rusqlite::types::Type::Integer,
            "sqlite event id is negative".into(),
        )
    })
}

fn sqlite_json_bytes_for_row(
    row: &rusqlite::Row<'_>,
    index: usize,
    name: &str,
) -> rusqlite::Result<Vec<u8>> {
    let value = row.get_ref(index)?;
    match value {
        rusqlite::types::ValueRef::Text(bytes) | rusqlite::types::ValueRef::Blob(bytes) => {
            Ok(bytes.to_vec())
        }
        other => Err(rusqlite::Error::InvalidColumnType(
            index,
            name.to_string(),
            other.data_type(),
        )),
    }
}

fn sqlite_i64_to_usize(value: i64) -> Result<usize, LogError> {
    usize::try_from(value)
        .map_err(|_| LogError::Sqlite(format!("sqlite count {value} is negative")))
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    // `rand` exposes `random_range` via the `Rng` trait (there is no `RngExt`).
    use rand::{rngs::StdRng, Rng, SeedableRng};

    async fn exercise_basic_backend(log: Arc<AnyEventLog>) {
        let topic = Topic::new("trigger.inbox").unwrap();
        for i in 0..10_000 {
            log.append(
                &topic,
                LogEvent::new("append", serde_json::json!({ "i": i })),
            )
            .await
            .unwrap();
        }
        let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 10_000);
        assert_eq!(events.first().unwrap().0, 1);
        assert_eq!(events.last().unwrap().0, 10_000);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn memory_backend_supports_append_read_subscribe_and_compact() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
        exercise_basic_backend(log.clone()).await;

        let topic = Topic::new("agent.transcript.demo").unwrap();
        let mut stream = log.clone().subscribe(&topic, None).await.unwrap();
        let first = log
            .append(
                &topic,
                LogEvent::new("message", serde_json::json!({"text":"one"})),
            )
            .await
            .unwrap();
        let second = log
            .append(
                &topic,
                LogEvent::new("message", serde_json::json!({"text":"two"})),
            )
            .await
            .unwrap();
        let seen: Vec<_> = stream.by_ref().take(2).collect().await;
        assert_eq!(seen[0].as_ref().unwrap().0, first);
        assert_eq!(seen[1].as_ref().unwrap().0, second);

        log.ack(&topic, &ConsumerId::new("worker").unwrap(), second)
            .await
            .unwrap();
        let compact = log.compact(&topic, first).await.unwrap();
        assert_eq!(compact.removed, 1);
        assert_eq!(compact.remaining, 1);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn file_backend_persists_across_reopen_and_compacts() {
        let dir = tempfile::tempdir().unwrap();
        let topic = Topic::new("trigger.outbox").unwrap();
        let first_log = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
        ));
        first_log
            .append(
                &topic,
                LogEvent::new("dispatch_pending", serde_json::json!({"n":1})),
            )
            .await
            .unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("dispatch_complete", serde_json::json!({"n":2})),
            )
            .await
            .unwrap();
        drop(first_log);

        let reopened = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
        ));
        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 2);
        let compact = reopened.compact(&topic, 1).await.unwrap();
        assert_eq!(compact.removed, 1);
        assert_eq!(
            reopened
                .read_range(&topic, None, usize::MAX)
                .await
                .unwrap()
                .len(),
            1
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn file_backend_skips_torn_tail_on_restart() {
        let dir = tempfile::tempdir().unwrap();
        let topic = Topic::new("trigger.inbox").unwrap();
        let first_log = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("accepted", serde_json::json!({"id": "ok"})),
            )
            .await
            .unwrap();
        drop(first_log);

        let topic_path = dir.path().join("topics").join("trigger.inbox.jsonl");
        use std::io::Write as _;
        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(&topic_path)
            .unwrap();
        write!(file, "{{\"id\":2,\"event\":{{\"kind\":\"partial\"").unwrap();
        drop(file);

        let reopened = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].0, 1);
        assert_eq!(reopened.latest(&topic).await.unwrap(), Some(1));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_backend_persists_and_checkpoints_after_compact() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("daemon.demo.state").unwrap();
        let first_log = Arc::new(AnyEventLog::Sqlite(
            SqliteEventLog::open(path.clone(), 8).unwrap(),
        ));
        first_log
            .append(
                &topic,
                LogEvent::new("state", serde_json::json!({"state":"idle"})),
            )
            .await
            .unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("state", serde_json::json!({"state":"active"})),
            )
            .await
            .unwrap();
        drop(first_log);

        let reopened = Arc::new(AnyEventLog::Sqlite(
            SqliteEventLog::open(path.clone(), 8).unwrap(),
        ));
        assert_eq!(
            reopened
                .read_range(&topic, None, usize::MAX)
                .await
                .unwrap()
                .len(),
            2
        );
        let compact = reopened.compact(&topic, 1).await.unwrap();
        assert!(compact.checkpointed);
        let wal = PathBuf::from(format!("{}-wal", path.display()));
        assert!(file_size(&wal) == 0 || !wal.exists());
    }

    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_bytes_read_preserves_payload_without_value_materialization() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("observability.action_graph").unwrap();
        let log = SqliteEventLog::open(path, 8).unwrap();
        let event_id = log
            .append(
                &topic,
                LogEvent::new(
                    "snapshot",
                    serde_json::json!({"nodes":[{"id":"a"}],"edges":[]}),
                ),
            )
            .await
            .unwrap();

        let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].0, event_id);
        assert_eq!(
            events[0].1.payload_json().unwrap(),
            serde_json::json!({"nodes":[{"id":"a"}],"edges":[]})
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_bytes_read_accepts_legacy_text_payload_rows() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("agent.transcript.legacy").unwrap();
        let log = SqliteEventLog::open(path, 8).unwrap();
        {
            let connection = log.connection.lock().unwrap();
            connection
                .execute(
                    "INSERT INTO topic_heads(topic, last_id) VALUES (?1, 1)",
                    params![topic.as_str()],
                )
                .unwrap();
            connection
                .execute(
                    "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
                     VALUES (?1, 1, 'legacy', ?2, '{}', 1)",
                    params![topic.as_str(), "{\"text\":\"old\"}"],
                )
                .unwrap();
        }

        let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
        assert_eq!(
            events[0].1.payload_json().unwrap(),
            serde_json::json!({"text": "old"})
        );
        assert_eq!(
            log.read_range(&topic, None, 1).await.unwrap()[0].1.kind,
            "legacy"
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn broadcast_forwarder_reports_lag_when_receiver_overflows() {
        let (sender, rx) = broadcast::channel(2);
        for i in 0..10 {
            sender
                .send((i + 1, LogEvent::new("tick", serde_json::json!({"i": i}))))
                .unwrap();
        }
        let mut stream = stream_from_broadcast(Vec::new(), None, rx, 2);

        match stream.next().await {
            Some(Err(LogError::ConsumerLagged(last_seen))) => assert_eq!(last_seen, 0),
            other => panic!("subscriber should surface lag, got {other:?}"),
        }
    }

    #[tokio::test(flavor = "current_thread")]
    async fn broadcast_forwarder_stops_when_consumer_drops_stream() {
        let (sender, rx) = broadcast::channel(2);
        let stream = stream_from_broadcast(Vec::new(), None, rx, 2);
        assert_eq!(sender.receiver_count(), 1);
        drop(stream);

        tokio::time::timeout(std::time::Duration::from_millis(100), async {
            while sender.receiver_count() != 0 {
                tokio::task::yield_now().await;
            }
        })
        .await
        .expect("subscription receiver should close after consumer drop");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn randomized_reader_sequences_stay_monotonic() {
        let log = Arc::new(MemoryEventLog::new(32));
        let topic = Topic::new("fuzz.demo").unwrap();
        let mut readers = vec![
            log.clone().subscribe(&topic, None).await.unwrap(),
            log.clone().subscribe(&topic, Some(5)).await.unwrap(),
            log.clone().subscribe(&topic, Some(10)).await.unwrap(),
        ];
        let mut rng = StdRng::seed_from_u64(7);
        for _ in 0..64 {
            let value = rng.random_range(0..1000);
            log.append(
                &topic,
                LogEvent::new("rand", serde_json::json!({"value": value})),
            )
            .await
            .unwrap();
        }

        let mut sequences = Vec::new();
        for reader in &mut readers {
            let mut ids = Vec::new();
            while let Some(item) = reader.next().await {
                match item {
                    Ok((event_id, _)) => {
                        ids.push(event_id);
                        if ids.len() >= 16 {
                            break;
                        }
                    }
                    Err(LogError::ConsumerLagged(_)) => break,
                    Err(error) => panic!("unexpected subscription error: {error}"),
                }
            }
            sequences.push(ids);
        }

        for ids in sequences {
            assert!(ids.windows(2).all(|pair| pair[0] < pair[1]));
        }
    }
}