
camel_core/lifecycle/adapters/redb_journal.rs

//! Redb-backed runtime event journal.
//!
//! # Schema
//!
//! - Table `events`: `u64 → &[u8]`  (seq → serde_json bytes of `JournalEntry`)
//! - Table `command_ids`: `&str → ()`  (presence = alive)
//!
//! # Sequence numbers
//!
//! No autoincrement in redb. Each `append_batch` derives `next_seq` by reading
//! the last key via `iter().next_back()` and adding 1; defaults to 0 if empty.
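//!
//! # Usage sketch
//!
//! Illustrative only (marked `ignore`, so it is not compiled as a doctest):
//! exact import paths depend on how this module is re-exported, and error
//! handling is schematic since `new` and the port methods use different error
//! types.
//!
//! ```ignore
//! let journal = RedbRuntimeEventJournal::new(
//!     "/var/lib/camel/journal.redb", // hypothetical path
//!     RedbJournalOptions::default(),
//! )
//! .await?;
//!
//! journal
//!     .append_batch(&[RuntimeEvent::RouteRegistered {
//!         route_id: "r1".to_string(),
//!     }])
//!     .await?;
//!
//! let replayed = journal.load_all().await?;
//! ```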

use std::path::PathBuf;
use std::sync::Arc;

use async_trait::async_trait;
use redb::{
    Database, Durability, ReadableDatabase, ReadableTable, ReadableTableMetadata, TableDefinition,
};
use serde::{Deserialize, Serialize};

use camel_api::CamelError;

use crate::lifecycle::domain::{DomainError, RuntimeEvent};
use crate::lifecycle::ports::RuntimeEventJournalPort;

// ── Table definitions ─────────────────────────────────────────────────────────

const EVENTS_TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("events");
const COMMAND_IDS_TABLE: TableDefinition<&str, ()> = TableDefinition::new("command_ids");

// ── Public types ──────────────────────────────────────────────────────────────

/// Durability mode for journal writes.
#[derive(Debug, Clone, PartialEq, Default)]
pub enum JournalDurability {
    /// fsync on every commit — protects against kernel crash and power loss (default).
    #[default]
    Immediate,
    /// No fsync — OS decides flush timing. Suitable for dev/test.
    Eventual,
}

/// Options for `RedbRuntimeEventJournal`.
#[derive(Debug, Clone)]
pub struct RedbJournalOptions {
    pub durability: JournalDurability,
    /// Trigger compaction after this many events in the table. Default: 10_000.
    pub compaction_threshold_events: u64,
}

impl Default for RedbJournalOptions {
    fn default() -> Self {
        Self {
            durability: JournalDurability::Immediate,
            compaction_threshold_events: 10_000,
        }
    }
}

/// Internal wire format stored as redb value bytes (serde_json).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JournalEntry {
    pub seq: u64,
    pub timestamp_ms: i64,
    pub event: RuntimeEvent,
}
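// Example of the stored value bytes for one entry — a sketch only, assuming
// `RuntimeEvent` keeps serde's default externally tagged enum representation
// and an illustrative timestamp:
//
//   {"seq":0,"timestamp_ms":1700000000000,"event":{"RouteRegistered":{"route_id":"r1"}}}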

/// Filter for `RedbRuntimeEventJournal::inspect`.
pub struct JournalInspectFilter {
    pub route_id: Option<String>,
    pub limit: usize,
}

// ── Adapter ───────────────────────────────────────────────────────────────────

/// Redb-backed implementation of `RuntimeEventJournalPort`.
///
/// `Arc<Database>` allows cheap cloning — all clones share the same underlying
/// redb file handle. `redb::Database` is `Send + Sync`.
#[derive(Clone)]
pub struct RedbRuntimeEventJournal {
    db: Arc<Database>,
    options: RedbJournalOptions,
}

impl RedbRuntimeEventJournal {
    /// Open (or create) the redb database at `path`.
    ///
    /// Parent directories are created if they do not exist.
    /// Both tables are initialised on first open.
    /// Uses `tokio::task::spawn_blocking` because `Database::create` does blocking filesystem I/O.
    pub async fn new(
        path: impl Into<PathBuf>,
        options: RedbJournalOptions,
    ) -> Result<Self, CamelError> {
        let path = path.into();
        let db = tokio::task::spawn_blocking(move || {
            if let Some(parent) = path.parent() {
                std::fs::create_dir_all(parent).map_err(|e| {
                    CamelError::Io(format!(
                        "failed to create journal directory '{}': {e}",
                        parent.display()
                    ))
                })?;
            }
            let db = Database::create(&path).map_err(|e| {
                CamelError::Io(format!(
                    "failed to open journal at '{}': {e}",
                    path.display()
                ))
            })?;
            // Initialise tables so they exist before any reads.
            let tx = db
                .begin_write()
                .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
            tx.open_table(EVENTS_TABLE)
                .map_err(|e| CamelError::Io(format!("redb open events table: {e}")))?;
            tx.open_table(COMMAND_IDS_TABLE)
                .map_err(|e| CamelError::Io(format!("redb open command_ids table: {e}")))?;
            tx.commit()
                .map_err(|e| CamelError::Io(format!("redb commit init: {e}")))?;
            Ok::<_, CamelError>(db)
        })
        .await
        .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))??;

        Ok(Self {
            db: Arc::new(db),
            options,
        })
    }

    /// Open an existing database at `path` and return entries (newest-first, up to `filter.limit`).
    ///
    /// Uses `Database::open` + `begin_read` — concurrent with a live writer on the same file.
    /// `inspect` is an offline utility: it does NOT require a live `RedbRuntimeEventJournal` instance.
    pub async fn inspect(
        path: impl Into<PathBuf>,
        filter: JournalInspectFilter,
    ) -> Result<Vec<JournalEntry>, CamelError> {
        let path = path.into();
        let limit = filter.limit;
        let route_id = filter.route_id;
        tokio::task::spawn_blocking(move || {
            if !path.exists() {
                return Err(CamelError::Io(format!(
                    "journal file not found: {}",
                    path.display()
                )));
            }
            let db = Database::open(&path)
                .map_err(|e| CamelError::Io(format!("invalid journal file: {e}")))?;
            let tx = db
                .begin_read()
                .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
            let table = tx
                .open_table(EVENTS_TABLE)
                .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;

            // Collect in descending order (newest first).
            // Filter by route_id FIRST, then apply limit — ensures we return
            // `limit` matching entries, not `limit` total entries where most may
            // not match the filter.
            let mut entries: Vec<JournalEntry> = Vec::new();
            for result in table
                .iter()
                .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
                .rev()
            {
                let (_k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
                let entry: JournalEntry = serde_json::from_slice(v.value())
                    .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
                if let Some(ref rid) = route_id
                    && entry.event.route_id() != rid.as_str()
                {
                    continue;
                }
                if entries.len() >= limit {
                    break;
                }
                entries.push(entry);
            }
            Ok(entries)
        })
        .await
        .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))?
    }

    // ── Internal helpers ──────────────────────────────────────────────────────

    fn redb_durability(&self) -> Durability {
        match self.options.durability {
            JournalDurability::Immediate => Durability::Immediate,
            JournalDurability::Eventual => Durability::None,
        }
    }

    /// Derive next sequence number from the last key in the events table.
    /// Must be called inside a write transaction with the table already open.
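    /// For example, with existing keys {0, 1, 2} the last key is 2, so the next
    /// seq is 3; on an empty table there is no last key and the next seq is 0.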
    fn next_seq(table: &redb::Table<u64, &[u8]>) -> Result<u64, CamelError> {
        match table
            .iter()
            .map_err(|e| CamelError::Io(format!("redb iter for seq: {e}")))?
            .next_back()
        {
            Some(Ok((k, _))) => Ok(k.value() + 1),
            Some(Err(e)) => Err(CamelError::Io(format!("redb seq read: {e}"))),
            None => Ok(0),
        }
    }

    /// Count rows in the events table (read transaction).
    fn event_count(&self) -> Result<u64, CamelError> {
        let tx = self
            .db
            .begin_read()
            .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
        let table = tx
            .open_table(EVENTS_TABLE)
            .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
        table
            .len()
            .map_err(|e| CamelError::Io(format!("redb len: {e}")))
    }

    /// Compact the events table: remove events for routes that have been fully removed.
    fn compact(&self) -> Result<(), CamelError> {
        let tx = self
            .db
            .begin_write()
            .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
        {
            let mut table = tx
                .open_table(EVENTS_TABLE)
                .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;

            // Pass 1: read all events in key order, find last RouteRemoved seq per route.
            let mut last_removed_seq: std::collections::HashMap<String, u64> =
                std::collections::HashMap::new();
            for result in table
                .iter()
                .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
            {
                let (k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
                let seq = k.value();
                let entry: JournalEntry = serde_json::from_slice(v.value())
                    .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
                if matches!(entry.event, RuntimeEvent::RouteRemoved { .. }) {
                    last_removed_seq.insert(entry.event.route_id().to_string(), seq);
                }
            }

            if last_removed_seq.is_empty() {
                drop(table);
                tx.commit()
                    .map_err(|e| CamelError::Io(format!("redb commit compact: {e}")))?;
                return Ok(());
            }

            // Pass 2: collect seqs to delete.
            let mut to_delete: Vec<u64> = Vec::new();
            for result in table
                .iter()
                .map_err(|e| CamelError::Io(format!("redb iter pass2: {e}")))?
            {
                let (k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
                let seq = k.value();
                let entry: JournalEntry = serde_json::from_slice(v.value())
                    .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
                let route_id = entry.event.route_id().to_string();
                if let Some(&cutoff) = last_removed_seq.get(&route_id)
                    && seq <= cutoff
                {
                    to_delete.push(seq);
                }
            }

            for seq in to_delete {
                table
                    .remove(&seq)
                    .map_err(|e| CamelError::Io(format!("redb remove seq {seq}: {e}")))?;
            }
        }
        tx.commit()
            .map_err(|e| CamelError::Io(format!("redb commit compact: {e}")))?;
        Ok(())
    }
}

// ── RuntimeEvent helper ───────────────────────────────────────────────────────

/// Extension trait for extracting the `route_id` field from any `RuntimeEvent` variant.
trait RuntimeEventExt {
    fn route_id(&self) -> &str;
}

impl RuntimeEventExt for RuntimeEvent {
    fn route_id(&self) -> &str {
        match self {
            RuntimeEvent::RouteRegistered { route_id }
            | RuntimeEvent::RouteStartRequested { route_id }
            | RuntimeEvent::RouteStarted { route_id }
            | RuntimeEvent::RouteFailed { route_id, .. }
            | RuntimeEvent::RouteStopped { route_id }
            | RuntimeEvent::RouteSuspended { route_id }
            | RuntimeEvent::RouteResumed { route_id }
            | RuntimeEvent::RouteReloaded { route_id }
            | RuntimeEvent::RouteRemoved { route_id } => route_id,
        }
    }
}

// ── RuntimeEventJournalPort impl ──────────────────────────────────────────────

#[async_trait]
impl RuntimeEventJournalPort for RedbRuntimeEventJournal {
    async fn append_batch(&self, events: &[RuntimeEvent]) -> Result<(), DomainError> {
        if events.is_empty() {
            return Ok(());
        }
        let db = Arc::clone(&self.db);
        let durability = self.redb_durability();
        let events = events.to_vec();
        let now_ms = chrono::Utc::now().timestamp_millis();

        tokio::task::spawn_blocking(move || {
            // NOTE: `mut` is required — `set_durability` takes `&mut self` in redb v2.
            let mut tx = db
                .begin_write()
                .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
            tx.set_durability(durability)
                .map_err(|e| CamelError::Io(format!("redb set_durability: {e}")))?;
            {
                let mut table = tx
                    .open_table(EVENTS_TABLE)
                    .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
                let mut next_seq = Self::next_seq(&table)?;
                for event in events {
                    let entry = JournalEntry {
                        seq: next_seq,
                        timestamp_ms: now_ms,
                        event,
                    };
                    let bytes = serde_json::to_vec(&entry)
                        .map_err(|e| CamelError::Io(format!("journal serialize: {e}")))?;
                    table
                        .insert(&next_seq, bytes.as_slice())
                        .map_err(|e| CamelError::Io(format!("redb insert: {e}")))?;
                    next_seq += 1;
                }
            }
            tx.commit()
                .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
            Ok::<_, CamelError>(())
        })
        .await
        .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
        .map_err(|e| DomainError::InvalidState(e.to_string()))?;

        // Trigger compaction if threshold exceeded. Non-fatal if it fails.
        // Both event_count() and compact() do blocking redb I/O — run in spawn_blocking.
        let journal_clone = self.clone();
        let threshold = self.options.compaction_threshold_events;
        tokio::task::spawn_blocking(move || match journal_clone.event_count() {
            Ok(count) if count >= threshold => {
                if let Err(e) = journal_clone.compact() {
                    tracing::warn!("journal compaction failed (non-fatal): {e}");
                }
            }
            Ok(_) => {}
            Err(e) => {
                tracing::warn!("journal event count check failed (non-fatal): {e}");
            }
        })
        .await
        .ok(); // Non-fatal: if spawn_blocking panics, we ignore it

        Ok(())
    }

    async fn load_all(&self) -> Result<Vec<RuntimeEvent>, DomainError> {
        let db = Arc::clone(&self.db);
        tokio::task::spawn_blocking(move || {
            let tx = db
                .begin_read()
                .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
            let table = tx
                .open_table(EVENTS_TABLE)
                .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
            let mut events = Vec::new();
            for result in table
                .iter()
                .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
            {
                let (_k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
                let entry: JournalEntry = serde_json::from_slice(v.value())
                    .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
                events.push(entry.event);
            }
            Ok(events)
        })
        .await
        .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
        .map_err(|e: CamelError| DomainError::InvalidState(e.to_string()))
    }

    async fn append_command_id(&self, command_id: &str) -> Result<(), DomainError> {
        let db = Arc::clone(&self.db);
        let durability = self.redb_durability();
        let id = command_id.to_string();
        tokio::task::spawn_blocking(move || {
            // NOTE: `mut` required — `set_durability` takes `&mut self` in redb v2.
            let mut tx = db
                .begin_write()
                .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
            tx.set_durability(durability)
                .map_err(|e| CamelError::Io(format!("redb set_durability: {e}")))?;
            {
                let mut table = tx
                    .open_table(COMMAND_IDS_TABLE)
                    .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
                table
                    .insert(id.as_str(), ())
                    .map_err(|e| CamelError::Io(format!("redb insert command_id: {e}")))?;
            }
            tx.commit()
                .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
            Ok::<_, CamelError>(())
        })
        .await
        .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
        .map_err(|e| DomainError::InvalidState(e.to_string()))
    }

    async fn remove_command_id(&self, command_id: &str) -> Result<(), DomainError> {
        let db = Arc::clone(&self.db);
        let durability = self.redb_durability();
        let id = command_id.to_string();
        tokio::task::spawn_blocking(move || {
            // NOTE: `mut` required — `set_durability` takes `&mut self` in redb v2.
            let mut tx = db
                .begin_write()
                .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
            tx.set_durability(durability)
                .map_err(|e| CamelError::Io(format!("redb set_durability: {e}")))?;
            {
                let mut table = tx
                    .open_table(COMMAND_IDS_TABLE)
                    .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
                table
                    .remove(id.as_str())
                    .map_err(|e| CamelError::Io(format!("redb remove command_id: {e}")))?;
            }
            tx.commit()
                .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
            Ok::<_, CamelError>(())
        })
        .await
        .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
        .map_err(|e| DomainError::InvalidState(e.to_string()))
    }

    async fn load_command_ids(&self) -> Result<Vec<String>, DomainError> {
        let db = Arc::clone(&self.db);
        tokio::task::spawn_blocking(move || {
            let tx = db
                .begin_read()
                .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
            let table = tx
                .open_table(COMMAND_IDS_TABLE)
                .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
            let mut ids = Vec::new();
            for result in table
                .iter()
                .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
            {
                let (k, _) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
                ids.push(k.value().to_string());
            }
            Ok(ids)
        })
        .await
        .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
        .map_err(|e: CamelError| DomainError::InvalidState(e.to_string()))
    }
}

// ── Unit tests ────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    async fn new_journal(dir: &tempfile::TempDir) -> RedbRuntimeEventJournal {
        RedbRuntimeEventJournal::new(dir.path().join("test.db"), RedbJournalOptions::default())
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn redb_journal_roundtrip() {
        let dir = tempdir().unwrap();
        let journal = new_journal(&dir).await;

        let events = vec![
            RuntimeEvent::RouteRegistered {
                route_id: "r1".to_string(),
            },
            RuntimeEvent::RouteStarted {
                route_id: "r1".to_string(),
            },
        ];
        journal.append_batch(&events).await.unwrap();

        let loaded = journal.load_all().await.unwrap();
        assert_eq!(loaded, events);
    }

    #[tokio::test]
    async fn redb_journal_command_id_lifecycle() {
        let dir = tempdir().unwrap();
        let journal = new_journal(&dir).await;

        journal.append_command_id("c1").await.unwrap();
        journal.append_command_id("c2").await.unwrap();
        journal.remove_command_id("c1").await.unwrap();

        let ids = journal.load_command_ids().await.unwrap();
        assert_eq!(ids, vec!["c2".to_string()]);
    }

    #[tokio::test]
    async fn redb_journal_compaction_removes_completed_routes() {
        let dir = tempdir().unwrap();
        // Threshold of 1 triggers compaction on every append.
        let journal = RedbRuntimeEventJournal::new(
            dir.path().join("compact.db"),
            RedbJournalOptions {
                durability: JournalDurability::Eventual,
                compaction_threshold_events: 1,
            },
        )
        .await
        .unwrap();

        // Removed route — full lifecycle.
        journal
            .append_batch(&[RuntimeEvent::RouteRegistered {
                route_id: "old".to_string(),
            }])
            .await
            .unwrap();
        journal
            .append_batch(&[RuntimeEvent::RouteRemoved {
                route_id: "old".to_string(),
            }])
            .await
            .unwrap();

        // Active route — no RouteRemoved.
        journal
            .append_batch(&[RuntimeEvent::RouteRegistered {
                route_id: "live".to_string(),
            }])
            .await
            .unwrap();

        let loaded = journal.load_all().await.unwrap();
        assert!(
            !loaded.iter().any(
                |e| matches!(e, RuntimeEvent::RouteRegistered { route_id } if route_id == "old")
            ),
            "old route events must be compacted"
        );
        assert!(
            loaded.iter().any(
                |e| matches!(e, RuntimeEvent::RouteRegistered { route_id } if route_id == "live")
            ),
            "live route events must survive compaction"
        );
    }

    #[tokio::test]
    async fn redb_journal_compaction_preserves_reregistered_route() {
        let dir = tempdir().unwrap();
        let journal = RedbRuntimeEventJournal::new(
            dir.path().join("rereg.db"),
            RedbJournalOptions {
                durability: JournalDurability::Eventual,
                compaction_threshold_events: 1,
            },
        )
        .await
        .unwrap();

        journal
            .append_batch(&[RuntimeEvent::RouteRegistered {
                route_id: "rereg".to_string(),
            }])
            .await
            .unwrap();
        journal
            .append_batch(&[RuntimeEvent::RouteRemoved {
                route_id: "rereg".to_string(),
            }])
            .await
            .unwrap();
        journal
            .append_batch(&[RuntimeEvent::RouteRegistered {
                route_id: "rereg".to_string(),
            }])
            .await
            .unwrap();

        let loaded = journal.load_all().await.unwrap();
        let rereg_count = loaded
            .iter()
            .filter(
                |e| matches!(e, RuntimeEvent::RouteRegistered { route_id } if route_id == "rereg"),
            )
            .count();
        assert_eq!(
            rereg_count, 1,
            "re-registered route must have exactly one event after compaction"
        );
    }

    #[tokio::test]
    async fn redb_journal_durability_eventual() {
        let dir = tempdir().unwrap();
        let journal = RedbRuntimeEventJournal::new(
            dir.path().join("eventual.db"),
            RedbJournalOptions {
                durability: JournalDurability::Eventual,
                compaction_threshold_events: 10_000,
            },
        )
        .await
        .unwrap();

        journal
            .append_batch(&[RuntimeEvent::RouteRegistered {
                route_id: "ev".to_string(),
            }])
            .await
            .unwrap();
        let loaded = journal.load_all().await.unwrap();
        assert_eq!(loaded.len(), 1);
    }
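
    // Added sketch: exercises `inspect` end to end (filter + limit, newest-first).
    // The journal is dropped before inspecting so the test does not rely on
    // opening the same file from two `Database` handles at once.
    #[tokio::test]
    async fn redb_journal_inspect_filters_and_limits() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("inspect.db");
        let journal = RedbRuntimeEventJournal::new(path.clone(), RedbJournalOptions::default())
            .await
            .unwrap();

        journal
            .append_batch(&[
                RuntimeEvent::RouteRegistered {
                    route_id: "a".to_string(),
                },
                RuntimeEvent::RouteRegistered {
                    route_id: "b".to_string(),
                },
                RuntimeEvent::RouteStarted {
                    route_id: "a".to_string(),
                },
            ])
            .await
            .unwrap();
        drop(journal);

        let entries = RedbRuntimeEventJournal::inspect(
            path,
            JournalInspectFilter {
                route_id: Some("a".to_string()),
                limit: 1,
            },
        )
        .await
        .unwrap();

        // Newest-first: the single returned entry is the latest "a" event.
        assert_eq!(entries.len(), 1);
        assert!(matches!(
            entries[0].event,
            RuntimeEvent::RouteStarted { ref route_id } if route_id == "a"
        ));
    }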

    #[tokio::test]
    async fn redb_journal_clone_shares_db() {
        let dir = tempdir().unwrap();
        let j1 = new_journal(&dir).await;
        let j2 = j1.clone();

        j1.append_batch(&[RuntimeEvent::RouteRegistered {
            route_id: "shared".to_string(),
        }])
        .await
        .unwrap();

        // j2 must see j1's write since they share the same Arc<Database>.
        let loaded = j2.load_all().await.unwrap();
        assert_eq!(loaded.len(), 1);
    }
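
    // Added sketch: sequence numbers keep increasing across separate batches.
    // Verified through `inspect`, which exposes the stored `seq` values; the
    // journal is dropped first so the file is no longer held open.
    #[tokio::test]
    async fn redb_journal_seq_continues_across_batches() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("seq.db");
        let journal = RedbRuntimeEventJournal::new(path.clone(), RedbJournalOptions::default())
            .await
            .unwrap();

        journal
            .append_batch(&[RuntimeEvent::RouteRegistered {
                route_id: "s".to_string(),
            }])
            .await
            .unwrap();
        journal
            .append_batch(&[
                RuntimeEvent::RouteStarted {
                    route_id: "s".to_string(),
                },
                RuntimeEvent::RouteStopped {
                    route_id: "s".to_string(),
                },
            ])
            .await
            .unwrap();
        drop(journal);

        let entries = RedbRuntimeEventJournal::inspect(
            path,
            JournalInspectFilter {
                route_id: None,
                limit: 10,
            },
        )
        .await
        .unwrap();

        // Newest-first ordering: seqs should be 2, 1, 0.
        let seqs: Vec<u64> = entries.iter().map(|e| e.seq).collect();
        assert_eq!(seqs, vec![2, 1, 0]);
    }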
}