// camel_core/lifecycle/adapters/redb_journal.rs
//! Redb-backed runtime event journal.
//!
//! # Schema
//!
//! - Table `events`: `u64 → &[u8]`  (seq → serde_json bytes of `JournalEntry`)
//! - Table `command_ids`: `&str → ()`  (presence = alive)
//!
//! # Sequence numbers
//!
//! No autoincrement in redb. Each `append_batch` derives `next_seq` by reading
//! the last key via `iter().next_back()` and adding 1; defaults to 0 if empty.

use std::path::PathBuf;
use std::sync::Arc;

use async_trait::async_trait;
use redb::{Database, ReadableTable, ReadableTableMetadata, TableDefinition};
use serde::{Deserialize, Serialize};

use camel_api::CamelError;

use crate::lifecycle::domain::{DomainError, RuntimeEvent};
use crate::lifecycle::ports::RuntimeEventJournalPort;

// ── Table definitions ─────────────────────────────────────────────────────────

// `events`: monotonically increasing sequence number → serde_json bytes of `JournalEntry`.
const EVENTS_TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("events");
// `command_ids`: key presence marks a live command id; the `()` value carries no data.
const COMMAND_IDS_TABLE: TableDefinition<&str, ()> = TableDefinition::new("command_ids");

// ── Public types ──────────────────────────────────────────────────────────────

/// Durability mode for journal writes.
///
/// Fieldless enum; `Copy` and `Eq` are derived so callers can compare and
/// pass it by value freely (clippy: `derive_partial_eq_without_eq`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum JournalDurability {
    /// fsync on every commit — protects against kernel crash and power loss (default).
    #[default]
    Immediate,
    /// No fsync — OS decides flush timing. Suitable for dev/test.
    Eventual,
}

/// Options for `RedbRuntimeEventJournal`.
#[derive(Debug, Clone)]
pub struct RedbJournalOptions {
    /// Write durability applied to every journal transaction.
    pub durability: JournalDurability,
    /// Trigger compaction after this many events in the table. Default: 10_000.
    pub compaction_threshold_events: u64,
}

impl Default for RedbJournalOptions {
    fn default() -> Self {
        Self {
            durability: JournalDurability::Immediate,
            compaction_threshold_events: 10_000,
        }
    }
}

/// Internal wire format stored as redb value bytes (serde_json).
///
/// An entry is self-describing once deserialised (e.g. by `inspect`):
/// `seq` duplicates the redb key it is stored under.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JournalEntry {
    /// Sequence number — identical to the redb key under which the entry is stored.
    pub seq: u64,
    /// Wall-clock timestamp (milliseconds since the Unix epoch) assigned at append time.
    pub timestamp_ms: i64,
    /// The journaled domain event itself.
    pub event: RuntimeEvent,
}

/// Filter for `RedbRuntimeEventJournal::inspect`.
///
/// Derives `Debug`/`Clone` for consistency with the other public types in
/// this module (the original derived nothing).
#[derive(Debug, Clone)]
pub struct JournalInspectFilter {
    /// When set, only entries whose event carries this `route_id` are returned.
    pub route_id: Option<String>,
    /// Maximum number of matching entries to return (newest first).
    pub limit: usize,
}

// ── Adapter ───────────────────────────────────────────────────────────────────

/// Redb-backed implementation of `RuntimeEventJournalPort`.
///
/// `Arc<Database>` allows cheap cloning — all clones share the same underlying
/// redb file handle. `redb::Database` is `Send + Sync`.
#[derive(Clone)]
pub struct RedbRuntimeEventJournal {
    /// Shared handle to the on-disk redb database; cloned journals alias this.
    db: Arc<Database>,
    /// Durability + compaction tuning captured at construction time.
    options: RedbJournalOptions,
}

