Skip to main content

harn_vm/event_log/
mod.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, HashMap, VecDeque};
3use std::fmt;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::{Arc, Mutex};
7
8use bytes::Bytes;
9use futures::stream::BoxStream;
10use rusqlite::{params, Connection, OptionalExtension};
11use serde::{Deserialize, Serialize};
12use tokio::sync::{broadcast, mpsc};
13use tokio_stream::wrappers::ReceiverStream;
14
/// Numeric identifier of an event inside a topic.
pub type EventId = u64;

/// Environment variable naming the backend to use (parsed as an
/// `EventLogBackendKind`).
pub const HARN_EVENT_LOG_BACKEND_ENV: &str = "HARN_EVENT_LOG_BACKEND";
/// Environment variable overriding the directory used by the file backend.
pub const HARN_EVENT_LOG_DIR_ENV: &str = "HARN_EVENT_LOG_DIR";
/// Environment variable overriding the database path used by the SQLite backend.
pub const HARN_EVENT_LOG_SQLITE_PATH_ENV: &str = "HARN_EVENT_LOG_SQLITE_PATH";
/// Environment variable overriding the subscriber queue depth.
pub const HARN_EVENT_LOG_QUEUE_DEPTH_ENV: &str = "HARN_EVENT_LOG_QUEUE_DEPTH";
21
22#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
23pub struct Topic(String);
24
25impl Topic {
26    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
27        let value = value.into();
28        if value.is_empty() {
29            return Err(LogError::InvalidTopic("topic cannot be empty".to_string()));
30        }
31        if !value
32            .chars()
33            .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-'))
34        {
35            return Err(LogError::InvalidTopic(format!(
36                "topic '{value}' contains unsupported characters"
37            )));
38        }
39        Ok(Self(value))
40    }
41
42    pub fn as_str(&self) -> &str {
43        &self.0
44    }
45}
46
47impl fmt::Display for Topic {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        self.0.fmt(f)
50    }
51}
52
53impl FromStr for Topic {
54    type Err = LogError;
55
56    fn from_str(s: &str) -> Result<Self, Self::Err> {
57        Self::new(s)
58    }
59}
60
61#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
62pub struct ConsumerId(String);
63
64impl ConsumerId {
65    pub fn new(value: impl Into<String>) -> Result<Self, LogError> {
66        let value = value.into();
67        if value.trim().is_empty() {
68            return Err(LogError::InvalidConsumer(
69                "consumer id cannot be empty".to_string(),
70            ));
71        }
72        Ok(Self(value))
73    }
74
75    pub fn as_str(&self) -> &str {
76        &self.0
77    }
78}
79
80impl fmt::Display for ConsumerId {
81    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82        self.0.fmt(f)
83    }
84}
85
86#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
87#[serde(rename_all = "snake_case")]
88pub enum EventLogBackendKind {
89    Memory,
90    File,
91    Sqlite,
92}
93
94impl fmt::Display for EventLogBackendKind {
95    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96        match self {
97            Self::Memory => write!(f, "memory"),
98            Self::File => write!(f, "file"),
99            Self::Sqlite => write!(f, "sqlite"),
100        }
101    }
102}
103
104impl FromStr for EventLogBackendKind {
105    type Err = LogError;
106
107    fn from_str(value: &str) -> Result<Self, Self::Err> {
108        match value.trim().to_ascii_lowercase().as_str() {
109            "memory" => Ok(Self::Memory),
110            "file" => Ok(Self::File),
111            "sqlite" => Ok(Self::Sqlite),
112            other => Err(LogError::Config(format!(
113                "unsupported event log backend '{other}'"
114            ))),
115        }
116    }
117}
118
119#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
120pub struct LogEvent {
121    pub kind: String,
122    pub payload: serde_json::Value,
123    #[serde(default)]
124    pub headers: BTreeMap<String, String>,
125    pub occurred_at_ms: i64,
126}
127
128impl LogEvent {
129    pub fn new(kind: impl Into<String>, payload: serde_json::Value) -> Self {
130        Self {
131            kind: kind.into(),
132            payload,
133            headers: BTreeMap::new(),
134            occurred_at_ms: now_ms(),
135        }
136    }
137
138    pub fn with_headers(mut self, headers: BTreeMap<String, String>) -> Self {
139        self.headers = headers;
140        self
141    }
142
143    /// Apply the unified redaction policy to this event's headers and
144    /// payload. Backends are intentionally left unaware of redaction —
145    /// emitters that need scrubbed events call this before append, and
146    /// readers that materialize events for display can apply the policy
147    /// again as defense in depth.
148    pub fn redact_in_place(&mut self, policy: &crate::redact::RedactionPolicy) {
149        self.headers = policy.redact_headers(&self.headers);
150        policy.redact_json_in_place(&mut self.payload);
151    }
152}
153
154/// Serialized event payload form for large read paths.
155///
156/// `payload` contains the original JSON bytes for backends that can expose
157/// them directly. Callers that only need to forward or hash the payload can
158/// avoid materializing a `serde_json::Value`; callers that need structured
159/// access can opt in with `payload_json`.
160#[derive(Clone, Debug, PartialEq, Eq)]
161pub struct LogEventBytes {
162    pub kind: String,
163    pub payload: Bytes,
164    pub headers: BTreeMap<String, String>,
165    pub occurred_at_ms: i64,
166}
167
168impl LogEventBytes {
169    pub fn payload_json(&self) -> Result<serde_json::Value, LogError> {
170        serde_json::from_slice(&self.payload)
171            .map_err(|error| LogError::Serde(format!("event log payload parse error: {error}")))
172    }
173
174    pub fn into_log_event(self) -> Result<LogEvent, LogError> {
175        Ok(LogEvent {
176            kind: self.kind,
177            payload: serde_json::from_slice(&self.payload).map_err(|error| {
178                LogError::Serde(format!("event log payload parse error: {error}"))
179            })?,
180            headers: self.headers,
181            occurred_at_ms: self.occurred_at_ms,
182        })
183    }
184}
185
186impl TryFrom<LogEvent> for LogEventBytes {
187    type Error = LogError;
188
189    fn try_from(event: LogEvent) -> Result<Self, Self::Error> {
190        let payload = serde_json::to_vec(&event.payload)
191            .map_err(|error| LogError::Serde(format!("event log payload encode error: {error}")))?;
192        Ok(Self {
193            kind: event.kind,
194            payload: Bytes::from(payload),
195            headers: event.headers,
196            occurred_at_ms: event.occurred_at_ms,
197        })
198    }
199}
200
201#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
202pub struct CompactReport {
203    pub removed: usize,
204    pub remaining: usize,
205    pub latest: Option<EventId>,
206    pub checkpointed: bool,
207}
208
209#[derive(Clone, Debug, PartialEq)]
210pub struct AppendOutcome {
211    pub event_id: EventId,
212    pub event: LogEvent,
213    pub inserted: bool,
214}
215
216#[derive(Clone, Debug, PartialEq, Eq)]
217pub struct EventLogDescription {
218    pub backend: EventLogBackendKind,
219    pub location: Option<PathBuf>,
220    pub size_bytes: Option<u64>,
221    pub queue_depth: usize,
222}
223
224#[derive(Debug)]
225pub enum LogError {
226    Config(String),
227    InvalidTopic(String),
228    InvalidConsumer(String),
229    Io(String),
230    Serde(String),
231    Sqlite(String),
232    ConsumerLagged(EventId),
233}
234
235impl fmt::Display for LogError {
236    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
237        match self {
238            Self::Config(message)
239            | Self::InvalidTopic(message)
240            | Self::InvalidConsumer(message)
241            | Self::Io(message)
242            | Self::Serde(message)
243            | Self::Sqlite(message) => message.fmt(f),
244            Self::ConsumerLagged(last_id) => {
245                write!(f, "subscriber lagged behind after event {last_id}")
246            }
247        }
248    }
249}
250
251impl std::error::Error for LogError {}
252
253#[allow(async_fn_in_trait)]
254pub trait EventLog: Send + Sync {
255    fn describe(&self) -> EventLogDescription;
256
257    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError>;
258
259    async fn flush(&self) -> Result<(), LogError>;
260
261    /// Read events strictly after `from`. `None` starts from the
262    /// beginning of the topic.
263    async fn read_range(
264        &self,
265        topic: &Topic,
266        from: Option<EventId>,
267        limit: usize,
268    ) -> Result<Vec<(EventId, LogEvent)>, LogError>;
269
270    async fn read_range_bytes(
271        &self,
272        topic: &Topic,
273        from: Option<EventId>,
274        limit: usize,
275    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
276        let events = self.read_range(topic, from, limit).await?;
277        events
278            .into_iter()
279            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
280            .collect()
281    }
282
283    /// `async fn` keeps the ergonomic generic surface; the boxed stream
284    /// preserves dyn-dispatch for callers that store `Arc<dyn EventLog>`.
285    async fn subscribe(
286        self: Arc<Self>,
287        topic: &Topic,
288        from: Option<EventId>,
289    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError>;
290
291    async fn ack(
292        &self,
293        topic: &Topic,
294        consumer: &ConsumerId,
295        up_to: EventId,
296    ) -> Result<(), LogError>;
297
298    async fn consumer_cursor(
299        &self,
300        topic: &Topic,
301        consumer: &ConsumerId,
302    ) -> Result<Option<EventId>, LogError>;
303
304    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError>;
305
306    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError>;
307}
308
309#[derive(Clone, Debug)]
310pub struct EventLogConfig {
311    pub backend: EventLogBackendKind,
312    pub file_dir: PathBuf,
313    pub sqlite_path: PathBuf,
314    pub queue_depth: usize,
315}
316
317impl EventLogConfig {
318    pub fn for_base_dir(base_dir: &Path) -> Result<Self, LogError> {
319        let backend = std::env::var(HARN_EVENT_LOG_BACKEND_ENV)
320            .ok()
321            .map(|value| value.parse())
322            .transpose()?
323            .unwrap_or(EventLogBackendKind::Sqlite);
324        let queue_depth = std::env::var(HARN_EVENT_LOG_QUEUE_DEPTH_ENV)
325            .ok()
326            .and_then(|value| value.parse::<usize>().ok())
327            .unwrap_or(128)
328            .max(1);
329
330        let file_dir = match std::env::var(HARN_EVENT_LOG_DIR_ENV) {
331            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
332            _ => crate::runtime_paths::event_log_dir(base_dir),
333        };
334        let sqlite_path = match std::env::var(HARN_EVENT_LOG_SQLITE_PATH_ENV) {
335            Ok(value) if !value.trim().is_empty() => resolve_path(base_dir, &value),
336            _ => crate::runtime_paths::event_log_sqlite_path(base_dir),
337        };
338
339        Ok(Self {
340            backend,
341            file_dir,
342            sqlite_path,
343            queue_depth,
344        })
345    }
346
347    pub fn location(&self) -> Option<PathBuf> {
348        match self.backend {
349            EventLogBackendKind::Memory => None,
350            EventLogBackendKind::File => Some(self.file_dir.clone()),
351            EventLogBackendKind::Sqlite => Some(self.sqlite_path.clone()),
352        }
353    }
354}
355
356thread_local! {
357    static ACTIVE_EVENT_LOG: RefCell<Option<Arc<AnyEventLog>>> = const { RefCell::new(None) };
358    static PENDING_DEFAULT_EVENT_LOG: RefCell<Option<EventLogConfig>> = const { RefCell::new(None) };
359}
360
361pub fn install_default_for_base_dir(base_dir: &Path) -> Result<Arc<AnyEventLog>, LogError> {
362    let config = EventLogConfig::for_base_dir(base_dir)?;
363    let log = open_event_log(&config)?;
364    ACTIVE_EVENT_LOG.with(|slot| {
365        *slot.borrow_mut() = Some(log.clone());
366    });
367    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
368        *slot.borrow_mut() = None;
369    });
370    Ok(log)
371}
372
373pub fn install_lazy_default_for_base_dir(base_dir: &Path) -> Result<(), LogError> {
374    let config = EventLogConfig::for_base_dir(base_dir)?;
375    let has_active = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().is_some());
376    if !has_active {
377        PENDING_DEFAULT_EVENT_LOG.with(|slot| {
378            *slot.borrow_mut() = Some(config);
379        });
380    }
381    Ok(())
382}
383
384pub fn install_memory_for_current_thread(queue_depth: usize) -> Arc<AnyEventLog> {
385    let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(queue_depth.max(1))));
386    ACTIVE_EVENT_LOG.with(|slot| {
387        *slot.borrow_mut() = Some(log.clone());
388    });
389    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
390        *slot.borrow_mut() = None;
391    });
392    log
393}
394
395pub fn install_active_event_log(log: Arc<AnyEventLog>) -> Arc<AnyEventLog> {
396    ACTIVE_EVENT_LOG.with(|slot| {
397        *slot.borrow_mut() = Some(log.clone());
398    });
399    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
400        *slot.borrow_mut() = None;
401    });
402    log
403}
404
405pub fn active_event_log() -> Option<Arc<AnyEventLog>> {
406    if let Some(log) = ACTIVE_EVENT_LOG.with(|slot| slot.borrow().clone()) {
407        return Some(log);
408    }
409
410    let config = PENDING_DEFAULT_EVENT_LOG.with(|slot| slot.borrow_mut().take())?;
411    match open_event_log(&config) {
412        Ok(log) => Some(install_active_event_log(log)),
413        Err(error) => {
414            crate::events::log_warn("event_log.init", &error.to_string());
415            None
416        }
417    }
418}
419
420pub fn reset_active_event_log() {
421    ACTIVE_EVENT_LOG.with(|slot| {
422        *slot.borrow_mut() = None;
423    });
424    PENDING_DEFAULT_EVENT_LOG.with(|slot| {
425        *slot.borrow_mut() = None;
426    });
427}
428
429pub fn describe_for_base_dir(base_dir: &Path) -> Result<EventLogDescription, LogError> {
430    let config = EventLogConfig::for_base_dir(base_dir)?;
431    let description = match config.backend {
432        EventLogBackendKind::Memory => EventLogDescription {
433            backend: EventLogBackendKind::Memory,
434            location: None,
435            size_bytes: None,
436            queue_depth: config.queue_depth,
437        },
438        EventLogBackendKind::File => EventLogDescription {
439            backend: EventLogBackendKind::File,
440            size_bytes: Some(dir_size_bytes(&config.file_dir)),
441            location: Some(config.file_dir),
442            queue_depth: config.queue_depth,
443        },
444        EventLogBackendKind::Sqlite => EventLogDescription {
445            backend: EventLogBackendKind::Sqlite,
446            size_bytes: Some(sqlite_size_bytes(&config.sqlite_path)),
447            location: Some(config.sqlite_path),
448            queue_depth: config.queue_depth,
449        },
450    };
451    Ok(description)
452}
453
454pub fn open_event_log(config: &EventLogConfig) -> Result<Arc<AnyEventLog>, LogError> {
455    match config.backend {
456        EventLogBackendKind::Memory => Ok(Arc::new(AnyEventLog::Memory(MemoryEventLog::new(
457            config.queue_depth,
458        )))),
459        EventLogBackendKind::File => Ok(Arc::new(AnyEventLog::File(FileEventLog::open(
460            config.file_dir.clone(),
461            config.queue_depth,
462        )?))),
463        EventLogBackendKind::Sqlite => Ok(Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(
464            config.sqlite_path.clone(),
465            config.queue_depth,
466        )?))),
467    }
468}
469
470pub enum AnyEventLog {
471    Memory(MemoryEventLog),
472    File(FileEventLog),
473    Sqlite(SqliteEventLog),
474}
475
476impl AnyEventLog {
477    pub async fn topics(&self) -> Result<Vec<Topic>, LogError> {
478        match self {
479            Self::Memory(log) => log.topics().await,
480            Self::File(log) => log.topics(),
481            Self::Sqlite(log) => log.topics(),
482        }
483    }
484
485    pub async fn append_idempotent_by_header(
486        &self,
487        topic: &Topic,
488        header: &str,
489        value: &str,
490        event: LogEvent,
491    ) -> Result<AppendOutcome, LogError> {
492        if header.trim().is_empty() {
493            return Err(LogError::Config(
494                "idempotent append header cannot be empty".to_string(),
495            ));
496        }
497        match self {
498            Self::Memory(log) => {
499                log.append_idempotent_by_header(topic, header, value, event)
500                    .await
501            }
502            Self::File(log) => log.append_idempotent_by_header(topic, header, value, event),
503            Self::Sqlite(log) => log.append_idempotent_by_header(topic, header, value, event),
504        }
505    }
506}
507
508impl EventLog for AnyEventLog {
509    fn describe(&self) -> EventLogDescription {
510        match self {
511            Self::Memory(log) => log.describe(),
512            Self::File(log) => log.describe(),
513            Self::Sqlite(log) => log.describe(),
514        }
515    }
516
517    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
518        match self {
519            Self::Memory(log) => log.append(topic, event).await,
520            Self::File(log) => log.append(topic, event).await,
521            Self::Sqlite(log) => log.append(topic, event).await,
522        }
523    }
524
525    async fn flush(&self) -> Result<(), LogError> {
526        match self {
527            Self::Memory(log) => log.flush().await,
528            Self::File(log) => log.flush().await,
529            Self::Sqlite(log) => log.flush().await,
530        }
531    }
532
533    async fn read_range(
534        &self,
535        topic: &Topic,
536        from: Option<EventId>,
537        limit: usize,
538    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
539        match self {
540            Self::Memory(log) => log.read_range(topic, from, limit).await,
541            Self::File(log) => log.read_range(topic, from, limit).await,
542            Self::Sqlite(log) => log.read_range(topic, from, limit).await,
543        }
544    }
545
546    async fn read_range_bytes(
547        &self,
548        topic: &Topic,
549        from: Option<EventId>,
550        limit: usize,
551    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
552        match self {
553            Self::Memory(log) => log.read_range_bytes(topic, from, limit).await,
554            Self::File(log) => log.read_range_bytes(topic, from, limit).await,
555            Self::Sqlite(log) => log.read_range_bytes(topic, from, limit).await,
556        }
557    }
558
559    async fn subscribe(
560        self: Arc<Self>,
561        topic: &Topic,
562        from: Option<EventId>,
563    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
564        let (rx, queue_depth) = match self.as_ref() {
565            Self::Memory(log) => (
566                log.broadcasts.subscribe(topic, log.queue_depth),
567                log.queue_depth,
568            ),
569            Self::File(log) => (
570                log.broadcasts.subscribe(topic, log.queue_depth),
571                log.queue_depth,
572            ),
573            Self::Sqlite(log) => (
574                log.broadcasts.subscribe(topic, log.queue_depth),
575                log.queue_depth,
576            ),
577        };
578        let history = self.read_range(topic, from, usize::MAX).await?;
579        Ok(stream_from_broadcast(history, from, rx, queue_depth))
580    }
581
582    async fn ack(
583        &self,
584        topic: &Topic,
585        consumer: &ConsumerId,
586        up_to: EventId,
587    ) -> Result<(), LogError> {
588        match self {
589            Self::Memory(log) => log.ack(topic, consumer, up_to).await,
590            Self::File(log) => log.ack(topic, consumer, up_to).await,
591            Self::Sqlite(log) => log.ack(topic, consumer, up_to).await,
592        }
593    }
594
595    async fn consumer_cursor(
596        &self,
597        topic: &Topic,
598        consumer: &ConsumerId,
599    ) -> Result<Option<EventId>, LogError> {
600        match self {
601            Self::Memory(log) => log.consumer_cursor(topic, consumer).await,
602            Self::File(log) => log.consumer_cursor(topic, consumer).await,
603            Self::Sqlite(log) => log.consumer_cursor(topic, consumer).await,
604        }
605    }
606
607    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
608        match self {
609            Self::Memory(log) => log.latest(topic).await,
610            Self::File(log) => log.latest(topic).await,
611            Self::Sqlite(log) => log.latest(topic).await,
612        }
613    }
614
615    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
616        match self {
617            Self::Memory(log) => log.compact(topic, before).await,
618            Self::File(log) => log.compact(topic, before).await,
619            Self::Sqlite(log) => log.compact(topic, before).await,
620        }
621    }
622}
623
624#[derive(Default)]
625struct BroadcastMap(Mutex<HashMap<String, broadcast::Sender<(EventId, LogEvent)>>>);
626
627impl BroadcastMap {
628    fn subscribe(
629        &self,
630        topic: &Topic,
631        capacity: usize,
632    ) -> broadcast::Receiver<(EventId, LogEvent)> {
633        self.sender(topic, capacity).subscribe()
634    }
635
636    fn publish(&self, topic: &Topic, capacity: usize, record: (EventId, LogEvent)) {
637        let _ = self.sender(topic, capacity).send(record);
638    }
639
640    fn sender(&self, topic: &Topic, capacity: usize) -> broadcast::Sender<(EventId, LogEvent)> {
641        let mut map = self.0.lock().expect("event log broadcast map poisoned");
642        map.entry(topic.as_str().to_string())
643            .or_insert_with(|| broadcast::channel(capacity.max(1)).0)
644            .clone()
645    }
646}
647
648fn stream_from_broadcast(
649    history: Vec<(EventId, LogEvent)>,
650    from: Option<EventId>,
651    mut live_rx: broadcast::Receiver<(EventId, LogEvent)>,
652    queue_depth: usize,
653) -> BoxStream<'static, Result<(EventId, LogEvent), LogError>> {
654    let (tx, rx) = mpsc::channel(queue_depth.max(1));
655    // Run the subscription forwarder as a tokio task rather than a detached
656    // OS thread. A dedicated thread running under `futures::executor::block_on`
657    // is invisible to the tokio runtime, so tests that use `start_paused = true`
658    // race against auto-advanced timers while the thread catches up in real
659    // time. Spawning on tokio makes the forwarder participate in runtime
660    // scheduling (including paused-time quiescence) and ties its lifetime to
661    // the runtime's shutdown.
662    tokio::spawn(async move {
663        let mut last_seen = from.unwrap_or(0);
664        for (event_id, event) in history {
665            last_seen = event_id;
666            if tx.send(Ok((event_id, event))).await.is_err() {
667                return;
668            }
669        }
670
671        loop {
672            tokio::select! {
673                _ = tx.closed() => return,
674                received = live_rx.recv() => {
675                    match received {
676                        Ok((event_id, event)) if event_id > last_seen => {
677                            last_seen = event_id;
678                            if tx.send(Ok((event_id, event))).await.is_err() {
679                                return;
680                            }
681                        }
682                        Ok(_) => {}
683                        Err(broadcast::error::RecvError::Closed) => return,
684                        Err(broadcast::error::RecvError::Lagged(_)) => {
685                            let _ = tx.try_send(Err(LogError::ConsumerLagged(last_seen)));
686                            return;
687                        }
688                    }
689                }
690            }
691        }
692    });
693    Box::pin(ReceiverStream::new(rx))
694}
695
696fn prepare_event_after(
697    topic: &Topic,
698    event_id: EventId,
699    previous: Option<(EventId, &LogEvent)>,
700    event: LogEvent,
701) -> Result<LogEvent, LogError> {
702    let previous_hash = previous
703        .map(|(previous_id, previous_event)| {
704            crate::provenance::event_record_hash_from_headers(
705                topic.as_str(),
706                previous_id,
707                previous_event,
708            )
709        })
710        .transpose()?;
711    crate::provenance::prepare_event_for_append(topic.as_str(), event_id, previous_hash, event)
712}
713
714#[derive(Default)]
715struct MemoryState {
716    topics: HashMap<String, VecDeque<(EventId, LogEvent)>>,
717    latest: HashMap<String, EventId>,
718    consumers: HashMap<(String, String), EventId>,
719}
720
721pub struct MemoryEventLog {
722    state: Mutex<MemoryState>,
723    broadcasts: BroadcastMap,
724    queue_depth: usize,
725}
726
727impl MemoryEventLog {
728    pub fn new(queue_depth: usize) -> Self {
729        Self {
730            state: Mutex::new(MemoryState::default()),
731            broadcasts: BroadcastMap::default(),
732            queue_depth: queue_depth.max(1),
733        }
734    }
735
736    fn state(&self) -> Result<std::sync::MutexGuard<'_, MemoryState>, LogError> {
737        self.state
738            .lock()
739            .map_err(|_| LogError::Io("memory event log state poisoned".to_string()))
740    }
741
742    async fn topics(&self) -> Result<Vec<Topic>, LogError> {
743        let state = self.state()?;
744        let mut topics = state
745            .topics
746            .keys()
747            .map(|topic| Topic::new(topic.clone()))
748            .collect::<Result<Vec<_>, _>>()?;
749        topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
750        Ok(topics)
751    }
752
753    async fn append_idempotent_by_header(
754        &self,
755        topic: &Topic,
756        header: &str,
757        value: &str,
758        event: LogEvent,
759    ) -> Result<AppendOutcome, LogError> {
760        let mut state = self.state()?;
761        if let Some((event_id, existing)) = state
762            .topics
763            .get(topic.as_str())
764            .into_iter()
765            .flat_map(|events| events.iter())
766            .find(|(_, event)| {
767                event
768                    .headers
769                    .get(header)
770                    .is_some_and(|found| found == value)
771            })
772        {
773            return Ok(AppendOutcome {
774                event_id: *event_id,
775                event: existing.clone(),
776                inserted: false,
777            });
778        }
779
780        let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
781        let previous = state
782            .topics
783            .get(topic.as_str())
784            .and_then(|events| events.back())
785            .map(|(previous_id, previous_event)| (*previous_id, previous_event));
786        let event = prepare_event_after(topic, event_id, previous, event)?;
787        state.latest.insert(topic.as_str().to_string(), event_id);
788        state
789            .topics
790            .entry(topic.as_str().to_string())
791            .or_default()
792            .push_back((event_id, event.clone()));
793        drop(state);
794        self.broadcasts
795            .publish(topic, self.queue_depth, (event_id, event.clone()));
796        Ok(AppendOutcome {
797            event_id,
798            event,
799            inserted: true,
800        })
801    }
802}
803
804impl EventLog for MemoryEventLog {
805    fn describe(&self) -> EventLogDescription {
806        EventLogDescription {
807            backend: EventLogBackendKind::Memory,
808            location: None,
809            size_bytes: None,
810            queue_depth: self.queue_depth,
811        }
812    }
813
814    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
815        let mut state = self.state()?;
816        let event_id = state.latest.get(topic.as_str()).copied().unwrap_or(0) + 1;
817        let previous = state
818            .topics
819            .get(topic.as_str())
820            .and_then(|events| events.back())
821            .map(|(previous_id, previous_event)| (*previous_id, previous_event));
822        let event = prepare_event_after(topic, event_id, previous, event)?;
823        state.latest.insert(topic.as_str().to_string(), event_id);
824        state
825            .topics
826            .entry(topic.as_str().to_string())
827            .or_default()
828            .push_back((event_id, event.clone()));
829        drop(state);
830        self.broadcasts
831            .publish(topic, self.queue_depth, (event_id, event));
832        Ok(event_id)
833    }
834
835    async fn flush(&self) -> Result<(), LogError> {
836        Ok(())
837    }
838
839    async fn read_range(
840        &self,
841        topic: &Topic,
842        from: Option<EventId>,
843        limit: usize,
844    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
845        let from = from.unwrap_or(0);
846        let state = self.state()?;
847        let events = state
848            .topics
849            .get(topic.as_str())
850            .into_iter()
851            .flat_map(|events| events.iter())
852            .filter(|(event_id, _)| *event_id > from)
853            .take(limit)
854            .map(|(event_id, event)| (*event_id, event.clone()))
855            .collect();
856        Ok(events)
857    }
858
859    async fn subscribe(
860        self: Arc<Self>,
861        topic: &Topic,
862        from: Option<EventId>,
863    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
864        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
865        let history = self.read_range(topic, from, usize::MAX).await?;
866        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
867    }
868
869    async fn ack(
870        &self,
871        topic: &Topic,
872        consumer: &ConsumerId,
873        up_to: EventId,
874    ) -> Result<(), LogError> {
875        let mut state = self.state()?;
876        state.consumers.insert(
877            (topic.as_str().to_string(), consumer.as_str().to_string()),
878            up_to,
879        );
880        Ok(())
881    }
882
883    async fn consumer_cursor(
884        &self,
885        topic: &Topic,
886        consumer: &ConsumerId,
887    ) -> Result<Option<EventId>, LogError> {
888        let state = self.state()?;
889        Ok(state
890            .consumers
891            .get(&(topic.as_str().to_string(), consumer.as_str().to_string()))
892            .copied())
893    }
894
895    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
896        let state = self.state()?;
897        Ok(state.latest.get(topic.as_str()).copied())
898    }
899
900    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
901        let mut state = self.state()?;
902        let Some(events) = state.topics.get_mut(topic.as_str()) else {
903            return Ok(CompactReport::default());
904        };
905        let removed = events
906            .iter()
907            .take_while(|(event_id, _)| *event_id <= before)
908            .count();
909        for _ in 0..removed {
910            events.pop_front();
911        }
912        Ok(CompactReport {
913            removed,
914            remaining: events.len(),
915            latest: state.latest.get(topic.as_str()).copied(),
916            checkpointed: false,
917        })
918    }
919}
920
921#[derive(Serialize, Deserialize)]
922struct FileRecord {
923    id: EventId,
924    event: LogEvent,
925}
926
927pub struct FileEventLog {
928    root: PathBuf,
929    latest_ids: Mutex<HashMap<String, EventId>>,
930    write_lock: Mutex<()>,
931    broadcasts: BroadcastMap,
932    queue_depth: usize,
933}
934
935impl FileEventLog {
936    pub fn open(root: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
937        std::fs::create_dir_all(root.join("topics"))
938            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
939        std::fs::create_dir_all(root.join("consumers"))
940            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
941        Ok(Self {
942            root,
943            latest_ids: Mutex::new(HashMap::new()),
944            write_lock: Mutex::new(()),
945            broadcasts: BroadcastMap::default(),
946            queue_depth: queue_depth.max(1),
947        })
948    }
949
950    fn topic_path(&self, topic: &Topic) -> PathBuf {
951        self.root
952            .join("topics")
953            .join(format!("{}.jsonl", topic.as_str()))
954    }
955
956    fn consumer_path(&self, topic: &Topic, consumer: &ConsumerId) -> PathBuf {
957        self.root.join("consumers").join(format!(
958            "{}__{}.json",
959            topic.as_str(),
960            sanitize_filename(consumer.as_str())
961        ))
962    }
963
964    fn latest_id_for_topic(&self, topic: &Topic) -> Result<EventId, LogError> {
965        if let Some(event_id) = self
966            .latest_ids
967            .lock()
968            .expect("file event log latest ids poisoned")
969            .get(topic.as_str())
970            .copied()
971        {
972            return Ok(event_id);
973        }
974
975        let mut latest = 0;
976        let path = self.topic_path(topic);
977        if path.is_file() {
978            for record in read_file_records(&path)? {
979                latest = record.id;
980            }
981        }
982        self.latest_ids
983            .lock()
984            .expect("file event log latest ids poisoned")
985            .insert(topic.as_str().to_string(), latest);
986        Ok(latest)
987    }
988
989    fn read_range_sync(
990        &self,
991        topic: &Topic,
992        from: Option<EventId>,
993        limit: usize,
994    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
995        let path = self.topic_path(topic);
996        if !path.is_file() {
997            return Ok(Vec::new());
998        }
999        let from = from.unwrap_or(0);
1000        let mut events = Vec::new();
1001        for record in read_file_records(&path)? {
1002            if record.id > from {
1003                events.push((record.id, record.event));
1004            }
1005            if events.len() >= limit {
1006                break;
1007            }
1008        }
1009        Ok(events)
1010    }
1011
1012    fn topics(&self) -> Result<Vec<Topic>, LogError> {
1013        let topics_dir = self.root.join("topics");
1014        if !topics_dir.is_dir() {
1015            return Ok(Vec::new());
1016        }
1017        let mut topics = Vec::new();
1018        for entry in std::fs::read_dir(&topics_dir)
1019            .map_err(|error| LogError::Io(format!("event log topics read error: {error}")))?
1020        {
1021            let entry = entry
1022                .map_err(|error| LogError::Io(format!("event log topic entry error: {error}")))?;
1023            let path = entry.path();
1024            if path.extension().and_then(|ext| ext.to_str()) != Some("jsonl") {
1025                continue;
1026            }
1027            let Some(stem) = path.file_stem().and_then(|stem| stem.to_str()) else {
1028                continue;
1029            };
1030            topics.push(Topic::new(stem.to_string())?);
1031        }
1032        topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
1033        Ok(topics)
1034    }
1035
1036    fn append_idempotent_by_header(
1037        &self,
1038        topic: &Topic,
1039        header: &str,
1040        value: &str,
1041        event: LogEvent,
1042    ) -> Result<AppendOutcome, LogError> {
1043        let _guard = self
1044            .write_lock
1045            .lock()
1046            .expect("file event log write lock poisoned");
1047        let existing_events = self.read_range_sync(topic, None, usize::MAX)?;
1048        if let Some((event_id, existing)) = existing_events.iter().find(|(_, event)| {
1049            event
1050                .headers
1051                .get(header)
1052                .is_some_and(|found| found == value)
1053        }) {
1054            return Ok(AppendOutcome {
1055                event_id: *event_id,
1056                event: existing.clone(),
1057                inserted: false,
1058            });
1059        }
1060
1061        let next_id = self.latest_id_for_topic(topic)? + 1;
1062        let previous = existing_events
1063            .last()
1064            .map(|(previous_id, previous_event)| (*previous_id, previous_event));
1065        let event = prepare_event_after(topic, next_id, previous, event)?;
1066        self.append_record_locked(topic, next_id, event)
1067    }
1068
1069    fn append_record_locked(
1070        &self,
1071        topic: &Topic,
1072        event_id: EventId,
1073        event: LogEvent,
1074    ) -> Result<AppendOutcome, LogError> {
1075        let record = FileRecord {
1076            id: event_id,
1077            event: event.clone(),
1078        };
1079        let path = self.topic_path(topic);
1080        if let Some(parent) = path.parent() {
1081            std::fs::create_dir_all(parent)
1082                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
1083        }
1084        let line = serde_json::to_string(&record)
1085            .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
1086        use std::io::Write as _;
1087        let mut file = std::fs::OpenOptions::new()
1088            .create(true)
1089            .append(true)
1090            .open(&path)
1091            .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
1092        writeln!(file, "{line}")
1093            .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
1094        self.latest_ids
1095            .lock()
1096            .expect("file event log latest ids poisoned")
1097            .insert(topic.as_str().to_string(), event_id);
1098        self.broadcasts
1099            .publish(topic, self.queue_depth, (event_id, event.clone()));
1100        Ok(AppendOutcome {
1101            event_id,
1102            event,
1103            inserted: true,
1104        })
1105    }
1106}
1107
1108fn read_file_records(path: &Path) -> Result<Vec<FileRecord>, LogError> {
1109    let file = std::fs::File::open(path)
1110        .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
1111    let mut reader = std::io::BufReader::new(file);
1112    let mut records = Vec::new();
1113    let mut line = Vec::new();
1114    loop {
1115        line.clear();
1116        let bytes_read = std::io::BufRead::read_until(&mut reader, b'\n', &mut line)
1117            .map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
1118        if bytes_read == 0 {
1119            break;
1120        }
1121        if line.iter().all(u8::is_ascii_whitespace) {
1122            continue;
1123        }
1124        let complete_line = line.ends_with(b"\n");
1125        match serde_json::from_slice::<FileRecord>(&line) {
1126            Ok(record) => records.push(record),
1127            Err(_) if !complete_line => break,
1128            Err(error) => {
1129                return Err(LogError::Serde(format!("event log parse error: {error}")));
1130            }
1131        }
1132    }
1133    Ok(records)
1134}
1135
1136impl EventLog for FileEventLog {
1137    fn describe(&self) -> EventLogDescription {
1138        EventLogDescription {
1139            backend: EventLogBackendKind::File,
1140            location: Some(self.root.clone()),
1141            size_bytes: Some(dir_size_bytes(&self.root)),
1142            queue_depth: self.queue_depth,
1143        }
1144    }
1145
1146    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
1147        let _guard = self
1148            .write_lock
1149            .lock()
1150            .expect("file event log write lock poisoned");
1151        let next_id = self.latest_id_for_topic(topic)? + 1;
1152        let existing_events = self.read_range_sync(topic, None, usize::MAX)?;
1153        let previous = existing_events
1154            .last()
1155            .map(|(previous_id, previous_event)| (*previous_id, previous_event));
1156        let event = prepare_event_after(topic, next_id, previous, event)?;
1157        self.append_record_locked(topic, next_id, event)
1158            .map(|outcome| outcome.event_id)
1159    }
1160
1161    async fn flush(&self) -> Result<(), LogError> {
1162        sync_tree(&self.root)
1163    }
1164
1165    async fn read_range(
1166        &self,
1167        topic: &Topic,
1168        from: Option<EventId>,
1169        limit: usize,
1170    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
1171        self.read_range_sync(topic, from, limit)
1172    }
1173
1174    async fn read_range_bytes(
1175        &self,
1176        topic: &Topic,
1177        from: Option<EventId>,
1178        limit: usize,
1179    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
1180        self.read_range_sync(topic, from, limit)?
1181            .into_iter()
1182            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
1183            .collect()
1184    }
1185
1186    async fn subscribe(
1187        self: Arc<Self>,
1188        topic: &Topic,
1189        from: Option<EventId>,
1190    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
1191        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
1192        let history = self.read_range_sync(topic, from, usize::MAX)?;
1193        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
1194    }
1195
1196    async fn ack(
1197        &self,
1198        topic: &Topic,
1199        consumer: &ConsumerId,
1200        up_to: EventId,
1201    ) -> Result<(), LogError> {
1202        let path = self.consumer_path(topic, consumer);
1203        let payload = serde_json::json!({
1204            "topic": topic.as_str(),
1205            "consumer_id": consumer.as_str(),
1206            "cursor": up_to,
1207            "updated_at_ms": now_ms(),
1208        });
1209        write_json_atomically(&path, &payload)
1210    }
1211
1212    async fn consumer_cursor(
1213        &self,
1214        topic: &Topic,
1215        consumer: &ConsumerId,
1216    ) -> Result<Option<EventId>, LogError> {
1217        let path = self.consumer_path(topic, consumer);
1218        if !path.is_file() {
1219            return Ok(None);
1220        }
1221        let raw = std::fs::read_to_string(&path)
1222            .map_err(|error| LogError::Io(format!("event log consumer read error: {error}")))?;
1223        let payload: serde_json::Value = serde_json::from_str(&raw)
1224            .map_err(|error| LogError::Serde(format!("event log consumer parse error: {error}")))?;
1225        let cursor = payload
1226            .get("cursor")
1227            .and_then(serde_json::Value::as_u64)
1228            .ok_or_else(|| {
1229                LogError::Serde("event log consumer record missing numeric cursor".to_string())
1230            })?;
1231        Ok(Some(cursor))
1232    }
1233
1234    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
1235        let latest = self.latest_id_for_topic(topic)?;
1236        if latest == 0 {
1237            Ok(None)
1238        } else {
1239            Ok(Some(latest))
1240        }
1241    }
1242
1243    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
1244        let _guard = self
1245            .write_lock
1246            .lock()
1247            .expect("file event log write lock poisoned");
1248        let path = self.topic_path(topic);
1249        if !path.is_file() {
1250            return Ok(CompactReport::default());
1251        }
1252        let retained = self.read_range_sync(topic, Some(before), usize::MAX)?;
1253        let removed = self.read_range_sync(topic, None, usize::MAX)?.len() - retained.len();
1254        if retained.is_empty() {
1255            let _ = std::fs::remove_file(&path);
1256        } else {
1257            crate::atomic_io::atomic_write_with(&path, |writer| {
1258                use std::io::Write as _;
1259                for (event_id, event) in &retained {
1260                    let line = serde_json::to_string(&FileRecord {
1261                        id: *event_id,
1262                        event: event.clone(),
1263                    })
1264                    .map_err(|error| std::io::Error::other(error.to_string()))?;
1265                    writeln!(writer, "{line}")?;
1266                }
1267                Ok(())
1268            })
1269            .map_err(|error| LogError::Io(format!("event log compact finalize error: {error}")))?;
1270        }
1271        let latest = retained.last().map(|(event_id, _)| *event_id);
1272        self.latest_ids
1273            .lock()
1274            .expect("file event log latest ids poisoned")
1275            .insert(topic.as_str().to_string(), latest.unwrap_or(0));
1276        Ok(CompactReport {
1277            removed,
1278            remaining: retained.len(),
1279            latest,
1280            checkpointed: false,
1281        })
1282    }
1283}
1284
1285pub struct SqliteEventLog {
1286    path: PathBuf,
1287    connection: Mutex<Connection>,
1288    broadcasts: BroadcastMap,
1289    queue_depth: usize,
1290}
1291
1292impl SqliteEventLog {
1293    pub fn open(path: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
1294        if let Some(parent) = path.parent() {
1295            std::fs::create_dir_all(parent)
1296                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
1297        }
1298        let connection = Connection::open(&path)
1299            .map_err(|error| LogError::Sqlite(format!("event log open error: {error}")))?;
1300        // Set busy_timeout BEFORE the WAL pragma so SQLite waits out transient
1301        // SQLITE_BUSY from a previous test's connection that hasn't finished
1302        // dropping yet (parallel `cargo test` on the same process, distinct
1303        // paths, still contends on SQLite's own global mutex under WAL-mode
1304        // promotion). Without this, `journal_mode = WAL` fails fast with
1305        // "database is locked" instead of retrying.
1306        connection
1307            .busy_timeout(std::time::Duration::from_secs(5))
1308            .map_err(|error| LogError::Sqlite(format!("event log busy-timeout error: {error}")))?;
1309        connection
1310            .pragma_update(None, "journal_mode", "WAL")
1311            .map_err(|error| LogError::Sqlite(format!("event log WAL pragma error: {error}")))?;
1312        connection
1313            .pragma_update(None, "synchronous", "NORMAL")
1314            .map_err(|error| LogError::Sqlite(format!("event log sync pragma error: {error}")))?;
1315        connection
1316            .execute_batch(
1317                "CREATE TABLE IF NOT EXISTS topic_heads (
1318                    topic TEXT PRIMARY KEY,
1319                    last_id INTEGER NOT NULL
1320                );
1321                CREATE TABLE IF NOT EXISTS events (
1322                    topic TEXT NOT NULL,
1323                    event_id INTEGER NOT NULL,
1324                    kind TEXT NOT NULL,
1325                    payload BLOB NOT NULL,
1326                    headers TEXT NOT NULL,
1327                    occurred_at_ms INTEGER NOT NULL,
1328                    PRIMARY KEY (topic, event_id)
1329                );
1330                CREATE TABLE IF NOT EXISTS consumers (
1331                    topic TEXT NOT NULL,
1332                    consumer_id TEXT NOT NULL,
1333                    cursor INTEGER NOT NULL,
1334                    updated_at_ms INTEGER NOT NULL,
1335                    PRIMARY KEY (topic, consumer_id)
1336                );
1337                CREATE TABLE IF NOT EXISTS event_idempotency_keys (
1338                    topic TEXT NOT NULL,
1339                    key TEXT NOT NULL,
1340                    value TEXT NOT NULL,
1341                    event_id INTEGER NOT NULL,
1342                    PRIMARY KEY (topic, key, value),
1343                    FOREIGN KEY (topic, event_id) REFERENCES events(topic, event_id)
1344                );",
1345            )
1346            .map_err(|error| LogError::Sqlite(format!("event log schema error: {error}")))?;
1347        Ok(Self {
1348            path,
1349            connection: Mutex::new(connection),
1350            broadcasts: BroadcastMap::default(),
1351            queue_depth: queue_depth.max(1),
1352        })
1353    }
1354
1355    fn topics(&self) -> Result<Vec<Topic>, LogError> {
1356        let connection = self
1357            .connection
1358            .lock()
1359            .expect("sqlite event log connection poisoned");
1360        let mut statement = connection
1361            .prepare("SELECT DISTINCT topic FROM events ORDER BY topic ASC")
1362            .map_err(|error| {
1363                LogError::Sqlite(format!("event log topics prepare error: {error}"))
1364            })?;
1365        let rows = statement
1366            .query_map([], |row| row.get::<_, String>(0))
1367            .map_err(|error| LogError::Sqlite(format!("event log topics query error: {error}")))?;
1368        let mut topics = Vec::new();
1369        for row in rows {
1370            topics.push(Topic::new(row.map_err(|error| {
1371                LogError::Sqlite(format!("event log topic row error: {error}"))
1372            })?)?);
1373        }
1374        Ok(topics)
1375    }
1376
1377    fn append_idempotent_by_header(
1378        &self,
1379        topic: &Topic,
1380        header: &str,
1381        value: &str,
1382        event: LogEvent,
1383    ) -> Result<AppendOutcome, LogError> {
1384        let mut connection = self
1385            .connection
1386            .lock()
1387            .expect("sqlite event log connection poisoned");
1388        let tx = connection
1389            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
1390            .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
1391
1392        let existing = tx
1393            .query_row(
1394                "SELECT e.event_id, e.kind, e.payload, e.headers, e.occurred_at_ms
1395                 FROM event_idempotency_keys k
1396                 JOIN events e ON e.topic = k.topic AND e.event_id = k.event_id
1397                 WHERE k.topic = ?1 AND k.key = ?2 AND k.value = ?3",
1398                params![topic.as_str(), header, value],
1399                |row| {
1400                    let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1401                    let headers: String = row.get(3)?;
1402                    Ok((
1403                        sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?,
1404                        LogEvent {
1405                            kind: row.get(1)?,
1406                            payload: serde_json::from_slice(&payload).map_err(|error| {
1407                                rusqlite::Error::FromSqlConversionFailure(
1408                                    payload.len(),
1409                                    rusqlite::types::Type::Blob,
1410                                    Box::new(error),
1411                                )
1412                            })?,
1413                            headers: serde_json::from_str(&headers).map_err(|error| {
1414                                rusqlite::Error::FromSqlConversionFailure(
1415                                    headers.len(),
1416                                    rusqlite::types::Type::Text,
1417                                    Box::new(error),
1418                                )
1419                            })?,
1420                            occurred_at_ms: row.get(4)?,
1421                        },
1422                    ))
1423                },
1424            )
1425            .optional()
1426            .map_err(|error| {
1427                LogError::Sqlite(format!("event log idempotency read error: {error}"))
1428            })?;
1429
1430        if let Some((event_id, event)) = existing {
1431            return Ok(AppendOutcome {
1432                event_id,
1433                event,
1434                inserted: false,
1435            });
1436        }
1437
1438        tx.execute(
1439            "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
1440            params![topic.as_str()],
1441        )
1442        .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
1443        tx.execute(
1444            "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
1445            params![topic.as_str()],
1446        )
1447        .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
1448        let event_id = tx
1449            .query_row(
1450                "SELECT last_id FROM topic_heads WHERE topic = ?1",
1451                params![topic.as_str()],
1452                |row| row.get::<_, i64>(0),
1453            )
1454            .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))
1455            .and_then(sqlite_i64_to_event_id)?;
1456        let event_id_sql = event_id_to_sqlite_i64(event_id)?;
1457        let previous = tx
1458            .query_row(
1459                "SELECT event_id, kind, payload, headers, occurred_at_ms
1460                 FROM events
1461                 WHERE topic = ?1 AND event_id < ?2
1462                 ORDER BY event_id DESC
1463                 LIMIT 1",
1464                params![topic.as_str(), event_id_sql],
1465                |row| {
1466                    let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1467                    let headers: String = row.get(3)?;
1468                    Ok((
1469                        sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?,
1470                        LogEvent {
1471                            kind: row.get(1)?,
1472                            payload: serde_json::from_slice(&payload).map_err(|error| {
1473                                rusqlite::Error::FromSqlConversionFailure(
1474                                    payload.len(),
1475                                    rusqlite::types::Type::Blob,
1476                                    Box::new(error),
1477                                )
1478                            })?,
1479                            headers: serde_json::from_str(&headers).map_err(|error| {
1480                                rusqlite::Error::FromSqlConversionFailure(
1481                                    headers.len(),
1482                                    rusqlite::types::Type::Text,
1483                                    Box::new(error),
1484                                )
1485                            })?,
1486                            occurred_at_ms: row.get(4)?,
1487                        },
1488                    ))
1489                },
1490            )
1491            .optional()
1492            .map_err(|error| LogError::Sqlite(format!("event log previous read error: {error}")))?;
1493        let event = prepare_event_after(
1494            topic,
1495            event_id,
1496            previous
1497                .as_ref()
1498                .map(|(previous_id, previous_event)| (*previous_id, previous_event)),
1499            event,
1500        )?;
1501        tx.execute(
1502            "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
1503             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1504            params![
1505                topic.as_str(),
1506                event_id_sql,
1507                event.kind,
1508                serde_json::to_vec(&event.payload).map_err(|error| LogError::Serde(format!(
1509                    "event log payload encode error: {error}"
1510                )))?,
1511                serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
1512                    "event log headers encode error: {error}"
1513                )))?,
1514                event.occurred_at_ms
1515            ],
1516        )
1517        .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
1518        tx.execute(
1519            "INSERT INTO event_idempotency_keys(topic, key, value, event_id)
1520             VALUES (?1, ?2, ?3, ?4)",
1521            params![topic.as_str(), header, value, event_id_sql],
1522        )
1523        .map_err(|error| {
1524            LogError::Sqlite(format!("event log idempotency insert error: {error}"))
1525        })?;
1526        tx.commit()
1527            .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
1528        self.broadcasts
1529            .publish(topic, self.queue_depth, (event_id, event.clone()));
1530        Ok(AppendOutcome {
1531            event_id,
1532            event,
1533            inserted: true,
1534        })
1535    }
1536}
1537
1538impl EventLog for SqliteEventLog {
1539    fn describe(&self) -> EventLogDescription {
1540        EventLogDescription {
1541            backend: EventLogBackendKind::Sqlite,
1542            location: Some(self.path.clone()),
1543            size_bytes: Some(sqlite_size_bytes(&self.path)),
1544            queue_depth: self.queue_depth,
1545        }
1546    }
1547
1548    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
1549        let mut connection = self
1550            .connection
1551            .lock()
1552            .expect("sqlite event log connection poisoned");
1553        let tx = connection
1554            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
1555            .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
1556        tx.execute(
1557            "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
1558            params![topic.as_str()],
1559        )
1560        .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
1561        tx.execute(
1562            "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
1563            params![topic.as_str()],
1564        )
1565        .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
1566        let event_id = tx
1567            .query_row(
1568                "SELECT last_id FROM topic_heads WHERE topic = ?1",
1569                params![topic.as_str()],
1570                |row| row.get::<_, i64>(0),
1571            )
1572            .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))
1573            .and_then(sqlite_i64_to_event_id)?;
1574        let event_id_sql = event_id_to_sqlite_i64(event_id)?;
1575        let previous = tx
1576            .query_row(
1577                "SELECT event_id, kind, payload, headers, occurred_at_ms
1578                 FROM events
1579                 WHERE topic = ?1 AND event_id < ?2
1580                 ORDER BY event_id DESC
1581                 LIMIT 1",
1582                params![topic.as_str(), event_id_sql],
1583                |row| {
1584                    let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1585                    let headers: String = row.get(3)?;
1586                    Ok((
1587                        sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?,
1588                        LogEvent {
1589                            kind: row.get(1)?,
1590                            payload: serde_json::from_slice(&payload).map_err(|error| {
1591                                rusqlite::Error::FromSqlConversionFailure(
1592                                    payload.len(),
1593                                    rusqlite::types::Type::Blob,
1594                                    Box::new(error),
1595                                )
1596                            })?,
1597                            headers: serde_json::from_str(&headers).map_err(|error| {
1598                                rusqlite::Error::FromSqlConversionFailure(
1599                                    headers.len(),
1600                                    rusqlite::types::Type::Text,
1601                                    Box::new(error),
1602                                )
1603                            })?,
1604                            occurred_at_ms: row.get(4)?,
1605                        },
1606                    ))
1607                },
1608            )
1609            .optional()
1610            .map_err(|error| LogError::Sqlite(format!("event log previous read error: {error}")))?;
1611        let event = prepare_event_after(
1612            topic,
1613            event_id,
1614            previous
1615                .as_ref()
1616                .map(|(previous_id, previous_event)| (*previous_id, previous_event)),
1617            event,
1618        )?;
1619        tx.execute(
1620            "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
1621             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1622            params![
1623                topic.as_str(),
1624                event_id_sql,
1625                event.kind,
1626                serde_json::to_vec(&event.payload).map_err(|error| LogError::Serde(format!(
1627                    "event log payload encode error: {error}"
1628                )))?,
1629                serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
1630                    "event log headers encode error: {error}"
1631                )))?,
1632                event.occurred_at_ms
1633            ],
1634        )
1635        .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
1636        tx.commit()
1637            .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
1638        self.broadcasts
1639            .publish(topic, self.queue_depth, (event_id, event.clone()));
1640        Ok(event_id)
1641    }
1642
1643    async fn flush(&self) -> Result<(), LogError> {
1644        let connection = self
1645            .connection
1646            .lock()
1647            .expect("sqlite event log connection poisoned");
1648        connection
1649            .execute_batch("PRAGMA wal_checkpoint(FULL);")
1650            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
1651        Ok(())
1652    }
1653
1654    async fn read_range(
1655        &self,
1656        topic: &Topic,
1657        from: Option<EventId>,
1658        limit: usize,
1659    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
1660        let connection = self
1661            .connection
1662            .lock()
1663            .expect("sqlite event log connection poisoned");
1664        let mut statement = connection
1665            .prepare(
1666                "SELECT event_id, kind, payload, headers, occurred_at_ms
1667                 FROM events
1668                 WHERE topic = ?1 AND event_id > ?2
1669                 ORDER BY event_id ASC
1670                 LIMIT ?3",
1671            )
1672            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
1673        let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
1674        let rows = statement
1675            .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
1676                let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1677                let headers: String = row.get(3)?;
1678                let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
1679                Ok((
1680                    event_id,
1681                    LogEvent {
1682                        kind: row.get(1)?,
1683                        payload: serde_json::from_slice(&payload).map_err(|error| {
1684                            rusqlite::Error::FromSqlConversionFailure(
1685                                payload.len(),
1686                                rusqlite::types::Type::Blob,
1687                                Box::new(error),
1688                            )
1689                        })?,
1690                        headers: serde_json::from_str(&headers).map_err(|error| {
1691                            rusqlite::Error::FromSqlConversionFailure(
1692                                headers.len(),
1693                                rusqlite::types::Type::Text,
1694                                Box::new(error),
1695                            )
1696                        })?,
1697                        occurred_at_ms: row.get(4)?,
1698                    },
1699                ))
1700            })
1701            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
1702        let mut events = Vec::new();
1703        for row in rows {
1704            events.push(
1705                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
1706            );
1707        }
1708        Ok(events)
1709    }
1710
1711    async fn read_range_bytes(
1712        &self,
1713        topic: &Topic,
1714        from: Option<EventId>,
1715        limit: usize,
1716    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
1717        let connection = self
1718            .connection
1719            .lock()
1720            .expect("sqlite event log connection poisoned");
1721        let mut statement = connection
1722            .prepare(
1723                "SELECT event_id, kind, payload, headers, occurred_at_ms
1724                 FROM events
1725                 WHERE topic = ?1 AND event_id > ?2
1726                 ORDER BY event_id ASC
1727                 LIMIT ?3",
1728            )
1729            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
1730        let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
1731        let rows = statement
1732            .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
1733                let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
1734                let headers: String = row.get(3)?;
1735                let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
1736                Ok((
1737                    event_id,
1738                    LogEventBytes {
1739                        kind: row.get(1)?,
1740                        payload: Bytes::from(payload),
1741                        headers: serde_json::from_str(&headers).map_err(|error| {
1742                            rusqlite::Error::FromSqlConversionFailure(
1743                                headers.len(),
1744                                rusqlite::types::Type::Text,
1745                                Box::new(error),
1746                            )
1747                        })?,
1748                        occurred_at_ms: row.get(4)?,
1749                    },
1750                ))
1751            })
1752            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
1753        let mut events = Vec::new();
1754        for row in rows {
1755            events.push(
1756                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
1757            );
1758        }
1759        Ok(events)
1760    }
1761
1762    async fn subscribe(
1763        self: Arc<Self>,
1764        topic: &Topic,
1765        from: Option<EventId>,
1766    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
1767        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
1768        let history = self.read_range(topic, from, usize::MAX).await?;
1769        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
1770    }
1771
1772    async fn ack(
1773        &self,
1774        topic: &Topic,
1775        consumer: &ConsumerId,
1776        up_to: EventId,
1777    ) -> Result<(), LogError> {
1778        let connection = self
1779            .connection
1780            .lock()
1781            .expect("sqlite event log connection poisoned");
1782        let up_to_sql = event_id_to_sqlite_i64(up_to)?;
1783        connection
1784            .execute(
1785                "INSERT INTO consumers(topic, consumer_id, cursor, updated_at_ms)
1786                 VALUES (?1, ?2, ?3, ?4)
1787                 ON CONFLICT(topic, consumer_id)
1788                 DO UPDATE SET cursor = excluded.cursor, updated_at_ms = excluded.updated_at_ms",
1789                params![topic.as_str(), consumer.as_str(), up_to_sql, now_ms()],
1790            )
1791            .map_err(|error| LogError::Sqlite(format!("event log ack error: {error}")))?;
1792        Ok(())
1793    }
1794
1795    async fn consumer_cursor(
1796        &self,
1797        topic: &Topic,
1798        consumer: &ConsumerId,
1799    ) -> Result<Option<EventId>, LogError> {
1800        let connection = self
1801            .connection
1802            .lock()
1803            .expect("sqlite event log connection poisoned");
1804        connection
1805            .query_row(
1806                "SELECT cursor FROM consumers WHERE topic = ?1 AND consumer_id = ?2",
1807                params![topic.as_str(), consumer.as_str()],
1808                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1809            )
1810            .optional()
1811            .map_err(|error| LogError::Sqlite(format!("event log consumer cursor error: {error}")))
1812    }
1813
1814    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
1815        let connection = self
1816            .connection
1817            .lock()
1818            .expect("sqlite event log connection poisoned");
1819        connection
1820            .query_row(
1821                "SELECT last_id FROM topic_heads WHERE topic = ?1",
1822                params![topic.as_str()],
1823                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1824            )
1825            .optional()
1826            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))
1827    }
1828
1829    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
1830        let connection = self
1831            .connection
1832            .lock()
1833            .expect("sqlite event log connection poisoned");
1834        let before_sql = event_id_to_sqlite_i64(before)?;
1835        connection
1836            .execute(
1837                "DELETE FROM event_idempotency_keys WHERE topic = ?1 AND event_id <= ?2",
1838                params![topic.as_str(), before_sql],
1839            )
1840            .map_err(|error| {
1841                LogError::Sqlite(format!("event log idempotency compact error: {error}"))
1842            })?;
1843        let removed = connection
1844            .execute(
1845                "DELETE FROM events WHERE topic = ?1 AND event_id <= ?2",
1846                params![topic.as_str(), before_sql],
1847            )
1848            .map_err(|error| {
1849                LogError::Sqlite(format!("event log compact delete error: {error}"))
1850            })?;
1851        let remaining = connection
1852            .query_row(
1853                "SELECT COUNT(*) FROM events WHERE topic = ?1",
1854                params![topic.as_str()],
1855                |row| row.get::<_, i64>(0),
1856            )
1857            .map_err(|error| LogError::Sqlite(format!("event log compact count error: {error}")))
1858            .and_then(sqlite_i64_to_usize)?;
1859        let latest = connection
1860            .query_row(
1861                "SELECT last_id FROM topic_heads WHERE topic = ?1",
1862                params![topic.as_str()],
1863                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
1864            )
1865            .optional()
1866            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))?;
1867        connection
1868            .execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
1869            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
1870        Ok(CompactReport {
1871            removed,
1872            remaining,
1873            latest,
1874            checkpointed: true,
1875        })
1876    }
1877}
1878
/// Resolves `value` against `base_dir`: absolute values are returned as-is,
/// anything else is joined onto `base_dir`.
fn resolve_path(base_dir: &Path, value: &str) -> PathBuf {
    let requested = Path::new(value);
    if requested.is_relative() {
        return base_dir.join(requested);
    }
    requested.to_path_buf()
}
1887
1888fn write_json_atomically(path: &Path, payload: &serde_json::Value) -> Result<(), LogError> {
1889    let encoded = serde_json::to_vec_pretty(payload)
1890        .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
1891    crate::atomic_io::atomic_write(path, &encoded)
1892        .map_err(|error| LogError::Io(format!("event log write error: {error}")))
1893}
1894
/// Makes `value` usable as a file name by applying
/// [`sanitize_topic_component`].
fn sanitize_filename(value: &str) -> String {
    sanitize_topic_component(value)
}

