Skip to main content

arcp_runtime/store/
eventlog.rs

1//! Append-only `SQLite` event log.
2//!
3//! The reader paths (`list`, `get_by_id`, `count`) hold the connection
4//! mutex for the duration of the SQL call inside `spawn_blocking`.
5//! Clippy's `significant_drop_tightening` lint asks us to release the
6//! mutex earlier, but the SQL call IS the only thing the closure does
7//! and there's no concurrent contention path that benefits — every
8//! caller goes through the same `spawn_blocking`. Suppressed
9//! module-wide with rationale.
10//!
11//! Three operations matter:
12//!
13//! - [`EventLog::append`] inserts an envelope row and returns whether it
14//!   was a new insert. A repeat insert with the same `(session_id, id)` is
15//!   silently absorbed (idempotency, RFC §6.4).
16//! - [`EventLog::list`] enumerates rows by `(session_id, after_rowid)` for
17//!   subscription backfill (§13.3) and resume (§19).
18//! - [`EventLog::get_by_id`] fetches a single row by message id.
19//!
20//! The synchronous `rusqlite` calls run inside `tokio::task::spawn_blocking`
21//! behind an async facade so the event log can be used from inside the
22//! `tokio` reactor without blocking it.
23
24#![allow(clippy::significant_drop_tightening)]
25
26use std::path::Path;
27use std::sync::Arc;
28
29use rusqlite::{params, Connection, OptionalExtension};
30use tokio::sync::Mutex;
31use tokio::task;
32
33use arcp_core::envelope::{Envelope, RawEnvelope};
34use arcp_core::error::ARCPError;
35
36const SCHEMA: &str = include_str!("schema.sql");
37
/// Append-only `SQLite` event log.
///
/// Cheap to clone; internally holds an `Arc<Mutex<Connection>>` so that
/// concurrent calls serialise on the underlying connection. Every clone
/// shares that same connection.
#[derive(Clone)]
pub struct EventLog {
    /// Shared connection. Each operation locks it with `blocking_lock`
    /// from inside a `spawn_blocking` closure.
    inner: Arc<Mutex<Connection>>,
}
46
47impl std::fmt::Debug for EventLog {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        f.debug_struct("EventLog").finish_non_exhaustive()
50    }
51}
52
/// Result of an [`EventLog::append`] call.
///
/// `append` issues an `INSERT OR IGNORE` and derives the variant from the
/// number of rows that statement changed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AppendOutcome {
    /// Row was inserted (the statement changed exactly one row).
    Inserted,
    /// A row with the same `(session_id, id)` already existed; the insert
    /// was a no-op (transport idempotency, RFC §6.4).
    Duplicate,
}
62
/// One persisted log row, returned from queries.
#[derive(Debug, Clone)]
pub struct LoggedEvent {
    /// Auto-incrementing primary key; gives total replay order.
    /// [`EventLog::list`] sorts by it and uses it as the pagination cursor.
    pub rowid: i64,
    /// The envelope as stored, rebuilt from the row's columns.
    pub envelope: RawEnvelope,
}
71
72impl EventLog {
73    /// Open an in-memory event log. Convenient for tests.
74    ///
75    /// # Errors
76    ///
77    /// Returns [`ARCPError::Storage`] if `SQLite` cannot create the in-memory
78    /// database or apply the schema.
79    pub async fn in_memory() -> Result<Self, ARCPError> {
80        task::spawn_blocking(move || {
81            let conn = Connection::open_in_memory()?;
82            conn.execute_batch(SCHEMA)?;
83            Ok::<_, rusqlite::Error>(Self {
84                inner: Arc::new(Mutex::new(conn)),
85            })
86        })
87        .await
88        .map_err(|join| ARCPError::Internal {
89            detail: format!("event log spawn_blocking join: {join}"),
90        })?
91        .map_err(|e| ARCPError::Storage {
92            detail: e.to_string(),
93        })
94    }
95
96    /// Open (or create) an event log backed by `path`.
97    ///
98    /// # Errors
99    ///
100    /// Returns [`ARCPError::Storage`] if `SQLite` cannot open or create the
101    /// file or apply the schema.
102    pub async fn open(path: impl AsRef<Path>) -> Result<Self, ARCPError> {
103        let path = path.as_ref().to_path_buf();
104        task::spawn_blocking(move || {
105            let conn = Connection::open(&path)?;
106            conn.pragma_update(None, "journal_mode", "WAL")?;
107            conn.pragma_update(None, "synchronous", "NORMAL")?;
108            conn.execute_batch(SCHEMA)?;
109            Ok::<_, rusqlite::Error>(Self {
110                inner: Arc::new(Mutex::new(conn)),
111            })
112        })
113        .await
114        .map_err(|join| ARCPError::Internal {
115            detail: format!("event log spawn_blocking join: {join}"),
116        })?
117        .map_err(|e| ARCPError::Storage {
118            detail: e.to_string(),
119        })
120    }
121
122    /// Append one envelope to the log. Idempotent on `(session_id, id)`.
123    ///
124    /// # Errors
125    ///
126    /// Returns [`ARCPError::Serialization`] if the envelope cannot be
127    /// serialised, [`ARCPError::Storage`] for any underlying `SQLite` error.
128    pub async fn append(&self, envelope: &Envelope) -> Result<AppendOutcome, ARCPError> {
129        let raw = envelope.clone().into_raw()?;
130        let body = serde_json::to_string(&raw.payload)?;
131        let inner = Arc::clone(&self.inner);
132        task::spawn_blocking(move || {
133            let session_id_str = raw.session_id.as_ref().map(ToString::to_string);
134            let job_id_str = raw.job_id.as_ref().map(ToString::to_string);
135            let stream_id_str = raw.stream_id.as_ref().map(ToString::to_string);
136            let subscription_id_str = raw.subscription_id.as_ref().map(ToString::to_string);
137            let correlation_id_str = raw.correlation_id.as_ref().map(ToString::to_string);
138            let causation_id_str = raw.causation_id.as_ref().map(ToString::to_string);
139            let trace_id_str = raw.trace_id.as_ref().map(ToString::to_string);
140            let span_id_str = raw.span_id.as_ref().map(ToString::to_string);
141            let idempotency_key_str = raw.idempotency_key.as_ref().map(ToString::to_string);
142            let timestamp_str = raw.timestamp.to_rfc3339();
143            let priority_str = priority_str(raw.priority);
144
145            let conn = inner.blocking_lock();
146            let changed = conn.execute(
147                "INSERT OR IGNORE INTO events (
148                    id, session_id, job_id, stream_id, subscription_id,
149                    type_name, correlation_id, causation_id,
150                    trace_id, span_id, idempotency_key,
151                    timestamp_utc, priority, body
152                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
153                params![
154                    raw.id.to_string(),
155                    session_id_str,
156                    job_id_str,
157                    stream_id_str,
158                    subscription_id_str,
159                    raw.type_name,
160                    correlation_id_str,
161                    causation_id_str,
162                    trace_id_str,
163                    span_id_str,
164                    idempotency_key_str,
165                    timestamp_str,
166                    priority_str,
167                    body,
168                ],
169            )?;
170            Ok::<_, rusqlite::Error>(if changed == 1 {
171                AppendOutcome::Inserted
172            } else {
173                AppendOutcome::Duplicate
174            })
175        })
176        .await
177        .map_err(|join| ARCPError::Internal {
178            detail: format!("event log spawn_blocking join: {join}"),
179        })?
180        .map_err(|e| ARCPError::Storage {
181            detail: e.to_string(),
182        })
183    }
184
185    /// List rows for `session_id` strictly after `after_rowid`, in replay
186    /// order, capped at `limit` rows.
187    ///
188    /// Pass `after_rowid = 0` to start from the beginning.
189    ///
190    /// # Errors
191    ///
192    /// Returns [`ARCPError::Storage`] for any underlying `SQLite` error.
193    pub async fn list(
194        &self,
195        session_id: &str,
196        after_rowid: i64,
197        limit: i64,
198    ) -> Result<Vec<LoggedEvent>, ARCPError> {
199        let inner = Arc::clone(&self.inner);
200        let session_id = session_id.to_owned();
201        task::spawn_blocking(move || -> Result<Vec<LoggedEvent>, rusqlite::Error> {
202            let conn = inner.blocking_lock();
203            let mut stmt = conn.prepare(
204                "SELECT rowid, id, session_id, job_id, stream_id, subscription_id,
205                    type_name, correlation_id, causation_id,
206                    trace_id, span_id, idempotency_key,
207                    timestamp_utc, priority, body
208                 FROM events
209                 WHERE session_id = ?1 AND rowid > ?2
210                 ORDER BY rowid ASC
211                 LIMIT ?3",
212            )?;
213            let rows = stmt.query_map(params![session_id, after_rowid, limit], row_to_logged)?;
214            let mut out = Vec::new();
215            for row in rows {
216                out.push(row?);
217            }
218            Ok(out)
219        })
220        .await
221        .map_err(|join| ARCPError::Internal {
222            detail: format!("event log spawn_blocking join: {join}"),
223        })?
224        .map_err(|e| ARCPError::Storage {
225            detail: e.to_string(),
226        })
227    }
228
229    /// Fetch a single row by message id.
230    ///
231    /// # Errors
232    ///
233    /// Returns [`ARCPError::Storage`] for any underlying `SQLite` error.
234    pub async fn get_by_id(&self, id: &str) -> Result<Option<LoggedEvent>, ARCPError> {
235        let inner = Arc::clone(&self.inner);
236        let id = id.to_owned();
237        task::spawn_blocking(move || -> Result<Option<LoggedEvent>, rusqlite::Error> {
238            let conn = inner.blocking_lock();
239            let result = conn
240                .query_row(
241                    "SELECT rowid, id, session_id, job_id, stream_id, subscription_id,
242                        type_name, correlation_id, causation_id,
243                        trace_id, span_id, idempotency_key,
244                        timestamp_utc, priority, body
245                     FROM events WHERE id = ?1",
246                    params![id],
247                    row_to_logged,
248                )
249                .optional()?;
250            Ok(result)
251        })
252        .await
253        .map_err(|join| ARCPError::Internal {
254            detail: format!("event log spawn_blocking join: {join}"),
255        })?
256        .map_err(|e| ARCPError::Storage {
257            detail: e.to_string(),
258        })
259    }
260
261    /// Total event count (across all sessions). Useful for tests.
262    ///
263    /// # Errors
264    ///
265    /// Returns [`ARCPError::Storage`] for any underlying `SQLite` error.
266    pub async fn count(&self) -> Result<i64, ARCPError> {
267        let inner = Arc::clone(&self.inner);
268        task::spawn_blocking(move || -> Result<i64, rusqlite::Error> {
269            let conn = inner.blocking_lock();
270            let n: i64 = conn.query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))?;
271            Ok(n)
272        })
273        .await
274        .map_err(|join| ARCPError::Internal {
275            detail: format!("event log spawn_blocking join: {join}"),
276        })?
277        .map_err(|e| ARCPError::Storage {
278            detail: e.to_string(),
279        })
280    }
281}
282
283const fn priority_str(p: arcp_core::envelope::Priority) -> &'static str {
284    match p {
285        arcp_core::envelope::Priority::Low => "low",
286        arcp_core::envelope::Priority::Normal => "normal",
287        arcp_core::envelope::Priority::High => "high",
288        arcp_core::envelope::Priority::Critical => "critical",
289    }
290}
291
292fn row_to_logged(row: &rusqlite::Row<'_>) -> rusqlite::Result<LoggedEvent> {
293    let rowid: i64 = row.get("rowid")?;
294    let id: String = row.get("id")?;
295    let session_id: Option<String> = row.get("session_id")?;
296    let job_id: Option<String> = row.get("job_id")?;
297    let stream_id: Option<String> = row.get("stream_id")?;
298    let subscription_id: Option<String> = row.get("subscription_id")?;
299    let type_name: String = row.get("type_name")?;
300    let correlation_id: Option<String> = row.get("correlation_id")?;
301    let causation_id: Option<String> = row.get("causation_id")?;
302    let trace_id: Option<String> = row.get("trace_id")?;
303    let span_id: Option<String> = row.get("span_id")?;
304    let idempotency_key: Option<String> = row.get("idempotency_key")?;
305    let timestamp_utc: String = row.get("timestamp_utc")?;
306    let priority: String = row.get("priority")?;
307    let body: String = row.get("body")?;
308
309    // We assemble a JSON Value of the raw envelope, then deserialise.
310    // This keeps the "raw" representation honest and centralises parsing.
311    let mut value = serde_json::Map::new();
312    value.insert(
313        "arcp".into(),
314        serde_json::Value::String(arcp_core::PROTOCOL_VERSION.into()),
315    );
316    value.insert("id".into(), serde_json::Value::String(id));
317    value.insert("timestamp".into(), serde_json::Value::String(timestamp_utc));
318    value.insert("type".into(), serde_json::Value::String(type_name));
319    let payload: serde_json::Value = serde_json::from_str(&body).map_err(|e| {
320        rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
321    })?;
322    value.insert("payload".into(), payload);
323    insert_opt(&mut value, "session_id", session_id);
324    insert_opt(&mut value, "job_id", job_id);
325    insert_opt(&mut value, "stream_id", stream_id);
326    insert_opt(&mut value, "subscription_id", subscription_id);
327    insert_opt(&mut value, "correlation_id", correlation_id);
328    insert_opt(&mut value, "causation_id", causation_id);
329    insert_opt(&mut value, "trace_id", trace_id);
330    insert_opt(&mut value, "span_id", span_id);
331    insert_opt(&mut value, "idempotency_key", idempotency_key);
332    if priority != "normal" {
333        value.insert("priority".into(), serde_json::Value::String(priority));
334    }
335
336    let envelope: RawEnvelope =
337        serde_json::from_value(serde_json::Value::Object(value)).map_err(|e| {
338            rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
339        })?;
340
341    Ok(LoggedEvent { rowid, envelope })
342}
343
344fn insert_opt(
345    map: &mut serde_json::Map<String, serde_json::Value>,
346    key: &str,
347    value: Option<String>,
348) {
349    if let Some(v) = value {
350        map.insert(key.to_owned(), serde_json::Value::String(v));
351    }
352}
353
#[cfg(test)]
#[allow(
    clippy::expect_used,
    clippy::unwrap_used,
    clippy::panic,
    clippy::missing_panics_doc
)]
mod tests {
    use super::*;
    use arcp_core::envelope::Envelope;
    use arcp_core::ids::SessionId;
    use arcp_core::messages::{MessageType, PingPayload};

