Skip to main content

camel_api/
resequencer.rs

1//! Resequencer EIP contract types.
2//!
3//! Batch/stream policy configs (Tasks 2/3).
4
5/// Window-based completion trigger for batch resequencing.
6///
7/// Narrowed to size and/or timeout — NOT the general `CompletionCondition`
8/// (which carries a `Predicate` variant semantically wrong for a
9/// resequencer window). Timeout values are in milliseconds.
10#[derive(Debug, Clone, PartialEq, Eq)]
11#[cfg_attr(feature = "schema", derive(schemars::JsonSchema, ts_rs::TS))]
12pub enum BatchCompletion {
13    /// Emit when `size` exchanges accumulate for a correlation key.
14    Size(usize),
15    /// Emit after `timeout_ms` since the first exchange for a correlation key.
16    Timeout(u64),
17    /// Emit when EITHER condition is met first.
18    SizeOrTimeout(usize, u64),
19}
20
21/// What to do when a stream resequencer gap timer fires.
22#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23#[cfg_attr(feature = "schema", derive(schemars::JsonSchema, ts_rs::TS))]
24pub enum GapPolicy {
25    /// Emit the contiguous run from `next_expected` and advance past the gap.
26    EmitPartial,
27    /// Drop all held exchanges with a warning log (no dead-letter sink wired).
28    DropAndLog,
29}
30
31/// What to do when the stream resequencer priority queue reaches capacity.
32#[derive(Debug, Clone, Copy, PartialEq, Eq)]
33#[cfg_attr(feature = "schema", derive(schemars::JsonSchema, ts_rs::TS))]
34pub enum CapacityPolicy {
35    /// Log a warning and drop the incoming exchange (no dead-letter sink wired).
36    LogAndDrop,
37    /// Drop the oldest exchange from the queue to make room.
38    DropOldest,
39}
40
41/// Configurable resequencing policy mode.
42#[derive(Debug, Clone, PartialEq, Eq)]
43#[cfg_attr(feature = "schema", derive(schemars::JsonSchema, ts_rs::TS))]
44pub enum ResequenceMode {
45    /// Window-based batch resequencing.
46    Batch {
47        /// Simple-language expression for the correlation key
48        /// (e.g. `"${header.region}"`).
49        correlation: String,
50        /// Simple-language expression for the sort key
51        /// (e.g. `"${header.sequence}"`).
52        sort: String,
53        /// Window completion trigger.
54        completion: BatchCompletion,
55    },
56    /// Stream resequencing — bounded priority queue with gap detection.
57    Stream {
58        /// Simple-language expression for the sequence number
59        /// (e.g. `"${header.seqNum}"`). Must evaluate to a u64.
60        sequence: String,
61        /// Maximum queue size (default 1000).
62        capacity: usize,
63        /// Gap timeout in milliseconds (default 5000).
64        gap_timeout: u64,
65        /// What to do when a gap timer fires.
66        on_gap: GapPolicy,
67        /// What to do when the queue reaches capacity.
68        on_capacity_exceeded: CapacityPolicy,
69        /// When true, duplicate/late sequence numbers are ignored.
70        /// Default false (Camel 4.x behavior: duplicates are inserted).
71        dedup: bool,
72    },
73}
74
75/// Configuration for the Resequencer EIP.
76#[derive(Debug, Clone, PartialEq, Eq)]
77#[cfg_attr(feature = "schema", derive(schemars::JsonSchema, ts_rs::TS))]
78pub struct ResequencePolicyConfig {
79    pub mode: ResequenceMode,
80}
81
82impl Default for ResequencePolicyConfig {
83    /// Safe-ish defaults: batch mode by correlation + sort on `header.id`,
84    /// 100-size window with 30s timeout fallback.
85    fn default() -> Self {
86        Self {
87            mode: ResequenceMode::Batch {
88                correlation: "header.id".into(),
89                sort: "header.id".into(),
90                completion: BatchCompletion::SizeOrTimeout(100, 30_000),
91            },
92        }
93    }
94}