1use std::cell::RefCell;
2use std::collections::{BTreeMap, HashMap, VecDeque};
3use std::fmt;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::{Arc, Mutex};
7
8use futures::stream::BoxStream;
9use rusqlite::{params, Connection, OptionalExtension};
10use serde::{Deserialize, Serialize};
11use tokio::sync::{broadcast, mpsc};
12use tokio_stream::wrappers::ReceiverStream;
13
/// Monotonically increasing identifier assigned per topic at append time.
pub type EventId = u64;

/// Env var selecting the backend: "memory", "file", or "sqlite".
pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
/// Env var overriding the directory used by the file backend.
pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
/// Env var overriding the database path used by the sqlite backend.
pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
/// Env var overriding the subscriber queue depth (clamped to at least 1).
pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";
20
21#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
22pub struct Topic(String);
23
24impl Topic {
25 pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
26 let value = value.into();
27 if value.is_empty() {
28 return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
29 }
30 if !value
31 .chars()
32 .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
33 {
34 return Err(LogError::InvalidTopic(format!(
35 "topic '{value}' contains unsupported characters"
36 )));
37 }
38 Ok(Self(value))
39 }
40
41 pub fn as_str(&self) -> &str {
42 &self.0
43 }
44}
45
46impl fmt::Display for Topic {
47 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48 self.0.fmt(f)
49 }
50}
51
52impl FromStr for Topic {
53 type Err = LogError;
54
55 fn from_str(s: &str) -> Result<Self, Self::Err> {
56 Self::new(s)
57 }
58}
59
60#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
61pub struct ConsumerId(String);
62
63impl ConsumerId {
64 pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
65 let value = value.into();
66 if value.trim().is_empty() {
67 return Err(LogError::InvalidConsumer(
68 "consumer id cannot be empty".to_string(),
69 ));
70 }
71 Ok(Self(value))
72 }
73
74 pub fn as_str(&self) -> &str {
75 &self.0
76 }
77}
78
79impl fmt::Display for ConsumerId {
80 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
81 self.0.fmt(f)
82 }
83}
84
/// Storage backend implementations available for the event log.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventLogBackendKind {
    /// Volatile, in-process storage; contents are lost when the log is dropped.
    Memory,
    /// Append-only JSONL files, one file per topic, plus JSON consumer cursors.
    File,
    /// Single SQLite database opened in WAL mode.
    Sqlite,
}
92
93impl fmt::Display for EventLogBackendKind {
94 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95 match self {
96 Self::Memory => write!(f, "memory"),
97 Self::File => write!(f, "file"),
98 Self::Sqlite => write!(f, "sqlite"),
99 }
100 }
101}
102
103impl FromStr for EventLogBackendKind {
104 type Err = LogError;
105
106 fn from_str(value: &str) -> Result<Self, Self::Err> {
107 match value.trim().to_ascii_lowercase().as_str() {
108 "memory" => Ok(Self::Memory),
109 "file" => Ok(Self::File),
110 "sqlite" => Ok(Self::Sqlite),
111 other => Err(LogError::Config(format!(
112 "unsupported event log backend '{other}'"
113 ))),
114 }
115 }
116}
117
/// A single event as produced by callers and stored by every backend.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LogEvent {
    /// Application-defined event type tag.
    pub kind: String,
    /// Arbitrary JSON payload.
    pub payload: serde_json::Value,
    /// Optional string metadata; defaults to empty when absent during deserialization.
    #[serde(default)]
    pub headers: BTreeMap<String, String>,
    /// Wall-clock timestamp in milliseconds, set at construction time.
    pub occurred_at_ms: i64,
}
126
127impl LogEvent {
128 pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
129 Self {
130 kind: kind.into(),
131 payload,
132 headers: BTreeMap::new(),
133 occurred_at_ms: now_ms(),
134 }
135 }
136
137 pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
138 self.headers = headers;
139 self
140 }
141}
142
/// Outcome of a `compact` call.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactReport {
    /// Number of events removed.
    pub removed: usize,
    /// Number of events still stored for the topic.
    pub remaining: usize,
    /// Highest event id known for the topic after compaction, if any.
    pub latest: Option<EventId>,
    /// Whether a storage checkpoint was performed as part of compaction.
    pub checkpointed: bool,
}

/// Snapshot of a backend's identity and sizing, as returned by `describe`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventLogDescription {
    /// Which backend implementation is in use.
    pub backend: EventLogBackendKind,
    /// On-disk location (`None` for the memory backend).
    pub location: Option<PathBuf>,
    /// Measured on-disk size in bytes (`None` for the memory backend).
    pub size_bytes: Option<u64>,
    /// Capacity used for subscriber channels.
    pub queue_depth: usize,
}
158
/// Errors produced by event-log configuration, validation, storage, and streaming.
#[derive(Debug)]
pub enum LogError {
    /// Invalid environment/configuration value.
    Config(String),
    /// Topic name failed validation.
    InvalidTopic(String),
    /// Consumer id failed validation.
    InvalidConsumer(String),
    /// Filesystem failure (file backend and path setup).
    Io(String),
    /// JSON encode/decode failure.
    Serde(String),
    /// SQLite failure (sqlite backend).
    Sqlite(String),
    /// A subscriber fell behind its broadcast channel; the payload is the
    /// last event id it is known to have received.
    ConsumerLagged(EventId),
}
169
170impl fmt::Display for LogError {
171 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172 match self {
173 Self::Config(message)
174 | Self::InvalidTopic(message)
175 | Self::InvalidConsumer(message)
176 | Self::Io(message)
177 | Self::Serde(message)
178 | Self::Sqlite(message) => message.fmt(f),
179 Self::ConsumerLagged(last_id) => {
180 write!(f, "subscriber lagged behind after event {last_id}")
181 }
182 }
183 }
184}
185
186impl std::error::Error for LogError {}
187
/// Append-only, topic-partitioned event log with replayable subscriptions.
#[allow(async_fn_in_trait)]
pub trait EventLog: Send + Sync {
    /// Reports the backend kind, storage location, measured size, and
    /// subscriber queue depth.
    fn describe(&self) -> EventLogDescription;