    /// Build a fresh ping envelope bound to `session`.
    fn ping_envelope(session: &SessionId) -> Envelope {
        let mut ping = Envelope::new(MessageType::Ping(PingPayload::default()));
        ping.session_id = Some(session.clone());
        ping
    }

    /// Three appends come back from `list` in strictly increasing rowid order.
    #[tokio::test]
    async fn append_and_list_round_trip() {
        let log = EventLog::in_memory().await.expect("open");
        let session = SessionId::new();
        for _ in 0..3 {
            let outcome = log.append(&ping_envelope(&session)).await.expect("append");
            assert_eq!(outcome, AppendOutcome::Inserted);
        }

        let rows = log.list(session.as_str(), 0, 10).await.expect("list");
        assert_eq!(rows.len(), 3);
        assert!(
            rows.windows(2).all(|pair| pair[0].rowid < pair[1].rowid),
            "rows out of order"
        );
    }

    /// Appending the same envelope twice stores it once.
    #[tokio::test]
    async fn append_dedups_on_id() {
        let log = EventLog::in_memory().await.expect("open");
        let session = SessionId::new();
        let env = ping_envelope(&session);

        let first = log.append(&env).await.expect("first");
        assert_eq!(first, AppendOutcome::Inserted);
        let second = log.append(&env).await.expect("second");
        assert_eq!(second, AppendOutcome::Duplicate);

        assert_eq!(log.count().await.expect("count"), 1);
    }