85impl RedbRuntimeEventJournal {
86    /// Open (or create) the redb database at `path`.
87    ///
88    /// Parent directories are created if they do not exist.
89    /// Both tables are initialised on first open.
90    /// Uses `tokio::task::spawn_blocking` because `Database::open` is blocking.
91    pub async fn new(
92        path: impl Into<PathBuf>,
93        options: RedbJournalOptions,
94    ) -> Result<Self, CamelError> {
95        let path = path.into();
96        let db = tokio::task::spawn_blocking(move || {
97            if let Some(parent) = path.parent() {
98                std::fs::create_dir_all(parent).map_err(|e| {
99                    CamelError::Io(format!(
100                        "failed to create journal directory '{}': {e}",
101                        parent.display()
102                    ))
103                })?;
104            }
105            let db = Database::create(&path).map_err(|e| {
106                CamelError::Io(format!(
107                    "failed to open journal at '{}': {e}",
108                    path.display()
109                ))
110            })?;
111            // Initialise tables so they exist before any reads.
112            let tx = db
113                .begin_write()
114                .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
115            tx.open_table(EVENTS_TABLE)
116                .map_err(|e| CamelError::Io(format!("redb open events table: {e}")))?;
117            tx.open_table(COMMAND_IDS_TABLE)
118                .map_err(|e| CamelError::Io(format!("redb open command_ids table: {e}")))?;
119            tx.commit()
120                .map_err(|e| CamelError::Io(format!("redb commit init: {e}")))?;
121            Ok::<_, CamelError>(db)
122        })
123        .await
124        .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))??;
125
126        Ok(Self {
127            db: Arc::new(db),
128            options,
129        })
130    }
131
132    /// Open an existing database at `path` and return entries (newest-first, up to `filter.limit`).
133    ///
134    /// Uses `Database::open` + `begin_read` — concurrent with a live writer on the same file.
135    /// `inspect` is an offline utility: it does NOT require a live `RedbRuntimeEventJournal` instance.
136    pub async fn inspect(
137        path: impl Into<PathBuf>,
138        filter: JournalInspectFilter,
139    ) -> Result<Vec<JournalEntry>, CamelError> {
140        let path = path.into();
141        let limit = filter.limit;
142        let route_id = filter.route_id;
143        tokio::task::spawn_blocking(move || {
144            if !path.exists() {
145                return Err(CamelError::Io(format!(
146                    "journal file not found: {}",
147                    path.display()
148                )));
149            }
150            let db = Database::open(&path)
151                .map_err(|e| CamelError::Io(format!("invalid journal file: {e}")))?;
152            let tx = db
153                .begin_read()
154                .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
155            let table = tx
156                .open_table(EVENTS_TABLE)
157                .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
158
159            // Collect in descending order (newest first).
160            // Filter by route_id FIRST, then apply limit — ensures we return
161            // `limit` matching entries, not `limit` total entries where most may
162            // not match the filter.
163            let mut entries: Vec<JournalEntry> = Vec::new();
164            for result in table
165                .iter()
166                .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
167                .rev()
168            {
169                let (_k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
170                let entry: JournalEntry = serde_json::from_slice(v.value())
171                    .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
172                if let Some(ref rid) = route_id
173                    && entry.event.route_id() != rid.as_str()
174                {
175                    continue;
176                }
177                if entries.len() >= limit {
178                    break;
179                }
180                entries.push(entry);
181            }
182            Ok(entries)
183        })
184        .await
185        .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))?
186    }
187
188    // ── Internal helpers ──────────────────────────────────────────────────────
189
190    fn redb_durability(&self) -> redb::Durability {
191        match self.options.durability {
192            JournalDurability::Immediate => redb::Durability::Immediate,
193            JournalDurability::Eventual => redb::Durability::Eventual,
194        }
195    }
196
197    /// Derive next sequence number from the last key in the events table.
198    /// Must be called inside a write transaction with the table already open.
199    fn next_seq(table: &redb::Table<u64, &[u8]>) -> Result<u64, CamelError> {
200        match table
201            .iter()
202            .map_err(|e| CamelError::Io(format!("redb iter for seq: {e}")))?
203            .next_back()
204        {
205            Some(Ok((k, _))) => Ok(k.value() + 1),
206            Some(Err(e)) => Err(CamelError::Io(format!("redb seq read: {e}"))),
207            None => Ok(0),
208        }
209    }
210
211    /// Count rows in the events table (read transaction).
212    fn event_count(&self) -> Result<u64, CamelError> {
213        let tx = self
214            .db
215            .begin_read()
216            .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
217        let table = tx
218            .open_table(EVENTS_TABLE)
219            .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
220        table
221            .len()
222            .map_err(|e| CamelError::Io(format!("redb len: {e}")))
223    }
224
225    /// Compact the events table: remove events for routes that have been fully removed.
226    fn compact(&self) -> Result<(), CamelError> {
227        let tx = self
228            .db
229            .begin_write()
230            .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
231        {
232            let mut table = tx
233                .open_table(EVENTS_TABLE)
234                .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
235
236            // Pass 1: read all events in key order, find last RouteRemoved seq per route.
237            let mut last_removed_seq: std::collections::HashMap<String, u64> =
238                std::collections::HashMap::new();
239            for result in table
240                .iter()
241                .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
242            {
243                let (k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
244                let seq = k.value();
245                let entry: JournalEntry = serde_json::from_slice(v.value())
246                    .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
247                if matches!(entry.event, RuntimeEvent::RouteRemoved { .. }) {
248                    last_removed_seq.insert(entry.event.route_id().to_string(), seq);
249                }
250            }
251
252            if last_removed_seq.is_empty() {
253                drop(table);
254                tx.commit()
255                    .map_err(|e| CamelError::Io(format!("redb commit compact: {e}")))?;
256                return Ok(());
257            }
258
259            // Pass 2: collect seqs to delete.
260            let mut to_delete: Vec<u64> = Vec::new();
261            for result in table
262                .iter()
263                .map_err(|e| CamelError::Io(format!("redb iter pass2: {e}")))?
264            {
265                let (k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
266                let seq = k.value();
267                let entry: JournalEntry = serde_json::from_slice(v.value())
268                    .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
269                let route_id = entry.event.route_id().to_string();
270                if let Some(&cutoff) = last_removed_seq.get(&route_id)
271                    && seq <= cutoff
272                {
273                    to_delete.push(seq);
274                }
275            }
276
277            for seq in to_delete {
278                table
279                    .remove(&seq)
280                    .map_err(|e| CamelError::Io(format!("redb remove seq {seq}: {e}")))?;
281            }
282        }
283        tx.commit()
284            .map_err(|e| CamelError::Io(format!("redb commit compact: {e}")))?;
285        Ok(())
286    }
287}
288
// ── RuntimeEvent helper ───────────────────────────────────────────────────────

291/// Extension to extract the `route_id` field from any `RuntimeEvent` variant.
292trait RuntimeEventExt {
293    fn route_id(&self) -> &str;
294}
295
296impl RuntimeEventExt for RuntimeEvent {
297    fn route_id(&self) -> &str {
298        match self {
299            RuntimeEvent::RouteRegistered { route_id }
300            | RuntimeEvent::RouteStartRequested { route_id }
301            | RuntimeEvent::RouteStarted { route_id }
302            | RuntimeEvent::RouteFailed { route_id, .. }
303            | RuntimeEvent::RouteStopped { route_id }
304            | RuntimeEvent::RouteSuspended { route_id }
305            | RuntimeEvent::RouteResumed { route_id }
306            | RuntimeEvent::RouteReloaded { route_id }
307            | RuntimeEvent::RouteRemoved { route_id } => route_id,
308        }
309    }
310}
311
// ── RuntimeEventJournalPort impl ──────────────────────────────────────────────

#[async_trait]
impl RuntimeEventJournalPort for RedbRuntimeEventJournal {
    /// Append `events` atomically in a single write transaction.
    ///
    /// All entries in a batch share one timestamp (captured before the
    /// blocking hop) and receive consecutive sequence numbers. After a
    /// successful commit, a best-effort compaction check runs in a second
    /// blocking task; its failure is logged but never propagated.
    async fn append_batch(&self, events: &[RuntimeEvent]) -> Result<(), DomainError> {
        if events.is_empty() {
            return Ok(());
        }
        let db = Arc::clone(&self.db);
        let durability = self.redb_durability();
        // Owned copy — the borrowed slice cannot cross into the 'static blocking closure.
        let events = events.to_vec();
        let now_ms = chrono::Utc::now().timestamp_millis();

        tokio::task::spawn_blocking(move || {
            // NOTE: `mut` is required — `set_durability` takes `&mut self` in redb v2.
            let mut tx = db
                .begin_write()
                .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
            tx.set_durability(durability);
            {
                let mut table = tx
                    .open_table(EVENTS_TABLE)
                    .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
                // Sequence numbers derive from the current last key; redb's
                // single-writer model serialises concurrent appenders.
                let mut next_seq = Self::next_seq(&table)?;
                for event in events {
                    let entry = JournalEntry {
                        seq: next_seq,
                        timestamp_ms: now_ms,
                        event,
                    };
                    let bytes = serde_json::to_vec(&entry)
                        .map_err(|e| CamelError::Io(format!("journal serialize: {e}")))?;
                    table
                        .insert(&next_seq, bytes.as_slice())
                        .map_err(|e| CamelError::Io(format!("redb insert: {e}")))?;
                    next_seq += 1;
                }
            } // table dropped here — releases its borrow of `tx` before commit.
            tx.commit()
                .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
            Ok::<_, CamelError>(())
        })
        .await
        // First `?`: the blocking task was cancelled or panicked.
        .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
        // Second `?`: the closure itself returned a journal error.
        .map_err(|e| DomainError::InvalidState(e.to_string()))?;

        // Trigger compaction if threshold exceeded. Non-fatal if it fails.
        // Both event_count() and compact() do blocking redb I/O — run in spawn_blocking.
        let journal_clone = self.clone();
        let threshold = self.options.compaction_threshold_events;
        tokio::task::spawn_blocking(move || match journal_clone.event_count() {
            Ok(count) if count >= threshold => {
                if let Err(e) = journal_clone.compact() {
                    tracing::warn!("journal compaction failed (non-fatal): {e}");
                }
            }
            Ok(_) => {}
            Err(e) => {
                tracing::warn!("journal event count check failed (non-fatal): {e}");
            }
        })
        .await
        .ok(); // Non-fatal: if spawn_blocking panics, we ignore it

        Ok(())
    }

    /// Read every stored event in ascending sequence order.
    async fn load_all(&self) -> Result<Vec<RuntimeEvent>, DomainError> {
        let db = Arc::clone(&self.db);
        tokio::task::spawn_blocking(move || {
            let tx = db
                .begin_read()
                .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
            let table = tx
                .open_table(EVENTS_TABLE)
                .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
            let mut events = Vec::new();
            // Iteration is in key (seq) order, so replay order is preserved.
            for result in table
                .iter()
                .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
            {
                let (_k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
                let entry: JournalEntry = serde_json::from_slice(v.value())
                    .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
                events.push(entry.event);
            }
            Ok(events)
        })
        .await
        .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
        .map_err(|e: CamelError| DomainError::InvalidState(e.to_string()))
    }

    /// Record `command_id` as alive (idempotent: re-inserting an existing key is a no-op).
    async fn append_command_id(&self, command_id: &str) -> Result<(), DomainError> {
        let db = Arc::clone(&self.db);
        let durability = self.redb_durability();
        let id = command_id.to_string();
        tokio::task::spawn_blocking(move || {
            // NOTE: `mut` required — `set_durability` takes `&mut self` in redb v2.
            let mut tx = db
                .begin_write()
                .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
            tx.set_durability(durability);
            {
                let mut table = tx
                    .open_table(COMMAND_IDS_TABLE)
                    .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
                table
                    .insert(id.as_str(), ())
                    .map_err(|e| CamelError::Io(format!("redb insert command_id: {e}")))?;
            }
            tx.commit()
                .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
            Ok::<_, CamelError>(())
        })
        .await
        .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
        .map_err(|e| DomainError::InvalidState(e.to_string()))
    }

    /// Drop `command_id` (removing an absent key is not an error in redb).
    async fn remove_command_id(&self, command_id: &str) -> Result<(), DomainError> {
        let db = Arc::clone(&self.db);
        let durability = self.redb_durability();
        let id = command_id.to_string();
        tokio::task::spawn_blocking(move || {
            // NOTE: `mut` required — `set_durability` takes `&mut self` in redb v2.
            let mut tx = db
                .begin_write()
                .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
            tx.set_durability(durability);
            {
                let mut table = tx
                    .open_table(COMMAND_IDS_TABLE)
                    .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
                table
                    .remove(id.as_str())
                    .map_err(|e| CamelError::Io(format!("redb remove command_id: {e}")))?;
            }
            tx.commit()
                .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
            Ok::<_, CamelError>(())
        })
        .await
        .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
        .map_err(|e| DomainError::InvalidState(e.to_string()))
    }

    /// Return all live command ids, in redb key order.
    async fn load_command_ids(&self) -> Result<Vec<String>, DomainError> {
        let db = Arc::clone(&self.db);
        tokio::task::spawn_blocking(move || {
            let tx = db
                .begin_read()
                .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
            let table = tx
                .open_table(COMMAND_IDS_TABLE)
                .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
            let mut ids = Vec::new();
            for result in table
                .iter()
                .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
            {
                let (k, _) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
                ids.push(k.value().to_string());
            }
            Ok(ids)
        })
        .await
        .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
        .map_err(|e: CamelError| DomainError::InvalidState(e.to_string()))
    }
}

// ── Unit tests ────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Build a journal with default options in `dir` (fresh file per test).
    async fn new_journal(dir: &tempfile::TempDir) -> RedbRuntimeEventJournal {
        RedbRuntimeEventJournal::new(dir.path().join("test.db"), RedbJournalOptions::default())
            .await
            .unwrap()
    }

    /// Appended events come back from `load_all` in the same order.
    #[tokio::test]
    async fn redb_journal_roundtrip() {
        let dir = tempdir().unwrap();
        let journal = new_journal(&dir).await;

        let events = vec![
            RuntimeEvent::RouteRegistered {
                route_id: "r1".to_string(),
            },
            RuntimeEvent::RouteStarted {
                route_id: "r1".to_string(),
            },
        ];
        journal.append_batch(&events).await.unwrap();

        let loaded = journal.load_all().await.unwrap();
        assert_eq!(loaded, events);
    }

    /// Insert two command ids, remove one — only the survivor is loaded.
    #[tokio::test]
    async fn redb_journal_command_id_lifecycle() {
        let dir = tempdir().unwrap();
        let journal = new_journal(&dir).await;

        journal.append_command_id("c1").await.unwrap();
        journal.append_command_id("c2").await.unwrap();
        journal.remove_command_id("c1").await.unwrap();

        let ids = journal.load_command_ids().await.unwrap();
        assert_eq!(ids, vec!["c2".to_string()]);
    }

    /// A route that ends with `RouteRemoved` is purged by compaction, while a
    /// still-registered route is untouched.
    #[tokio::test]
    async fn redb_journal_compaction_removes_completed_routes() {
        let dir = tempdir().unwrap();
        // Threshold of 1 triggers compaction on every append.
        let journal = RedbRuntimeEventJournal::new(
            dir.path().join("compact.db"),
            RedbJournalOptions {
                durability: JournalDurability::Eventual,
                compaction_threshold_events: 1,
            },
        )
        .await
        .unwrap();

        // Removed route — full lifecycle.
        journal
            .append_batch(&[RuntimeEvent::RouteRegistered {
                route_id: "old".to_string(),
            }])
            .await
            .unwrap();
        journal
            .append_batch(&[RuntimeEvent::RouteRemoved {
                route_id: "old".to_string(),
            }])
            .await
            .unwrap();

        // Active route — no RouteRemoved.
        journal
            .append_batch(&[RuntimeEvent::RouteRegistered {
                route_id: "live".to_string(),
            }])
            .await
            .unwrap();

        let loaded = journal.load_all().await.unwrap();
        assert!(
            !loaded.iter().any(
                |e| matches!(e, RuntimeEvent::RouteRegistered { route_id } if route_id == "old")
            ),
            "old route events must be compacted"
        );
        assert!(
            loaded.iter().any(
                |e| matches!(e, RuntimeEvent::RouteRegistered { route_id } if route_id == "live")
            ),
            "live route events must survive compaction"
        );
    }

    /// Compaction only deletes events at or before the LAST `RouteRemoved` —
    /// a re-registration after removal must survive.
    #[tokio::test]
    async fn redb_journal_compaction_preserves_reregistered_route() {
        let dir = tempdir().unwrap();
        let journal = RedbRuntimeEventJournal::new(
            dir.path().join("rereg.db"),
            RedbJournalOptions {
                durability: JournalDurability::Eventual,
                compaction_threshold_events: 1,
            },
        )
        .await
        .unwrap();

        journal
            .append_batch(&[RuntimeEvent::RouteRegistered {
                route_id: "rereg".to_string(),
            }])
            .await
            .unwrap();
        journal
            .append_batch(&[RuntimeEvent::RouteRemoved {
                route_id: "rereg".to_string(),
            }])
            .await
            .unwrap();
        journal
            .append_batch(&[RuntimeEvent::RouteRegistered {
                route_id: "rereg".to_string(),
            }])
            .await
            .unwrap();

        let loaded = journal.load_all().await.unwrap();
        let rereg_count = loaded
            .iter()
            .filter(
                |e| matches!(e, RuntimeEvent::RouteRegistered { route_id } if route_id == "rereg"),
            )
            .count();
        assert_eq!(
            rereg_count, 1,
            "re-registered route must have exactly one event after compaction"
        );
    }

    /// `Eventual` durability still persists within the same process lifetime.
    #[tokio::test]
    async fn redb_journal_durability_eventual() {
        let dir = tempdir().unwrap();
        let journal = RedbRuntimeEventJournal::new(
            dir.path().join("eventual.db"),
            RedbJournalOptions {
                durability: JournalDurability::Eventual,
                compaction_threshold_events: 10_000,
            },
        )
        .await
        .unwrap();

        journal
            .append_batch(&[RuntimeEvent::RouteRegistered {
                route_id: "ev".to_string(),
            }])
            .await
            .unwrap();
        let loaded = journal.load_all().await.unwrap();
        assert_eq!(loaded.len(), 1);
    }

    /// Clones alias the same `Arc<Database>`, so writes via one are visible via the other.
    #[tokio::test]
    async fn redb_journal_clone_shares_db() {
        let dir = tempdir().unwrap();
        let j1 = new_journal(&dir).await;
        let j2 = j1.clone();

        j1.append_batch(&[RuntimeEvent::RouteRegistered {
            route_id: "shared".to_string(),
        }])
        .await
        .unwrap();

        // j2 must see j1's write since they share the same Arc<Database>.
        let loaded = j2.load_all().await.unwrap();
        assert_eq!(loaded.len(), 1);
    }
}