// harn_vm/event_log/mod.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, HashMap, VecDeque};
3use std::fmt;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::{Arc, Mutex};
7
8use futures::stream::BoxStream;
9use rusqlite::{params, Connection, OptionalExtension};
10use serde::{Deserialize, Serialize};
11use tokio::sync::{broadcast, mpsc};
12use tokio_stream::wrappers::ReceiverStream;
13
/// Monotonically increasing, per-topic identifier assigned on append.
pub type EventId = u64;

/// Selects the backend: `memory`, `file`, or `sqlite` (default: sqlite).
pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
/// Overrides the directory used by the file backend.
pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
/// Overrides the database path used by the sqlite backend.
pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
/// Overrides the subscriber channel capacity (clamped to at least 1).
pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";
20
/// Validated topic name: non-empty, ASCII alphanumerics plus `.`, `_`, `-`.
/// Construct via [`Topic::new`] or `str::parse`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Topic(String);
23
24impl Topic {
25    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
26        let value = value.into();
27        if value.is_empty() {
28            return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
29        }
30        if !value
31            .chars()
32            .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
33        {
34            return Err(LogError::InvalidTopic(format!(
35                "topic '{value}' contains unsupported characters"
36            )));
37        }
38        Ok(Self(value))
39    }
40
41    pub fn as_str(&self) -> &str {
42        &self.0
43    }
44}
45
46impl fmt::Display for Topic {
47    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48        self.0.fmt(f)
49    }
50}
51
52impl FromStr for Topic {
53    type Err = LogError;
54
55    fn from_str(s: &str) -> Result<Self, Self::Err> {
56        Self::new(s)
57    }
58}
59
/// Identifier for a consumer's acked-cursor record. Any non-blank string is
/// accepted; the file backend sanitizes it before using it in a filename.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct ConsumerId(String);
62
63impl ConsumerId {
64    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
65        let value = value.into();
66        if value.trim().is_empty() {
67            return Err(LogError::InvalidConsumer(
68                "consumer id cannot be empty".to_string(),
69            ));
70        }
71        Ok(Self(value))
72    }
73
74    pub fn as_str(&self) -> &str {
75        &self.0
76    }
77}
78
79impl fmt::Display for ConsumerId {
80    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
81        self.0.fmt(f)
82    }
83}
84
/// Which storage engine backs the event log; selected via
/// `HARN_EVENT_LOG_BACKEND` (see [`EventLogConfig::for_base_dir`]).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventLogBackendKind {
    Memory,
    File,
    Sqlite,
}
92
93impl fmt::Display for EventLogBackendKind {
94    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95        match self {
96            Self::Memory => write!(f, "memory"),
97            Self::File => write!(f, "file"),
98            Self::Sqlite => write!(f, "sqlite"),
99        }
100    }
101}
102
103impl FromStr for EventLogBackendKind {
104    type Err = LogError;
105
106    fn from_str(value: &str) -> Result<Self, Self::Err> {
107        match value.trim().to_ascii_lowercase().as_str() {
108            "memory" => Ok(Self::Memory),
109            "file" => Ok(Self::File),
110            "sqlite" => Ok(Self::Sqlite),
111            other => Err(LogError::Config(format!(
112                "unsupported event log backend '{other}'"
113            ))),
114        }
115    }
116}
117
/// A single appended event: a caller-defined kind, an arbitrary JSON payload,
/// optional string headers, and a wall-clock timestamp in milliseconds.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LogEvent {
    pub kind: String,
    pub payload: serde_json::Value,
    // Defaults to empty when absent in serialized form.
    #[serde(default)]
    pub headers: BTreeMap<String, String>,
    pub occurred_at_ms: i64,
}
126
127impl LogEvent {
128    pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
129        Self {
130            kind: kind.into(),
131            payload,
132            headers: BTreeMap::new(),
133            occurred_at_ms: now_ms(),
134        }
135    }
136
137    pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
138        self.headers = headers;
139        self
140    }
141}
142
/// Outcome of a `compact` call: how many events were dropped, how many
/// remain, the topic's latest id, and whether the backend checkpointed
/// (sqlite-specific flag; memory/file backends report `false`).
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactReport {
    pub removed: usize,
    pub remaining: usize,
    pub latest: Option<EventId>,
    pub checkpointed: bool,
}
150
/// Snapshot of a log's configuration for diagnostics: backend kind, on-disk
/// location and size (both `None` for the memory backend), and the
/// subscriber channel capacity.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventLogDescription {
    pub backend: EventLogBackendKind,
    pub location: Option<PathBuf>,
    pub size_bytes: Option<u64>,
    pub queue_depth: usize,
}
158
/// Error type for all event-log operations. Most variants carry a
/// pre-formatted message; `ConsumerLagged` carries the id of the last event
/// the subscriber saw before its broadcast buffer overflowed.
#[derive(Debug)]
pub enum LogError {
    Config(String),
    InvalidTopic(String),
    InvalidConsumer(String),
    Io(String),
    Serde(String),
    Sqlite(String),
    ConsumerLagged(EventId),
}
169
170impl fmt::Display for LogError {
171    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172        match self {
173            Self::Config(message)
174            | Self::InvalidTopic(message)
175            | Self::InvalidConsumer(message)
176            | Self::Io(message)
177            | Self::Serde(message)
178            | Self::Sqlite(message) => message.fmt(f),
179            Self::ConsumerLagged(last_id) => {
180                write!(f, "subscriber lagged behind after event {last_id}")
181            }
182        }
183    }
184}
185
186impl std::error::Error for LogError {}
187
/// Common interface over the memory/file/sqlite backends.
#[allow(async_fn_in_trait)]
pub trait EventLog: Send + Sync {
    /// Describes the backend (kind, location, size, queue depth).
    fn describe(&self) -> EventLogDescription;

    /// Appends `event` to `topic` and returns the id it was assigned.
    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;

    /// Pushes buffered state toward durable storage (no-op for memory).
    async fn flush(&self) -> Result<(), LogError>;

    /// Read events strictly after `from`. `None` starts from the
    /// beginning of the topic.
    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError>;

    /// Streams history after `from`, then live events as they are appended.
    ///
    /// `async fn` keeps the ergonomic generic surface; the boxed stream
    /// preserves dyn-dispatch for callers that store `Arc<dyn EventLog>`.
    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;

