1#![allow(clippy::significant_drop_tightening)]
25
26use std::path::Path;
27use std::sync::Arc;
28
29use rusqlite::{params, Connection, OptionalExtension};
30use tokio::sync::Mutex;
31use tokio::task;
32
33use arcp_core::envelope::{Envelope, RawEnvelope};
34use arcp_core::error::ARCPError;
35
36const SCHEMA: &str = include_str!("schema.sql");
37
38#[derive(Clone)]
43pub struct EventLog {
44 inner: Arc<Mutex<Connection>>,
45}
46
47impl std::fmt::Debug for EventLog {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 f.debug_struct("EventLog").finish_non_exhaustive()
50 }
51}
52
53#[derive(Debug, Clone, Copy, PartialEq, Eq)]
55pub enum AppendOutcome {
56 Inserted,
58 Duplicate,
61}
62
63#[derive(Debug, Clone)]
65pub struct LoggedEvent {
66 pub rowid: i64,
68 pub envelope: RawEnvelope,
70}
71
72impl EventLog {
73 pub async fn in_memory() -> Result<Self, ARCPError> {
80 task::spawn_blocking(move || {
81 let conn = Connection::open_in_memory()?;
82 conn.execute_batch(SCHEMA)?;
83 Ok::<_, rusqlite::Error>(Self {
84 inner: Arc::new(Mutex::new(conn)),
85 })
86 })
87 .await
88 .map_err(|join| ARCPError::Internal {
89 detail: format!("event log spawn_blocking join: {join}"),
90 })?
91 .map_err(|e| ARCPError::Storage {
92 detail: e.to_string(),
93 })
94 }
95
96 pub async fn open(path: impl AsRef<Path>) -> Result<Self, ARCPError> {
103 let path = path.as_ref().to_path_buf();
104 task::spawn_blocking(move || {
105 let conn = Connection::open(&path)?;
106 conn.pragma_update(None, "journal_mode", "WAL")?;
107 conn.pragma_update(None, "synchronous", "NORMAL")?;
108 conn.execute_batch(SCHEMA)?;
109 Ok::<_, rusqlite::Error>(Self {
110 inner: Arc::new(Mutex::new(conn)),
111 })
112 })
113 .await
114 .map_err(|join| ARCPError::Internal {
115 detail: format!("event log spawn_blocking join: {join}"),
116 })?
117 .map_err(|e| ARCPError::Storage {
118 detail: e.to_string(),
119 })
120 }
121
122 pub async fn append(&self, envelope: &Envelope) -> Result<AppendOutcome, ARCPError> {
129 let raw = envelope.clone().into_raw()?;
130 let body = serde_json::to_string(&raw.payload)?;
131 let inner = Arc::clone(&self.inner);
132 task::spawn_blocking(move || {
133 let session_id_str = raw.session_id.as_ref().map(ToString::to_string);
134 let job_id_str = raw.job_id.as_ref().map(ToString::to_string);
135 let stream_id_str = raw.stream_id.as_ref().map(ToString::to_string);
136 let subscription_id_str = raw.subscription_id.as_ref().map(ToString::to_string);
137 let correlation_id_str = raw.correlation_id.as_ref().map(ToString::to_string);
138 let causation_id_str = raw.causation_id.as_ref().map(ToString::to_string);
139 let trace_id_str = raw.trace_id.as_ref().map(ToString::to_string);
140 let span_id_str = raw.span_id.as_ref().map(ToString::to_string);
141 let idempotency_key_str = raw.idempotency_key.as_ref().map(ToString::to_string);
142 let timestamp_str = raw.timestamp.to_rfc3339();
143 let priority_str = priority_str(raw.priority);
144
145 let conn = inner.blocking_lock();
146 let changed = conn.execute(
147 "INSERT OR IGNORE INTO events (
148 id, session_id, job_id, stream_id, subscription_id,
149 type_name, correlation_id, causation_id,
150 trace_id, span_id, idempotency_key,
151 timestamp_utc, priority, body
152 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
153 params![
154 raw.id.to_string(),
155 session_id_str,
156 job_id_str,
157 stream_id_str,
158 subscription_id_str,
159 raw.type_name,
160 correlation_id_str,
161 causation_id_str,
162 trace_id_str,
163 span_id_str,
164 idempotency_key_str,
165 timestamp_str,
166 priority_str,
167 body,
168 ],
169 )?;
170 Ok::<_, rusqlite::Error>(if changed == 1 {
171 AppendOutcome::Inserted
172 } else {
173 AppendOutcome::Duplicate
174 })
175 })
176 .await
177 .map_err(|join| ARCPError::Internal {
178 detail: format!("event log spawn_blocking join: {join}"),
179 })?
180 .map_err(|e| ARCPError::Storage {
181 detail: e.to_string(),
182 })
183 }
184
185 pub async fn list(
194 &self,
195 session_id: &str,
196 after_rowid: i64,
197 limit: i64,
198 ) -> Result<Vec<LoggedEvent>, ARCPError> {
199 let inner = Arc::clone(&self.inner);
200 let session_id = session_id.to_owned();
201 task::spawn_blocking(move || -> Result<Vec<LoggedEvent>, rusqlite::Error> {
202 let conn = inner.blocking_lock();
203 let mut stmt = conn.prepare(
204 "SELECT rowid, id, session_id, job_id, stream_id, subscription_id,
205 type_name, correlation_id, causation_id,
206 trace_id, span_id, idempotency_key,
207 timestamp_utc, priority, body
208 FROM events
209 WHERE session_id = ?1 AND rowid > ?2
210 ORDER BY rowid ASC
211 LIMIT ?3",
212 )?;
213 let rows = stmt.query_map(params![session_id, after_rowid, limit], row_to_logged)?;
214 let mut out = Vec::new();
215 for row in rows {
216 out.push(row?);
217 }
218 Ok(out)
219 })
220 .await
221 .map_err(|join| ARCPError::Internal {
222 detail: format!("event log spawn_blocking join: {join}"),
223 })?
224 .map_err(|e| ARCPError::Storage {
225 detail: e.to_string(),
226 })
227 }
228
229 pub async fn get_by_id(&self, id: &str) -> Result<Option<LoggedEvent>, ARCPError> {
235 let inner = Arc::clone(&self.inner);
236 let id = id.to_owned();
237 task::spawn_blocking(move || -> Result<Option<LoggedEvent>, rusqlite::Error> {
238 let conn = inner.blocking_lock();
239 let result = conn
240 .query_row(
241 "SELECT rowid, id, session_id, job_id, stream_id, subscription_id,
242 type_name, correlation_id, causation_id,
243 trace_id, span_id, idempotency_key,
244 timestamp_utc, priority, body
245 FROM events WHERE id = ?1",
246 params![id],
247 row_to_logged,
248 )
249 .optional()?;
250 Ok(result)
251 })
252 .await
253 .map_err(|join| ARCPError::Internal {
254 detail: format!("event log spawn_blocking join: {join}"),
255 })?
256 .map_err(|e| ARCPError::Storage {
257 detail: e.to_string(),
258 })
259 }
260
261 pub async fn count(&self) -> Result<i64, ARCPError> {
267 let inner = Arc::clone(&self.inner);
268 task::spawn_blocking(move || -> Result<i64, rusqlite::Error> {
269 let conn = inner.blocking_lock();
270 let n: i64 = conn.query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))?;
271 Ok(n)
272 })
273 .await
274 .map_err(|join| ARCPError::Internal {
275 detail: format!("event log spawn_blocking join: {join}"),
276 })?
277 .map_err(|e| ARCPError::Storage {
278 detail: e.to_string(),
279 })
280 }
281}
282
283const fn priority_str(p: arcp_core::envelope::Priority) -> &'static str {
284 match p {
285 arcp_core::envelope::Priority::Low => "low",
286 arcp_core::envelope::Priority::Normal => "normal",
287 arcp_core::envelope::Priority::High => "high",
288 arcp_core::envelope::Priority::Critical => "critical",
289 }
290}
291
292fn row_to_logged(row: &rusqlite::Row<'_>) -> rusqlite::Result<LoggedEvent> {
293 let rowid: i64 = row.get("rowid")?;
294 let id: String = row.get("id")?;
295 let session_id: Option<String> = row.get("session_id")?;
296 let job_id: Option<String> = row.get("job_id")?;
297 let stream_id: Option<String> = row.get("stream_id")?;
298 let subscription_id: Option<String> = row.get("subscription_id")?;
299 let type_name: String = row.get("type_name")?;
300 let correlation_id: Option<String> = row.get("correlation_id")?;
301 let causation_id: Option<String> = row.get("causation_id")?;
302 let trace_id: Option<String> = row.get("trace_id")?;
303 let span_id: Option<String> = row.get("span_id")?;
304 let idempotency_key: Option<String> = row.get("idempotency_key")?;
305 let timestamp_utc: String = row.get("timestamp_utc")?;
306 let priority: String = row.get("priority")?;
307 let body: String = row.get("body")?;
308
309 let mut value = serde_json::Map::new();
312 value.insert(
313 "arcp".into(),
314 serde_json::Value::String(arcp_core::PROTOCOL_VERSION.into()),
315 );
316 value.insert("id".into(), serde_json::Value::String(id));
317 value.insert("timestamp".into(), serde_json::Value::String(timestamp_utc));
318 value.insert("type".into(), serde_json::Value::String(type_name));
319 let payload: serde_json::Value = serde_json::from_str(&body).map_err(|e| {
320 rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
321 })?;
322 value.insert("payload".into(), payload);
323 insert_opt(&mut value, "session_id", session_id);
324 insert_opt(&mut value, "job_id", job_id);
325 insert_opt(&mut value, "stream_id", stream_id);
326 insert_opt(&mut value, "subscription_id", subscription_id);
327 insert_opt(&mut value, "correlation_id", correlation_id);
328 insert_opt(&mut value, "causation_id", causation_id);
329 insert_opt(&mut value, "trace_id", trace_id);
330 insert_opt(&mut value, "span_id", span_id);
331 insert_opt(&mut value, "idempotency_key", idempotency_key);
332 if priority != "normal" {
333 value.insert("priority".into(), serde_json::Value::String(priority));
334 }
335
336 let envelope: RawEnvelope =
337 serde_json::from_value(serde_json::Value::Object(value)).map_err(|e| {
338 rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
339 })?;
340
341 Ok(LoggedEvent { rowid, envelope })
342}
343
344fn insert_opt(
345 map: &mut serde_json::Map<String, serde_json::Value>,
346 key: &str,
347 value: Option<String>,
348) {
349 if let Some(v) = value {
350 map.insert(key.to_owned(), serde_json::Value::String(v));
351 }
352}
353
354#[cfg(test)]
355#[allow(
356 clippy::expect_used,
357 clippy::unwrap_used,
358 clippy::panic,
359 clippy::missing_panics_doc
360)]
361mod tests {
362 use super::*;
363 use arcp_core::envelope::Envelope;
364 use arcp_core::ids::SessionId;
365 use arcp_core::messages::{MessageType, PingPayload};
366
367 fn ping_envelope(session: &SessionId) -> Envelope {
368 let mut env = Envelope::new(MessageType::Ping(PingPayload::default()));
369 env.session_id = Some(session.clone());
370 env
371 }
372
373 #[tokio::test]
374 async fn append_and_list_round_trip() {
375 let log = EventLog::in_memory().await.expect("open");
376 let session = SessionId::new();
377 for _ in 0..3 {
378 let env = ping_envelope(&session);
379 assert_eq!(
380 log.append(&env).await.expect("append"),
381 AppendOutcome::Inserted
382 );
383 }
384 let rows = log.list(session.as_str(), 0, 10).await.expect("list");
385 assert_eq!(rows.len(), 3);
386 for w in rows.windows(2) {
387 assert!(w[0].rowid < w[1].rowid, "rows out of order");
388 }
389 }
390
391 #[tokio::test]
392 async fn append_dedups_on_id() {
393 let log = EventLog::in_memory().await.expect("open");
394 let session = SessionId::new();
395 let env = ping_envelope(&session);
396 assert_eq!(
397 log.append(&env).await.expect("first"),
398 AppendOutcome::Inserted
399 );
400 assert_eq!(
401 log.append(&env).await.expect("second"),
402 AppendOutcome::Duplicate
403 );
404 assert_eq!(log.count().await.expect("count"), 1);
405 }
406
407 #[tokio::test]
408 async fn list_respects_after_rowid_and_session_filter() {
409 let log = EventLog::in_memory().await.expect("open");
410 let session_a = SessionId::new();
411 let session_b = SessionId::new();
412 for _ in 0..2 {
413 log.append(&ping_envelope(&session_a)).await.expect("a");
414 log.append(&ping_envelope(&session_b)).await.expect("b");
415 }
416 let only_a = log.list(session_a.as_str(), 0, 100).await.expect("a only");
417 assert_eq!(only_a.len(), 2);
418 let after_first = log
419 .list(session_a.as_str(), only_a[0].rowid, 100)
420 .await
421 .expect("after first");
422 assert_eq!(after_first.len(), 1);
423 assert_eq!(after_first[0].rowid, only_a[1].rowid);
424 }
425
426 #[tokio::test]
427 async fn get_by_id_returns_stored_envelope() {
428 let log = EventLog::in_memory().await.expect("open");
429 let session = SessionId::new();
430 let env = ping_envelope(&session);
431 let original_id = env.id.clone();
432 log.append(&env).await.expect("append");
433 let got = log.get_by_id(original_id.as_str()).await.expect("get");
434 let logged = got.expect("found");
435 assert_eq!(logged.envelope.id, original_id);
436 assert_eq!(logged.envelope.type_name, "ping");
437 }
438
439 #[tokio::test]
440 async fn get_by_id_returns_none_for_unknown() {
441 let log = EventLog::in_memory().await.expect("open");
442 let got = log.get_by_id("msg_nonexistent01ABC").await.expect("get");
443 assert!(got.is_none());
444 }
445
446 #[tokio::test]
447 async fn open_creates_file_backed_log() {
448 let dir = tempfile::tempdir().expect("tempdir");
449 let path = dir.path().join("log.sqlite");
450 let log = EventLog::open(&path).await.expect("open");
451 let session = SessionId::new();
452 log.append(&ping_envelope(&session)).await.expect("append");
453 drop(log);
455 let log2 = EventLog::open(&path).await.expect("re-open");
456 assert_eq!(log2.count().await.expect("count"), 1);
457 }
458}