    /// Appends `event` to `topic` and returns the id it was assigned.
    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;

    /// Pushes buffered data toward durable storage (a no-op for the memory backend).
    async fn flush(&self) -> Result<(), LogError>;

    /// Reads up to `limit` events with id strictly greater than `from`
    /// (from the beginning when `from` is `None`), in ascending id order.
    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError>;

    /// Streams stored events after `from`, then live events as they are
    /// appended. May yield `Err(LogError::ConsumerLagged)` and end if the
    /// subscriber falls behind.
    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;

    /// Stores `up_to` as the cursor for (`topic`, `consumer`), overwriting
    /// any previous value.
    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError>;

    /// Returns the cursor last acked for (`topic`, `consumer`), if any.
    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError>;

    /// Highest event id ever assigned for `topic`, or `None` if nothing was appended.
    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;

    /// Removes events with id less than or equal to `before` and reports what changed.
    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
}
230
/// Resolved event-log settings (backend choice, paths, queue depth).
#[derive(Clone, Debug)]
pub struct EventLogConfig {
    /// Selected backend implementation.
    pub backend: EventLogBackendKind,
    /// Root directory used by the file backend.
    pub file_dir: PathBuf,
    /// Database file used by the sqlite backend.
    pub sqlite_path: PathBuf,
    /// Subscriber channel capacity (always at least 1).
    pub queue_depth: usize,
}
238
239impl EventLogConfig {
240 pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
241 let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
242 .ok()
243 .map(|value| value.parse())
244 .transpose()?
245 .unwrap_or(EventLogBackendKind::Sqlite);
246 let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
247 .ok()
248 .and_then(|value| value.parse::<usize>().ok())
249 .unwrap_or(128)
250 .max(1);
251
252 let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
253 Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
254 _ => crate::runtime_paths::event_log_dir(base_dir),
255 };
256 let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
257 Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
258 _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
259 };
260
261 Ok(Self {
262 backend,
263 file_dir,
264 sqlite_path,
265 queue_depth,
266 })
267 }
268
269 pub fn location(&self) -> Option<PathBuf> {
270 match self.backend {
271 EventLogBackendKind::Memory => None,
272 EventLogBackendKind::File => Some(self.file_dir.clone()),
273 EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
274 }
275 }
276}
277
thread_local! {
    // Per-thread slot for the currently installed event log. Because the slot
    // is thread-local, `install_memory_for_current_thread` affects only the
    // calling thread.
    static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
}
281
282pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
283 let config = EventLogConfig::for_base_dir(base_dir)?;
284 let log = open_event_log(&config)?;
285 ACTIVE_EVENT_LOG.with(|slot| {
286 *slot.borrow_mut() = Some(log.clone());
287 });
288 Ok(log)
289}
290
291pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
292 let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
293 ACTIVE_EVENT_LOG.with(|slot| {
294 *slot.borrow_mut() = Some(log.clone());
295 });
296 log
297}
298
299pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
300 ACTIVE_EVENT_LOG.with(|slot| {
301 *slot.borrow_mut() = Some(log.clone());
302 });
303 log
304}
305
306pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
307 ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone())
308}
309
310pub fn reset_active_event_log() {
311 ACTIVE_EVENT_LOG.with(|slot| {
312 *slot.borrow_mut() = None;
313 });
314}
315
316pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
317 let config = EventLogConfig::for_base_dir(base_dir)?;
318 let description = match config.backend {
319 EventLogBackendKind::Memory => EventLogDescription {
320 backend: EventLogBackendKind::Memory,
321 location: None,
322 size_bytes: None,
323 queue_depth: config.queue_depth,
324 },
325 EventLogBackendKind::File => EventLogDescription {
326 backend: EventLogBackendKind::File,
327 size_bytes: Some(dir_size_bytes(&config.file_dir)),
328 location: Some(config.file_dir),
329 queue_depth: config.queue_depth,
330 },
331 EventLogBackendKind::Sqlite => EventLogDescription {
332 backend: EventLogBackendKind::Sqlite,
333 size_bytes: Some(sqlite_size_bytes(&config.sqlite_path)),
334 location: Some(config.sqlite_path),
335 queue_depth: config.queue_depth,
336 },
337 };
338 Ok(description)
339}
340
341pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
342 match config.backend {
343 EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
344 config.queue_depth,
345 )))),
346 EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
347 config.file_dir.clone(),
348 config.queue_depth,
349 )?))),
350 EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
351 config.sqlite_path.clone(),
352 config.queue_depth,
353 )?))),
354 }
355}
356
/// Backend-erased event log; delegates every [`EventLog`] call to the wrapped
/// concrete implementation.
pub enum AnyEventLog {
    Memory(MemoryEventLog),
    File(FileEventLog),
    Sqlite(SqliteEventLog),
}
362
// Pure delegation: each trait method forwards to the wrapped backend. Only
// `subscribe` differs, because it must keep `Arc<Self>` (not the inner log)
// alive for the history-replay stream.
impl EventLog for AnyEventLog {
    fn describe(&self) -> EventLogDescription {
        match self {
            Self::Memory(log) => log.describe(),
            Self::File(log) => log.describe(),
            Self::Sqlite(log) => log.describe(),
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        match self {
            Self::Memory(log) => log.append(topic, event).await,
            Self::File(log) => log.append(topic, event).await,
            Self::Sqlite(log) => log.append(topic, event).await,
        }
    }

    async fn flush(&self) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.flush().await,
            Self::File(log) => log.flush().await,
            Self::Sqlite(log) => log.flush().await,
        }
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        match self {
            Self::Memory(log) => log.read_range(topic, from, limit).await,
            Self::File(log) => log.read_range(topic, from, limit).await,
            Self::Sqlite(log) => log.read_range(topic, from, limit).await,
        }
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        // Grab a live receiver from the inner backend's broadcast map before
        // starting history replay, so no concurrently appended event is missed
        // (duplicates are filtered downstream by id).
        let (rx, queue_depth) = match self.as_ref() {
            Self::Memory(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::File(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::Sqlite(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
        };
        Ok(stream_from_broadcast(
            self,
            topic.clone(),
            from,
            rx,
            queue_depth,
        ))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.ack(topic, consumer, up_to).await,
            Self::File(log) => log.ack(topic, consumer, up_to).await,
            Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
        }
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
            Self::File(log) => log.consumer_cursor(topic, consumer).await,
            Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
        }
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.latest(topic).await,
            Self::File(log) => log.latest(topic).await,
            Self::Sqlite(log) => log.latest(topic).await,
        }
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        match self {
            Self::Memory(log) => log.compact(topic, before).await,
            Self::File(log) => log.compact(topic, before).await,
            Self::Sqlite(log) => log.compact(topic, before).await,
        }
    }
}
470
471#[derive(Default)]
472struct BroadcastMap(Mutex<HashMap<String, broadcast::Sender<(EventId, LogEvent)>>>);
473
474impl BroadcastMap {
475 fn subscribe(
476 &self,
477 topic: &Topic,
478 capacity: usize,
479 ) -> broadcast::Receiver<(EventId, LogEvent)> {
480 self.sender(topic, capacity).subscribe()
481 }
482
483 fn publish(&self, topic: &Topic, capacity: usize, record: (EventId, LogEvent)) {
484 let _ = self.sender(topic, capacity).send(record);
485 }
486
487 fn sender(&self, topic: &Topic, capacity: usize) -> broadcast::Sender<(EventId, LogEvent)> {
488 let mut map = self.0.lock().expect("event log broadcast map poisoned");
489 map.entry(topic.as_str().to_string())
490 .or_insert_with(|| broadcast::channel(capacity.max(1)).0)
491 .clone()
492 }
493}
494
/// Bridges stored history plus a live broadcast receiver into one ordered
/// stream: replays `read_range(from..)` first, then forwards live events,
/// dropping any live event whose id was already replayed.
///
/// Runs on a dedicated OS thread driven by `futures::executor::block_on`;
/// the stream ends when the subscriber is dropped, the broadcast channel
/// closes, or the receiver lags (reported as `LogError::ConsumerLagged`).
fn stream_from_broadcast<L>(
    log: Arc<L>,
    topic: Topic,
    from: Option<EventId>,
    mut live_rx: broadcast::Receiver<(EventId, LogEvent)>,
    queue_depth: usize,
) -> BoxStream<'static, Result<(EventId, LogEvent), LogError>>
where
    L: EventLog + 'static,
{
    let (tx, rx) = mpsc::channel(queue_depth.max(1));
    std::thread::spawn(move || {
        futures::executor::block_on(async move {
            // Phase 1: replay everything already stored after `from`.
            let history = match log.read_range(&topic, from, usize::MAX).await {
                Ok(history) => history,
                Err(error) => {
                    let _ = tx.send(Err(error)).await;
                    return;
                }
            };

            let mut last_seen = from.unwrap_or(0);
            for (event_id, event) in history {
                last_seen = event_id;
                // A send error means the subscriber hung up; stop quietly.
                if tx.send(Ok((event_id, event))).await.is_err() {
                    return;
                }
            }

            // Phase 2: forward live events. `live_rx` was subscribed before
            // the history read, so events appended in between appear in both;
            // the `event_id > last_seen` guard deduplicates them.
            loop {
                match live_rx.recv().await {
                    Ok((event_id, event)) if event_id > last_seen => {
                        last_seen = event_id;
                        if tx.send(Ok((event_id, event))).await.is_err() {
                            return;
                        }
                    }
                    Ok(_) => {}
                    Err(broadcast::error::RecvError::Closed) => return,
                    Err(broadcast::error::RecvError::Lagged(_)) => {
                        // Best-effort lag notification (try_send: the queue
                        // may be full, which is exactly the lag condition).
                        let _ = tx.try_send(Err(LogError::ConsumerLagged(last_seen)));
                        return;
                    }
                }
            }
        });
    });
    Box::pin(ReceiverStream::new(rx))
}
544
/// Everything the memory backend stores, guarded by one async mutex.
#[derive(Default)]
struct MemoryState {
    // Per-topic event buffers, ascending by id.
    topics: HashMap<String, VecDeque<(EventId, LogEvent)>>,
    // Highest id ever issued per topic; not touched by compaction, so ids
    // stay monotonic even after all events are removed.
    latest: HashMap<String, EventId>,
    // (topic, consumer) -> last acked cursor.
    consumers: HashMap<(String, String), EventId>,
}

