1use std::cell::RefCell;
2use std::collections::{BTreeMap, HashMap, VecDeque};
3use std::fmt;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::{Arc, Mutex};
7
8use futures::stream::BoxStream;
9use rusqlite::{params, Connection, OptionalExtension};
10use serde::{Deserialize, Serialize};
11use tokio::sync::{broadcast, mpsc};
12use tokio_stream::wrappers::ReceiverStream;
13
/// Monotonically increasing, per-topic event identifier; 1 is the first
/// event, 0 is used internally as "nothing seen yet".
pub type EventId = u64;

/// Selects the backend: "memory", "file", or "sqlite" (default: sqlite).
pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
/// Overrides the directory used by the file backend.
pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
/// Overrides the database path used by the sqlite backend.
pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
/// Per-subscriber channel capacity (default 128, clamped to >= 1).
pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";
20
21#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
22pub struct Topic(String);
23
24impl Topic {
25 pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
26 let value = value.into();
27 if value.is_empty() {
28 return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
29 }
30 if !value
31 .chars()
32 .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
33 {
34 return Err(LogError::InvalidTopic(format!(
35 "topic '{value}' contains unsupported characters"
36 )));
37 }
38 Ok(Self(value))
39 }
40
41 pub fn as_str(&self) -> &str {
42 &self.0
43 }
44}
45
46impl fmt::Display for Topic {
47 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48 self.0.fmt(f)
49 }
50}
51
52impl FromStr for Topic {
53 type Err = LogError;
54
55 fn from_str(s: &str) -> Result<Self, Self::Err> {
56 Self::new(s)
57 }
58}
59
60#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
61pub struct ConsumerId(String);
62
63impl ConsumerId {
64 pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
65 let value = value.into();
66 if value.trim().is_empty() {
67 return Err(LogError::InvalidConsumer(
68 "consumer id cannot be empty".to_string(),
69 ));
70 }
71 Ok(Self(value))
72 }
73
74 pub fn as_str(&self) -> &str {
75 &self.0
76 }
77}
78
79impl fmt::Display for ConsumerId {
80 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
81 self.0.fmt(f)
82 }
83}
84
85#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
86#[serde(rename_all = "snake_case")]
87pub enum EventLogBackendKind {
88 Memory,
89 File,
90 Sqlite,
91}
92
93impl fmt::Display for EventLogBackendKind {
94 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95 match self {
96 Self::Memory => write!(f, "memory"),
97 Self::File => write!(f, "file"),
98 Self::Sqlite => write!(f, "sqlite"),
99 }
100 }
101}
102
103impl FromStr for EventLogBackendKind {
104 type Err = LogError;
105
106 fn from_str(value: &str) -> Result<Self, Self::Err> {
107 match value.trim().to_ascii_lowercase().as_str() {
108 "memory" => Ok(Self::Memory),
109 "file" => Ok(Self::File),
110 "sqlite" => Ok(Self::Sqlite),
111 other => Err(LogError::Config(format!(
112 "unsupported event log backend '{other}'"
113 ))),
114 }
115 }
116}
117
118#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
119pub struct LogEvent {
120 pub kind: String,
121 pub payload: serde_json::Value,
122 #[serde(default)]
123 pub headers: BTreeMap<String, String>,
124 pub occurred_at_ms: i64,
125}
126
127impl LogEvent {
128 pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
129 Self {
130 kind: kind.into(),
131 payload,
132 headers: BTreeMap::new(),
133 occurred_at_ms: now_ms(),
134 }
135 }
136
137 pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
138 self.headers = headers;
139 self
140 }
141}
142
/// Outcome of a `compact` call on one topic.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactReport {
    /// Number of events removed by this compaction.
    pub removed: usize,
    /// Number of events still stored for the topic afterwards.
    pub remaining: usize,
    /// Latest event id the backend reports after compaction.
    pub latest: Option<EventId>,
    /// Whether a storage checkpoint was performed — set by backends that
    /// support one; the memory and file backends always report `false`.
    pub checkpointed: bool,
}
150
/// Static summary of an event log instance, as returned by `describe()`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventLogDescription {
    /// Backend kind in use.
    pub backend: EventLogBackendKind,
    /// On-disk location: `None` for memory, the directory for file,
    /// the database path for sqlite.
    pub location: Option<PathBuf>,
    /// Approximate storage footprint; `None` for the memory backend.
    pub size_bytes: Option<u64>,
    /// Subscriber channel capacity (always >= 1).
    pub queue_depth: usize,
}
158
159#[derive(Debug)]
160pub enum LogError {
161 Config(String),
162 InvalidTopic(String),
163 InvalidConsumer(String),
164 Io(String),
165 Serde(String),
166 Sqlite(String),
167 ConsumerLagged(EventId),
168}
169
170impl fmt::Display for LogError {
171 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172 match self {
173 Self::Config(message)
174 | Self::InvalidTopic(message)
175 | Self::InvalidConsumer(message)
176 | Self::Io(message)
177 | Self::Serde(message)
178 | Self::Sqlite(message) => message.fmt(f),
179 Self::ConsumerLagged(last_id) => {
180 write!(f, "subscriber lagged behind after event {last_id}")
181 }
182 }
183 }
184}
185
186impl std::error::Error for LogError {}
187
/// Append-only, topic-partitioned event log with per-consumer cursors.
///
/// Event ids are assigned per topic, starting at 1 and increasing by 1
/// per append (see the backend implementations below).
#[allow(async_fn_in_trait)]
pub trait EventLog: Send + Sync {
    /// Cheap, synchronous snapshot of the backend's configuration.
    fn describe(&self) -> EventLogDescription;

    /// Appends `event` to `topic` and returns the id it was assigned.
    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;

    /// Forces buffered data to durable storage (no-op for memory).
    async fn flush(&self) -> Result<(), LogError>;

    /// Returns up to `limit` events with id strictly greater than `from`
    /// (`None` means from the beginning), in ascending id order.
    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError>;

