Skip to main content

entelix_session/
log.rs

1// Mutex guards in this module are short-lived and never held across an
2// `await`; the `tightening` lint's recommendation to early-drop them
3// would force opaque closures over every method body.
4#![allow(clippy::significant_drop_tightening)]
5
//! `SessionLog` trait — the persistent companion to [`SessionGraph`].
//!
//! `SessionGraph` is in-memory; `SessionLog` is the durable audit
//! store backing it. Concrete impls live elsewhere:
//! - [`InMemorySessionLog`] (this crate) — in-process default for tests
//!   and single-binary deployments
//! - `entelix_persistence::postgres::PostgresSessionLog` — Postgres
//! - `entelix_persistence::redis::RedisSessionLog` — Redis
//!
//! All implementations share these guarantees:
//!
//! - **Append-only**: appended events are never mutated and never
//!   reordered (invariant 1).
//! - **Per-thread monotonic ordinal**: `append` returns the highest
//!   ordinal assigned; `load_since(cursor)` returns the events whose
//!   ordinal is greater than `cursor`.
//! - **Tenant-scoped**: every method takes `tenant_id` (invariant 11)
//!   so cross-tenant reads / writes are structurally impossible.
//! - **Archival watermark monotonic**: the watermark set through
//!   `archive_before` only ever moves forward; a call that passes a
//!   value at or below the current watermark is a no-op.

26
27use std::collections::HashMap;
28
29use async_trait::async_trait;
30use entelix_core::{Result, ThreadKey};
31use parking_lot::Mutex;
32
33use crate::event::GraphEvent;
34
35/// Persistent durable session-event log.
36///
37/// `SessionGraph::events` is the in-memory shape; this trait is the
38/// durable companion that survives process restarts. A `SessionGraph`
39/// can be hydrated from a `SessionLog` by replaying every event the
40/// log returns for `key`.
41///
42/// Every method is keyed by [`ThreadKey`], the canonical
43/// `(tenant_id, thread_id)` tuple — a backend cannot accidentally
44/// drop the tenant scope because the tenant component is
45/// syntactically required by the type signature. Same isolation
46/// pattern that `entelix_graph::Checkpointer` uses (Invariant 11 /
47/// F2).
48#[async_trait]
49pub trait SessionLog: Send + Sync + 'static {
50    /// Append `events` for `key`. Returns the highest ordinal
51    /// assigned (1-based, so an empty log becomes ordinal 1 after
52    /// appending the first event).
53    async fn append(&self, key: &ThreadKey, events: &[GraphEvent]) -> Result<u64>;
54
55    /// Load every event with ordinal `> cursor`. Pass `0` for "from
56    /// the beginning". Returns events in ordinal-ascending order.
57    async fn load_since(&self, key: &ThreadKey, cursor: u64) -> Result<Vec<GraphEvent>>;
58
59    /// Advance the archival watermark to `watermark`. Events with
60    /// ordinal `<= watermark` may be moved to cold storage at the
61    /// implementation's discretion. Returns the number of events
62    /// archived. The watermark is monotonic per `key`; calls with
63    /// a value `<=` the current watermark are no-ops.
64    async fn archive_before(&self, key: &ThreadKey, watermark: u64) -> Result<usize>;
65}
66
67/// In-process [`SessionLog`] for single-binary deployments and tests.
68///
69/// Backed by a per-[`ThreadKey`] `Vec<GraphEvent>`. Production
70/// multi-pod deployments use the Postgres or Redis impls in
71/// `entelix-persistence`.
72#[derive(Default)]
73pub struct InMemorySessionLog {
74    inner: Mutex<HashMap<ThreadKey, ThreadLog>>,
75}
76
77#[derive(Default)]
78struct ThreadLog {
79    events: Vec<GraphEvent>,
80    archival_watermark: u64,
81}
82
83impl InMemorySessionLog {
84    /// Build an empty in-memory log.
85    pub fn new() -> Self {
86        Self::default()
87    }
88}
89
90#[async_trait]
91impl SessionLog for InMemorySessionLog {
92    async fn append(&self, key: &ThreadKey, events: &[GraphEvent]) -> Result<u64> {
93        let len = {
94            let mut guard = self.inner.lock();
95            let log = guard.entry(key.clone()).or_default();
96            log.events.extend(events.iter().cloned());
97            log.events.len()
98        };
99        Ok(len as u64)
100    }
101
102    async fn load_since(&self, key: &ThreadKey, cursor: u64) -> Result<Vec<GraphEvent>> {
103        let snapshot = {
104            let guard = self.inner.lock();
105            let Some(log) = guard.get(key) else {
106                return Ok(Vec::new());
107            };
108            // Honor archival semantics: events with ordinal
109            // `<= archival_watermark` are conceptually gone, even
110            // though this in-memory impl retains them in the Vec
111            // for replay tooling. Postgres `archive_before` DELETEs
112            // those rows, and Redis `LTRIM`s them, so both backends
113            // skip ordinals at or below the watermark regardless of
114            // the cursor. Clamping `start` here keeps the in-memory
115            // backend's observable behaviour consistent with the
116            // durable backends — otherwise a caller archiving and
117            // then reading from cursor `0` would see archived
118            // events on the in-memory backend but not on Postgres
119            // or Redis, silently breaking cross-backend tests.
120            let effective_start = cursor.max(log.archival_watermark);
121            let start = usize::try_from(effective_start).unwrap_or(usize::MAX);
122            log.events.get(start..).unwrap_or(&[]).to_vec()
123        };
124        Ok(snapshot)
125    }
126
127    async fn archive_before(&self, key: &ThreadKey, watermark: u64) -> Result<usize> {
128        let archived = {
129            let mut guard = self.inner.lock();
130            let Some(log) = guard.get_mut(key) else {
131                return Ok(0);
132            };
133            if watermark <= log.archival_watermark {
134                return Ok(0);
135            }
136            let prior = log.archival_watermark;
137            log.archival_watermark = watermark;
138            watermark.saturating_sub(prior)
139        };
140        Ok(usize::try_from(archived).unwrap_or(usize::MAX))
141    }
142}