1use std::time::Duration;
2
3use async_nats::jetstream::{
4 consumer::{AckPolicy, ReplayPolicy, pull, push},
5 stream,
6};
7
8#[derive(Debug, Clone)]
9pub struct Config<C> {
10 pub stream: stream::Config,
11 pub consumer: C,
12 pub heartbeat: Duration,
13}
14
15impl Config<()> {
16 pub fn new(namespace: &str) -> Self {
25 let mut stream = stream::Config {
26 name: namespace.to_owned(),
27 ..Default::default()
28 };
29 stream.name = namespace.to_owned();
30 Self {
31 stream,
32 consumer: (),
33 heartbeat: Duration::from_secs(30),
34 }
35 }
36
37 pub fn with_pull_consumer(self) -> Config<pull::Config> {
57 Config {
58 stream: self.stream,
59 consumer: Default::default(),
60 heartbeat: Duration::from_secs(30),
61 }
62 }
63
64 pub fn with_ordered_pull_consumer(self) -> Config<pull::OrderedConfig> {
87 Config {
88 stream: self.stream,
89 consumer: Default::default(),
90 heartbeat: Duration::from_secs(30),
91 }
92 }
93
94 pub fn with_push_consumer(self) -> Config<push::Config> {
113 let consumer: push::Config = push::Config {
114 deliver_subject: "apalis-worker-group".to_owned(),
115 ..Default::default()
116 };
117 Config {
118 stream: self.stream,
119 consumer,
120 heartbeat: Duration::from_secs(30),
121 }
122 }
123
124 pub fn with_ordered_push_consumer(self) -> Config<push::OrderedConfig> {
148 let consumer: push::OrderedConfig = push::OrderedConfig {
149 deliver_subject: "apalis-worker-ordered-group".to_owned(),
150 ..Default::default()
151 };
152 Config {
153 stream: self.stream,
154 consumer,
155 heartbeat: Duration::from_secs(30),
156 }
157 }
158}
159
160impl Config<pull::Config> {
161 pub fn durable(mut self) -> Self {
162 self.consumer.durable_name = Some(format!("{}-queue", &self.stream.name));
163 self.consumer.name = Some(format!("{}-queue", &self.stream.name));
164 Self {
165 stream: self.stream,
166 consumer: self.consumer,
167 heartbeat: Duration::from_secs(30),
168 }
169 }
170
171 pub fn with_description(mut self, desc: impl Into<String>) -> Self {
172 self.consumer.description = Some(desc.into());
173 self
174 }
175
176 pub fn with_ack_policy(mut self, policy: AckPolicy) -> Self {
177 self.consumer.ack_policy = policy;
178 self
179 }
180
181 pub fn with_ack_wait(mut self, wait: Duration) -> Self {
182 self.consumer.ack_wait = wait;
183 self
184 }
185
186 pub fn with_max_deliver(mut self, max: i64) -> Self {
187 self.consumer.max_deliver = max;
188 self
189 }
190
191 pub fn with_filter_subject(mut self, subject: impl Into<String>) -> Self {
192 self.consumer.filter_subject = subject.into();
193 self
194 }
195
196 pub fn with_replay_policy(mut self, policy: ReplayPolicy) -> Self {
197 self.consumer.replay_policy = policy;
198 self
199 }
200
201 pub fn with_rate_limit(mut self, rate: u64) -> Self {
202 self.consumer.rate_limit = rate;
203 self
204 }
205
206 pub fn with_max_ack_pending(mut self, max: i64) -> Self {
207 self.consumer.max_ack_pending = max;
208 self
209 }
210
211 pub fn with_backoff(mut self, backoff: Vec<Duration>) -> Self {
212 self.consumer.backoff = backoff;
213 self
214 }
215
216 pub fn with_inactive_threshold(mut self, threshold: Duration) -> Self {
217 self.consumer.inactive_threshold = threshold;
218 self
219 }
220}
221
222impl Config<push::Config> {
223 pub fn durable(mut self) -> Self {
224 self.consumer.durable_name = Some(format!("{}-queue", &self.stream.name));
225 self.consumer.name = Some(format!("{}-queue", &self.stream.name));
226 Self {
227 stream: self.stream,
228 consumer: self.consumer,
229 heartbeat: Duration::from_secs(30),
230 }
231 }
232
233 pub fn with_description(mut self, desc: impl Into<String>) -> Self {
234 self.consumer.description = Some(desc.into());
235 self
236 }
237
238 pub fn with_deliver_group(mut self, group: impl Into<String>) -> Self {
239 self.consumer.deliver_group = Some(group.into());
240 self
241 }
242
243 pub fn with_ack_policy(mut self, policy: AckPolicy) -> Self {
244 self.consumer.ack_policy = policy;
245 self
246 }
247
248 pub fn with_ack_wait(mut self, wait: Duration) -> Self {
249 self.consumer.ack_wait = wait;
250 self
251 }
252
253 pub fn with_max_deliver(mut self, max: i64) -> Self {
254 self.consumer.max_deliver = max;
255 self
256 }
257
258 pub fn with_filter_subject(mut self, subject: impl Into<String>) -> Self {
259 self.consumer.filter_subject = subject.into();
260 self
261 }
262
263 pub fn with_replay_policy(mut self, policy: ReplayPolicy) -> Self {
264 self.consumer.replay_policy = policy;
265 self
266 }
267
268 pub fn with_rate_limit(mut self, rate: u64) -> Self {
269 self.consumer.rate_limit = rate;
270 self
271 }
272
273 pub fn with_max_ack_pending(mut self, max: i64) -> Self {
274 self.consumer.max_ack_pending = max;
275 self
276 }
277
278 pub fn with_flow_control(mut self, enabled: bool) -> Self {
279 self.consumer.flow_control = enabled;
280 self
281 }
282
283 pub fn with_idle_heartbeat(mut self, hb: Duration) -> Self {
284 self.consumer.idle_heartbeat = hb;
285 self
286 }
287
288 pub fn with_backoff(mut self, backoff: Vec<Duration>) -> Self {
289 self.consumer.backoff = backoff;
290 self
291 }
292
293 pub fn with_inactive_threshold(mut self, threshold: Duration) -> Self {
294 self.consumer.inactive_threshold = threshold;
295 self
296 }
297}