1pub mod delivery;
2pub mod message;
3
4use serde::{Deserialize, Serialize};
5
6use crate::error::EventBusError;
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
13#[serde(rename_all = "kebab-case")]
14pub enum DeliveryGuarantee {
15 AtMostOnce,
16 AtLeastOnce,
17 ExactlyOnce,
18}
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
25#[serde(rename_all = "kebab-case")]
26pub enum PublishConfirmation {
27 FireAndForget,
28 Accepted,
29 Persisted,
30}
31
32#[derive(Debug, Clone)]
37pub struct GuaranteeMatrix {
38 pub publish: DeliveryGuarantee,
39 pub consume: DeliveryGuarantee,
40 pub confirmation: PublishConfirmation,
41}
42
43impl GuaranteeMatrix {
44 pub fn validate(&self) -> Result<(), EventBusError> {
45 let needs_persisted = self.publish == DeliveryGuarantee::ExactlyOnce
46 || self.consume == DeliveryGuarantee::ExactlyOnce;
47
48 if needs_persisted && self.confirmation != PublishConfirmation::Persisted {
49 return Err(EventBusError::Validation(format!(
50 "exactly-once requires {:?} confirmation",
51 PublishConfirmation::Persisted,
52 )));
53 }
54
55 Ok(())
56 }
57}
58
59#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
64#[serde(rename_all = "kebab-case")]
65pub enum OverflowStrategy {
66 Reject,
67 Block,
68 DropNewest,
69 DropOldest,
70}
71
72#[derive(Debug, Clone)]
77pub struct BackpressurePolicy {
78 pub max_in_flight: usize,
79 pub max_pending_acks: usize,
80 pub max_batch_size: usize,
81 pub overflow_strategy: OverflowStrategy,
82}
83
84impl BackpressurePolicy {
85 pub fn validate(&self) -> Result<(), EventBusError> {
86 if self.max_in_flight == 0 {
87 return Err(EventBusError::Validation(
88 "max in flight must be > 0".into(),
89 ));
90 }
91 if self.max_pending_acks == 0 {
92 return Err(EventBusError::Validation(
93 "max pending acks must be > 0".into(),
94 ));
95 }
96 if self.max_pending_acks < self.max_in_flight {
97 return Err(EventBusError::Validation(
98 "max pending acks must be >= max in flight".into(),
99 ));
100 }
101 if self.max_batch_size == 0 {
102 return Err(EventBusError::Validation(
103 "max batch size must be > 0".into(),
104 ));
105 }
106 if self.max_batch_size > self.max_in_flight {
107 return Err(EventBusError::Validation(
108 "max batch size must be <= max in flight".into(),
109 ));
110 }
111 if self.max_batch_size > self.max_pending_acks {
112 return Err(EventBusError::Validation(
113 "max batch size must be <= max pending acks".into(),
114 ));
115 }
116 Ok(())
117 }
118}
119
120#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
125#[serde(rename_all = "kebab-case")]
126pub enum AckMode {
127 Manual,
128 AutoOnReceive,
129 AutoOnHandlerSuccess,
130}
131
132#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
137#[serde(rename_all = "kebab-case")]
138pub enum OrderingMode {
139 None,
140 Key,
141}
142
143#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
153#[serde(rename_all = "kebab-case")]
154pub enum ConsumerBalanceMode {
155 Competing,
156 FanOut,
157}
158
159#[derive(Debug, Clone)]
164pub struct SubscriptionSemantics {
165 pub ack_mode: AckMode,
166 pub ordering_mode: OrderingMode,
167 pub balance_mode: ConsumerBalanceMode,
168 pub wildcard_topic: bool,
169 pub require_ordered_key: bool,
170}
171
172impl SubscriptionSemantics {
173 pub fn validate(&self) -> Result<(), EventBusError> {
174 if self.require_ordered_key && self.ordering_mode != OrderingMode::Key {
175 return Err(EventBusError::Validation(format!(
176 "ordered key can only be required when ordering mode is {:?}",
177 OrderingMode::Key,
178 )));
179 }
180 Ok(())
181 }
182}
183
184#[cfg(test)]
189mod tests {
190 use super::*;
191
192 #[test]
193 fn guarantee_matrix_accepts_at_least_once_with_accepted() {
194 let matrix = GuaranteeMatrix {
195 publish: DeliveryGuarantee::AtLeastOnce,
196 consume: DeliveryGuarantee::AtLeastOnce,
197 confirmation: PublishConfirmation::Accepted,
198 };
199 assert!(matrix.validate().is_ok());
200 }
201
202 #[test]
203 fn guarantee_matrix_rejects_exactly_once_without_persisted() {
204 let matrix = GuaranteeMatrix {
205 publish: DeliveryGuarantee::ExactlyOnce,
206 consume: DeliveryGuarantee::ExactlyOnce,
207 confirmation: PublishConfirmation::Accepted,
208 };
209 assert!(matrix.validate().is_err());
210 }
211
212 #[test]
213 fn backpressure_accepts_valid_policy() {
214 let policy = BackpressurePolicy {
215 max_in_flight: 128,
216 max_pending_acks: 256,
217 max_batch_size: 64,
218 overflow_strategy: OverflowStrategy::Reject,
219 };
220 assert!(policy.validate().is_ok());
221 }
222
223 #[test]
224 fn backpressure_rejects_zero_in_flight() {
225 let policy = BackpressurePolicy {
226 max_in_flight: 0,
227 max_pending_acks: 10,
228 max_batch_size: 1,
229 overflow_strategy: OverflowStrategy::Reject,
230 };
231 assert!(policy.validate().is_err());
232 }
233
234 #[test]
235 fn backpressure_rejects_pending_less_than_in_flight() {
236 let policy = BackpressurePolicy {
237 max_in_flight: 10,
238 max_pending_acks: 5,
239 max_batch_size: 1,
240 overflow_strategy: OverflowStrategy::Reject,
241 };
242 assert!(policy.validate().is_err());
243 }
244
245 #[test]
246 fn backpressure_rejects_batch_larger_than_in_flight() {
247 let policy = BackpressurePolicy {
248 max_in_flight: 4,
249 max_pending_acks: 8,
250 max_batch_size: 5,
251 overflow_strategy: OverflowStrategy::Reject,
252 };
253 assert!(policy.validate().is_err());
254 }
255
256 #[test]
257 fn subscription_semantics_rejects_require_ordered_key_without_key_mode() {
258 let sem = SubscriptionSemantics {
259 ack_mode: AckMode::Manual,
260 ordering_mode: OrderingMode::None,
261 balance_mode: ConsumerBalanceMode::Competing,
262 wildcard_topic: false,
263 require_ordered_key: true,
264 };
265 assert!(sem.validate().is_err());
266 }
267}