camel_core/lifecycle/adapters/
redb_journal.rs1use std::path::PathBuf;
14use std::sync::Arc;
15
16use async_trait::async_trait;
17use redb::{
18 Database, Durability, ReadableDatabase, ReadableTable, ReadableTableMetadata, TableDefinition,
19};
20use serde::{Deserialize, Serialize};
21
22use camel_api::CamelError;
23
24use crate::lifecycle::domain::{DomainError, RuntimeEvent};
25use crate::lifecycle::ports::RuntimeEventJournalPort;
26
27const EVENTS_TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("events");
30const COMMAND_IDS_TABLE: TableDefinition<&str, ()> = TableDefinition::new("command_ids");
31
/// Durability level applied to journal write transactions.
///
/// The enum is a plain tag, so it is `Copy` and `Eq` in addition to the
/// traits it already derived.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum JournalDurability {
    /// Commits use redb's `Durability::Immediate`. This is the default.
    #[default]
    Immediate,
    /// Commits use redb's `Durability::None`.
    // NOTE(review): presumably such commits only reach disk with a later
    // durable commit — confirm against the redb documentation.
    Eventual,
}
43
44#[derive(Debug, Clone)]
46pub struct RedbJournalOptions {
47 pub durability: JournalDurability,
48 pub compaction_threshold_events: u64,
50}
51
52impl Default for RedbJournalOptions {
53 fn default() -> Self {
54 Self {
55 durability: JournalDurability::Immediate,
56 compaction_threshold_events: 10_000,
57 }
58 }
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct JournalEntry {
64 pub seq: u64,
65 pub timestamp_ms: i64,
66 pub event: RuntimeEvent,
67}
68
/// Selection criteria for [`RedbRuntimeEventJournal::inspect`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JournalInspectFilter {
    /// When set, only entries whose event belongs to this route are returned.
    pub route_id: Option<String>,
    /// Maximum number of entries to return.
    pub limit: usize,
}
74
75#[derive(Clone)]
82pub struct RedbRuntimeEventJournal {
83 db: Arc<Database>,
84 options: RedbJournalOptions,
85}
86
87impl RedbRuntimeEventJournal {
88 pub async fn new(
94 path: impl Into<PathBuf>,
95 options: RedbJournalOptions,
96 ) -> Result<Self, CamelError> {
97 let path = path.into();
98 let db = tokio::task::spawn_blocking(move || {
99 if let Some(parent) = path.parent() {
100 std::fs::create_dir_all(parent).map_err(|e| {
101 CamelError::Io(format!(
102 "failed to create journal directory '{}': {e}",
103 parent.display()
104 ))
105 })?;
106 }
107 let db = Database::create(&path).map_err(|e| {
108 CamelError::Io(format!(
109 "failed to open journal at '{}': {e}",
110 path.display()
111 ))
112 })?;
113 let tx = db
115 .begin_write()
116 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
117 tx.open_table(EVENTS_TABLE)
118 .map_err(|e| CamelError::Io(format!("redb open events table: {e}")))?;
119 tx.open_table(COMMAND_IDS_TABLE)
120 .map_err(|e| CamelError::Io(format!("redb open command_ids table: {e}")))?;
121 tx.commit()
122 .map_err(|e| CamelError::Io(format!("redb commit init: {e}")))?;
123 Ok::<_, CamelError>(db)
124 })
125 .await
126 .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))??;
127
128 Ok(Self {
129 db: Arc::new(db),
130 options,
131 })
132 }
133
134 pub async fn inspect(
139 path: impl Into<PathBuf>,
140 filter: JournalInspectFilter,
141 ) -> Result<Vec<JournalEntry>, CamelError> {
142 let path = path.into();
143 let limit = filter.limit;
144 let route_id = filter.route_id;
145 tokio::task::spawn_blocking(move || {
146 if !path.exists() {
147 return Err(CamelError::Io(format!(
148 "journal file not found: {}",
149 path.display()
150 )));
151 }
152 let db = Database::open(&path)
153 .map_err(|e| CamelError::Io(format!("invalid journal file: {e}")))?;
154 let tx = db
155 .begin_read()
156 .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
157 let table = tx
158 .open_table(EVENTS_TABLE)
159 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
160
161 let mut entries: Vec<JournalEntry> = Vec::new();
166 for result in table
167 .iter()
168 .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
169 .rev()
170 {
171 let (_k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
172 let entry: JournalEntry = serde_json::from_slice(v.value())
173 .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
174 if let Some(ref rid) = route_id
175 && entry.event.route_id() != rid.as_str()
176 {
177 continue;
178 }
179 if entries.len() >= limit {
180 break;
181 }
182 entries.push(entry);
183 }
184 Ok(entries)
185 })
186 .await
187 .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))?
188 }
189
190 fn redb_durability(&self) -> Durability {
193 match self.options.durability {
194 JournalDurability::Immediate => Durability::Immediate,
195 JournalDurability::Eventual => Durability::None,
196 }
197 }
198
199 fn next_seq(table: &redb::Table<u64, &[u8]>) -> Result<u64, CamelError> {
202 match table
203 .iter()
204 .map_err(|e| CamelError::Io(format!("redb iter for seq: {e}")))?
205 .next_back()
206 {
207 Some(Ok((k, _))) => Ok(k.value() + 1),
208 Some(Err(e)) => Err(CamelError::Io(format!("redb seq read: {e}"))),
209 None => Ok(0),
210 }
211 }
212
213 fn event_count(&self) -> Result<u64, CamelError> {
215 let tx = self
216 .db
217 .begin_read()
218 .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
219 let table = tx
220 .open_table(EVENTS_TABLE)
221 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
222 table
223 .len()
224 .map_err(|e| CamelError::Io(format!("redb len: {e}")))
225 }
226
227 fn compact(&self) -> Result<(), CamelError> {
229 let tx = self
230 .db
231 .begin_write()
232 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
233 {
234 let mut table = tx
235 .open_table(EVENTS_TABLE)
236 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
237
238 let mut last_removed_seq: std::collections::HashMap<String, u64> =
240 std::collections::HashMap::new();
241 for result in table
242 .iter()
243 .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
244 {
245 let (k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
246 let seq = k.value();
247 let entry: JournalEntry = serde_json::from_slice(v.value())
248 .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
249 if matches!(entry.event, RuntimeEvent::RouteRemoved { .. }) {
250 last_removed_seq.insert(entry.event.route_id().to_string(), seq);
251 }
252 }
253
254 if last_removed_seq.is_empty() {
255 drop(table);
256 tx.commit()
257 .map_err(|e| CamelError::Io(format!("redb commit compact: {e}")))?;
258 return Ok(());
259 }
260
261 let mut to_delete: Vec<u64> = Vec::new();
263 for result in table
264 .iter()
265 .map_err(|e| CamelError::Io(format!("redb iter pass2: {e}")))?
266 {
267 let (k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
268 let seq = k.value();
269 let entry: JournalEntry = serde_json::from_slice(v.value())
270 .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
271 let route_id = entry.event.route_id().to_string();
272 if let Some(&cutoff) = last_removed_seq.get(&route_id)
273 && seq <= cutoff
274 {
275 to_delete.push(seq);
276 }
277 }
278
279 for seq in to_delete {
280 table
281 .remove(&seq)
282 .map_err(|e| CamelError::Io(format!("redb remove seq {seq}: {e}")))?;
283 }
284 }
285 tx.commit()
286 .map_err(|e| CamelError::Io(format!("redb commit compact: {e}")))?;
287 Ok(())
288 }
289}
290
/// Module-private extension giving uniform access to the route id that an
/// event carries.
trait RuntimeEventExt {
    /// Returns the id of the route this event refers to.
    fn route_id(&self) -> &str;
}
297
298impl RuntimeEventExt for RuntimeEvent {
299 fn route_id(&self) -> &str {
300 match self {
301 RuntimeEvent::RouteRegistered { route_id }
302 | RuntimeEvent::RouteStartRequested { route_id }
303 | RuntimeEvent::RouteStarted { route_id }
304 | RuntimeEvent::RouteFailed { route_id, .. }
305 | RuntimeEvent::RouteStopped { route_id }
306 | RuntimeEvent::RouteSuspended { route_id }
307 | RuntimeEvent::RouteResumed { route_id }
308 | RuntimeEvent::RouteReloaded { route_id }
309 | RuntimeEvent::RouteRemoved { route_id } => route_id,
310 }
311 }
312}
313
314#[async_trait]
317impl RuntimeEventJournalPort for RedbRuntimeEventJournal {
318 async fn append_batch(&self, events: &[RuntimeEvent]) -> Result<(), DomainError> {
319 if events.is_empty() {
320 return Ok(());
321 }
322 let db = Arc::clone(&self.db);
323 let durability = self.redb_durability();
324 let events = events.to_vec();
325 let now_ms = chrono::Utc::now().timestamp_millis();
326
327 tokio::task::spawn_blocking(move || {
328 let mut tx = db
330 .begin_write()
331 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
332 tx.set_durability(durability)
333 .map_err(|e| CamelError::Io(format!("redb set_durability: {e}")))?;
334 {
335 let mut table = tx
336 .open_table(EVENTS_TABLE)
337 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
338 let start_seq = Self::next_seq(&table)?;
339 for (next_seq, event) in (start_seq..).zip(events) {
340 let entry = JournalEntry {
341 seq: next_seq,
342 timestamp_ms: now_ms,
343 event,
344 };
345 let bytes = serde_json::to_vec(&entry)
346 .map_err(|e| CamelError::Io(format!("journal serialize: {e}")))?;
347 table
348 .insert(&next_seq, bytes.as_slice())
349 .map_err(|e| CamelError::Io(format!("redb insert: {e}")))?;
350 }
351 }
352 tx.commit()
353 .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
354 Ok::<_, CamelError>(())
355 })
356 .await
357 .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
358 .map_err(|e| DomainError::InvalidState(e.to_string()))?;
359
360 let journal_clone = self.clone();
363 let threshold = self.options.compaction_threshold_events;
364 tokio::task::spawn_blocking(move || match journal_clone.event_count() {
365 Ok(count) if count >= threshold => {
366 if let Err(e) = journal_clone.compact() {
367 tracing::warn!("journal compaction failed (non-fatal): {e}");
368 }
369 }
370 Ok(_) => {}
371 Err(e) => {
372 tracing::warn!("journal event count check failed (non-fatal): {e}");
373 }
374 })
375 .await
376 .ok(); Ok(())
379 }
380
381 async fn load_all(&self) -> Result<Vec<RuntimeEvent>, DomainError> {
382 let db = Arc::clone(&self.db);
383 tokio::task::spawn_blocking(move || {
384 let tx = db
385 .begin_read()
386 .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
387 let table = tx
388 .open_table(EVENTS_TABLE)
389 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
390 let mut events = Vec::new();
391 for result in table
392 .iter()
393 .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
394 {
395 let (_k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
396 let entry: JournalEntry = serde_json::from_slice(v.value())
397 .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
398 events.push(entry.event);
399 }
400 Ok(events)
401 })
402 .await
403 .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
404 .map_err(|e: CamelError| DomainError::InvalidState(e.to_string()))
405 }
406
407 async fn append_command_id(&self, command_id: &str) -> Result<(), DomainError> {
408 let db = Arc::clone(&self.db);
409 let durability = self.redb_durability();
410 let id = command_id.to_string();
411 tokio::task::spawn_blocking(move || {
412 let mut tx = db
414 .begin_write()
415 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
416 tx.set_durability(durability)
417 .map_err(|e| CamelError::Io(format!("redb set_durability: {e}")))?;
418 {
419 let mut table = tx
420 .open_table(COMMAND_IDS_TABLE)
421 .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
422 table
423 .insert(id.as_str(), ())
424 .map_err(|e| CamelError::Io(format!("redb insert command_id: {e}")))?;
425 }
426 tx.commit()
427 .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
428 Ok::<_, CamelError>(())
429 })
430 .await
431 .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
432 .map_err(|e| DomainError::InvalidState(e.to_string()))
433 }
434
435 async fn remove_command_id(&self, command_id: &str) -> Result<(), DomainError> {
436 let db = Arc::clone(&self.db);
437 let durability = self.redb_durability();
438 let id = command_id.to_string();
439 tokio::task::spawn_blocking(move || {
440 let mut tx = db
442 .begin_write()
443 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
444 tx.set_durability(durability)
445 .map_err(|e| CamelError::Io(format!("redb set_durability: {e}")))?;
446 {
447 let mut table = tx
448 .open_table(COMMAND_IDS_TABLE)
449 .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
450 table
451 .remove(id.as_str())
452 .map_err(|e| CamelError::Io(format!("redb remove command_id: {e}")))?;
453 }
454 tx.commit()
455 .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
456 Ok::<_, CamelError>(())
457 })
458 .await
459 .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
460 .map_err(|e| DomainError::InvalidState(e.to_string()))
461 }
462
463 async fn load_command_ids(&self) -> Result<Vec<String>, DomainError> {
464 let db = Arc::clone(&self.db);
465 tokio::task::spawn_blocking(move || {
466 let tx = db
467 .begin_read()
468 .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
469 let table = tx
470 .open_table(COMMAND_IDS_TABLE)
471 .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
472 let mut ids = Vec::new();
473 for result in table
474 .iter()
475 .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
476 {
477 let (k, _) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
478 ids.push(k.value().to_string());
479 }
480 Ok(ids)
481 })
482 .await
483 .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
484 .map_err(|e: CamelError| DomainError::InvalidState(e.to_string()))
485 }
486}
487
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Opens a journal with default options at `test.db` inside `dir`.
    async fn new_journal(dir: &tempfile::TempDir) -> RedbRuntimeEventJournal {
        let db_path = dir.path().join("test.db");
        RedbRuntimeEventJournal::new(db_path, RedbJournalOptions::default())
            .await
            .unwrap()
    }

    /// Opens a journal named `file_name` inside `dir` with eventual durability
    /// and the given compaction threshold.
    async fn eventual_journal(
        dir: &tempfile::TempDir,
        file_name: &str,
        compaction_threshold_events: u64,
    ) -> RedbRuntimeEventJournal {
        let options = RedbJournalOptions {
            durability: JournalDurability::Eventual,
            compaction_threshold_events,
        };
        RedbRuntimeEventJournal::new(dir.path().join(file_name), options)
            .await
            .unwrap()
    }

    /// Builds a `RouteRegistered` event for `id`.
    fn registered(id: &str) -> RuntimeEvent {
        RuntimeEvent::RouteRegistered {
            route_id: id.to_string(),
        }
    }

    /// Builds a `RouteStarted` event for `id`.
    fn started(id: &str) -> RuntimeEvent {
        RuntimeEvent::RouteStarted {
            route_id: id.to_string(),
        }
    }

    /// Builds a `RouteRemoved` event for `id`.
    fn removed(id: &str) -> RuntimeEvent {
        RuntimeEvent::RouteRemoved {
            route_id: id.to_string(),
        }
    }

    /// True when `event` is a `RouteRegistered` for the route `expected`.
    fn is_registered(event: &RuntimeEvent, expected: &str) -> bool {
        matches!(event, RuntimeEvent::RouteRegistered { route_id } if route_id == expected)
    }

    #[tokio::test]
    async fn redb_journal_roundtrip() {
        let dir = tempdir().unwrap();
        let journal = new_journal(&dir).await;

        let events = vec![registered("r1"), started("r1")];
        journal.append_batch(&events).await.unwrap();

        let loaded = journal.load_all().await.unwrap();
        assert_eq!(loaded, events);
    }

    #[tokio::test]
    async fn redb_journal_command_id_lifecycle() {
        let dir = tempdir().unwrap();
        let journal = new_journal(&dir).await;

        for id in ["c1", "c2"] {
            journal.append_command_id(id).await.unwrap();
        }
        journal.remove_command_id("c1").await.unwrap();

        let ids = journal.load_command_ids().await.unwrap();
        assert_eq!(ids, vec!["c2".to_string()]);
    }

    #[tokio::test]
    async fn redb_journal_compaction_removes_completed_routes() {
        let dir = tempdir().unwrap();
        let journal = eventual_journal(&dir, "compact.db", 1).await;

        // A route that is registered and then removed ...
        journal.append_batch(&[registered("old")]).await.unwrap();
        journal.append_batch(&[removed("old")]).await.unwrap();

        // ... followed by one that stays alive.
        journal.append_batch(&[registered("live")]).await.unwrap();

        let loaded = journal.load_all().await.unwrap();
        assert!(
            !loaded.iter().any(|e| is_registered(e, "old")),
            "old route events must be compacted"
        );
        assert!(
            loaded.iter().any(|e| is_registered(e, "live")),
            "live route events must survive compaction"
        );
    }

    #[tokio::test]
    async fn redb_journal_compaction_preserves_reregistered_route() {
        let dir = tempdir().unwrap();
        let journal = eventual_journal(&dir, "rereg.db", 1).await;

        journal.append_batch(&[registered("rereg")]).await.unwrap();
        journal.append_batch(&[removed("rereg")]).await.unwrap();
        journal.append_batch(&[registered("rereg")]).await.unwrap();

        let loaded = journal.load_all().await.unwrap();
        let rereg_count = loaded
            .iter()
            .filter(|e| is_registered(e, "rereg"))
            .count();
        assert_eq!(
            rereg_count, 1,
            "re-registered route must have exactly one event after compaction"
        );
    }

    #[tokio::test]
    async fn redb_journal_durability_eventual() {
        let dir = tempdir().unwrap();
        let journal = eventual_journal(&dir, "eventual.db", 10_000).await;

        journal.append_batch(&[registered("ev")]).await.unwrap();

        let loaded = journal.load_all().await.unwrap();
        assert_eq!(loaded.len(), 1);
    }

    #[tokio::test]
    async fn redb_journal_clone_shares_db() {
        let dir = tempdir().unwrap();
        let j1 = new_journal(&dir).await;
        let j2 = j1.clone();

        j1.append_batch(&[registered("shared")]).await.unwrap();

        // The clone must observe what was written through the original.
        let loaded = j2.load_all().await.unwrap();
        assert_eq!(loaded.len(), 1);
    }
}