Skip to main content

harn_vm/event_log/
mod.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, HashMap, VecDeque};
3use std::fmt;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::{Arc, Mutex};
7
8use bytes::Bytes;
9use futures::stream::BoxStream;
10use rusqlite::{params, Connection, OptionalExtension};
11use serde::{Deserialize, Serialize};
12use tokio::sync::{broadcast, mpsc};
13use tokio_stream::wrappers::ReceiverStream;
14
15use crate::runtime_limits::RuntimeLimits;
16
/// Numeric identifier of an event within a topic.
pub type EventId = u64;

/// Environment variable naming the backend to use; parsed as an
/// `EventLogBackendKind` by `EventLogConfig::for_base_dir`.
pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
/// Environment variable overriding the directory used by the file backend.
pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
/// Environment variable overriding the database path used by the SQLite backend.
pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
/// Environment variable overriding the event log queue depth.
pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";
23
24#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
25pub struct Topic(String);
26
27impl Topic {
28    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
29        let value = value.into();
30        if value.is_empty() {
31            return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
32        }
33        if !value
34            .chars()
35            .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
36        {
37            return Err(LogError::InvalidTopic(format!(
38                "topic '{value}' contains unsupported characters"
39            )));
40        }
41        Ok(Self(value))
42    }
43
44    pub fn as_str(&self) -> &str {
45        &self.0
46    }
47}
48
49impl fmt::Display for Topic {
50    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51        self.0.fmt(f)
52    }
53}
54
55impl FromStr for Topic {
56    type Err = LogError;
57
58    fn from_str(s: &str) -> Result<Self, Self::Err> {
59        Self::new(s)
60    }
61}
62
63#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
64pub struct ConsumerId(String);
65
66impl ConsumerId {
67    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
68        let value = value.into();
69        if value.trim().is_empty() {
70            return Err(LogError::InvalidConsumer(
71                "consumer id cannot be empty".to_string(),
72            ));
73        }
74        Ok(Self(value))
75    }
76
77    pub fn as_str(&self) -> &str {
78        &self.0
79    }
80}
81
82impl fmt::Display for ConsumerId {
83    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84        self.0.fmt(f)
85    }
86}
87
/// Storage backends an event log can be opened with.
///
/// Serialized with serde in `snake_case` (`memory`, `file`, `sqlite`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventLogBackendKind {
    /// In-process backend (`MemoryEventLog`).
    Memory,
    /// Directory-based backend (`FileEventLog`).
    File,
    /// SQLite database backend (`SqliteEventLog`).
    Sqlite,
}
95
96impl fmt::Display for EventLogBackendKind {
97    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98        match self {
99            Self::Memory => write!(f, "memory"),
100            Self::File => write!(f, "file"),
101            Self::Sqlite => write!(f, "sqlite"),
102        }
103    }
104}
105
106impl FromStr for EventLogBackendKind {
107    type Err = LogError;
108
109    fn from_str(value: &str) -> Result<Self, Self::Err> {
110        match value.trim().to_ascii_lowercase().as_str() {
111            "memory" => Ok(Self::Memory),
112            "file" => Ok(Self::File),
113            "sqlite" => Ok(Self::Sqlite),
114            other => Err(LogError::Config(format!(
115                "unsupported event log backend '{other}'"
116            ))),
117        }
118    }
119}
120
121#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
122pub struct LogEvent {
123    pub kind: String,
124    pub payload: serde_json::Value,
125    #[serde(default)]
126    pub headers: BTreeMap<String, String>,
127    pub occurred_at_ms: i64,
128}
129
130impl LogEvent {
131    pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
132        Self {
133            kind: kind.into(),
134            payload,
135            headers: BTreeMap::new(),
136            occurred_at_ms: now_ms(),
137        }
138    }
139
140    pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
141        self.headers = headers;
142        self
143    }
144
145    /// Apply the unified redaction policy to this event's headers and
146    /// payload. Backends are intentionally left unaware of redaction —
147    /// emitters that need scrubbed events call this before append, and
148    /// readers that materialize events for display can apply the policy
149    /// again as defense in depth.
150    pub fn redact_in_place(&mut self, policy: &crate::redact::RedactionPolicy) {
151        self.headers = policy.redact_headers(&self.headers);
152        policy.redact_json_in_place(&mut self.payload);
153    }
154}
155
156/// Serialized event payload form for large read paths.
157///
158/// `payload` contains the original JSON bytes for backends that can expose
159/// them directly. Callers that only need to forward or hash the payload can
160/// avoid materializing a `serde_json::Value`; callers that need structured
161/// access can opt in with `payload_json`.
162#[derive(Clone, Debug, PartialEq, Eq)]
163pub struct LogEventBytes {
164    pub kind: String,
165    pub payload: Bytes,
166    pub headers: BTreeMap<String, String>,
167    pub occurred_at_ms: i64,
168}
169
170impl LogEventBytes {
171    pub fn payload_json(&self) -> Result<serde_json::Value, LogError> {
172        serde_json::from_slice(&self.payload)
173            .map_err(|error| LogError::Serde(format!("event log payload parse error: {error}")))
174    }
175
176    pub fn into_log_event(self) -> Result<LogEvent, LogError> {
177        Ok(LogEvent {
178            kind: self.kind,
179            payload: serde_json::from_slice(&self.payload).map_err(|error| {
180                LogError::Serde(format!("event log payload parse error: {error}"))
181            })?,
182            headers: self.headers,
183            occurred_at_ms: self.occurred_at_ms,
184        })
185    }
186}
187
188impl TryFrom<LogEvent> for LogEventBytes {
189    type Error = LogError;
190
191    fn try_from(event: LogEvent) -> Result<Self, Self::Error> {
192        let payload = serde_json::to_vec(&event.payload)
193            .map_err(|error| LogError::Serde(format!("event log payload encode error: {error}")))?;
194        Ok(Self {
195            kind: event.kind,
196            payload: Bytes::from(payload),
197            headers: event.headers,
198            occurred_at_ms: event.occurred_at_ms,
199        })
200    }
201}
202
/// Result of an `EventLog::compact` call.
///
/// The default value (all zero / `None` / `false`) is what the in-memory
/// backend returns for a topic it has never seen.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactReport {
    /// Number of events dropped by the compaction.
    pub removed: usize,
    /// Number of events still stored for the topic afterwards.
    pub remaining: usize,
    /// Latest event id known for the topic, if any.
    pub latest: Option<EventId>,
    /// Always `false` for the in-memory backend.
    /// NOTE(review): meaning for the other backends is not visible here.
    pub checkpointed: bool,
}
210
/// Result of `AnyEventLog::append_idempotent_by_header`.
#[derive(Clone, Debug, PartialEq)]
pub struct AppendOutcome {
    /// Id of the stored event (new or pre-existing).
    pub event_id: EventId,
    /// The event as stored in the log.
    pub event: LogEvent,
    /// `true` when a new event was appended; `false` when an existing event
    /// with the matching header value was returned instead.
    pub inserted: bool,
}
217
/// Summary of an event log backend, as reported by `EventLog::describe` and
/// `describe_for_base_dir`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventLogDescription {
    /// Which backend is in use.
    pub backend: EventLogBackendKind,
    /// Storage path of the backend; `None` for the in-memory backend.
    pub location: Option<PathBuf>,
    /// Size of the backing storage in bytes; `None` for the in-memory backend.
    pub size_bytes: Option<u64>,
    /// Configured queue depth.
    pub queue_depth: usize,
}
225
226#[derive(Debug)]
227pub enum LogError {
228    Config(String),
229    InvalidTopic(String),
230    InvalidConsumer(String),
231    Io(String),
232    Serde(String),
233    Sqlite(String),
234    ConsumerLagged(EventId),
235}
236
237impl fmt::Display for LogError {
238    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
239        match self {
240            Self::Config(message)
241            | Self::InvalidTopic(message)
242            | Self::InvalidConsumer(message)
243            | Self::Io(message)
244            | Self::Serde(message)
245            | Self::Sqlite(message) => message.fmt(f),
246            Self::ConsumerLagged(last_id) => {
247                write!(f, "subscriber lagged behind after event {last_id}")
248            }
249        }
250    }
251}
252
253impl std::error::Error for LogError {}
254
255#[allow(async_fn_in_trait)]
256pub trait EventLog: Send + Sync {
257    fn describe(&self) -> EventLogDescription;
258
259    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;
260
261    async fn flush(&self) -> Result<(), LogError>;
262
263    /// Read events strictly after `from`. `None` starts from the
264    /// beginning of the topic.
265    async fn read_range(
266        &self,
267        topic: &Topic,
268        from: Option<EventId>,
269        limit: usize,
270    ) -> Result<Vec<(EventId, LogEvent)>, LogError>;
271
272    async fn read_range_bytes(
273        &self,
274        topic: &Topic,
275        from: Option<EventId>,
276        limit: usize,
277    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
278        let events = self.read_range(topic, from, limit).await?;
279        events
280            .into_iter()
281            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
282            .collect()
283    }
284
285    /// `async fn` keeps the ergonomic generic surface; the boxed stream
286    /// preserves dyn-dispatch for callers that store `Arc<dyn EventLog>`.
287    async fn subscribe(
288        self: Arc<Self>,
289        topic: &Topic,
290        from: Option<EventId>,
291    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;
292
293    async fn ack(
294        &self,
295        topic: &Topic,
296        consumer: &ConsumerId,
297        up_to: EventId,
298    ) -> Result<(), LogError>;
299
300    async fn consumer_cursor(
301        &self,
302        topic: &Topic,
303        consumer: &ConsumerId,
304    ) -> Result<Option<EventId>, LogError>;
305
306    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;
307
308    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
309}
310
311#[derive(Clone, Debug)]
312pub struct EventLogConfig {
313    pub backend: EventLogBackendKind,
314    pub file_dir: PathBuf,
315    pub sqlite_path: PathBuf,
316    pub queue_depth: usize,
317}
318
319impl EventLogConfig {
320    pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
321        let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
322            .ok()
323            .map(|value| value.parse())
324            .transpose()?
325            .unwrap_or(EventLogBackendKind::Sqlite);
326        let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
327            .ok()
328            .and_then(|value| value.parse::<usize>().ok())
329            .unwrap_or(RuntimeLimits::DEFAULT.default_event_log_queue_depth)
330            .max(1);
331
332        let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
333            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
334            _ => crate::runtime_paths::event_log_dir(base_dir),
335        };
336        let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
337            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
338            _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
339        };
340
341        Ok(Self {
342            backend,
343            file_dir,
344            sqlite_path,
345            queue_depth,
346        })
347    }
348
349    pub fn location(&self) -> Option<PathBuf> {
350        match self.backend {
351            EventLogBackendKind::Memory => None,
352            EventLogBackendKind::File => Some(self.file_dir.clone()),
353            EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
354        }
355    }
356}
357
// Per-thread event log slots: each thread has its own independent pair.
thread_local! {
    // Log currently installed on this thread, if any.
    static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
    // Configuration stored by `install_lazy_default_for_base_dir`; taken and
    // opened by `active_event_log` the first time a log is requested.
    static PENDING_DEFAULT_EVENT_LOG: RefCell<Option<EventLogConfig>> = const { RefCell::new(None) };
}
362
363pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
364    let config = EventLogConfig::for_base_dir(base_dir)?;
365    let log = open_event_log(&config)?;
366    ACTIVE_EVENT_LOG.with(|slot| {
367        *slot.borrow_mut() = Some(log.clone());
368    });
369    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
370        *slot.borrow_mut() = None;
371    });
372    Ok(log)
373}
374
375pub fn install_lazy_default_for_base_dir(base_dir: &Path) -> Result<(), LogError> {
376    let config = EventLogConfig::for_base_dir(base_dir)?;
377    let has_active = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().is_some());
378    if !has_active {
379        PENDING_DEFAULT_EVENT_LOG.with(|slot| {
380            *slot.borrow_mut() = Some(config);
381        });
382    }
383    Ok(())
384}
385
386pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
387    let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
388    ACTIVE_EVENT_LOG.with(|slot| {
389        *slot.borrow_mut() = Some(log.clone());
390    });
391    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
392        *slot.borrow_mut() = None;
393    });
394    log
395}
396
397pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
398    ACTIVE_EVENT_LOG.with(|slot| {
399        *slot.borrow_mut() = Some(log.clone());
400    });
401    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
402        *slot.borrow_mut() = None;
403    });
404    log
405}
406
407pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
408    if let Some(log) = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone()) {
409        return Some(log);
410    }
411
412    let config = PENDING_DEFAULT_EVENT_LOG.with(|slot| slot.borrow_mut().take())?;
413    match open_event_log(&config) {
414        Ok(log) => Some(install_active_event_log(log)),
415        Err(error) => {
416            crate::events::log_warn("event_log.init", &error.to_string());
417            None
418        }
419    }
420}
421
422pub fn reset_active_event_log() {
423    ACTIVE_EVENT_LOG.with(|slot| {
424        *slot.borrow_mut() = None;
425    });
426    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
427        *slot.borrow_mut() = None;
428    });
429}
430
431pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
432    let config = EventLogConfig::for_base_dir(base_dir)?;
433    let description = match config.backend {
434        EventLogBackendKind::Memory => EventLogDescription {
435            backend: EventLogBackendKind::Memory,
436            location: None,
437            size_bytes: None,
438            queue_depth: config.queue_depth,
439        },
440        EventLogBackendKind::File => EventLogDescription {
441            backend: EventLogBackendKind::File,
442            size_bytes: Some(dir_size_bytes(&config.file_dir)),
443            location: Some(config.file_dir),
444            queue_depth: config.queue_depth,
445        },
446        EventLogBackendKind::Sqlite => EventLogDescription {
447            backend: EventLogBackendKind::Sqlite,
448            size_bytes: Some(sqlite_size_bytes(&config.sqlite_path)),
449            location: Some(config.sqlite_path),
450            queue_depth: config.queue_depth,
451        },
452    };
453    Ok(description)
454}
455
456pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
457    match config.backend {
458        EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
459            config.queue_depth,
460        )))),
461        EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
462            config.file_dir.clone(),
463            config.queue_depth,
464        )?))),
465        EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
466            config.sqlite_path.clone(),
467            config.queue_depth,
468        )?))),
469    }
470}
471
472pub enum AnyEventLog {
473    Memory(MemoryEventLog),
474    File(FileEventLog),
475    Sqlite(SqliteEventLog),
476}
477
478impl AnyEventLog {
479    pub async fn topics(&self) -> Result<Vec<Topic>, LogError> {
480        match self {
481            Self::Memory(log) => log.topics().await,
482            Self::File(log) => log.topics(),
483            Self::Sqlite(log) => log.topics(),
484        }
485    }
486
487    pub async fn append_idempotent_by_header(
488        &self,
489        topic: &Topic,
490        header: &str,
491        value: &str,
492        event: LogEvent,
493    ) -> Result<AppendOutcome, LogError> {
494        if header.trim().is_empty() {
495            return Err(LogError::Config(
496                "idempotent append header cannot be empty".to_string(),
497            ));
498        }
499        match self {
500            Self::Memory(log) => {
501                log.append_idempotent_by_header(topic, header, value, event)
502                    .await
503            }
504            Self::File(log) => log.append_idempotent_by_header(topic, header, value, event),
505            Self::Sqlite(log) => log.append_idempotent_by_header(topic, header, value, event),
506        }
507    }
508}
509
510impl EventLog for AnyEventLog {
511    fn describe(&self) -> EventLogDescription {
512        match self {
513            Self::Memory(log) => log.describe(),
514            Self::File(log) => log.describe(),
515            Self::Sqlite(log) => log.describe(),
516        }
517    }
518
519    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
520        match self {
521            Self::Memory(log) => log.append(topic, event).await,
522            Self::File(log) => log.append(topic, event).await,
523            Self::Sqlite(log) => log.append(topic, event).await,
524        }
525    }
526
527    async fn flush(&self) -> Result<(), LogError> {
528        match self {
529            Self::Memory(log) => log.flush().await,
530            Self::File(log) => log.flush().await,
531            Self::Sqlite(log) => log.flush().await,
532        }
533    }
534
535    async fn read_range(
536        &self,
537        topic: &Topic,
538        from: Option<EventId>,
539        limit: usize,
540    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
541        match self {
542            Self::Memory(log) => log.read_range(topic, from, limit).await,
543            Self::File(log) => log.read_range(topic, from, limit).await,
544            Self::Sqlite(log) => log.read_range(topic, from, limit).await,
545        }
546    }
547
548    async fn read_range_bytes(
549        &self,
550        topic: &Topic,
551        from: Option<EventId>,
552        limit: usize,
553    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
554        match self {
555            Self::Memory(log) => log.read_range_bytes(topic, from, limit).await,
556            Self::File(log) => log.read_range_bytes(topic, from, limit).await,
557            Self::Sqlite(log) => log.read_range_bytes(topic, from, limit).await,
558        }
559    }
560
561    async fn subscribe(
562        self: Arc<Self>,
563        topic: &Topic,
564        from: Option<EventId>,
565    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
566        let (rx, queue_depth) = match self.as_ref() {
567            Self::Memory(log) => (
568                log.broadcasts.subscribe(topic, log.queue_depth),
569                log.queue_depth,
570            ),
571            Self::File(log) => (
572                log.broadcasts.subscribe(topic, log.queue_depth),
573                log.queue_depth,
574            ),
575            Self::Sqlite(log) => (
576                log.broadcasts.subscribe(topic, log.queue_depth),
577                log.queue_depth,
578            ),
579        };
580        let history = self.read_range(topic, from, usize::MAX).await?;
581        Ok(stream_from_broadcast(history, from, rx, queue_depth))
582    }
583
584    async fn ack(
585        &self,
586        topic: &Topic,
587        consumer: &ConsumerId,
588        up_to: EventId,
589    ) -> Result<(), LogError> {
590        match self {
591            Self::Memory(log) => log.ack(topic, consumer, up_to).await,
592            Self::File(log) => log.ack(topic, consumer, up_to).await,
593            Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
594        }
595    }
596
597    async fn consumer_cursor(
598        &self,
599        topic: &Topic,
600        consumer: &ConsumerId,
601    ) -> Result<Option<EventId>, LogError> {
602        match self {
603            Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
604            Self::File(log) => log.consumer_cursor(topic, consumer).await,
605            Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
606        }
607    }
608
609    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
610        match self {
611            Self::Memory(log) => log.latest(topic).await,
612            Self::File(log) => log.latest(topic).await,
613            Self::Sqlite(log) => log.latest(topic).await,
614        }
615    }
616
617    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
618        match self {
619            Self::Memory(log) => log.compact(topic, before).await,
620            Self::File(log) => log.compact(topic, before).await,
621            Self::Sqlite(log) => log.compact(topic, before).await,
622        }
623    }
624}
625
626#[derive(Default)]
627struct BroadcastMap(Mutex<HashMap<String, broadcast::Sender<(EventId, LogEvent)>>>);
628
629impl BroadcastMap {
630    fn subscribe(
631        &self,
632        topic: &Topic,
633        capacity: usize,
634    ) -> broadcast::Receiver<(EventId, LogEvent)> {
635        self.sender(topic, capacity).subscribe()
636    }
637
638    fn publish(&self, topic: &Topic, capacity: usize, record: (EventId, LogEvent)) {
639        let _ = self.sender(topic, capacity).send(record);
640    }
641
642    fn sender(&self, topic: &Topic, capacity: usize) -> broadcast::Sender<(EventId, LogEvent)> {
643        let mut map = self.0.lock().expect("event log broadcast map poisoned");
644        map.entry(topic.as_str().to_string())
645            .or_insert_with(|| broadcast::channel(capacity.max(1)).0)
646            .clone()
647    }
648}
649
650fn stream_from_broadcast(
651    history: Vec<(EventId, LogEvent)>,
652    from: Option<EventId>,
653    mut live_rx: broadcast::Receiver<(EventId, LogEvent)>,
654    queue_depth: usize,
655) -> BoxStream<'static, Result<(EventId, LogEvent), LogError>> {
656    let (tx, rx) = mpsc::channel(queue_depth.max(1));
657    // Run the subscription forwarder as a tokio task rather than a detached
658    // OS thread. A dedicated thread running under `futures::executor::block_on`
659    // is invisible to the tokio runtime, so tests that use `start_paused = true`
660    // race against auto-advanced timers while the thread catches up in real
661    // time. Spawning on tokio makes the forwarder participate in runtime
662    // scheduling (including paused-time quiescence) and ties its lifetime to
663    // the runtime's shutdown.
664    tokio::spawn(async move {
665        let mut last_seen = from.unwrap_or(0);
666        for (event_id, event) in history {
667            last_seen = event_id;
668            if tx.send(Ok((event_id, event))).await.is_err() {
669                return;
670            }
671        }
672
673        loop {
674            tokio::select! {
675                _ = tx.closed() => return,
676                received = live_rx.recv() => {
677                    match received {
678                        Ok((event_id, event)) if event_id > last_seen => {
679                            last_seen = event_id;
680                            if tx.send(Ok((event_id, event))).await.is_err() {
681                                return;
682                            }
683                        }
684                        Ok(_) => {}
685                        Err(broadcast::error::RecvError::Closed) => return,
686                        Err(broadcast::error::RecvError::Lagged(_)) => {
687                            let _ = tx.try_send(Err(LogError::ConsumerLagged(last_seen)));
688                            return;
689                        }
690                    }
691                }
692            }
693        }
694    });
695    Box::pin(ReceiverStream::new(rx))
696}
697
698fn prepare_event_after(
699    topic: &Topic,
700    event_id: EventId,
701    previous: Option<(EventId, &LogEvent)>,
702    event: LogEvent,
703) -> Result<LogEvent, LogError> {
704    let previous_hash = previous
705        .map(|(previous_id, previous_event)| {
706            crate::provenance::event_record_hash_from_headers(
707                topic.as_str(),
708                previous_id,
709                previous_event,
710            )
711        })
712        .transpose()?;
713    crate::provenance::prepare_event_for_append(topic.as_str(), event_id, previous_hash, event)
714}
715
/// Mutable state of the in-memory backend, guarded by a single mutex.
#[derive(Default)]
struct MemoryState {
    /// Events per topic name, in append order.
    topics: HashMap<String, VecDeque<(EventId, LogEvent)>>,
    /// Highest event id assigned per topic name; not reduced by compaction.
    latest: HashMap<String, EventId>,
    /// Acknowledged cursor per `(topic name, consumer id)` pair.
    consumers: HashMap<(String, String), EventId>,
}
722
723pub struct MemoryEventLog {
724    state: Mutex<MemoryState>,
725    broadcasts: BroadcastMap,
726    queue_depth: usize,
727}
728
729impl MemoryEventLog {
730    pub fn new(queue_depth: usize) -> Self {
731        Self {
732            state: Mutex::new(MemoryState::default()),
733            broadcasts: BroadcastMap::default(),
734            queue_depth: queue_depth.max(1),
735        }
736    }
737
738    fn state(&self) -> Result<std::sync::MutexGuard<'_, MemoryState>, LogError> {
739        self.state
740            .lock()
741            .map_err(|_| LogError::Io("memory event log state poisoned".to_string()))
742    }
743
744    async fn topics(&self) -> Result<Vec<Topic>, LogError> {
745        let state = self.state()?;
746        let mut topics = state
747            .topics
748            .keys()
749            .map(|topic| Topic::new(topic.clone()))
750            .collect::<Result<Vec<_>, _>>()?;
751        topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
752        Ok(topics)
753    }
754
755    async fn append_idempotent_by_header(
756        &self,
757        topic: &Topic,
758        header: &str,
759        value: &str,
760        event: LogEvent,
761    ) -> Result<AppendOutcome, LogError> {
762        let mut state = self.state()?;
763        if let Some((event_id, existing)) = state
764            .topics
765            .get(topic.as_str())
766            .into_iter()
767            .flat_map(|events| events.iter())
768            .find(|(_, event)| {
769                event
770                    .headers
771                    .get(header)
772                    .is_some_and(|found| found == value)
773            })
774        {
775            return Ok(AppendOutcome {
776                event_id: *event_id,
777                event: existing.clone(),
778                inserted: false,
779            });
780        }
781
782        let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
783        let previous = state
784            .topics
785            .get(topic.as_str())
786            .and_then(|events| events.back())
787            .map(|(previous_id, previous_event)| (*previous_id, previous_event));
788        let event = prepare_event_after(topic, event_id, previous, event)?;
789        state.latest.insert(topic.as_str().to_string(), event_id);
790        state
791            .topics
792            .entry(topic.as_str().to_string())
793            .or_default()
794            .push_back((event_id, event.clone()));
795        drop(state);
796        self.broadcasts
797            .publish(topic, self.queue_depth, (event_id, event.clone()));
798        Ok(AppendOutcome {
799            event_id,
800            event,
801            inserted: true,
802        })
803    }
804}
805
806impl EventLog for MemoryEventLog {
807    fn describe(&self) -> EventLogDescription {
808        EventLogDescription {
809            backend: EventLogBackendKind::Memory,
810            location: None,
811            size_bytes: None,
812            queue_depth: self.queue_depth,
813        }
814    }
815
816    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
817        let mut state = self.state()?;
818        let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
819        let previous = state
820            .topics
821            .get(topic.as_str())
822            .and_then(|events| events.back())
823            .map(|(previous_id, previous_event)| (*previous_id, previous_event));
824        let event = prepare_event_after(topic, event_id, previous, event)?;
825        state.latest.insert(topic.as_str().to_string(), event_id);
826        state
827            .topics
828            .entry(topic.as_str().to_string())
829            .or_default()
830            .push_back((event_id, event.clone()));
831        drop(state);
832        self.broadcasts
833            .publish(topic, self.queue_depth, (event_id, event));
834        Ok(event_id)
835    }
836
837    async fn flush(&self) -> Result<(), LogError> {
838        Ok(())
839    }
840
841    async fn read_range(
842        &self,
843        topic: &Topic,
844        from: Option<EventId>,
845        limit: usize,
846    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
847        let from = from.unwrap_or(0);
848        let state = self.state()?;
849        let events = state
850            .topics
851            .get(topic.as_str())
852            .into_iter()
853            .flat_map(|events| events.iter())
854            .filter(|(event_id, _)| *event_id > from)
855            .take(limit)
856            .map(|(event_id, event)| (*event_id, event.clone()))
857            .collect();
858        Ok(events)
859    }
860
861    async fn subscribe(
862        self: Arc<Self>,
863        topic: &Topic,
864        from: Option<EventId>,
865    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
866        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
867        let history = self.read_range(topic, from, usize::MAX).await?;
868        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
869    }
870
871    async fn ack(
872        &self,
873        topic: &Topic,
874        consumer: &ConsumerId,
875        up_to: EventId,
876    ) -> Result<(), LogError> {
877        let mut state = self.state()?;
878        state.consumers.insert(
879            (topic.as_str().to_string(), consumer.as_str().to_string()),
880            up_to,
881        );
882        Ok(())
883    }
884
885    async fn consumer_cursor(
886        &self,
887        topic: &Topic,
888        consumer: &ConsumerId,
889    ) -> Result<Option<EventId>, LogError> {
890        let state = self.state()?;
891        Ok(state
892            .consumers
893            .get(&(topic.as_str().to_string(), consumer.as_str().to_string()))
894            .copied())
895    }
896
897    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
898        let state = self.state()?;
899        Ok(state.latest.get(topic.as_str()).copied())
900    }
901
902    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
903        let mut state = self.state()?;
904        let Some(events) = state.topics.get_mut(topic.as_str()) else {
905            return Ok(CompactReport::default());
906        };
907        let removed = events
908            .iter()
909            .take_while(|(event_id, _)| *event_id <= before)
910            .count();
911        for _ in 0..removed {
912            events.pop_front();
913        }
914        Ok(CompactReport {
915            removed,
916            remaining: events.len(),
917            latest: state.latest.get(topic.as_str()).copied(),
918            checkpointed: false,
919        })
920    }
921}
922
/// On-disk representation of one event in a topic's `.jsonl` file.
///
/// `FileEventLog` serializes one `FileRecord` per line when appending, and
/// `read_file_records` parses each non-blank line back into this type.
#[derive(Serialize, Deserialize)]
struct FileRecord {
    /// Id of the event within its topic.
    id: EventId,
    /// The event itself.
    event: LogEvent,
}
928
/// Event log backend that stores each topic as a JSON-lines file under
/// `root/topics` and each consumer cursor as a JSON file under
/// `root/consumers`.
pub struct FileEventLog {
    /// Directory holding the `topics` and `consumers` subdirectories.
    root: PathBuf,
    /// Cache of the latest event id per topic name; filled on first lookup
    /// and updated on every append and compaction.
    latest_ids: Mutex<HashMap<String, EventId>>,
    /// Held while appending to or compacting a topic file.
    write_lock: Mutex<()>,
    /// Broadcast channels used to publish appended events to subscribers.
    broadcasts: BroadcastMap,
    /// Queue depth passed to the broadcast map; `open` clamps it to at least 1.
    queue_depth: usize,
}
936
937impl FileEventLog {
938    pub fn open(root: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
939        std::fs::create_dir_all(root.join("topics"))
940            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
941        std::fs::create_dir_all(root.join("consumers"))
942            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
943        Ok(Self {
944            root,
945            latest_ids: Mutex::new(HashMap::new()),
946            write_lock: Mutex::new(()),
947            broadcasts: BroadcastMap::default(),
948            queue_depth: queue_depth.max(1),
949        })
950    }
951
952    fn topic_path(&self, topic: &Topic) -> PathBuf {
953        self.root
954            .join("topics")
955            .join(format!("{}.jsonl", topic.as_str()))
956    }
957
958    fn consumer_path(&self, topic: &Topic, consumer: &ConsumerId) -> PathBuf {
959        self.root.join("consumers").join(format!(
960            "{}__{}.json",
961            topic.as_str(),
962            sanitize_filename(consumer.as_str())
963        ))
964    }
965
966    fn latest_id_for_topic(&self, topic: &Topic) -> Result<EventId, LogError> {
967        if let Some(event_id) = self
968            .latest_ids
969            .lock()
970            .expect("file event log latest ids poisoned")
971            .get(topic.as_str())
972            .copied()
973        {
974            return Ok(event_id);
975        }
976
977        let mut latest = 0;
978        let path = self.topic_path(topic);
979        if path.is_file() {
980            for record in read_file_records(&path)? {
981                latest = record.id;
982            }
983        }
984        self.latest_ids
985            .lock()
986            .expect("file event log latest ids poisoned")
987            .insert(topic.as_str().to_string(), latest);
988        Ok(latest)
989    }
990
991    fn read_range_sync(
992        &self,
993        topic: &Topic,
994        from: Option<EventId>,
995        limit: usize,
996    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
997        let path = self.topic_path(topic);
998        if !path.is_file() {
999            return Ok(Vec::new());
1000        }
1001        let from = from.unwrap_or(0);
1002        let mut events = Vec::new();
1003        for record in read_file_records(&path)? {
1004            if record.id > from {
1005                events.push((record.id, record.event));
1006            }
1007            if events.len() >= limit {
1008                break;
1009            }
1010        }
1011        Ok(events)
1012    }
1013
1014    fn topics(&self) -> Result<Vec<Topic>, LogError> {
1015        let topics_dir = self.root.join("topics");
1016        if !topics_dir.is_dir() {
1017            return Ok(Vec::new());
1018        }
1019        let mut topics = Vec::new();
1020        for entry in std::fs::read_dir(&topics_dir)
1021            .map_err(|error| LogError::Io(format!("event log topics read error: {error}")))?
1022        {
1023            let entry = entry
1024                .map_err(|error| LogError::Io(format!("event log topic entry error: {error}")))?;
1025            let path = entry.path();
1026            if path.extension().and_then(|ext| ext.to_str()) != Some("jsonl") {
1027                continue;
1028            }
1029            let Some(stem) = path.file_stem().and_then(|stem| stem.to_str()) else {
1030                continue;
1031            };
1032            topics.push(Topic::new(stem.to_string())?);
1033        }
1034        topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
1035        Ok(topics)
1036    }
1037
1038    fn append_idempotent_by_header(
1039        &self,
1040        topic: &Topic,
1041        header: &str,
1042        value: &str,
1043        event: LogEvent,
1044    ) -> Result<AppendOutcome, LogError> {
1045        let _guard = self
1046            .write_lock
1047            .lock()
1048            .expect("file event log write lock poisoned");
1049        let existing_events = self.read_range_sync(topic, None, usize::MAX)?;
1050        if let Some((event_id, existing)) = existing_events.iter().find(|(_, event)| {
1051            event
1052                .headers
1053                .get(header)
1054                .is_some_and(|found| found == value)
1055        }) {
1056            return Ok(AppendOutcome {
1057                event_id: *event_id,
1058                event: existing.clone(),
1059                inserted: false,
1060            });
1061        }
1062
1063        let next_id = self.latest_id_for_topic(topic)? + 1;
1064        let previous = existing_events
1065            .last()
1066            .map(|(previous_id, previous_event)| (*previous_id, previous_event));
1067        let event = prepare_event_after(topic, next_id, previous, event)?;
1068        self.append_record_locked(topic, next_id, event)
1069    }
1070
1071    fn append_record_locked(
1072        &self,
1073        topic: &Topic,
1074        event_id: EventId,
1075        event: LogEvent,
1076    ) -> Result<AppendOutcome, LogError> {
1077        let record = FileRecord {
1078            id: event_id,
1079            event: event.clone(),
1080        };
1081        let path = self.topic_path(topic);
1082        if let Some(parent) = path.parent() {
1083            std::fs::create_dir_all(parent)
1084                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
1085        }
1086        let line = serde_json::to_string(&record)
1087            .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
1088        use std::io::Write as _;
1089        let mut file = std::fs::OpenOptions::new()
1090            .create(true)
1091            .append(true)
1092            .open(&path)
1093            .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
1094        writeln!(file, "{line}")
1095            .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
1096        self.latest_ids
1097            .lock()
1098            .expect("file event log latest ids poisoned")
1099            .insert(topic.as_str().to_string(), event_id);
1100        self.broadcasts
1101            .publish(topic, self.queue_depth, (event_id, event.clone()));
1102        Ok(AppendOutcome {
1103            event_id,
1104            event,
1105            inserted: true,
1106        })
1107    }
1108}
1109
1110fn read_file_records(path: &Path) -> Result<Vec<FileRecord>, LogError> {
1111    let file = std::fs::File::open(path)
1112        .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
1113    let mut reader = std::io::BufReader::new(file);
1114    let mut records = Vec::new();
1115    let mut line = Vec::new();
1116    loop {
1117        line.clear();
1118        let bytes_read = std::io::BufRead::read_until(&mut reader, b'\n', &mut line)
1119            .map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
1120        if bytes_read == 0 {
1121            break;
1122        }
1123        if line.iter().all(u8::is_ascii_whitespace) {
1124            continue;
1125        }
1126        let complete_line = line.ends_with(b"\n");
1127        match serde_json::from_slice::<FileRecord>(&line) {
1128            Ok(record) => records.push(record),
1129            Err(_) if !complete_line => break,
1130            Err(error) => {
1131                return Err(LogError::Serde(format!("event log parse error: {error}")));
1132            }
1133        }
1134    }
1135    Ok(records)
1136}
1137
1138impl EventLog for FileEventLog {
1139    fn describe(&self) -> EventLogDescription {
1140        EventLogDescription {
1141            backend: EventLogBackendKind::File,
1142            location: Some(self.root.clone()),
1143            size_bytes: Some(dir_size_bytes(&self.root)),
1144            queue_depth: self.queue_depth,
1145        }
1146    }
1147
1148    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
1149        let _guard = self
1150            .write_lock
1151            .lock()
1152            .expect("file event log write lock poisoned");
1153        let next_id = self.latest_id_for_topic(topic)? + 1;
1154        let existing_events = self.read_range_sync(topic, None, usize::MAX)?;
1155        let previous = existing_events
1156            .last()
1157            .map(|(previous_id, previous_event)| (*previous_id, previous_event));
1158        let event = prepare_event_after(topic, next_id, previous, event)?;
1159        self.append_record_locked(topic, next_id, event)
1160            .map(|outcome| outcome.event_id)
1161    }
1162
1163    async fn flush(&self) -> Result<(), LogError> {
1164        sync_tree(&self.root)
1165    }
1166
1167    async fn read_range(
1168        &self,
1169        topic: &Topic,
1170        from: Option<EventId>,
1171        limit: usize,
1172    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
1173        self.read_range_sync(topic, from, limit)
1174    }
1175
1176    async fn read_range_bytes(
1177        &self,
1178        topic: &Topic,
1179        from: Option<EventId>,
1180        limit: usize,
1181    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
1182        self.read_range_sync(topic, from, limit)?
1183            .into_iter()
1184            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
1185            .collect()
1186    }
1187
1188    async fn subscribe(
1189        self: Arc<Self>,
1190        topic: &Topic,
1191        from: Option<EventId>,
1192    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
1193        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
1194        let history = self.read_range_sync(topic, from, usize::MAX)?;
1195        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
1196    }
1197
1198    async fn ack(
1199        &self,
1200        topic: &Topic,
1201        consumer: &ConsumerId,
1202        up_to: EventId,
1203    ) -> Result<(), LogError> {
1204        let path = self.consumer_path(topic, consumer);
1205        let payload = serde_json::json!({
1206            "topic": topic.as_str(),
1207            "consumer_id": consumer.as_str(),
1208            "cursor": up_to,
1209            "updated_at_ms": now_ms(),
1210        });
1211        write_json_atomically(&path, &payload)
1212    }
1213
1214    async fn consumer_cursor(
1215        &self,
1216        topic: &Topic,
1217        consumer: &ConsumerId,
1218    ) -> Result<Option<EventId>, LogError> {
1219        let path = self.consumer_path(topic, consumer);
1220        if !path.is_file() {
1221            return Ok(None);
1222        }
1223        let raw = std::fs::read_to_string(&path)
1224            .map_err(|error| LogError::Io(format!("event log consumer read error: {error}")))?;
1225        let payload: serde_json::Value = serde_json::from_str(&raw)
1226            .map_err(|error| LogError::Serde(format!("event log consumer parse error: {error}")))?;
1227        let cursor = payload
1228            .get("cursor")
1229            .and_then(serde_json::Value::as_u64)
1230            .ok_or_else(|| {
1231                LogError::Serde("event log consumer record missing numeric cursor".to_string())
1232            })?;
1233        Ok(Some(cursor))
1234    }
1235
1236    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
1237        let latest = self.latest_id_for_topic(topic)?;
1238        if latest == 0 {
1239            Ok(None)
1240        } else {
1241            Ok(Some(latest))
1242        }
1243    }
1244
1245    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
1246        let _guard = self
1247            .write_lock
1248            .lock()
1249            .expect("file event log write lock poisoned");
1250        let path = self.topic_path(topic);
1251        if !path.is_file() {
1252            return Ok(CompactReport::default());
1253        }
1254        let retained = self.read_range_sync(topic, Some(before), usize::MAX)?;
1255        let removed = self.read_range_sync(topic, None, usize::MAX)?.len() - retained.len();
1256        if retained.is_empty() {
1257            let _ = std::fs::remove_file(&path);
1258        } else {
1259            crate::atomic_io::atomic_write_with(&path, |writer| {
1260                use std::io::Write as _;
1261                for (event_id, event) in &retained {
1262                    let line = serde_json::to_string(&FileRecord {
1263                        id: *event_id,
1264                        event: event.clone(),
1265                    })
1266                    .map_err(|error| std::io::Error::other(error.to_string()))?;
1267                    writeln!(writer, "{line}")?;
1268                }
1269                Ok(())
1270            })
1271            .map_err(|error| LogError::Io(format!("event log compact finalize error: {error}")))?;
1272        }
1273        let latest = retained.last().map(|(event_id, _)| *event_id);
1274        self.latest_ids
1275            .lock()
1276            .expect("file event log latest ids poisoned")
1277            .insert(topic.as_str().to_string(), latest.unwrap_or(0));
1278        Ok(CompactReport {
1279            removed,
1280            remaining: retained.len(),
1281            latest,
1282            checkpointed: false,
1283        })
1284    }
1285}
1286
/// Event log backend that stores events, consumer cursors and idempotency
/// keys in a single SQLite database.
pub struct SqliteEventLog {
    /// Path of the SQLite database file.
    path: PathBuf,
    /// The one connection used for every operation, guarded by a mutex.
    connection: Mutex<Connection>,
    /// Broadcast channels used to publish appended events to subscribers.
    broadcasts: BroadcastMap,
    /// Queue depth passed to the broadcast map; `open` clamps it to at least 1.
    queue_depth: usize,
}
1293
1294impl SqliteEventLog {
1295    pub fn open(path: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
1296        if let Some(parent) = path.parent() {
1297            std::fs::create_dir_all(parent)
1298                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
1299        }
1300        let connection = Connection::open(&path)
1301            .map_err(|error| LogError::Sqlite(format!("event log open error: {error}")))?;
1302        // Set busy_timeout BEFORE the WAL pragma so SQLite waits out transient
1303        // SQLITE_BUSY from a previous test's connection that hasn't finished
1304        // dropping yet (parallel `cargo test` on the same process, distinct
1305        // paths, still contends on SQLite's own global mutex under WAL-mode
1306        // promotion). Without this, `journal_mode = WAL` fails fast with
1307        // "database is locked" instead of retrying.
1308        connection
1309            .busy_timeout(std::time::Duration::from_secs(5))
1310            .map_err(|error| LogError::Sqlite(format!("event log busy-timeout error: {error}")))?;
1311        connection
1312            .pragma_update(None, "journal_mode", "WAL")
1313            .map_err(|error| LogError::Sqlite(format!("event log WAL pragma error: {error}")))?;
1314        connection
1315            .pragma_update(None, "synchronous", "NORMAL")
1316            .map_err(|error| LogError::Sqlite(format!("event log sync pragma error: {error}")))?;
1317        connection
1318            .execute_batch(
1319                "CREATE TABLE IF NOT EXISTS topic_heads (
1320                    topic TEXT PRIMARY KEY,
1321                    last_id INTEGER NOT NULL
1322                );
1323                CREATE TABLE IF NOT EXISTS events (
1324                    topic TEXT NOT NULL,
1325                    event_id INTEGER NOT NULL,
1326                    kind TEXT NOT NULL,
1327                    payload BLOB NOT NULL,
1328                    headers TEXT NOT NULL,
1329                    occurred_at_ms INTEGER NOT NULL,
1330                    PRIMARY KEY (topic, event_id)
1331                );
1332                CREATE TABLE IF NOT EXISTS consumers (
1333                    topic TEXT NOT NULL,
1334                    consumer_id TEXT NOT NULL,
1335                    cursor INTEGER NOT NULL,
1336                    updated_at_ms INTEGER NOT NULL,
1337                    PRIMARY KEY (topic, consumer_id)
1338                );
1339                CREATE TABLE IF NOT EXISTS event_idempotency_keys (
1340                    topic TEXT NOT NULL,
1341                    key TEXT NOT NULL,
1342                    value TEXT NOT NULL,
1343                    event_id INTEGER NOT NULL,
1344                    PRIMARY KEY (topic, key, value),
1345                    FOREIGN KEY (topic, event_id) REFERENCES events(topic, event_id)
1346                );",
1347            )
1348            .map_err(|error| LogError::Sqlite(format!("event log schema error: {error}")))?;
1349        Ok(Self {
1350            path,
1351            connection: Mutex::new(connection),
1352            broadcasts: BroadcastMap::default(),
1353            queue_depth: queue_depth.max(1),
1354        })
1355    }
1356
1357    fn topics(&self) -> Result<Vec<Topic>, LogError> {
1358        let connection = self
1359            .connection
1360            .lock()
1361            .expect("sqlite event log connection poisoned");
1362        let mut statement = connection
1363            .prepare("SELECT DISTINCT topic FROM events ORDER BY topic ASC")
1364            .map_err(|error| {
1365                LogError::Sqlite(format!("event log topics prepare error: {error}"))
1366            })?;
1367        let rows = statement
1368            .query_map([], |row| row.get::<_, String>(0))
1369            .map_err(|error| LogError::Sqlite(format!("event log topics query error: {error}")))?;
1370        let mut topics = Vec::new();
1371        for row in rows {
1372            topics.push(Topic::new(row.map_err(|error| {
1373                LogError::Sqlite(format!("event log topic row error: {error}"))
1374            })?)?);
1375        }
1376        Ok(topics)
1377    }
1378
1379    fn append_idempotent_by_header(
1380        &self,
1381        topic: &Topic,
1382        header: &str,
1383        value: &str,
1384        event: LogEvent,
1385    ) -> Result<AppendOutcome, LogError> {
1386        let mut connection = self
1387            .connection
1388            .lock()
1389            .expect("sqlite event log connection poisoned");
1390        let tx = connection
1391            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
1392            .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
1393
1394        let existing = tx
1395            .query_row(
1396                "SELECT e.event_id, e.kind, e.payload, e.headers, e.occurred_at_ms
1397                 FROM event_idempotency_keys k
1398                 JOIN events e ON e.topic = k.topic AND e.event_id = k.event_id
1399                 WHERE k.topic = ?1 AND k.key = ?2 AND k.value = ?3",
1400                params![topic.as_str(), header, value],
1401                |row| {
1402                    let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1403                    let headers: String = row.get(3)?;
1404                    Ok((
1405                        sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?,
1406                        LogEvent {
1407                            kind: row.get(1)?,
1408                            payload: serde_json::from_slice(&payload).map_err(|error| {
1409                                rusqlite::Error::FromSqlConversionFailure(
1410                                    payload.len(),
1411                                    rusqlite::types::Type::Blob,
1412                                    Box::new(error),
1413                                )
1414                            })?,
1415                            headers: serde_json::from_str(&headers).map_err(|error| {
1416                                rusqlite::Error::FromSqlConversionFailure(
1417                                    headers.len(),
1418                                    rusqlite::types::Type::Text,
1419                                    Box::new(error),
1420                                )
1421                            })?,
1422                            occurred_at_ms: row.get(4)?,
1423                        },
1424                    ))
1425                },
1426            )
1427            .optional()
1428            .map_err(|error| {
1429                LogError::Sqlite(format!("event log idempotency read error: {error}"))
1430            })?;
1431
1432        if let Some((event_id, event)) = existing {
1433            return Ok(AppendOutcome {
1434                event_id,
1435                event,
1436                inserted: false,
1437            });
1438        }
1439
1440        tx.execute(
1441            "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
1442            params![topic.as_str()],
1443        )
1444        .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
1445        tx.execute(
1446            "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
1447            params![topic.as_str()],
1448        )
1449        .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
1450        let event_id = tx
1451            .query_row(
1452                "SELECT last_id FROM topic_heads WHERE topic = ?1",
1453                params![topic.as_str()],
1454                |row| row.get::<_, i64>(0),
1455            )
1456            .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))
1457            .and_then(sqlite_i64_to_event_id)?;
1458        let event_id_sql = event_id_to_sqlite_i64(event_id)?;
1459        let previous = tx
1460            .query_row(
1461                "SELECT event_id, kind, payload, headers, occurred_at_ms
1462                 FROM events
1463                 WHERE topic = ?1 AND event_id < ?2
1464                 ORDER BY event_id DESC
1465                 LIMIT 1",
1466                params![topic.as_str(), event_id_sql],
1467                |row| {
1468                    let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1469                    let headers: String = row.get(3)?;
1470                    Ok((
1471                        sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?,
1472                        LogEvent {
1473                            kind: row.get(1)?,
1474                            payload: serde_json::from_slice(&payload).map_err(|error| {
1475                                rusqlite::Error::FromSqlConversionFailure(
1476                                    payload.len(),
1477                                    rusqlite::types::Type::Blob,
1478                                    Box::new(error),
1479                                )
1480                            })?,
1481                            headers: serde_json::from_str(&headers).map_err(|error| {
1482                                rusqlite::Error::FromSqlConversionFailure(
1483                                    headers.len(),
1484                                    rusqlite::types::Type::Text,
1485                                    Box::new(error),
1486                                )
1487                            })?,
1488                            occurred_at_ms: row.get(4)?,
1489                        },
1490                    ))
1491                },
1492            )
1493            .optional()
1494            .map_err(|error| LogError::Sqlite(format!("event log previous read error: {error}")))?;
1495        let event = prepare_event_after(
1496            topic,
1497            event_id,
1498            previous
1499                .as_ref()
1500                .map(|(previous_id, previous_event)| (*previous_id, previous_event)),
1501            event,
1502        )?;
1503        tx.execute(
1504            "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
1505             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1506            params![
1507                topic.as_str(),
1508                event_id_sql,
1509                event.kind,
1510                serde_json::to_vec(&event.payload).map_err(|error| LogError::Serde(format!(
1511                    "event log payload encode error: {error}"
1512                )))?,
1513                serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
1514                    "event log headers encode error: {error}"
1515                )))?,
1516                event.occurred_at_ms
1517            ],
1518        )
1519        .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
1520        tx.execute(
1521            "INSERT INTO event_idempotency_keys(topic, key, value, event_id)
1522             VALUES (?1, ?2, ?3, ?4)",
1523            params![topic.as_str(), header, value, event_id_sql],
1524        )
1525        .map_err(|error| {
1526            LogError::Sqlite(format!("event log idempotency insert error: {error}"))
1527        })?;
1528        tx.commit()
1529            .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
1530        self.broadcasts
1531            .publish(topic, self.queue_depth, (event_id, event.clone()));
1532        Ok(AppendOutcome {
1533            event_id,
1534            event,
1535            inserted: true,
1536        })
1537    }
1538}
1539
1540impl EventLog for SqliteEventLog {
1541    fn describe(&self) -> EventLogDescription {
1542        EventLogDescription {
1543            backend: EventLogBackendKind::Sqlite,
1544            location: Some(self.path.clone()),
1545            size_bytes: Some(sqlite_size_bytes(&self.path)),
1546            queue_depth: self.queue_depth,
1547        }
1548    }
1549
1550    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
1551        let mut connection = self
1552            .connection
1553            .lock()
1554            .expect("sqlite event log connection poisoned");
1555        let tx = connection
1556            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
1557            .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
1558        tx.execute(
1559            "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
1560            params![topic.as_str()],
1561        )
1562        .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
1563        tx.execute(
1564            "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
1565            params![topic.as_str()],
1566        )
1567        .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
1568        let event_id = tx
1569            .query_row(
1570                "SELECT last_id FROM topic_heads WHERE topic = ?1",
1571                params![topic.as_str()],
1572                |row| row.get::<_, i64>(0),
1573            )
1574            .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))
1575            .and_then(sqlite_i64_to_event_id)?;
1576        let event_id_sql = event_id_to_sqlite_i64(event_id)?;
1577        let previous = tx
1578            .query_row(
1579                "SELECT event_id, kind, payload, headers, occurred_at_ms
1580                 FROM events
1581                 WHERE topic = ?1 AND event_id < ?2
1582                 ORDER BY event_id DESC
1583                 LIMIT 1",
1584                params![topic.as_str(), event_id_sql],
1585                |row| {
1586                    let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1587                    let headers: String = row.get(3)?;
1588                    Ok((
1589                        sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?,
1590                        LogEvent {
1591                            kind: row.get(1)?,
1592                            payload: serde_json::from_slice(&payload).map_err(|error| {
1593                                rusqlite::Error::FromSqlConversionFailure(
1594                                    payload.len(),
1595                                    rusqlite::types::Type::Blob,
1596                                    Box::new(error),
1597                                )
1598                            })?,
1599                            headers: serde_json::from_str(&headers).map_err(|error| {
1600                                rusqlite::Error::FromSqlConversionFailure(
1601                                    headers.len(),
1602                                    rusqlite::types::Type::Text,
1603                                    Box::new(error),
1604                                )
1605                            })?,
1606                            occurred_at_ms: row.get(4)?,
1607                        },
1608                    ))
1609                },
1610            )
1611            .optional()
1612            .map_err(|error| LogError::Sqlite(format!("event log previous read error: {error}")))?;
1613        let event = prepare_event_after(
1614            topic,
1615            event_id,
1616            previous
1617                .as_ref()
1618                .map(|(previous_id, previous_event)| (*previous_id, previous_event)),
1619            event,
1620        )?;
1621        tx.execute(
1622            "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
1623             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1624            params![
1625                topic.as_str(),
1626                event_id_sql,
1627                event.kind,
1628                serde_json::to_vec(&event.payload).map_err(|error| LogError::Serde(format!(
1629                    "event log payload encode error: {error}"
1630                )))?,
1631                serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
1632                    "event log headers encode error: {error}"
1633                )))?,
1634                event.occurred_at_ms
1635            ],
1636        )
1637        .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
1638        tx.commit()
1639            .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
1640        self.broadcasts
1641            .publish(topic, self.queue_depth, (event_id, event.clone()));
1642        Ok(event_id)
1643    }
1644
1645    async fn flush(&self) -> Result<(), LogError> {
1646        let connection = self
1647            .connection
1648            .lock()
1649            .expect("sqlite event log connection poisoned");
1650        connection
1651            .execute_batch("PRAGMA wal_checkpoint(FULL);")
1652            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
1653        Ok(())
1654    }
1655
1656    async fn read_range(
1657        &self,
1658        topic: &Topic,
1659        from: Option<EventId>,
1660        limit: usize,
1661    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
1662        let connection = self
1663            .connection
1664            .lock()
1665            .expect("sqlite event log connection poisoned");
1666        let mut statement = connection
1667            .prepare(
1668                "SELECT event_id, kind, payload, headers, occurred_at_ms
1669                 FROM events
1670                 WHERE topic = ?1 AND event_id > ?2
1671                 ORDER BY event_id ASC
1672                 LIMIT ?3",
1673            )
1674            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
1675        let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
1676        let rows = statement
1677            .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
1678                let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1679                let headers: String = row.get(3)?;
1680                let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
1681                Ok((
1682                    event_id,
1683                    LogEvent {
1684                        kind: row.get(1)?,
1685                        payload: serde_json::from_slice(&payload).map_err(|error| {
1686                            rusqlite::Error::FromSqlConversionFailure(
1687                                payload.len(),
1688                                rusqlite::types::Type::Blob,
1689                                Box::new(error),
1690                            )
1691                        })?,
1692                        headers: serde_json::from_str(&headers).map_err(|error| {
1693                            rusqlite::Error::FromSqlConversionFailure(
1694                                headers.len(),
1695                                rusqlite::types::Type::Text,
1696                                Box::new(error),
1697                            )
1698                        })?,
1699                        occurred_at_ms: row.get(4)?,
1700                    },
1701                ))
1702            })
1703            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
1704        let mut events = Vec::new();
1705        for row in rows {
1706            events.push(
1707                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
1708            );
1709        }
1710        Ok(events)
1711    }
1712
1713    async fn read_range_bytes(
1714        &self,
1715        topic: &Topic,
1716        from: Option<EventId>,
1717        limit: usize,
1718    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
1719        let connection = self
1720            .connection
1721            .lock()
1722            .expect("sqlite event log connection poisoned");
1723        let mut statement = connection
1724            .prepare(
1725                "SELECT event_id, kind, payload, headers, occurred_at_ms
1726                 FROM events
1727                 WHERE topic = ?1 AND event_id > ?2
1728                 ORDER BY event_id ASC
1729                 LIMIT ?3",
1730            )
1731            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
1732        let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
1733        let rows = statement
1734            .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
1735                let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1736                let headers: String = row.get(3)?;
1737                let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
1738                Ok((
1739                    event_id,
1740                    LogEventBytes {
1741                        kind: row.get(1)?,
1742                        payload: Bytes::from(payload),
1743                        headers: serde_json::from_str(&headers).map_err(|error| {
1744                            rusqlite::Error::FromSqlConversionFailure(
1745                                headers.len(),
1746                                rusqlite::types::Type::Text,
1747                                Box::new(error),
1748                            )
1749                        })?,
1750                        occurred_at_ms: row.get(4)?,
1751                    },
1752                ))
1753            })
1754            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
1755        let mut events = Vec::new();
1756        for row in rows {
1757            events.push(
1758                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
1759            );
1760        }
1761        Ok(events)
1762    }
1763
1764    async fn subscribe(
1765        self: Arc<Self>,
1766        topic: &Topic,
1767        from: Option<EventId>,
1768    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
1769        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
1770        let history = self.read_range(topic, from, usize::MAX).await?;
1771        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
1772    }
1773
1774    async fn ack(
1775        &self,
1776        topic: &Topic,
1777        consumer: &ConsumerId,
1778        up_to: EventId,
1779    ) -> Result<(), LogError> {
1780        let connection = self
1781            .connection
1782            .lock()
1783            .expect("sqlite event log connection poisoned");
1784        let up_to_sql = event_id_to_sqlite_i64(up_to)?;
1785        connection
1786            .execute(
1787                "INSERT INTO consumers(topic, consumer_id, cursor, updated_at_ms)
1788                 VALUES (?1, ?2, ?3, ?4)
1789                 ON CONFLICT(topic, consumer_id)
1790                 DO UPDATE SET cursor = excluded.cursor, updated_at_ms = excluded.updated_at_ms",
1791                params![topic.as_str(), consumer.as_str(), up_to_sql, now_ms()],
1792            )
1793            .map_err(|error| LogError::Sqlite(format!("event log ack error: {error}")))?;
1794        Ok(())
1795    }
1796
1797    async fn consumer_cursor(
1798        &self,
1799        topic: &Topic,
1800        consumer: &ConsumerId,
1801    ) -> Result<Option<EventId>, LogError> {
1802        let connection = self
1803            .connection
1804            .lock()
1805            .expect("sqlite event log connection poisoned");
1806        connection
1807            .query_row(
1808                "SELECT cursor FROM consumers WHERE topic = ?1 AND consumer_id = ?2",
1809                params![topic.as_str(), consumer.as_str()],
1810                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1811            )
1812            .optional()
1813            .map_err(|error| LogError::Sqlite(format!("event log consumer cursor error: {error}")))
1814    }
1815
1816    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
1817        let connection = self
1818            .connection
1819            .lock()
1820            .expect("sqlite event log connection poisoned");
1821        connection
1822            .query_row(
1823                "SELECT last_id FROM topic_heads WHERE topic = ?1",
1824                params![topic.as_str()],
1825                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1826            )
1827            .optional()
1828            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))
1829    }
1830
1831    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
1832        let connection = self
1833            .connection
1834            .lock()
1835            .expect("sqlite event log connection poisoned");
1836        let before_sql = event_id_to_sqlite_i64(before)?;
1837        connection
1838            .execute(
1839                "DELETE FROM event_idempotency_keys WHERE topic = ?1 AND event_id <= ?2",
1840                params![topic.as_str(), before_sql],
1841            )
1842            .map_err(|error| {
1843                LogError::Sqlite(format!("event log idempotency compact error: {error}"))
1844            })?;
1845        let removed = connection
1846            .execute(
1847                "DELETE FROM events WHERE topic = ?1 AND event_id <= ?2",
1848                params![topic.as_str(), before_sql],
1849            )
1850            .map_err(|error| {
1851                LogError::Sqlite(format!("event log compact delete error: {error}"))
1852            })?;
1853        let remaining = connection
1854            .query_row(
1855                "SELECT COUNT(*) FROM events WHERE topic = ?1",
1856                params![topic.as_str()],
1857                |row| row.get::<_, i64>(0),
1858            )
1859            .map_err(|error| LogError::Sqlite(format!("event log compact count error: {error}")))
1860            .and_then(sqlite_i64_to_usize)?;
1861        let latest = connection
1862            .query_row(
1863                "SELECT last_id FROM topic_heads WHERE topic = ?1",
1864                params![topic.as_str()],
1865                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1866            )
1867            .optional()
1868            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))?;
1869        connection
1870            .execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
1871            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
1872        Ok(CompactReport {
1873            removed,
1874            remaining,
1875            latest,
1876            checkpointed: true,
1877        })
1878    }
1879}
1880
/// Interprets `value` as a path: absolute paths are returned unchanged, anything else
/// is joined onto `base_dir`.
fn resolve_path(base_dir: &Path, value: &str) -> PathBuf {
    let requested = Path::new(value);
    if requested.is_absolute() {
        return requested.to_path_buf();
    }
    base_dir.join(requested)
}
1889
1890fn write_json_atomically(path: &Path, payload: &serde_json::Value) -> Result<(), LogError> {
1891    let encoded = serde_json::to_vec_pretty(payload)
1892        .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
1893    crate::atomic_io::atomic_write(path, &encoded)
1894        .map_err(|error| LogError::Io(format!("event log write error: {error}")))
1895}
1896
/// Thin alias for [`sanitize_topic_component`]: returns its result for `value` unchanged.
fn sanitize_filename(value: &str) -> String {
    sanitize_topic_component(value)
}
1900
/// Returns `value` with every character other than ASCII letters, ASCII digits,
/// `.`, `_`, and `-` replaced by `_`.
///
/// Replacement is per character, so a multi-byte character becomes a single `_`.
pub fn sanitize_topic_component(value: &str) -> String {
    let mut sanitized = String::with_capacity(value.len());
    for ch in value.chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '.' || ch == '_' || ch == '-';
        sanitized.push(if allowed { ch } else { '_' });
    }
    sanitized
}
1913
/// Recursively sums the byte lengths of the files under `path`.
///
/// Best effort: a missing path, an unreadable directory, an unreadable entry, or an
/// entry whose metadata cannot be read contributes `0` instead of failing.
fn dir_size_bytes(path: &Path) -> u64 {
    if !path.exists() {
        return 0;
    }
    let Ok(listing) = std::fs::read_dir(path) else {
        return 0;
    };
    listing
        .flatten()
        .map(|entry| {
            let child = entry.path();
            if child.is_dir() {
                dir_size_bytes(&child)
            } else {
                entry.metadata().map_or(0, |metadata| metadata.len())
            }
        })
        .sum()
}
1931
1932fn sqlite_size_bytes(path: &Path) -> u64 {
1933    let mut total = file_size(path);
1934    total += file_size(&PathBuf::from(format!("{}-wal", path.display())));
1935    total += file_size(&PathBuf::from(format!("{}-shm", path.display())));
1936    total
1937}
1938
/// Returns the length in bytes reported by the metadata of `path`, or `0` when the
/// metadata cannot be read.
fn file_size(path: &Path) -> u64 {
    match std::fs::metadata(path) {
        Ok(metadata) => metadata.len(),
        Err(_) => 0,
    }
}
1944
1945fn sync_tree(root: &Path) -> Result<(), LogError> {
1946    if !root.exists() {
1947        return Ok(());
1948    }
1949    for entry in std::fs::read_dir(root)
1950        .map_err(|error| LogError::Io(format!("event log read_dir error: {error}")))?
1951    {
1952        let entry = entry.map_err(|error| LogError::Io(format!("event log dir error: {error}")))?;
1953        let path = entry.path();
1954        if path.is_dir() {
1955            sync_tree(&path)?;
1956            continue;
1957        }
1958        std::fs::File::open(&path)
1959            .and_then(|file| file.sync_all())
1960            .map_err(|error| LogError::Io(format!("event log sync error: {error}")))?;
1961    }
1962    Ok(())
1963}
1964
/// Returns the current system time as milliseconds since the Unix epoch, or `0` when
/// the system clock reads earlier than the epoch.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
1971
1972fn event_id_to_sqlite_i64(event_id: EventId) -> Result<i64, LogError> {
1973    i64::try_from(event_id)
1974        .map_err(|_| LogError::Sqlite(format!("event id {event_id} exceeds sqlite INTEGER range")))
1975}
1976
1977fn sqlite_i64_to_event_id(value: i64) -> Result<EventId, LogError> {
1978    u64::try_from(value)
1979        .map_err(|_| LogError::Sqlite(format!("sqlite event id {value} is negative")))
1980}
1981
1982fn sqlite_i64_to_event_id_for_row(value: i64) -> rusqlite::Result<EventId> {
1983    u64::try_from(value).map_err(|_| {
1984        rusqlite::Error::FromSqlConversionFailure(
1985            std::mem::size_of::<i64>(),
1986            rusqlite::types::Type::Integer,
1987            "sqlite event id is negative".into(),
1988        )
1989    })
1990}
1991
1992fn sqlite_json_bytes_for_row(
1993    row: &rusqlite::Row<'_>,
1994    index: usize,
1995    name: &str,
1996) -> rusqlite::Result<Vec<u8>> {
1997    let value = row.get_ref(index)?;
1998    match value {
1999        rusqlite::types::ValueRef::Text(bytes) | rusqlite::types::ValueRef::Blob(bytes) => {
2000            Ok(bytes.to_vec())
2001        }
2002        other => Err(rusqlite::Error::InvalidColumnType(
2003            index,
2004            name.to_string(),
2005            other.data_type(),
2006        )),
2007    }
2008}
2009
2010fn sqlite_i64_to_usize(value: i64) -> Result<usize, LogError> {
2011    usize::try_from(value)
2012        .map_err(|_| LogError::Sqlite(format!("sqlite count {value} is negative")))
2013}
2014
// Unit tests for the memory, file, and SQLite event log backends and for the
// broadcast-to-stream forwarder.
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use rand::{rngs::StdRng, RngExt, SeedableRng};

    /// Installing the lazy default must leave the active slot empty and the pending
    /// slot filled; the first `active_event_log()` call must flip both.
    #[test]
    fn lazy_default_event_log_opens_on_first_access() {
        reset_active_event_log();
        let dir = tempfile::tempdir().unwrap();

        install_lazy_default_for_base_dir(dir.path()).unwrap();
        assert!(ACTIVE_EVENT_LOG.with(|slot| slot.borrow().is_none()));
        assert!(PENDING_DEFAULT_EVENT_LOG.with(|slot| slot.borrow().is_some()));

        let _log = active_event_log().expect("lazy event log should open on demand");
        assert!(ACTIVE_EVENT_LOG.with(|slot| slot.borrow().is_some()));
        assert!(PENDING_DEFAULT_EVENT_LOG.with(|slot| slot.borrow().is_none()));
        reset_active_event_log();
    }

    /// Shared scenario: append 10,000 events to one topic, read them all back, and
    /// check that the first and last ids are 1 and 10,000.
    async fn exercise_basic_backend(log: Arc<AnyEventLog>) {
        let topic = Topic::new("trigger.inbox").unwrap();
        for i in 0..10_000 {
            log.append(
                &topic,
                LogEvent::new("append", serde_json::json!({ "i": i })),
            )
            .await
            .unwrap();
        }
        let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 10_000);
        assert_eq!(events.first().unwrap().0, 1);
        assert_eq!(events.last().unwrap().0, 10_000);
    }

    /// Shared scenario: a second idempotent append with the same header value must not
    /// insert, must return the first event's id and payload, and must leave exactly one
    /// stored event.
    async fn exercise_idempotent_append(log: Arc<AnyEventLog>) {
        let topic = Topic::new("channel.tenant.default.pr").unwrap();
        let mut first_headers = BTreeMap::new();
        first_headers.insert("harn.channel.id".to_string(), "event-1".to_string());
        let first = log
            .append_idempotent_by_header(
                &topic,
                "harn.channel.id",
                "event-1",
                LogEvent::new("channel.emit", serde_json::json!({"n": 1}))
                    .with_headers(first_headers),
            )
            .await
            .unwrap();
        assert!(first.inserted);
        assert_eq!(first.event_id, 1);

        let mut duplicate_headers = BTreeMap::new();
        duplicate_headers.insert("harn.channel.id".to_string(), "event-1".to_string());
        let duplicate = log
            .append_idempotent_by_header(
                &topic,
                "harn.channel.id",
                "event-1",
                LogEvent::new("channel.emit", serde_json::json!({"n": 2}))
                    .with_headers(duplicate_headers),
            )
            .await
            .unwrap();
        assert!(!duplicate.inserted);
        assert_eq!(duplicate.event_id, first.event_id);
        assert_eq!(duplicate.event.payload, serde_json::json!({"n": 1}));

        let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].0, first.event_id);
    }

    /// Memory backend: runs the basic scenario, then checks that a subscriber sees two
    /// appended events in order and that compacting up to the first removes one and
    /// leaves one.
    #[tokio::test(flavor = "current_thread")]
    async fn memory_backend_supports_append_read_subscribe_and_compact() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
        exercise_basic_backend(log.clone()).await;

        let topic = Topic::new("agent.transcript.demo").unwrap();
        let mut stream = log.clone().subscribe(&topic, None).await.unwrap();
        let first = log
            .append(
                &topic,
                LogEvent::new("message", serde_json::json!({"text":"one"})),
            )
            .await
            .unwrap();
        let second = log
            .append(
                &topic,
                LogEvent::new("message", serde_json::json!({"text":"two"})),
            )
            .await
            .unwrap();
        let seen: Vec<_> = stream.by_ref().take(2).collect().await;
        assert_eq!(seen[0].as_ref().unwrap().0, first);
        assert_eq!(seen[1].as_ref().unwrap().0, second);

        log.ack(&topic, &ConsumerId::new("worker").unwrap(), second)
            .await
            .unwrap();
        let compact = log.compact(&topic, first).await.unwrap();
        assert_eq!(compact.removed, 1);
        assert_eq!(compact.remaining, 1);
    }

    /// Memory backend: runs the shared idempotent-append scenario.
    #[tokio::test(flavor = "current_thread")]
    async fn memory_backend_idempotent_append_returns_original_event() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
        exercise_idempotent_append(log).await;
    }

    /// File backend: two events written by one instance must be readable after
    /// reopening the same directory, and compacting up to id 1 must leave one event.
    #[tokio::test(flavor = "current_thread")]
    async fn file_backend_persists_across_reopen_and_compacts() {
        let dir = tempfile::tempdir().unwrap();
        let topic = Topic::new("trigger.outbox").unwrap();
        let first_log = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
        ));
        first_log
            .append(
                &topic,
                LogEvent::new("dispatch_pending", serde_json::json!({"n":1})),
            )
            .await
            .unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("dispatch_complete", serde_json::json!({"n":2})),
            )
            .await
            .unwrap();
        drop(first_log);

        let reopened = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
        ));
        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 2);
        let compact = reopened.compact(&topic, 1).await.unwrap();
        assert_eq!(compact.removed, 1);
        assert_eq!(
            reopened
                .read_range(&topic, None, usize::MAX)
                .await
                .unwrap()
                .len(),
            1
        );
    }

    /// File backend: an incomplete JSON record appended to the topic file must be
    /// ignored on reopen, leaving only event 1 and `latest` at 1.
    #[tokio::test(flavor = "current_thread")]
    async fn file_backend_skips_torn_tail_on_restart() {
        let dir = tempfile::tempdir().unwrap();
        let topic = Topic::new("trigger.inbox").unwrap();
        let first_log = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("accepted", serde_json::json!({"id": "ok"})),
            )
            .await
            .unwrap();
        drop(first_log);

        // Simulate a torn write by appending a truncated record with no closing braces.
        let topic_path = dir.path().join("topics").join("trigger.inbox.jsonl");
        use std::io::Write as _;
        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(&topic_path)
            .unwrap();
        write!(file, "{{\"id\":2,\"event\":{{\"kind\":\"partial\"").unwrap();
        drop(file);

        let reopened = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].0, 1);
        assert_eq!(reopened.latest(&topic).await.unwrap(), Some(1));
    }

    /// File backend: runs the shared idempotent-append scenario.
    #[tokio::test(flavor = "current_thread")]
    async fn file_backend_idempotent_append_returns_original_event() {
        let dir = tempfile::tempdir().unwrap();
        let log = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
        ));
        exercise_idempotent_append(log).await;
    }

    /// SQLite backend: two events must survive a reopen, and after `compact` the report
    /// must be marked checkpointed and the `-wal` file must be empty or absent.
    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_backend_persists_and_checkpoints_after_compact() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("daemon.demo.state").unwrap();
        let first_log = Arc::new(AnyEventLog::Sqlite(
            SqliteEventLog::open(path.clone(), 8).unwrap(),
        ));
        first_log
            .append(
                &topic,
                LogEvent::new("state", serde_json::json!({"state":"idle"})),
            )
            .await
            .unwrap();
        first_log
            .append(
                &topic,
                LogEvent::new("state", serde_json::json!({"state":"active"})),
            )
            .await
            .unwrap();
        drop(first_log);

        let reopened = Arc::new(AnyEventLog::Sqlite(
            SqliteEventLog::open(path.clone(), 8).unwrap(),
        ));
        assert_eq!(
            reopened
                .read_range(&topic, None, usize::MAX)
                .await
                .unwrap()
                .len(),
            2
        );
        let compact = reopened.compact(&topic, 1).await.unwrap();
        assert!(compact.checkpointed);
        let wal = PathBuf::from(format!("{}-wal", path.display()));
        assert!(file_size(&wal) == 0 || !wal.exists());
    }

    /// SQLite backend: runs the shared idempotent-append scenario.
    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_backend_idempotent_append_returns_original_event() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let log = Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(path, 8).unwrap()));
        exercise_idempotent_append(log).await;
    }

    /// SQLite backend: once an idempotently appended event has been compacted away,
    /// appending again with the same header value must insert a new event with a
    /// larger id and the new payload.
    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_backend_compacts_idempotency_keys_with_events() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let log = Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(path, 8).unwrap()));
        let topic = Topic::new("channel.tenant.default.compacted").unwrap();
        let mut headers = BTreeMap::new();
        headers.insert("harn.channel.id".to_string(), "event-1".to_string());
        let first = log
            .append_idempotent_by_header(
                &topic,
                "harn.channel.id",
                "event-1",
                LogEvent::new("channel.emit", serde_json::json!({"n": 1})).with_headers(headers),
            )
            .await
            .unwrap();
        log.compact(&topic, first.event_id).await.unwrap();

        let mut replacement_headers = BTreeMap::new();
        replacement_headers.insert("harn.channel.id".to_string(), "event-1".to_string());
        let replacement = log
            .append_idempotent_by_header(
                &topic,
                "harn.channel.id",
                "event-1",
                LogEvent::new("channel.emit", serde_json::json!({"n": 2}))
                    .with_headers(replacement_headers),
            )
            .await
            .unwrap();
        assert!(replacement.inserted);
        assert!(replacement.event_id > first.event_id);
        assert_eq!(replacement.event.payload, serde_json::json!({"n": 2}));
    }

    /// SQLite backend: `read_range_bytes` must return the appended event with a payload
    /// that parses back to the original JSON value.
    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_bytes_read_preserves_payload_without_value_materialization() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("observability.action_graph").unwrap();
        let log = SqliteEventLog::open(path, 8).unwrap();
        let event_id = log
            .append(
                &topic,
                LogEvent::new(
                    "snapshot",
                    serde_json::json!({"nodes":[{"id":"a"}],"edges":[]}),
                ),
            )
            .await
            .unwrap();

        let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].0, event_id);
        assert_eq!(
            events[0].1.payload_json().unwrap(),
            serde_json::json!({"nodes":[{"id":"a"}],"edges":[]})
        );
    }

    /// SQLite backend: a row whose payload was bound as a string (inserted directly
    /// through the connection) must be readable via both `read_range_bytes` and
    /// `read_range`.
    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_bytes_read_accepts_legacy_text_payload_rows() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.sqlite");
        let topic = Topic::new("agent.transcript.legacy").unwrap();
        let log = SqliteEventLog::open(path, 8).unwrap();
        {
            // Bypass `append` so the payload is bound as a Rust string instead of bytes.
            let connection = log.connection.lock().unwrap();
            connection
                .execute(
                    "INSERT INTO topic_heads(topic, last_id) VALUES (?1, 1)",
                    params![topic.as_str()],
                )
                .unwrap();
            connection
                .execute(
                    "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
                     VALUES (?1, 1, 'legacy', ?2, '{}', 1)",
                    params![topic.as_str(), "{\"text\":\"old\"}"],
                )
                .unwrap();
        }

        let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
        assert_eq!(
            events[0].1.payload_json().unwrap(),
            serde_json::json!({"text": "old"})
        );
        assert_eq!(
            log.read_range(&topic, None, 1).await.unwrap()[0].1.kind,
            "legacy"
        );
    }

    /// Sending 10 events into a capacity-2 broadcast channel before the stream is
    /// polled must make the first stream item `ConsumerLagged(0)`.
    #[tokio::test(flavor = "current_thread")]
    async fn broadcast_forwarder_reports_lag_when_receiver_overflows() {
        let (sender, rx) = broadcast::channel(2);
        for i in 0..10 {
            sender
                .send((i + 1, LogEvent::new("tick", serde_json::json!({"i": i}))))
                .unwrap();
        }
        let mut stream = stream_from_broadcast(Vec::new(), None, rx, 2);

        match stream.next().await {
            Some(Err(LogError::ConsumerLagged(last_seen))) => assert_eq!(last_seen, 0),
            other => panic!("subscriber should surface lag, got {other:?}"),
        }
    }

    /// Dropping the stream must bring the sender's receiver count to zero within
    /// 100 ms.
    #[tokio::test(flavor = "current_thread")]
    async fn broadcast_forwarder_stops_when_consumer_drops_stream() {
        let (sender, rx) = broadcast::channel(2);
        let stream = stream_from_broadcast(Vec::new(), None, rx, 2);
        assert_eq!(sender.receiver_count(), 1);
        drop(stream);

        tokio::time::timeout(std::time::Duration::from_millis(100), async {
            while sender.receiver_count() != 0 {
                tokio::task::yield_now().await;
            }
        })
        .await
        .expect("subscription receiver should close after consumer drop");
    }

    /// Three subscribers starting at different offsets each read up to 16 ids (or stop
    /// on lag) after 64 seeded-random appends; every observed id sequence must be
    /// strictly increasing.
    #[tokio::test(flavor = "current_thread")]
    async fn randomized_reader_sequences_stay_monotonic() {
        let log = Arc::new(MemoryEventLog::new(32));
        let topic = Topic::new("fuzz.demo").unwrap();
        let mut readers = vec![
            log.clone().subscribe(&topic, None).await.unwrap(),
            log.clone().subscribe(&topic, Some(5)).await.unwrap(),
            log.clone().subscribe(&topic, Some(10)).await.unwrap(),
        ];
        let mut rng = StdRng::seed_from_u64(7);
        for _ in 0..64 {
            let value = rng.random_range(0..1000);
            log.append(
                &topic,
                LogEvent::new("rand", serde_json::json!({"value": value})),
            )
            .await
            .unwrap();
        }

        let mut sequences = Vec::new();
        for reader in &mut readers {
            let mut ids = Vec::new();
            while let Some(item) = reader.next().await {
                match item {
                    Ok((event_id, _)) => {
                        ids.push(event_id);
                        if ids.len() >= 16 {
                            break;
                        }
                    }
                    Err(LogError::ConsumerLagged(_)) => break,
                    Err(error) => panic!("unexpected subscription error: {error}"),
                }
            }
            sequences.push(ids);
        }

        for ids in sequences {
            assert!(ids.windows(2).all(|pair| pair[0] < pair[1]));
        }
    }
}