    /// `list` returns only the requested session and honours the cursor.
    #[tokio::test]
    async fn list_respects_after_rowid_and_session_filter() {
        let log = EventLog::in_memory().await.expect("open");
        let session_a = SessionId::new();
        let session_b = SessionId::new();
        // Interleave the two sessions so their rowids alternate.
        for _ in 0..2 {
            log.append(&ping_envelope(&session_a)).await.expect("a");
            log.append(&ping_envelope(&session_b)).await.expect("b");
        }

        let only_a = log.list(session_a.as_str(), 0, 100).await.expect("a only");
        assert_eq!(only_a.len(), 2);

        let cursor = only_a[0].rowid;
        let after_first = log
            .list(session_a.as_str(), cursor, 100)
            .await
            .expect("after first");
        assert_eq!(after_first.len(), 1);
        assert_eq!(after_first[0].rowid, only_a[1].rowid);
    }

    /// A stored envelope can be fetched back by its message id.
    #[tokio::test]
    async fn get_by_id_returns_stored_envelope() {
        let log = EventLog::in_memory().await.expect("open");
        let session = SessionId::new();
        let env = ping_envelope(&session);
        let original_id = env.id.clone();
        log.append(&env).await.expect("append");

        let logged = log
            .get_by_id(original_id.as_str())
            .await
            .expect("get")
            .expect("found");
        assert_eq!(logged.envelope.id, original_id);
        assert_eq!(logged.envelope.type_name, "ping");
    }

    /// An unknown id yields `None` rather than an error.
    #[tokio::test]
    async fn get_by_id_returns_none_for_unknown() {
        let log = EventLog::in_memory().await.expect("open");
        let missing = log.get_by_id("msg_nonexistent01ABC").await.expect("get");
        assert!(missing.is_none());
    }

    /// Rows written to a file-backed log are still there after re-opening.
    #[tokio::test]
    async fn open_creates_file_backed_log() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("log.sqlite");

        let log = EventLog::open(&path).await.expect("open");
        let session = SessionId::new();
        log.append(&ping_envelope(&session)).await.expect("append");

        // Re-open the file and verify the row survives.
        drop(log);
        let reopened = EventLog::open(&path).await.expect("re-open");
        assert_eq!(reopened.count().await.expect("count"), 1);
    }
}
458}