// camel_core/lifecycle/adapters/redb_journal.rs

//! Redb-backed runtime event journal.
//!
//! # Schema
//!
//! - Table `events`: `u64 → &[u8]`  (seq → serde_json bytes of `JournalEntry`)
//! - Table `command_ids`: `&str → ()`  (presence = alive)
//!
//! # Sequence numbers
//!
//! No autoincrement in redb. Each `append_batch` derives `next_seq` by reading
//! the last key via `iter().next_back()` and adding 1; defaults to 0 if empty.

13use std::path::PathBuf;
14use std::sync::Arc;
15
16use async_trait::async_trait;
17use redb::{
18    Database, Durability, ReadableDatabase, ReadableTable, ReadableTableMetadata, TableDefinition,
19};
20use serde::{Deserialize, Serialize};
21
22use camel_api::CamelError;
23
24use crate::lifecycle::domain::{DomainError, RuntimeEvent};
25use crate::lifecycle::ports::RuntimeEventJournalPort;
26
// ── Table definitions ─────────────────────────────────────────────────────────

/// `seq (u64) → serde_json bytes of JournalEntry`. Keys are monotonically
/// increasing, so key order is append order.
const EVENTS_TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("events");
/// `command id → ()`. Key presence means the command is still alive/in-flight.
const COMMAND_IDS_TABLE: TableDefinition<&str, ()> = TableDefinition::new("command_ids");

// ── Public types ──────────────────────────────────────────────────────────────

/// Durability mode for journal writes.
///
/// Mapped onto `redb::Durability` when each write transaction is opened
/// (see `RedbRuntimeEventJournal::redb_durability`).
//
// Fieldless two-variant enum: `Copy` and `Eq` are free and idiomatic to
// derive alongside `Clone`/`PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum JournalDurability {
    /// fsync on every commit — protects against kernel crash and power loss (default).
    #[default]
    Immediate,
    /// No fsync — OS decides flush timing. Suitable for dev/test.
    Eventual,
}

/// Options for `RedbRuntimeEventJournal`.
///
/// Fixed at construction time; see `Default` for the production defaults.
#[derive(Debug, Clone)]
pub struct RedbJournalOptions {
    /// Write durability applied to every write transaction.
    pub durability: JournalDurability,
    /// Trigger compaction after this many events in the table. Default: 10_000.
    pub compaction_threshold_events: u64,
}

52impl Default for RedbJournalOptions {
53    fn default() -> Self {
54        Self {
55            durability: JournalDurability::Immediate,
56            compaction_threshold_events: 10_000,
57        }
58    }
59}
60
/// Internal wire format stored as redb value bytes (serde_json).
///
/// `seq` is duplicated inside the value (it is also the table key) so an
/// entry is self-describing when deserialized in isolation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JournalEntry {
    /// Monotonic sequence number — identical to the redb key.
    pub seq: u64,
    /// Append time, milliseconds since the Unix epoch (from `chrono::Utc::now()`).
    pub timestamp_ms: i64,
    /// The journaled domain event.
    pub event: RuntimeEvent,
}

/// Filter for `RedbRuntimeEventJournal::inspect`.
//
// Public type: derive `Debug` (diagnostics) and `Clone` so callers can
// reuse a filter across calls. Purely additive, backward compatible.
#[derive(Debug, Clone)]
pub struct JournalInspectFilter {
    /// Only return entries whose event's route id matches; `None` = all routes.
    pub route_id: Option<String>,
    /// Maximum number of *matching* entries to return (newest first).
    pub limit: usize,
}

// ── Adapter ───────────────────────────────────────────────────────────────────

/// Redb-backed implementation of `RuntimeEventJournalPort`.
///
/// `Arc<Database>` allows cheap cloning — all clones share the same underlying
/// redb file handle. `redb::Database` is `Send + Sync`.
#[derive(Clone)]
pub struct RedbRuntimeEventJournal {
    /// Shared handle to the on-disk redb database.
    db: Arc<Database>,
    /// Durability and compaction settings, fixed at construction.
    options: RedbJournalOptions,
}