/// Returns `value` with every character other than an ASCII letter, ASCII
/// digit, `.`, `_` or `-` replaced by `_`.
///
/// Replacement is per character, so the result has the same number of
/// characters as the input.
pub fn sanitize_topic_component(value: &str) -> String {
    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-');
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}
1911
/// Sums the sizes of all files under `path`, recursing into
/// subdirectories.
///
/// Best effort: a missing or unreadable directory counts as 0, and entries
/// or metadata that cannot be read are skipped.
fn dir_size_bytes(path: &Path) -> u64 {
    // `read_dir` fails for a missing path, which gives the same 0 as an
    // explicit existence check.
    let entries = match std::fs::read_dir(path) {
        Ok(entries) => entries,
        Err(_) => return 0,
    };
    entries
        .flatten()
        .map(|entry| {
            let child = entry.path();
            if child.is_dir() {
                dir_size_bytes(&child)
            } else {
                entry.metadata().map(|metadata| metadata.len()).unwrap_or(0)
            }
        })
        .sum()
}
1929
1930fn sqlite_size_bytes(path: &Path) -> u64 {
1931    let mut total = file_size(path);
1932    total += file_size(&PathBuf::from(format!("{}-wal", path.display())));
1933    total += file_size(&PathBuf::from(format!("{}-shm", path.display())));
1934    total
1935}
1936
/// Returns the length in bytes reported by the metadata of `path`, or 0
/// when the metadata cannot be read.
fn file_size(path: &Path) -> u64 {
    match std::fs::metadata(path) {
        Ok(metadata) => metadata.len(),
        Err(_) => 0,
    }
}
1942
1943fn sync_tree(root: &Path) -> Result<(), LogError> {
1944    if !root.exists() {
1945        return Ok(());
1946    }
1947    for entry in std::fs::read_dir(root)
1948        .map_err(|error| LogError::Io(format!("event log read_dir error: {error}")))?
1949    {
1950        let entry = entry.map_err(|error| LogError::Io(format!("event log dir error: {error}")))?;
1951        let path = entry.path();
1952        if path.is_dir() {
1953            sync_tree(&path)?;
1954            continue;
1955        }
1956        std::fs::File::open(&path)
1957            .and_then(|file| file.sync_all())
1958            .map_err(|error| LogError::Io(format!("event log sync error: {error}")))?;
1959    }
1960    Ok(())
1961}
1962
/// Returns the current wall-clock time as milliseconds since the Unix
/// epoch, or 0 when the system clock reads earlier than the epoch.
fn now_ms() -> i64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
1969
1970fn event_id_to_sqlite_i64(event_id: EventId) -> Result<i64, LogError> {
1971    i64::try_from(event_id)
1972        .map_err(|_| LogError::Sqlite(format!("event id {event_id} exceeds sqlite INTEGER range")))
1973}
1974
1975fn sqlite_i64_to_event_id(value: i64) -> Result<EventId, LogError> {
1976    u64::try_from(value)
1977        .map_err(|_| LogError::Sqlite(format!("sqlite event id {value} is negative")))
1978}
1979
1980fn sqlite_i64_to_event_id_for_row(value: i64) -> rusqlite::Result<EventId> {
1981    u64::try_from(value).map_err(|_| {
1982        rusqlite::Error::FromSqlConversionFailure(
1983            std::mem::size_of::<i64>(),
1984            rusqlite::types::Type::Integer,
1985            "sqlite event id is negative".into(),
1986        )
1987    })
1988}
1989
1990fn sqlite_json_bytes_for_row(
1991    row: &rusqlite::Row<'_>,
1992    index: usize,
1993    name: &str,
1994) -> rusqlite::Result<Vec<u8>> {
1995    let value = row.get_ref(index)?;
1996    match value {
1997        rusqlite::types::ValueRef::Text(bytes) | rusqlite::types::ValueRef::Blob(bytes) => {
1998            Ok(bytes.to_vec())
1999        }
2000        other => Err(rusqlite::Error::InvalidColumnType(
2001            index,
2002            name.to_string(),
2003            other.data_type(),
2004        )),
2005    }
2006}
2007
2008fn sqlite_i64_to_usize(value: i64) -> Result<usize, LogError> {
2009    usize::try_from(value)
2010        .map_err(|_| LogError::Sqlite(format!("sqlite count {value} is negative")))
2011}
2012
2013#[cfg(test)]
2014mod tests {
2015    use super::*;
2016    use futures::StreamExt;
2017    use rand::{rngs::StdRng, RngExt, SeedableRng};
2018
2019    #[test]
2020    fn lazy_default_event_log_opens_on_first_access() {
2021        reset_active_event_log();
2022        let dir = tempfile::tempdir().unwrap();
2023
2024        install_lazy_default_for_base_dir(dir.path()).unwrap();
2025        assert!(ACTIVE_EVENT_LOG.with(|slot| slot.borrow().is_none()));
2026        assert!(PENDING_DEFAULT_EVENT_LOG.with(|slot| slot.borrow().is_some()));
2027
2028        let _log = active_event_log().expect("lazy event log should open on demand");
2029        assert!(ACTIVE_EVENT_LOG.with(|slot| slot.borrow().is_some()));
2030        assert!(PENDING_DEFAULT_EVENT_LOG.with(|slot| slot.borrow().is_none()));
2031        reset_active_event_log();
2032    }
2033
2034    async fn exercise_basic_backend(log: Arc<AnyEventLog>) {
2035        let topic = Topic::new("trigger.inbox").unwrap();
2036        for i in 0..10_000 {
2037            log.append(
2038                &topic,
2039                LogEvent::new("append", serde_json::json!({ "i": i })),
2040            )
2041            .await
2042            .unwrap();
2043        }
2044        let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
2045        assert_eq!(events.len(), 10_000);
2046        assert_eq!(events.first().unwrap().0, 1);
2047        assert_eq!(events.last().unwrap().0, 10_000);
2048    }
2049
2050    async fn exercise_idempotent_append(log: Arc<AnyEventLog>) {
2051        let topic = Topic::new("channel.tenant.default.pr").unwrap();
2052        let mut first_headers = BTreeMap::new();
2053        first_headers.insert("harn.channel.id".to_string(), "event-1".to_string());
2054        let first = log
2055            .append_idempotent_by_header(
2056                &topic,
2057                "harn.channel.id",
2058                "event-1",
2059                LogEvent::new("channel.emit", serde_json::json!({"n": 1}))
2060                    .with_headers(first_headers),
2061            )
2062            .await
2063            .unwrap();
2064        assert!(first.inserted);
2065        assert_eq!(first.event_id, 1);
2066
2067        let mut duplicate_headers = BTreeMap::new();
2068        duplicate_headers.insert("harn.channel.id".to_string(), "event-1".to_string());
2069        let duplicate = log
2070            .append_idempotent_by_header(
2071                &topic,
2072                "harn.channel.id",
2073                "event-1",
2074                LogEvent::new("channel.emit", serde_json::json!({"n": 2}))
2075                    .with_headers(duplicate_headers),
2076            )
2077            .await
2078            .unwrap();
2079        assert!(!duplicate.inserted);
2080        assert_eq!(duplicate.event_id, first.event_id);
2081        assert_eq!(duplicate.event.payload, serde_json::json!({"n": 1}));
2082
2083        let events = log.read_range(&topic, None, usize::MAX).await.unwrap();
2084        assert_eq!(events.len(), 1);
2085        assert_eq!(events[0].0, first.event_id);
2086    }
2087
2088    #[tokio::test(flavor = "current_thread")]
2089    async fn memory_backend_supports_append_read_subscribe_and_compact() {
2090        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
2091        exercise_basic_backend(log.clone()).await;
2092
2093        let topic = Topic::new("agent.transcript.demo").unwrap();
2094        let mut stream = log.clone().subscribe(&topic, None).await.unwrap();
2095        let first = log
2096            .append(
2097                &topic,
2098                LogEvent::new("message", serde_json::json!({"text":"one"})),
2099            )
2100            .await
2101            .unwrap();
2102        let second = log
2103            .append(
2104                &topic,
2105                LogEvent::new("message", serde_json::json!({"text":"two"})),
2106            )
2107            .await
2108            .unwrap();
2109        let seen: Vec<_> = stream.by_ref().take(2).collect().await;
2110        assert_eq!(seen[0].as_ref().unwrap().0, first);
2111        assert_eq!(seen[1].as_ref().unwrap().0, second);
2112
2113        log.ack(&topic, &ConsumerId::new("worker").unwrap(), second)
2114            .await
2115            .unwrap();
2116        let compact = log.compact(&topic, first).await.unwrap();
2117        assert_eq!(compact.removed, 1);
2118        assert_eq!(compact.remaining, 1);
2119    }
2120
2121    #[tokio::test(flavor = "current_thread")]
2122    async fn memory_backend_idempotent_append_returns_original_event() {
2123        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(8)));
2124        exercise_idempotent_append(log).await;
2125    }
2126
2127    #[tokio::test(flavor = "current_thread")]
2128    async fn file_backend_persists_across_reopen_and_compacts() {
2129        let dir = tempfile::tempdir().unwrap();
2130        let topic = Topic::new("trigger.outbox").unwrap();
2131        let first_log = Arc::new(AnyEventLog::File(
2132            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
2133        ));
2134        first_log
2135            .append(
2136                &topic,
2137                LogEvent::new("dispatch_pending", serde_json::json!({"n":1})),
2138            )
2139            .await
2140            .unwrap();
2141        first_log
2142            .append(
2143                &topic,
2144                LogEvent::new("dispatch_complete", serde_json::json!({"n":2})),
2145            )
2146            .await
2147            .unwrap();
2148        drop(first_log);
2149
2150        let reopened = Arc::new(AnyEventLog::File(
2151            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
2152        ));
2153        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
2154        assert_eq!(events.len(), 2);
2155        let compact = reopened.compact(&topic, 1).await.unwrap();
2156        assert_eq!(compact.removed, 1);
2157        assert_eq!(
2158            reopened
2159                .read_range(&topic, None, usize::MAX)
2160                .await
2161                .unwrap()
2162                .len(),
2163            1
2164        );
2165    }
2166
2167    #[tokio::test(flavor = "current_thread")]
2168    async fn file_backend_skips_torn_tail_on_restart() {
2169        let dir = tempfile::tempdir().unwrap();
2170        let topic = Topic::new("trigger.inbox").unwrap();
2171        let first_log = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
2172        first_log
2173            .append(
2174                &topic,
2175                LogEvent::new("accepted", serde_json::json!({"id": "ok"})),
2176            )
2177            .await
2178            .unwrap();
2179        drop(first_log);
2180
2181        let topic_path = dir.path().join("topics").join("trigger.inbox.jsonl");
2182        use std::io::Write as _;
2183        let mut file = std::fs::OpenOptions::new()
2184            .append(true)
2185            .open(&topic_path)
2186            .unwrap();
2187        write!(file, "{{\"id\":2,\"event\":{{\"kind\":\"partial\"").unwrap();
2188        drop(file);
2189
2190        let reopened = FileEventLog::open(dir.path().to_path_buf(), 8).unwrap();
2191        let events = reopened.read_range(&topic, None, usize::MAX).await.unwrap();
2192        assert_eq!(events.len(), 1);
2193        assert_eq!(events[0].0, 1);
2194        assert_eq!(reopened.latest(&topic).await.unwrap(), Some(1));
2195    }
2196
2197    #[tokio::test(flavor = "current_thread")]
2198    async fn file_backend_idempotent_append_returns_original_event() {
2199        let dir = tempfile::tempdir().unwrap();
2200        let log = Arc::new(AnyEventLog::File(
2201            FileEventLog::open(dir.path().to_path_buf(), 8).unwrap(),
2202        ));
2203        exercise_idempotent_append(log).await;
2204    }
2205
2206    #[tokio::test(flavor = "current_thread")]
2207    async fn sqlite_backend_persists_and_checkpoints_after_compact() {
2208        let dir = tempfile::tempdir().unwrap();
2209        let path = dir.path().join("events.sqlite");
2210        let topic = Topic::new("daemon.demo.state").unwrap();
2211        let first_log = Arc::new(AnyEventLog::Sqlite(
2212            SqliteEventLog::open(path.clone(), 8).unwrap(),
2213        ));
2214        first_log
2215            .append(
2216                &topic,
2217                LogEvent::new("state", serde_json::json!({"state":"idle"})),
2218            )
2219            .await
2220            .unwrap();
2221        first_log
2222            .append(
2223                &topic,
2224                LogEvent::new("state", serde_json::json!({"state":"active"})),
2225            )
2226            .await
2227            .unwrap();
2228        drop(first_log);
2229
2230        let reopened = Arc::new(AnyEventLog::Sqlite(
2231            SqliteEventLog::open(path.clone(), 8).unwrap(),
2232        ));
2233        assert_eq!(
2234            reopened
2235                .read_range(&topic, None, usize::MAX)
2236                .await
2237                .unwrap()
2238                .len(),
2239            2
2240        );
2241        let compact = reopened.compact(&topic, 1).await.unwrap();
2242        assert!(compact.checkpointed);
2243        let wal = PathBuf::from(format!("{}-wal", path.display()));
2244        assert!(file_size(&wal) == 0 || !wal.exists());
2245    }
2246
2247    #[tokio::test(flavor = "current_thread")]
2248    async fn sqlite_backend_idempotent_append_returns_original_event() {
2249        let dir = tempfile::tempdir().unwrap();
2250        let path = dir.path().join("events.sqlite");
2251        let log = Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(path, 8).unwrap()));
2252        exercise_idempotent_append(log).await;
2253    }
2254
2255    #[tokio::test(flavor = "current_thread")]
2256    async fn sqlite_backend_compacts_idempotency_keys_with_events() {
2257        let dir = tempfile::tempdir().unwrap();
2258        let path = dir.path().join("events.sqlite");
2259        let log = Arc::new(AnyEventLog::Sqlite(SqliteEventLog::open(path, 8).unwrap()));
2260        let topic = Topic::new("channel.tenant.default.compacted").unwrap();
2261        let mut headers = BTreeMap::new();
2262        headers.insert("harn.channel.id".to_string(), "event-1".to_string());
2263        let first = log
2264            .append_idempotent_by_header(
2265                &topic,
2266                "harn.channel.id",
2267                "event-1",
2268                LogEvent::new("channel.emit", serde_json::json!({"n": 1})).with_headers(headers),
2269            )
2270            .await
2271            .unwrap();
2272        log.compact(&topic, first.event_id).await.unwrap();
2273
2274        let mut replacement_headers = BTreeMap::new();
2275        replacement_headers.insert("harn.channel.id".to_string(), "event-1".to_string());
2276        let replacement = log
2277            .append_idempotent_by_header(
2278                &topic,
2279                "harn.channel.id",
2280                "event-1",
2281                LogEvent::new("channel.emit", serde_json::json!({"n": 2}))
2282                    .with_headers(replacement_headers),
2283            )
2284            .await
2285            .unwrap();
2286        assert!(replacement.inserted);
2287        assert!(replacement.event_id > first.event_id);
2288        assert_eq!(replacement.event.payload, serde_json::json!({"n": 2}));
2289    }
2290
2291    #[tokio::test(flavor = "current_thread")]
2292    async fn sqlite_bytes_read_preserves_payload_without_value_materialization() {
2293        let dir = tempfile::tempdir().unwrap();
2294        let path = dir.path().join("events.sqlite");
2295        let topic = Topic::new("observability.action_graph").unwrap();
2296        let log = SqliteEventLog::open(path, 8).unwrap();
2297        let event_id = log
2298            .append(
2299                &topic,
2300                LogEvent::new(
2301                    "snapshot",
2302                    serde_json::json!({"nodes":[{"id":"a"}],"edges":[]}),
2303                ),
2304            )
2305            .await
2306            .unwrap();
2307
2308        let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
2309        assert_eq!(events.len(), 1);
2310        assert_eq!(events[0].0, event_id);
2311        assert_eq!(
2312            events[0].1.payload_json().unwrap(),
2313            serde_json::json!({"nodes":[{"id":"a"}],"edges":[]})
2314        );
2315    }
2316
2317    #[tokio::test(flavor = "current_thread")]
2318    async fn sqlite_bytes_read_accepts_legacy_text_payload_rows() {
2319        let dir = tempfile::tempdir().unwrap();
2320        let path = dir.path().join("events.sqlite");
2321        let topic = Topic::new("agent.transcript.legacy").unwrap();
2322        let log = SqliteEventLog::open(path, 8).unwrap();
2323        {
2324            let connection = log.connection.lock().unwrap();
2325            connection
2326                .execute(
2327                    "INSERT INTO topic_heads(topic, last_id) VALUES (?1, 1)",
2328                    params![topic.as_str()],
2329                )
2330                .unwrap();
2331            connection
2332                .execute(
2333                    "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
2334                     VALUES (?1, 1, 'legacy', ?2, '{}', 1)",
2335                    params![topic.as_str(), "{\"text\":\"old\"}"],
2336                )
2337                .unwrap();
2338        }
2339
2340        let events = log.read_range_bytes(&topic, None, 1).await.unwrap();
2341        assert_eq!(
2342            events[0].1.payload_json().unwrap(),
2343            serde_json::json!({"text": "old"})
2344        );
2345        assert_eq!(
2346            log.read_range(&topic, None, 1).await.unwrap()[0].1.kind,
2347            "legacy"
2348        );
2349    }
2350
2351    #[tokio::test(flavor = "current_thread")]
2352    async fn broadcast_forwarder_reports_lag_when_receiver_overflows() {
2353        let (sender, rx) = broadcast::channel(2);
2354        for i in 0..10 {
2355            sender
2356                .send((i + 1, LogEvent::new("tick", serde_json::json!({"i": i}))))
2357                .unwrap();
2358        }
2359        let mut stream = stream_from_broadcast(Vec::new(), None, rx, 2);
2360
2361        match stream.next().await {
2362            Some(Err(LogError::ConsumerLagged(last_seen))) => assert_eq!(last_seen, 0),
2363            other => panic!("subscriber should surface lag, got {other:?}"),
2364        }
2365    }
2366
2367    #[tokio::test(flavor = "current_thread")]
2368    async fn broadcast_forwarder_stops_when_consumer_drops_stream() {
2369        let (sender, rx) = broadcast::channel(2);
2370        let stream = stream_from_broadcast(Vec::new(), None, rx, 2);
2371        assert_eq!(sender.receiver_count(), 1);
2372        drop(stream);
2373
2374        tokio::time::timeout(std::time::Duration::from_millis(100), async {
2375            while sender.receiver_count() != 0 {
2376                tokio::task::yield_now().await;
2377            }
2378        })
2379        .await
2380        .expect("subscription receiver should close after consumer drop");
2381    }
2382
2383    #[tokio::test(flavor = "current_thread")]
2384    async fn randomized_reader_sequences_stay_monotonic() {
2385        let log = Arc::new(MemoryEventLog::new(32));
2386        let topic = Topic::new("fuzz.demo").unwrap();
2387        let mut readers = vec![
2388            log.clone().subscribe(&topic, None).await.unwrap(),
2389            log.clone().subscribe(&topic, Some(5)).await.unwrap(),
2390            log.clone().subscribe(&topic, Some(10)).await.unwrap(),
2391        ];
2392        let mut rng = StdRng::seed_from_u64(7);
2393        for _ in 0..64 {
2394            let value = rng.random_range(0..1000);
2395            log.append(
2396                &topic,
2397                LogEvent::new("rand", serde_json::json!({"value": value})),
2398            )
2399            .await
2400            .unwrap();
2401        }
2402
2403        let mut sequences = Vec::new();
2404        for reader in &mut readers {
2405            let mut ids = Vec::new();
2406            while let Some(item) = reader.next().await {
2407                match item {
2408                    Ok((event_id, _)) => {
2409                        ids.push(event_id);
2410                        if ids.len() >= 16 {
2411                            break;
2412                        }
2413                    }
2414                    Err(LogError::ConsumerLagged(_)) => break,
2415                    Err(error) => panic!("unexpected subscription error: {error}"),
2416                }
2417            }
2418            sequences.push(ids);
2419        }
2420
2421        for ids in sequences {
2422            assert!(ids.windows(2).all(|pair| pair[0] < pair[1]));
2423        }
2424    }
2425}