camel_core/lifecycle/adapters/
redb_journal.rs1use std::path::PathBuf;
14use std::sync::Arc;
15
16use async_trait::async_trait;
17use redb::{Database, ReadableTable, ReadableTableMetadata, TableDefinition};
18use serde::{Deserialize, Serialize};
19
20use camel_api::CamelError;
21
22use crate::lifecycle::domain::RuntimeEvent;
23use crate::lifecycle::ports::RuntimeEventJournalPort;
24
25const EVENTS_TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("events");
28const COMMAND_IDS_TABLE: TableDefinition<&str, ()> = TableDefinition::new("command_ids");
29
/// Durability level applied to the journal's append and command-id write transactions.
///
/// Each variant maps onto the `redb::Durability` variant of the same name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum JournalDurability {
    /// Maps to `redb::Durability::Immediate`. This is the default.
    #[default]
    Immediate,
    /// Maps to `redb::Durability::Eventual`.
    Eventual,
}
41
42#[derive(Debug, Clone)]
44pub struct RedbJournalOptions {
45 pub durability: JournalDurability,
46 pub compaction_threshold_events: u64,
48}
49
50impl Default for RedbJournalOptions {
51 fn default() -> Self {
52 Self {
53 durability: JournalDurability::Immediate,
54 compaction_threshold_events: 10_000,
55 }
56 }
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct JournalEntry {
62 pub seq: u64,
63 pub timestamp_ms: i64,
64 pub event: RuntimeEvent,
65}
66
/// Selection criteria used when inspecting a journal file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JournalInspectFilter {
    /// When set, only entries whose event refers to this route id are returned.
    pub route_id: Option<String>,
    /// Maximum number of entries to return.
    pub limit: usize,
}
72
73#[derive(Clone)]
80pub struct RedbRuntimeEventJournal {
81 db: Arc<Database>,
82 options: RedbJournalOptions,
83}
84
85impl RedbRuntimeEventJournal {
86 pub async fn new(
92 path: impl Into<PathBuf>,
93 options: RedbJournalOptions,
94 ) -> Result<Self, CamelError> {
95 let path = path.into();
96 let db = tokio::task::spawn_blocking(move || {
97 if let Some(parent) = path.parent() {
98 std::fs::create_dir_all(parent).map_err(|e| {
99 CamelError::Io(format!(
100 "failed to create journal directory '{}': {e}",
101 parent.display()
102 ))
103 })?;
104 }
105 let db = Database::create(&path).map_err(|e| {
106 CamelError::Io(format!(
107 "failed to open journal at '{}': {e}",
108 path.display()
109 ))
110 })?;
111 let tx = db
113 .begin_write()
114 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
115 tx.open_table(EVENTS_TABLE)
116 .map_err(|e| CamelError::Io(format!("redb open events table: {e}")))?;
117 tx.open_table(COMMAND_IDS_TABLE)
118 .map_err(|e| CamelError::Io(format!("redb open command_ids table: {e}")))?;
119 tx.commit()
120 .map_err(|e| CamelError::Io(format!("redb commit init: {e}")))?;
121 Ok::<_, CamelError>(db)
122 })
123 .await
124 .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))??;
125
126 Ok(Self {
127 db: Arc::new(db),
128 options,
129 })
130 }
131
132 pub async fn inspect(
137 path: impl Into<PathBuf>,
138 filter: JournalInspectFilter,
139 ) -> Result<Vec<JournalEntry>, CamelError> {
140 let path = path.into();
141 let limit = filter.limit;
142 let route_id = filter.route_id;
143 tokio::task::spawn_blocking(move || {
144 if !path.exists() {
145 return Err(CamelError::Io(format!(
146 "journal file not found: {}",
147 path.display()
148 )));
149 }
150 let db = Database::open(&path)
151 .map_err(|e| CamelError::Io(format!("invalid journal file: {e}")))?;
152 let tx = db
153 .begin_read()
154 .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
155 let table = tx
156 .open_table(EVENTS_TABLE)
157 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
158
159 let mut entries: Vec<JournalEntry> = Vec::new();
164 for result in table
165 .iter()
166 .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
167 .rev()
168 {
169 let (_k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
170 let entry: JournalEntry = serde_json::from_slice(v.value())
171 .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
172 if let Some(ref rid) = route_id
173 && entry.event.route_id() != rid.as_str()
174 {
175 continue;
176 }
177 if entries.len() >= limit {
178 break;
179 }
180 entries.push(entry);
181 }
182 Ok(entries)
183 })
184 .await
185 .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))?
186 }
187
188 fn redb_durability(&self) -> redb::Durability {
191 match self.options.durability {
192 JournalDurability::Immediate => redb::Durability::Immediate,
193 JournalDurability::Eventual => redb::Durability::Eventual,
194 }
195 }
196
197 fn next_seq(table: &redb::Table<u64, &[u8]>) -> Result<u64, CamelError> {
200 match table
201 .iter()
202 .map_err(|e| CamelError::Io(format!("redb iter for seq: {e}")))?
203 .next_back()
204 {
205 Some(Ok((k, _))) => Ok(k.value() + 1),
206 Some(Err(e)) => Err(CamelError::Io(format!("redb seq read: {e}"))),
207 None => Ok(0),
208 }
209 }
210
211 fn event_count(&self) -> Result<u64, CamelError> {
213 let tx = self
214 .db
215 .begin_read()
216 .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
217 let table = tx
218 .open_table(EVENTS_TABLE)
219 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
220 table
221 .len()
222 .map_err(|e| CamelError::Io(format!("redb len: {e}")))
223 }
224
225 fn compact(&self) -> Result<(), CamelError> {
227 let tx = self
228 .db
229 .begin_write()
230 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
231 {
232 let mut table = tx
233 .open_table(EVENTS_TABLE)
234 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
235
236 let mut last_removed_seq: std::collections::HashMap<String, u64> =
238 std::collections::HashMap::new();
239 for result in table
240 .iter()
241 .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
242 {
243 let (k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
244 let seq = k.value();
245 let entry: JournalEntry = serde_json::from_slice(v.value())
246 .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
247 if matches!(entry.event, RuntimeEvent::RouteRemoved { .. }) {
248 last_removed_seq.insert(entry.event.route_id().to_string(), seq);
249 }
250 }
251
252 if last_removed_seq.is_empty() {
253 drop(table);
254 tx.commit()
255 .map_err(|e| CamelError::Io(format!("redb commit compact: {e}")))?;
256 return Ok(());
257 }
258
259 let mut to_delete: Vec<u64> = Vec::new();
261 for result in table
262 .iter()
263 .map_err(|e| CamelError::Io(format!("redb iter pass2: {e}")))?
264 {
265 let (k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
266 let seq = k.value();
267 let entry: JournalEntry = serde_json::from_slice(v.value())
268 .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
269 let route_id = entry.event.route_id().to_string();
270 if let Some(&cutoff) = last_removed_seq.get(&route_id)
271 && seq <= cutoff
272 {
273 to_delete.push(seq);
274 }
275 }
276
277 for seq in to_delete {
278 table
279 .remove(&seq)
280 .map_err(|e| CamelError::Io(format!("redb remove seq {seq}: {e}")))?;
281 }
282 }
283 tx.commit()
284 .map_err(|e| CamelError::Io(format!("redb commit compact: {e}")))?;
285 Ok(())
286 }
287}
288
/// Module-private helper trait exposing the route id carried by a runtime event.
trait RuntimeEventExt {
    /// Returns the id of the route this event refers to.
    fn route_id(&self) -> &str;
}
295
296impl RuntimeEventExt for RuntimeEvent {
297 fn route_id(&self) -> &str {
298 match self {
299 RuntimeEvent::RouteRegistered { route_id }
300 | RuntimeEvent::RouteStartRequested { route_id }
301 | RuntimeEvent::RouteStarted { route_id }
302 | RuntimeEvent::RouteFailed { route_id, .. }
303 | RuntimeEvent::RouteStopped { route_id }
304 | RuntimeEvent::RouteSuspended { route_id }
305 | RuntimeEvent::RouteResumed { route_id }
306 | RuntimeEvent::RouteReloaded { route_id }
307 | RuntimeEvent::RouteRemoved { route_id } => route_id,
308 }
309 }
310}
311
312#[async_trait]
315impl RuntimeEventJournalPort for RedbRuntimeEventJournal {
316 async fn append_batch(&self, events: &[RuntimeEvent]) -> Result<(), CamelError> {
317 if events.is_empty() {
318 return Ok(());
319 }
320 let db = Arc::clone(&self.db);
321 let durability = self.redb_durability();
322 let events = events.to_vec();
323 let now_ms = chrono::Utc::now().timestamp_millis();
324
325 tokio::task::spawn_blocking(move || {
326 let mut tx = db
328 .begin_write()
329 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
330 tx.set_durability(durability);
331 {
332 let mut table = tx
333 .open_table(EVENTS_TABLE)
334 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
335 let mut next_seq = Self::next_seq(&table)?;
336 for event in events {
337 let entry = JournalEntry {
338 seq: next_seq,
339 timestamp_ms: now_ms,
340 event,
341 };
342 let bytes = serde_json::to_vec(&entry)
343 .map_err(|e| CamelError::Io(format!("journal serialize: {e}")))?;
344 table
345 .insert(&next_seq, bytes.as_slice())
346 .map_err(|e| CamelError::Io(format!("redb insert: {e}")))?;
347 next_seq += 1;
348 }
349 }
350 tx.commit()
351 .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
352 Ok::<_, CamelError>(())
353 })
354 .await
355 .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))??;
356
357 let journal_clone = self.clone();
360 let threshold = self.options.compaction_threshold_events;
361 tokio::task::spawn_blocking(move || match journal_clone.event_count() {
362 Ok(count) if count >= threshold => {
363 if let Err(e) = journal_clone.compact() {
364 tracing::warn!("journal compaction failed (non-fatal): {e}");
365 }
366 }
367 Ok(_) => {}
368 Err(e) => {
369 tracing::warn!("journal event count check failed (non-fatal): {e}");
370 }
371 })
372 .await
373 .ok(); Ok(())
376 }
377
378 async fn load_all(&self) -> Result<Vec<RuntimeEvent>, CamelError> {
379 let db = Arc::clone(&self.db);
380 tokio::task::spawn_blocking(move || {
381 let tx = db
382 .begin_read()
383 .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
384 let table = tx
385 .open_table(EVENTS_TABLE)
386 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
387 let mut events = Vec::new();
388 for result in table
389 .iter()
390 .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
391 {
392 let (_k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
393 let entry: JournalEntry = serde_json::from_slice(v.value())
394 .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
395 events.push(entry.event);
396 }
397 Ok(events)
398 })
399 .await
400 .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))?
401 }
402
403 async fn append_command_id(&self, command_id: &str) -> Result<(), CamelError> {
404 let db = Arc::clone(&self.db);
405 let durability = self.redb_durability();
406 let id = command_id.to_string();
407 tokio::task::spawn_blocking(move || {
408 let mut tx = db
410 .begin_write()
411 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
412 tx.set_durability(durability);
413 {
414 let mut table = tx
415 .open_table(COMMAND_IDS_TABLE)
416 .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
417 table
418 .insert(id.as_str(), ())
419 .map_err(|e| CamelError::Io(format!("redb insert command_id: {e}")))?;
420 }
421 tx.commit()
422 .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
423 Ok::<_, CamelError>(())
424 })
425 .await
426 .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))?
427 }
428
429 async fn remove_command_id(&self, command_id: &str) -> Result<(), CamelError> {
430 let db = Arc::clone(&self.db);
431 let durability = self.redb_durability();
432 let id = command_id.to_string();
433 tokio::task::spawn_blocking(move || {
434 let mut tx = db
436 .begin_write()
437 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
438 tx.set_durability(durability);
439 {
440 let mut table = tx
441 .open_table(COMMAND_IDS_TABLE)
442 .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
443 table
444 .remove(id.as_str())
445 .map_err(|e| CamelError::Io(format!("redb remove command_id: {e}")))?;
446 }
447 tx.commit()
448 .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
449 Ok::<_, CamelError>(())
450 })
451 .await
452 .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))?
453 }
454
455 async fn load_command_ids(&self) -> Result<Vec<String>, CamelError> {
456 let db = Arc::clone(&self.db);
457 tokio::task::spawn_blocking(move || {
458 let tx = db
459 .begin_read()
460 .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
461 let table = tx
462 .open_table(COMMAND_IDS_TABLE)
463 .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
464 let mut ids = Vec::new();
465 for result in table
466 .iter()
467 .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
468 {
469 let (k, _) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
470 ids.push(k.value().to_string());
471 }
472 Ok(ids)
473 })
474 .await
475 .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))?
476 }
477}
478
479#[cfg(test)]
482mod tests {
483 use super::*;
484 use tempfile::tempdir;
485
486 async fn new_journal(dir: &tempfile::TempDir) -> RedbRuntimeEventJournal {
487 RedbRuntimeEventJournal::new(dir.path().join("test.db"), RedbJournalOptions::default())
488 .await
489 .unwrap()
490 }
491
492 #[tokio::test]
493 async fn redb_journal_roundtrip() {
494 let dir = tempdir().unwrap();
495 let journal = new_journal(&dir).await;
496
497 let events = vec![
498 RuntimeEvent::RouteRegistered {
499 route_id: "r1".to_string(),
500 },
501 RuntimeEvent::RouteStarted {
502 route_id: "r1".to_string(),
503 },
504 ];
505 journal.append_batch(&events).await.unwrap();
506
507 let loaded = journal.load_all().await.unwrap();
508 assert_eq!(loaded, events);
509 }
510
511 #[tokio::test]
512 async fn redb_journal_command_id_lifecycle() {
513 let dir = tempdir().unwrap();
514 let journal = new_journal(&dir).await;
515
516 journal.append_command_id("c1").await.unwrap();
517 journal.append_command_id("c2").await.unwrap();
518 journal.remove_command_id("c1").await.unwrap();
519
520 let ids = journal.load_command_ids().await.unwrap();
521 assert_eq!(ids, vec!["c2".to_string()]);
522 }
523
524 #[tokio::test]
525 async fn redb_journal_compaction_removes_completed_routes() {
526 let dir = tempdir().unwrap();
527 let journal = RedbRuntimeEventJournal::new(
529 dir.path().join("compact.db"),
530 RedbJournalOptions {
531 durability: JournalDurability::Eventual,
532 compaction_threshold_events: 1,
533 },
534 )
535 .await
536 .unwrap();
537
538 journal
540 .append_batch(&[RuntimeEvent::RouteRegistered {
541 route_id: "old".to_string(),
542 }])
543 .await
544 .unwrap();
545 journal
546 .append_batch(&[RuntimeEvent::RouteRemoved {
547 route_id: "old".to_string(),
548 }])
549 .await
550 .unwrap();
551
552 journal
554 .append_batch(&[RuntimeEvent::RouteRegistered {
555 route_id: "live".to_string(),
556 }])
557 .await
558 .unwrap();
559
560 let loaded = journal.load_all().await.unwrap();
561 assert!(
562 !loaded.iter().any(
563 |e| matches!(e, RuntimeEvent::RouteRegistered { route_id } if route_id == "old")
564 ),
565 "old route events must be compacted"
566 );
567 assert!(
568 loaded.iter().any(
569 |e| matches!(e, RuntimeEvent::RouteRegistered { route_id } if route_id == "live")
570 ),
571 "live route events must survive compaction"
572 );
573 }
574
575 #[tokio::test]
576 async fn redb_journal_compaction_preserves_reregistered_route() {
577 let dir = tempdir().unwrap();
578 let journal = RedbRuntimeEventJournal::new(
579 dir.path().join("rereg.db"),
580 RedbJournalOptions {
581 durability: JournalDurability::Eventual,
582 compaction_threshold_events: 1,
583 },
584 )
585 .await
586 .unwrap();
587
588 journal
589 .append_batch(&[RuntimeEvent::RouteRegistered {
590 route_id: "rereg".to_string(),
591 }])
592 .await
593 .unwrap();
594 journal
595 .append_batch(&[RuntimeEvent::RouteRemoved {
596 route_id: "rereg".to_string(),
597 }])
598 .await
599 .unwrap();
600 journal
601 .append_batch(&[RuntimeEvent::RouteRegistered {
602 route_id: "rereg".to_string(),
603 }])
604 .await
605 .unwrap();
606
607 let loaded = journal.load_all().await.unwrap();
608 let rereg_count = loaded
609 .iter()
610 .filter(
611 |e| matches!(e, RuntimeEvent::RouteRegistered { route_id } if route_id == "rereg"),
612 )
613 .count();
614 assert_eq!(
615 rereg_count, 1,
616 "re-registered route must have exactly one event after compaction"
617 );
618 }
619
620 #[tokio::test]
621 async fn redb_journal_durability_eventual() {
622 let dir = tempdir().unwrap();
623 let journal = RedbRuntimeEventJournal::new(
624 dir.path().join("eventual.db"),
625 RedbJournalOptions {
626 durability: JournalDurability::Eventual,
627 compaction_threshold_events: 10_000,
628 },
629 )
630 .await
631 .unwrap();
632
633 journal
634 .append_batch(&[RuntimeEvent::RouteRegistered {
635 route_id: "ev".to_string(),
636 }])
637 .await
638 .unwrap();
639 let loaded = journal.load_all().await.unwrap();
640 assert_eq!(loaded.len(), 1);
641 }
642
643 #[tokio::test]
644 async fn redb_journal_clone_shares_db() {
645 let dir = tempdir().unwrap();
646 let j1 = new_journal(&dir).await;
647 let j2 = j1.clone();
648
649 j1.append_batch(&[RuntimeEvent::RouteRegistered {
650 route_id: "shared".to_string(),
651 }])
652 .await
653 .unwrap();
654
655 let loaded = j2.load_all().await.unwrap();
657 assert_eq!(loaded.len(), 1);
658 }
659}