    /// Streams all stored events after `from`, then live appends; yields
    /// `Err(LogError::ConsumerLagged)` and ends if the subscriber falls
    /// behind the broadcast buffer.
    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;

    /// Records that `consumer` has processed events up to `up_to`.
    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError>;

    /// Last acknowledged id for `consumer` on `topic`, if any.
    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError>;

    /// Highest event id ever assigned for `topic`, if any.
    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;

    /// Removes stored events with id <= `before` and reports the result.
    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
}
230
231#[derive(Clone, Debug)]
232pub struct EventLogConfig {
233 pub backend: EventLogBackendKind,
234 pub file_dir: PathBuf,
235 pub sqlite_path: PathBuf,
236 pub queue_depth: usize,
237}
238
239impl EventLogConfig {
240 pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
241 let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
242 .ok()
243 .map(|value| value.parse())
244 .transpose()?
245 .unwrap_or(EventLogBackendKind::Sqlite);
246 let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
247 .ok()
248 .and_then(|value| value.parse::<usize>().ok())
249 .unwrap_or(128)
250 .max(1);
251
252 let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
253 Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
254 _ => crate::runtime_paths::event_log_dir(base_dir),
255 };
256 let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
257 Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
258 _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
259 };
260
261 Ok(Self {
262 backend,
263 file_dir,
264 sqlite_path,
265 queue_depth,
266 })
267 }
268
269 pub fn location(&self) -> Option<PathBuf> {
270 match self.backend {
271 EventLogBackendKind::Memory => None,
272 EventLogBackendKind::File => Some(self.file_dir.clone()),
273 EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
274 }
275 }
276}
277
// Per-thread "current" event log; installed by the `install_*` helpers
// below and read back via `active_event_log()`.
thread_local! {
    static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
}
281
282pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
283 let config = EventLogConfig::for_base_dir(base_dir)?;
284 let log = open_event_log(&config)?;
285 ACTIVE_EVENT_LOG.with(|slot| {
286 *slot.borrow_mut() = Some(log.clone());
287 });
288 Ok(log)
289}
290
291pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
292 let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
293 ACTIVE_EVENT_LOG.with(|slot| {
294 *slot.borrow_mut() = Some(log.clone());
295 });
296 log
297}
298
299pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
300 ACTIVE_EVENT_LOG.with(|slot| {
301 *slot.borrow_mut() = Some(log.clone());
302 });
303 log
304}
305
306pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
307 ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone())
308}
309
310pub fn reset_active_event_log() {
311 ACTIVE_EVENT_LOG.with(|slot| {
312 *slot.borrow_mut() = None;
313 });
314}
315
316pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
317 let config = EventLogConfig::for_base_dir(base_dir)?;
318 let description = match config.backend {
319 EventLogBackendKind::Memory => EventLogDescription {
320 backend: EventLogBackendKind::Memory,
321 location: None,
322 size_bytes: None,
323 queue_depth: config.queue_depth,
324 },
325 EventLogBackendKind::File => EventLogDescription {
326 backend: EventLogBackendKind::File,
327 size_bytes: Some(dir_size_bytes(&config.file_dir)),
328 location: Some(config.file_dir),
329 queue_depth: config.queue_depth,
330 },
331 EventLogBackendKind::Sqlite => EventLogDescription {
332 backend: EventLogBackendKind::Sqlite,
333 size_bytes: Some(sqlite_size_bytes(&config.sqlite_path)),
334 location: Some(config.sqlite_path),
335 queue_depth: config.queue_depth,
336 },
337 };
338 Ok(description)
339}
340
341pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
342 match config.backend {
343 EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
344 config.queue_depth,
345 )))),
346 EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
347 config.file_dir.clone(),
348 config.queue_depth,
349 )?))),
350 EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
351 config.sqlite_path.clone(),
352 config.queue_depth,
353 )?))),
354 }
355}
356
/// Backend-erasing wrapper so callers can hold one concrete type
/// regardless of which storage engine was configured.
pub enum AnyEventLog {
    Memory(MemoryEventLog),
    File(FileEventLog),
    Sqlite(SqliteEventLog),
}
362
// Mechanical delegation to the wrapped backend. `subscribe` is the one
// method that cannot delegate directly, because the trait takes
// `self: Arc<Self>` and the inner backends are not individually Arc'd.
impl EventLog for AnyEventLog {
    fn describe(&self) -> EventLogDescription {
        match self {
            Self::Memory(log) => log.describe(),
            Self::File(log) => log.describe(),
            Self::Sqlite(log) => log.describe(),
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        match self {
            Self::Memory(log) => log.append(topic, event).await,
            Self::File(log) => log.append(topic, event).await,
            Self::Sqlite(log) => log.append(topic, event).await,
        }
    }

    async fn flush(&self) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.flush().await,
            Self::File(log) => log.flush().await,
            Self::Sqlite(log) => log.flush().await,
        }
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        match self {
            Self::Memory(log) => log.read_range(topic, from, limit).await,
            Self::File(log) => log.read_range(topic, from, limit).await,
            Self::Sqlite(log) => log.read_range(topic, from, limit).await,
        }
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        // Register with the broadcast channel BEFORE reading history so
        // events appended during the read are buffered, not lost;
        // `stream_from_broadcast` dedupes any overlap by event id.
        let (rx, queue_depth) = match self.as_ref() {
            Self::Memory(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::File(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::Sqlite(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
        };
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.ack(topic, consumer, up_to).await,
            Self::File(log) => log.ack(topic, consumer, up_to).await,
            Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
        }
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
            Self::File(log) => log.consumer_cursor(topic, consumer).await,
            Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
        }
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.latest(topic).await,
            Self::File(log) => log.latest(topic).await,
            Self::Sqlite(log) => log.latest(topic).await,
        }
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        match self {
            Self::Memory(log) => log.compact(topic, before).await,
            Self::File(log) => log.compact(topic, before).await,
            Self::Sqlite(log) => log.compact(topic, before).await,
        }
    }
}
465
466#[derive(Default)]
467struct BroadcastMap(Mutex<HashMap<String, broadcast::Sender<(EventId, LogEvent)>>>);
468
469impl BroadcastMap {
470 fn subscribe(
471 &self,
472 topic: &Topic,
473 capacity: usize,
474 ) -> broadcast::Receiver<(EventId, LogEvent)> {
475 self.sender(topic, capacity).subscribe()
476 }
477
478 fn publish(&self, topic: &Topic, capacity: usize, record: (EventId, LogEvent)) {
479 let _ = self.sender(topic, capacity).send(record);
480 }
481
482 fn sender(&self, topic: &Topic, capacity: usize) -> broadcast::Sender<(EventId, LogEvent)> {
483 let mut map = self.0.lock().expect("event log broadcast map poisoned");
484 map.entry(topic.as_str().to_string())
485 .or_insert_with(|| broadcast::channel(capacity.max(1)).0)
486 .clone()
487 }
488}
489
490fn stream_from_broadcast(
491 history: Vec<(EventId, LogEvent)>,
492 from: Option<EventId>,
493 mut live_rx: broadcast::Receiver<(EventId, LogEvent)>,
494 queue_depth: usize,
495) -> BoxStream<'static, Result<(EventId, LogEvent), LogError>> {
496 let (tx, rx) = mpsc::channel(queue_depth.max(1));
497 tokio::spawn(async move {
505 let mut last_seen = from.unwrap_or(0);
506 for (event_id, event) in history {
507 last_seen = event_id;
508 if tx.send(Ok((event_id, event))).await.is_err() {
509 return;
510 }
511 }
512
513 loop {
514 match live_rx.recv().await {
515 Ok((event_id, event)) if event_id > last_seen => {
516 last_seen = event_id;
517 if tx.send(Ok((event_id, event))).await.is_err() {
518 return;
519 }
520 }
521 Ok(_) => {}
522 Err(broadcast::error::RecvError::Closed) => return,
523 Err(broadcast::error::RecvError::Lagged(_)) => {
524 let _ = tx.try_send(Err(LogError::ConsumerLagged(last_seen)));
525 return;
526 }
527 }
528 }
529 });
530 Box::pin(ReceiverStream::new(rx))
531}
532
/// Mutable state of the in-memory backend, guarded by one tokio mutex.
#[derive(Default)]
struct MemoryState {
    // Events per topic, in ascending id order (appended monotonically).
    topics: HashMap<String, VecDeque<(EventId, LogEvent)>>,
    // Highest id ever assigned per topic; survives compaction.
    latest: HashMap<String, EventId>,
    // Acked cursor keyed by (topic, consumer id).
    consumers: HashMap<(String, String), EventId>,
}
539
/// Volatile event log; everything is lost when the process exits.
pub struct MemoryEventLog {
    // tokio Mutex: held across awaits is safe, unlike std::sync::Mutex.
    state: tokio::sync::Mutex<MemoryState>,
    broadcasts: BroadcastMap,
    queue_depth: usize,
}

