outbox_core/config.rs
1//! Runtime configuration for the outbox crate.
2//!
3//! [`OutboxConfig`] carries the tunables that both the producer-side
4//! [`OutboxService`](crate::service::OutboxService) and the worker-side
5//! [`OutboxManager`](crate::manager::OutboxManager) read — batch size, timer
6//! intervals, lock timeout, and which [`IdempotencyStrategy`] to apply when
7//! new events are written.
8
9use crate::model::Event;
10use serde::Serialize;
11use std::fmt::Debug;
12
13/// Runtime configuration shared by the producer and worker sides.
14///
15/// Generic over the user's domain event payload type `P` because the
16/// [`Custom`](IdempotencyStrategy::Custom) strategy variant holds a function
17/// pointer of type `fn(&Event<P>) -> String`.
18///
19/// All fields are public so callers can construct the struct with a literal or
20/// start from [`default`](Self::default) and override selected fields.
21///
22/// # Example
23///
24/// ```
25/// use outbox_core::prelude::*;
26///
27/// # #[derive(Debug, Clone, serde::Serialize)]
28/// # struct MyEvent;
29/// let cfg: OutboxConfig<MyEvent> = OutboxConfig {
30/// batch_size: 200,
31/// poll_interval_secs: 2,
32/// ..OutboxConfig::default()
33/// };
34/// assert_eq!(cfg.batch_size, 200);
35/// assert_eq!(cfg.retention_days, 7); // inherited from default
36/// ```
37#[derive(Clone)]
38pub struct OutboxConfig<P>
39where
40 P: Debug + Clone + Serialize,
41{
42 /// Maximum number of events fetched per processing iteration.
43 pub batch_size: u32,
44 /// How long sent events are kept before the garbage collector deletes
45 /// them, measured in days.
46 pub retention_days: i64,
47 /// Interval between garbage-collection passes, in seconds.
48 pub gc_interval_secs: u64,
49 /// Fallback polling interval for the worker loop, in seconds. Used
50 /// alongside database `LISTEN`/notify to guarantee progress even when
51 /// notifications are missed or unsupported.
52 pub poll_interval_secs: u64,
53 /// Duration a row stays locked while a worker processes it, in minutes.
54 /// Once this timeout elapses, the row becomes eligible to be picked up
55 /// again (recovering from a crashed or stuck worker).
56 pub lock_timeout_mins: i64,
57
58 /// How idempotency tokens are produced for newly written events. See
59 /// [`IdempotencyStrategy`] for the available variants.
60 pub idempotency_strategy: IdempotencyStrategy<P>,
61 /// Failure count at which an event becomes eligible for quarantine. Events
62 /// whose failure counter reaches this value are returned by
63 /// [`DlqHeap::drain_exceeded`](crate::dlq::storage::DlqHeap::drain_exceeded)
64 /// on the next reaper pass.
65 ///
66 /// Only consulted when the `dlq` feature is enabled.
67 pub dlq_threshold: u32,
68 /// Interval between dead-letter reaper passes, in seconds. Each pass drains
69 /// events that have crossed [`dlq_threshold`](Self::dlq_threshold) and hands
70 /// them off for quarantine.
71 ///
72 /// Only consulted when the `dlq` feature is enabled.
73 pub dlq_interval_secs: u64,
74}
75
76impl<P> Default for OutboxConfig<P>
77where
78 P: Debug + Clone + Serialize,
79{
80 /// Returns a configuration suitable as a starting point.
81 ///
82 /// The defaults are:
83 ///
84 /// | Field | Value |
85 /// |---|---|
86 /// | `batch_size` | 100 |
87 /// | `retention_days` | 7 |
88 /// | `gc_interval_secs` | 3600 |
89 /// | `poll_interval_secs` | 10 |
90 /// | `lock_timeout_mins` | 5 |
91 /// | `idempotency_strategy` | [`IdempotencyStrategy::None`] |
92 /// | `dlq_threshold` | 10 |
93 /// | `dlq_interval_secs` | 300 |
94 ///
95 /// These values are part of the public contract — tuning them is a
96 /// deliberate behaviour change.
97 fn default() -> Self {
98 Self {
99 batch_size: 100,
100 retention_days: 7,
101 gc_interval_secs: 3600,
102 poll_interval_secs: 10,
103 lock_timeout_mins: 5,
104 idempotency_strategy: IdempotencyStrategy::None,
105 dlq_threshold: 10,
106 dlq_interval_secs: 300,
107 }
108 }
109}
110
111/// How an idempotency token is produced when a new event is written.
112///
113/// The variant is evaluated inside
114/// [`OutboxService::add_event`](crate::service::OutboxService::add_event)
115/// before the event is persisted. When an
116/// [`IdempotencyStorageProvider`](crate::idempotency::storage::IdempotencyStorageProvider)
117/// is wired, the produced token is also used to reserve uniqueness up front.
118#[derive(Clone)]
119pub enum IdempotencyStrategy<P>
120where
121 P: Debug + Clone + Serialize,
122{
123 /// Uses the caller-supplied token as-is. Passing `None` at call site means
124 /// the event is stored without a token and no reservation is attempted.
125 Provided,
126 /// Derives the token by applying the given function to the event about to
127 /// be written. The `add_event` callback `get_event` must return `Some`
128 /// for this variant — otherwise the service panics.
129 Custom(fn(&Event<P>) -> String),
130 /// Generates a fresh UUID v7 token at write time. Any caller-supplied
131 /// token is ignored.
132 Uuid,
133 //TODO:
134 //HashPayload, //BLAKE3
135 /// Disables idempotency — no token is produced and no reservation is
136 /// attempted. This is the default.
137 None,
138}
139
/// Unit tests pinning the documented defaults of [`OutboxConfig`] and checking
/// that `Clone` preserves every field, including a `Custom` function pointer.
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use rstest::rstest;
    use serde::{Deserialize, Serialize};

    /// Minimal payload type satisfying the `Debug + Clone + Serialize` bound.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestPayload;

    /// Shorthand for the default configuration over [`TestPayload`].
    fn default_cfg() -> OutboxConfig<TestPayload> {
        OutboxConfig::default()
    }

    // ------------------------------ Defaults ------------------------------
    //
    // The default values are documented as part of the public contract, so
    // each field gets its own test.

    #[rstest]
    fn default_batch_size_is_100() {
        assert_eq!(default_cfg().batch_size, 100);
    }

    #[rstest]
    fn default_retention_days_is_7() {
        assert_eq!(default_cfg().retention_days, 7);
    }

    #[rstest]
    fn default_gc_interval_secs_is_3600() {
        assert_eq!(default_cfg().gc_interval_secs, 3600);
    }

    #[rstest]
    fn default_poll_interval_secs_is_10() {
        assert_eq!(default_cfg().poll_interval_secs, 10);
    }

    #[rstest]
    fn default_lock_timeout_mins_is_5() {
        assert_eq!(default_cfg().lock_timeout_mins, 5);
    }

    #[rstest]
    fn default_idempotency_strategy_is_none() {
        assert!(matches!(
            default_cfg().idempotency_strategy,
            IdempotencyStrategy::None
        ));
    }

    #[rstest]
    fn default_dlq_threshold_is_10() {
        assert_eq!(default_cfg().dlq_threshold, 10);
    }

    #[rstest]
    fn default_dlq_interval_secs_is_300() {
        assert_eq!(default_cfg().dlq_interval_secs, 300);
    }

    // ------------------------------- Clone -------------------------------

    #[rstest]
    fn clone_preserves_scalar_fields() {
        let cfg = OutboxConfig::<TestPayload> {
            batch_size: 42,
            retention_days: 3,
            gc_interval_secs: 99,
            poll_interval_secs: 1,
            lock_timeout_mins: 2,
            idempotency_strategy: IdempotencyStrategy::Uuid,
            dlq_threshold: 10,
            dlq_interval_secs: 1,
        };
        let cloned = cfg.clone();
        assert_eq!(cloned.batch_size, 42);
        assert_eq!(cloned.retention_days, 3);
        assert_eq!(cloned.gc_interval_secs, 99);
        assert_eq!(cloned.poll_interval_secs, 1);
        assert_eq!(cloned.lock_timeout_mins, 2);
        assert_eq!(cloned.dlq_threshold, 10);
        assert_eq!(cloned.dlq_interval_secs, 1);
        assert!(matches!(
            cloned.idempotency_strategy,
            IdempotencyStrategy::Uuid
        ));
    }

    #[rstest]
    fn clone_preserves_custom_strategy_function_pointer() {
        fn derive(_: &Event<TestPayload>) -> String {
            "fp".into()
        }
        let cfg = OutboxConfig::<TestPayload> {
            batch_size: 1,
            retention_days: 1,
            gc_interval_secs: 1,
            poll_interval_secs: 1,
            lock_timeout_mins: 1,
            idempotency_strategy: IdempotencyStrategy::Custom(derive),
            dlq_threshold: 10,
            dlq_interval_secs: 1,
        };
        let cloned = cfg.clone();
        match cloned.idempotency_strategy {
            IdempotencyStrategy::Custom(f) => {
                // Call through the cloned pointer to prove it still targets
                // `derive`.
                let e = Event::new(
                    crate::object::EventType::new("t"),
                    crate::object::Payload::new(TestPayload),
                    None,
                );
                assert_eq!(f(&e), "fp");
            }
            _ => panic!("expected Custom variant after clone"),
        }
    }
}