// camel_core/lifecycle/adapters/redb_journal.rs
//! Redb-backed runtime event journal.
//!
//! # Schema
//!
//! - Table `events`: `u64 → &[u8]`  (seq → serde_json bytes of `JournalEntry`)
//! - Table `command_ids`: `&str → ()`  (presence = alive)
//!
//! # Sequence numbers
//!
//! No autoincrement in redb. Each `append_batch` derives `next_seq` by reading
//! the last key via `iter().next_back()` and adding 1; defaults to 0 if empty.

use std::path::PathBuf;
use std::sync::Arc;

use async_trait::async_trait;
use redb::{Database, ReadableTable, ReadableTableMetadata, TableDefinition};
use serde::{Deserialize, Serialize};

use camel_api::CamelError;

use crate::lifecycle::domain::RuntimeEvent;
use crate::lifecycle::ports::RuntimeEventJournalPort;

// ── Table definitions ─────────────────────────────────────────────────────────

/// Append-only event log: monotonically increasing `seq` → serde_json bytes of `JournalEntry`.
const EVENTS_TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("events");
/// In-flight command registry: command id → unit (key presence alone means "alive").
const COMMAND_IDS_TABLE: TableDefinition<&str, ()> = TableDefinition::new("command_ids");

// ── Public types ──────────────────────────────────────────────────────────────

/// Durability mode for journal writes.
///
/// Maps 1:1 onto `redb::Durability` (see `redb_durability`).
//
// `Copy`/`Eq`/`Hash` added: unit-variant enum is trivially copyable, and
// deriving `PartialEq` without `Eq` trips clippy's
// `derive_partial_eq_without_eq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum JournalDurability {
    /// fsync on every commit — protects against kernel crash and power loss (default).
    #[default]
    Immediate,
    /// No fsync — OS decides flush timing. Suitable for dev/test.
    Eventual,
}

/// Options for `RedbRuntimeEventJournal`.
#[derive(Debug, Clone)]
pub struct RedbJournalOptions {
    /// Write durability; see [`JournalDurability`].
    pub durability: JournalDurability,
    /// Trigger compaction after this many events in the table. Default: 10_000.
    pub compaction_threshold_events: u64,
}

impl Default for RedbJournalOptions {
    fn default() -> Self {
        Self {
            // Single source of truth: the `#[default]` variant on the enum,
            // instead of re-spelling `Immediate` here.
            durability: JournalDurability::default(),
            compaction_threshold_events: 10_000,
        }
    }
}
/// Internal wire format stored as redb value bytes (serde_json).
///
/// `seq` is duplicated inside the value even though it is also the table key,
/// so a decoded entry is self-describing without its key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JournalEntry {
    /// Monotonic sequence number; equals the redb key in `EVENTS_TABLE`.
    pub seq: u64,
    /// Wall-clock append time (Unix epoch, milliseconds).
    pub timestamp_ms: i64,
    /// The journaled lifecycle event.
    pub event: RuntimeEvent,
}
/// Filter for `RedbRuntimeEventJournal::inspect`.
//
// `Debug`/`Clone` derives added: public types should be debuggable/loggable,
// and a filter should be reusable across repeated inspections.
#[derive(Debug, Clone)]
pub struct JournalInspectFilter {
    /// Only return entries whose event targets this route; `None` = all routes.
    pub route_id: Option<String>,
    /// Maximum number of (matching) entries to return, newest first.
    pub limit: usize,
}

// ── Adapter ───────────────────────────────────────────────────────────────────

