Skip to main content

apalis_nats/
config.rs

1use std::time::Duration;
2
3use async_nats::jetstream::{
4    consumer::{AckPolicy, ReplayPolicy, pull, push},
5    stream,
6};
7
8#[derive(Debug, Clone)]
9pub struct Config<C> {
10    pub stream: stream::Config,
11    pub consumer: C,
12    pub heartbeat: Duration,
13}
14
15impl Config<()> {
16    /// Create a new JetStream configuration scoped to a namespace.
17    ///
18    /// This initializes the underlying stream with the given `namespace` as its name.
19    /// No consumer type is selected at this stage — you must choose one of
20    /// [`Config::with_pull_consumer`], [`Config::with_push_consumer`].
21    ///
22    /// # Arguments
23    /// - `namespace`: Logical name used for the stream.
24    pub fn new(namespace: &str) -> Self {
25        let mut stream = stream::Config {
26            name: namespace.to_owned(),
27            ..Default::default()
28        };
29        stream.name = namespace.to_owned();
30        Self {
31            stream,
32            consumer: (),
33            heartbeat: Duration::from_secs(30),
34        }
35    }
36
37    /// Configure a **pull-based consumer**.
38    ///
39    /// In this mode, the client explicitly requests messages using APIs like
40    /// `fetch` or `next`. This provides strong control over throughput and
41    /// natural backpressure.
42    ///
43    /// # Characteristics
44    /// - Client-driven (you decide when/how many messages to receive)
45    /// - Supports acknowledgements and redelivery
46    /// - Durable and fault-tolerant
47    ///
48    /// # Use cases
49    /// - Job queues
50    /// - Worker systems
51    /// - Batch processing
52    ///
53    /// # Notes
54    /// - Does **not** support features like `idle_heartbeat`
55    /// - Recommended default for most backend processing systems
56    pub fn with_pull_consumer(self) -> Config<pull::Config> {
57        Config {
58            stream: self.stream,
59            consumer: Default::default(),
60            heartbeat: Duration::from_secs(30),
61        }
62    }
63
64    /// Configure an **ordered pull-based consumer**.
65    ///
66    /// This is a simplified pull consumer that guarantees strict message ordering,
67    /// but disables reliability features such as acknowledgements and redelivery.
68    ///
69    /// # Characteristics
70    /// - Strict ordering guaranteed
71    /// - No acknowledgements
72    /// - No redelivery on failure
73    /// - Ephemeral (non-durable)
74    ///
75    /// # Behavior
76    /// If a message is missed or a sequence gap is detected, the consumer is
77    /// transparently reset to a new position.
78    ///
79    /// # Use cases
80    /// - Stream inspection
81    /// - Debugging
82    /// - Replay / analytics pipelines where occasional loss is acceptable
83    ///
84    /// # ⚠️ Warning
85    /// Do **not** use for job processing or systems requiring reliability.
86    pub fn with_ordered_pull_consumer(self) -> Config<pull::OrderedConfig> {
87        Config {
88            stream: self.stream,
89            consumer: Default::default(),
90            heartbeat: Duration::from_secs(30),
91        }
92    }
93
94    /// Configure a **push-based consumer**.
95    ///
96    /// In this mode, JetStream delivers messages to a subject (`deliver_subject`)
97    /// and the client subscribes to that subject.
98    ///
99    /// # Characteristics
100    /// - Server-driven (messages are pushed to the client)
101    /// - Supports acknowledgements and redelivery
102    /// - Can be combined with queue groups for load balancing
103    ///
104    /// # Use cases
105    /// - Event-driven systems
106    /// - Real-time pipelines
107    /// - Reactive services
108    ///
109    /// # Notes
110    /// - Supports features like `idle_heartbeat` and flow control
111    /// - Requires configuring a `deliver_subject` defaults to `apalis-worker-group`
112    pub fn with_push_consumer(self) -> Config<push::Config> {
113        let consumer: push::Config = push::Config {
114            deliver_subject: "apalis-worker-group".to_owned(),
115            ..Default::default()
116        };
117        Config {
118            stream: self.stream,
119            consumer,
120            heartbeat: Duration::from_secs(30),
121        }
122    }
123
124    /// Configure an **ordered push-based consumer**.
125    ///
126    /// This is a push consumer that guarantees strict ordering, but removes
127    /// reliability guarantees such as acknowledgements and redelivery.
128    ///
129    /// # Characteristics
130    /// - Strict ordering guaranteed
131    /// - No acknowledgements
132    /// - No redelivery
133    /// - Ephemeral (non-durable)
134    ///
135    /// # Behavior
136    /// If message delivery order is disrupted, the consumer is automatically
137    /// recreated and resumes from a new position.
138    ///
139    /// # Use cases
140    /// - Observability pipelines
141    /// - Real-time stream inspection
142    /// - Monitoring and debugging
143    ///
144    /// # ⚠️ Warning
145    /// Not suitable for production job processing or any system that requires
146    /// guaranteed delivery.
147    pub fn with_ordered_push_consumer(self) -> Config<push::OrderedConfig> {
148        let consumer: push::OrderedConfig = push::OrderedConfig {
149            deliver_subject: "apalis-worker-ordered-group".to_owned(),
150            ..Default::default()
151        };
152        Config {
153            stream: self.stream,
154            consumer,
155            heartbeat: Duration::from_secs(30),
156        }
157    }
158}
159
160impl Config<pull::Config> {
161    pub fn durable(mut self) -> Self {
162        self.consumer.durable_name = Some(format!("{}-queue", &self.stream.name));
163        self.consumer.name = Some(format!("{}-queue", &self.stream.name));
164        Self {
165            stream: self.stream,
166            consumer: self.consumer,
167            heartbeat: Duration::from_secs(30),
168        }
169    }
170
171    pub fn with_description(mut self, desc: impl Into<String>) -> Self {
172        self.consumer.description = Some(desc.into());
173        self
174    }
175
176    pub fn with_ack_policy(mut self, policy: AckPolicy) -> Self {
177        self.consumer.ack_policy = policy;
178        self
179    }
180
181    pub fn with_ack_wait(mut self, wait: Duration) -> Self {
182        self.consumer.ack_wait = wait;
183        self
184    }
185
186    pub fn with_max_deliver(mut self, max: i64) -> Self {
187        self.consumer.max_deliver = max;
188        self
189    }
190
191    pub fn with_filter_subject(mut self, subject: impl Into<String>) -> Self {
192        self.consumer.filter_subject = subject.into();
193        self
194    }
195
196    pub fn with_replay_policy(mut self, policy: ReplayPolicy) -> Self {
197        self.consumer.replay_policy = policy;
198        self
199    }
200
201    pub fn with_rate_limit(mut self, rate: u64) -> Self {
202        self.consumer.rate_limit = rate;
203        self
204    }
205
206    pub fn with_max_ack_pending(mut self, max: i64) -> Self {
207        self.consumer.max_ack_pending = max;
208        self
209    }
210
211    pub fn with_backoff(mut self, backoff: Vec<Duration>) -> Self {
212        self.consumer.backoff = backoff;
213        self
214    }
215
216    pub fn with_inactive_threshold(mut self, threshold: Duration) -> Self {
217        self.consumer.inactive_threshold = threshold;
218        self
219    }
220}
221
222impl Config<push::Config> {
223    pub fn durable(mut self) -> Self {
224        self.consumer.durable_name = Some(format!("{}-queue", &self.stream.name));
225        self.consumer.name = Some(format!("{}-queue", &self.stream.name));
226        Self {
227            stream: self.stream,
228            consumer: self.consumer,
229            heartbeat: Duration::from_secs(30),
230        }
231    }
232
233    pub fn with_description(mut self, desc: impl Into<String>) -> Self {
234        self.consumer.description = Some(desc.into());
235        self
236    }
237
238    pub fn with_deliver_group(mut self, group: impl Into<String>) -> Self {
239        self.consumer.deliver_group = Some(group.into());
240        self
241    }
242
243    pub fn with_ack_policy(mut self, policy: AckPolicy) -> Self {
244        self.consumer.ack_policy = policy;
245        self
246    }
247
248    pub fn with_ack_wait(mut self, wait: Duration) -> Self {
249        self.consumer.ack_wait = wait;
250        self
251    }
252
253    pub fn with_max_deliver(mut self, max: i64) -> Self {
254        self.consumer.max_deliver = max;
255        self
256    }
257
258    pub fn with_filter_subject(mut self, subject: impl Into<String>) -> Self {
259        self.consumer.filter_subject = subject.into();
260        self
261    }
262
263    pub fn with_replay_policy(mut self, policy: ReplayPolicy) -> Self {
264        self.consumer.replay_policy = policy;
265        self
266    }
267
268    pub fn with_rate_limit(mut self, rate: u64) -> Self {
269        self.consumer.rate_limit = rate;
270        self
271    }
272
273    pub fn with_max_ack_pending(mut self, max: i64) -> Self {
274        self.consumer.max_ack_pending = max;
275        self
276    }
277
278    pub fn with_flow_control(mut self, enabled: bool) -> Self {
279        self.consumer.flow_control = enabled;
280        self
281    }
282
283    pub fn with_idle_heartbeat(mut self, hb: Duration) -> Self {
284        self.consumer.idle_heartbeat = hb;
285        self
286    }
287
288    pub fn with_backoff(mut self, backoff: Vec<Duration>) -> Self {
289        self.consumer.backoff = backoff;
290        self
291    }
292
293    pub fn with_inactive_threshold(mut self, threshold: Duration) -> Self {
294        self.consumer.inactive_threshold = threshold;
295        self
296    }
297}