impl RedbRuntimeEventJournal {
    /// Open (or create) the redb database at `path`.
    ///
    /// Parent directories are created if they do not exist.
    /// Both tables are initialised on first open so later read transactions
    /// never observe a missing table.
    /// Uses `tokio::task::spawn_blocking` because `Database::create` does
    /// blocking filesystem I/O.
    ///
    /// # Errors
    ///
    /// `CamelError::Io` if directory creation, the database open, or the
    /// initial table-creating transaction fails.
    pub async fn new(
        path: impl Into<PathBuf>,
        options: RedbJournalOptions,
    ) -> Result<Self, CamelError> {
        let path = path.into();
        let db = tokio::task::spawn_blocking(move || {
            if let Some(parent) = path.parent() {
                std::fs::create_dir_all(parent).map_err(|e| {
                    CamelError::Io(format!(
                        "failed to create journal directory '{}': {e}",
                        parent.display()
                    ))
                })?;
            }
            // `create` opens the file if it already exists.
            let db = Database::create(&path).map_err(|e| {
                CamelError::Io(format!(
                    "failed to open journal at '{}': {e}",
                    path.display()
                ))
            })?;
            // Initialise tables so they exist before any reads.
            let tx = db
                .begin_write()
                .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
            tx.open_table(EVENTS_TABLE)
                .map_err(|e| CamelError::Io(format!("redb open events table: {e}")))?;
            tx.open_table(COMMAND_IDS_TABLE)
                .map_err(|e| CamelError::Io(format!("redb open command_ids table: {e}")))?;
            tx.commit()
                .map_err(|e| CamelError::Io(format!("redb commit init: {e}")))?;
            Ok::<_, CamelError>(db)
        })
        .await
        // Outer `?` is the JoinError (task panicked/cancelled); inner `?` is
        // the CamelError produced inside the closure.
        .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))??;

        Ok(Self {
            db: Arc::new(db),
            options,
        })
    }

    /// Open an existing database at `path` and return entries (newest-first, up to `filter.limit`).
    ///
    /// Uses `Database::open` + `begin_read` — concurrent with a live writer on the same file.
    /// `inspect` is an offline utility: it does NOT require a live `RedbRuntimeEventJournal` instance.
    ///
    /// # Errors
    ///
    /// `CamelError::Io` if the file is missing/invalid or any read or
    /// deserialize step fails.
    pub async fn inspect(
        path: impl Into<PathBuf>,
        filter: JournalInspectFilter,
    ) -> Result<Vec<JournalEntry>, CamelError> {
        let path = path.into();
        // Destructure up front so the closure moves plain values, not the filter.
        let limit = filter.limit;
        let route_id = filter.route_id;
        tokio::task::spawn_blocking(move || {
            if !path.exists() {
                return Err(CamelError::Io(format!(
                    "journal file not found: {}",
                    path.display()
                )));
            }
            let db = Database::open(&path)
                .map_err(|e| CamelError::Io(format!("invalid journal file: {e}")))?;
            let tx = db
                .begin_read()
                .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
            let table = tx
                .open_table(EVENTS_TABLE)
                .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;

            // Collect in descending order (newest first).
            // Filter by route_id FIRST, then apply limit — ensures we return
            // `limit` matching entries, not `limit` total entries where most may
            // not match the filter.
            let mut entries: Vec<JournalEntry> = Vec::new();
            for result in table
                .iter()
                .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
                .rev()
            {
                let (_k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
                let entry: JournalEntry = serde_json::from_slice(v.value())
                    .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
                if let Some(ref rid) = route_id
                    && entry.event.route_id() != rid.as_str()
                {
                    continue;
                }
                if entries.len() >= limit {
                    break;
                }
                entries.push(entry);
            }
            Ok(entries)
        })
        .await
        .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))?
    }

    // ── Internal helpers ──────────────────────────────────────────────────────

    /// Map the configured `JournalDurability` onto redb's `Durability`.
    fn redb_durability(&self) -> Durability {
        match self.options.durability {
            JournalDurability::Immediate => Durability::Immediate,
            JournalDurability::Eventual => Durability::None,
        }
    }

    /// Derive next sequence number from the last key in the events table.
    /// Must be called inside a write transaction with the table already open.
    /// Returns 0 for an empty table.
    fn next_seq(table: &redb::Table<u64, &[u8]>) -> Result<u64, CamelError> {
        match table
            .iter()
            .map_err(|e| CamelError::Io(format!("redb iter for seq: {e}")))?
            .next_back()
        {
            Some(Ok((k, _))) => Ok(k.value() + 1),
            Some(Err(e)) => Err(CamelError::Io(format!("redb seq read: {e}"))),
            None => Ok(0),
        }
    }

    /// Count rows in the events table (read transaction).
    fn event_count(&self) -> Result<u64, CamelError> {
        let tx = self
            .db
            .begin_read()
            .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
        let table = tx
            .open_table(EVENTS_TABLE)
            .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
        table
            .len()
            .map_err(|e| CamelError::Io(format!("redb len: {e}")))
    }

    /// Compact the events table: remove events for routes that have been fully removed.
    ///
    /// Two passes over the table inside one write transaction:
    /// 1. record the seq of the *last* `RouteRemoved` per route;
    /// 2. delete every event (including that `RouteRemoved`) whose seq is at
    ///    or before that cutoff for its route. Events appended after a
    ///    re-registration have seq > cutoff and therefore survive.
    ///
    /// Blocking redb I/O — callers run this via `spawn_blocking`.
    fn compact(&self) -> Result<(), CamelError> {
        let tx = self
            .db
            .begin_write()
            .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
        {
            let mut table = tx
                .open_table(EVENTS_TABLE)
                .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;

            // Pass 1: read all events in key order, find last RouteRemoved seq per route.
            let mut last_removed_seq: std::collections::HashMap<String, u64> =
                std::collections::HashMap::new();
            for result in table
                .iter()
                .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
            {
                let (k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
                let seq = k.value();
                let entry: JournalEntry = serde_json::from_slice(v.value())
                    .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
                if matches!(entry.event, RuntimeEvent::RouteRemoved { .. }) {
                    // Later iterations overwrite: only the LAST removal counts.
                    last_removed_seq.insert(entry.event.route_id().to_string(), seq);
                }
            }

            if last_removed_seq.is_empty() {
                // Nothing to compact. Drop the table borrow, then commit the
                // (empty) transaction to release the write lock cleanly.
                drop(table);
                tx.commit()
                    .map_err(|e| CamelError::Io(format!("redb commit compact: {e}")))?;
                return Ok(());
            }

            // Pass 2: collect seqs to delete.
            // Collected first, then removed, so we never mutate while iterating.
            let mut to_delete: Vec<u64> = Vec::new();
            for result in table
                .iter()
                .map_err(|e| CamelError::Io(format!("redb iter pass2: {e}")))?
            {
                let (k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
                let seq = k.value();
                let entry: JournalEntry = serde_json::from_slice(v.value())
                    .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
                let route_id = entry.event.route_id().to_string();
                if let Some(&cutoff) = last_removed_seq.get(&route_id)
                    && seq <= cutoff
                {
                    to_delete.push(seq);
                }
            }

            for seq in to_delete {
                table
                    .remove(&seq)
                    .map_err(|e| CamelError::Io(format!("redb remove seq {seq}: {e}")))?;
            }
        }
        tx.commit()
            .map_err(|e| CamelError::Io(format!("redb commit compact: {e}")))?;
        Ok(())
    }
}

// ── RuntimeEvent helper ───────────────────────────────────────────────────────

/// Extension to extract the `route_id` field from any `RuntimeEvent` variant.
///
/// Private to this adapter — used by `inspect` filtering and by `compact`
/// to group events per route.
trait RuntimeEventExt {
    /// The id of the route this event refers to.
    fn route_id(&self) -> &str;
}

impl RuntimeEventExt for RuntimeEvent {
    fn route_id(&self) -> &str {
        // Exhaustive or-pattern with no catch-all: adding a `RuntimeEvent`
        // variant without a `route_id` becomes a compile error here, which is
        // intentional — every journaled event must be attributable to a route.
        match self {
            RuntimeEvent::RouteRegistered { route_id }
            | RuntimeEvent::RouteStartRequested { route_id }
            | RuntimeEvent::RouteStarted { route_id }
            | RuntimeEvent::RouteFailed { route_id, .. }
            | RuntimeEvent::RouteStopped { route_id }
            | RuntimeEvent::RouteSuspended { route_id }
            | RuntimeEvent::RouteResumed { route_id }
            | RuntimeEvent::RouteReloaded { route_id }
            | RuntimeEvent::RouteRemoved { route_id } => route_id,
        }
    }
}

// ── RuntimeEventJournalPort impl ──────────────────────────────────────────────

#[async_trait]
impl RuntimeEventJournalPort for RedbRuntimeEventJournal {
    /// Append `events` as one atomic write transaction, then opportunistically
    /// compact if the table has grown past the configured threshold.
    ///
    /// All entries in a batch share one timestamp (taken once, before the
    /// blocking task). Compaction failures are logged, never surfaced.
    async fn append_batch(&self, events: &[RuntimeEvent]) -> Result<(), DomainError> {
        if events.is_empty() {
            return Ok(());
        }
        let db = Arc::clone(&self.db);
        let durability = self.redb_durability();
        // Owned copy so the 'static spawn_blocking closure can take it.
        let events = events.to_vec();
        let now_ms = chrono::Utc::now().timestamp_millis();

        tokio::task::spawn_blocking(move || {
            // NOTE: `mut` is required — `set_durability` takes `&mut self` in redb v2.
            let mut tx = db
                .begin_write()
                .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
            tx.set_durability(durability)
                .map_err(|e| CamelError::Io(format!("redb set_durability: {e}")))?;
            {
                let mut table = tx
                    .open_table(EVENTS_TABLE)
                    .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
                // Seq derivation and inserts happen under the same write txn,
                // so concurrent appends cannot observe or reuse a seq.
                let start_seq = Self::next_seq(&table)?;
                for (next_seq, event) in (start_seq..).zip(events) {
                    let entry = JournalEntry {
                        seq: next_seq,
                        timestamp_ms: now_ms,
                        event,
                    };
                    let bytes = serde_json::to_vec(&entry)
                        .map_err(|e| CamelError::Io(format!("journal serialize: {e}")))?;
                    table
                        .insert(&next_seq, bytes.as_slice())
                        .map_err(|e| CamelError::Io(format!("redb insert: {e}")))?;
                }
            }
            tx.commit()
                .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
            Ok::<_, CamelError>(())
        })
        .await
        .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
        .map_err(|e| DomainError::InvalidState(e.to_string()))?;

        // Trigger compaction if threshold exceeded. Non-fatal if it fails.
        // Both event_count() and compact() do blocking redb I/O — run in spawn_blocking.
        let journal_clone = self.clone();
        let threshold = self.options.compaction_threshold_events;
        tokio::task::spawn_blocking(move || match journal_clone.event_count() {
            Ok(count) if count >= threshold => {
                if let Err(e) = journal_clone.compact() {
                    tracing::warn!("journal compaction failed (non-fatal): {e}");
                }
            }
            Ok(_) => {}
            Err(e) => {
                tracing::warn!("journal event count check failed (non-fatal): {e}");
            }
        })
        .await
        .ok(); // Non-fatal: if spawn_blocking panics, we ignore it

        Ok(())
    }

    /// Load every journaled event in seq (append) order.
    async fn load_all(&self) -> Result<Vec<RuntimeEvent>, DomainError> {
        let db = Arc::clone(&self.db);
        tokio::task::spawn_blocking(move || {
            let tx = db
                .begin_read()
                .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
            let table = tx
                .open_table(EVENTS_TABLE)
                .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
            let mut events = Vec::new();
            for result in table
                .iter()
                .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
            {
                let (_k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
                let entry: JournalEntry = serde_json::from_slice(v.value())
                    .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
                events.push(entry.event);
            }
            Ok(events)
        })
        .await
        .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
        .map_err(|e: CamelError| DomainError::InvalidState(e.to_string()))
    }

    /// Record `command_id` as alive (idempotent: re-inserting the same key is a no-op).
    async fn append_command_id(&self, command_id: &str) -> Result<(), DomainError> {
        let db = Arc::clone(&self.db);
        let durability = self.redb_durability();
        let id = command_id.to_string();
        tokio::task::spawn_blocking(move || {
            // NOTE: `mut` required — `set_durability` takes `&mut self` in redb v2.
            let mut tx = db
                .begin_write()
                .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
            tx.set_durability(durability)
                .map_err(|e| CamelError::Io(format!("redb set_durability: {e}")))?;
            {
                let mut table = tx
                    .open_table(COMMAND_IDS_TABLE)
                    .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
                table
                    .insert(id.as_str(), ())
                    .map_err(|e| CamelError::Io(format!("redb insert command_id: {e}")))?;
            }
            tx.commit()
                .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
            Ok::<_, CamelError>(())
        })
        .await
        .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
        .map_err(|e| DomainError::InvalidState(e.to_string()))
    }

    /// Remove `command_id` (removing an absent key is not an error in redb).
    async fn remove_command_id(&self, command_id: &str) -> Result<(), DomainError> {
        let db = Arc::clone(&self.db);
        let durability = self.redb_durability();
        let id = command_id.to_string();
        tokio::task::spawn_blocking(move || {
            // NOTE: `mut` required — `set_durability` takes `&mut self` in redb v2.
            let mut tx = db
                .begin_write()
                .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
            tx.set_durability(durability)
                .map_err(|e| CamelError::Io(format!("redb set_durability: {e}")))?;
            {
                let mut table = tx
                    .open_table(COMMAND_IDS_TABLE)
                    .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
                table
                    .remove(id.as_str())
                    .map_err(|e| CamelError::Io(format!("redb remove command_id: {e}")))?;
            }
            tx.commit()
                .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
            Ok::<_, CamelError>(())
        })
        .await
        .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
        .map_err(|e| DomainError::InvalidState(e.to_string()))
    }

    /// Load all live command ids, in redb key (lexicographic) order.
    async fn load_command_ids(&self) -> Result<Vec<String>, DomainError> {
        let db = Arc::clone(&self.db);
        tokio::task::spawn_blocking(move || {
            let tx = db
                .begin_read()
                .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
            let table = tx
                .open_table(COMMAND_IDS_TABLE)
                .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
            let mut ids = Vec::new();
            for result in table
                .iter()
                .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
            {
                let (k, _) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
                ids.push(k.value().to_string());
            }
            Ok(ids)
        })
        .await
        .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
        .map_err(|e: CamelError| DomainError::InvalidState(e.to_string()))
    }
}

// ── Unit tests ────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Journal with default options, backed by a file inside `dir`.
    async fn new_journal(dir: &tempfile::TempDir) -> RedbRuntimeEventJournal {
        RedbRuntimeEventJournal::new(dir.path().join("test.db"), RedbJournalOptions::default())
            .await
            .unwrap()
    }

    // append_batch followed by load_all returns the same events in order.
    #[tokio::test]
    async fn redb_journal_roundtrip() {
        let dir = tempdir().unwrap();
        let journal = new_journal(&dir).await;

        let events = vec![
            RuntimeEvent::RouteRegistered {
                route_id: "r1".to_string(),
            },
            RuntimeEvent::RouteStarted {
                route_id: "r1".to_string(),
            },
        ];
        journal.append_batch(&events).await.unwrap();

        let loaded = journal.load_all().await.unwrap();
        assert_eq!(loaded, events);
    }

    // Removed ids disappear; surviving ids are returned by load_command_ids.
    #[tokio::test]
    async fn redb_journal_command_id_lifecycle() {
        let dir = tempdir().unwrap();
        let journal = new_journal(&dir).await;

        journal.append_command_id("c1").await.unwrap();
        journal.append_command_id("c2").await.unwrap();
        journal.remove_command_id("c1").await.unwrap();

        let ids = journal.load_command_ids().await.unwrap();
        assert_eq!(ids, vec!["c2".to_string()]);
    }

    #[tokio::test]
    async fn redb_journal_compaction_removes_completed_routes() {
        let dir = tempdir().unwrap();
        // Threshold of 1 triggers compaction on every append.
        let journal = RedbRuntimeEventJournal::new(
            dir.path().join("compact.db"),
            RedbJournalOptions {
                durability: JournalDurability::Eventual,
                compaction_threshold_events: 1,
            },
        )
        .await
        .unwrap();

        // Removed route — full lifecycle.
        journal
            .append_batch(&[RuntimeEvent::RouteRegistered {
                route_id: "old".to_string(),
            }])
            .await
            .unwrap();
        journal
            .append_batch(&[RuntimeEvent::RouteRemoved {
                route_id: "old".to_string(),
            }])
            .await
            .unwrap();

        // Active route — no RouteRemoved.
        journal
            .append_batch(&[RuntimeEvent::RouteRegistered {
                route_id: "live".to_string(),
            }])
            .await
            .unwrap();

        let loaded = journal.load_all().await.unwrap();
        assert!(
            !loaded.iter().any(
                |e| matches!(e, RuntimeEvent::RouteRegistered { route_id } if route_id == "old")
            ),
            "old route events must be compacted"
        );
        assert!(
            loaded.iter().any(
                |e| matches!(e, RuntimeEvent::RouteRegistered { route_id } if route_id == "live")
            ),
            "live route events must survive compaction"
        );
    }

    // A route registered again AFTER its RouteRemoved must keep the new
    // registration: compaction only deletes events up to the removal seq.
    #[tokio::test]
    async fn redb_journal_compaction_preserves_reregistered_route() {
        let dir = tempdir().unwrap();
        let journal = RedbRuntimeEventJournal::new(
            dir.path().join("rereg.db"),
            RedbJournalOptions {
                durability: JournalDurability::Eventual,
                compaction_threshold_events: 1,
            },
        )
        .await
        .unwrap();

        journal
            .append_batch(&[RuntimeEvent::RouteRegistered {
                route_id: "rereg".to_string(),
            }])
            .await
            .unwrap();
        journal
            .append_batch(&[RuntimeEvent::RouteRemoved {
                route_id: "rereg".to_string(),
            }])
            .await
            .unwrap();
        journal
            .append_batch(&[RuntimeEvent::RouteRegistered {
                route_id: "rereg".to_string(),
            }])
            .await
            .unwrap();

        let loaded = journal.load_all().await.unwrap();
        let rereg_count = loaded
            .iter()
            .filter(
                |e| matches!(e, RuntimeEvent::RouteRegistered { route_id } if route_id == "rereg"),
            )
            .count();
        assert_eq!(
            rereg_count, 1,
            "re-registered route must have exactly one event after compaction"
        );
    }

    // Eventual durability must still produce readable data within the process.
    #[tokio::test]
    async fn redb_journal_durability_eventual() {
        let dir = tempdir().unwrap();
        let journal = RedbRuntimeEventJournal::new(
            dir.path().join("eventual.db"),
            RedbJournalOptions {
                durability: JournalDurability::Eventual,
                compaction_threshold_events: 10_000,
            },
        )
        .await
        .unwrap();

        journal
            .append_batch(&[RuntimeEvent::RouteRegistered {
                route_id: "ev".to_string(),
            }])
            .await
            .unwrap();
        let loaded = journal.load_all().await.unwrap();
        assert_eq!(loaded.len(), 1);
    }

    #[tokio::test]
    async fn redb_journal_clone_shares_db() {
        let dir = tempdir().unwrap();
        let j1 = new_journal(&dir).await;
        let j2 = j1.clone();

        j1.append_batch(&[RuntimeEvent::RouteRegistered {
            route_id: "shared".to_string(),
        }])
        .await
        .unwrap();

        // j2 must see j1's write since they share the same Arc<Database>.
        let loaded = j2.load_all().await.unwrap();
        assert_eq!(loaded.len(), 1);
    }
}