// camel_core/lifecycle/adapters/redb_journal.rs
use std::path::PathBuf;
14use std::sync::Arc;
15
16use async_trait::async_trait;
17use redb::{Database, ReadableTable, ReadableTableMetadata, TableDefinition};
18use serde::{Deserialize, Serialize};
19
20use camel_api::CamelError;
21
22use crate::lifecycle::domain::{DomainError, RuntimeEvent};
23use crate::lifecycle::ports::RuntimeEventJournalPort;
24
25const EVENTS_TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("events");
28const COMMAND_IDS_TABLE: TableDefinition<&str, ()> = TableDefinition::new("command_ids");
29
/// Durability level applied to the journal's write transactions.
///
/// Each variant is translated to the `redb::Durability` variant of the same
/// name when a write transaction is configured; see the redb documentation
/// for the exact persistence guarantees of each level.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum JournalDurability {
    /// Maps to `redb::Durability::Immediate`. This is the default.
    #[default]
    Immediate,
    /// Maps to `redb::Durability::Eventual`.
    Eventual,
}
41
42#[derive(Debug, Clone)]
44pub struct RedbJournalOptions {
45 pub durability: JournalDurability,
46 pub compaction_threshold_events: u64,
48}
49
50impl Default for RedbJournalOptions {
51 fn default() -> Self {
52 Self {
53 durability: JournalDurability::Immediate,
54 compaction_threshold_events: 10_000,
55 }
56 }
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct JournalEntry {
62 pub seq: u64,
63 pub timestamp_ms: i64,
64 pub event: RuntimeEvent,
65}
66
/// Selection criteria for an offline journal inspection.
#[derive(Debug, Clone)]
pub struct JournalInspectFilter {
    /// When `Some`, only entries whose event belongs to this route id are
    /// returned; when `None`, entries of every route are eligible.
    pub route_id: Option<String>,
    /// Maximum number of entries to return.
    pub limit: usize,
}
72
73#[derive(Clone)]
80pub struct RedbRuntimeEventJournal {
81 db: Arc<Database>,
82 options: RedbJournalOptions,
83}
84
85impl RedbRuntimeEventJournal {
86 pub async fn new(
92 path: impl Into<PathBuf>,
93 options: RedbJournalOptions,
94 ) -> Result<Self, CamelError> {
95 let path = path.into();
96 let db = tokio::task::spawn_blocking(move || {
97 if let Some(parent) = path.parent() {
98 std::fs::create_dir_all(parent).map_err(|e| {
99 CamelError::Io(format!(
100 "failed to create journal directory '{}': {e}",
101 parent.display()
102 ))
103 })?;
104 }
105 let db = Database::create(&path).map_err(|e| {
106 CamelError::Io(format!(
107 "failed to open journal at '{}': {e}",
108 path.display()
109 ))
110 })?;
111 let tx = db
113 .begin_write()
114 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
115 tx.open_table(EVENTS_TABLE)
116 .map_err(|e| CamelError::Io(format!("redb open events table: {e}")))?;
117 tx.open_table(COMMAND_IDS_TABLE)
118 .map_err(|e| CamelError::Io(format!("redb open command_ids table: {e}")))?;
119 tx.commit()
120 .map_err(|e| CamelError::Io(format!("redb commit init: {e}")))?;
121 Ok::<_, CamelError>(db)
122 })
123 .await
124 .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))??;
125
126 Ok(Self {
127 db: Arc::new(db),
128 options,
129 })
130 }
131
132 pub async fn inspect(
137 path: impl Into<PathBuf>,
138 filter: JournalInspectFilter,
139 ) -> Result<Vec<JournalEntry>, CamelError> {
140 let path = path.into();
141 let limit = filter.limit;
142 let route_id = filter.route_id;
143 tokio::task::spawn_blocking(move || {
144 if !path.exists() {
145 return Err(CamelError::Io(format!(
146 "journal file not found: {}",
147 path.display()
148 )));
149 }
150 let db = Database::open(&path)
151 .map_err(|e| CamelError::Io(format!("invalid journal file: {e}")))?;
152 let tx = db
153 .begin_read()
154 .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
155 let table = tx
156 .open_table(EVENTS_TABLE)
157 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
158
159 let mut entries: Vec<JournalEntry> = Vec::new();
164 for result in table
165 .iter()
166 .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
167 .rev()
168 {
169 let (_k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
170 let entry: JournalEntry = serde_json::from_slice(v.value())
171 .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
172 if let Some(ref rid) = route_id
173 && entry.event.route_id() != rid.as_str()
174 {
175 continue;
176 }
177 if entries.len() >= limit {
178 break;
179 }
180 entries.push(entry);
181 }
182 Ok(entries)
183 })
184 .await
185 .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))?
186 }
187
188 fn redb_durability(&self) -> redb::Durability {
191 match self.options.durability {
192 JournalDurability::Immediate => redb::Durability::Immediate,
193 JournalDurability::Eventual => redb::Durability::Eventual,
194 }
195 }
196
197 fn next_seq(table: &redb::Table<u64, &[u8]>) -> Result<u64, CamelError> {
200 match table
201 .iter()
202 .map_err(|e| CamelError::Io(format!("redb iter for seq: {e}")))?
203 .next_back()
204 {
205 Some(Ok((k, _))) => Ok(k.value() + 1),
206 Some(Err(e)) => Err(CamelError::Io(format!("redb seq read: {e}"))),
207 None => Ok(0),
208 }
209 }
210
211 fn event_count(&self) -> Result<u64, CamelError> {
213 let tx = self
214 .db
215 .begin_read()
216 .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
217 let table = tx
218 .open_table(EVENTS_TABLE)
219 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
220 table
221 .len()
222 .map_err(|e| CamelError::Io(format!("redb len: {e}")))
223 }
224
225 fn compact(&self) -> Result<(), CamelError> {
227 let tx = self
228 .db
229 .begin_write()
230 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
231 {
232 let mut table = tx
233 .open_table(EVENTS_TABLE)
234 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
235
236 let mut last_removed_seq: std::collections::HashMap<String, u64> =
238 std::collections::HashMap::new();
239 for result in table
240 .iter()
241 .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
242 {
243 let (k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
244 let seq = k.value();
245 let entry: JournalEntry = serde_json::from_slice(v.value())
246 .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
247 if matches!(entry.event, RuntimeEvent::RouteRemoved { .. }) {
248 last_removed_seq.insert(entry.event.route_id().to_string(), seq);
249 }
250 }
251
252 if last_removed_seq.is_empty() {
253 drop(table);
254 tx.commit()
255 .map_err(|e| CamelError::Io(format!("redb commit compact: {e}")))?;
256 return Ok(());
257 }
258
259 let mut to_delete: Vec<u64> = Vec::new();
261 for result in table
262 .iter()
263 .map_err(|e| CamelError::Io(format!("redb iter pass2: {e}")))?
264 {
265 let (k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
266 let seq = k.value();
267 let entry: JournalEntry = serde_json::from_slice(v.value())
268 .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
269 let route_id = entry.event.route_id().to_string();
270 if let Some(&cutoff) = last_removed_seq.get(&route_id)
271 && seq <= cutoff
272 {
273 to_delete.push(seq);
274 }
275 }
276
277 for seq in to_delete {
278 table
279 .remove(&seq)
280 .map_err(|e| CamelError::Io(format!("redb remove seq {seq}: {e}")))?;
281 }
282 }
283 tx.commit()
284 .map_err(|e| CamelError::Io(format!("redb commit compact: {e}")))?;
285 Ok(())
286 }
287}
288
/// Module-private extension trait that exposes the route id a value refers to.
///
/// It is implemented in this file for `RuntimeEvent`, which is imported from
/// the lifecycle domain module.
trait RuntimeEventExt {
    /// Returns the id of the route this value belongs to.
    fn route_id(&self) -> &str;
}
295
296impl RuntimeEventExt for RuntimeEvent {
297 fn route_id(&self) -> &str {
298 match self {
299 RuntimeEvent::RouteRegistered { route_id }
300 | RuntimeEvent::RouteStartRequested { route_id }
301 | RuntimeEvent::RouteStarted { route_id }
302 | RuntimeEvent::RouteFailed { route_id, .. }
303 | RuntimeEvent::RouteStopped { route_id }
304 | RuntimeEvent::RouteSuspended { route_id }
305 | RuntimeEvent::RouteResumed { route_id }
306 | RuntimeEvent::RouteReloaded { route_id }
307 | RuntimeEvent::RouteRemoved { route_id } => route_id,
308 }
309 }
310}
311
312#[async_trait]
315impl RuntimeEventJournalPort for RedbRuntimeEventJournal {
316 async fn append_batch(&self, events: &[RuntimeEvent]) -> Result<(), DomainError> {
317 if events.is_empty() {
318 return Ok(());
319 }
320 let db = Arc::clone(&self.db);
321 let durability = self.redb_durability();
322 let events = events.to_vec();
323 let now_ms = chrono::Utc::now().timestamp_millis();
324
325 tokio::task::spawn_blocking(move || {
326 let mut tx = db
328 .begin_write()
329 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
330 tx.set_durability(durability);
331 {
332 let mut table = tx
333 .open_table(EVENTS_TABLE)
334 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
335 let mut next_seq = Self::next_seq(&table)?;
336 for event in events {
337 let entry = JournalEntry {
338 seq: next_seq,
339 timestamp_ms: now_ms,
340 event,
341 };
342 let bytes = serde_json::to_vec(&entry)
343 .map_err(|e| CamelError::Io(format!("journal serialize: {e}")))?;
344 table
345 .insert(&next_seq, bytes.as_slice())
346 .map_err(|e| CamelError::Io(format!("redb insert: {e}")))?;
347 next_seq += 1;
348 }
349 }
350 tx.commit()
351 .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
352 Ok::<_, CamelError>(())
353 })
354 .await
355 .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
356 .map_err(|e| DomainError::InvalidState(e.to_string()))?;
357
358 let journal_clone = self.clone();
361 let threshold = self.options.compaction_threshold_events;
362 tokio::task::spawn_blocking(move || match journal_clone.event_count() {
363 Ok(count) if count >= threshold => {
364 if let Err(e) = journal_clone.compact() {
365 tracing::warn!("journal compaction failed (non-fatal): {e}");
366 }
367 }
368 Ok(_) => {}
369 Err(e) => {
370 tracing::warn!("journal event count check failed (non-fatal): {e}");
371 }
372 })
373 .await
374 .ok(); Ok(())
377 }
378
379 async fn load_all(&self) -> Result<Vec<RuntimeEvent>, DomainError> {
380 let db = Arc::clone(&self.db);
381 tokio::task::spawn_blocking(move || {
382 let tx = db
383 .begin_read()
384 .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
385 let table = tx
386 .open_table(EVENTS_TABLE)
387 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
388 let mut events = Vec::new();
389 for result in table
390 .iter()
391 .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
392 {
393 let (_k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
394 let entry: JournalEntry = serde_json::from_slice(v.value())
395 .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
396 events.push(entry.event);
397 }
398 Ok(events)
399 })
400 .await
401 .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
402 .map_err(|e: CamelError| DomainError::InvalidState(e.to_string()))
403 }
404
405 async fn append_command_id(&self, command_id: &str) -> Result<(), DomainError> {
406 let db = Arc::clone(&self.db);
407 let durability = self.redb_durability();
408 let id = command_id.to_string();
409 tokio::task::spawn_blocking(move || {
410 let mut tx = db
412 .begin_write()
413 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
414 tx.set_durability(durability);
415 {
416 let mut table = tx
417 .open_table(COMMAND_IDS_TABLE)
418 .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
419 table
420 .insert(id.as_str(), ())
421 .map_err(|e| CamelError::Io(format!("redb insert command_id: {e}")))?;
422 }
423 tx.commit()
424 .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
425 Ok::<_, CamelError>(())
426 })
427 .await
428 .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
429 .map_err(|e| DomainError::InvalidState(e.to_string()))
430 }
431
432 async fn remove_command_id(&self, command_id: &str) -> Result<(), DomainError> {
433 let db = Arc::clone(&self.db);
434 let durability = self.redb_durability();
435 let id = command_id.to_string();
436 tokio::task::spawn_blocking(move || {
437 let mut tx = db
439 .begin_write()
440 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
441 tx.set_durability(durability);
442 {
443 let mut table = tx
444 .open_table(COMMAND_IDS_TABLE)
445 .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
446 table
447 .remove(id.as_str())
448 .map_err(|e| CamelError::Io(format!("redb remove command_id: {e}")))?;
449 }
450 tx.commit()
451 .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
452 Ok::<_, CamelError>(())
453 })
454 .await
455 .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
456 .map_err(|e| DomainError::InvalidState(e.to_string()))
457 }
458
459 async fn load_command_ids(&self) -> Result<Vec<String>, DomainError> {
460 let db = Arc::clone(&self.db);
461 tokio::task::spawn_blocking(move || {
462 let tx = db
463 .begin_read()
464 .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
465 let table = tx
466 .open_table(COMMAND_IDS_TABLE)
467 .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
468 let mut ids = Vec::new();
469 for result in table
470 .iter()
471 .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
472 {
473 let (k, _) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
474 ids.push(k.value().to_string());
475 }
476 Ok(ids)
477 })
478 .await
479 .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
480 .map_err(|e: CamelError| DomainError::InvalidState(e.to_string()))
481 }
482}
483
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Opens a journal stored as `file_name` inside `dir` with the given options.
    async fn open_journal(
        dir: &tempfile::TempDir,
        file_name: &str,
        options: RedbJournalOptions,
    ) -> RedbRuntimeEventJournal {
        RedbRuntimeEventJournal::new(dir.path().join(file_name), options)
            .await
            .unwrap()
    }

    /// Opens `test.db` inside `dir` with default options.
    async fn new_journal(dir: &tempfile::TempDir) -> RedbRuntimeEventJournal {
        open_journal(dir, "test.db", RedbJournalOptions::default()).await
    }

    /// Eventual durability with the given compaction threshold.
    fn eventual_options(compaction_threshold_events: u64) -> RedbJournalOptions {
        RedbJournalOptions {
            durability: JournalDurability::Eventual,
            compaction_threshold_events,
        }
    }

    fn registered(route_id: &str) -> RuntimeEvent {
        RuntimeEvent::RouteRegistered {
            route_id: route_id.to_string(),
        }
    }

    fn started(route_id: &str) -> RuntimeEvent {
        RuntimeEvent::RouteStarted {
            route_id: route_id.to_string(),
        }
    }

    fn removed(route_id: &str) -> RuntimeEvent {
        RuntimeEvent::RouteRemoved {
            route_id: route_id.to_string(),
        }
    }

    /// True when `event` is a `RouteRegistered` for exactly `wanted`.
    fn is_registration_of(event: &RuntimeEvent, wanted: &str) -> bool {
        matches!(event, RuntimeEvent::RouteRegistered { route_id } if route_id == wanted)
    }

    #[tokio::test]
    async fn redb_journal_roundtrip() {
        let dir = tempdir().unwrap();
        let journal = new_journal(&dir).await;

        let events = vec![registered("r1"), started("r1")];
        journal.append_batch(&events).await.unwrap();

        let loaded = journal.load_all().await.unwrap();
        assert_eq!(loaded, events);
    }

    #[tokio::test]
    async fn redb_journal_command_id_lifecycle() {
        let dir = tempdir().unwrap();
        let journal = new_journal(&dir).await;

        journal.append_command_id("c1").await.unwrap();
        journal.append_command_id("c2").await.unwrap();
        journal.remove_command_id("c1").await.unwrap();

        let ids = journal.load_command_ids().await.unwrap();
        assert_eq!(ids, vec!["c2".to_string()]);
    }

    #[tokio::test]
    async fn redb_journal_compaction_removes_completed_routes() {
        let dir = tempdir().unwrap();
        // A threshold of 1 makes every append attempt compaction.
        let journal = open_journal(&dir, "compact.db", eventual_options(1)).await;

        journal.append_batch(&[registered("old")]).await.unwrap();
        journal.append_batch(&[removed("old")]).await.unwrap();
        journal.append_batch(&[registered("live")]).await.unwrap();

        let loaded = journal.load_all().await.unwrap();
        assert!(
            !loaded.iter().any(|e| is_registration_of(e, "old")),
            "old route events must be compacted"
        );
        assert!(
            loaded.iter().any(|e| is_registration_of(e, "live")),
            "live route events must survive compaction"
        );
    }

    #[tokio::test]
    async fn redb_journal_compaction_preserves_reregistered_route() {
        let dir = tempdir().unwrap();
        let journal = open_journal(&dir, "rereg.db", eventual_options(1)).await;

        journal.append_batch(&[registered("rereg")]).await.unwrap();
        journal.append_batch(&[removed("rereg")]).await.unwrap();
        journal.append_batch(&[registered("rereg")]).await.unwrap();

        let loaded = journal.load_all().await.unwrap();
        let rereg_count = loaded
            .iter()
            .filter(|e| is_registration_of(e, "rereg"))
            .count();
        assert_eq!(
            rereg_count, 1,
            "re-registered route must have exactly one event after compaction"
        );
    }

    #[tokio::test]
    async fn redb_journal_durability_eventual() {
        let dir = tempdir().unwrap();
        let journal = open_journal(&dir, "eventual.db", eventual_options(10_000)).await;

        journal.append_batch(&[registered("ev")]).await.unwrap();

        let loaded = journal.load_all().await.unwrap();
        assert_eq!(loaded.len(), 1);
    }

    #[tokio::test]
    async fn redb_journal_clone_shares_db() {
        let dir = tempdir().unwrap();
        let j1 = new_journal(&dir).await;
        let j2 = j1.clone();

        j1.append_batch(&[registered("shared")]).await.unwrap();

        // The clone must observe the write made through the original handle.
        let loaded = j2.load_all().await.unwrap();
        assert_eq!(loaded.len(), 1);
    }
}