/// Redb-backed implementation of `RuntimeEventJournalPort`.
///
/// `Arc<Database>` allows cheap cloning — all clones share the same underlying
/// redb file handle. `redb::Database` is `Send + Sync`, so clones may be used
/// from multiple tasks concurrently.
#[derive(Clone)]
pub struct RedbRuntimeEventJournal {
    /// Shared handle to the on-disk database holding both tables.
    db: Arc<Database>,
    /// Durability + compaction settings captured at construction time.
    options: RedbJournalOptions,
}
impl RedbRuntimeEventJournal {
    /// Open (or create) the redb database at `path`.
    ///
    /// Parent directories are created if they do not exist.
    /// Both tables are initialised on first open, so later readers never hit
    /// a missing-table error.
    /// Uses `tokio::task::spawn_blocking` because `Database::create` performs
    /// blocking filesystem I/O.
    ///
    /// # Errors
    /// `CamelError::Io` on directory creation, database open, or the initial
    /// table-creation transaction.
    pub async fn new(
        path: impl Into<PathBuf>,
        options: RedbJournalOptions,
    ) -> Result<Self, CamelError> {
        let path = path.into();
        let db = tokio::task::spawn_blocking(move || {
            if let Some(parent) = path.parent() {
                std::fs::create_dir_all(parent).map_err(|e| {
                    CamelError::Io(format!(
                        "failed to create journal directory '{}': {e}",
                        parent.display()
                    ))
                })?;
            }
            // `Database::create` opens the file if it already exists.
            let db = Database::create(&path).map_err(|e| {
                CamelError::Io(format!(
                    "failed to open journal at '{}': {e}",
                    path.display()
                ))
            })?;
            // Initialise tables so they exist before any reads.
            let tx = db
                .begin_write()
                .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
            tx.open_table(EVENTS_TABLE)
                .map_err(|e| CamelError::Io(format!("redb open events table: {e}")))?;
            tx.open_table(COMMAND_IDS_TABLE)
                .map_err(|e| CamelError::Io(format!("redb open command_ids table: {e}")))?;
            tx.commit()
                .map_err(|e| CamelError::Io(format!("redb commit init: {e}")))?;
            Ok::<_, CamelError>(db)
        })
        .await
        // Outer `?` = spawn_blocking join error; inner `?` = closure's CamelError.
        .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))??;

        Ok(Self {
            db: Arc::new(db),
            options,
        })
    }

    /// Open an existing database at `path` and return entries (newest-first, up to `filter.limit`).
    ///
    /// Uses `Database::open` + `begin_read` — concurrent with a live writer on the same file.
    /// `inspect` is an offline utility: it does NOT require a live `RedbRuntimeEventJournal` instance.
    ///
    /// # Errors
    /// `CamelError::Io` if the file is missing or invalid, or if an entry
    /// fails to deserialize.
    pub async fn inspect(
        path: impl Into<PathBuf>,
        filter: JournalInspectFilter,
    ) -> Result<Vec<JournalEntry>, CamelError> {
        let path = path.into();
        let limit = filter.limit;
        let route_id = filter.route_id;
        tokio::task::spawn_blocking(move || {
            if !path.exists() {
                return Err(CamelError::Io(format!(
                    "journal file not found: {}",
                    path.display()
                )));
            }
            let db = Database::open(&path)
                .map_err(|e| CamelError::Io(format!("invalid journal file: {e}")))?;
            let tx = db
                .begin_read()
                .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
            let table = tx
                .open_table(EVENTS_TABLE)
                .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;

            // Collect in descending order (newest first).
            // Filter by route_id FIRST, then apply limit — ensures we return
            // `limit` matching entries, not `limit` total entries where most may
            // not match the filter.
            let mut entries: Vec<JournalEntry> = Vec::new();
            for result in table
                .iter()
                .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
                .rev()
            {
                let (_k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
                let entry: JournalEntry = serde_json::from_slice(v.value())
                    .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
                // Skip entries for other routes before the limit check.
                if let Some(ref rid) = route_id
                    && entry.event.route_id() != rid.as_str()
                {
                    continue;
                }
                // `limit` matching entries collected — stop scanning.
                if entries.len() >= limit {
                    break;
                }
                entries.push(entry);
            }
            Ok(entries)
        })
        .await
        .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))?
    }

    // ── Internal helpers ──────────────────────────────────────────────────────

    /// Map the public durability option onto redb's own enum.
    fn redb_durability(&self) -> redb::Durability {
        match self.options.durability {
            JournalDurability::Immediate => redb::Durability::Immediate,
            JournalDurability::Eventual => redb::Durability::Eventual,
        }
    }

    /// Derive next sequence number from the last key in the events table.
    /// Must be called inside a write transaction with the table already open.
    fn next_seq(table: &redb::Table<u64, &[u8]>) -> Result<u64, CamelError> {
        // redb iterators are key-ordered, so the last element holds the
        // highest sequence number written so far.
        match table
            .iter()
            .map_err(|e| CamelError::Io(format!("redb iter for seq: {e}")))?
            .next_back()
        {
            Some(Ok((k, _))) => Ok(k.value() + 1),
            Some(Err(e)) => Err(CamelError::Io(format!("redb seq read: {e}"))),
            // Empty table: sequence numbers start at 0.
            None => Ok(0),
        }
    }

    /// Count rows in the events table (read transaction).
    fn event_count(&self) -> Result<u64, CamelError> {
        let tx = self
            .db
            .begin_read()
            .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
        let table = tx
            .open_table(EVENTS_TABLE)
            .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
        table
            .len()
            .map_err(|e| CamelError::Io(format!("redb len: {e}")))
    }

    /// Compact the events table: remove events for routes that have been fully removed.
    ///
    /// A route counts as removed when a `RouteRemoved` event exists for it;
    /// every event up to and including the LAST removal is deleted, so events
    /// after a re-registration survive.
    ///
    /// NOTE(review): both passes deserialize every entry, so each compaction
    /// is two full-table scans — fine at the default threshold, worth
    /// revisiting if tables grow large.
    fn compact(&self) -> Result<(), CamelError> {
        let tx = self
            .db
            .begin_write()
            .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
        {
            let mut table = tx
                .open_table(EVENTS_TABLE)
                .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;

            // Pass 1: read all events in key order, find last RouteRemoved seq per route.
            let mut last_removed_seq: std::collections::HashMap<String, u64> =
                std::collections::HashMap::new();
            for result in table
                .iter()
                .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
            {
                let (k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
                let seq = k.value();
                let entry: JournalEntry = serde_json::from_slice(v.value())
                    .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
                if matches!(entry.event, RuntimeEvent::RouteRemoved { .. }) {
                    // Later removals overwrite earlier ones — keep the max seq.
                    last_removed_seq.insert(entry.event.route_id().to_string(), seq);
                }
            }

            if last_removed_seq.is_empty() {
                // No removed routes — nothing to delete; commit the empty
                // transaction and bail early.
                drop(table);
                tx.commit()
                    .map_err(|e| CamelError::Io(format!("redb commit compact: {e}")))?;
                return Ok(());
            }

            // Pass 2: collect seqs to delete.
            // Deletions are buffered because `remove` needs `&mut table`
            // while iterating holds a borrow of it.
            let mut to_delete: Vec<u64> = Vec::new();
            for result in table
                .iter()
                .map_err(|e| CamelError::Io(format!("redb iter pass2: {e}")))?
            {
                let (k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
                let seq = k.value();
                let entry: JournalEntry = serde_json::from_slice(v.value())
                    .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
                let route_id = entry.event.route_id().to_string();
                // `seq <= cutoff` includes the RouteRemoved event itself.
                if let Some(&cutoff) = last_removed_seq.get(&route_id)
                    && seq <= cutoff
                {
                    to_delete.push(seq);
                }
            }

            for seq in to_delete {
                table
                    .remove(&seq)
                    .map_err(|e| CamelError::Io(format!("redb remove seq {seq}: {e}")))?;
            }
        } // table drops here — commit needs exclusive access to `tx`.
        tx.commit()
            .map_err(|e| CamelError::Io(format!("redb commit compact: {e}")))?;
        Ok(())
    }
}

// ── RuntimeEvent helper ───────────────────────────────────────────────────────

/// Extension to extract the `route_id` field from any `RuntimeEvent` variant.
///
/// Module-private: only compaction and inspection filtering need it.
trait RuntimeEventExt {
    /// Borrow the route id carried by this event.
    fn route_id(&self) -> &str;
}
impl RuntimeEventExt for RuntimeEvent {
    fn route_id(&self) -> &str {
        // Deliberately exhaustive (no `_` arm): adding a RuntimeEvent variant
        // must force this match to be revisited at compile time.
        match self {
            RuntimeEvent::RouteRegistered { route_id }
            | RuntimeEvent::RouteStartRequested { route_id }
            | RuntimeEvent::RouteStarted { route_id }
            | RuntimeEvent::RouteFailed { route_id, .. }
            | RuntimeEvent::RouteStopped { route_id }
            | RuntimeEvent::RouteSuspended { route_id }
            | RuntimeEvent::RouteResumed { route_id }
            | RuntimeEvent::RouteReloaded { route_id }
            | RuntimeEvent::RouteRemoved { route_id } => route_id,
        }
    }
}

// ── RuntimeEventJournalPort impl ──────────────────────────────────────────────

#[async_trait]
impl RuntimeEventJournalPort for RedbRuntimeEventJournal {
    /// Append `events` as one atomic redb write transaction.
    ///
    /// Sequence numbers are derived inside the write transaction, so they
    /// reflect the table state at commit time. All entries in the batch share
    /// a single `timestamp_ms` taken before the write.
    async fn append_batch(&self, events: &[RuntimeEvent]) -> Result<(), CamelError> {
        if events.is_empty() {
            return Ok(());
        }
        let db = Arc::clone(&self.db);
        let durability = self.redb_durability();
        // Owned copy: the borrowed slice cannot cross into spawn_blocking.
        let events = events.to_vec();
        let now_ms = chrono::Utc::now().timestamp_millis();

        tokio::task::spawn_blocking(move || {
            // NOTE: `mut` is required — `set_durability` takes `&mut self` in redb v2.
            let mut tx = db
                .begin_write()
                .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
            tx.set_durability(durability);
            {
                let mut table = tx
                    .open_table(EVENTS_TABLE)
                    .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
                let mut next_seq = Self::next_seq(&table)?;
                for event in events {
                    let entry = JournalEntry {
                        seq: next_seq,
                        timestamp_ms: now_ms,
                        event,
                    };
                    let bytes = serde_json::to_vec(&entry)
                        .map_err(|e| CamelError::Io(format!("journal serialize: {e}")))?;
                    table
                        .insert(&next_seq, bytes.as_slice())
                        .map_err(|e| CamelError::Io(format!("redb insert: {e}")))?;
                    next_seq += 1;
                }
            } // table drops here — commit needs exclusive access to `tx`.
            tx.commit()
                .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
            Ok::<_, CamelError>(())
        })
        .await
        .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))??;

        // Trigger compaction if threshold exceeded. Non-fatal if it fails.
        // Both event_count() and compact() do blocking redb I/O — run in spawn_blocking.
        // NOTE(review): once the table stays at/above the threshold with nothing
        // removable, this re-runs a full compaction scan on every append —
        // consider tracking post-compaction size if appends become hot.
        let journal_clone = self.clone();
        let threshold = self.options.compaction_threshold_events;
        tokio::task::spawn_blocking(move || match journal_clone.event_count() {
            Ok(count) if count >= threshold => {
                if let Err(e) = journal_clone.compact() {
                    tracing::warn!("journal compaction failed (non-fatal): {e}");
                }
            }
            Ok(_) => {}
            Err(e) => {
                tracing::warn!("journal event count check failed (non-fatal): {e}");
            }
        })
        .await
        .ok(); // Non-fatal: if spawn_blocking panics, we ignore it

        Ok(())
    }

    /// Load every journaled event in ascending sequence (i.e. append) order.
    async fn load_all(&self) -> Result<Vec<RuntimeEvent>, CamelError> {
        let db = Arc::clone(&self.db);
        tokio::task::spawn_blocking(move || {
            let tx = db
                .begin_read()
                .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
            let table = tx
                .open_table(EVENTS_TABLE)
                .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
            let mut events = Vec::new();
            // Key order == seq order, so this replays events chronologically.
            for result in table
                .iter()
                .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
            {
                let (_k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
                let entry: JournalEntry = serde_json::from_slice(v.value())
                    .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
                events.push(entry.event);
            }
            Ok(events)
        })
        .await
        .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))?
    }

    /// Record `command_id` as alive (idempotent: re-insert overwrites).
    async fn append_command_id(&self, command_id: &str) -> Result<(), CamelError> {
        let db = Arc::clone(&self.db);
        let durability = self.redb_durability();
        let id = command_id.to_string();
        tokio::task::spawn_blocking(move || {
            // NOTE: `mut` required — `set_durability` takes `&mut self` in redb v2.
            let mut tx = db
                .begin_write()
                .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
            tx.set_durability(durability);
            {
                let mut table = tx
                    .open_table(COMMAND_IDS_TABLE)
                    .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
                // Unit value: key presence alone encodes "alive".
                table
                    .insert(id.as_str(), ())
                    .map_err(|e| CamelError::Io(format!("redb insert command_id: {e}")))?;
            }
            tx.commit()
                .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
            Ok::<_, CamelError>(())
        })
        .await
        .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))?
    }

    /// Mark `command_id` as finished by deleting its key (no-op if absent).
    async fn remove_command_id(&self, command_id: &str) -> Result<(), CamelError> {
        let db = Arc::clone(&self.db);
        let durability = self.redb_durability();
        let id = command_id.to_string();
        tokio::task::spawn_blocking(move || {
            // NOTE: `mut` required — `set_durability` takes `&mut self` in redb v2.
            let mut tx = db
                .begin_write()
                .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
            tx.set_durability(durability);
            {
                let mut table = tx
                    .open_table(COMMAND_IDS_TABLE)
                    .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
                table
                    .remove(id.as_str())
                    .map_err(|e| CamelError::Io(format!("redb remove command_id: {e}")))?;
            }
            tx.commit()
                .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
            Ok::<_, CamelError>(())
        })
        .await
        .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))?
    }

    /// List all live command ids (redb returns keys in sorted order).
    async fn load_command_ids(&self) -> Result<Vec<String>, CamelError> {
        let db = Arc::clone(&self.db);
        tokio::task::spawn_blocking(move || {
            let tx = db
                .begin_read()
                .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
            let table = tx
                .open_table(COMMAND_IDS_TABLE)
                .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
            let mut ids = Vec::new();
            for result in table
                .iter()
                .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
            {
                let (k, _) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
                ids.push(k.value().to_string());
            }
            Ok(ids)
        })
        .await
        .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))?
    }
}

