// outbox_core/config.rs

1//! Runtime configuration for the outbox crate.
2//!
3//! [`OutboxConfig`] carries the tunables that both the producer-side
4//! [`OutboxService`](crate::service::OutboxService) and the worker-side
5//! [`OutboxManager`](crate::manager::OutboxManager) read — batch size, timer
6//! intervals, lock timeout, and which [`IdempotencyStrategy`] to apply when
7//! new events are written.
8
9use crate::model::Event;
10use serde::Serialize;
11use std::fmt::Debug;
12
13/// Runtime configuration shared by the producer and worker sides.
14///
15/// Generic over the user's domain event payload type `P` because the
16/// [`Custom`](IdempotencyStrategy::Custom) strategy variant holds a function
17/// pointer of type `fn(&Event<P>) -> String`.
18///
19/// All fields are public so callers can construct the struct with a literal or
20/// start from [`default`](Self::default) and override selected fields.
21///
22/// # Example
23///
24/// ```
25/// use outbox_core::prelude::*;
26///
27/// # #[derive(Debug, Clone, serde::Serialize)]
28/// # struct MyEvent;
29/// let cfg: OutboxConfig<MyEvent> = OutboxConfig {
30///     batch_size: 200,
31///     poll_interval_secs: 2,
32///     ..OutboxConfig::default()
33/// };
34/// assert_eq!(cfg.batch_size, 200);
35/// assert_eq!(cfg.retention_days, 7); // inherited from default
36/// ```
37#[derive(Clone)]
38pub struct OutboxConfig<P>
39where
40    P: Debug + Clone + Serialize,
41{
42    /// Maximum number of events fetched per processing iteration.
43    pub batch_size: u32,
44    /// How long sent events are kept before the garbage collector deletes
45    /// them, measured in days.
46    pub retention_days: i64,
47    /// Interval between garbage-collection passes, in seconds.
48    pub gc_interval_secs: u64,
49    /// Fallback polling interval for the worker loop, in seconds. Used
50    /// alongside database `LISTEN`/notify to guarantee progress even when
51    /// notifications are missed or unsupported.
52    pub poll_interval_secs: u64,
53    /// Duration a row stays locked while a worker processes it, in minutes.
54    /// Once this timeout elapses, the row becomes eligible to be picked up
55    /// again (recovering from a crashed or stuck worker).
56    pub lock_timeout_mins: i64,
57
58    /// How idempotency tokens are produced for newly written events. See
59    /// [`IdempotencyStrategy`] for the available variants.
60    pub idempotency_strategy: IdempotencyStrategy<P>,
61    /// Failure count at which an event becomes eligible for quarantine. Events
62    /// whose failure counter reaches this value are returned by
63    /// [`DlqHeap::drain_exceeded`](crate::dlq::storage::DlqHeap::drain_exceeded)
64    /// on the next reaper pass.
65    ///
66    /// Only consulted when the `dlq` feature is enabled.
67    pub dlq_threshold: u32,
68    /// Interval between dead-letter reaper passes, in seconds. Each pass drains
69    /// events that have crossed [`dlq_threshold`](Self::dlq_threshold) and hands
70    /// them off for quarantine.
71    ///
72    /// Only consulted when the `dlq` feature is enabled.
73    pub dlq_interval_secs: u64,
74}
75
76impl<P> Default for OutboxConfig<P>
77where
78    P: Debug + Clone + Serialize,
79{
80    /// Returns a configuration suitable as a starting point.
81    ///
82    /// The defaults are:
83    ///
84    /// | Field | Value |
85    /// |---|---|
86    /// | `batch_size` | 100 |
87    /// | `retention_days` | 7 |
88    /// | `gc_interval_secs` | 3600 |
89    /// | `poll_interval_secs` | 10 |
90    /// | `lock_timeout_mins` | 5 |
91    /// | `idempotency_strategy` | [`IdempotencyStrategy::None`] |
92    /// | `dlq_threshold` | 10 |
93    /// | `dlq_interval_secs` | 300 |
94    ///
95    /// These values are part of the public contract — tuning them is a
96    /// deliberate behaviour change.
97    fn default() -> Self {
98        Self {
99            batch_size: 100,
100            retention_days: 7,
101            gc_interval_secs: 3600,
102            poll_interval_secs: 10,
103            lock_timeout_mins: 5,
104            idempotency_strategy: IdempotencyStrategy::None,
105            dlq_threshold: 10,
106            dlq_interval_secs: 300,
107        }
108    }
109}
110
111/// How an idempotency token is produced when a new event is written.
112///
113/// The variant is evaluated inside
114/// [`OutboxService::add_event`](crate::service::OutboxService::add_event)
115/// before the event is persisted. When an
116/// [`IdempotencyStorageProvider`](crate::idempotency::storage::IdempotencyStorageProvider)
117/// is wired, the produced token is also used to reserve uniqueness up front.
118#[derive(Clone)]
119pub enum IdempotencyStrategy<P>
120where
121    P: Debug + Clone + Serialize,
122{
123    /// Uses the caller-supplied token as-is. Passing `None` at call site means
124    /// the event is stored without a token and no reservation is attempted.
125    Provided,
126    /// Derives the token by applying the given function to the event about to
127    /// be written. The `add_event` callback `get_event` must return `Some`
128    /// for this variant — otherwise the service panics.
129    Custom(fn(&Event<P>) -> String),
130    /// Generates a fresh UUID v7 token at write time. Any caller-supplied
131    /// token is ignored.
132    Uuid,
133    //TODO:
134    //HashPayload, //BLAKE3
135    /// Disables idempotency — no token is produced and no reservation is
136    /// attempted. This is the default.
137    None,
138}
139
// Unit tests pinning the documented defaults of `OutboxConfig` and checking
// that `Clone` carries every field (including a `Custom` function pointer)
// over unchanged.
//
// NOTE(review): no `unwrap` call is visible in this module; the lint allowance
// below may be a crate-wide test convention — confirm before removing it.
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use rstest::rstest;
    use serde::{Deserialize, Serialize};

    /// Minimal payload type used to instantiate the generic config.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestPayload;

    /// Returns `OutboxConfig::default()` with the payload type fixed.
    fn default_cfg() -> OutboxConfig<TestPayload> {
        OutboxConfig::default()
    }

    // ------------------------------ Defaults ------------------------------
    // One test per field so a changed default fails with a precise name.

    #[rstest]
    fn default_batch_size_is_100() {
        assert_eq!(default_cfg().batch_size, 100);
    }

    #[rstest]
    fn default_retention_days_is_7() {
        assert_eq!(default_cfg().retention_days, 7);
    }

    #[rstest]
    fn default_gc_interval_secs_is_3600() {
        assert_eq!(default_cfg().gc_interval_secs, 3600);
    }

    #[rstest]
    fn default_poll_interval_secs_is_10() {
        assert_eq!(default_cfg().poll_interval_secs, 10);
    }

    #[rstest]
    fn default_lock_timeout_mins_is_5() {
        assert_eq!(default_cfg().lock_timeout_mins, 5);
    }

    #[rstest]
    fn default_idempotency_strategy_is_none() {
        assert!(matches!(
            default_cfg().idempotency_strategy,
            IdempotencyStrategy::None
        ));
    }

    // The two dead-letter defaults are documented as part of the public
    // contract as well, so they are pinned like every other field.

    #[rstest]
    fn default_dlq_threshold_is_10() {
        assert_eq!(default_cfg().dlq_threshold, 10);
    }

    #[rstest]
    fn default_dlq_interval_secs_is_300() {
        assert_eq!(default_cfg().dlq_interval_secs, 300);
    }

    // ------------------------------- Clone -------------------------------

    #[rstest]
    fn clone_preserves_scalar_fields() {
        let cfg = OutboxConfig::<TestPayload> {
            batch_size: 42,
            retention_days: 3,
            gc_interval_secs: 99,
            poll_interval_secs: 1,
            lock_timeout_mins: 2,
            idempotency_strategy: IdempotencyStrategy::Uuid,
            dlq_threshold: 10,
            dlq_interval_secs: 1,
        };
        let cloned = cfg.clone();
        assert_eq!(cloned.batch_size, 42);
        assert_eq!(cloned.retention_days, 3);
        assert_eq!(cloned.gc_interval_secs, 99);
        assert_eq!(cloned.poll_interval_secs, 1);
        assert_eq!(cloned.lock_timeout_mins, 2);
        assert_eq!(cloned.dlq_threshold, 10);
        assert_eq!(cloned.dlq_interval_secs, 1);
        assert!(matches!(
            cloned.idempotency_strategy,
            IdempotencyStrategy::Uuid
        ));
    }

    #[rstest]
    fn clone_preserves_custom_strategy_function_pointer() {
        // Token function with a recognisable constant result.
        fn derive(_: &Event<TestPayload>) -> String {
            "fp".into()
        }
        let cfg = OutboxConfig::<TestPayload> {
            batch_size: 1,
            retention_days: 1,
            gc_interval_secs: 1,
            poll_interval_secs: 1,
            lock_timeout_mins: 1,
            idempotency_strategy: IdempotencyStrategy::Custom(derive),
            dlq_threshold: 10,
            dlq_interval_secs: 1,
        };
        let cloned = cfg.clone();
        match cloned.idempotency_strategy {
            IdempotencyStrategy::Custom(f) => {
                // Calling through the cloned pointer proves it still targets
                // `derive`.
                let e = Event::new(
                    crate::object::EventType::new("t"),
                    crate::object::Payload::new(TestPayload),
                    None,
                );
                assert_eq!(f(&e), "fp");
            }
            _ => panic!("expected Custom variant after clone"),
        }
    }
}