Skip to main content

bus_nats/
stream.rs

1use async_nats::jetstream::{self, stream};
2use std::time::Duration;
3
4/// Configuration for the JetStream stream.
5/// Defaults are production-safe: R3 replication, 5-minute dedup window.
6#[derive(Debug, Clone)]
7pub struct StreamConfig {
8    pub name: String,
9    pub subjects: Vec<String>,
10    pub num_replicas: usize,
11    pub duplicate_window: Duration,
12    pub max_age: Duration,
13}
14
15impl Default for StreamConfig {
16    fn default() -> Self {
17        Self {
18            name: "EVENTS".into(),
19            subjects: vec!["events.>".into()],
20            num_replicas: 3,
21            duplicate_window: Duration::from_secs(5 * 60),
22            max_age: Duration::from_secs(7 * 24 * 3600),
23        }
24    }
25}
26
27/// Ensure the stream exists, creating it if absent. Idempotent.
28pub async fn ensure_stream(
29    js: &jetstream::Context,
30    cfg: &StreamConfig,
31) -> Result<stream::Stream, async_nats::Error> {
32    js.get_or_create_stream(stream::Config {
33        name: cfg.name.clone(),
34        subjects: cfg.subjects.clone(),
35        num_replicas: cfg.num_replicas,
36        storage: stream::StorageType::File,
37        duplicate_window: cfg.duplicate_window,
38        discard: stream::DiscardPolicy::Old,
39        max_age: cfg.max_age,
40        allow_direct: true,
41        ..Default::default()
42    })
43    .await
44    .map_err(|e| Box::new(e) as async_nats::Error)
45}