// ── Unit tests ────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Open a journal named `file` under `dir` with the given options.
    async fn open_journal(
        dir: &tempfile::TempDir,
        file: &str,
        options: RedbJournalOptions,
    ) -> RedbRuntimeEventJournal {
        RedbRuntimeEventJournal::new(dir.path().join(file), options)
            .await
            .unwrap()
    }

    /// Options that trigger compaction on every append (threshold 1, no fsync).
    fn eager_compaction() -> RedbJournalOptions {
        RedbJournalOptions {
            durability: JournalDurability::Eventual,
            compaction_threshold_events: 1,
        }
    }

    /// Shorthand for a `RouteRegistered` event.
    fn registered(route_id: &str) -> RuntimeEvent {
        RuntimeEvent::RouteRegistered {
            route_id: route_id.to_string(),
        }
    }

    /// Shorthand for a `RouteRemoved` event.
    fn removed(route_id: &str) -> RuntimeEvent {
        RuntimeEvent::RouteRemoved {
            route_id: route_id.to_string(),
        }
    }

    #[tokio::test]
    async fn redb_journal_roundtrip() {
        let dir = tempdir().unwrap();
        let journal = open_journal(&dir, "test.db", RedbJournalOptions::default()).await;

        let events = vec![
            registered("r1"),
            RuntimeEvent::RouteStarted {
                route_id: "r1".to_string(),
            },
        ];
        journal.append_batch(&events).await.unwrap();

        // What comes back must match what went in, in order.
        assert_eq!(journal.load_all().await.unwrap(), events);
    }

    #[tokio::test]
    async fn redb_journal_command_id_lifecycle() {
        let dir = tempdir().unwrap();
        let journal = open_journal(&dir, "test.db", RedbJournalOptions::default()).await;

        for id in ["c1", "c2"] {
            journal.append_command_id(id).await.unwrap();
        }
        journal.remove_command_id("c1").await.unwrap();

        // Only the never-removed id survives.
        assert_eq!(
            journal.load_command_ids().await.unwrap(),
            vec!["c2".to_string()]
        );
    }

    #[tokio::test]
    async fn redb_journal_compaction_removes_completed_routes() {
        let dir = tempdir().unwrap();
        // Threshold of 1 triggers compaction on every append.
        let journal = open_journal(&dir, "compact.db", eager_compaction()).await;

        // Removed route — full lifecycle.
        journal.append_batch(&[registered("old")]).await.unwrap();
        journal.append_batch(&[removed("old")]).await.unwrap();

        // Active route — no RouteRemoved.
        journal.append_batch(&[registered("live")]).await.unwrap();

        let loaded = journal.load_all().await.unwrap();
        let has_registration = |route: &str| {
            loaded.iter().any(
                |e| matches!(e, RuntimeEvent::RouteRegistered { route_id } if route_id == route),
            )
        };
        assert!(
            !has_registration("old"),
            "old route events must be compacted"
        );
        assert!(
            has_registration("live"),
            "live route events must survive compaction"
        );
    }

    #[tokio::test]
    async fn redb_journal_compaction_preserves_reregistered_route() {
        let dir = tempdir().unwrap();
        let journal = open_journal(&dir, "rereg.db", eager_compaction()).await;

        // Register, remove, then register again — each as its own batch.
        for event in [registered("rereg"), removed("rereg"), registered("rereg")] {
            journal.append_batch(&[event]).await.unwrap();
        }

        let survivors = journal
            .load_all()
            .await
            .unwrap()
            .iter()
            .filter(
                |e| matches!(e, RuntimeEvent::RouteRegistered { route_id } if route_id == "rereg"),
            )
            .count();
        assert_eq!(
            survivors, 1,
            "re-registered route must have exactly one event after compaction"
        );
    }

    #[tokio::test]
    async fn redb_journal_durability_eventual() {
        let dir = tempdir().unwrap();
        let journal = open_journal(
            &dir,
            "eventual.db",
            RedbJournalOptions {
                durability: JournalDurability::Eventual,
                compaction_threshold_events: 10_000,
            },
        )
        .await;

        journal.append_batch(&[registered("ev")]).await.unwrap();
        assert_eq!(journal.load_all().await.unwrap().len(), 1);
    }

    #[tokio::test]
    async fn redb_journal_clone_shares_db() {
        let dir = tempdir().unwrap();
        let writer = open_journal(&dir, "test.db", RedbJournalOptions::default()).await;
        let reader = writer.clone();

        writer.append_batch(&[registered("shared")]).await.unwrap();

        // `reader` must see `writer`'s write since they share the same Arc<Database>.
        assert_eq!(reader.load_all().await.unwrap().len(), 1);
    }
}