    /// Persists `up_to` as `consumer`'s cursor for `topic`.
    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError>;

    /// Returns the last acked cursor for (`topic`, `consumer`), if any.
    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError>;

    /// Highest id assigned on `topic`, or `None` if nothing was appended.
    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;

    /// Removes events with id <= `before` and reports what happened.
    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
}
230
/// Resolved configuration for opening an event log. All fields are populated
/// regardless of backend; `location()` picks the one that applies.
#[derive(Clone, Debug)]
pub struct EventLogConfig {
    pub backend: EventLogBackendKind,
    pub file_dir: PathBuf,
    pub sqlite_path: PathBuf,
    pub queue_depth: usize,
}
238
239impl EventLogConfig {
240    pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
241        let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
242            .ok()
243            .map(|value| value.parse())
244            .transpose()?
245            .unwrap_or(EventLogBackendKind::Sqlite);
246        let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
247            .ok()
248            .and_then(|value| value.parse::<usize>().ok())
249            .unwrap_or(128)
250            .max(1);
251
252        let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
253            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
254            _ => crate::runtime_paths::event_log_dir(base_dir),
255        };
256        let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
257            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
258            _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
259        };
260
261        Ok(Self {
262            backend,
263            file_dir,
264            sqlite_path,
265            queue_depth,
266        })
267    }
268
269    pub fn location(&self) -> Option<PathBuf> {
270        match self.backend {
271            EventLogBackendKind::Memory => None,
272            EventLogBackendKind::File => Some(self.file_dir.clone()),
273            EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
274        }
275    }
276}
277
thread_local! {
    // Per-thread slot for the "current" event log, managed by the
    // install_*/reset_* helpers below. Thread-local so each thread (e.g. a
    // test thread using `install_memory_for_current_thread`) is isolated.
    static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
}
281
282pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
283    let config = EventLogConfig::for_base_dir(base_dir)?;
284    let log = open_event_log(&config)?;
285    ACTIVE_EVENT_LOG.with(|slot| {
286        *slot.borrow_mut() = Some(log.clone());
287    });
288    Ok(log)
289}
290
291pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
292    let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
293    ACTIVE_EVENT_LOG.with(|slot| {
294        *slot.borrow_mut() = Some(log.clone());
295    });
296    log
297}
298
299pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
300    ACTIVE_EVENT_LOG.with(|slot| {
301        *slot.borrow_mut() = Some(log.clone());
302    });
303    log
304}
305
306pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
307    ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone())
308}
309
310pub fn reset_active_event_log() {
311    ACTIVE_EVENT_LOG.with(|slot| {
312        *slot.borrow_mut() = None;
313    });
314}
315
316pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
317    let config = EventLogConfig::for_base_dir(base_dir)?;
318    let description = match config.backend {
319        EventLogBackendKind::Memory => EventLogDescription {
320            backend: EventLogBackendKind::Memory,
321            location: None,
322            size_bytes: None,
323            queue_depth: config.queue_depth,
324        },
325        EventLogBackendKind::File => EventLogDescription {
326            backend: EventLogBackendKind::File,
327            size_bytes: Some(dir_size_bytes(&config.file_dir)),
328            location: Some(config.file_dir),
329            queue_depth: config.queue_depth,
330        },
331        EventLogBackendKind::Sqlite => EventLogDescription {
332            backend: EventLogBackendKind::Sqlite,
333            size_bytes: Some(sqlite_size_bytes(&config.sqlite_path)),
334            location: Some(config.sqlite_path),
335            queue_depth: config.queue_depth,
336        },
337    };
338    Ok(description)
339}
340
341pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
342    match config.backend {
343        EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
344            config.queue_depth,
345        )))),
346        EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
347            config.file_dir.clone(),
348            config.queue_depth,
349        )?))),
350        EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
351            config.sqlite_path.clone(),
352            config.queue_depth,
353        )?))),
354    }
355}
356
/// Concrete sum of the backend implementations, so the active log can be
/// stored and dispatched without `dyn`.
pub enum AnyEventLog {
    Memory(MemoryEventLog),
    File(FileEventLog),
    Sqlite(SqliteEventLog),
}
362
// Pure delegation to the wrapped backend; only `subscribe` has its own logic
// because it must hold the `Arc<Self>` while building the stream.
impl EventLog for AnyEventLog {
    fn describe(&self) -> EventLogDescription {
        match self {
            Self::Memory(log) => log.describe(),
            Self::File(log) => log.describe(),
            Self::Sqlite(log) => log.describe(),
        }
    }

    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        match self {
            Self::Memory(log) => log.append(topic, event).await,
            Self::File(log) => log.append(topic, event).await,
            Self::Sqlite(log) => log.append(topic, event).await,
        }
    }

    async fn flush(&self) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.flush().await,
            Self::File(log) => log.flush().await,
            Self::Sqlite(log) => log.flush().await,
        }
    }

    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        match self {
            Self::Memory(log) => log.read_range(topic, from, limit).await,
            Self::File(log) => log.read_range(topic, from, limit).await,
            Self::Sqlite(log) => log.read_range(topic, from, limit).await,
        }
    }

    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        // Subscribe to the live broadcast BEFORE snapshotting history so no
        // event can be missed in between; any overlap is deduplicated by id
        // inside `stream_from_broadcast`.
        let (rx, queue_depth) = match self.as_ref() {
            Self::Memory(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::File(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
            Self::Sqlite(log) => (
                log.broadcasts.subscribe(topic, log.queue_depth),
                log.queue_depth,
            ),
        };
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, queue_depth))
    }

    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        match self {
            Self::Memory(log) => log.ack(topic, consumer, up_to).await,
            Self::File(log) => log.ack(topic, consumer, up_to).await,
            Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
        }
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
            Self::File(log) => log.consumer_cursor(topic, consumer).await,
            Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
        }
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        match self {
            Self::Memory(log) => log.latest(topic).await,
            Self::File(log) => log.latest(topic).await,
            Self::Sqlite(log) => log.latest(topic).await,
        }
    }

    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        match self {
            Self::Memory(log) => log.compact(topic, before).await,
            Self::File(log) => log.compact(topic, before).await,
            Self::Sqlite(log) => log.compact(topic, before).await,
        }
    }
}
465
/// Lazily-created, per-topic tokio broadcast channels used to fan appended
/// events out to live subscribers. Keyed by topic name.
#[derive(Default)]
struct BroadcastMap(Mutex<HashMap<String, broadcast::Sender<(EventId, LogEvent)>>>);
468
469impl BroadcastMap {
470    fn subscribe(
471        &self,
472        topic: &Topic,
473        capacity: usize,
474    ) -> broadcast::Receiver<(EventId, LogEvent)> {
475        self.sender(topic, capacity).subscribe()
476    }
477
478    fn publish(&self, topic: &Topic, capacity: usize, record: (EventId, LogEvent)) {
479        let _ = self.sender(topic, capacity).send(record);
480    }
481
482    fn sender(&self, topic: &Topic, capacity: usize) -> broadcast::Sender<(EventId, LogEvent)> {
483        let mut map = self.0.lock().expect("event log broadcast map poisoned");
484        map.entry(topic.as_str().to_string())
485            .or_insert_with(|| broadcast::channel(capacity.max(1)).0)
486            .clone()
487    }
488}
489
/// Bridges a history snapshot plus a live broadcast receiver into one ordered
/// stream: history first, then live events with id greater than anything
/// already emitted (deduplicating the snapshot/live overlap).
fn stream_from_broadcast(
    history: Vec<(EventId, LogEvent)>,
    from: Option<EventId>,
    mut live_rx: broadcast::Receiver<(EventId, LogEvent)>,
    queue_depth: usize,
) -> BoxStream<'static, Result<(EventId, LogEvent), LogError>> {
    let (tx, rx) = mpsc::channel(queue_depth.max(1));
    // Run the subscription forwarder as a tokio task rather than a detached
    // OS thread. A dedicated thread running under `futures::executor::block_on`
    // is invisible to the tokio runtime, so tests that use `start_paused = true`
    // race against auto-advanced timers while the thread catches up in real
    // time. Spawning on tokio makes the forwarder participate in runtime
    // scheduling (including paused-time quiescence) and ties its lifetime to
    // the runtime's shutdown.
    tokio::spawn(async move {
        let mut last_seen = from.unwrap_or(0);
        for (event_id, event) in history {
            last_seen = event_id;
            // A send error means the consumer dropped the stream; stop quietly.
            if tx.send(Ok((event_id, event))).await.is_err() {
                return;
            }
        }

        loop {
            match live_rx.recv().await {
                // Only forward ids past what the history already covered.
                Ok((event_id, event)) if event_id > last_seen => {
                    last_seen = event_id;
                    if tx.send(Ok((event_id, event))).await.is_err() {
                        return;
                    }
                }
                Ok(_) => {}
                Err(broadcast::error::RecvError::Closed) => return,
                Err(broadcast::error::RecvError::Lagged(_)) => {
                    // Best-effort notification: `try_send` is used so a full
                    // consumer queue doesn't block shutdown of this task.
                    let _ = tx.try_send(Err(LogError::ConsumerLagged(last_seen)));
                    return;
                }
            }
        }
    });
    Box::pin(ReceiverStream::new(rx))
}
532
/// Everything the memory backend tracks, guarded by one async mutex:
/// per-topic event queues, the highest id ever assigned per topic (kept even
/// after compaction so ids never restart), and acked consumer cursors keyed
/// by (topic, consumer).
#[derive(Default)]
struct MemoryState {
    topics: HashMap<String, VecDeque<(EventId, LogEvent)>>,
    latest: HashMap<String, EventId>,
    consumers: HashMap<(String, String), EventId>,
}
539
/// Non-durable backend: events live in a mutex-guarded [`MemoryState`] and
/// are fanned out to subscribers through per-topic broadcast channels.
pub struct MemoryEventLog {
    state: tokio::sync::Mutex<MemoryState>,
    broadcasts: BroadcastMap,
    // Capacity for broadcast/mpsc channels; always >= 1.
    queue_depth: usize,
}
545
546impl MemoryEventLog {
547    pub fn new(queue_depth: usize) -> Self {
548        Self {
549            state: tokio::sync::Mutex::new(MemoryState::default()),
550            broadcasts: BroadcastMap::default(),
551            queue_depth: queue_depth.max(1),
552        }
553    }
554}
555
impl EventLog for MemoryEventLog {
    fn describe(&self) -> EventLogDescription {
        EventLogDescription {
            backend: EventLogBackendKind::Memory,
            location: None,
            size_bytes: None,
            queue_depth: self.queue_depth,
        }
    }