/// Volatile, in-process event log; contents are lost on drop.
pub struct MemoryEventLog {
    state: tokio::sync::Mutex<MemoryState>,
    broadcasts: BroadcastMap,
    queue_depth: usize,
}

impl MemoryEventLog {
    /// Creates an empty log; `queue_depth` (clamped to at least 1) sizes the
    /// subscriber channels.
    pub fn new(queue_depth: usize) -> Self {
        Self {
            state: tokio::sync::Mutex::new(MemoryState::default()),
            broadcasts: BroadcastMap::default(),
            queue_depth: queue_depth.max(1),
        }
    }
}
567
568impl EventLog for MemoryEventLog {
569 fn describe(&self) -> EventLogDescription {
570 EventLogDescription {
571 backend: EventLogBackendKind::Memory,
572 location: None,
573 size_bytes: None,
574 queue_depth: self.queue_depth,
575 }
576 }
577
578 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
579 let mut state = self.state.lock().await;
580 let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
581 state.latest.insert(topic.as_str().to_string(), event_id);
582 state
583 .topics
584 .entry(topic.as_str().to_string())
585 .or_default()
586 .push_back((event_id, event.clone()));
587 drop(state);
588 self.broadcasts
589 .publish(topic, self.queue_depth, (event_id, event));
590 Ok(event_id)
591 }
592
593 async fn flush(&self) -> Result<(), LogError> {
594 Ok(())
595 }
596
597 async fn read_range(
598 &self,
599 topic: &Topic,
600 from: Option<EventId>,
601 limit: usize,
602 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
603 let from = from.unwrap_or(0);
604 let state = self.state.lock().await;
605 let events = state
606 .topics
607 .get(topic.as_str())
608 .into_iter()
609 .flat_map(|events| events.iter())
610 .filter(|(event_id, _)| *event_id > from)
611 .take(limit)
612 .map(|(event_id, event)| (*event_id, event.clone()))
613 .collect();
614 Ok(events)
615 }
616
617 async fn subscribe(
618 self: Arc<Self>,
619 topic: &Topic,
620 from: Option<EventId>,
621 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
622 let rx = self.broadcasts.subscribe(topic, self.queue_depth);
623 Ok(stream_from_broadcast(
624 self.clone(),
625 topic.clone(),
626 from,
627 rx,
628 self.queue_depth,
629 ))
630 }
631
632 async fn ack(
633 &self,
634 topic: &Topic,
635 consumer: &ConsumerId,
636 up_to: EventId,
637 ) -> Result<(), LogError> {
638 let mut state = self.state.lock().await;
639 state.consumers.insert(
640 (topic.as_str().to_string(), consumer.as_str().to_string()),
641 up_to,
642 );
643 Ok(())
644 }
645
646 async fn consumer_cursor(
647 &self,
648 topic: &Topic,
649 consumer: &ConsumerId,
650 ) -> Result<Option<EventId>, LogError> {
651 let state = self.state.lock().await;
652 Ok(state
653 .consumers
654 .get(&(topic.as_str().to_string(), consumer.as_str().to_string()))
655 .copied())
656 }
657
658 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
659 let state = self.state.lock().await;
660 Ok(state.latest.get(topic.as_str()).copied())
661 }
662
663 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
664 let mut state = self.state.lock().await;
665 let Some(events) = state.topics.get_mut(topic.as_str()) else {
666 return Ok(CompactReport::default());
667 };
668 let removed = events
669 .iter()
670 .take_while(|(event_id, _)| *event_id <= before)
671 .count();
672 for _ in 0..removed {
673 events.pop_front();
674 }
675 Ok(CompactReport {
676 removed,
677 remaining: events.len(),
678 latest: state.latest.get(topic.as_str()).copied(),
679 checkpointed: false,
680 })
681 }
682}
683
/// On-disk shape of one event: a single JSON object per line (JSONL).
#[derive(Serialize, Deserialize)]
struct FileRecord {
    // Id assigned at append time; ascending within a topic file.
    id: EventId,
    event: LogEvent,
}

