1use std::path::PathBuf;
14use std::sync::Arc;
15
16use async_trait::async_trait;
17use redb::{
18 Database, Durability, ReadableDatabase, ReadableTable, ReadableTableMetadata, TableDefinition,
19};
20use serde::{Deserialize, Serialize};
21
22use camel_api::CamelError;
23
24use crate::lifecycle::domain::{DomainError, RuntimeEvent};
25use crate::lifecycle::ports::RuntimeEventJournalPort;
26
27const EVENTS_TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("events");
30const COMMAND_IDS_TABLE: TableDefinition<&str, ()> = TableDefinition::new("command_ids");
31
32#[derive(Debug, Clone, PartialEq, Default)]
36pub enum JournalDurability {
37 #[default]
39 Immediate,
40 Eventual,
42}
43
44#[derive(Debug, Clone)]
46pub struct RedbJournalOptions {
47 pub durability: JournalDurability,
48 pub compaction_threshold_events: u64,
50}
51
52impl Default for RedbJournalOptions {
53 fn default() -> Self {
54 Self {
55 durability: JournalDurability::Immediate,
56 compaction_threshold_events: 10_000,
57 }
58 }
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct JournalEntry {
64 pub seq: u64,
65 pub timestamp_ms: i64,
66 pub event: RuntimeEvent,
67}
68
69pub struct JournalInspectFilter {
71 pub route_id: Option<String>,
72 pub limit: usize,
73}
74
75#[derive(Clone)]
82pub struct RedbRuntimeEventJournal {
83 db: Arc<Database>,
84 options: RedbJournalOptions,
85}
86
87impl RedbRuntimeEventJournal {
88 pub async fn new(
94 path: impl Into<PathBuf>,
95 options: RedbJournalOptions,
96 ) -> Result<Self, CamelError> {
97 let path = path.into();
98 let db = tokio::task::spawn_blocking(move || {
99 if let Some(parent) = path.parent() {
100 std::fs::create_dir_all(parent).map_err(|e| {
101 CamelError::Io(format!(
102 "failed to create journal directory '{}': {e}",
103 parent.display()
104 ))
105 })?;
106 }
107 let db = Database::create(&path).map_err(|e| {
108 CamelError::Io(format!(
109 "failed to open journal at '{}': {e}",
110 path.display()
111 ))
112 })?;
113 let tx = db
115 .begin_write()
116 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
117 tx.open_table(EVENTS_TABLE)
118 .map_err(|e| CamelError::Io(format!("redb open events table: {e}")))?;
119 tx.open_table(COMMAND_IDS_TABLE)
120 .map_err(|e| CamelError::Io(format!("redb open command_ids table: {e}")))?;
121 tx.commit()
122 .map_err(|e| CamelError::Io(format!("redb commit init: {e}")))?;
123 Ok::<_, CamelError>(db)
124 })
125 .await
126 .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))??;
127
128 Ok(Self {
129 db: Arc::new(db),
130 options,
131 })
132 }
133
134 pub async fn inspect(
139 path: impl Into<PathBuf>,
140 filter: JournalInspectFilter,
141 ) -> Result<Vec<JournalEntry>, CamelError> {
142 let path = path.into();
143 let limit = filter.limit;
144 let route_id = filter.route_id;
145 tokio::task::spawn_blocking(move || {
146 if !path.exists() {
147 return Err(CamelError::Io(format!(
148 "journal file not found: {}",
149 path.display()
150 )));
151 }
152 let db = Database::open(&path)
153 .map_err(|e| CamelError::Io(format!("invalid journal file: {e}")))?;
154 let tx = db
155 .begin_read()
156 .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
157 let table = tx
158 .open_table(EVENTS_TABLE)
159 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
160
161 let mut entries: Vec<JournalEntry> = Vec::new();
166 for result in table
167 .iter()
168 .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
169 .rev()
170 {
171 let (_k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
172 let entry: JournalEntry = serde_json::from_slice(v.value())
173 .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
174 if let Some(ref rid) = route_id
175 && entry.event.route_id() != rid.as_str()
176 {
177 continue;
178 }
179 if entries.len() >= limit {
180 break;
181 }
182 entries.push(entry);
183 }
184 Ok(entries)
185 })
186 .await
187 .map_err(|e| CamelError::Io(format!("spawn_blocking join: {e}")))?
188 }
189
190 fn redb_durability(&self) -> Durability {
193 match self.options.durability {
194 JournalDurability::Immediate => Durability::Immediate,
195 JournalDurability::Eventual => Durability::None,
196 }
197 }
198
199 fn next_seq(table: &redb::Table<u64, &[u8]>) -> Result<u64, CamelError> {
202 match table
203 .iter()
204 .map_err(|e| CamelError::Io(format!("redb iter for seq: {e}")))?
205 .next_back()
206 {
207 Some(Ok((k, _))) => Ok(k.value() + 1),
208 Some(Err(e)) => Err(CamelError::Io(format!("redb seq read: {e}"))),
209 None => Ok(0),
210 }
211 }
212
213 fn event_count(&self) -> Result<u64, CamelError> {
215 let tx = self
216 .db
217 .begin_read()
218 .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
219 let table = tx
220 .open_table(EVENTS_TABLE)
221 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
222 table
223 .len()
224 .map_err(|e| CamelError::Io(format!("redb len: {e}")))
225 }
226
227 fn compact(&self) -> Result<(), CamelError> {
229 let tx = self
230 .db
231 .begin_write()
232 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
233 {
234 let mut table = tx
235 .open_table(EVENTS_TABLE)
236 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
237
238 let mut last_removed_seq: std::collections::HashMap<String, u64> =
240 std::collections::HashMap::new();
241 for result in table
242 .iter()
243 .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
244 {
245 let (k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
246 let seq = k.value();
247 let entry: JournalEntry = serde_json::from_slice(v.value())
248 .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
249 if matches!(entry.event, RuntimeEvent::RouteRemoved { .. }) {
250 last_removed_seq.insert(entry.event.route_id().to_string(), seq);
251 }
252 }
253
254 if last_removed_seq.is_empty() {
255 drop(table);
256 tx.commit()
257 .map_err(|e| CamelError::Io(format!("redb commit compact: {e}")))?;
258 return Ok(());
259 }
260
261 let mut to_delete: Vec<u64> = Vec::new();
263 for result in table
264 .iter()
265 .map_err(|e| CamelError::Io(format!("redb iter pass2: {e}")))?
266 {
267 let (k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
268 let seq = k.value();
269 let entry: JournalEntry = serde_json::from_slice(v.value())
270 .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
271 let route_id = entry.event.route_id().to_string();
272 if let Some(&cutoff) = last_removed_seq.get(&route_id)
273 && seq <= cutoff
274 {
275 to_delete.push(seq);
276 }
277 }
278
279 for seq in to_delete {
280 table
281 .remove(&seq)
282 .map_err(|e| CamelError::Io(format!("redb remove seq {seq}: {e}")))?;
283 }
284 }
285 tx.commit()
286 .map_err(|e| CamelError::Io(format!("redb commit compact: {e}")))?;
287 Ok(())
288 }
289}
290
291trait RuntimeEventExt {
295 fn route_id(&self) -> &str;
296}
297
298impl RuntimeEventExt for RuntimeEvent {
299 fn route_id(&self) -> &str {
300 match self {
301 RuntimeEvent::RouteRegistered { route_id }
302 | RuntimeEvent::RouteStartRequested { route_id }
303 | RuntimeEvent::RouteStarted { route_id }
304 | RuntimeEvent::RouteFailed { route_id, .. }
305 | RuntimeEvent::RouteStopped { route_id }
306 | RuntimeEvent::RouteSuspended { route_id }
307 | RuntimeEvent::RouteResumed { route_id }
308 | RuntimeEvent::RouteReloaded { route_id }
309 | RuntimeEvent::RouteRemoved { route_id } => route_id,
310 }
311 }
312}
313
314#[async_trait]
317impl RuntimeEventJournalPort for RedbRuntimeEventJournal {
318 async fn append_batch(&self, events: &[RuntimeEvent]) -> Result<(), DomainError> {
319 if events.is_empty() {
320 return Ok(());
321 }
322 let db = Arc::clone(&self.db);
323 let durability = self.redb_durability();
324 let events = events.to_vec();
325 let now_ms = chrono::Utc::now().timestamp_millis();
326
327 tokio::task::spawn_blocking(move || {
328 let mut tx = db
330 .begin_write()
331 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
332 tx.set_durability(durability)
333 .map_err(|e| CamelError::Io(format!("redb set_durability: {e}")))?;
334 {
335 let mut table = tx
336 .open_table(EVENTS_TABLE)
337 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
338 let start_seq = Self::next_seq(&table)?;
339 for (next_seq, event) in (start_seq..).zip(events) {
340 let entry = JournalEntry {
341 seq: next_seq,
342 timestamp_ms: now_ms,
343 event,
344 };
345 let bytes = serde_json::to_vec(&entry)
346 .map_err(|e| CamelError::Io(format!("journal serialize: {e}")))?;
347 table
348 .insert(&next_seq, bytes.as_slice())
349 .map_err(|e| CamelError::Io(format!("redb insert: {e}")))?;
350 }
351 }
352 tx.commit()
353 .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
354 Ok::<_, CamelError>(())
355 })
356 .await
357 .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
358 .map_err(|e| DomainError::InvalidState(e.to_string()))?;
359
360 let journal_clone = self.clone();
363 let threshold = self.options.compaction_threshold_events;
364 tokio::task::spawn_blocking(move || match journal_clone.event_count() {
365 Ok(count) if count >= threshold => {
366 if let Err(e) = journal_clone.compact() {
367 tracing::warn!("journal compaction failed (non-fatal): {e}");
368 }
369 }
370 Ok(_) => {}
371 Err(e) => {
372 tracing::warn!("journal event count check failed (non-fatal): {e}");
373 }
374 })
375 .await
376 .ok(); Ok(())
379 }
380
381 async fn load_all(&self) -> Result<Vec<RuntimeEvent>, DomainError> {
382 let db = Arc::clone(&self.db);
383 tokio::task::spawn_blocking(move || {
384 let tx = db
385 .begin_read()
386 .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
387 let table = tx
388 .open_table(EVENTS_TABLE)
389 .map_err(|e| CamelError::Io(format!("redb open events: {e}")))?;
390 let mut events = Vec::new();
391 for result in table
392 .iter()
393 .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
394 {
395 let (_k, v) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
396 let entry: JournalEntry = serde_json::from_slice(v.value())
397 .map_err(|e| CamelError::Io(format!("journal deserialize: {e}")))?;
398 events.push(entry.event);
399 }
400 Ok(events)
401 })
402 .await
403 .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
404 .map_err(|e: CamelError| DomainError::InvalidState(e.to_string()))
405 }
406
407 async fn append_command_id(&self, command_id: &str) -> Result<(), DomainError> {
408 let db = Arc::clone(&self.db);
409 let durability = self.redb_durability();
410 let id = command_id.to_string();
411 tokio::task::spawn_blocking(move || {
412 let mut tx = db
414 .begin_write()
415 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
416 tx.set_durability(durability)
417 .map_err(|e| CamelError::Io(format!("redb set_durability: {e}")))?;
418 {
419 let mut table = tx
420 .open_table(COMMAND_IDS_TABLE)
421 .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
422 table
423 .insert(id.as_str(), ())
424 .map_err(|e| CamelError::Io(format!("redb insert command_id: {e}")))?;
425 }
426 tx.commit()
427 .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
428 Ok::<_, CamelError>(())
429 })
430 .await
431 .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
432 .map_err(|e| DomainError::InvalidState(e.to_string()))
433 }
434
435 async fn remove_command_id(&self, command_id: &str) -> Result<(), DomainError> {
436 let db = Arc::clone(&self.db);
437 let durability = self.redb_durability();
438 let id = command_id.to_string();
439 tokio::task::spawn_blocking(move || {
440 let mut tx = db
442 .begin_write()
443 .map_err(|e| CamelError::Io(format!("redb begin_write: {e}")))?;
444 tx.set_durability(durability)
445 .map_err(|e| CamelError::Io(format!("redb set_durability: {e}")))?;
446 {
447 let mut table = tx
448 .open_table(COMMAND_IDS_TABLE)
449 .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
450 table
451 .remove(id.as_str())
452 .map_err(|e| CamelError::Io(format!("redb remove command_id: {e}")))?;
453 }
454 tx.commit()
455 .map_err(|e| CamelError::Io(format!("redb commit: {e}")))?;
456 Ok::<_, CamelError>(())
457 })
458 .await
459 .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
460 .map_err(|e| DomainError::InvalidState(e.to_string()))
461 }
462
463 async fn load_command_ids(&self) -> Result<Vec<String>, DomainError> {
464 let db = Arc::clone(&self.db);
465 tokio::task::spawn_blocking(move || {
466 let tx = db
467 .begin_read()
468 .map_err(|e| CamelError::Io(format!("redb begin_read: {e}")))?;
469 let table = tx
470 .open_table(COMMAND_IDS_TABLE)
471 .map_err(|e| CamelError::Io(format!("redb open command_ids: {e}")))?;
472 let mut ids = Vec::new();
473 for result in table
474 .iter()
475 .map_err(|e| CamelError::Io(format!("redb iter: {e}")))?
476 {
477 let (k, _) = result.map_err(|e| CamelError::Io(format!("redb read: {e}")))?;
478 ids.push(k.value().to_string());
479 }
480 Ok(ids)
481 })
482 .await
483 .map_err(|e| DomainError::InvalidState(format!("spawn_blocking join: {e}")))?
484 .map_err(|e: CamelError| DomainError::InvalidState(e.to_string()))
485 }
486}
487
488#[cfg(test)]
491mod tests {
492 use super::*;
493 use tempfile::tempdir;
494
495 async fn new_journal(dir: &tempfile::TempDir) -> RedbRuntimeEventJournal {
496 RedbRuntimeEventJournal::new(dir.path().join("test.db"), RedbJournalOptions::default())
497 .await
498 .unwrap()
499 }
500
501 #[tokio::test]
502 async fn redb_journal_roundtrip() {
503 let dir = tempdir().unwrap();
504 let journal = new_journal(&dir).await;
505
506 let events = vec![
507 RuntimeEvent::RouteRegistered {
508 route_id: "r1".to_string(),
509 },
510 RuntimeEvent::RouteStarted {
511 route_id: "r1".to_string(),
512 },
513 ];
514 journal.append_batch(&events).await.unwrap();
515
516 let loaded = journal.load_all().await.unwrap();
517 assert_eq!(loaded, events);
518 }
519
520 #[tokio::test]
521 async fn redb_journal_command_id_lifecycle() {
522 let dir = tempdir().unwrap();
523 let journal = new_journal(&dir).await;
524
525 journal.append_command_id("c1").await.unwrap();
526 journal.append_command_id("c2").await.unwrap();
527 journal.remove_command_id("c1").await.unwrap();
528
529 let ids = journal.load_command_ids().await.unwrap();
530 assert_eq!(ids, vec!["c2".to_string()]);
531 }
532
533 #[tokio::test]
534 async fn redb_journal_compaction_removes_completed_routes() {
535 let dir = tempdir().unwrap();
536 let journal = RedbRuntimeEventJournal::new(
538 dir.path().join("compact.db"),
539 RedbJournalOptions {
540 durability: JournalDurability::Eventual,
541 compaction_threshold_events: 1,
542 },
543 )
544 .await
545 .unwrap();
546
547 journal
549 .append_batch(&[RuntimeEvent::RouteRegistered {
550 route_id: "old".to_string(),
551 }])
552 .await
553 .unwrap();
554 journal
555 .append_batch(&[RuntimeEvent::RouteRemoved {
556 route_id: "old".to_string(),
557 }])
558 .await
559 .unwrap();
560
561 journal
563 .append_batch(&[RuntimeEvent::RouteRegistered {
564 route_id: "live".to_string(),
565 }])
566 .await
567 .unwrap();
568
569 let loaded = journal.load_all().await.unwrap();
570 assert!(
571 !loaded.iter().any(
572 |e| matches!(e, RuntimeEvent::RouteRegistered { route_id } if route_id == "old")
573 ),
574 "old route events must be compacted"
575 );
576 assert!(
577 loaded.iter().any(
578 |e| matches!(e, RuntimeEvent::RouteRegistered { route_id } if route_id == "live")
579 ),
580 "live route events must survive compaction"
581 );
582 }
583
584 #[tokio::test]
585 async fn redb_journal_compaction_preserves_reregistered_route() {
586 let dir = tempdir().unwrap();
587 let journal = RedbRuntimeEventJournal::new(
588 dir.path().join("rereg.db"),
589 RedbJournalOptions {
590 durability: JournalDurability::Eventual,
591 compaction_threshold_events: 1,
592 },
593 )
594 .await
595 .unwrap();
596
597 journal
598 .append_batch(&[RuntimeEvent::RouteRegistered {
599 route_id: "rereg".to_string(),
600 }])
601 .await
602 .unwrap();
603 journal
604 .append_batch(&[RuntimeEvent::RouteRemoved {
605 route_id: "rereg".to_string(),
606 }])
607 .await
608 .unwrap();
609 journal
610 .append_batch(&[RuntimeEvent::RouteRegistered {
611 route_id: "rereg".to_string(),
612 }])
613 .await
614 .unwrap();
615
616 let loaded = journal.load_all().await.unwrap();
617 let rereg_count = loaded
618 .iter()
619 .filter(
620 |e| matches!(e, RuntimeEvent::RouteRegistered { route_id } if route_id == "rereg"),
621 )
622 .count();
623 assert_eq!(
624 rereg_count, 1,
625 "re-registered route must have exactly one event after compaction"
626 );
627 }
628
629 #[tokio::test]
630 async fn redb_journal_durability_eventual() {
631 let dir = tempdir().unwrap();
632 let journal = RedbRuntimeEventJournal::new(
633 dir.path().join("eventual.db"),
634 RedbJournalOptions {
635 durability: JournalDurability::Eventual,
636 compaction_threshold_events: 10_000,
637 },
638 )
639 .await
640 .unwrap();
641
642 journal
643 .append_batch(&[RuntimeEvent::RouteRegistered {
644 route_id: "ev".to_string(),
645 }])
646 .await
647 .unwrap();
648 let loaded = journal.load_all().await.unwrap();
649 assert_eq!(loaded.len(), 1);
650 }
651
652 #[tokio::test]
653 async fn redb_journal_clone_shares_db() {
654 let dir = tempdir().unwrap();
655 let j1 = new_journal(&dir).await;
656 let j2 = j1.clone();
657
658 j1.append_batch(&[RuntimeEvent::RouteRegistered {
659 route_id: "shared".to_string(),
660 }])
661 .await
662 .unwrap();
663
664 let loaded = j2.load_all().await.unwrap();
666 assert_eq!(loaded.len(), 1);
667 }
668
669 #[tokio::test]
670 async fn redb_journal_append_empty_batch_is_noop() {
671 let dir = tempdir().unwrap();
672 let journal = new_journal(&dir).await;
673
674 journal.append_batch(&[]).await.unwrap();
675 let loaded = journal.load_all().await.unwrap();
676 assert!(loaded.is_empty());
677 }
678
679 #[tokio::test]
680 async fn redb_journal_sequence_numbers_across_batches() {
681 let dir = tempdir().unwrap();
682 let journal = new_journal(&dir).await;
683
684 journal
685 .append_batch(&[
686 RuntimeEvent::RouteRegistered {
687 route_id: "r1".to_string(),
688 },
689 RuntimeEvent::RouteStarted {
690 route_id: "r1".to_string(),
691 },
692 ])
693 .await
694 .unwrap();
695
696 journal
697 .append_batch(&[RuntimeEvent::RouteStopped {
698 route_id: "r1".to_string(),
699 }])
700 .await
701 .unwrap();
702
703 let loaded = journal.load_all().await.unwrap();
704 assert_eq!(loaded.len(), 3);
705
706 drop(journal);
708
709 let entries = RedbRuntimeEventJournal::inspect(
711 dir.path().join("test.db"),
712 JournalInspectFilter {
713 route_id: None,
714 limit: 10,
715 },
716 )
717 .await
718 .unwrap();
719 let seqs: Vec<u64> = entries.iter().map(|e| e.seq).collect();
720 assert_eq!(seqs, vec![2, 1, 0]);
722 }
723
724 #[tokio::test]
725 async fn redb_journal_load_all_empty() {
726 let dir = tempdir().unwrap();
727 let journal = new_journal(&dir).await;
728 let loaded = journal.load_all().await.unwrap();
729 assert!(loaded.is_empty());
730 }
731
732 #[tokio::test]
733 async fn redb_journal_inspect_file_not_found() {
734 let dir = tempdir().unwrap();
735 let result = RedbRuntimeEventJournal::inspect(
736 dir.path().join("nonexistent.db"),
737 JournalInspectFilter {
738 route_id: None,
739 limit: 10,
740 },
741 )
742 .await;
743 assert!(result.is_err());
744 let err = result.unwrap_err().to_string();
745 assert!(err.contains("journal file not found"));
746 }
747
748 #[tokio::test]
749 async fn redb_journal_inspect_with_route_id_filter() {
750 let dir = tempdir().unwrap();
751 let journal = new_journal(&dir).await;
752
753 journal
754 .append_batch(&[
755 RuntimeEvent::RouteRegistered {
756 route_id: "alpha".to_string(),
757 },
758 RuntimeEvent::RouteRegistered {
759 route_id: "beta".to_string(),
760 },
761 RuntimeEvent::RouteStarted {
762 route_id: "alpha".to_string(),
763 },
764 ])
765 .await
766 .unwrap();
767
768 drop(journal);
769
770 let entries = RedbRuntimeEventJournal::inspect(
771 dir.path().join("test.db"),
772 JournalInspectFilter {
773 route_id: Some("alpha".to_string()),
774 limit: 10,
775 },
776 )
777 .await
778 .unwrap();
779
780 assert_eq!(entries.len(), 2);
781 assert!(entries.iter().all(|e| {
782 matches!(&e.event, RuntimeEvent::RouteRegistered { route_id } | RuntimeEvent::RouteStarted { route_id } if route_id == "alpha")
783 }));
784 }
785
786 #[tokio::test]
787 async fn redb_journal_inspect_limit_enforcement() {
788 let dir = tempdir().unwrap();
789 let journal = new_journal(&dir).await;
790
791 for i in 0..5 {
792 journal
793 .append_batch(&[RuntimeEvent::RouteRegistered {
794 route_id: format!("r{i}"),
795 }])
796 .await
797 .unwrap();
798 }
799
800 drop(journal);
801
802 let entries = RedbRuntimeEventJournal::inspect(
803 dir.path().join("test.db"),
804 JournalInspectFilter {
805 route_id: None,
806 limit: 2,
807 },
808 )
809 .await
810 .unwrap();
811
812 assert_eq!(entries.len(), 2);
813 assert!(
815 matches!(&entries[0].event, RuntimeEvent::RouteRegistered { route_id } if route_id == "r4")
816 );
817 assert!(
818 matches!(&entries[1].event, RuntimeEvent::RouteRegistered { route_id } if route_id == "r3")
819 );
820 }
821
822 #[tokio::test]
823 async fn redb_journal_inspect_limit_with_filter_returns_matching_count() {
824 let dir = tempdir().unwrap();
825 let journal = new_journal(&dir).await;
826
827 for i in 0..4 {
829 let rid = if i % 2 == 0 { "alpha" } else { "beta" };
830 journal
831 .append_batch(&[RuntimeEvent::RouteRegistered {
832 route_id: rid.to_string(),
833 }])
834 .await
835 .unwrap();
836 }
837
838 drop(journal);
839
840 let entries = RedbRuntimeEventJournal::inspect(
842 dir.path().join("test.db"),
843 JournalInspectFilter {
844 route_id: Some("alpha".to_string()),
845 limit: 1,
846 },
847 )
848 .await
849 .unwrap();
850
851 assert_eq!(entries.len(), 1);
852 assert!(
853 matches!(&entries[0].event, RuntimeEvent::RouteRegistered { route_id } if route_id == "alpha")
854 );
855 }
856
857 #[test]
858 fn redb_journal_durability_default_is_immediate() {
859 assert_eq!(JournalDurability::default(), JournalDurability::Immediate);
860 }
861
862 #[test]
863 fn redb_journal_options_default() {
864 let opts = RedbJournalOptions::default();
865 assert_eq!(opts.durability, JournalDurability::Immediate);
866 assert_eq!(opts.compaction_threshold_events, 10_000);
867 }
868
869 #[test]
870 fn redb_journal_entry_serialization_roundtrip() {
871 let entry = JournalEntry {
872 seq: 42,
873 timestamp_ms: 1_700_000_000_000,
874 event: RuntimeEvent::RouteFailed {
875 route_id: "fail-route".to_string(),
876 error: "boom".to_string(),
877 },
878 };
879
880 let bytes = serde_json::to_vec(&entry).unwrap();
881 let decoded: JournalEntry = serde_json::from_slice(&bytes).unwrap();
882 assert_eq!(decoded.seq, 42);
883 assert_eq!(decoded.timestamp_ms, 1_700_000_000_000);
884 assert_eq!(decoded.event, entry.event);
885 }
886
887 #[test]
888 fn redb_journal_runtime_event_ext_all_variants() {
889 let events = [
890 RuntimeEvent::RouteRegistered {
891 route_id: "a".into(),
892 },
893 RuntimeEvent::RouteStartRequested {
894 route_id: "b".into(),
895 },
896 RuntimeEvent::RouteStarted {
897 route_id: "c".into(),
898 },
899 RuntimeEvent::RouteFailed {
900 route_id: "d".into(),
901 error: "err".into(),
902 },
903 RuntimeEvent::RouteStopped {
904 route_id: "e".into(),
905 },
906 RuntimeEvent::RouteSuspended {
907 route_id: "f".into(),
908 },
909 RuntimeEvent::RouteResumed {
910 route_id: "g".into(),
911 },
912 RuntimeEvent::RouteReloaded {
913 route_id: "h".into(),
914 },
915 RuntimeEvent::RouteRemoved {
916 route_id: "i".into(),
917 },
918 ];
919 let expected = ["a", "b", "c", "d", "e", "f", "g", "h", "i"];
920 for (event, expected_id) in events.iter().zip(expected.iter()) {
921 assert_eq!(event.route_id(), *expected_id);
922 }
923 }
924
925 #[tokio::test]
926 async fn redb_journal_compaction_no_removed_routes_early_return() {
927 let dir = tempdir().unwrap();
928 let journal = RedbRuntimeEventJournal::new(
929 dir.path().join("no_remove.db"),
930 RedbJournalOptions {
931 durability: JournalDurability::Eventual,
932 compaction_threshold_events: 1,
933 },
934 )
935 .await
936 .unwrap();
937
938 journal
940 .append_batch(&[RuntimeEvent::RouteRegistered {
941 route_id: "active".to_string(),
942 }])
943 .await
944 .unwrap();
945 journal
946 .append_batch(&[RuntimeEvent::RouteStarted {
947 route_id: "active".to_string(),
948 }])
949 .await
950 .unwrap();
951
952 let loaded = journal.load_all().await.unwrap();
953 assert_eq!(loaded.len(), 2);
954 }
955
956 #[tokio::test]
957 async fn redb_journal_command_ids_multiple_and_remove_nonexistent() {
958 let dir = tempdir().unwrap();
959 let journal = new_journal(&dir).await;
960
961 journal.append_command_id("cmd1").await.unwrap();
962 journal.append_command_id("cmd2").await.unwrap();
963 journal.append_command_id("cmd3").await.unwrap();
964
965 journal.remove_command_id("nonexistent").await.unwrap();
967
968 let ids = journal.load_command_ids().await.unwrap();
969 assert_eq!(ids.len(), 3);
970 assert!(ids.contains(&"cmd1".to_string()));
971 assert!(ids.contains(&"cmd2".to_string()));
972 assert!(ids.contains(&"cmd3".to_string()));
973 }
974
975 #[tokio::test]
976 async fn redb_journal_multiple_routes_compaction() {
977 let dir = tempdir().unwrap();
978 let journal = RedbRuntimeEventJournal::new(
979 dir.path().join("multi_compact.db"),
980 RedbJournalOptions {
981 durability: JournalDurability::Eventual,
982 compaction_threshold_events: 1,
983 },
984 )
985 .await
986 .unwrap();
987
988 journal
990 .append_batch(&[RuntimeEvent::RouteRegistered {
991 route_id: "removed1".to_string(),
992 }])
993 .await
994 .unwrap();
995 journal
996 .append_batch(&[RuntimeEvent::RouteRemoved {
997 route_id: "removed1".to_string(),
998 }])
999 .await
1000 .unwrap();
1001 journal
1002 .append_batch(&[RuntimeEvent::RouteRegistered {
1003 route_id: "removed2".to_string(),
1004 }])
1005 .await
1006 .unwrap();
1007 journal
1008 .append_batch(&[RuntimeEvent::RouteRemoved {
1009 route_id: "removed2".to_string(),
1010 }])
1011 .await
1012 .unwrap();
1013 journal
1014 .append_batch(&[RuntimeEvent::RouteRegistered {
1015 route_id: "kept".to_string(),
1016 }])
1017 .await
1018 .unwrap();
1019
1020 let loaded = journal.load_all().await.unwrap();
1021 assert!(
1022 !loaded.iter().any(|e| matches!(e, RuntimeEvent::RouteRegistered { route_id } if route_id == "removed1" || route_id == "removed2")),
1023 "removed routes must be compacted"
1024 );
1025 assert!(
1026 loaded.iter().any(
1027 |e| matches!(e, RuntimeEvent::RouteRegistered { route_id } if route_id == "kept")
1028 ),
1029 "kept route must survive"
1030 );
1031 }
1032}