    /// Assigns the next id (latest + 1), stores the event, then publishes to
    /// live subscribers after releasing the state lock.
    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        let mut state = self.state.lock().await;
        let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
        state.latest.insert(topic.as_str().to_string(), event_id);
        state
            .topics
            .entry(topic.as_str().to_string())
            .or_default()
            .push_back((event_id, event.clone()));
        // Drop the lock before publishing so fan-out never blocks appenders.
        drop(state);
        self.broadcasts
            .publish(topic, self.queue_depth, (event_id, event));
        Ok(event_id)
    }

    /// Nothing to persist for the memory backend.
    async fn flush(&self) -> Result<(), LogError> {
        Ok(())
    }

    /// Returns up to `limit` events with id strictly greater than `from`.
    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        let from = from.unwrap_or(0);
        let state = self.state.lock().await;
        let events = state
            .topics
            .get(topic.as_str())
            .into_iter()
            .flat_map(|events| events.iter())
            .filter(|(event_id, _)| *event_id > from)
            .take(limit)
            .map(|(event_id, event)| (*event_id, event.clone()))
            .collect();
        Ok(events)
    }

    /// Subscribes to the broadcast channel BEFORE snapshotting history so no
    /// event is missed; overlap is deduplicated by id downstream.
    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
        let history = self.read_range(topic, from, usize::MAX).await?;
        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
    }

    /// Overwrites the consumer's cursor unconditionally (no monotonic check).
    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        let mut state = self.state.lock().await;
        state.consumers.insert(
            (topic.as_str().to_string(), consumer.as_str().to_string()),
            up_to,
        );
        Ok(())
    }

    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        let state = self.state.lock().await;
        Ok(state
            .consumers
            .get(&(topic.as_str().to_string(), consumer.as_str().to_string()))
            .copied())
    }

    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        let state = self.state.lock().await;
        Ok(state.latest.get(topic.as_str()).copied())
    }

    /// Drops events with id <= `before`. The `latest` map is left untouched,
    /// so ids keep incrementing even after a full compaction.
    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        let mut state = self.state.lock().await;
        let Some(events) = state.topics.get_mut(topic.as_str()) else {
            return Ok(CompactReport::default());
        };
        // Events are stored in ascending id order, so a prefix scan suffices.
        let removed = events
            .iter()
            .take_while(|(event_id, _)| *event_id <= before)
            .count();
        for _ in 0..removed {
            events.pop_front();
        }
        Ok(CompactReport {
            removed,
            remaining: events.len(),
            latest: state.latest.get(topic.as_str()).copied(),
            checkpointed: false,
        })
    }
}
666
/// One line of a topic's JSONL file: the assigned id plus the event.
#[derive(Serialize, Deserialize)]
struct FileRecord {
    id: EventId,
    event: LogEvent,
}
672
/// Durable JSONL backend: one append-only file per topic under
/// `<root>/topics/`, one JSON cursor file per consumer under
/// `<root>/consumers/`.
pub struct FileEventLog {
    root: PathBuf,
    // Memoized highest id per topic (populated lazily by scanning the file).
    latest_ids: Mutex<HashMap<String, EventId>>,
    // Serializes append/compact so the id sequence and file rewrites are safe.
    write_lock: Mutex<()>,
    broadcasts: BroadcastMap,
    queue_depth: usize,
}
680
681impl FileEventLog {
682    pub fn open(root: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
683        std::fs::create_dir_all(root.join("topics"))
684            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
685        std::fs::create_dir_all(root.join("consumers"))
686            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
687        Ok(Self {
688            root,
689            latest_ids: Mutex::new(HashMap::new()),
690            write_lock: Mutex::new(()),
691            broadcasts: BroadcastMap::default(),
692            queue_depth: queue_depth.max(1),
693        })
694    }
695
696    fn topic_path(&self, topic: &Topic) -> PathBuf {
697        self.root
698            .join("topics")
699            .join(format!("{}.jsonl", topic.as_str()))
700    }
701
702    fn consumer_path(&self, topic: &Topic, consumer: &ConsumerId) -> PathBuf {
703        self.root.join("consumers").join(format!(
704            "{}__{}.json",
705            topic.as_str(),
706            sanitize_filename(consumer.as_str())
707        ))
708    }
709
710    fn latest_id_for_topic(&self, topic: &Topic) -> Result<EventId, LogError> {
711        if let Some(event_id) = self
712            .latest_ids
713            .lock()
714            .expect("file event log latest ids poisoned")
715            .get(topic.as_str())
716            .copied()
717        {
718            return Ok(event_id);
719        }
720
721        let mut latest = 0;
722        let path = self.topic_path(topic);
723        if path.is_file() {
724            let file = std::fs::File::open(&path)
725                .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
726            for line in std::io::BufRead::lines(std::io::BufReader::new(file)) {
727                let line =
728                    line.map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
729                let record: FileRecord = serde_json::from_str(&line)
730                    .map_err(|error| LogError::Serde(format!("event log parse error: {error}")))?;
731                latest = record.id;
732            }
733        }
734        self.latest_ids
735            .lock()
736            .expect("file event log latest ids poisoned")
737            .insert(topic.as_str().to_string(), latest);
738        Ok(latest)
739    }
740
741    fn read_range_sync(
742        &self,
743        topic: &Topic,
744        from: Option<EventId>,
745        limit: usize,
746    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
747        let path = self.topic_path(topic);
748        if !path.is_file() {
749            return Ok(Vec::new());
750        }
751        let file = std::fs::File::open(&path)
752            .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
753        let from = from.unwrap_or(0);
754        let mut events = Vec::new();
755        for line in std::io::BufRead::lines(std::io::BufReader::new(file)) {
756            let line =
757                line.map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
758            let record: FileRecord = serde_json::from_str(&line)
759                .map_err(|error| LogError::Serde(format!("event log parse error: {error}")))?;
760            if record.id > from {
761                events.push((record.id, record.event));
762            }
763            if events.len() >= limit {
764                break;
765            }
766        }
767        Ok(events)
768    }
769}
770
771impl EventLog for FileEventLog {
772    fn describe(&self) -> EventLogDescription {
773        EventLogDescription {
774            backend: EventLogBackendKind::File,
775            location: Some(self.root.clone()),
776            size_bytes: Some(dir_size_bytes(&self.root)),
777            queue_depth: self.queue_depth,
778        }
779    }
780
781    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
782        let _guard = self
783            .write_lock
784            .lock()
785            .expect("file event log write lock poisoned");
786        let next_id = self.latest_id_for_topic(topic)? + 1;
787        let record = FileRecord {
788            id: next_id,
789            event: event.clone(),
790        };
791        let path = self.topic_path(topic);
792        if let Some(parent) = path.parent() {
793            std::fs::create_dir_all(parent)
794                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
795        }
796        let line = serde_json::to_string(&record)
797            .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
798        use std::io::Write as _;
799        let mut file = std::fs::OpenOptions::new()
800            .create(true)
801            .append(true)
802            .open(&path)
803            .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
804        writeln!(file, "{line}")
805            .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
806        self.latest_ids
807            .lock()
808            .expect("file event log latest ids poisoned")
809            .insert(topic.as_str().to_string(), next_id);
810        self.broadcasts
811            .publish(topic, self.queue_depth, (next_id, event));
812        Ok(next_id)
813    }
814
815    async fn flush(&self) -> Result<(), LogError> {
816        sync_tree(&self.root)
817    }
818
819    async fn read_range(
820        &self,
821        topic: &Topic,
822        from: Option<EventId>,
823        limit: usize,
824    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
825        self.read_range_sync(topic, from, limit)
826    }
827
828    async fn subscribe(
829        self: Arc<Self>,
830        topic: &Topic,
831        from: Option<EventId>,
832    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
833        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
834        let history = self.read_range_sync(topic, from, usize::MAX)?;
835        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
836    }
837
838    async fn ack(
839        &self,
840        topic: &Topic,
841        consumer: &ConsumerId,
842        up_to: EventId,
843    ) -> Result<(), LogError> {
844        let path = self.consumer_path(topic, consumer);
845        let payload = serde_json::json!({
846            "topic": topic.as_str(),
847            "consumer_id": consumer.as_str(),
848            "cursor": up_to,
849            "updated_at_ms": now_ms(),
850        });
851        write_json_atomically(&path, &payload)
852    }
853
854    async fn consumer_cursor(
855        &self,
856        topic: &Topic,
857        consumer: &ConsumerId,
858    ) -> Result<Option<EventId>, LogError> {
859        let path = self.consumer_path(topic, consumer);
860        if !path.is_file() {
861            return Ok(None);
862        }
863        let raw = std::fs::read_to_string(&path)
864            .map_err(|error| LogError::Io(format!("event log consumer read error: {error}")))?;
865        let payload: serde_json::Value = serde_json::from_str(&raw)
866            .map_err(|error| LogError::Serde(format!("event log consumer parse error: {error}")))?;
867        let cursor = payload
868            .get("cursor")
869            .and_then(serde_json::Value::as_u64)
870            .ok_or_else(|| {
871                LogError::Serde("event log consumer record missing numeric cursor".to_string())
872            })?;
873        Ok(Some(cursor))
874    }
875
876    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
877        let latest = self.latest_id_for_topic(topic)?;
878        if latest == 0 {
879            Ok(None)
880        } else {
881            Ok(Some(latest))
882        }
883    }
884
    /// Remove every event with id <= `before` from the topic's JSONL file by
    /// rewriting it with only the surviving records, then refresh the cached
    /// head id. Returns how many events were removed and how many remain.
    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        // Serialize with appenders, which share this write lock.
        let _guard = self
            .write_lock
            .lock()
            .expect("file event log write lock poisoned");
        let path = self.topic_path(topic);
        if !path.is_file() {
            return Ok(CompactReport::default());
        }
        // Survivors are the events strictly after `before`; the removed count
        // is everything on disk minus the survivors.
        let retained = self.read_range_sync(topic, Some(before), usize::MAX)?;
        let removed = self.read_range_sync(topic, None, usize::MAX)?.len() - retained.len();
        let tmp = path.with_extension("jsonl.tmp");
        if retained.is_empty() {
            // Nothing survives: deleting the file is the cheapest rewrite.
            let _ = std::fs::remove_file(&path);
        } else {
            // Write survivors to a sibling tmp file and rename over the
            // original so readers never observe a half-written log.
            let mut writer =
                std::io::BufWriter::new(std::fs::File::create(&tmp).map_err(|error| {
                    LogError::Io(format!("event log tmp create error: {error}"))
                })?);
            use std::io::Write as _;
            for (event_id, event) in &retained {
                let line = serde_json::to_string(&FileRecord {
                    id: *event_id,
                    event: event.clone(),
                })
                .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
                writeln!(writer, "{line}")
                    .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
            }
            writer
                .flush()
                .map_err(|error| LogError::Io(format!("event log flush error: {error}")))?;
            std::fs::rename(&tmp, &path).map_err(|error| {
                LogError::Io(format!("event log compact finalize error: {error}"))
            })?;
        }
        let latest = retained.last().map(|(event_id, _)| *event_id);
        // NOTE(review): when every event is removed this resets the cached
        // head to 0, which looks like it would restart id allocation for the
        // topic on the next append — confirm appenders re-derive the head
        // from disk, or that id reuse after full compaction is intended.
        self.latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .insert(topic.as_str().to_string(), latest.unwrap_or(0));
        Ok(CompactReport {
            removed,
            remaining: retained.len(),
            latest,
            // File backend has no WAL; checkpointing never applies here.
            checkpointed: false,
        })
    }
