harn_vm/event_log/mod.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, HashMap, VecDeque};
3use std::fmt;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::{Arc, Mutex};
7
8use futures::stream::BoxStream;
9use rusqlite::{params, Connection, OptionalExtension};
10use serde::{Deserialize, Serialize};
11use tokio::sync::{broadcast, mpsc};
12use tokio_stream::wrappers::ReceiverStream;
13
/// Monotonically increasing per-topic event id. Ids start at 1 (backends
/// compute `latest + 1` from a 0 base), so 0 acts as "before the first event".
pub type EventId = u64;

/// Env var selecting the backend: `memory`, `file`, or `sqlite`.
pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
/// Env var overriding the file backend's root directory.
pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
/// Env var overriding the SQLite database path.
pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
/// Env var overriding the subscription queue depth (clamped to >= 1; default 128).
pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";
20
21#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
22pub struct Topic(String);
23
24impl Topic {
25    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
26        let value = value.into();
27        if value.is_empty() {
28            return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
29        }
30        if !value
31            .chars()
32            .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
33        {
34            return Err(LogError::InvalidTopic(format!(
35                "topic '{value}' contains unsupported characters"
36            )));
37        }
38        Ok(Self(value))
39    }
40
41    pub fn as_str(&self) -> &str {
42        &self.0
43    }
44}
45
46impl fmt::Display for Topic {
47    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48        self.0.fmt(f)
49    }
50}
51
52impl FromStr for Topic {
53    type Err = LogError;
54
55    fn from_str(s: &str) -> Result<Self, Self::Err> {
56        Self::new(s)
57    }
58}
59
60#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
61pub struct ConsumerId(String);
62
63impl ConsumerId {
64    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
65        let value = value.into();
66        if value.trim().is_empty() {
67            return Err(LogError::InvalidConsumer(
68                "consumer id cannot be empty".to_string(),
69            ));
70        }
71        Ok(Self(value))
72    }
73
74    pub fn as_str(&self) -> &str {
75        &self.0
76    }
77}
78
79impl fmt::Display for ConsumerId {
80    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
81        self.0.fmt(f)
82    }
83}
84
85#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
86#[serde(rename_all = "snake_case")]
87pub enum EventLogBackendKind {
88    Memory,
89    File,
90    Sqlite,
91}
92
93impl fmt::Display for EventLogBackendKind {
94    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95        match self {
96            Self::Memory => write!(f, "memory"),
97            Self::File => write!(f, "file"),
98            Self::Sqlite => write!(f, "sqlite"),
99        }
100    }
101}
102
103impl FromStr for EventLogBackendKind {
104    type Err = LogError;
105
106    fn from_str(value: &str) -> Result<Self, Self::Err> {
107        match value.trim().to_ascii_lowercase().as_str() {
108            "memory" => Ok(Self::Memory),
109            "file" => Ok(Self::File),
110            "sqlite" => Ok(Self::Sqlite),
111            other => Err(LogError::Config(format!(
112                "unsupported event log backend '{other}'"
113            ))),
114        }
115    }
116}
117
118#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
119pub struct LogEvent {
120    pub kind: String,
121    pub payload: serde_json::Value,
122    #[serde(default)]
123    pub headers: BTreeMap<String, String>,
124    pub occurred_at_ms: i64,
125}
126
127impl LogEvent {
128    pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
129        Self {
130            kind: kind.into(),
131            payload,
132            headers: BTreeMap::new(),
133            occurred_at_ms: now_ms(),
134        }
135    }
136
137    pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
138        self.headers = headers;
139        self
140    }
141}
142
/// Outcome of a `compact` call on a topic.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactReport {
    /// Number of events dropped by this compaction.
    pub removed: usize,
    /// Events still stored for the topic afterwards.
    pub remaining: usize,
    /// Highest known event id for the topic, if any.
    pub latest: Option<EventId>,
    /// Whether the backend also performed a storage-level checkpoint
    /// (the memory and file backends always report `false`).
    pub checkpointed: bool,
}
150
/// Snapshot of a backend's identity and footprint, as returned by
/// `EventLog::describe` and `describe_for_base_dir`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventLogDescription {
    /// Which backend implementation is in use.
    pub backend: EventLogBackendKind,
    /// On-disk location (directory for `File`, database path for `Sqlite`);
    /// `None` for the memory backend.
    pub location: Option<PathBuf>,
    /// Approximate storage footprint in bytes; `None` for the memory backend.
    pub size_bytes: Option<u64>,
    /// Capacity used for subscription channels.
    pub queue_depth: usize,
}
158
159#[derive(Debug)]
160pub enum LogError {
161    Config(String),
162    InvalidTopic(String),
163    InvalidConsumer(String),
164    Io(String),
165    Serde(String),
166    Sqlite(String),
167    ConsumerLagged(EventId),
168}
169
170impl fmt::Display for LogError {
171    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172        match self {
173            Self::Config(message)
174            | Self::InvalidTopic(message)
175            | Self::InvalidConsumer(message)
176            | Self::Io(message)
177            | Self::Serde(message)
178            | Self::Sqlite(message) => message.fmt(f),
179            Self::ConsumerLagged(last_id) => {
180                write!(f, "subscriber lagged behind after event {last_id}")
181            }
182        }
183    }
184}
185
186impl std::error::Error for LogError {}
187
/// Append-only, topic-partitioned event log with replayable subscriptions
/// and per-consumer cursors.
#[allow(async_fn_in_trait)]
pub trait EventLog: Send + Sync {
    /// Describes the backend: kind, location, footprint, queue depth.
    fn describe(&self) -> EventLogDescription;

    /// Appends `event` to `topic` and returns its newly assigned id.
    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;

    /// Pushes buffered data toward durable storage (a no-op for the
    /// memory backend).
    async fn flush(&self) -> Result<(), LogError>;

    /// Read events strictly after `from`. `None` starts from the
    /// beginning of the topic.
    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError>;

    /// `async fn` keeps the ergonomic generic surface; the boxed stream
    /// preserves dyn-dispatch for callers that store `Arc<dyn EventLog>`.
    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;

    /// Records that `consumer` has processed `topic` up to and including
    /// `up_to`.
    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError>;

    /// Last acknowledged event id for `consumer` on `topic`, if any.
    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError>;