impl MemoryEventLog {
    /// Creates an empty log; `queue_depth` is clamped to >= 1 so channel
    /// construction never panics.
    pub fn new(queue_depth: usize) -> Self {
        Self {
            state: tokio::sync::Mutex::new(MemoryState::default()),
            broadcasts: BroadcastMap::default(),
            queue_depth: queue_depth.max(1),
        }
    }
}
555
556impl EventLog for MemoryEventLog {
557 fn describe(&self) -> EventLogDescription {
558 EventLogDescription {
559 backend: EventLogBackendKind::Memory,
560 location: None,
561 size_bytes: None,
562 queue_depth: self.queue_depth,
563 }
564 }
565
566 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
567 let mut state = self.state.lock().await;
568 let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
569 state.latest.insert(topic.as_str().to_string(), event_id);
570 state
571 .topics
572 .entry(topic.as_str().to_string())
573 .or_default()
574 .push_back((event_id, event.clone()));
575 drop(state);
576 self.broadcasts
577 .publish(topic, self.queue_depth, (event_id, event));
578 Ok(event_id)
579 }
580
581 async fn flush(&self) -> Result<(), LogError> {
582 Ok(())
583 }
584
585 async fn read_range(
586 &self,
587 topic: &Topic,
588 from: Option<EventId>,
589 limit: usize,
590 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
591 let from = from.unwrap_or(0);
592 let state = self.state.lock().await;
593 let events = state
594 .topics
595 .get(topic.as_str())
596 .into_iter()
597 .flat_map(|events| events.iter())
598 .filter(|(event_id, _)| *event_id > from)
599 .take(limit)
600 .map(|(event_id, event)| (*event_id, event.clone()))
601 .collect();
602 Ok(events)
603 }
604
605 async fn subscribe(
606 self: Arc<Self>,
607 topic: &Topic,
608 from: Option<EventId>,
609 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
610 let rx = self.broadcasts.subscribe(topic, self.queue_depth);
611 let history = self.read_range(topic, from, usize::MAX).await?;
612 Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
613 }
614
615 async fn ack(
616 &self,
617 topic: &Topic,
618 consumer: &ConsumerId,
619 up_to: EventId,
620 ) -> Result<(), LogError> {
621 let mut state = self.state.lock().await;
622 state.consumers.insert(
623 (topic.as_str().to_string(), consumer.as_str().to_string()),
624 up_to,
625 );
626 Ok(())
627 }
628
629 async fn consumer_cursor(
630 &self,
631 topic: &Topic,
632 consumer: &ConsumerId,
633 ) -> Result<Option<EventId>, LogError> {
634 let state = self.state.lock().await;
635 Ok(state
636 .consumers
637 .get(&(topic.as_str().to_string(), consumer.as_str().to_string()))
638 .copied())
639 }
640
641 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
642 let state = self.state.lock().await;
643 Ok(state.latest.get(topic.as_str()).copied())
644 }
645
646 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
647 let mut state = self.state.lock().await;
648 let Some(events) = state.topics.get_mut(topic.as_str()) else {
649 return Ok(CompactReport::default());
650 };
651 let removed = events
652 .iter()
653 .take_while(|(event_id, _)| *event_id <= before)
654 .count();
655 for _ in 0..removed {
656 events.pop_front();
657 }
658 Ok(CompactReport {
659 removed,
660 remaining: events.len(),
661 latest: state.latest.get(topic.as_str()).copied(),
662 checkpointed: false,
663 })
664 }
665}
666
/// One JSONL line in a topic file: the assigned id plus the event.
#[derive(Serialize, Deserialize)]
struct FileRecord {
    id: EventId,
    event: LogEvent,
}
672
/// Filesystem-backed log: one append-only JSONL file per topic under
/// `root/topics/`, one JSON cursor file per consumer under `root/consumers/`.
pub struct FileEventLog {
    root: PathBuf,
    // Cache of the highest id per topic, filled lazily by scanning the file.
    latest_ids: Mutex<HashMap<String, EventId>>,
    // Serializes append/compact so id assignment and rewrites don't race.
    write_lock: Mutex<()>,
    broadcasts: BroadcastMap,
    queue_depth: usize,
}
680
681impl FileEventLog {
682 pub fn open(root: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
683 std::fs::create_dir_all(root.join("topics"))
684 .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
685 std::fs::create_dir_all(root.join("consumers"))
686 .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
687 Ok(Self {
688 root,
689 latest_ids: Mutex::new(HashMap::new()),
690 write_lock: Mutex::new(()),
691 broadcasts: BroadcastMap::default(),
692 queue_depth: queue_depth.max(1),
693 })
694 }
695
696 fn topic_path(&self, topic: &Topic) -> PathBuf {
697 self.root
698 .join("topics")
699 .join(format!("{}.jsonl", topic.as_str()))
700 }
701
702 fn consumer_path(&self, topic: &Topic, consumer: &ConsumerId) -> PathBuf {
703 self.root.join("consumers").join(format!(
704 "{}__{}.json",
705 topic.as_str(),
706 sanitize_filename(consumer.as_str())
707 ))
708 }
709
710 fn latest_id_for_topic(&self, topic: &Topic) -> Result<EventId, LogError> {
711 if let Some(event_id) = self
712 .latest_ids
713 .lock()
714 .expect("file event log latest ids poisoned")
715 .get(topic.as_str())
716 .copied()
717 {
718 return Ok(event_id);
719 }
720
721 let mut latest = 0;
722 let path = self.topic_path(topic);
723 if path.is_file() {
724 let file = std::fs::File::open(&path)
725 .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
726 for line in std::io::BufRead::lines(std::io::BufReader::new(file)) {
727 let line =
728 line.map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
729 let record: FileRecord = serde_json::from_str(&line)
730 .map_err(|error| LogError::Serde(format!("event log parse error: {error}")))?;
731 latest = record.id;
732 }
733 }
734 self.latest_ids
735 .lock()
736 .expect("file event log latest ids poisoned")
737 .insert(topic.as_str().to_string(), latest);
738 Ok(latest)
739 }
740
741 fn read_range_sync(
742 &self,
743 topic: &Topic,
744 from: Option<EventId>,
745 limit: usize,
746 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
747 let path = self.topic_path(topic);
748 if !path.is_file() {
749 return Ok(Vec::new());
750 }
751 let file = std::fs::File::open(&path)
752 .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
753 let from = from.unwrap_or(0);
754 let mut events = Vec::new();
755 for line in std::io::BufRead::lines(std::io::BufReader::new(file)) {
756 let line =
757 line.map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
758 let record: FileRecord = serde_json::from_str(&line)
759 .map_err(|error| LogError::Serde(format!("event log parse error: {error}")))?;
760 if record.id > from {
761 events.push((record.id, record.event));
762 }
763 if events.len() >= limit {
764 break;
765 }
766 }
767 Ok(events)
768 }
769}
770
771impl EventLog for FileEventLog {
772 fn describe(&self) -> EventLogDescription {
773 EventLogDescription {
774 backend: EventLogBackendKind::File,
775 location: Some(self.root.clone()),
776 size_bytes: Some(dir_size_bytes(&self.root)),
777 queue_depth: self.queue_depth,
778 }
779 }
780
781 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
782 let _guard = self
783 .write_lock
784 .lock()
785 .expect("file event log write lock poisoned");
786 let next_id = self.latest_id_for_topic(topic)? + 1;
787 let record = FileRecord {
788 id: next_id,
789 event: event.clone(),
790 };
791 let path = self.topic_path(topic);
792 if let Some(parent) = path.parent() {
793 std::fs::create_dir_all(parent)
794 .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
795 }
796 let line = serde_json::to_string(&record)
797 .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
798 use std::io::Write as _;
799 let mut file = std::fs::OpenOptions::new()
800 .create(true)
801 .append(true)
802 .open(&path)
803 .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
804 writeln!(file, "{line}")
805 .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
806 self.latest_ids
807 .lock()
808 .expect("file event log latest ids poisoned")
809 .insert(topic.as_str().to_string(), next_id);
810 self.broadcasts
811 .publish(topic, self.queue_depth, (next_id, event));
812 Ok(next_id)
813 }
814
815 async fn flush(&self) -> Result<(), LogError> {
816 sync_tree(&self.root)
817 }
818
819 async fn read_range(
820 &self,
821 topic: &Topic,
822 from: Option<EventId>,
823 limit: usize,
824 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
825 self.read_range_sync(topic, from, limit)
826 }
827
828 async fn subscribe(
829 self: Arc<Self>,
830 topic: &Topic,
831 from: Option<EventId>,
832 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
833 let rx = self.broadcasts.subscribe(topic, self.queue_depth);
834 let history = self.read_range_sync(topic, from, usize::MAX)?;
835 Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
836 }
837
838 async fn ack(
839 &self,
840 topic: &Topic,
841 consumer: &ConsumerId,
842 up_to: EventId,
843 ) -> Result<(), LogError> {
844 let path = self.consumer_path(topic, consumer);
845 let payload = serde_json::json!({
846 "topic": topic.as_str(),
847 "consumer_id": consumer.as_str(),
848 "cursor": up_to,
849 "updated_at_ms": now_ms(),
850 });
851 write_json_atomically(&path, &payload)
852 }
853
854 async fn consumer_cursor(
855 &self,
856 topic: &Topic,
857 consumer: &ConsumerId,
858 ) -> Result<Option<EventId>, LogError> {
859 let path = self.consumer_path(topic, consumer);
860 if !path.is_file() {
861 return Ok(None);
862 }
863 let raw = std::fs::read_to_string(&path)
864 .map_err(|error| LogError::Io(format!("event log consumer read error: {error}")))?;
865 let payload: serde_json::Value = serde_json::from_str(&raw)
866 .map_err(|error| LogError::Serde(format!("event log consumer parse error: {error}")))?;
867 let cursor = payload
868 .get("cursor")
869 .and_then(serde_json::Value::as_u64)
870 .ok_or_else(|| {
871 LogError::Serde("event log consumer record missing numeric cursor".to_string())
872 })?;
873 Ok(Some(cursor))
874 }
875
876 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
877 let latest = self.latest_id_for_topic(topic)?;
878 if latest == 0 {
879 Ok(None)
880 } else {
881 Ok(Some(latest))
882 }
883 }
884
885 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
886 let _guard = self
887 .write_lock
888 .lock()
889 .expect("file event log write lock poisoned");
890 let path = self.topic_path(topic);
891 if !path.is_file() {
892 return Ok(CompactReport::default());
893 }
894 let retained = self.read_range_sync(topic, Some(before), usize::MAX)?;
895 let removed = self.read_range_sync(topic, None, usize::MAX)?.len() - retained.len();
896 let tmp = path.with_extension("jsonl.tmp");
897 if retained.is_empty() {
898 let _ = std::fs::remove_file(&path);
899 } else {
900 let mut writer =
901 std::io::BufWriter::new(std::fs::File::create(&tmp).map_err(|error| {
902 LogError::Io(format!("event log tmp create error: {error}"))
903 })?);
904 use std::io::Write as _;
905 for (event_id, event) in &retained {
906 let line = serde_json::to_string(&FileRecord {
907 id: *event_id,
908 event: event.clone(),
909 })
910 .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
911 writeln!(writer, "{line}")
912 .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
913 }
914 writer
915 .flush()
916 .map_err(|error| LogError::Io(format!("event log flush error: {error}")))?;
917 std::fs::rename(&tmp, &path).map_err(|error| {
918 LogError::Io(format!("event log compact finalize error: {error}"))
919 })?;
920 }
921 let latest = retained.last().map(|(event_id, _)| *event_id);
922 self.latest_ids
923 .lock()
924 .expect("file event log latest ids poisoned")
925 .insert(topic.as_str().to_string(), latest.unwrap_or(0));
926 Ok(CompactReport {
927 removed,
928 remaining: retained.len(),
929 latest,
930 checkpointed: false,
931 })
932 }
933}
934
935pub struct SqliteEventLog {
936 path: PathBuf,
937 connection: Mutex<Connection>,
938 broadcasts: BroadcastMap,
939 queue_depth: usize,
940}
941
942impl SqliteEventLog {
943 pub fn open(path: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
944 if let Some(parent) = path.parent() {
945 std::fs::create_dir_all(parent)
946 .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
947 }
948 let connection = Connection::open(&path)
949 .map_err(|error| LogError::Sqlite(format!("event log open error: {error}")))?;
950 connection
957 .busy_timeout(std::time::Duration::from_secs(5))
958 .map_err(|error| LogError::Sqlite(format!("event log busy-timeout error: {error}")))?;
959 connection
960 .pragma_update(None, "journal_mode", "WAL")
961 .map_err(|error| LogError::Sqlite(format!("event log WAL pragma error: {error}")))?;
962 connection
963 .pragma_update(None, "synchronous", "NORMAL")
964 .map_err(|error| LogError::Sqlite(format!("event log sync pragma error: {error}")))?;
965 connection
966 .execute_batch(
967 "CREATE TABLE IF NOT EXISTS topic_heads (
968 topic TEXT PRIMARY KEY,
969 last_id INTEGER NOT NULL
970 );
971 CREATE TABLE IF NOT EXISTS events (
972 topic TEXT NOT NULL,
973 event_id INTEGER NOT NULL,
974 kind TEXT NOT NULL,
975 payload TEXT NOT NULL,
976 headers TEXT NOT NULL,
977 occurred_at_ms INTEGER NOT NULL,
978 PRIMARY KEY (topic, event_id)
979 );
980 CREATE TABLE IF NOT EXISTS consumers (
981 topic TEXT NOT NULL,
982 consumer_id TEXT NOT NULL,
983 cursor INTEGER NOT NULL,
984 updated_at_ms INTEGER NOT NULL,
985 PRIMARY KEY (topic, consumer_id)
986 );",
987 )
988 .map_err(|error| LogError::Sqlite(format!("event log schema error: {error}")))?;
989 Ok(Self {
990 path,
991 connection: Mutex::new(connection),
992 broadcasts: BroadcastMap::default(),
993 queue_depth: queue_depth.max(1),
994 })
995 }
996}
997
998impl EventLog for SqliteEventLog {
999 fn describe(&self) -> EventLogDescription {
1000 EventLogDescription {
1001 backend: EventLogBackendKind::Sqlite,
1002 location: Some(self.path.clone()),
1003 size_bytes: Some(sqlite_size_bytes(&self.path)),
1004 queue_depth: self.queue_depth,
1005 }
1006 }
1007
1008 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
1009 let mut connection = self
1010 .connection
1011 .lock()
1012 .expect("sqlite event log connection poisoned");
1013 let tx = connection
1014 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
1015 .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
1016 tx.execute(
1017 "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
1018 params![topic.as_str()],
1019 )
1020 .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
1021 tx.execute(
1022 "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
1023 params![topic.as_str()],
1024 )
1025 .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
1026 let event_id = tx
1027 .query_row(
1028 "SELECT last_id FROM topic_heads WHERE topic = ?1",
1029 params![topic.as_str()],
1030 |row| row.get::<_, i64>(0),
1031 )
1032 .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))
1033 .and_then(sqlite_i64_to_event_id)?;
1034 let event_id_sql = event_id_to_sqlite_i64(event_id)?;
1035 tx.execute(
1036 "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
1037 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1038 params![
1039 topic.as_str(),
1040 event_id_sql,
1041 event.kind,
1042 serde_json::to_string(&event.payload).map_err(|error| LogError::Serde(format!(
1043 "event log payload encode error: {error}"
1044 )))?,
1045 serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
1046 "event log headers encode error: {error}"
1047 )))?,
1048 event.occurred_at_ms
1049 ],
1050 )
1051 .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
1052 tx.commit()
1053 .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
1054 self.broadcasts
1055 .publish(topic, self.queue_depth, (event_id, event.clone()));
1056 Ok(event_id)
1057 }
1058
1059 async fn flush(&self) -> Result<(), LogError> {
1060 let connection = self
1061 .connection
1062 .lock()
1063 .expect("sqlite event log connection poisoned");
1064 connection
1065 .execute_batch("PRAGMA wal_checkpoint(FULL);")
1066 .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
1067 Ok(())
1068 }
1069
1070 async fn read_range(
1071 &self,
1072 topic: &Topic,
1073 from: Option<EventId>,
1074 limit: usize,
1075 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
1076 let connection = self
1077 .connection
1078 .lock()
1079 .expect("sqlite event log connection poisoned");
1080 let mut statement = connection
1081 .prepare(
1082 "SELECT event_id, kind, payload, headers, occurred_at_ms
1083 FROM events
1084 WHERE topic = ?1 AND event_id > ?2
1085 ORDER BY event_id ASC
1086 LIMIT ?3",
1087 )
1088 .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
1089 let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
1090 let rows = statement
1091 .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
1092 let payload: String = row.get(2)?;
1093 let headers: String = row.get(3)?;
1094 let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
1095 Ok((
1096 event_id,
1097 LogEvent {
1098 kind: row.get(1)?,
1099 payload: serde_json::from_str(&payload).map_err(|error| {
1100 rusqlite::Error::FromSqlConversionFailure(
1101 payload.len(),
1102 rusqlite::types::Type::Text,
1103 Box::new(error),
1104 )
1105 })?,
1106 headers: serde_json::from_str(&headers).map_err(|error| {
1107 rusqlite::Error::FromSqlConversionFailure(
1108 headers.len(),
1109 rusqlite::types::Type::Text,
1110 Box::new(error),
1111 )
1112 })?,
1113 occurred_at_ms: row.get(4)?,
1114 },
1115 ))
1116 })
1117 .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
1118 let mut events = Vec::new();
1119 for row in rows {
1120 events.push(
1121 row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
1122 );
1123 }
1124 Ok(events)
1125 }
1126
1127 async fn subscribe(
1128 self: Arc<Self>,
1129 topic: &Topic,
1130 from: Option<EventId>,
1131 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
1132 let rx = self.broadcasts.subscribe(topic, self.queue_depth);
1133 let history = self.read_range(topic, from, usize::MAX).await?;
1134 Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
1135 }
1136
1137 async fn ack(
1138 &self,
1139 topic: &Topic,
1140 consumer: &ConsumerId,
1141 up_to: EventId,
1142 ) -> Result<(), LogError> {
1143 let connection = self
1144 .connection
1145 .lock()
1146 .expect("sqlite event log connection poisoned");
1147 let up_to_sql = event_id_to_sqlite_i64(up_to)?;
1148 connection
1149 .execute(
1150 "INSERT INTO consumers(topic, consumer_id, cursor, updated_at_ms)
1151 VALUES (?1, ?2, ?3, ?4)
1152 ON CONFLICT(topic, consumer_id)
1153 DO UPDATE SET cursor = excluded.cursor, updated_at_ms = excluded.updated_at_ms",
1154 params![topic.as_str(), consumer.as_str(), up_to_sql, now_ms()],
1155 )
1156 .map_err(|error| LogError::Sqlite(format!("event log ack error: {error}")))?;
1157 Ok(())
1158 }
1159
1160 async fn consumer_cursor(
1161 &self,
1162 topic: &Topic,
1163 consumer: &ConsumerId,
1164 ) -> Result<Option<EventId>, LogError> {
1165 let connection = self
1166 .connection
1167 .lock()
1168 .expect("sqlite event log connection poisoned");
1169 connection
1170 .query_row(
1171 "SELECT cursor FROM consumers WHERE topic = ?1 AND consumer_id = ?2",
1172 params![topic.as_str(), consumer.as_str()],
1173 |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1174 )
1175 .optional()
1176 .map_err(|error| LogError::Sqlite(format!("event log consumer cursor error: {error}")))
1177 }
1178
1179 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
1180 let connection = self
1181 .connection
1182 .lock()
1183 .expect("sqlite event log connection poisoned");
1184 connection
1185 .query_row(
1186 "SELECT last_id FROM topic_heads WHERE topic = ?1",
1187 params![topic.as_str()],
1188 |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1189 )
1190 .optional()
1191 .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))
1192 }
1193
1194 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
1195 let connection = self
1196 .connection
1197 .lock()
1198 .expect("sqlite event log connection poisoned");
1199 let before_sql = event_id_to_sqlite_i64(before)?;
1200 let removed = connection
1201 .execute(
1202 "DELETE FROM events WHERE topic = ?1 AND event_id <= ?2",
1203 params![topic.as_str(), before_sql],
1204 )
1205 .map_err(|error| {
1206 LogError::Sqlite(format!("event log compact delete error: {error}"))
1207 })?;
1208 let remaining = connection
1209 .query_row(
1210 "SELECT COUNT(*) FROM events WHERE topic = ?1",
1211 params![topic.as_str()],
1212 |row| row.get::<_, i64>(0),
1213 )
1214 .map_err(|error| LogError::Sqlite(format!("event log compact count error: {error}")))
1215 .and_then(sqlite_i64_to_usize)?;
1216 let latest = connection
1217 .query_row(
1218 "SELECT last_id FROM topic_heads WHERE topic = ?1",
1219 params![topic.as_str()],
1220 |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1221 )
1222 .optional()
1223 .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))?;
1224 connection
1225 .execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
1226 .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
1227 Ok(CompactReport {
1228 removed,
1229 remaining,
1230 latest,
1231 checkpointed: true,
1232 })
1233 }
1234}
1235
/// Interpret `value` as a filesystem path: absolute paths are returned as-is,
/// relative paths are resolved against `base_dir`.
fn resolve_path(base_dir: &Path, value: &str) -> PathBuf {
    let candidate = Path::new(value);
    if candidate.is_absolute() {
        candidate.to_path_buf()
    } else {
        base_dir.join(candidate)
    }
}
1244
1245fn write_json_atomically(path: &Path, payload: &serde_json::Value) -> Result<(), LogError> {
1246 if let Some(parent) = path.parent() {
1247 std::fs::create_dir_all(parent)
1248 .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
1249 }
1250 let tmp = path.with_extension("tmp");
1251 let encoded = serde_json::to_vec_pretty(payload)
1252 .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
1253 std::fs::write(&tmp, encoded)
1254 .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
1255 std::fs::rename(&tmp, path)
1256 .map_err(|error| LogError::Io(format!("event log rename error: {error}")))?;
1257 Ok(())
1258}
1259
/// Alias of [`sanitize_topic_component`], kept under a filesystem-oriented
/// name for call sites that build file names rather than topic parts.
fn sanitize_filename(value: &str) -> String {
    sanitize_topic_component(value)
}
1263
/// Map `value` to a topic/filename-safe string: ASCII alphanumerics and
/// `.`, `_`, `-` pass through unchanged, every other character becomes `_`.
pub fn sanitize_topic_component(value: &str) -> String {
    fn allowed(ch: char) -> bool {
        ch.is_ascii_alphanumeric() || ch == '.' || ch == '_' || ch == '-'
    }
    value
        .chars()
        .map(|ch| if allowed(ch) { ch } else { '_' })
        .collect()
}
1276
/// Best-effort recursive size in bytes of the tree rooted at `path`; returns
/// 0 for a missing/unreadable path and silently skips unreadable entries.
///
/// Fix: decide recursion via `DirEntry::file_type()`, which does NOT follow
/// symlinks. The original used `path.is_dir()` (follows links), so a symlink
/// cycle could recurse without bound; a symlink is now counted as the link
/// entry itself rather than traversed.
fn dir_size_bytes(path: &Path) -> u64 {
    let entries = match std::fs::read_dir(path) {
        Ok(entries) => entries,
        Err(_) => return 0,
    };
    let mut total = 0;
    for entry in entries.flatten() {
        match entry.file_type() {
            Ok(file_type) if file_type.is_dir() => {
                total += dir_size_bytes(&entry.path());
            }
            Ok(_) => {
                if let Ok(metadata) = entry.metadata() {
                    total += metadata.len();
                }
            }
            // Unreadable entry: keep going, this is a best-effort estimate.
            Err(_) => {}
        }
    }
    total
}
1294
1295fn sqlite_size_bytes(path: &Path) -> u64 {
1296 let mut total = file_size(path);
1297 total += file_size(&PathBuf::from(format!("{}-wal", path.display())));
1298 total += file_size(&PathBuf::from(format!("{}-shm", path.display())));
1299 total
1300}
1301
/// Size of a single file in bytes, or 0 when its metadata cannot be read
/// (missing file, permission error, ...).
fn file_size(path: &Path) -> u64 {
    match std::fs::metadata(path) {
        Ok(metadata) => metadata.len(),
        Err(_) => 0,
    }
}
1307
1308fn sync_tree(root: &Path) -> Result<(), LogError> {
1309 if !root.exists() {
1310 return Ok(());
1311 }
1312 for entry in std::fs::read_dir(root)
1313 .map_err(|error| LogError::Io(format!("event log read_dir error: {error}")))?
1314 {
1315 let entry = entry.map_err(|error| LogError::Io(format!("event log dir error: {error}")))?;
1316 let path = entry.path();
1317 if path.is_dir() {
1318 sync_tree(&path)?;
1319 continue;
1320 }
1321 std::fs::File::open(&path)
1322 .and_then(|file| file.sync_all())
1323 .map_err(|error| LogError::Io(format!("event log sync error: {error}")))?;
1324 }
1325 Ok(())
1326}
1327
/// Milliseconds since the Unix epoch, saturating to 0 when the system clock
/// reads before the epoch.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(duration) => duration.as_millis() as i64,
        Err(_) => 0,
    }
}
1334
1335fn event_id_to_sqlite_i64(event_id: EventId) -> Result<i64, LogError> {
1336 i64::try_from(event_id)
1337 .map_err(|_| LogError::Sqlite(format!("event id {event_id} exceeds sqlite INTEGER range")))
1338}
1339
1340fn sqlite_i64_to_event_id(value: i64) -> Result<EventId, LogError> {
1341 u64::try_from(value)
1342 .map_err(|_| LogError::Sqlite(format!("sqlite event id {value} is negative")))
1343}
1344
1345fn sqlite_i64_to_event_id_for_row(value: i64) -> rusqlite::Result<EventId> {
1346 u64::try_from(value).map_err(|_| {
1347 rusqlite::Error::FromSqlConversionFailure(
1348 std::mem::size_of::<i64>(),
1349 rusqlite::types::Type::Integer,
1350 "sqlite event id is negative".into(),
1351 )
1352 })
1353}
1354
1355fn sqlite_i64_to_usize(value: i64) -> Result<usize, LogError> {
1356 usize::try_from(value)
1357 .map_err(|_| LogError::Sqlite(format!("sqlite count {value} is negative")))
1358}
1359
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    // Fix: `random_range` is provided by the `rand::Rng` trait; the rand
    // crate has no `RngExt`, so the original import failed to resolve.
    use rand::{rngs::StdRng, Rng, SeedableRng};

    /// Append 10k events and check ids come back dense, 1-based, and ordered.
    async fn exercise_basic_backend(log: Arc<AnyEventLog>) {
        let topic = Topic::new("trigger.inbox").unwrap();
        for i in 0..10_000 {
            log.append(
                &topic,
                LogEvent::new("append", serde_json::json!({ "i": i })),
            )
            .await
            .unwrap();
        }
        let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 10_000);
        assert_eq!(events.first().unwrap().0, 1);
        assert_eq!(events.last().unwrap().0, 10_000);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn memory_backend_supports_append_read_subscribe_and_compact() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
        exercise_basic_backend(log.clone()).await;

        // Live subscription sees events appended after the subscribe call.
        let topic = Topic::new("agent.transcript.demo").unwrap();
        let mut stream = log.clone().subscribe(&topic, None).await.unwrap();
        let first = log
            .append(
                &topic,
                LogEvent::new("message", serde_json::json!({"text":"one"})),
            )
            .await
            .unwrap();
        let second = log
            .append(
                &topic,
                LogEvent::new("message", serde_json::json!({"text":"two"})),
            )
            .await
            .unwrap();
        let seen: Vec<_> = stream.by_ref().take(2).collect().await;
        assert_eq!(seen[0].as_ref().unwrap().0, first);
        assert_eq!(seen[1].as_ref().unwrap().0, second);

        log.ack(&topic, &ConsumerId::new("worker").unwrap(), second)
            .await
            .unwrap();
        // Compacting up to `first` removes one event and keeps the other.
        let compact = log.compact(&topic, first).await.unwrap();
        assert_eq!(compact.removed, 1);
        assert_eq!(compact.remaining, 1);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn file_backend_persists_across_reopen_and_compacts() {
        let dir = tempfile::tempdir().unwrap();
        let topic = Topic::new("trigger.outbox").unwrap();
        let first_log = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
        ));
        first_log
            .append(
                &topic,
                LogEvent::new("dispatch_pending", serde_json::json!({"n":1})),
            )
            .await
            .unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("dispatch_complete", serde_json::json!({"n":2})),
            )
            .await
            .unwrap();
        drop(first_log);

        // A fresh handle on the same directory must see both events.
        let reopened = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
        ));
        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 2);
        let compact = reopened.compact(&topic, 1).await.unwrap();
        assert_eq!(compact.removed, 1);
        assert_eq!(
            reopened
                .read_range(&topic, None, usize::MAX)
                .await
                .unwrap()
                .len(),
            1
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_backend_persists_and_checkpoints_after_compact() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("daemon.demo.state").unwrap();
        let first_log = Arc::new(AnyEventLog::Sqlite(
            SqliteEventLog::open(path.clone(), 8).unwrap(),
        ));
        first_log
            .append(
                &topic,
                LogEvent::new("state", serde_json::json!({"state":"idle"})),
            )
            .await
            .unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("state", serde_json::json!({"state":"active"})),
            )
            .await
            .unwrap();
        drop(first_log);

        let reopened = Arc::new(AnyEventLog::Sqlite(
            SqliteEventLog::open(path.clone(), 8).unwrap(),
        ));
        assert_eq!(
            reopened
                .read_range(&topic, None, usize::MAX)
                .await
                .unwrap()
                .len(),
            2
        );
        let compact = reopened.compact(&topic, 1).await.unwrap();
        assert!(compact.checkpointed);
        // After a TRUNCATE checkpoint the WAL should be empty or gone.
        let wal = PathBuf::from(format!("{}-wal", path.display()));
        assert!(file_size(&wal) == 0 || !wal.exists());
    }

    #[tokio::test(flavor = "current_thread")]
    async fn broadcast_forwarder_reports_lag_when_receiver_overflows() {
        // Overflow a depth-2 broadcast channel so the receiver lags.
        let (sender, rx) = broadcast::channel(2);
        for i in 0..10 {
            sender
                .send((i + 1, LogEvent::new("tick", serde_json::json!({"i": i}))))
                .unwrap();
        }
        let mut stream = stream_from_broadcast(Vec::new(), None, rx, 2);

        match stream.next().await {
            Some(Err(LogError::ConsumerLagged(last_seen))) => assert_eq!(last_seen, 0),
            other => panic!("subscriber should surface lag, got {other:?}"),
        }
    }

    #[tokio::test(flavor = "current_thread")]
    async fn randomized_reader_sequences_stay_monotonic() {
        let log = Arc::new(MemoryEventLog::new(32));
        let topic = Topic::new("fuzz.demo").unwrap();
        let mut readers = vec![
            log.clone().subscribe(&topic, None).await.unwrap(),
            log.clone().subscribe(&topic, Some(5)).await.unwrap(),
            log.clone().subscribe(&topic, Some(10)).await.unwrap(),
        ];
        // Seeded RNG keeps the run deterministic.
        let mut rng = StdRng::seed_from_u64(7);
        for _ in 0..64 {
            let value = rng.random_range(0..1000);
            log.append(
                &topic,
                LogEvent::new("rand", serde_json::json!({"value": value})),
            )
            .await
            .unwrap();
        }

        let mut sequences = Vec::new();
        for reader in &mut readers {
            let mut ids = Vec::new();
            while let Some(item) = reader.next().await {
                match item {
                    Ok((event_id, _)) => {
                        ids.push(event_id);
                        if ids.len() >= 16 {
                            break;
                        }
                    }
                    Err(LogError::ConsumerLagged(_)) => break,
                    Err(error) => panic!("unexpected subscription error: {error}"),
                }
            }
            sequences.push(ids);
        }

        // Every reader must observe strictly increasing event ids.
        for ids in sequences {
            assert!(ids.windows(2).all(|pair| pair[0] < pair[1]));
        }
    }
}
1554}