Skip to main content

eventbus_core/contract/
mod.rs

1pub mod delivery;
2pub mod message;
3
4use serde::{Deserialize, Serialize};
5
6use crate::error::EventBusError;
7
8// ---------------------------------------------------------------------------
9// Delivery guarantee
10// ---------------------------------------------------------------------------
11
12#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
13#[serde(rename_all = "kebab-case")]
14pub enum DeliveryGuarantee {
15    AtMostOnce,
16    AtLeastOnce,
17    ExactlyOnce,
18}
19
20// ---------------------------------------------------------------------------
21// Publish confirmation
22// ---------------------------------------------------------------------------
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
25#[serde(rename_all = "kebab-case")]
26pub enum PublishConfirmation {
27    FireAndForget,
28    Accepted,
29    Persisted,
30}
31
32// ---------------------------------------------------------------------------
33// Guarantee matrix
34// ---------------------------------------------------------------------------
35
36#[derive(Debug, Clone)]
37pub struct GuaranteeMatrix {
38    pub publish: DeliveryGuarantee,
39    pub consume: DeliveryGuarantee,
40    pub confirmation: PublishConfirmation,
41}
42
43impl GuaranteeMatrix {
44    pub fn validate(&self) -> Result<(), EventBusError> {
45        let needs_persisted = self.publish == DeliveryGuarantee::ExactlyOnce
46            || self.consume == DeliveryGuarantee::ExactlyOnce;
47
48        if needs_persisted && self.confirmation != PublishConfirmation::Persisted {
49            return Err(EventBusError::Validation(format!(
50                "exactly-once requires {:?} confirmation",
51                PublishConfirmation::Persisted,
52            )));
53        }
54
55        Ok(())
56    }
57}
58
59// ---------------------------------------------------------------------------
60// Overflow strategy
61// ---------------------------------------------------------------------------
62
63#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
64#[serde(rename_all = "kebab-case")]
65pub enum OverflowStrategy {
66    Reject,
67    Block,
68    DropNewest,
69    DropOldest,
70}
71
72// ---------------------------------------------------------------------------
73// Backpressure policy
74// ---------------------------------------------------------------------------
75
76#[derive(Debug, Clone)]
77pub struct BackpressurePolicy {
78    pub max_in_flight: usize,
79    pub max_pending_acks: usize,
80    pub max_batch_size: usize,
81    pub overflow_strategy: OverflowStrategy,
82}
83
84impl BackpressurePolicy {
85    pub fn validate(&self) -> Result<(), EventBusError> {
86        if self.max_in_flight == 0 {
87            return Err(EventBusError::Validation(
88                "max in flight must be > 0".into(),
89            ));
90        }
91        if self.max_pending_acks == 0 {
92            return Err(EventBusError::Validation(
93                "max pending acks must be > 0".into(),
94            ));
95        }
96        if self.max_pending_acks < self.max_in_flight {
97            return Err(EventBusError::Validation(
98                "max pending acks must be >= max in flight".into(),
99            ));
100        }
101        if self.max_batch_size == 0 {
102            return Err(EventBusError::Validation(
103                "max batch size must be > 0".into(),
104            ));
105        }
106        if self.max_batch_size > self.max_in_flight {
107            return Err(EventBusError::Validation(
108                "max batch size must be <= max in flight".into(),
109            ));
110        }
111        if self.max_batch_size > self.max_pending_acks {
112            return Err(EventBusError::Validation(
113                "max batch size must be <= max pending acks".into(),
114            ));
115        }
116        Ok(())
117    }
118}
119
120// ---------------------------------------------------------------------------
121// Ack mode
122// ---------------------------------------------------------------------------
123
124#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
125#[serde(rename_all = "kebab-case")]
126pub enum AckMode {
127    Manual,
128    AutoOnReceive,
129    AutoOnHandlerSuccess,
130}
131
132// ---------------------------------------------------------------------------
133// Ordering mode
134// ---------------------------------------------------------------------------
135
136#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
137#[serde(rename_all = "kebab-case")]
138pub enum OrderingMode {
139    None,
140    Key,
141}
142
143// ---------------------------------------------------------------------------
144// Consumer balance mode
145// ---------------------------------------------------------------------------
146
147/// How a consumer group distributes messages across consumers.
148///
149/// Backend support varies — `StreamBus` currently implements `Competing`
150/// (Redis Streams consumer-group semantics) and rejects `FanOut` at subscribe
151/// time. New backends may unlock additional modes; check the backend's docs.
152#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
153#[serde(rename_all = "kebab-case")]
154pub enum ConsumerBalanceMode {
155    Competing,
156    FanOut,
157}
158
159// ---------------------------------------------------------------------------
160// Subscription semantics
161// ---------------------------------------------------------------------------
162
163#[derive(Debug, Clone)]
164pub struct SubscriptionSemantics {
165    pub ack_mode: AckMode,
166    pub ordering_mode: OrderingMode,
167    pub balance_mode: ConsumerBalanceMode,
168    pub wildcard_topic: bool,
169    pub require_ordered_key: bool,
170}
171
172impl SubscriptionSemantics {
173    pub fn validate(&self) -> Result<(), EventBusError> {
174        if self.require_ordered_key && self.ordering_mode != OrderingMode::Key {
175            return Err(EventBusError::Validation(format!(
176                "ordered key can only be required when ordering mode is {:?}",
177                OrderingMode::Key,
178            )));
179        }
180        Ok(())
181    }
182}
183
184// ---------------------------------------------------------------------------
185// Tests
186// ---------------------------------------------------------------------------
187
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a matrix that uses the same guarantee on both sides.
    fn matrix(
        guarantee: DeliveryGuarantee,
        confirmation: PublishConfirmation,
    ) -> GuaranteeMatrix {
        GuaranteeMatrix {
            publish: guarantee,
            consume: guarantee,
            confirmation,
        }
    }

    /// Builds a policy with the given limits and the `Reject` strategy.
    fn policy(
        max_in_flight: usize,
        max_pending_acks: usize,
        max_batch_size: usize,
    ) -> BackpressurePolicy {
        BackpressurePolicy {
            max_in_flight,
            max_pending_acks,
            max_batch_size,
            overflow_strategy: OverflowStrategy::Reject,
        }
    }

    #[test]
    fn guarantee_matrix_accepts_at_least_once_with_accepted() {
        let candidate = matrix(DeliveryGuarantee::AtLeastOnce, PublishConfirmation::Accepted);
        assert!(candidate.validate().is_ok());
    }

    #[test]
    fn guarantee_matrix_rejects_exactly_once_without_persisted() {
        let candidate = matrix(DeliveryGuarantee::ExactlyOnce, PublishConfirmation::Accepted);
        assert!(candidate.validate().is_err());
    }

    #[test]
    fn backpressure_accepts_valid_policy() {
        assert!(policy(128, 256, 64).validate().is_ok());
    }

    #[test]
    fn backpressure_rejects_zero_in_flight() {
        assert!(policy(0, 10, 1).validate().is_err());
    }

    #[test]
    fn backpressure_rejects_pending_less_than_in_flight() {
        assert!(policy(10, 5, 1).validate().is_err());
    }

    #[test]
    fn backpressure_rejects_batch_larger_than_in_flight() {
        assert!(policy(4, 8, 5).validate().is_err());
    }

    #[test]
    fn subscription_semantics_rejects_require_ordered_key_without_key_mode() {
        let semantics = SubscriptionSemantics {
            ack_mode: AckMode::Manual,
            ordering_mode: OrderingMode::None,
            balance_mode: ConsumerBalanceMode::Competing,
            wildcard_topic: false,
            require_ordered_key: true,
        };
        assert!(semantics.validate().is_err());
    }
}