/// Event log persisted as JSONL files under `root/topics` plus one JSON
/// cursor file per (topic, consumer) under `root/consumers`.
pub struct FileEventLog {
    root: PathBuf,
    // Cache of the newest id per topic, filled lazily by scanning the file.
    latest_ids: Mutex<HashMap<String, EventId>>,
    // Serializes append/compact so id assignment and file rewrites stay consistent.
    write_lock: Mutex<()>,
    // Per-topic fan-out channels for live subscribers.
    broadcasts: BroadcastMap,
    // Capacity for subscriber channels.
    queue_depth: usize,
}
697
698impl FileEventLog {
699 pub fn open(root: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
700 std::fs::create_dir_all(root.join("topics"))
701 .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
702 std::fs::create_dir_all(root.join("consumers"))
703 .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
704 Ok(Self {
705 root,
706 latest_ids: Mutex::new(HashMap::new()),
707 write_lock: Mutex::new(()),
708 broadcasts: BroadcastMap::default(),
709 queue_depth: queue_depth.max(1),
710 })
711 }
712
713 fn topic_path(&self, topic: &Topic) -> PathBuf {
714 self.root
715 .join("topics")
716 .join(format!("{}.jsonl", topic.as_str()))
717 }
718
719 fn consumer_path(&self, topic: &Topic, consumer: &ConsumerId) -> PathBuf {
720 self.root.join("consumers").join(format!(
721 "{}__{}.json",
722 topic.as_str(),
723 sanitize_filename(consumer.as_str())
724 ))
725 }
726
727 fn latest_id_for_topic(&self, topic: &Topic) -> Result<EventId, LogError> {
728 if let Some(event_id) = self
729 .latest_ids
730 .lock()
731 .expect("file event log latest ids poisoned")
732 .get(topic.as_str())
733 .copied()
734 {
735 return Ok(event_id);
736 }
737
738 let mut latest = 0;
739 let path = self.topic_path(topic);
740 if path.is_file() {
741 let file = std::fs::File::open(&path)
742 .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
743 for line in std::io::BufRead::lines(std::io::BufReader::new(file)) {
744 let line =
745 line.map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
746 let record: FileRecord = serde_json::from_str(&line)
747 .map_err(|error| LogError::Serde(format!("event log parse error: {error}")))?;
748 latest = record.id;
749 }
750 }
751 self.latest_ids
752 .lock()
753 .expect("file event log latest ids poisoned")
754 .insert(topic.as_str().to_string(), latest);
755 Ok(latest)
756 }
757
758 fn read_range_sync(
759 &self,
760 topic: &Topic,
761 from: Option<EventId>,
762 limit: usize,
763 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
764 let path = self.topic_path(topic);
765 if !path.is_file() {
766 return Ok(Vec::new());
767 }
768 let file = std::fs::File::open(&path)
769 .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
770 let from = from.unwrap_or(0);
771 let mut events = Vec::new();
772 for line in std::io::BufRead::lines(std::io::BufReader::new(file)) {
773 let line =
774 line.map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
775 let record: FileRecord = serde_json::from_str(&line)
776 .map_err(|error| LogError::Serde(format!("event log parse error: {error}")))?;
777 if record.id > from {
778 events.push((record.id, record.event));
779 }
780 if events.len() >= limit {
781 break;
782 }
783 }
784 Ok(events)
785 }
786}
787
788impl EventLog for FileEventLog {
789 fn describe(&self) -> EventLogDescription {
790 EventLogDescription {
791 backend: EventLogBackendKind::File,
792 location: Some(self.root.clone()),
793 size_bytes: Some(dir_size_bytes(&self.root)),
794 queue_depth: self.queue_depth,
795 }
796 }
797
798 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
799 let _guard = self
800 .write_lock
801 .lock()
802 .expect("file event log write lock poisoned");
803 let next_id = self.latest_id_for_topic(topic)? + 1;
804 let record = FileRecord {
805 id: next_id,
806 event: event.clone(),
807 };
808 let path = self.topic_path(topic);
809 if let Some(parent) = path.parent() {
810 std::fs::create_dir_all(parent)
811 .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
812 }
813 let line = serde_json::to_string(&record)
814 .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
815 use std::io::Write as _;
816 let mut file = std::fs::OpenOptions::new()
817 .create(true)
818 .append(true)
819 .open(&path)
820 .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
821 writeln!(file, "{line}")
822 .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
823 self.latest_ids
824 .lock()
825 .expect("file event log latest ids poisoned")
826 .insert(topic.as_str().to_string(), next_id);
827 self.broadcasts
828 .publish(topic, self.queue_depth, (next_id, event));
829 Ok(next_id)
830 }
831
832 async fn flush(&self) -> Result<(), LogError> {
833 sync_tree(&self.root)
834 }
835
836 async fn read_range(
837 &self,
838 topic: &Topic,
839 from: Option<EventId>,
840 limit: usize,
841 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
842 self.read_range_sync(topic, from, limit)
843 }
844
845 async fn subscribe(
846 self: Arc<Self>,
847 topic: &Topic,
848 from: Option<EventId>,
849 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
850 let rx = self.broadcasts.subscribe(topic, self.queue_depth);
851 Ok(stream_from_broadcast(
852 self.clone(),
853 topic.clone(),
854 from,
855 rx,
856 self.queue_depth,
857 ))
858 }
859
860 async fn ack(
861 &self,
862 topic: &Topic,
863 consumer: &ConsumerId,
864 up_to: EventId,
865 ) -> Result<(), LogError> {
866 let path = self.consumer_path(topic, consumer);
867 let payload = serde_json::json!({
868 "topic": topic.as_str(),
869 "consumer_id": consumer.as_str(),
870 "cursor": up_to,
871 "updated_at_ms": now_ms(),
872 });
873 write_json_atomically(&path, &payload)
874 }
875
876 async fn consumer_cursor(
877 &self,
878 topic: &Topic,
879 consumer: &ConsumerId,
880 ) -> Result<Option<EventId>, LogError> {
881 let path = self.consumer_path(topic, consumer);
882 if !path.is_file() {
883 return Ok(None);
884 }
885 let raw = std::fs::read_to_string(&path)
886 .map_err(|error| LogError::Io(format!("event log consumer read error: {error}")))?;
887 let payload: serde_json::Value = serde_json::from_str(&raw)
888 .map_err(|error| LogError::Serde(format!("event log consumer parse error: {error}")))?;
889 let cursor = payload
890 .get("cursor")
891 .and_then(serde_json::Value::as_u64)
892 .ok_or_else(|| {
893 LogError::Serde("event log consumer record missing numeric cursor".to_string())
894 })?;
895 Ok(Some(cursor))
896 }
897
898 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
899 let latest = self.latest_id_for_topic(topic)?;
900 if latest == 0 {
901 Ok(None)
902 } else {
903 Ok(Some(latest))
904 }
905 }
906
907 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
908 let _guard = self
909 .write_lock
910 .lock()
911 .expect("file event log write lock poisoned");
912 let path = self.topic_path(topic);
913 if !path.is_file() {
914 return Ok(CompactReport::default());
915 }
916 let retained = self.read_range_sync(topic, Some(before), usize::MAX)?;
917 let removed = self.read_range_sync(topic, None, usize::MAX)?.len() - retained.len();
918 let tmp = path.with_extension("jsonl.tmp");
919 if retained.is_empty() {
920 let _ = std::fs::remove_file(&path);
921 } else {
922 let mut writer =
923 std::io::BufWriter::new(std::fs::File::create(&tmp).map_err(|error| {
924 LogError::Io(format!("event log tmp create error: {error}"))
925 })?);
926 use std::io::Write as _;
927 for (event_id, event) in &retained {
928 let line = serde_json::to_string(&FileRecord {
929 id: *event_id,
930 event: event.clone(),
931 })
932 .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
933 writeln!(writer, "{line}")
934 .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
935 }
936 writer
937 .flush()
938 .map_err(|error| LogError::Io(format!("event log flush error: {error}")))?;
939 std::fs::rename(&tmp, &path).map_err(|error| {
940 LogError::Io(format!("event log compact finalize error: {error}"))
941 })?;
942 }
943 let latest = retained.last().map(|(event_id, _)| *event_id);
944 self.latest_ids
945 .lock()
946 .expect("file event log latest ids poisoned")
947 .insert(topic.as_str().to_string(), latest.unwrap_or(0));
948 Ok(CompactReport {
949 removed,
950 remaining: retained.len(),
951 latest,
952 checkpointed: false,
953 })
954 }
955}
956
/// Event log persisted in a single SQLite database (opened in WAL mode).
pub struct SqliteEventLog {
    // Database file path, kept for `describe`/size reporting.
    path: PathBuf,
    // One shared connection; all access serializes on this mutex.
    connection: Mutex<Connection>,
    // Per-topic fan-out channels for live subscribers.
    broadcasts: BroadcastMap,
    // Capacity for subscriber channels.
    queue_depth: usize,
}
963
impl SqliteEventLog {
    /// Opens (creating if necessary) the database at `path`, configures it
    /// (5s busy timeout, WAL journal, NORMAL synchronous), and ensures the
    /// `topic_heads`, `events`, and `consumers` tables exist.
    ///
    /// # Errors
    /// Returns `LogError::Io` for directory creation failures and
    /// `LogError::Sqlite` for any database error.
    pub fn open(path: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        }
        let connection = Connection::open(&path)
            .map_err(|error| LogError::Sqlite(format!("event log open error: {error}")))?;
        // Wait up to 5s on locks instead of failing immediately.
        connection
            .busy_timeout(std::time::Duration::from_secs(5))
            .map_err(|error| LogError::Sqlite(format!("event log busy-timeout error: {error}")))?;
        connection
            .pragma_update(None, "journal_mode", "WAL")
            .map_err(|error| LogError::Sqlite(format!("event log WAL pragma error: {error}")))?;
        connection
            .pragma_update(None, "synchronous", "NORMAL")
            .map_err(|error| LogError::Sqlite(format!("event log sync pragma error: {error}")))?;
        connection
            .execute_batch(
                "CREATE TABLE IF NOT EXISTS topic_heads (
                    topic TEXT PRIMARY KEY,
                    last_id INTEGER NOT NULL
                );
                CREATE TABLE IF NOT EXISTS events (
                    topic TEXT NOT NULL,
                    event_id INTEGER NOT NULL,
                    kind TEXT NOT NULL,
                    payload TEXT NOT NULL,
                    headers TEXT NOT NULL,
                    occurred_at_ms INTEGER NOT NULL,
                    PRIMARY KEY (topic, event_id)
                );
                CREATE TABLE IF NOT EXISTS consumers (
                    topic TEXT NOT NULL,
                    consumer_id TEXT NOT NULL,
                    cursor INTEGER NOT NULL,
                    updated_at_ms INTEGER NOT NULL,
                    PRIMARY KEY (topic, consumer_id)
                );",
            )
            .map_err(|error| LogError::Sqlite(format!("event log schema error: {error}")))?;
        Ok(Self {
            path,
            connection: Mutex::new(connection),
            broadcasts: BroadcastMap::default(),
            queue_depth: queue_depth.max(1),
        })
    }
}
1019
1020impl EventLog for SqliteEventLog {
1021 fn describe(&self) -> EventLogDescription {
1022 EventLogDescription {
1023 backend: EventLogBackendKind::Sqlite,
1024 location: Some(self.path.clone()),
1025 size_bytes: Some(sqlite_size_bytes(&self.path)),
1026 queue_depth: self.queue_depth,
1027 }
1028 }
1029
    /// Appends one event inside an IMMEDIATE transaction: bump the topic head,
    /// read back the assigned id, insert the row, commit, then fan out.
    /// Note: sqlite calls are blocking despite the async signature, and the
    /// `std::sync::Mutex` guard is held across no awaits.
    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        let mut connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        // IMMEDIATE takes the write lock up front so the head bump and the
        // insert cannot interleave with another writer.
        let tx = connection
            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
            .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
        tx.execute(
            "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
            params![topic.as_str()],
        )
        .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
        tx.execute(
            "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
            params![topic.as_str()],
        )
        .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
        let event_id: EventId = tx
            .query_row(
                "SELECT last_id FROM topic_heads WHERE topic = ?1",
                params![topic.as_str()],
                |row| row.get(0),
            )
            .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))?;
        tx.execute(
            "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
            params![
                topic.as_str(),
                event_id,
                event.kind,
                // Payload and headers are stored as JSON text columns.
                serde_json::to_string(&event.payload).map_err(|error| LogError::Serde(format!(
                    "event log payload encode error: {error}"
                )))?,
                serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
                    "event log headers encode error: {error}"
                )))?,
                event.occurred_at_ms
            ],
        )
        .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
        tx.commit()
            .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
        // Fan out only after the commit succeeds.
        self.broadcasts
            .publish(topic, self.queue_depth, (event_id, event.clone()));
        Ok(event_id)
    }