    /// Highest event id ever appended to `topic`, or `None` if empty.
    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;

    /// Removes events with id <= `before` and reports what was removed.
    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
}
230
231#[derive(Clone, Debug)]
232pub struct EventLogConfig {
233    pub backend: EventLogBackendKind,
234    pub file_dir: PathBuf,
235    pub sqlite_path: PathBuf,
236    pub queue_depth: usize,
237}
238
239impl EventLogConfig {
240    pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
241        let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
242            .ok()
243            .map(|value| value.parse())
244            .transpose()?
245            .unwrap_or(EventLogBackendKind::Sqlite);
246        let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
247            .ok()
248            .and_then(|value| value.parse::<usize>().ok())
249            .unwrap_or(128)
250            .max(1);
251
252        let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
253            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
254            _ => crate::runtime_paths::event_log_dir(base_dir),
255        };
256        let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
257            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
258            _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
259        };
260
261        Ok(Self {
262            backend,
263            file_dir,
264            sqlite_path,
265            queue_depth,
266        })
267    }
268
269    pub fn location(&self) -> Option<PathBuf> {
270        match self.backend {
271            EventLogBackendKind::Memory => None,
272            EventLogBackendKind::File => Some(self.file_dir.clone()),
273            EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
274        }
275    }
276}
277
thread_local! {
    // This thread's "current" event log: set by the `install_*` helpers,
    // read back via `active_event_log`, cleared by `reset_active_event_log`.
    static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
}
281
282pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
283    let config = EventLogConfig::for_base_dir(base_dir)?;
284    let log = open_event_log(&config)?;
285    ACTIVE_EVENT_LOG.with(|slot| {
286        *slot.borrow_mut() = Some(log.clone());
287    });
288    Ok(log)
289}
290
291pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
292    let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
293    ACTIVE_EVENT_LOG.with(|slot| {
294        *slot.borrow_mut() = Some(log.clone());
295    });
296    log
297}
298
299pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
300    ACTIVE_EVENT_LOG.with(|slot| {
301        *slot.borrow_mut() = Some(log.clone());
302    });
303    log
304}
305
306pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
307    ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone())
308}
309
310pub fn reset_active_event_log() {
311    ACTIVE_EVENT_LOG.with(|slot| {
312        *slot.borrow_mut() = None;
313    });
314}
315
316pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
317    let config = EventLogConfig::for_base_dir(base_dir)?;
318    let description = match config.backend {
319        EventLogBackendKind::Memory => EventLogDescription {
320            backend: EventLogBackendKind::Memory,
321            location: None,
322            size_bytes: None,
323            queue_depth: config.queue_depth,
324        },
325        EventLogBackendKind::File => EventLogDescription {
326            backend: EventLogBackendKind::File,
327            size_bytes: Some(dir_size_bytes(&config.file_dir)),
328            location: Some(config.file_dir),
329            queue_depth: config.queue_depth,
330        },
331        EventLogBackendKind::Sqlite => EventLogDescription {
332            backend: EventLogBackendKind::Sqlite,
333            size_bytes: Some(sqlite_size_bytes(&config.sqlite_path)),
334            location: Some(config.sqlite_path),
335            queue_depth: config.queue_depth,
336        },
337    };
338    Ok(description)
339}
340
341pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
342    match config.backend {
343        EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
344            config.queue_depth,
345        )))),
346        EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
347            config.file_dir.clone(),
348            config.queue_depth,
349        )?))),
350        EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
351            config.sqlite_path.clone(),
352            config.queue_depth,
353        )?))),
354    }
355}
356
/// Concrete backend dispatcher. Using an enum (rather than `dyn EventLog`)
/// keeps the trait's `async fn` methods callable through a single stored type.
pub enum AnyEventLog {
    Memory(MemoryEventLog),
    File(FileEventLog),
    Sqlite(SqliteEventLog),
}
362
363impl EventLog for AnyEventLog {
364    fn describe(&self) -> EventLogDescription {
365        match self {
366            Self::Memory(log) => log.describe(),
367            Self::File(log) => log.describe(),
368            Self::Sqlite(log) => log.describe(),
369        }
370    }
371
372    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
373        match self {
374            Self::Memory(log) => log.append(topic, event).await,
375            Self::File(log) => log.append(topic, event).await,
376            Self::Sqlite(log) => log.append(topic, event).await,
377        }
378    }
379
380    async fn flush(&self) -> Result<(), LogError> {
381        match self {
382            Self::Memory(log) => log.flush().await,
383            Self::File(log) => log.flush().await,
384            Self::Sqlite(log) => log.flush().await,
385        }
386    }
387
388    async fn read_range(
389        &self,
390        topic: &Topic,
391        from: Option<EventId>,
392        limit: usize,
393    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
394        match self {
395            Self::Memory(log) => log.read_range(topic, from, limit).await,
396            Self::File(log) => log.read_range(topic, from, limit).await,
397            Self::Sqlite(log) => log.read_range(topic, from, limit).await,
398        }
399    }
400
401    async fn subscribe(
402        self: Arc<Self>,
403        topic: &Topic,
404        from: Option<EventId>,
405    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
406        let (rx, queue_depth) = match self.as_ref() {
407            Self::Memory(log) => (
408                log.broadcasts.subscribe(topic, log.queue_depth),
409                log.queue_depth,
410            ),
411            Self::File(log) => (
412                log.broadcasts.subscribe(topic, log.queue_depth),
413                log.queue_depth,
414            ),
415            Self::Sqlite(log) => (
416                log.broadcasts.subscribe(topic, log.queue_depth),
417                log.queue_depth,
418            ),
419        };
420        Ok(stream_from_broadcast(
421            self,
422            topic.clone(),
423            from,
424            rx,
425            queue_depth,
426        ))
427    }
428
429    async fn ack(
430        &self,
431        topic: &Topic,
432        consumer: &ConsumerId,
433        up_to: EventId,
434    ) -> Result<(), LogError> {
435        match self {
436            Self::Memory(log) => log.ack(topic, consumer, up_to).await,
437            Self::File(log) => log.ack(topic, consumer, up_to).await,
438            Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
439        }
440    }
441
442    async fn consumer_cursor(
443        &self,
444        topic: &Topic,
445        consumer: &ConsumerId,
446    ) -> Result<Option<EventId>, LogError> {
447        match self {
448            Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
449            Self::File(log) => log.consumer_cursor(topic, consumer).await,
450            Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
451        }
452    }
453
454    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
455        match self {
456            Self::Memory(log) => log.latest(topic).await,
457            Self::File(log) => log.latest(topic).await,
458            Self::Sqlite(log) => log.latest(topic).await,
459        }
460    }
461
462    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
463        match self {
464            Self::Memory(log) => log.compact(topic, before).await,
465            Self::File(log) => log.compact(topic, before).await,
466            Self::Sqlite(log) => log.compact(topic, before).await,
467        }
468    }
469}
470
471#[derive(Default)]
472struct BroadcastMap(Mutex<HashMap<String, broadcast::Sender<(EventId, LogEvent)>>>);
473
474impl BroadcastMap {
475    fn subscribe(
476        &self,
477        topic: &Topic,
478        capacity: usize,
479    ) -> broadcast::Receiver<(EventId, LogEvent)> {
480        self.sender(topic, capacity).subscribe()
481    }
482
483    fn publish(&self, topic: &Topic, capacity: usize, record: (EventId, LogEvent)) {
484        let _ = self.sender(topic, capacity).send(record);
485    }
486
487    fn sender(&self, topic: &Topic, capacity: usize) -> broadcast::Sender<(EventId, LogEvent)> {
488        let mut map = self.0.lock().expect("event log broadcast map poisoned");
489        map.entry(topic.as_str().to_string())
490            .or_insert_with(|| broadcast::channel(capacity.max(1)).0)
491            .clone()
492    }
493}
494
/// Bridges a history replay plus a live broadcast receiver into one ordered
/// stream of `(EventId, LogEvent)` results.
///
/// Ordering here is load-bearing: callers subscribe `live_rx` to the
/// broadcast *before* this function replays history, and the
/// `event_id > last_seen` filter below drops live events already covered by
/// the replay — so nothing is duplicated or skipped across the handover.
fn stream_from_broadcast<L>(
    log: Arc<L>,
    topic: Topic,
    from: Option<EventId>,
    mut live_rx: broadcast::Receiver<(EventId, LogEvent)>,
    queue_depth: usize,
) -> BoxStream<'static, Result<(EventId, LogEvent), LogError>>
where
    L: EventLog + 'static,
{
    let (tx, rx) = mpsc::channel(queue_depth.max(1));
    // A dedicated OS thread with a local executor drives the pump, so the
    // returned stream works regardless of which async runtime polls it.
    std::thread::spawn(move || {
        futures::executor::block_on(async move {
            // Phase 1: replay everything stored after `from`.
            let history = match log.read_range(&topic, from, usize::MAX).await {
                Ok(history) => history,
                Err(error) => {
                    let _ = tx.send(Err(error)).await;
                    return;
                }
            };

            let mut last_seen = from.unwrap_or(0);
            for (event_id, event) in history {
                last_seen = event_id;
                // A failed send means the subscriber dropped the stream; stop.
                if tx.send(Ok((event_id, event))).await.is_err() {
                    return;
                }
            }

            // Phase 2: forward live events, skipping ids already replayed.
            loop {
                match live_rx.recv().await {
                    Ok((event_id, event)) if event_id > last_seen => {
                        last_seen = event_id;
                        if tx.send(Ok((event_id, event))).await.is_err() {
                            return;
                        }
                    }
                    // Duplicate of an id already covered by history replay.
                    Ok(_) => {}
                    Err(broadcast::error::RecvError::Closed) => return,
                    Err(broadcast::error::RecvError::Lagged(_)) => {
                        // The broadcast ring overwrote entries we never saw;
                        // surface the gap rather than silently dropping events.
                        let _ = tx.try_send(Err(LogError::ConsumerLagged(last_seen)));
                        return;
                    }
                }
            }
        });
    });
    Box::pin(ReceiverStream::new(rx))
}
544
/// Interior state of `MemoryEventLog`, guarded by one async mutex.
#[derive(Default)]
struct MemoryState {
    // Per-topic events in append (ascending id) order.
    topics: HashMap<String, VecDeque<(EventId, LogEvent)>>,
    // Per-topic high-water mark; left untouched by `compact`, so ids stay
    // monotonic even after events are removed.
    latest: HashMap<String, EventId>,
    // Acked cursor keyed by (topic, consumer).
    consumers: HashMap<(String, String), EventId>,
}
551
552pub struct MemoryEventLog {
553    state: tokio::sync::Mutex<MemoryState>,
554    broadcasts: BroadcastMap,
555    queue_depth: usize,
556}
557
558impl MemoryEventLog {
559    pub fn new(queue_depth: usize) -> Self {
560        Self {
561            state: tokio::sync::Mutex::new(MemoryState::default()),
562            broadcasts: BroadcastMap::default(),
563            queue_depth: queue_depth.max(1),
564        }
565    }
566}
567
568impl EventLog for MemoryEventLog {
569    fn describe(&self) -> EventLogDescription {
570        EventLogDescription {
571            backend: EventLogBackendKind::Memory,
572            location: None,
573            size_bytes: None,
574            queue_depth: self.queue_depth,
575        }
576    }
577
578    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
579        let mut state = self.state.lock().await;
580        let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
581        state.latest.insert(topic.as_str().to_string(), event_id);
582        state
583            .topics
584            .entry(topic.as_str().to_string())
585            .or_default()
586            .push_back((event_id, event.clone()));
587        drop(state);
588        self.broadcasts
589            .publish(topic, self.queue_depth, (event_id, event));
590        Ok(event_id)
591    }
592
593    async fn flush(&self) -> Result<(), LogError> {
594        Ok(())
595    }
596
597    async fn read_range(
598        &self,
599        topic: &Topic,
600        from: Option<EventId>,
601        limit: usize,
602    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
603        let from = from.unwrap_or(0);
604        let state = self.state.lock().await;
605        let events = state
606            .topics
607            .get(topic.as_str())
608            .into_iter()
609            .flat_map(|events| events.iter())
610            .filter(|(event_id, _)| *event_id > from)
611            .take(limit)
612            .map(|(event_id, event)| (*event_id, event.clone()))
613            .collect();
614        Ok(events)
615    }
616
617    async fn subscribe(
618        self: Arc<Self>,
619        topic: &Topic,
620        from: Option<EventId>,
621    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
622        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
623        Ok(stream_from_broadcast(
624            self.clone(),
625            topic.clone(),
626            from,
627            rx,
628            self.queue_depth,
629        ))
630    }
631
632    async fn ack(
633        &self,
634        topic: &Topic,
635        consumer: &ConsumerId,
636        up_to: EventId,
637    ) -> Result<(), LogError> {
638        let mut state = self.state.lock().await;
639        state.consumers.insert(
640            (topic.as_str().to_string(), consumer.as_str().to_string()),
641            up_to,
642        );
643        Ok(())
644    }
645
646    async fn consumer_cursor(
647        &self,
648        topic: &Topic,
649        consumer: &ConsumerId,
650    ) -> Result<Option<EventId>, LogError> {
651        let state = self.state.lock().await;
652        Ok(state
653            .consumers
654            .get(&(topic.as_str().to_string(), consumer.as_str().to_string()))
655            .copied())
656    }
657
658    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
659        let state = self.state.lock().await;
660        Ok(state.latest.get(topic.as_str()).copied())
661    }
662
663    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
664        let mut state = self.state.lock().await;
665        let Some(events) = state.topics.get_mut(topic.as_str()) else {
666            return Ok(CompactReport::default());
667        };
668        let removed = events
669            .iter()
670            .take_while(|(event_id, _)| *event_id <= before)
671            .count();
672        for _ in 0..removed {
673            events.pop_front();
674        }
675        Ok(CompactReport {
676            removed,
677            remaining: events.len(),
678            latest: state.latest.get(topic.as_str()).copied(),
679            checkpointed: false,
680        })
681    }
682}
683
/// One JSONL line in a topic file: the assigned id plus the event itself.
#[derive(Serialize, Deserialize)]
struct FileRecord {
    id: EventId,
    event: LogEvent,
}
689
/// JSONL-on-disk backend: one append-only file per topic under
/// `<root>/topics/`, one cursor file per (topic, consumer) under
/// `<root>/consumers/`.
pub struct FileEventLog {
    root: PathBuf,
    // Cached per-topic high-water marks, so topic files are scanned at most
    // once per process for id assignment.
    latest_ids: Mutex<HashMap<String, EventId>>,
    // Serializes append/compact so id assignment and file rewrites don't
    // race within this process.
    write_lock: Mutex<()>,
    broadcasts: BroadcastMap,
    queue_depth: usize,
}
697
698impl FileEventLog {
699    pub fn open(root: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
700        std::fs::create_dir_all(root.join("topics"))
701            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
702        std::fs::create_dir_all(root.join("consumers"))
703            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
704        Ok(Self {
705            root,
706            latest_ids: Mutex::new(HashMap::new()),
707            write_lock: Mutex::new(()),
708            broadcasts: BroadcastMap::default(),
709            queue_depth: queue_depth.max(1),
710        })
711    }
712
713    fn topic_path(&self, topic: &Topic) -> PathBuf {
714        self.root
715            .join("topics")
716            .join(format!("{}.jsonl", topic.as_str()))
717    }
718
719    fn consumer_path(&self, topic: &Topic, consumer: &ConsumerId) -> PathBuf {
720        self.root.join("consumers").join(format!(
721            "{}__{}.json",
722            topic.as_str(),
723            sanitize_filename(consumer.as_str())
724        ))
725    }
726
727    fn latest_id_for_topic(&self, topic: &Topic) -> Result<EventId, LogError> {
728        if let Some(event_id) = self
729            .latest_ids
730            .lock()
731            .expect("file event log latest ids poisoned")
732            .get(topic.as_str())
733            .copied()
734        {
735            return Ok(event_id);
736        }
737
738        let mut latest = 0;
739        let path = self.topic_path(topic);
740        if path.is_file() {
741            let file = std::fs::File::open(&path)
742                .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
743            for line in std::io::BufRead::lines(std::io::BufReader::new(file)) {
744                let line =
745                    line.map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
746                let record: FileRecord = serde_json::from_str(&line)
747                    .map_err(|error| LogError::Serde(format!("event log parse error: {error}")))?;
748                latest = record.id;
749            }
750        }
751        self.latest_ids
752            .lock()
753            .expect("file event log latest ids poisoned")
754            .insert(topic.as_str().to_string(), latest);
755        Ok(latest)
756    }
757
758    fn read_range_sync(
759        &self,
760        topic: &Topic,
761        from: Option<EventId>,
762        limit: usize,
763    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
764        let path = self.topic_path(topic);
765        if !path.is_file() {
766            return Ok(Vec::new());
767        }
768        let file = std::fs::File::open(&path)
769            .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
770        let from = from.unwrap_or(0);
771        let mut events = Vec::new();
772        for line in std::io::BufRead::lines(std::io::BufReader::new(file)) {
773            let line =
774                line.map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
775            let record: FileRecord = serde_json::from_str(&line)
776                .map_err(|error| LogError::Serde(format!("event log parse error: {error}")))?;
777            if record.id > from {
778                events.push((record.id, record.event));
779            }
780            if events.len() >= limit {
781                break;
782            }
783        }
784        Ok(events)
785    }
786}
787
788impl EventLog for FileEventLog {
789    fn describe(&self) -> EventLogDescription {
790        EventLogDescription {
791            backend: EventLogBackendKind::File,
792            location: Some(self.root.clone()),
793            size_bytes: Some(dir_size_bytes(&self.root)),
794            queue_depth: self.queue_depth,
795        }
796    }
797
798    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
799        let _guard = self
800            .write_lock
801            .lock()
802            .expect("file event log write lock poisoned");
803        let next_id = self.latest_id_for_topic(topic)? + 1;
804        let record = FileRecord {
805            id: next_id,
806            event: event.clone(),
807        };
808        let path = self.topic_path(topic);
809        if let Some(parent) = path.parent() {
810            std::fs::create_dir_all(parent)
811                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
812        }
813        let line = serde_json::to_string(&record)
814            .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
815        use std::io::Write as _;
816        let mut file = std::fs::OpenOptions::new()
817            .create(true)
818            .append(true)
819            .open(&path)
820            .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
821        writeln!(file, "{line}")
822            .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
823        self.latest_ids
824            .lock()
825            .expect("file event log latest ids poisoned")
826            .insert(topic.as_str().to_string(), next_id);
827        self.broadcasts
828            .publish(topic, self.queue_depth, (next_id, event));
829        Ok(next_id)
830    }
831
832    async fn flush(&self) -> Result<(), LogError> {
833        sync_tree(&self.root)
834    }
835
836    async fn read_range(
837        &self,
838        topic: &Topic,
839        from: Option<EventId>,
840        limit: usize,
841    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
842        self.read_range_sync(topic, from, limit)
843    }
844
845    async fn subscribe(
846        self: Arc<Self>,
847        topic: &Topic,
848        from: Option<EventId>,
849    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
850        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
851        Ok(stream_from_broadcast(
852            self.clone(),
853            topic.clone(),
854            from,
855            rx,
856            self.queue_depth,
857        ))
858    }
859
860    async fn ack(
861        &self,
862        topic: &Topic,
863        consumer: &ConsumerId,
864        up_to: EventId,
865    ) -> Result<(), LogError> {
866        let path = self.consumer_path(topic, consumer);
867        let payload = serde_json::json!({
868            "topic": topic.as_str(),
869            "consumer_id": consumer.as_str(),
870            "cursor": up_to,
871            "updated_at_ms": now_ms(),
872        });
873        write_json_atomically(&path, &payload)
874    }
875
876    async fn consumer_cursor(
877        &self,
878        topic: &Topic,
879        consumer: &ConsumerId,
880    ) -> Result<Option<EventId>, LogError> {
881        let path = self.consumer_path(topic, consumer);
882        if !path.is_file() {
883            return Ok(None);
884        }
885        let raw = std::fs::read_to_string(&path)
886            .map_err(|error| LogError::Io(format!("event log consumer read error: {error}")))?;
887        let payload: serde_json::Value = serde_json::from_str(&raw)
888            .map_err(|error| LogError::Serde(format!("event log consumer parse error: {error}")))?;
889        let cursor = payload
890            .get("cursor")
891            .and_then(serde_json::Value::as_u64)
892            .ok_or_else(|| {
893                LogError::Serde("event log consumer record missing numeric cursor".to_string())
894            })?;
895        Ok(Some(cursor))
896    }
897
898    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
899        let latest = self.latest_id_for_topic(topic)?;
900        if latest == 0 {
901            Ok(None)
902        } else {
903            Ok(Some(latest))
904        }
905    }
906
907    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
908        let _guard = self
909            .write_lock
910            .lock()
911            .expect("file event log write lock poisoned");
912        let path = self.topic_path(topic);
913        if !path.is_file() {
914            return Ok(CompactReport::default());
915        }
916        let retained = self.read_range_sync(topic, Some(before), usize::MAX)?;
917        let removed = self.read_range_sync(topic, None, usize::MAX)?.len() - retained.len();
918        let tmp = path.with_extension("jsonl.tmp");
919        if retained.is_empty() {
920            let _ = std::fs::remove_file(&path);
921        } else {
922            let mut writer =
923                std::io::BufWriter::new(std::fs::File::create(&tmp).map_err(|error| {
924                    LogError::Io(format!("event log tmp create error: {error}"))
925                })?);
926            use std::io::Write as _;
927            for (event_id, event) in &retained {
928                let line = serde_json::to_string(&FileRecord {
929                    id: *event_id,
930                    event: event.clone(),
931                })
932                .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
933                writeln!(writer, "{line}")
934                    .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
935            }
936            writer
937                .flush()
938                .map_err(|error| LogError::Io(format!("event log flush error: {error}")))?;
939            std::fs::rename(&tmp, &path).map_err(|error| {
940                LogError::Io(format!("event log compact finalize error: {error}"))
941            })?;
942        }
943        let latest = retained.last().map(|(event_id, _)| *event_id);
944        self.latest_ids
945            .lock()
946            .expect("file event log latest ids poisoned")
947            .insert(topic.as_str().to_string(), latest.unwrap_or(0));
948        Ok(CompactReport {
949            removed,
950            remaining: retained.len(),
951            latest,
952            checkpointed: false,
953        })
954    }
955}
956
/// Event log backed by a single SQLite database file (opened in WAL mode
/// by [`SqliteEventLog::open`]).
pub struct SqliteEventLog {
    // Location of the main database file; `-wal`/`-shm` sidecars live alongside.
    path: PathBuf,
    // Single shared connection; every operation serializes on this mutex.
    connection: Mutex<Connection>,
    // Per-topic broadcast channels used to fan appended events out to
    // live subscribers.
    broadcasts: BroadcastMap,
    // Capacity of subscriber broadcast buffers; clamped to at least 1 in `open`.
    queue_depth: usize,
}
963
964impl SqliteEventLog {
965    pub fn open(path: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
966        if let Some(parent) = path.parent() {
967            std::fs::create_dir_all(parent)
968                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
969        }
970        let connection = Connection::open(&path)
971            .map_err(|error| LogError::Sqlite(format!("event log open error: {error}")))?;
972        // Set busy_timeout BEFORE the WAL pragma so SQLite waits out transient
973        // SQLITE_BUSY from a previous test's connection that hasn't finished
974        // dropping yet (parallel `cargo test` on the same process, distinct
975        // paths, still contends on SQLite's own global mutex under WAL-mode
976        // promotion). Without this, `journal_mode = WAL` fails fast with
977        // "database is locked" instead of retrying.
978        connection
979            .busy_timeout(std::time::Duration::from_secs(5))
980            .map_err(|error| LogError::Sqlite(format!("event log busy-timeout error: {error}")))?;
981        connection
982            .pragma_update(None, "journal_mode", "WAL")
983            .map_err(|error| LogError::Sqlite(format!("event log WAL pragma error: {error}")))?;
984        connection
985            .pragma_update(None, "synchronous", "NORMAL")
986            .map_err(|error| LogError::Sqlite(format!("event log sync pragma error: {error}")))?;
987        connection
988            .execute_batch(
989                "CREATE TABLE IF NOT EXISTS topic_heads (
990                    topic TEXT PRIMARY KEY,
991                    last_id INTEGER NOT NULL
992                );
993                CREATE TABLE IF NOT EXISTS events (
994                    topic TEXT NOT NULL,
995                    event_id INTEGER NOT NULL,
996                    kind TEXT NOT NULL,
997                    payload TEXT NOT NULL,
998                    headers TEXT NOT NULL,
999                    occurred_at_ms INTEGER NOT NULL,
1000                    PRIMARY KEY (topic, event_id)
1001                );
1002                CREATE TABLE IF NOT EXISTS consumers (
1003                    topic TEXT NOT NULL,
1004                    consumer_id TEXT NOT NULL,
1005                    cursor INTEGER NOT NULL,
1006                    updated_at_ms INTEGER NOT NULL,
1007                    PRIMARY KEY (topic, consumer_id)
1008                );",
1009            )
1010            .map_err(|error| LogError::Sqlite(format!("event log schema error: {error}")))?;
1011        Ok(Self {
1012            path,
1013            connection: Mutex::new(connection),
1014            broadcasts: BroadcastMap::default(),
1015            queue_depth: queue_depth.max(1),
1016        })
1017    }
1018}
1019
1020impl EventLog for SqliteEventLog {
1021    fn describe(&self) -> EventLogDescription {
1022        EventLogDescription {
1023            backend: EventLogBackendKind::Sqlite,
1024            location: Some(self.path.clone()),
1025            size_bytes: Some(sqlite_size_bytes(&self.path)),
1026            queue_depth: self.queue_depth,
1027        }
1028    }
1029
1030    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
1031        let mut connection = self
1032            .connection
1033            .lock()
1034            .expect("sqlite event log connection poisoned");
1035        let tx = connection
1036            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
1037            .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
1038        tx.execute(
1039            "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
1040            params![topic.as_str()],
1041        )
1042        .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
1043        tx.execute(
1044            "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
1045            params![topic.as_str()],
1046        )
1047        .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
1048        let event_id: EventId = tx
1049            .query_row(
1050                "SELECT last_id FROM topic_heads WHERE topic = ?1",
1051                params![topic.as_str()],
1052                |row| row.get(0),
1053            )
1054            .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))?;
1055        tx.execute(
1056            "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
1057             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1058            params![
1059                topic.as_str(),
1060                event_id,
1061                event.kind,
1062                serde_json::to_string(&event.payload).map_err(|error| LogError::Serde(format!(
1063                    "event log payload encode error: {error}"
1064                )))?,
1065                serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
1066                    "event log headers encode error: {error}"
1067                )))?,
1068                event.occurred_at_ms
1069            ],
1070        )
1071        .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
1072        tx.commit()
1073            .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
1074        self.broadcasts
1075            .publish(topic, self.queue_depth, (event_id, event.clone()));
1076        Ok(event_id)
1077    }
1078
1079    async fn flush(&self) -> Result<(), LogError> {
1080        let connection = self
1081            .connection
1082            .lock()
1083            .expect("sqlite event log connection poisoned");
1084        connection
1085            .execute_batch("PRAGMA wal_checkpoint(FULL);")
1086            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
1087        Ok(())
1088    }
1089
1090    async fn read_range(
1091        &self,
1092        topic: &Topic,
1093        from: Option<EventId>,
1094        limit: usize,
1095    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
1096        let connection = self
1097            .connection
1098            .lock()
1099            .expect("sqlite event log connection poisoned");
1100        let mut statement = connection
1101            .prepare(
1102                "SELECT event_id, kind, payload, headers, occurred_at_ms
1103                 FROM events
1104                 WHERE topic = ?1 AND event_id > ?2
1105                 ORDER BY event_id ASC
1106                 LIMIT ?3",
1107            )
1108            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
1109        let rows = statement
1110            .query_map(
1111                params![topic.as_str(), from.unwrap_or(0), limit as i64],
1112                |row| {
1113                    let payload: String = row.get(2)?;
1114                    let headers: String = row.get(3)?;
1115                    Ok((
1116                        row.get::<_, EventId>(0)?,
1117                        LogEvent {
1118                            kind: row.get(1)?,
1119                            payload: serde_json::from_str(&payload).map_err(|error| {
1120                                rusqlite::Error::FromSqlConversionFailure(
1121                                    payload.len(),
1122                                    rusqlite::types::Type::Text,
1123                                    Box::new(error),
1124                                )
1125                            })?,
1126                            headers: serde_json::from_str(&headers).map_err(|error| {
1127                                rusqlite::Error::FromSqlConversionFailure(
1128                                    headers.len(),
1129                                    rusqlite::types::Type::Text,
1130                                    Box::new(error),
1131                                )
1132                            })?,
1133                            occurred_at_ms: row.get(4)?,
1134                        },
1135                    ))
1136                },
1137            )
1138            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
1139        let mut events = Vec::new();
1140        for row in rows {
1141            events.push(
1142                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
1143            );
1144        }
1145        Ok(events)
1146    }
1147
1148    async fn subscribe(
1149        self: Arc<Self>,
1150        topic: &Topic,
1151        from: Option<EventId>,
1152    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
1153        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
1154        Ok(stream_from_broadcast(
1155            self.clone(),
1156            topic.clone(),
1157            from,
1158            rx,
1159            self.queue_depth,
1160        ))
1161    }
1162
1163    async fn ack(
1164        &self,
1165        topic: &Topic,
1166        consumer: &ConsumerId,
1167        up_to: EventId,
1168    ) -> Result<(), LogError> {
1169        let connection = self
1170            .connection
1171            .lock()
1172            .expect("sqlite event log connection poisoned");
1173        connection
1174            .execute(
1175                "INSERT INTO consumers(topic, consumer_id, cursor, updated_at_ms)
1176                 VALUES (?1, ?2, ?3, ?4)
1177                 ON CONFLICT(topic, consumer_id)
1178                 DO UPDATE SET cursor = excluded.cursor, updated_at_ms = excluded.updated_at_ms",
1179                params![topic.as_str(), consumer.as_str(), up_to, now_ms()],
1180            )
1181            .map_err(|error| LogError::Sqlite(format!("event log ack error: {error}")))?;
1182        Ok(())
1183    }
1184
1185    async fn consumer_cursor(
1186        &self,
1187        topic: &Topic,
1188        consumer: &ConsumerId,
1189    ) -> Result<Option<EventId>, LogError> {
1190        let connection = self
1191            .connection
1192            .lock()
1193            .expect("sqlite event log connection poisoned");
1194        connection
1195            .query_row(
1196                "SELECT cursor FROM consumers WHERE topic = ?1 AND consumer_id = ?2",
1197                params![topic.as_str(), consumer.as_str()],
1198                |row| row.get::<_, EventId>(0),
1199            )
1200            .optional()
1201            .map_err(|error| LogError::Sqlite(format!("event log consumer cursor error: {error}")))
1202    }
1203
1204    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
1205        let connection = self
1206            .connection
1207            .lock()
1208            .expect("sqlite event log connection poisoned");
1209        connection
1210            .query_row(
1211                "SELECT last_id FROM topic_heads WHERE topic = ?1",
1212                params![topic.as_str()],
1213                |row| row.get::<_, EventId>(0),
1214            )
1215            .optional()
1216            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))
1217    }
1218
1219    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
1220        let connection = self
1221            .connection
1222            .lock()
1223            .expect("sqlite event log connection poisoned");
1224        let removed = connection
1225            .execute(
1226                "DELETE FROM events WHERE topic = ?1 AND event_id <= ?2",
1227                params![topic.as_str(), before],
1228            )
1229            .map_err(|error| {
1230                LogError::Sqlite(format!("event log compact delete error: {error}"))
1231            })?;
1232        let remaining: usize = connection
1233            .query_row(
1234                "SELECT COUNT(*) FROM events WHERE topic = ?1",
1235                params![topic.as_str()],
1236                |row| row.get(0),
1237            )
1238            .map_err(|error| LogError::Sqlite(format!("event log compact count error: {error}")))?;
1239        let latest = connection
1240            .query_row(
1241                "SELECT last_id FROM topic_heads WHERE topic = ?1",
1242                params![topic.as_str()],
1243                |row| row.get::<_, EventId>(0),
1244            )
1245            .optional()
1246            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))?;
1247        connection
1248            .execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
1249            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
1250        Ok(CompactReport {
1251            removed,
1252            remaining,
1253            latest,
1254            checkpointed: true,
1255        })
1256    }
1257}
1258
/// Interpret `value` as a path: absolute paths are used verbatim, relative
/// paths are anchored under `base_dir`.
fn resolve_path(base_dir: &Path, value: &str) -> PathBuf {
    let raw = Path::new(value);
    match raw.is_absolute() {
        true => raw.to_path_buf(),
        false => base_dir.join(raw),
    }
}
1267
1268fn write_json_atomically(path: &Path, payload: &serde_json::Value) -> Result<(), LogError> {
1269    if let Some(parent) = path.parent() {
1270        std::fs::create_dir_all(parent)
1271            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
1272    }
1273    let tmp = path.with_extension("tmp");
1274    let encoded = serde_json::to_vec_pretty(payload)
1275        .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
1276    std::fs::write(&tmp, encoded)
1277        .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
1278    std::fs::rename(&tmp, path)
1279        .map_err(|error| LogError::Io(format!("event log rename error: {error}")))?;
1280    Ok(())
1281}
1282
/// Map an arbitrary string to a filesystem-safe file name: any character
/// outside `[A-Za-z0-9._-]` is replaced with `_` (same rules as topic
/// component sanitization, to which this delegates).
fn sanitize_filename(value: &str) -> String {
    sanitize_topic_component(value)
}
1286
/// Replace every character outside the allowed topic alphabet
/// (`[A-Za-z0-9._-]`) with an underscore, leaving allowed characters as-is.
pub fn sanitize_topic_component(value: &str) -> String {
    let allowed = |ch: char| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-');
    value
        .chars()
        .map(|ch| if allowed(ch) { ch } else { '_' })
        .collect()
}
1299
/// Recursively sum the byte sizes of every regular file under `path`.
/// Missing or unreadable directories and entries contribute zero.
fn dir_size_bytes(path: &Path) -> u64 {
    match std::fs::read_dir(path) {
        Err(_) => 0,
        Ok(entries) => entries
            .flatten()
            .map(|entry| {
                let child = entry.path();
                if child.is_dir() {
                    dir_size_bytes(&child)
                } else {
                    entry.metadata().map(|metadata| metadata.len()).unwrap_or(0)
                }
            })
            .sum(),
    }
}
1317
/// Total on-disk footprint of a SQLite database: the main file plus its
/// `-wal` and `-shm` sidecars (missing files count as zero). The sidecar
/// names are formed by appending to the full file name, which
/// `Path::with_extension` cannot express — hence the string formatting.
fn sqlite_size_bytes(path: &Path) -> u64 {
    let size_of = |p: &Path| std::fs::metadata(p).map(|metadata| metadata.len()).unwrap_or(0);
    let mut total = size_of(path);
    for suffix in ["-wal", "-shm"] {
        total += size_of(&PathBuf::from(format!("{}{suffix}", path.display())));
    }
    total
}
1324
/// Size of the file at `path` in bytes; zero when the file is missing or
/// its metadata cannot be read.
fn file_size(path: &Path) -> u64 {
    match std::fs::metadata(path) {
        Ok(metadata) => metadata.len(),
        Err(_) => 0,
    }
}
1330
1331fn sync_tree(root: &Path) -> Result<(), LogError> {
1332    if !root.exists() {
1333        return Ok(());
1334    }
1335    for entry in std::fs::read_dir(root)
1336        .map_err(|error| LogError::Io(format!("event log read_dir error: {error}")))?
1337    {
1338        let entry = entry.map_err(|error| LogError::Io(format!("event log dir error: {error}")))?;
1339        let path = entry.path();
1340        if path.is_dir() {
1341            sync_tree(&path)?;
1342            continue;
1343        }
1344        std::fs::File::open(&path)
1345            .and_then(|file| file.sync_all())
1346            .map_err(|error| LogError::Io(format!("event log sync error: {error}")))?;
1347    }
1348    Ok(())
1349}
1350
/// Current wall-clock time as milliseconds since the Unix epoch, or 0 if
/// the system clock reads before the epoch. The `as i64` narrowing is safe
/// for any realistic timestamp.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(duration) => duration.as_millis() as i64,
        Err(_) => 0,
    }
}
1357
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use rand::{rngs::StdRng, RngExt, SeedableRng};

    /// Shared smoke test: append 10k events to one topic, then confirm a
    /// full read returns a dense id sequence starting at 1.
    async fn exercise_basic_backend(log: Arc<AnyEventLog>) {
        let topic = Topic::new("trigger.inbox").unwrap();
        for i in 0..10_000 {
            log.append(
                &topic,
                LogEvent::new("append", serde_json::json!({ "i": i })),
            )
            .await
            .unwrap();
        }
        let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 10_000);
        // Ids are allocated from 1 upward with no gaps.
        assert_eq!(events.first().unwrap().0, 1);
        assert_eq!(events.last().unwrap().0, 10_000);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn memory_backend_supports_append_read_subscribe_and_compact() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
        exercise_basic_backend(log.clone()).await;

        // Subscribe before appending so both events arrive via broadcast.
        let topic = Topic::new("agent.transcript.demo").unwrap();
        let mut stream = log.clone().subscribe(&topic, None).await.unwrap();
        let first = log
            .append(
                &topic,
                LogEvent::new("message", serde_json::json!({"text":"one"})),
            )
            .await
            .unwrap();
        let second = log
            .append(
                &topic,
                LogEvent::new("message", serde_json::json!({"text":"two"})),
            )
            .await
            .unwrap();
        // The stream must deliver the two events in append order.
        let seen: Vec<_> = stream.by_ref().take(2).collect().await;
        assert_eq!(seen[0].as_ref().unwrap().0, first);
        assert_eq!(seen[1].as_ref().unwrap().0, second);

        // Compacting up to `first` removes one event and keeps the other.
        log.ack(&topic, &ConsumerId::new("worker").unwrap(), second)
            .await
            .unwrap();
        let compact = log.compact(&topic, first).await.unwrap();
        assert_eq!(compact.removed, 1);
        assert_eq!(compact.remaining, 1);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn file_backend_persists_across_reopen_and_compacts() {
        let dir = tempfile::tempdir().unwrap();
        let topic = Topic::new("trigger.outbox").unwrap();
        // Write two events, drop the log, and reopen from the same directory.
        let first_log = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
        ));
        first_log
            .append(
                &topic,
                LogEvent::new("dispatch_pending", serde_json::json!({"n":1})),
            )
            .await
            .unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("dispatch_complete", serde_json::json!({"n":2})),
            )
            .await
            .unwrap();
        drop(first_log);

        // Both events must survive the reopen; compacting past id 1 then
        // leaves exactly one event on disk.
        let reopened = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
        ));
        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 2);
        let compact = reopened.compact(&topic, 1).await.unwrap();
        assert_eq!(compact.removed, 1);
        assert_eq!(
            reopened
                .read_range(&topic, None, usize::MAX)
                .await
                .unwrap()
                .len(),
            1
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_backend_persists_and_checkpoints_after_compact() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("daemon.demo.state").unwrap();
        let first_log = Arc::new(AnyEventLog::Sqlite(
            SqliteEventLog::open(path.clone(), 8).unwrap(),
        ));
        first_log
            .append(
                &topic,
                LogEvent::new("state", serde_json::json!({"state":"idle"})),
            )
            .await
            .unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("state", serde_json::json!({"state":"active"})),
            )
            .await
            .unwrap();
        drop(first_log);

        let reopened = Arc::new(AnyEventLog::Sqlite(
            SqliteEventLog::open(path.clone(), 8).unwrap(),
        ));
        assert_eq!(
            reopened
                .read_range(&topic, None, usize::MAX)
                .await
                .unwrap()
                .len(),
            2
        );
        // Compact performs a TRUNCATE checkpoint, so the WAL sidecar must
        // be gone or empty afterwards.
        let compact = reopened.compact(&topic, 1).await.unwrap();
        assert!(compact.checkpointed);
        let wal = PathBuf::from(format!("{}-wal", path.display()));
        assert!(file_size(&wal) == 0 || !wal.exists());
    }

    // CI-flaky: the broadcast buffer's lag detection is timing-dependent and
    // passes reliably under local single-threaded runtimes but occasionally
    // observes all 10 ticks before the lag signal surfaces under Linux CI
    // runners. Ignored on CI; still runs under `cargo test -- --ignored`.
    #[tokio::test(flavor = "current_thread")]
    #[ignore]
    async fn subscriber_reports_lag_when_broadcast_buffer_overflows() {
        // Tiny buffer (2) + 10 appends should overflow the subscriber.
        let log = Arc::new(MemoryEventLog::new(2));
        let topic = Topic::new("lag.demo").unwrap();
        let mut stream = log.clone().subscribe(&topic, None).await.unwrap();
        for i in 0..10 {
            log.append(&topic, LogEvent::new("tick", serde_json::json!({"i": i})))
                .await
                .unwrap();
        }
        let mut saw_lag = false;
        for _ in 0..4 {
            match stream.next().await {
                Some(Err(LogError::ConsumerLagged(_))) => {
                    saw_lag = true;
                    break;
                }
                Some(_) => {}
                None => break,
            }
        }
        assert!(saw_lag, "subscriber should surface lag");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn randomized_reader_sequences_stay_monotonic() {
        // Three subscribers starting at different cursors; ids each observes
        // must be strictly increasing regardless of starting offset.
        let log = Arc::new(MemoryEventLog::new(32));
        let topic = Topic::new("fuzz.demo").unwrap();
        let mut readers = vec![
            log.clone().subscribe(&topic, None).await.unwrap(),
            log.clone().subscribe(&topic, Some(5)).await.unwrap(),
            log.clone().subscribe(&topic, Some(10)).await.unwrap(),
        ];
        // Seeded RNG keeps the payloads deterministic across runs.
        let mut rng = StdRng::seed_from_u64(7);
        for _ in 0..64 {
            let value = rng.random_range(0..1000);
            log.append(
                &topic,
                LogEvent::new("rand", serde_json::json!({"value": value})),
            )
            .await
            .unwrap();
        }

        let mut sequences = Vec::new();
        for reader in &mut readers {
            let mut ids = Vec::new();
            while let Some(item) = reader.next().await {
                match item {
                    Ok((event_id, _)) => {
                        ids.push(event_id);
                        if ids.len() >= 16 {
                            break;
                        }
                    }
                    // Lag is acceptable here; we only check ordering of what
                    // was actually delivered.
                    Err(LogError::ConsumerLagged(_)) => break,
                    Err(error) => panic!("unexpected subscription error: {error}"),
                }
            }
            sequences.push(ids);
        }

        for ids in sequences {
            assert!(ids.windows(2).all(|pair| pair[0] < pair[1]));
        }
    }
}