// camel_core/lifecycle/adapters/redb_journal.rs
use std::path::PathBuf;
14use std::sync::Arc;
15
16use async_trait::async_trait;
17use redb::{
18 Database, Durability, ReadableDatabase, ReadableTable, ReadableTableMetadata, TableDefinition,
19};
20use serde::{Deserialize, Serialize};
21
22use camel_api::CamelError;
23
24use crate::lifecycle::domain::{DomainError, RuntimeEvent};
25use crate::lifecycle::ports::RuntimeEventJournalPort;
26
/// redb table holding the event log: `u64` sequence number -> serialized
/// [`JournalEntry`] bytes (JSON, as written by `append_batch`).
const EVENTS_TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("events");
/// redb table used as a set of command ids: the key is the id, the value is `()`.
const COMMAND_IDS_TABLE: TableDefinition<&str, ()> = TableDefinition::new("command_ids");
31
/// Durability level requested for journal write transactions.
///
/// The journal translates this into redb's `Durability` when it opens a write
/// transaction: `Immediate` maps to `Durability::Immediate` and `Eventual`
/// maps to `Durability::None`.
///
/// The enum is fieldless, so it is `Copy` and `Eq` in addition to the usual
/// derives; callers can pass it by value and compare it freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum JournalDurability {
    /// Request redb's immediate durability on every commit. This is the default.
    #[default]
    Immediate,
    /// Request redb's `Durability::None` on commit.
    // NOTE(review): presumably trades crash-safety of the most recent commits
    // for throughput — confirm against the redb `Durability` documentation.
    Eventual,
}
43
/// Tuning options for [`RedbRuntimeEventJournal`].
#[derive(Debug, Clone)]
pub struct RedbJournalOptions {
    /// Durability applied to the event and command-id write transactions.
    pub durability: JournalDurability,
    /// After a non-empty batch is appended, compaction is attempted when the
    /// events table holds at least this many entries.
    pub compaction_threshold_events: u64,
}
51
52impl Default for RedbJournalOptions {
53 fn default() -> Self {
54 Self {
55 durability: JournalDurability::Immediate,
56 compaction_threshold_events: 10_000,
57 }
58 }
59}
60
/// One persisted journal record, stored as JSON in the events table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JournalEntry {
    /// Sequence number; identical to the entry's key in the events table.
    pub seq: u64,
    /// UTC timestamp in milliseconds, captured once per appended batch, so all
    /// entries of one batch carry the same value.
    pub timestamp_ms: i64,
    /// The runtime event that was journaled.
    pub event: RuntimeEvent,
}
68
/// Selection criteria for [`RedbRuntimeEventJournal::inspect`].
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so a filter can be logged,
/// reused, and compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JournalInspectFilter {
    /// When set, only entries whose event belongs to this route are returned.
    pub route_id: Option<String>,
    /// Maximum number of entries to return; `0` yields an empty result.
    pub limit: usize,
}
74
/// redb-backed implementation of [`RuntimeEventJournalPort`].
///
/// Clones share the same underlying database through the `Arc` handle, so a
/// write made through one clone is visible through every other clone.
#[derive(Clone)]
pub struct RedbRuntimeEventJournal {
    /// Shared handle to the open redb database.
    db: Arc<Database>,
    /// Durability and compaction settings chosen at construction time.
    options: RedbJournalOptions,
}
86
87impl RedbRuntimeEventJournal {
88 pub async fn new(
94 path: impl Into<PathBuf>,
95 options: RedbJournalOptions,
96 ) -> Result<Self, CamelError> {
97 let path = path.into();
98 let db = tokio::task::spawn_blocking(move || {
99 if let Some(parent) = path.parent() {
100 std::fs::create_dir_all(parent).map_err(|e| {
101 CamelError::Io(format!(
102 "failed to create journal directory '{}': {e}",
103 parent.display()
104 ))
105 })?;
106 }
107 let db = Database::create(&path).map_err(|e| {
108 CamelError::Io(format!(
109 "failed to open journal at '{}': {e}",
110 path.display()
111 ))
112 })?;
113 let tx = db
115 .begin_write()
116 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
117 tx.open_table(EVENTS_TABLE)
118 .map_err(|e| CamelError::Io(format!("redb open events table: {e}")))?;
119 tx.open_table(COMMAND_IDS_TABLE)
120 .map_err(|e| CamelError::Io(format!("redb open command_ids table: {e}")))?;
121 tx.commit()
122 .map_err(|e| CamelError::Io(format!("redb commit init: {e}")))?;
123 Ok::<_, CamelError>(db)
124 })
125 .await
126 .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))??;
127
128 Ok(Self {
129 db: Arc::new(db),
130 options,
131 })
132 }
133
134 pub async fn inspect(
139 path: impl Into<PathBuf>,
140 filter: JournalInspectFilter,
141 ) -> Result<Vec<JournalEntry>, CamelError> {
142 let path = path.into();
143 let limit = filter.limit;
144 let route_id = filter.route_id;
145 tokio::task::spawn_blocking(move || {
146 if !path.exists() {
147 return Err(CamelError::Io(format!(
148 "journal file not found: {}",
149 path.display()
150 )));
151 }
152 let db = Database::open(&path)
153 .map_err(|e| CamelError::Io(format!("invalid journal file: {e}")))?;
154 let tx = db
155 .begin_read()
156 .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
157 let table = tx
158 .open_table(EVENTS_TABLE)
159 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
160
161 let mut entries: Vec<JournalEntry> = Vec::new();
166 for result in table
167 .iter()
168 .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
169 .rev()
170 {
171 let (_k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
172 let entry: JournalEntry = serde_json::from_slice(v.value())
173 .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
174 if let Some(ref rid) = route_id
175 && entry.event.route_id() != rid.as_str()
176 {
177 continue;
178 }
179 if entries.len() >= limit {
180 break;
181 }
182 entries.push(entry);
183 }
184 Ok(entries)
185 })
186 .await
187 .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))?
188 }
189
190 fn redb_durability(&self) -> Durability {
193 match self.options.durability {
194 JournalDurability::Immediate => Durability::Immediate,
195 JournalDurability::Eventual => Durability::None,
196 }
197 }
198
199 fn next_seq(table: &redb::Table<u64, &[u8]>) -> Result<u64, CamelError> {
202 match table
203 .iter()
204 .map_err(|e| CamelError::Io(format!("redb iter for seq: {e}")))?
205 .next_back()
206 {
207 Some(Ok((k, _))) => Ok(k.value() + 1),
208 Some(Err(e)) => Err(CamelError::Io(format!("redb seq read: {e}"))),
209 None => Ok(0),
210 }
211 }
212
213 fn event_count(&self) -> Result<u64, CamelError> {
215 let tx = self
216 .db
217 .begin_read()
218 .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
219 let table = tx
220 .open_table(EVENTS_TABLE)
221 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
222 table
223 .len()
224 .map_err(|e| CamelError::Io(format!("redb len: {e}")))
225 }
226
227 fn compact(&self) -> Result<(), CamelError> {
229 let tx = self
230 .db
231 .begin_write()
232 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
233 {
234 let mut table = tx
235 .open_table(EVENTS_TABLE)
236 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
237
238 let mut last_removed_seq: std::collections::HashMap<String, u64> =
240 std::collections::HashMap::new();
241 for result in table
242 .iter()
243 .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
244 {
245 let (k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
246 let seq = k.value();
247 let entry: JournalEntry = serde_json::from_slice(v.value())
248 .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
249 if matches!(entry.event, RuntimeEvent::RouteRemoved { .. }) {
250 last_removed_seq.insert(entry.event.route_id().to_string(), seq);
251 }
252 }
253
254 if last_removed_seq.is_empty() {
255 drop(table);
256 tx.commit()
257 .map_err(|e| CamelError::Io(format!("redb commit compact: {e}")))?;
258 return Ok(());
259 }
260
261 let mut to_delete: Vec<u64> = Vec::new();
263 for result in table
264 .iter()
265 .map_err(|e| CamelError::Io(format!("redb iter pass2: {e}")))?
266 {
267 let (k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
268 let seq = k.value();
269 let entry: JournalEntry = serde_json::from_slice(v.value())
270 .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
271 let route_id = entry.event.route_id().to_string();
272 if let Some(&cutoff) = last_removed_seq.get(&route_id)
273 && seq <= cutoff
274 {
275 to_delete.push(seq);
276 }
277 }
278
279 for seq in to_delete {
280 table
281 .remove(&seq)
282 .map_err(|e| CamelError::Io(format!("redb remove seq {seq}: {e}")))?;
283 }
284 }
285 tx.commit()
286 .map_err(|e| CamelError::Io(format!("redb commit compact: {e}")))?;
287 Ok(())
288 }
289}
290
/// Module-private extension trait giving uniform access to the route id
/// carried by a [`RuntimeEvent`].
trait RuntimeEventExt {
    /// Returns the id of the route this event refers to.
    fn route_id(&self) -> &str;
}
297
298impl RuntimeEventExt for RuntimeEvent {
299 fn route_id(&self) -> &str {
300 match self {
301 RuntimeEvent::RouteRegistered { route_id }
302 | RuntimeEvent::RouteStartRequested { route_id }
303 | RuntimeEvent::RouteStarted { route_id }
304 | RuntimeEvent::RouteFailed { route_id, .. }
305 | RuntimeEvent::RouteStopped { route_id }
306 | RuntimeEvent::RouteSuspended { route_id }
307 | RuntimeEvent::RouteResumed { route_id }
308 | RuntimeEvent::RouteReloaded { route_id }
309 | RuntimeEvent::RouteRemoved { route_id } => route_id,
310 }
311 }
312}
313
314#[async_trait]
317impl RuntimeEventJournalPort for RedbRuntimeEventJournal {
318 async fn append_batch(&self, events: &[RuntimeEvent]) -> Result<(), DomainError> {
319 if events.is_empty() {
320 return Ok(());
321 }
322 let db = Arc::clone(&self.db);
323 let durability = self.redb_durability();
324 let events = events.to_vec();
325 let now_ms = chrono::Utc::now().timestamp_millis();
326
327 tokio::task::spawn_blocking(move || {
328 let mut tx = db
330 .begin_write()
331 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
332 tx.set_durability(durability)
333 .map_err(|e| CamelError::Io(format!("redb set_durability: {e}")))?;
334 {
335 let mut table = tx
336 .open_table(EVENTS_TABLE)
337 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
338 let mut next_seq = Self::next_seq(&table)?;
339 for event in events {
340 let entry = JournalEntry {
341 seq: next_seq,
342 timestamp_ms: now_ms,
343 event,
344 };
345 let bytes = serde_json::to_vec(&entry)
346 .map_err(|e| CamelError::Io(format!("journal serialize: {e}")))?;
347 table
348 .insert(&next_seq, bytes.as_slice())
349 .map_err(|e| CamelError::Io(format!("redb insert: {e}")))?;
350 next_seq += 1;
351 }
352 }
353 tx.commit()
354 .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
355 Ok::<_, CamelError>(())
356 })
357 .await
358 .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
359 .map_err(|e| DomainError::InvalidState(e.to_string()))?;
360
361 let journal_clone = self.clone();
364 let threshold = self.options.compaction_threshold_events;
365 tokio::task::spawn_blocking(move || match journal_clone.event_count() {
366 Ok(count) if count >= threshold => {
367 if let Err(e) = journal_clone.compact() {
368 tracing::warn!("journal compaction failed (non-fatal): {e}");
369 }
370 }
371 Ok(_) => {}
372 Err(e) => {
373 tracing::warn!("journal event count check failed (non-fatal): {e}");
374 }
375 })
376 .await
377 .ok(); Ok(())
380 }
381
382 async fn load_all(&self) -> Result<Vec<RuntimeEvent>, DomainError> {
383 let db = Arc::clone(&self.db);
384 tokio::task::spawn_blocking(move || {
385 let tx = db
386 .begin_read()
387 .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
388 let table = tx
389 .open_table(EVENTS_TABLE)
390 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
391 let mut events = Vec::new();
392 for result in table
393 .iter()
394 .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
395 {
396 let (_k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
397 let entry: JournalEntry = serde_json::from_slice(v.value())
398 .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
399 events.push(entry.event);
400 }
401 Ok(events)
402 })
403 .await
404 .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
405 .map_err(|e: CamelError| DomainError::InvalidState(e.to_string()))
406 }
407
408 async fn append_command_id(&self, command_id: &str) -> Result<(), DomainError> {
409 let db = Arc::clone(&self.db);
410 let durability = self.redb_durability();
411 let id = command_id.to_string();
412 tokio::task::spawn_blocking(move || {
413 let mut tx = db
415 .begin_write()
416 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
417 tx.set_durability(durability)
418 .map_err(|e| CamelError::Io(format!("redb set_durability: {e}")))?;
419 {
420 let mut table = tx
421 .open_table(COMMAND_IDS_TABLE)
422 .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
423 table
424 .insert(id.as_str(), ())
425 .map_err(|e| CamelError::Io(format!("redb insert command_id: {e}")))?;
426 }
427 tx.commit()
428 .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
429 Ok::<_, CamelError>(())
430 })
431 .await
432 .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
433 .map_err(|e| DomainError::InvalidState(e.to_string()))
434 }
435
436 async fn remove_command_id(&self, command_id: &str) -> Result<(), DomainError> {
437 let db = Arc::clone(&self.db);
438 let durability = self.redb_durability();
439 let id = command_id.to_string();
440 tokio::task::spawn_blocking(move || {
441 let mut tx = db
443 .begin_write()
444 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
445 tx.set_durability(durability)
446 .map_err(|e| CamelError::Io(format!("redb set_durability: {e}")))?;
447 {
448 let mut table = tx
449 .open_table(COMMAND_IDS_TABLE)
450 .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
451 table
452 .remove(id.as_str())
453 .map_err(|e| CamelError::Io(format!("redb remove command_id: {e}")))?;
454 }
455 tx.commit()
456 .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
457 Ok::<_, CamelError>(())
458 })
459 .await
460 .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
461 .map_err(|e| DomainError::InvalidState(e.to_string()))
462 }
463
464 async fn load_command_ids(&self) -> Result<Vec<String>, DomainError> {
465 let db = Arc::clone(&self.db);
466 tokio::task::spawn_blocking(move || {
467 let tx = db
468 .begin_read()
469 .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
470 let table = tx
471 .open_table(COMMAND_IDS_TABLE)
472 .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
473 let mut ids = Vec::new();
474 for result in table
475 .iter()
476 .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
477 {
478 let (k, _) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
479 ids.push(k.value().to_string());
480 }
481 Ok(ids)
482 })
483 .await
484 .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
485 .map_err(|e: CamelError| DomainError::InvalidState(e.to_string()))
486 }
487}
488
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Opens a journal with default options at `<dir>/test.db`.
    async fn new_journal(dir: &tempfile::TempDir) -> RedbRuntimeEventJournal {
        let db_path = dir.path().join("test.db");
        RedbRuntimeEventJournal::new(db_path, RedbJournalOptions::default())
            .await
            .unwrap()
    }

    /// Opens a journal with `Eventual` durability at `<dir>/<file_name>` and
    /// the given compaction threshold.
    async fn eventual_journal(
        dir: &tempfile::TempDir,
        file_name: &str,
        compaction_threshold_events: u64,
    ) -> RedbRuntimeEventJournal {
        let options = RedbJournalOptions {
            durability: JournalDurability::Eventual,
            compaction_threshold_events,
        };
        RedbRuntimeEventJournal::new(dir.path().join(file_name), options)
            .await
            .unwrap()
    }

    /// Builds a `RouteRegistered` event for `id`.
    fn registered(id: &str) -> RuntimeEvent {
        RuntimeEvent::RouteRegistered {
            route_id: id.to_string(),
        }
    }

    /// Builds a `RouteStarted` event for `id`.
    fn started(id: &str) -> RuntimeEvent {
        RuntimeEvent::RouteStarted {
            route_id: id.to_string(),
        }
    }

    /// Builds a `RouteRemoved` event for `id`.
    fn removed(id: &str) -> RuntimeEvent {
        RuntimeEvent::RouteRemoved {
            route_id: id.to_string(),
        }
    }

    /// Counts the `RouteRegistered` events for route `wanted` in `events`.
    fn registrations_of(events: &[RuntimeEvent], wanted: &str) -> usize {
        events
            .iter()
            .filter(
                |e| matches!(e, RuntimeEvent::RouteRegistered { route_id } if route_id == wanted),
            )
            .count()
    }

    /// A batch written with `append_batch` is returned unchanged by `load_all`.
    #[tokio::test]
    async fn redb_journal_roundtrip() {
        let dir = tempdir().unwrap();
        let journal = new_journal(&dir).await;

        let batch = vec![registered("r1"), started("r1")];
        journal.append_batch(&batch).await.unwrap();

        let replayed = journal.load_all().await.unwrap();
        assert_eq!(replayed, batch);
    }

    /// Appended command ids are listed until they are removed.
    #[tokio::test]
    async fn redb_journal_command_id_lifecycle() {
        let dir = tempdir().unwrap();
        let journal = new_journal(&dir).await;

        for id in ["c1", "c2"] {
            journal.append_command_id(id).await.unwrap();
        }
        journal.remove_command_id("c1").await.unwrap();

        let remaining = journal.load_command_ids().await.unwrap();
        assert_eq!(remaining, vec!["c2".to_string()]);
    }

    /// With a threshold of 1, every append triggers compaction: the removed
    /// route's history disappears while the live route's history stays.
    #[tokio::test]
    async fn redb_journal_compaction_removes_completed_routes() {
        let dir = tempdir().unwrap();
        let journal = eventual_journal(&dir, "compact.db", 1).await;

        // Each event goes in as its own batch so compaction runs after each one.
        journal.append_batch(&[registered("old")]).await.unwrap();
        journal.append_batch(&[removed("old")]).await.unwrap();
        journal.append_batch(&[registered("live")]).await.unwrap();

        let replayed = journal.load_all().await.unwrap();
        assert!(
            registrations_of(&replayed, "old") == 0,
            "old route events must be compacted"
        );
        assert!(
            registrations_of(&replayed, "live") > 0,
            "live route events must survive compaction"
        );
    }

    /// A route registered again after removal keeps only the new registration.
    #[tokio::test]
    async fn redb_journal_compaction_preserves_reregistered_route() {
        let dir = tempdir().unwrap();
        let journal = eventual_journal(&dir, "rereg.db", 1).await;

        journal.append_batch(&[registered("rereg")]).await.unwrap();
        journal.append_batch(&[removed("rereg")]).await.unwrap();
        journal.append_batch(&[registered("rereg")]).await.unwrap();

        let replayed = journal.load_all().await.unwrap();
        let rereg_count = registrations_of(&replayed, "rereg");
        assert_eq!(
            rereg_count, 1,
            "re-registered route must have exactly one event after compaction"
        );
    }

    /// Events written with `Eventual` durability are readable in-process.
    #[tokio::test]
    async fn redb_journal_durability_eventual() {
        let dir = tempdir().unwrap();
        let journal = eventual_journal(&dir, "eventual.db", 10_000).await;

        journal.append_batch(&[registered("ev")]).await.unwrap();

        let replayed = journal.load_all().await.unwrap();
        assert_eq!(replayed.len(), 1);
    }

    /// A clone reads what the original journal wrote.
    #[tokio::test]
    async fn redb_journal_clone_shares_db() {
        let dir = tempdir().unwrap();
        let writer = new_journal(&dir).await;
        let reader = writer.clone();

        writer.append_batch(&[registered("shared")]).await.unwrap();

        let replayed = reader.load_all().await.unwrap();
        assert_eq!(replayed.len(), 1);
    }
}