1078
    /// Forces a FULL WAL checkpoint so the main database file catches up with
    /// the write-ahead log.
    async fn flush(&self) -> Result<(), LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        connection
            .execute_batch("PRAGMA wal_checkpoint(FULL);")
            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
        Ok(())
    }
1089
    /// Reads up to `limit` events with id strictly greater than `from`,
    /// ascending by id, decoding the JSON payload/headers columns.
    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        let connection = self
            .connection
            .lock()
            .expect("sqlite event log connection poisoned");
        let mut statement = connection
            .prepare(
                "SELECT event_id, kind, payload, headers, occurred_at_ms
                 FROM events
                 WHERE topic = ?1 AND event_id > ?2
                 ORDER BY event_id ASC
                 LIMIT ?3",
            )
            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
        let rows = statement
            .query_map(
                params![topic.as_str(), from.unwrap_or(0), limit as i64],
                |row| {
                    let payload: String = row.get(2)?;
                    let headers: String = row.get(3)?;
                    Ok((
                        row.get::<_, EventId>(0)?,
                        LogEvent {
                            kind: row.get(1)?,
                            // NOTE(review): FromSqlConversionFailure's first
                            // argument is documented as the column index, but
                            // the string length is passed here — confirm intent.
                            payload: serde_json::from_str(&payload).map_err(|error| {
                                rusqlite::Error::FromSqlConversionFailure(
                                    payload.len(),
                                    rusqlite::types::Type::Text,
                                    Box::new(error),
                                )
                            })?,
                            headers: serde_json::from_str(&headers).map_err(|error| {
                                rusqlite::Error::FromSqlConversionFailure(
                                    headers.len(),
                                    rusqlite::types::Type::Text,
                                    Box::new(error),
                                )
                            })?,
                            occurred_at_ms: row.get(4)?,
                        },
                    ))
                },
            )
            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
        let mut events = Vec::new();
        for row in rows {
            events.push(
                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
            );
        }
        Ok(events)
    }
