use std::cell::RefCell;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::fmt;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::{Arc, Mutex};

use bytes::Bytes;
use futures::stream::BoxStream;
use rusqlite::{params, Connection, OptionalExtension};
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, mpsc};
use tokio_stream::wrappers::ReceiverStream;

pub type EventId = u64;

pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";

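/// Validated topic name, restricted to ASCII alphanumerics plus `.`, `_`,
/// and `-` so a topic can double as a file name in the file backend
/// (for example `trigger.inbox` or `agent.transcript.demo`).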
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Topic(String);

impl Topic {
    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
        let value = value.into();
        if value.is_empty() {
            return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
        }
        if !value
            .chars()
            .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
        {
            return Err(LogError::InvalidTopic(format!(
                "topic '{value}' contains unsupported characters"
            )));
        }
        Ok(Self(value))
    }

    pub fn as_str(&self) -> &str {
        &self.0
    }
}

impl fmt::Display for Topic {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt(f)
    }
}

impl FromStr for Topic {
    type Err = LogError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Self::new(s)
    }
}

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct ConsumerId(String);

impl ConsumerId {
    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
        let value = value.into();
        if value.trim().is_empty() {
            return Err(LogError::InvalidConsumer(
                "consumer id cannot be empty".to_string(),
            ));
        }
        Ok(Self(value))
    }

    pub fn as_str(&self) -> &str {
        &self.0
    }
}

impl fmt::Display for ConsumerId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt(f)
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventLogBackendKind {
    Memory,
    File,
    Sqlite,
}

impl fmt::Display for EventLogBackendKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Memory => write!(f, "memory"),
            Self::File => write!(f, "file"),
            Self::Sqlite => write!(f, "sqlite"),
        }
    }
}

impl FromStr for EventLogBackendKind {
    type Err = LogError;

    fn from_str(value: &str) -> Result<Self, Self::Err> {
        match value.trim().to_ascii_lowercase().as_str() {
            "memory" => Ok(Self::Memory),
            "file" => Ok(Self::File),
            "sqlite" => Ok(Self::Sqlite),
            other => Err(LogError::Config(format!(
                "unsupported event log backend '{other}'"
            ))),
        }
    }
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LogEvent {
    pub kind: String,
    pub payload: serde_json::Value,
    #[serde(default)]
    pub headers: BTreeMap<String, String>,
    pub occurred_at_ms: i64,
}

impl LogEvent {
    pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
        Self {
            kind: kind.into(),
            payload,
            headers: BTreeMap::new(),
            occurred_at_ms: now_ms(),
        }
    }

    pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
        self.headers = headers;
        self
    }
}

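/// A `LogEvent` whose payload is kept as raw JSON bytes rather than a parsed
/// `serde_json::Value`, letting callers that only forward or persist events
/// skip value materialization; use `payload_json` or `into_log_event` when
/// the parsed form is needed.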
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LogEventBytes {
    pub kind: String,
    pub payload: Bytes,
    pub headers: BTreeMap<String, String>,
    pub occurred_at_ms: i64,
}

impl LogEventBytes {
    pub fn payload_json(&self) -> Result<serde_json::Value, LogError> {
        serde_json::from_slice(&self.payload)
            .map_err(|error| LogError::Serde(format!("event log payload parse error: {error}")))
    }

    pub fn into_log_event(self) -> Result<LogEvent, LogError> {
        Ok(LogEvent {
            kind: self.kind,
            payload: serde_json::from_slice(&self.payload).map_err(|error| {
                LogError::Serde(format!("event log payload parse error: {error}"))
            })?,
            headers: self.headers,
            occurred_at_ms: self.occurred_at_ms,
        })
    }
}

impl TryFrom<LogEvent> for LogEventBytes {
    type Error = LogError;

    fn try_from(event: LogEvent) -> Result<Self, Self::Error> {
        let payload = serde_json::to_vec(&event.payload)
            .map_err(|error| LogError::Serde(format!("event log payload encode error: {error}")))?;
        Ok(Self {
            kind: event.kind,
            payload: Bytes::from(payload),
            headers: event.headers,
            occurred_at_ms: event.occurred_at_ms,
        })
    }
}

#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactReport {
    pub removed: usize,
    pub remaining: usize,
    pub latest: Option<EventId>,
    pub checkpointed: bool,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventLogDescription {
    pub backend: EventLogBackendKind,
    pub location: Option<PathBuf>,
    pub size_bytes: Option<u64>,
    pub queue_depth: usize,
}

#[derive(Debug)]
pub enum LogError {
    Config(String),
    InvalidTopic(String),
    InvalidConsumer(String),
    Io(String),
    Serde(String),
    Sqlite(String),
    ConsumerLagged(EventId),
}

impl fmt::Display for LogError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Config(message)
            | Self::InvalidTopic(message)
            | Self::InvalidConsumer(message)
            | Self::Io(message)
            | Self::Serde(message)
            | Self::Sqlite(message) => message.fmt(f),
            Self::ConsumerLagged(last_id) => {
                write!(f, "subscriber lagged behind after event {last_id}")
            }
        }
    }
}

impl std::error::Error for LogError {}

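/// Append-only, per-topic event log with replayable subscriptions and
/// per-consumer ack cursors. Within a topic, ids start at 1 and increase by
/// one per append; `read_range` and `subscribe` treat `from` as an exclusive
/// lower bound, so passing the last acked id resumes exactly after it.
///
/// A minimal usage sketch (illustrative, not a compiled doctest: it assumes
/// an async runtime and the enclosing crate's paths):
///
/// ```ignore
/// use futures::StreamExt;
///
/// let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
/// let topic = Topic::new("trigger.inbox")?;
/// let consumer = ConsumerId::new("worker")?;
///
/// log.append(&topic, LogEvent::new("accepted", serde_json::json!({"n": 1})))
///     .await?;
/// // None means "from the beginning"; a stored cursor resumes after it.
/// let cursor = log.consumer_cursor(&topic, &consumer).await?;
/// let mut stream = log.clone().subscribe(&topic, cursor).await?;
/// if let Some(Ok((event_id, event))) = stream.next().await {
///     // ... handle `event`, then persist progress:
///     log.ack(&topic, &consumer, event_id).await?;
/// }
/// ```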
#[allow(async_fn_in_trait)]
pub trait EventLog: Send + Sync {
    fn describe(&self) -> EventLogDescription;

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;

    async fn flush(&self) -> Result<(), LogError>;

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError>;

    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        let events = self.read_range(topic, from, limit).await?;
        events
            .into_iter()
            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
            .collect()
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError>;

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError>;

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
}

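/// Backend selection and tuning resolved by `for_base_dir` from the
/// `HARN_EVENT_LOG_*` environment variables: the backend defaults to sqlite
/// and the queue depth to 128 (clamped to at least 1), while relative values
/// of `HARN_EVENT_LOG_DIR` and `HARN_EVENT_LOG_SQLITE_PATH` are resolved
/// against `base_dir`. For example, `HARN_EVENT_LOG_BACKEND=memory` selects
/// the in-memory backend.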
#[derive(Clone, Debug)]
pub struct EventLogConfig {
    pub backend: EventLogBackendKind,
    pub file_dir: PathBuf,
    pub sqlite_path: PathBuf,
    pub queue_depth: usize,
}

impl EventLogConfig {
    pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
        let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
            .ok()
            .map(|value| value.parse())
            .transpose()?
            .unwrap_or(EventLogBackendKind::Sqlite);
        let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
            .ok()
            .and_then(|value| value.parse::<usize>().ok())
            .unwrap_or(128)
            .max(1);

        let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
            _ => crate::runtime_paths::event_log_dir(base_dir),
        };
        let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
            _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
        };

        Ok(Self {
            backend,
            file_dir,
            sqlite_path,
            queue_depth,
        })
    }

    pub fn location(&self) -> Option<PathBuf> {
        match self.backend {
            EventLogBackendKind::Memory => None,
            EventLogBackendKind::File => Some(self.file_dir.clone()),
            EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
        }
    }
}

thread_local! {
    static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
}

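/// Opens the backend configured for `base_dir` and installs it as this
/// thread's active event log. The slot is thread-local, so installs on one
/// thread (for example, in a test) are invisible to others.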
pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
    let config = EventLogConfig::for_base_dir(base_dir)?;
    let log = open_event_log(&config)?;
    ACTIVE_EVENT_LOG.with(|slot| {
        *slot.borrow_mut() = Some(log.clone());
    });
    Ok(log)
}

pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
    let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
    ACTIVE_EVENT_LOG.with(|slot| {
        *slot.borrow_mut() = Some(log.clone());
    });
    log
}

pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
    ACTIVE_EVENT_LOG.with(|slot| {
        *slot.borrow_mut() = Some(log.clone());
    });
    log
}

pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
    ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone())
}

pub fn reset_active_event_log() {
    ACTIVE_EVENT_LOG.with(|slot| {
        *slot.borrow_mut() = None;
    });
}

pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
    let config = EventLogConfig::for_base_dir(base_dir)?;
    let description = match config.backend {
        EventLogBackendKind::Memory => EventLogDescription {
            backend: EventLogBackendKind::Memory,
            location: None,
            size_bytes: None,
            queue_depth: config.queue_depth,
        },
        EventLogBackendKind::File => EventLogDescription {
            backend: EventLogBackendKind::File,
            size_bytes: Some(dir_size_bytes(&config.file_dir)),
            location: Some(config.file_dir),
            queue_depth: config.queue_depth,
        },
        EventLogBackendKind::Sqlite => EventLogDescription {
            backend: EventLogBackendKind::Sqlite,
            size_bytes: Some(sqlite_size_bytes(&config.sqlite_path)),
            location: Some(config.sqlite_path),
            queue_depth: config.queue_depth,
        },
    };
    Ok(description)
}

pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
    match config.backend {
        EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
            config.queue_depth,
        )))),
        EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
            config.file_dir.clone(),
            config.queue_depth,
        )?))),
        EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
            config.sqlite_path.clone(),
            config.queue_depth,
        )?))),
    }
}

pub enum AnyEventLog {
    Memory(MemoryEventLog),
    File(FileEventLog),
    Sqlite(SqliteEventLog),
}

impl EventLog for AnyEventLog {
    fn describe(&self) -> EventLogDescription {
        match self {
            Self::Memory(log) => log.describe(),
            Self::File(log) => log.describe(),
            Self::Sqlite(log) => log.describe(),
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        match self {
            Self::Memory(log) => log.append(topic, event).await,
            Self::File(log) => log.append(topic, event).await,
            Self::Sqlite(log) => log.append(topic, event).await,
        }
    }

    async fn flush(&self) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.flush().await,
            Self::File(log) => log.flush().await,
            Self::Sqlite(log) => log.flush().await,
        }
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        match self {
            Self::Memory(log) => log.read_range(topic, from, limit).await,
            Self::File(log) => log.read_range(topic, from, limit).await,
            Self::Sqlite(log) => log.read_range(topic, from, limit).await,
        }
    }

    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        match self {
            Self::Memory(log) => log.read_range_bytes(topic, from, limit).await,
            Self::File(log) => log.read_range_bytes(topic, from, limit).await,
            Self::Sqlite(log) => log.read_range_bytes(topic, from, limit).await,
        }
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        let (rx, queue_depth) = match self.as_ref() {
            Self::Memory(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::File(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::Sqlite(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
        };
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.ack(topic, consumer, up_to).await,
            Self::File(log) => log.ack(topic, consumer, up_to).await,
            Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
        }
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
            Self::File(log) => log.consumer_cursor(topic, consumer).await,
            Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
        }
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.latest(topic).await,
            Self::File(log) => log.latest(topic).await,
            Self::Sqlite(log) => log.latest(topic).await,
        }
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        match self {
            Self::Memory(log) => log.compact(topic, before).await,
            Self::File(log) => log.compact(topic, before).await,
            Self::Sqlite(log) => log.compact(topic, before).await,
        }
    }
}

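/// Lazily created, per-topic `tokio::sync::broadcast` channels that fan
/// appended events out to live subscribers. Senders are retained in the map,
/// so a topic's channel survives periods with no receivers.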
#[derive(Default)]
struct BroadcastMap(Mutex<HashMap<String, broadcast::Sender<(EventId, LogEvent)>>>);

impl BroadcastMap {
    fn subscribe(
        &self,
        topic: &Topic,
        capacity: usize,
    ) -> broadcast::Receiver<(EventId, LogEvent)> {
        self.sender(topic, capacity).subscribe()
    }

    fn publish(&self, topic: &Topic, capacity: usize, record: (EventId, LogEvent)) {
        let _ = self.sender(topic, capacity).send(record);
    }

    fn sender(&self, topic: &Topic, capacity: usize) -> broadcast::Sender<(EventId, LogEvent)> {
        let mut map = self.0.lock().expect("event log broadcast map poisoned");
        map.entry(topic.as_str().to_string())
            .or_insert_with(|| broadcast::channel(capacity.max(1)).0)
            .clone()
    }
}

fn stream_from_broadcast(
    history: Vec<(EventId, LogEvent)>,
    from: Option<EventId>,
    mut live_rx: broadcast::Receiver<(EventId, LogEvent)>,
    queue_depth: usize,
) -> BoxStream<'static, Result<(EventId, LogEvent), LogError>> {
    let (tx, rx) = mpsc::channel(queue_depth.max(1));
    tokio::spawn(async move {
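        // Replay the history snapshot first, remembering the highest id we
        // delivered; the live loop below drops anything at or below it so
        // events appended during the replay are not emitted twice.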
        let mut last_seen = from.unwrap_or(0);
        for (event_id, event) in history {
            last_seen = event_id;
            if tx.send(Ok((event_id, event))).await.is_err() {
                return;
            }
        }

        loop {
            match live_rx.recv().await {
                Ok((event_id, event)) if event_id > last_seen => {
                    last_seen = event_id;
                    if tx.send(Ok((event_id, event))).await.is_err() {
                        return;
                    }
                }
                Ok(_) => {}
                Err(broadcast::error::RecvError::Closed) => return,
                Err(broadcast::error::RecvError::Lagged(_)) => {
                    let _ = tx.try_send(Err(LogError::ConsumerLagged(last_seen)));
                    return;
                }
            }
        }
    });
    Box::pin(ReceiverStream::new(rx))
}

#[derive(Default)]
struct MemoryState {
    topics: HashMap<String, VecDeque<(EventId, LogEvent)>>,
    latest: HashMap<String, EventId>,
    consumers: HashMap<(String, String), EventId>,
}

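/// In-memory backend: events live in per-topic `VecDeque`s guarded by an
/// async mutex and disappear when the log is dropped.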
pub struct MemoryEventLog {
    state: tokio::sync::Mutex<MemoryState>,
    broadcasts: BroadcastMap,
    queue_depth: usize,
}

impl MemoryEventLog {
    pub fn new(queue_depth: usize) -> Self {
        Self {
            state: tokio::sync::Mutex::new(MemoryState::default()),
            broadcasts: BroadcastMap::default(),
            queue_depth: queue_depth.max(1),
        }
    }
}

impl EventLog for MemoryEventLog {
    fn describe(&self) -> EventLogDescription {
        EventLogDescription {
            backend: EventLogBackendKind::Memory,
            location: None,
            size_bytes: None,
            queue_depth: self.queue_depth,
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        let mut state = self.state.lock().await;
        let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
        state.latest.insert(topic.as_str().to_string(), event_id);
        state
            .topics
            .entry(topic.as_str().to_string())
            .or_default()
            .push_back((event_id, event.clone()));
        drop(state);
        self.broadcasts
            .publish(topic, self.queue_depth, (event_id, event));
        Ok(event_id)
    }

    async fn flush(&self) -> Result<(), LogError> {
        Ok(())
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        let from = from.unwrap_or(0);
        let state = self.state.lock().await;
        let events = state
            .topics
            .get(topic.as_str())
            .into_iter()
            .flat_map(|events| events.iter())
            .filter(|(event_id, _)| *event_id > from)
            .take(limit)
            .map(|(event_id, event)| (*event_id, event.clone()))
            .collect();
        Ok(events)
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        let mut state = self.state.lock().await;
        state.consumers.insert(
            (topic.as_str().to_string(), consumer.as_str().to_string()),
            up_to,
        );
        Ok(())
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        let state = self.state.lock().await;
        Ok(state
            .consumers
            .get(&(topic.as_str().to_string(), consumer.as_str().to_string()))
            .copied())
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        let state = self.state.lock().await;
        Ok(state.latest.get(topic.as_str()).copied())
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        let mut state = self.state.lock().await;
        let Some(events) = state.topics.get_mut(topic.as_str()) else {
            return Ok(CompactReport::default());
        };
        let removed = events
            .iter()
            .take_while(|(event_id, _)| *event_id <= before)
            .count();
        for _ in 0..removed {
            events.pop_front();
        }
        Ok(CompactReport {
            removed,
            remaining: events.len(),
            latest: state.latest.get(topic.as_str()).copied(),
            checkpointed: false,
        })
    }
}

#[derive(Serialize, Deserialize)]
struct FileRecord {
    id: EventId,
    event: LogEvent,
}

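/// Append-only JSONL backend. Under `root`, the layout is:
///
/// ```text
/// topics/<topic>.jsonl           one FileRecord per line
/// consumers/<topic>__<id>.json   ack cursor per (topic, consumer)
/// ```
///
/// Appends are serialized through `write_lock`, and `latest_ids` caches each
/// topic's head id so appends do not rescan the whole file.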
pub struct FileEventLog {
    root: PathBuf,
    latest_ids: Mutex<HashMap<String, EventId>>,
    write_lock: Mutex<()>,
    broadcasts: BroadcastMap,
    queue_depth: usize,
}

impl FileEventLog {
    pub fn open(root: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
        std::fs::create_dir_all(root.join("topics"))
            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        std::fs::create_dir_all(root.join("consumers"))
            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        Ok(Self {
            root,
            latest_ids: Mutex::new(HashMap::new()),
            write_lock: Mutex::new(()),
            broadcasts: BroadcastMap::default(),
            queue_depth: queue_depth.max(1),
        })
    }

    fn topic_path(&self, topic: &Topic) -> PathBuf {
        self.root
            .join("topics")
            .join(format!("{}.jsonl", topic.as_str()))
    }

    fn consumer_path(&self, topic: &Topic, consumer: &ConsumerId) -> PathBuf {
        self.root.join("consumers").join(format!(
            "{}__{}.json",
            topic.as_str(),
            sanitize_filename(consumer.as_str())
        ))
    }

    fn latest_id_for_topic(&self, topic: &Topic) -> Result<EventId, LogError> {
        if let Some(event_id) = self
            .latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .get(topic.as_str())
            .copied()
        {
            return Ok(event_id);
        }

        let mut latest = 0;
        let path = self.topic_path(topic);
        if path.is_file() {
            for record in read_file_records(&path)? {
                latest = record.id;
            }
        }
        self.latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .insert(topic.as_str().to_string(), latest);
        Ok(latest)
    }

    fn read_range_sync(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        let path = self.topic_path(topic);
        if !path.is_file() {
            return Ok(Vec::new());
        }
        let from = from.unwrap_or(0);
        let mut events = Vec::new();
        for record in read_file_records(&path)? {
            if record.id > from {
                events.push((record.id, record.event));
            }
            if events.len() >= limit {
                break;
            }
        }
        Ok(events)
    }
}

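/// Reads all records from a topic file, skipping blank lines. A final line
/// that fails to parse *and* has no trailing newline is treated as a torn
/// write from an interrupted append and ignored; any other malformed line is
/// a hard error.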
fn read_file_records(path: &Path) -> Result<Vec<FileRecord>, LogError> {
    let file = std::fs::File::open(path)
        .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
    let mut reader = std::io::BufReader::new(file);
    let mut records = Vec::new();
    let mut line = Vec::new();
    loop {
        line.clear();
        let bytes_read = std::io::BufRead::read_until(&mut reader, b'\n', &mut line)
            .map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
        if bytes_read == 0 {
            break;
        }
        if line.iter().all(u8::is_ascii_whitespace) {
            continue;
        }
        let complete_line = line.ends_with(b"\n");
        match serde_json::from_slice::<FileRecord>(&line) {
            Ok(record) => records.push(record),
            Err(_) if !complete_line => break,
            Err(error) => {
                return Err(LogError::Serde(format!("event log parse error: {error}")));
            }
        }
    }
    Ok(records)
}

impl EventLog for FileEventLog {
    fn describe(&self) -> EventLogDescription {
        EventLogDescription {
            backend: EventLogBackendKind::File,
            location: Some(self.root.clone()),
            size_bytes: Some(dir_size_bytes(&self.root)),
            queue_depth: self.queue_depth,
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        let _guard = self
            .write_lock
            .lock()
            .expect("file event log write lock poisoned");
        let next_id = self.latest_id_for_topic(topic)? + 1;
        let record = FileRecord {
            id: next_id,
            event: event.clone(),
        };
        let path = self.topic_path(topic);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        }
        let line = serde_json::to_string(&record)
            .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
        use std::io::Write as _;
        let mut file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)
            .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
        writeln!(file, "{line}")
            .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
        self.latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .insert(topic.as_str().to_string(), next_id);
        self.broadcasts
            .publish(topic, self.queue_depth, (next_id, event));
        Ok(next_id)
    }

    async fn flush(&self) -> Result<(), LogError> {
        sync_tree(&self.root)
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        self.read_range_sync(topic, from, limit)
    }

    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        self.read_range_sync(topic, from, limit)?
            .into_iter()
            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
            .collect()
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
        let history = self.read_range_sync(topic, from, usize::MAX)?;
        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        let path = self.consumer_path(topic, consumer);
        let payload = serde_json::json!({
            "topic": topic.as_str(),
            "consumer_id": consumer.as_str(),
            "cursor": up_to,
            "updated_at_ms": now_ms(),
        });
        write_json_atomically(&path, &payload)
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        let path = self.consumer_path(topic, consumer);
        if !path.is_file() {
            return Ok(None);
        }
        let raw = std::fs::read_to_string(&path)
            .map_err(|error| LogError::Io(format!("event log consumer read error: {error}")))?;
        let payload: serde_json::Value = serde_json::from_str(&raw)
            .map_err(|error| LogError::Serde(format!("event log consumer parse error: {error}")))?;
        let cursor = payload
            .get("cursor")
            .and_then(serde_json::Value::as_u64)
            .ok_or_else(|| {
                LogError::Serde("event log consumer record missing numeric cursor".to_string())
            })?;
        Ok(Some(cursor))
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        let latest = self.latest_id_for_topic(topic)?;
        if latest == 0 {
            Ok(None)
        } else {
            Ok(Some(latest))
        }
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        let _guard = self
            .write_lock
            .lock()
            .expect("file event log write lock poisoned");
        let path = self.topic_path(topic);
        if !path.is_file() {
            return Ok(CompactReport::default());
        }
        let retained = self.read_range_sync(topic, Some(before), usize::MAX)?;
        let removed = self.read_range_sync(topic, None, usize::MAX)?.len() - retained.len();
        let tmp = path.with_extension("jsonl.tmp");
        if retained.is_empty() {
            let _ = std::fs::remove_file(&path);
        } else {
            let mut writer =
                std::io::BufWriter::new(std::fs::File::create(&tmp).map_err(|error| {
                    LogError::Io(format!("event log tmp create error: {error}"))
                })?);
            use std::io::Write as _;
            for (event_id, event) in &retained {
                let line = serde_json::to_string(&FileRecord {
                    id: *event_id,
                    event: event.clone(),
                })
                .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
                writeln!(writer, "{line}")
                    .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
            }
            writer
                .flush()
                .map_err(|error| LogError::Io(format!("event log flush error: {error}")))?;
            std::fs::rename(&tmp, &path).map_err(|error| {
                LogError::Io(format!("event log compact finalize error: {error}"))
            })?;
        }
        let latest = retained.last().map(|(event_id, _)| *event_id);
        self.latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .insert(topic.as_str().to_string(), latest.unwrap_or(0));
        Ok(CompactReport {
            removed,
            remaining: retained.len(),
            latest,
            checkpointed: false,
        })
    }
}

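/// SQLite backend using one connection behind a mutex. Ids are allocated
/// from the `topic_heads` table inside an IMMEDIATE transaction, so they
/// remain monotonic per topic even after `compact` deletes rows from
/// `events`.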
pub struct SqliteEventLog {
    path: PathBuf,
    connection: Mutex<Connection>,
    broadcasts: BroadcastMap,
    queue_depth: usize,
}

impl SqliteEventLog {
    pub fn open(path: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        }
        let connection = Connection::open(&path)
            .map_err(|error| LogError::Sqlite(format!("event log open error: {error}")))?;
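        // Wait up to five seconds on a locked database instead of failing
        // immediately, and use WAL so readers do not block the single writer.
        // synchronous=NORMAL is the usual WAL pairing: durability is deferred
        // to checkpoints rather than fsyncing every commit.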
        connection
            .busy_timeout(std::time::Duration::from_secs(5))
            .map_err(|error| LogError::Sqlite(format!("event log busy-timeout error: {error}")))?;
        connection
            .pragma_update(None, "journal_mode", "WAL")
            .map_err(|error| LogError::Sqlite(format!("event log WAL pragma error: {error}")))?;
        connection
            .pragma_update(None, "synchronous", "NORMAL")
            .map_err(|error| LogError::Sqlite(format!("event log sync pragma error: {error}")))?;
        connection
            .execute_batch(
                "CREATE TABLE IF NOT EXISTS topic_heads (
                    topic TEXT PRIMARY KEY,
                    last_id INTEGER NOT NULL
                );
                CREATE TABLE IF NOT EXISTS events (
                    topic TEXT NOT NULL,
                    event_id INTEGER NOT NULL,
                    kind TEXT NOT NULL,
                    payload BLOB NOT NULL,
                    headers TEXT NOT NULL,
                    occurred_at_ms INTEGER NOT NULL,
                    PRIMARY KEY (topic, event_id)
                );
                CREATE TABLE IF NOT EXISTS consumers (
                    topic TEXT NOT NULL,
                    consumer_id TEXT NOT NULL,
                    cursor INTEGER NOT NULL,
                    updated_at_ms INTEGER NOT NULL,
                    PRIMARY KEY (topic, consumer_id)
                );",
            )
            .map_err(|error| LogError::Sqlite(format!("event log schema error: {error}")))?;
        Ok(Self {
            path,
            connection: Mutex::new(connection),
            broadcasts: BroadcastMap::default(),
            queue_depth: queue_depth.max(1),
        })
    }
}

impl EventLog for SqliteEventLog {
    fn describe(&self) -> EventLogDescription {
        EventLogDescription {
            backend: EventLogBackendKind::Sqlite,
            location: Some(self.path.clone()),
            size_bytes: Some(sqlite_size_bytes(&self.path)),
            queue_depth: self.queue_depth,
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        let mut connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let tx = connection
            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
            .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
        tx.execute(
            "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
            params![topic.as_str()],
        )
        .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
        tx.execute(
            "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
            params![topic.as_str()],
        )
        .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
        let event_id = tx
            .query_row(
                "SELECT last_id FROM topic_heads WHERE topic = ?1",
                params![topic.as_str()],
                |row| row.get::<_, i64>(0),
            )
            .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))
            .and_then(sqlite_i64_to_event_id)?;
        let event_id_sql = event_id_to_sqlite_i64(event_id)?;
        tx.execute(
            "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
            params![
                topic.as_str(),
                event_id_sql,
                event.kind,
                serde_json::to_vec(&event.payload).map_err(|error| LogError::Serde(format!(
                    "event log payload encode error: {error}"
                )))?,
                serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
                    "event log headers encode error: {error}"
                )))?,
                event.occurred_at_ms
            ],
        )
        .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
        tx.commit()
            .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
        self.broadcasts
            .publish(topic, self.queue_depth, (event_id, event.clone()));
        Ok(event_id)
    }

    async fn flush(&self) -> Result<(), LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        connection
            .execute_batch("PRAGMA wal_checkpoint(FULL);")
            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
        Ok(())
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let mut statement = connection
            .prepare(
                "SELECT event_id, kind, payload, headers, occurred_at_ms
                 FROM events
                 WHERE topic = ?1 AND event_id > ?2
                 ORDER BY event_id ASC
                 LIMIT ?3",
            )
            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
        let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
        let rows = statement
            .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
                let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
                let headers: String = row.get(3)?;
                let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
                Ok((
                    event_id,
                    LogEvent {
                        kind: row.get(1)?,
                        payload: serde_json::from_slice(&payload).map_err(|error| {
                            rusqlite::Error::FromSqlConversionFailure(
                                2, // column index of `payload`
                                rusqlite::types::Type::Blob,
                                Box::new(error),
                            )
                        })?,
                        headers: serde_json::from_str(&headers).map_err(|error| {
                            rusqlite::Error::FromSqlConversionFailure(
                                3, // column index of `headers`
                                rusqlite::types::Type::Text,
                                Box::new(error),
                            )
                        })?,
                        occurred_at_ms: row.get(4)?,
                    },
                ))
            })
            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
        let mut events = Vec::new();
        for row in rows {
            events.push(
                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
            );
        }
        Ok(events)
    }

    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let mut statement = connection
            .prepare(
                "SELECT event_id, kind, payload, headers, occurred_at_ms
                 FROM events
                 WHERE topic = ?1 AND event_id > ?2
                 ORDER BY event_id ASC
                 LIMIT ?3",
            )
            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
        let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
        let rows = statement
            .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
                let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
                let headers: String = row.get(3)?;
                let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
                Ok((
                    event_id,
                    LogEventBytes {
                        kind: row.get(1)?,
                        payload: Bytes::from(payload),
                        headers: serde_json::from_str(&headers).map_err(|error| {
                            rusqlite::Error::FromSqlConversionFailure(
                                3, // column index of `headers`
                                rusqlite::types::Type::Text,
                                Box::new(error),
                            )
                        })?,
                        occurred_at_ms: row.get(4)?,
                    },
                ))
            })
            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
        let mut events = Vec::new();
        for row in rows {
            events.push(
                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
            );
        }
        Ok(events)
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let up_to_sql = event_id_to_sqlite_i64(up_to)?;
        connection
            .execute(
                "INSERT INTO consumers(topic, consumer_id, cursor, updated_at_ms)
                 VALUES (?1, ?2, ?3, ?4)
                 ON CONFLICT(topic, consumer_id)
                 DO UPDATE SET cursor = excluded.cursor, updated_at_ms = excluded.updated_at_ms",
                params![topic.as_str(), consumer.as_str(), up_to_sql, now_ms()],
            )
            .map_err(|error| LogError::Sqlite(format!("event log ack error: {error}")))?;
        Ok(())
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        connection
            .query_row(
                "SELECT cursor FROM consumers WHERE topic = ?1 AND consumer_id = ?2",
                params![topic.as_str(), consumer.as_str()],
                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
            )
            .optional()
            .map_err(|error| LogError::Sqlite(format!("event log consumer cursor error: {error}")))
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        connection
            .query_row(
                "SELECT last_id FROM topic_heads WHERE topic = ?1",
                params![topic.as_str()],
                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
            )
            .optional()
            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let before_sql = event_id_to_sqlite_i64(before)?;
        let removed = connection
            .execute(
                "DELETE FROM events WHERE topic = ?1 AND event_id <= ?2",
                params![topic.as_str(), before_sql],
            )
            .map_err(|error| {
                LogError::Sqlite(format!("event log compact delete error: {error}"))
            })?;
        let remaining = connection
            .query_row(
                "SELECT COUNT(*) FROM events WHERE topic = ?1",
                params![topic.as_str()],
                |row| row.get::<_, i64>(0),
            )
            .map_err(|error| LogError::Sqlite(format!("event log compact count error: {error}")))
            .and_then(sqlite_i64_to_usize)?;
        let latest = connection
            .query_row(
                "SELECT last_id FROM topic_heads WHERE topic = ?1",
                params![topic.as_str()],
                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
            )
            .optional()
            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))?;
        connection
            .execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
        Ok(CompactReport {
            removed,
            remaining,
            latest,
            checkpointed: true,
        })
    }
}

fn resolve_path(base_dir: &Path, value: &str) -> PathBuf {
    let candidate = PathBuf::from(value);
    if candidate.is_absolute() {
        candidate
    } else {
        base_dir.join(candidate)
    }
}

fn write_json_atomically(path: &Path, payload: &serde_json::Value) -> Result<(), LogError> {
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)
            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
    }
    let tmp = path.with_extension("tmp");
    let encoded = serde_json::to_vec_pretty(payload)
        .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
    std::fs::write(&tmp, encoded)
        .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
    std::fs::rename(&tmp, path)
        .map_err(|error| LogError::Io(format!("event log rename error: {error}")))?;
    Ok(())
}

fn sanitize_filename(value: &str) -> String {
    sanitize_topic_component(value)
}

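/// Maps every character outside the topic alphabet (ASCII alphanumerics,
/// `.`, `_`, `-`) to `_`; for example, `worker/7 beta` becomes
/// `worker_7_beta`.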
pub fn sanitize_topic_component(value: &str) -> String {
    value
        .chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-') {
                ch
            } else {
                '_'
            }
        })
        .collect()
}

fn dir_size_bytes(path: &Path) -> u64 {
    if !path.exists() {
        return 0;
    }
    let mut total = 0;
    if let Ok(entries) = std::fs::read_dir(path) {
        for entry in entries.flatten() {
            let path = entry.path();
            if path.is_dir() {
                total += dir_size_bytes(&path);
            } else if let Ok(metadata) = entry.metadata() {
                total += metadata.len();
            }
        }
    }
    total
}

fn sqlite_size_bytes(path: &Path) -> u64 {
    let mut total = file_size(path);
    total += file_size(&PathBuf::from(format!("{}-wal", path.display())));
    total += file_size(&PathBuf::from(format!("{}-shm", path.display())));
    total
}

fn file_size(path: &Path) -> u64 {
    std::fs::metadata(path)
        .map(|metadata| metadata.len())
        .unwrap_or(0)
}

fn sync_tree(root: &Path) -> Result<(), LogError> {
    if !root.exists() {
        return Ok(());
    }
    for entry in std::fs::read_dir(root)
        .map_err(|error| LogError::Io(format!("event log read_dir error: {error}")))?
    {
        let entry = entry.map_err(|error| LogError::Io(format!("event log dir error: {error}")))?;
        let path = entry.path();
        if path.is_dir() {
            sync_tree(&path)?;
            continue;
        }
        std::fs::File::open(&path)
            .and_then(|file| file.sync_all())
            .map_err(|error| LogError::Io(format!("event log sync error: {error}")))?;
    }
    Ok(())
}

fn now_ms() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|duration| duration.as_millis() as i64)
        .unwrap_or(0)
}

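// SQLite stores INTEGER as a signed 64-bit value while `EventId` is a `u64`,
// so both directions are checked at the boundary instead of silently wrapping.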
fn event_id_to_sqlite_i64(event_id: EventId) -> Result<i64, LogError> {
    i64::try_from(event_id)
        .map_err(|_| LogError::Sqlite(format!("event id {event_id} exceeds sqlite INTEGER range")))
}

fn sqlite_i64_to_event_id(value: i64) -> Result<EventId, LogError> {
    u64::try_from(value)
        .map_err(|_| LogError::Sqlite(format!("sqlite event id {value} is negative")))
}

fn sqlite_i64_to_event_id_for_row(value: i64) -> rusqlite::Result<EventId> {
    u64::try_from(value).map_err(|_| {
        rusqlite::Error::FromSqlConversionFailure(
            0, // every call site reads the id from column 0
            rusqlite::types::Type::Integer,
            "sqlite event id is negative".into(),
        )
    })
}

fn sqlite_json_bytes_for_row(
    row: &rusqlite::Row<'_>,
    index: usize,
    name: &str,
) -> rusqlite::Result<Vec<u8>> {
    let value = row.get_ref(index)?;
    match value {
        rusqlite::types::ValueRef::Text(bytes) | rusqlite::types::ValueRef::Blob(bytes) => {
            Ok(bytes.to_vec())
        }
        other => Err(rusqlite::Error::InvalidColumnType(
            index,
            name.to_string(),
            other.data_type(),
        )),
    }
}

fn sqlite_i64_to_usize(value: i64) -> Result<usize, LogError> {
    usize::try_from(value)
        .map_err(|_| LogError::Sqlite(format!("sqlite count {value} is negative")))
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use rand::{rngs::StdRng, Rng, SeedableRng};

    async fn exercise_basic_backend(log: Arc<AnyEventLog>) {
        let topic = Topic::new("trigger.inbox").unwrap();
        for i in 0..10_000 {
            log.append(
                &topic,
                LogEvent::new("append", serde_json::json!({ "i": i })),
            )
            .await
            .unwrap();
        }
        let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 10_000);
        assert_eq!(events.first().unwrap().0, 1);
        assert_eq!(events.last().unwrap().0, 10_000);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn memory_backend_supports_append_read_subscribe_and_compact() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
        exercise_basic_backend(log.clone()).await;

        let topic = Topic::new("agent.transcript.demo").unwrap();
        let mut stream = log.clone().subscribe(&topic, None).await.unwrap();
        let first = log
            .append(
                &topic,
                LogEvent::new("message", serde_json::json!({"text":"one"})),
            )
            .await
            .unwrap();
        let second = log
            .append(
                &topic,
                LogEvent::new("message", serde_json::json!({"text":"two"})),
            )
            .await
            .unwrap();
        let seen: Vec<_> = stream.by_ref().take(2).collect().await;
        assert_eq!(seen[0].as_ref().unwrap().0, first);
        assert_eq!(seen[1].as_ref().unwrap().0, second);

        log.ack(&topic, &ConsumerId::new("worker").unwrap(), second)
            .await
            .unwrap();
        let compact = log.compact(&topic, first).await.unwrap();
        assert_eq!(compact.removed, 1);
        assert_eq!(compact.remaining, 1);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn file_backend_persists_across_reopen_and_compacts() {
        let dir = tempfile::tempdir().unwrap();
        let topic = Topic::new("trigger.outbox").unwrap();
        let first_log = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
        ));
        first_log
            .append(
                &topic,
                LogEvent::new("dispatch_pending", serde_json::json!({"n":1})),
            )
            .await
            .unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("dispatch_complete", serde_json::json!({"n":2})),
            )
            .await
            .unwrap();
        drop(first_log);

        let reopened = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
        ));
        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 2);
        let compact = reopened.compact(&topic, 1).await.unwrap();
        assert_eq!(compact.removed, 1);
        assert_eq!(
            reopened
                .read_range(&topic, None, usize::MAX)
                .await
                .unwrap()
                .len(),
            1
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn file_backend_skips_torn_tail_on_restart() {
        let dir = tempfile::tempdir().unwrap();
        let topic = Topic::new("trigger.inbox").unwrap();
        let first_log = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("accepted", serde_json::json!({"id": "ok"})),
            )
            .await
            .unwrap();
        drop(first_log);

        let topic_path = dir.path().join("topics").join("trigger.inbox.jsonl");
        use std::io::Write as _;
        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(&topic_path)
            .unwrap();
        write!(file, "{{\"id\":2,\"event\":{{\"kind\":\"partial\"").unwrap();
        drop(file);

        let reopened = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].0, 1);
        assert_eq!(reopened.latest(&topic).await.unwrap(), Some(1));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_backend_persists_and_checkpoints_after_compact() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("daemon.demo.state").unwrap();
        let first_log = Arc::new(AnyEventLog::Sqlite(
            SqliteEventLog::open(path.clone(), 8).unwrap(),
        ));
        first_log
            .append(
                &topic,
                LogEvent::new("state", serde_json::json!({"state":"idle"})),
            )
            .await
            .unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("state", serde_json::json!({"state":"active"})),
            )
            .await
            .unwrap();
        drop(first_log);

        let reopened = Arc::new(AnyEventLog::Sqlite(
            SqliteEventLog::open(path.clone(), 8).unwrap(),
        ));
        assert_eq!(
            reopened
                .read_range(&topic, None, usize::MAX)
                .await
                .unwrap()
                .len(),
            2
        );
        let compact = reopened.compact(&topic, 1).await.unwrap();
        assert!(compact.checkpointed);
        let wal = PathBuf::from(format!("{}-wal", path.display()));
        assert!(file_size(&wal) == 0 || !wal.exists());
    }

    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_bytes_read_preserves_payload_without_value_materialization() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("observability.action_graph").unwrap();
        let log = SqliteEventLog::open(path, 8).unwrap();
        let event_id = log
            .append(
                &topic,
                LogEvent::new(
                    "snapshot",
                    serde_json::json!({"nodes":[{"id":"a"}],"edges":[]}),
                ),
            )
            .await
            .unwrap();

        let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].0, event_id);
        assert_eq!(
            events[0].1.payload_json().unwrap(),
            serde_json::json!({"nodes":[{"id":"a"}],"edges":[]})
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_bytes_read_accepts_legacy_text_payload_rows() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("agent.transcript.legacy").unwrap();
        let log = SqliteEventLog::open(path, 8).unwrap();
        {
            let connection = log.connection.lock().unwrap();
            connection
                .execute(
                    "INSERT INTO topic_heads(topic, last_id) VALUES (?1, 1)",
                    params![topic.as_str()],
                )
                .unwrap();
            connection
                .execute(
                    "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
                     VALUES (?1, 1, 'legacy', ?2, '{}', 1)",
                    params![topic.as_str(), "{\"text\":\"old\"}"],
                )
                .unwrap();
        }

        let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
        assert_eq!(
            events[0].1.payload_json().unwrap(),
            serde_json::json!({"text": "old"})
        );
        assert_eq!(
            log.read_range(&topic, None, 1).await.unwrap()[0].1.kind,
            "legacy"
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn broadcast_forwarder_reports_lag_when_receiver_overflows() {
        let (sender, rx) = broadcast::channel(2);
        for i in 0..10 {
            sender
                .send((i + 1, LogEvent::new("tick", serde_json::json!({"i": i}))))
                .unwrap();
        }
        let mut stream = stream_from_broadcast(Vec::new(), None, rx, 2);

        match stream.next().await {
            Some(Err(LogError::ConsumerLagged(last_seen))) => assert_eq!(last_seen, 0),
            other => panic!("subscriber should surface lag, got {other:?}"),
        }
    }

    #[tokio::test(flavor = "current_thread")]
    async fn randomized_reader_sequences_stay_monotonic() {
        let log = Arc::new(MemoryEventLog::new(32));
        let topic = Topic::new("fuzz.demo").unwrap();
        let mut readers = vec![
            log.clone().subscribe(&topic, None).await.unwrap(),
            log.clone().subscribe(&topic, Some(5)).await.unwrap(),
            log.clone().subscribe(&topic, Some(10)).await.unwrap(),
        ];
        let mut rng = StdRng::seed_from_u64(7);
        for _ in 0..64 {
            let value = rng.random_range(0..1000);
            log.append(
                &topic,
                LogEvent::new("rand", serde_json::json!({"value": value})),
            )
            .await
            .unwrap();
        }

        let mut sequences = Vec::new();
        for reader in &mut readers {
            let mut ids = Vec::new();
            while let Some(item) = reader.next().await {
                match item {
                    Ok((event_id, _)) => {
                        ids.push(event_id);
                        if ids.len() >= 16 {
                            break;
                        }
                    }
                    Err(LogError::ConsumerLagged(_)) => break,
                    Err(error) => panic!("unexpected subscription error: {error}"),
                }
            }
            sequences.push(ids);
        }

        for ids in sequences {
            assert!(ids.windows(2).all(|pair| pair[0] < pair[1]));
        }
    }
}