933}
934
/// Event log persisted in a single SQLite database file (WAL journal mode).
pub struct SqliteEventLog {
    // Location of the main database file; `-wal`/`-shm` siblings sit next to it.
    path: PathBuf,
    // One shared connection; every operation serializes through this mutex.
    connection: Mutex<Connection>,
    // Per-topic broadcast channels used to fan appended events out to subscribers.
    broadcasts: BroadcastMap,
    // Capacity of the subscriber broadcast channels (clamped to >= 1 in `open`).
    queue_depth: usize,
}
941
942impl SqliteEventLog {
943    pub fn open(path: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
944        if let Some(parent) = path.parent() {
945            std::fs::create_dir_all(parent)
946                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
947        }
948        let connection = Connection::open(&path)
949            .map_err(|error| LogError::Sqlite(format!("event log open error: {error}")))?;
950        // Set busy_timeout BEFORE the WAL pragma so SQLite waits out transient
951        // SQLITE_BUSY from a previous test's connection that hasn't finished
952        // dropping yet (parallel `cargo test` on the same process, distinct
953        // paths, still contends on SQLite's own global mutex under WAL-mode
954        // promotion). Without this, `journal_mode = WAL` fails fast with
955        // "database is locked" instead of retrying.
956        connection
957            .busy_timeout(std::time::Duration::from_secs(5))
958            .map_err(|error| LogError::Sqlite(format!("event log busy-timeout error: {error}")))?;
959        connection
960            .pragma_update(None, "journal_mode", "WAL")
961            .map_err(|error| LogError::Sqlite(format!("event log WAL pragma error: {error}")))?;
962        connection
963            .pragma_update(None, "synchronous", "NORMAL")
964            .map_err(|error| LogError::Sqlite(format!("event log sync pragma error: {error}")))?;
965        connection
966            .execute_batch(
967                "CREATE TABLE IF NOT EXISTS topic_heads (
968                    topic TEXT PRIMARY KEY,
969                    last_id INTEGER NOT NULL
970                );
971                CREATE TABLE IF NOT EXISTS events (
972                    topic TEXT NOT NULL,
973                    event_id INTEGER NOT NULL,
974                    kind TEXT NOT NULL,
975                    payload TEXT NOT NULL,
976                    headers TEXT NOT NULL,
977                    occurred_at_ms INTEGER NOT NULL,
978                    PRIMARY KEY (topic, event_id)
979                );
980                CREATE TABLE IF NOT EXISTS consumers (
981                    topic TEXT NOT NULL,
982                    consumer_id TEXT NOT NULL,
983                    cursor INTEGER NOT NULL,
984                    updated_at_ms INTEGER NOT NULL,
985                    PRIMARY KEY (topic, consumer_id)
986                );",
987            )
988            .map_err(|error| LogError::Sqlite(format!("event log schema error: {error}")))?;
989        Ok(Self {
990            path,
991            connection: Mutex::new(connection),
992            broadcasts: BroadcastMap::default(),
993            queue_depth: queue_depth.max(1),
994        })
995    }
996}
997
998impl EventLog for SqliteEventLog {
999    fn describe(&self) -> EventLogDescription {
1000        EventLogDescription {
1001            backend: EventLogBackendKind::Sqlite,
1002            location: Some(self.path.clone()),
1003            size_bytes: Some(sqlite_size_bytes(&self.path)),
1004            queue_depth: self.queue_depth,
1005        }
1006    }
1007
1008    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
1009        let mut connection = self
1010            .connection
1011            .lock()
1012            .expect("sqlite event log connection poisoned");
1013        let tx = connection
1014            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
1015            .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
1016        tx.execute(
1017            "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
1018            params![topic.as_str()],
1019        )
1020        .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
1021        tx.execute(
1022            "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
1023            params![topic.as_str()],
1024        )
1025        .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
1026        let event_id = tx
1027            .query_row(
1028                "SELECT last_id FROM topic_heads WHERE topic = ?1",
1029                params![topic.as_str()],
1030                |row| row.get::<_, i64>(0),
1031            )
1032            .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))
1033            .and_then(sqlite_i64_to_event_id)?;
1034        let event_id_sql = event_id_to_sqlite_i64(event_id)?;
1035        tx.execute(
1036            "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
1037             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1038            params![
1039                topic.as_str(),
1040                event_id_sql,
1041                event.kind,
1042                serde_json::to_string(&event.payload).map_err(|error| LogError::Serde(format!(
1043                    "event log payload encode error: {error}"
1044                )))?,
1045                serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
1046                    "event log headers encode error: {error}"
1047                )))?,
1048                event.occurred_at_ms
1049            ],
1050        )
1051        .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
1052        tx.commit()
1053            .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
1054        self.broadcasts
1055            .publish(topic, self.queue_depth, (event_id, event.clone()));
1056        Ok(event_id)
1057    }
1058
1059    async fn flush(&self) -> Result<(), LogError> {
1060        let connection = self
1061            .connection
1062            .lock()
1063            .expect("sqlite event log connection poisoned");
1064        connection
1065            .execute_batch("PRAGMA wal_checkpoint(FULL);")
1066            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
1067        Ok(())
1068    }
1069
1070    async fn read_range(
1071        &self,
1072        topic: &Topic,
1073        from: Option<EventId>,
1074        limit: usize,
1075    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
1076        let connection = self
1077            .connection
1078            .lock()
1079            .expect("sqlite event log connection poisoned");
1080        let mut statement = connection
1081            .prepare(
1082                "SELECT event_id, kind, payload, headers, occurred_at_ms
1083                 FROM events
1084                 WHERE topic = ?1 AND event_id > ?2
1085                 ORDER BY event_id ASC
1086                 LIMIT ?3",
1087            )
1088            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
1089        let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
1090        let rows = statement
1091            .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
1092                let payload: String = row.get(2)?;
1093                let headers: String = row.get(3)?;
1094                let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
1095                Ok((
1096                    event_id,
1097                    LogEvent {
1098                        kind: row.get(1)?,
1099                        payload: serde_json::from_str(&payload).map_err(|error| {
1100                            rusqlite::Error::FromSqlConversionFailure(
1101                                payload.len(),
1102                                rusqlite::types::Type::Text,
1103                                Box::new(error),
1104                            )
1105                        })?,
1106                        headers: serde_json::from_str(&headers).map_err(|error| {
1107                            rusqlite::Error::FromSqlConversionFailure(
1108                                headers.len(),
1109                                rusqlite::types::Type::Text,
1110                                Box::new(error),
1111                            )
1112                        })?,
1113                        occurred_at_ms: row.get(4)?,
1114                    },
1115                ))
1116            })
1117            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
1118        let mut events = Vec::new();
1119        for row in rows {
1120            events.push(
1121                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
1122            );
1123        }
1124        Ok(events)
1125    }
1126
1127    async fn subscribe(
1128        self: Arc<Self>,
1129        topic: &Topic,
1130        from: Option<EventId>,
1131    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
1132        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
1133        let history = self.read_range(topic, from, usize::MAX).await?;
1134        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
1135    }
1136
1137    async fn ack(
1138        &self,
1139        topic: &Topic,
1140        consumer: &ConsumerId,
1141        up_to: EventId,
1142    ) -> Result<(), LogError> {
1143        let connection = self
1144            .connection
1145            .lock()
1146            .expect("sqlite event log connection poisoned");
1147        let up_to_sql = event_id_to_sqlite_i64(up_to)?;
1148        connection
1149            .execute(
1150                "INSERT INTO consumers(topic, consumer_id, cursor, updated_at_ms)
1151                 VALUES (?1, ?2, ?3, ?4)
1152                 ON CONFLICT(topic, consumer_id)
1153                 DO UPDATE SET cursor = excluded.cursor, updated_at_ms = excluded.updated_at_ms",
1154                params![topic.as_str(), consumer.as_str(), up_to_sql, now_ms()],
1155            )
1156            .map_err(|error| LogError::Sqlite(format!("event log ack error: {error}")))?;
1157        Ok(())
1158    }
1159
1160    async fn consumer_cursor(
1161        &self,
1162        topic: &Topic,
1163        consumer: &ConsumerId,
1164    ) -> Result<Option<EventId>, LogError> {
1165        let connection = self
1166            .connection
1167            .lock()
1168            .expect("sqlite event log connection poisoned");
1169        connection
1170            .query_row(
1171                "SELECT cursor FROM consumers WHERE topic = ?1 AND consumer_id = ?2",
1172                params![topic.as_str(), consumer.as_str()],
1173                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1174            )
1175            .optional()
1176            .map_err(|error| LogError::Sqlite(format!("event log consumer cursor error: {error}")))
1177    }
1178
1179    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
1180        let connection = self
1181            .connection
1182            .lock()
1183            .expect("sqlite event log connection poisoned");
1184        connection
1185            .query_row(
1186                "SELECT last_id FROM topic_heads WHERE topic = ?1",
1187                params![topic.as_str()],
1188                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1189            )
1190            .optional()
1191            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))
1192    }
1193
1194    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
1195        let connection = self
1196            .connection
1197            .lock()
1198            .expect("sqlite event log connection poisoned");
1199        let before_sql = event_id_to_sqlite_i64(before)?;
1200        let removed = connection
1201            .execute(
1202                "DELETE FROM events WHERE topic = ?1 AND event_id <= ?2",
1203                params![topic.as_str(), before_sql],
1204            )
1205            .map_err(|error| {
1206                LogError::Sqlite(format!("event log compact delete error: {error}"))
1207            })?;
1208        let remaining = connection
1209            .query_row(
1210                "SELECT COUNT(*) FROM events WHERE topic = ?1",
1211                params![topic.as_str()],
1212                |row| row.get::<_, i64>(0),
1213            )
1214            .map_err(|error| LogError::Sqlite(format!("event log compact count error: {error}")))
1215            .and_then(sqlite_i64_to_usize)?;
1216        let latest = connection
1217            .query_row(
1218                "SELECT last_id FROM topic_heads WHERE topic = ?1",
1219                params![topic.as_str()],
1220                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1221            )
1222            .optional()
1223            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))?;
1224        connection
1225            .execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
1226            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
1227        Ok(CompactReport {
1228            removed,
1229            remaining,
1230            latest,
1231            checkpointed: true,
1232        })
1233    }
1234}
1235
/// Interpret `value` as a filesystem path: absolute paths are used as-is,
/// relative paths are joined onto `base_dir`.
fn resolve_path(base_dir: &Path, value: &str) -> PathBuf {
    let candidate = Path::new(value);
    if candidate.is_absolute() {
        candidate.to_path_buf()
    } else {
        base_dir.join(candidate)
    }
}
1244
1245fn write_json_atomically(path: &Path, payload: &serde_json::Value) -> Result<(), LogError> {
1246    if let Some(parent) = path.parent() {
1247        std::fs::create_dir_all(parent)
1248            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
1249    }
1250    let tmp = path.with_extension("tmp");
1251    let encoded = serde_json::to_vec_pretty(payload)
1252        .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
1253    std::fs::write(&tmp, encoded)
1254        .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
1255    std::fs::rename(&tmp, path)
1256        .map_err(|error| LogError::Io(format!("event log rename error: {error}")))?;
1257    Ok(())
1258}
1259
/// File-name flavoured alias of [`sanitize_topic_component`]: maps any
/// character outside `[A-Za-z0-9._-]` to `_`.
fn sanitize_filename(value: &str) -> String {
    sanitize_topic_component(value)
}
1263
/// Make `value` safe for use as a path component by replacing every character
/// that is not ASCII-alphanumeric, `.`, `_`, or `-` with `_`.
pub fn sanitize_topic_component(value: &str) -> String {
    let mut sanitized = String::with_capacity(value.len());
    for ch in value.chars() {
        if ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-') {
            sanitized.push(ch);
        } else {
            sanitized.push('_');
        }
    }
    sanitized
}
1276
/// Sum the sizes of all regular files under `path`, recursing into
/// subdirectories. Unreadable entries and missing paths contribute 0.
fn dir_size_bytes(path: &Path) -> u64 {
    match std::fs::read_dir(path) {
        Ok(entries) => entries
            .flatten()
            .map(|entry| {
                let child = entry.path();
                if child.is_dir() {
                    dir_size_bytes(&child)
                } else {
                    entry.metadata().map(|metadata| metadata.len()).unwrap_or(0)
                }
            })
            .sum(),
        // Nonexistent or unreadable directory: report zero bytes.
        Err(_) => 0,
    }
}
1294
1295fn sqlite_size_bytes(path: &Path) -> u64 {
1296    let mut total = file_size(path);
1297    total += file_size(&PathBuf::from(format!("{}-wal", path.display())));
1298    total += file_size(&PathBuf::from(format!("{}-shm", path.display())));
1299    total
1300}
1301
/// Size of the file at `path` in bytes; 0 when the file does not exist or
/// its metadata cannot be read.
fn file_size(path: &Path) -> u64 {
    match std::fs::metadata(path) {
        Ok(metadata) => metadata.len(),
        Err(_) => 0,
    }
}
1307
1308fn sync_tree(root: &Path) -> Result<(), LogError> {
1309    if !root.exists() {
1310        return Ok(());
1311    }
1312    for entry in std::fs::read_dir(root)
1313        .map_err(|error| LogError::Io(format!("event log read_dir error: {error}")))?
1314    {
1315        let entry = entry.map_err(|error| LogError::Io(format!("event log dir error: {error}")))?;
1316        let path = entry.path();
1317        if path.is_dir() {
1318            sync_tree(&path)?;
1319            continue;
1320        }
1321        std::fs::File::open(&path)
1322            .and_then(|file| file.sync_all())
1323            .map_err(|error| LogError::Io(format!("event log sync error: {error}")))?;
1324    }
1325    Ok(())
1326}
1327
/// Current wall-clock time as milliseconds since the Unix epoch; returns 0
/// if the system clock reads before the epoch.
fn now_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(duration) => duration.as_millis() as i64,
        Err(_) => 0,
    }
}
1334
1335fn event_id_to_sqlite_i64(event_id: EventId) -> Result<i64, LogError> {
1336    i64::try_from(event_id)
1337        .map_err(|_| LogError::Sqlite(format!("event id {event_id} exceeds sqlite INTEGER range")))
1338}
1339
1340fn sqlite_i64_to_event_id(value: i64) -> Result<EventId, LogError> {
1341    u64::try_from(value)
1342        .map_err(|_| LogError::Sqlite(format!("sqlite event id {value} is negative")))
1343}
1344
1345fn sqlite_i64_to_event_id_for_row(value: i64) -> rusqlite::Result<EventId> {
1346    u64::try_from(value).map_err(|_| {
1347        rusqlite::Error::FromSqlConversionFailure(
1348            std::mem::size_of::<i64>(),
1349            rusqlite::types::Type::Integer,
1350            "sqlite event id is negative".into(),
1351        )
1352    })
1353}
1354
1355fn sqlite_i64_to_usize(value: i64) -> Result<usize, LogError> {
1356    usize::try_from(value)
1357        .map_err(|_| LogError::Sqlite(format!("sqlite count {value} is negative")))
1358}
1359
/// Backend tests: shared append/read exercise plus per-backend persistence,
/// compaction, lag-reporting, and ordering checks.
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    // NOTE(review): rand's range API normally comes from the `Rng` trait;
    // confirm `RngExt` is a workspace re-export/extension trait providing
    // `random_range`.
    use rand::{rngs::StdRng, RngExt, SeedableRng};

    // Appends 10k events to one topic and checks ids are dense 1..=10_000.
    async fn exercise_basic_backend(log: Arc<AnyEventLog>) {
        let topic = Topic::new("trigger.inbox").unwrap();
        for i in 0..10_000 {
            log.append(
                &topic,
                LogEvent::new("append", serde_json::json!({ "i": i })),
            )
            .await
            .unwrap();
        }
        let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 10_000);
        assert_eq!(events.first().unwrap().0, 1);
        assert_eq!(events.last().unwrap().0, 10_000);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn memory_backend_supports_append_read_subscribe_and_compact() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
        exercise_basic_backend(log.clone()).await;

        let topic = Topic::new("agent.transcript.demo").unwrap();
        // Subscribe before appending so the live feed delivers both events.
        let mut stream = log.clone().subscribe(&topic, None).await.unwrap();
        let first = log
            .append(
                &topic,
                LogEvent::new("message", serde_json::json!({"text":"one"})),
            )
            .await
            .unwrap();
        let second = log
            .append(
                &topic,
                LogEvent::new("message", serde_json::json!({"text":"two"})),
            )
            .await
            .unwrap();
        let seen: Vec<_> = stream.by_ref().take(2).collect().await;
        assert_eq!(seen[0].as_ref().unwrap().0, first);
        assert_eq!(seen[1].as_ref().unwrap().0, second);

        log.ack(&topic, &ConsumerId::new("worker").unwrap(), second)
            .await
            .unwrap();
        // Compacting up to `first` should remove exactly that one event.
        let compact = log.compact(&topic, first).await.unwrap();
        assert_eq!(compact.removed, 1);
        assert_eq!(compact.remaining, 1);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn file_backend_persists_across_reopen_and_compacts() {
        let dir = tempfile::tempdir().unwrap();
        let topic = Topic::new("trigger.outbox").unwrap();
        let first_log = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
        ));
        first_log
            .append(
                &topic,
                LogEvent::new("dispatch_pending", serde_json::json!({"n":1})),
            )
            .await
            .unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("dispatch_complete", serde_json::json!({"n":2})),
            )
            .await
            .unwrap();
        drop(first_log);

        // A fresh instance over the same directory must see both events.
        let reopened = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
        ));
        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 2);
        let compact = reopened.compact(&topic, 1).await.unwrap();
        assert_eq!(compact.removed, 1);
        assert_eq!(
            reopened
                .read_range(&topic, None, usize::MAX)
                .await
                .unwrap()
                .len(),
            1
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_backend_persists_and_checkpoints_after_compact() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("daemon.demo.state").unwrap();
        let first_log = Arc::new(AnyEventLog::Sqlite(
            SqliteEventLog::open(path.clone(), 8).unwrap(),
        ));
        first_log
            .append(
                &topic,
                LogEvent::new("state", serde_json::json!({"state":"idle"})),
            )
            .await
            .unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("state", serde_json::json!({"state":"active"})),
            )
            .await
            .unwrap();
        drop(first_log);

        let reopened = Arc::new(AnyEventLog::Sqlite(
            SqliteEventLog::open(path.clone(), 8).unwrap(),
        ));
        assert_eq!(
            reopened
                .read_range(&topic, None, usize::MAX)
                .await
                .unwrap()
                .len(),
            2
        );
        let compact = reopened.compact(&topic, 1).await.unwrap();
        assert!(compact.checkpointed);
        // After a TRUNCATE checkpoint the WAL file is gone or zero-length.
        let wal = PathBuf::from(format!("{}-wal", path.display()));
        assert!(file_size(&wal) == 0 || !wal.exists());
    }

    #[tokio::test(flavor = "current_thread")]
    async fn broadcast_forwarder_reports_lag_when_receiver_overflows() {
        // Capacity 2 with 10 sends guarantees the receiver falls behind.
        let (sender, rx) = broadcast::channel(2);
        for i in 0..10 {
            sender
                .send((i + 1, LogEvent::new("tick", serde_json::json!({"i": i}))))
                .unwrap();
        }
        let mut stream = stream_from_broadcast(Vec::new(), None, rx, 2);

        match stream.next().await {
            Some(Err(LogError::ConsumerLagged(last_seen))) => assert_eq!(last_seen, 0),
            other => panic!("subscriber should surface lag, got {other:?}"),
        }
    }

    #[tokio::test(flavor = "current_thread")]
    async fn randomized_reader_sequences_stay_monotonic() {
        let log = Arc::new(MemoryEventLog::new(32));
        let topic = Topic::new("fuzz.demo").unwrap();
        // Readers start at different offsets; all must yield ascending ids.
        let mut readers = vec![
            log.clone().subscribe(&topic, None).await.unwrap(),
            log.clone().subscribe(&topic, Some(5)).await.unwrap(),
            log.clone().subscribe(&topic, Some(10)).await.unwrap(),
        ];
        let mut rng = StdRng::seed_from_u64(7);
        for _ in 0..64 {
            let value = rng.random_range(0..1000);
            log.append(
                &topic,
                LogEvent::new("rand", serde_json::json!({"value": value})),
            )
            .await
            .unwrap();
        }

        let mut sequences = Vec::new();
        for reader in &mut readers {
            let mut ids = Vec::new();
            while let Some(item) = reader.next().await {
                match item {
                    Ok((event_id, _)) => {
                        ids.push(event_id);
                        if ids.len() >= 16 {
                            break;
                        }
                    }
                    // Lag is tolerated here; monotonicity is what's checked.
                    Err(LogError::ConsumerLagged(_)) => break,
                    Err(error) => panic!("unexpected subscription error: {error}"),
                }
            }
            sequences.push(ids);
        }

        for ids in sequences {
            assert!(ids.windows(2).all(|pair| pair[0] < pair[1]));
        }
    }
}