1147
1148 async fn subscribe(
1149 self: Arc<Self>,
1150 topic: &Topic,
1151 from: Option<EventId>,
1152 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
1153 let rx = self.broadcasts.subscribe(topic, self.queue_depth);
1154 Ok(stream_from_broadcast(
1155 self.clone(),
1156 topic.clone(),
1157 from,
1158 rx,
1159 self.queue_depth,
1160 ))
1161 }
1162
1163 async fn ack(
1164 &self,
1165 topic: &Topic,
1166 consumer: &ConsumerId,
1167 up_to: EventId,
1168 ) -> Result<(), LogError> {
1169 let connection = self
1170 .connection
1171 .lock()
1172 .expect("sqlite event log connection poisoned");
1173 connection
1174 .execute(
1175 "INSERT INTO consumers(topic, consumer_id, cursor, updated_at_ms)
1176 VALUES (?1, ?2, ?3, ?4)
1177 ON CONFLICT(topic, consumer_id)
1178 DO UPDATE SET cursor = excluded.cursor, updated_at_ms = excluded.updated_at_ms",
1179 params![topic.as_str(), consumer.as_str(), up_to, now_ms()],
1180 )
1181 .map_err(|error| LogError::Sqlite(format!("event log ack error: {error}")))?;
1182 Ok(())
1183 }
1184
1185 async fn consumer_cursor(
1186 &self,
1187 topic: &Topic,
1188 consumer: &ConsumerId,
1189 ) -> Result<Option<EventId>, LogError> {
1190 let connection = self
1191 .connection
1192 .lock()
1193 .expect("sqlite event log connection poisoned");
1194 connection
1195 .query_row(
1196 "SELECT cursor FROM consumers WHERE topic = ?1 AND consumer_id = ?2",
1197 params![topic.as_str(), consumer.as_str()],
1198 |row| row.get::<_, EventId>(0),
1199 )
1200 .optional()
1201 .map_err(|error| LogError::Sqlite(format!("event log consumer cursor error: {error}")))
1202 }
1203
1204 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
1205 let connection = self
1206 .connection
1207 .lock()
1208 .expect("sqlite event log connection poisoned");
1209 connection
1210 .query_row(
1211 "SELECT last_id FROM topic_heads WHERE topic = ?1",
1212 params![topic.as_str()],
1213 |row| row.get::<_, EventId>(0),
1214 )
1215 .optional()
1216 .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))
1217 }
1218
1219 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
1220 let connection = self
1221 .connection
1222 .lock()
1223 .expect("sqlite event log connection poisoned");
1224 let removed = connection
1225 .execute(
1226 "DELETE FROM events WHERE topic = ?1 AND event_id <= ?2",
1227 params![topic.as_str(), before],
1228 )
1229 .map_err(|error| {
1230 LogError::Sqlite(format!("event log compact delete error: {error}"))
1231 })?;
1232 let remaining: usize = connection
1233 .query_row(
1234 "SELECT COUNT(*) FROM events WHERE topic = ?1",
1235 params![topic.as_str()],
1236 |row| row.get(0),
1237 )
1238 .map_err(|error| LogError::Sqlite(format!("event log compact count error: {error}")))?;
1239 let latest = connection
1240 .query_row(
1241 "SELECT last_id FROM topic_heads WHERE topic = ?1",
1242 params![topic.as_str()],
1243 |row| row.get::<_, EventId>(0),
1244 )
1245 .optional()
1246 .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))?;
1247 connection
1248 .execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
1249 .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
1250 Ok(CompactReport {
1251 removed,
1252 remaining,
1253 latest,
1254 checkpointed: true,
1255 })
1256 }
1257}
1258
/// Resolves `value` against `base_dir`: absolute paths are used verbatim,
/// relative ones are joined onto `base_dir`.
fn resolve_path(base_dir: &Path, value: &str) -> PathBuf {
    let raw = Path::new(value);
    match raw.is_absolute() {
        true => raw.to_path_buf(),
        false => base_dir.join(raw),
    }
}
1267
1268fn write_json_atomically(path: &Path, payload: &serde_json::Value) -> Result<(), LogError> {
1269 if let Some(parent) = path.parent() {
1270 std::fs::create_dir_all(parent)
1271 .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
1272 }
1273 let tmp = path.with_extension("tmp");
1274 let encoded = serde_json::to_vec_pretty(payload)
1275 .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
1276 std::fs::write(&tmp, encoded)
1277 .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
1278 std::fs::rename(&tmp, path)
1279 .map_err(|error| LogError::Io(format!("event log rename error: {error}")))?;
1280 Ok(())
1281}
1282
/// Sanitizes `value` for use as a filename, applying the same character
/// policy as topic components: ASCII alphanumerics plus '.', '_' and '-' are
/// kept; every other character is replaced with '_'.
fn sanitize_filename(value: &str) -> String {
    sanitize_topic_component(value)
}
1286
/// Replaces every character outside the allowed topic alphabet (ASCII
/// alphanumerics plus '.', '_', '-') with '_', producing a string safe to use
/// as a topic/path component.
pub fn sanitize_topic_component(value: &str) -> String {
    let allowed = |ch: char| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-');
    value
        .chars()
        .map(|ch| if allowed(ch) { ch } else { '_' })
        .collect()
}
1299
/// Recursively sums the byte sizes of all regular files under `path`.
/// Unreadable directories or entries contribute 0; a missing path yields 0.
fn dir_size_bytes(path: &Path) -> u64 {
    if !path.exists() {
        return 0;
    }
    let entries = match std::fs::read_dir(path) {
        Ok(entries) => entries,
        Err(_) => return 0,
    };
    entries
        .flatten()
        .map(|entry| {
            let child = entry.path();
            if child.is_dir() {
                dir_size_bytes(&child)
            } else {
                entry.metadata().map(|metadata| metadata.len()).unwrap_or(0)
            }
        })
        .sum()
}
1317
1318fn sqlite_size_bytes(path: &Path) -> u64 {
1319 let mut total = file_size(path);
1320 total += file_size(&PathBuf::from(format!("{}-wal", path.display())));
1321 total += file_size(&PathBuf::from(format!("{}-shm", path.display())));
1322 total
1323}
1324
/// Size of the file at `path` in bytes; 0 when the file is missing or its
/// metadata cannot be read.
fn file_size(path: &Path) -> u64 {
    match std::fs::metadata(path) {
        Ok(metadata) => metadata.len(),
        Err(_) => 0,
    }
}
1330
1331fn sync_tree(root: &Path) -> Result<(), LogError> {
1332 if !root.exists() {
1333 return Ok(());
1334 }
1335 for entry in std::fs::read_dir(root)
1336 .map_err(|error| LogError::Io(format!("event log read_dir error: {error}")))?
1337 {
1338 let entry = entry.map_err(|error| LogError::Io(format!("event log dir error: {error}")))?;
1339 let path = entry.path();
1340 if path.is_dir() {
1341 sync_tree(&path)?;
1342 continue;
1343 }
1344 std::fs::File::open(&path)
1345 .and_then(|file| file.sync_all())
1346 .map_err(|error| LogError::Io(format!("event log sync error: {error}")))?;
1347 }
1348 Ok(())
1349}
1350
/// Current wall-clock time as milliseconds since the Unix epoch; returns 0
/// instead of panicking if the system clock reads before the epoch.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(duration) => duration.as_millis() as i64,
        Err(_) => 0,
    }
}
1357
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    // `random_range` is provided by the `Rng` trait (rand 0.9); there is no
    // `RngExt` trait in the rand crate, so importing that fails to compile.
    use rand::{rngs::StdRng, Rng, SeedableRng};

    // Appends 10k events and verifies ids are assigned densely from 1.
    async fn exercise_basic_backend(log: Arc<AnyEventLog>) {
        let topic = Topic::new("trigger.inbox").unwrap();
        for i in 0..10_000 {
            log.append(
                &topic,
                LogEvent::new("append", serde_json::json!({ "i": i })),
            )
            .await
            .unwrap();
        }
        let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 10_000);
        assert_eq!(events.first().unwrap().0, 1);
        assert_eq!(events.last().unwrap().0, 10_000);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn memory_backend_supports_append_read_subscribe_and_compact() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
        exercise_basic_backend(log.clone()).await;

        let topic = Topic::new("agent.transcript.demo").unwrap();
        let mut stream = log.clone().subscribe(&topic, None).await.unwrap();
        let first = log
            .append(
                &topic,
                LogEvent::new("message", serde_json::json!({"text":"one"})),
            )
            .await
            .unwrap();
        let second = log
            .append(
                &topic,
                LogEvent::new("message", serde_json::json!({"text":"two"})),
            )
            .await
            .unwrap();
        let seen: Vec<_> = stream.by_ref().take(2).collect().await;
        assert_eq!(seen[0].as_ref().unwrap().0, first);
        assert_eq!(seen[1].as_ref().unwrap().0, second);

        log.ack(&topic, &ConsumerId::new("worker").unwrap(), second)
            .await
            .unwrap();
        let compact = log.compact(&topic, first).await.unwrap();
        assert_eq!(compact.removed, 1);
        assert_eq!(compact.remaining, 1);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn file_backend_persists_across_reopen_and_compacts() {
        let dir = tempfile::tempdir().unwrap();
        let topic = Topic::new("trigger.outbox").unwrap();
        let first_log = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
        ));
        first_log
            .append(
                &topic,
                LogEvent::new("dispatch_pending", serde_json::json!({"n":1})),
            )
            .await
            .unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("dispatch_complete", serde_json::json!({"n":2})),
            )
            .await
            .unwrap();
        drop(first_log);

        let reopened = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
        ));
        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 2);
        let compact = reopened.compact(&topic, 1).await.unwrap();
        assert_eq!(compact.removed, 1);
        assert_eq!(
            reopened
                .read_range(&topic, None, usize::MAX)
                .await
                .unwrap()
                .len(),
            1
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_backend_persists_and_checkpoints_after_compact() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("daemon.demo.state").unwrap();
        let first_log = Arc::new(AnyEventLog::Sqlite(
            SqliteEventLog::open(path.clone(), 8).unwrap(),
        ));
        first_log
            .append(
                &topic,
                LogEvent::new("state", serde_json::json!({"state":"idle"})),
            )
            .await
            .unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("state", serde_json::json!({"state":"active"})),
            )
            .await
            .unwrap();
        drop(first_log);

        let reopened = Arc::new(AnyEventLog::Sqlite(
            SqliteEventLog::open(path.clone(), 8).unwrap(),
        ));
        assert_eq!(
            reopened
                .read_range(&topic, None, usize::MAX)
                .await
                .unwrap()
                .len(),
            2
        );
        let compact = reopened.compact(&topic, 1).await.unwrap();
        assert!(compact.checkpointed);
        // After a TRUNCATE checkpoint the WAL file is either gone or empty.
        let wal = PathBuf::from(format!("{}-wal", path.display()));
        assert!(file_size(&wal) == 0 || !wal.exists());
    }

    #[tokio::test(flavor = "current_thread")]
    #[ignore]
    async fn subscriber_reports_lag_when_broadcast_buffer_overflows() {
        let log = Arc::new(MemoryEventLog::new(2));
        let topic = Topic::new("lag.demo").unwrap();
        let mut stream = log.clone().subscribe(&topic, None).await.unwrap();
        for i in 0..10 {
            log.append(&topic, LogEvent::new("tick", serde_json::json!({"i": i})))
                .await
                .unwrap();
        }
        let mut saw_lag = false;
        for _ in 0..4 {
            match stream.next().await {
                Some(Err(LogError::ConsumerLagged(_))) => {
                    saw_lag = true;
                    break;
                }
                Some(_) => {}
                None => break,
            }
        }
        assert!(saw_lag, "subscriber should surface lag");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn randomized_reader_sequences_stay_monotonic() {
        let log = Arc::new(MemoryEventLog::new(32));
        let topic = Topic::new("fuzz.demo").unwrap();
        let mut readers = vec![
            log.clone().subscribe(&topic, None).await.unwrap(),
            log.clone().subscribe(&topic, Some(5)).await.unwrap(),
            log.clone().subscribe(&topic, Some(10)).await.unwrap(),
        ];
        // Seeded RNG keeps the test deterministic across runs.
        let mut rng = StdRng::seed_from_u64(7);
        for _ in 0..64 {
            let value = rng.random_range(0..1000);
            log.append(
                &topic,
                LogEvent::new("rand", serde_json::json!({"value": value})),
            )
            .await
            .unwrap();
        }

        let mut sequences = Vec::new();
        for reader in &mut readers {
            let mut ids = Vec::new();
            while let Some(item) = reader.next().await {
                match item {
                    Ok((event_id, _)) => {
                        ids.push(event_id);
                        if ids.len() >= 16 {
                            break;
                        }
                    }
                    Err(LogError::ConsumerLagged(_)) => break,
                    Err(error) => panic!("unexpected subscription error: {error}"),
                }
            }
            sequences.push(ids);
        }

        // Every reader must observe strictly increasing event ids.
        for ids in sequences {
            assert!(ids.windows(2).all(|pair| pair[0] < pair[1]));
        }
    }
}