camel_api/resequencer.rs
1//! Resequencer EIP contract types.
2//!
3//! Batch/stream policy configs (Tasks 2/3).
4
/// Completion trigger for a batch-resequencing window.
///
/// This is deliberately narrower than the general `CompletionCondition`:
/// only size and/or timeout triggers are offered, because that type's
/// `Predicate` variant is semantically wrong for a resequencer window.
///
/// All timeout values are expressed in milliseconds.
#[derive(Clone, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema, ts_rs::TS))]
pub enum BatchCompletion {
    /// Emit once this many exchanges have accumulated for a correlation key.
    Size(usize),
    /// Emit once this many milliseconds have elapsed since the first
    /// exchange arrived for a correlation key.
    Timeout(u64),
    /// Emit as soon as EITHER the size (first field) or the timeout in
    /// milliseconds (second field) is reached, whichever happens first.
    SizeOrTimeout(usize, u64),
}
20
/// Action taken when the gap timer of a stream resequencer fires.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema, ts_rs::TS))]
pub enum GapPolicy {
    /// Emit the contiguous run starting at `next_expected`, then advance
    /// past the gap.
    EmitPartial,
    /// Drop every held exchange and log a warning (no dead-letter sink is
    /// wired).
    DropAndLog,
}
30
/// Action taken when the priority queue of a stream resequencer is full.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema, ts_rs::TS))]
pub enum CapacityPolicy {
    /// Drop the incoming exchange and log a warning (no dead-letter sink is
    /// wired).
    LogAndDrop,
    /// Evict the oldest queued exchange to make room for the new one.
    DropOldest,
}
40
41/// Configurable resequencing policy mode.
42#[derive(Debug, Clone, PartialEq, Eq)]
43#[cfg_attr(feature = "schema", derive(schemars::JsonSchema, ts_rs::TS))]
44pub enum ResequenceMode {
45 /// Window-based batch resequencing.
46 Batch {
47 /// Simple-language expression for the correlation key
48 /// (e.g. `"${header.region}"`).
49 correlation: String,
50 /// Simple-language expression for the sort key
51 /// (e.g. `"${header.sequence}"`).
52 sort: String,
53 /// Window completion trigger.
54 completion: BatchCompletion,
55 },
56 /// Stream resequencing — bounded priority queue with gap detection.
57 Stream {
58 /// Simple-language expression for the sequence number
59 /// (e.g. `"${header.seqNum}"`). Must evaluate to a u64.
60 sequence: String,
61 /// Maximum queue size (default 1000).
62 capacity: usize,
63 /// Gap timeout in milliseconds (default 5000).
64 gap_timeout: u64,
65 /// What to do when a gap timer fires.
66 on_gap: GapPolicy,
67 /// What to do when the queue reaches capacity.
68 on_capacity_exceeded: CapacityPolicy,
69 /// When true, duplicate/late sequence numbers are ignored.
70 /// Default false (Camel 4.x behavior: duplicates are inserted).
71 dedup: bool,
72 },
73}
74
75/// Configuration for the Resequencer EIP.
76#[derive(Debug, Clone, PartialEq, Eq)]
77#[cfg_attr(feature = "schema", derive(schemars::JsonSchema, ts_rs::TS))]
78pub struct ResequencePolicyConfig {
79 pub mode: ResequenceMode,
80}
81
82impl Default for ResequencePolicyConfig {
83 /// Safe-ish defaults: batch mode by correlation + sort on `header.id`,
84 /// 100-size window with 30s timeout fallback.
85 fn default() -> Self {
86 Self {
87 mode: ResequenceMode::Batch {
88 correlation: "header.id".into(),
89 sort: "header.id".into(),
90 completion: BatchCompletion::SizeOrTimeout(100, 30_000),
91 },
92 }
93 }
94}