intercom_rs/jetstream/
stream.rs

1//! JetStream stream configuration and management.
2
3use std::marker::PhantomData;
4
5use crate::{codec::CodecType, error::{Error, Result}};
6
7use super::consumer::{PullConsumerBuilder, PushConsumerBuilder};
8
9/// A JetStream stream with configurable codec.
10///
11/// # Type Parameters
12///
13/// * `C` - The codec type used for message serialization
14#[derive(Clone)]
15pub struct Stream<C: CodecType> {
16    inner: async_nats::jetstream::stream::Stream,
17    _codec: PhantomData<C>,
18}
19
20impl<C: CodecType> Stream<C> {
21    /// Create a new stream wrapper.
22    pub(crate) fn new(inner: async_nats::jetstream::stream::Stream) -> Self {
23        Self {
24            inner,
25            _codec: PhantomData,
26        }
27    }
28
29    /// Get the underlying async-nats stream.
30    pub fn inner(&self) -> &async_nats::jetstream::stream::Stream {
31        &self.inner
32    }
33
34    /// Get the stream name.
35    pub fn name(&self) -> &str {
36        &self.inner.cached_info().config.name
37    }
38
39    /// Get stream information.
40    pub async fn info(&self) -> Result<StreamInfo> {
41        let mut stream = self.inner.clone();
42        let info = stream
43            .info()
44            .await
45            .map_err(|e| Error::JetStreamStream(e.to_string()))?;
46        Ok(StreamInfo {
47            config: StreamConfig::from_native(&info.config),
48            state: StreamState {
49                messages: info.state.messages,
50                bytes: info.state.bytes,
51                first_sequence: info.state.first_sequence,
52                last_sequence: info.state.last_sequence,
53                consumer_count: info.state.consumer_count,
54            },
55        })
56    }
57
58    /// Create a pull consumer builder.
59    ///
60    /// # Type Parameters
61    ///
62    /// * `T` - The message type for this consumer
63    ///
64    /// # Example
65    ///
66    /// ```no_run
67    /// use intercom::{Client, MsgPackCodec};
68    /// use serde::{Deserialize, Serialize};
69    ///
70    /// #[derive(Serialize, Deserialize, Debug)]
71    /// struct Event { id: u64 }
72    ///
73    /// # async fn example() -> intercom::Result<()> {
74    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
75    /// let jetstream = client.jetstream();
76    /// let stream = jetstream.get_stream("events").await?;
77    ///
78    /// let consumer = stream
79    ///     .pull_consumer_builder::<Event>("my-consumer")
80    ///     .durable()
81    ///     .filter_subject("events.user.>")
82    ///     .create()
83    ///     .await?;
84    /// # Ok(())
85    /// # }
86    /// ```
87    pub fn pull_consumer_builder<T>(&self, name: &str) -> PullConsumerBuilder<T, C> {
88        PullConsumerBuilder::new(self.inner.clone(), name.to_string())
89    }
90
91    /// Create a push consumer builder.
92    ///
93    /// # Type Parameters
94    ///
95    /// * `T` - The message type for this consumer
96    ///
97    /// # Example
98    ///
99    /// ```no_run
100    /// use intercom::{Client, MsgPackCodec};
101    /// use serde::{Deserialize, Serialize};
102    ///
103    /// #[derive(Serialize, Deserialize, Debug)]
104    /// struct Event { id: u64 }
105    ///
106    /// # async fn example() -> intercom::Result<()> {
107    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
108    /// let jetstream = client.jetstream();
109    /// let stream = jetstream.get_stream("events").await?;
110    ///
111    /// let consumer = stream
112    ///     .push_consumer_builder::<Event>("my-push-consumer")
113    ///     .deliver_subject("deliver.events")
114    ///     .create()
115    ///     .await?;
116    /// # Ok(())
117    /// # }
118    /// ```
119    pub fn push_consumer_builder<T>(&self, name: &str) -> PushConsumerBuilder<T, C> {
120        PushConsumerBuilder::new(self.inner.clone(), name.to_string())
121    }
122
123    /// Get an existing pull consumer by name.
124    pub async fn get_pull_consumer<T>(
125        &self,
126        name: &str,
127    ) -> Result<super::consumer::PullConsumer<T, C>> {
128        let inner: async_nats::jetstream::consumer::Consumer<
129            async_nats::jetstream::consumer::pull::Config,
130        > = self
131            .inner
132            .get_consumer(name)
133            .await
134            .map_err(|e| Error::JetStreamConsumer(e.to_string()))?;
135        Ok(super::consumer::PullConsumer::new(inner))
136    }
137
138    /// Get an existing push consumer by name.
139    pub async fn get_push_consumer<T>(
140        &self,
141        name: &str,
142    ) -> Result<super::consumer::PushConsumer<T, C>> {
143        let inner: async_nats::jetstream::consumer::Consumer<
144            async_nats::jetstream::consumer::push::Config,
145        > = self
146            .inner
147            .get_consumer(name)
148            .await
149            .map_err(|e| Error::JetStreamConsumer(e.to_string()))?;
150        Ok(super::consumer::PushConsumer::new(inner))
151    }
152
153    /// Delete a consumer by name.
154    pub async fn delete_consumer(&self, name: &str) -> Result<()> {
155        self.inner
156            .delete_consumer(name)
157            .await
158            .map_err(|e| Error::JetStreamConsumer(e.to_string()))?;
159        Ok(())
160    }
161
162    /// Purge all messages from the stream.
163    pub async fn purge(&self) -> Result<u64> {
164        let response = self
165            .inner
166            .clone()
167            .purge()
168            .await
169            .map_err(|e| Error::JetStreamStream(e.to_string()))?;
170        Ok(response.purged)
171    }
172
173    /// Purge messages matching a filter subject.
174    pub async fn purge_subject(&self, filter: &str) -> Result<u64> {
175        let response = self
176            .inner
177            .clone()
178            .purge()
179            .filter(filter)
180            .await
181            .map_err(|e| Error::JetStreamStream(e.to_string()))?;
182        Ok(response.purged)
183    }
184}
185
186/// Builder for creating JetStream streams.
187pub struct StreamBuilder<C: CodecType> {
188    context: async_nats::jetstream::Context,
189    config: async_nats::jetstream::stream::Config,
190    _codec: PhantomData<C>,
191}
192
193impl<C: CodecType> StreamBuilder<C> {
194    /// Create a new stream builder.
195    pub(crate) fn new(context: async_nats::jetstream::Context, name: String) -> Self {
196        Self {
197            context,
198            config: async_nats::jetstream::stream::Config {
199                name,
200                ..Default::default()
201            },
202            _codec: PhantomData,
203        }
204    }
205
206    /// Set the subjects for this stream.
207    pub fn subjects(mut self, subjects: Vec<String>) -> Self {
208        self.config.subjects = subjects;
209        self
210    }
211
212    /// Add a single subject to this stream.
213    pub fn subject(mut self, subject: impl Into<String>) -> Self {
214        self.config.subjects.push(subject.into());
215        self
216    }
217
218    /// Set the description for this stream.
219    pub fn description(mut self, description: impl Into<String>) -> Self {
220        self.config.description = Some(description.into());
221        self
222    }
223
224    /// Set the retention policy.
225    pub fn retention(mut self, retention: RetentionPolicy) -> Self {
226        self.config.retention = retention.into();
227        self
228    }
229
230    /// Set the maximum number of messages.
231    pub fn max_messages(mut self, max: i64) -> Self {
232        self.config.max_messages = max;
233        self
234    }
235
236    /// Set the maximum number of messages per subject.
237    pub fn max_messages_per_subject(mut self, max: i64) -> Self {
238        self.config.max_messages_per_subject = max;
239        self
240    }
241
242    /// Set the maximum bytes.
243    pub fn max_bytes(mut self, max: i64) -> Self {
244        self.config.max_bytes = max;
245        self
246    }
247
248    /// Set the maximum message size.
249    pub fn max_message_size(mut self, max: i32) -> Self {
250        self.config.max_message_size = max;
251        self
252    }
253
254    /// Set the maximum age for messages.
255    pub fn max_age(mut self, age: std::time::Duration) -> Self {
256        self.config.max_age = age;
257        self
258    }
259
260    /// Set the maximum number of consumers.
261    pub fn max_consumers(mut self, max: i32) -> Self {
262        self.config.max_consumers = max;
263        self
264    }
265
266    /// Set the number of replicas.
267    pub fn replicas(mut self, replicas: usize) -> Self {
268        self.config.num_replicas = replicas;
269        self
270    }
271
272    /// Set the storage type.
273    pub fn storage(mut self, storage: StorageType) -> Self {
274        self.config.storage = storage.into();
275        self
276    }
277
278    /// Set the discard policy.
279    pub fn discard_policy(mut self, policy: DiscardPolicy) -> Self {
280        self.config.discard = policy.into();
281        self
282    }
283
284    /// Enable or disable duplicate detection window.
285    pub fn duplicate_window(mut self, window: std::time::Duration) -> Self {
286        self.config.duplicate_window = window;
287        self
288    }
289
290    /// Allow direct gets.
291    pub fn allow_direct(mut self, allow: bool) -> Self {
292        self.config.allow_direct = allow;
293        self
294    }
295
296    /// Enable mirroring from another stream.
297    pub fn mirror(mut self, source: StreamSource) -> Self {
298        self.config.mirror = Some(source.into());
299        self
300    }
301
302    /// Add a source stream.
303    pub fn add_source(mut self, source: StreamSource) -> Self {
304        self.config.sources.get_or_insert_with(Vec::new).push(source.into());
305        self
306    }
307
308    /// Set whether this is a sealed stream.
309    pub fn sealed(mut self, sealed: bool) -> Self {
310        self.config.sealed = sealed;
311        self
312    }
313
314    /// Set whether to deny delete.
315    pub fn deny_delete(mut self, deny: bool) -> Self {
316        self.config.deny_delete = deny;
317        self
318    }
319
320    /// Set whether to deny purge.
321    pub fn deny_purge(mut self, deny: bool) -> Self {
322        self.config.deny_purge = deny;
323        self
324    }
325
326    /// Set whether to allow rollup headers.
327    pub fn allow_rollup(mut self, allow: bool) -> Self {
328        self.config.allow_rollup = allow;
329        self
330    }
331
332    /// Set the compression type.
333    pub fn compression(mut self, compression: Compression) -> Self {
334        self.config.compression = Some(compression.into());
335        self
336    }
337
338    /// Set the first sequence number.
339    pub fn first_sequence(mut self, seq: u64) -> Self {
340        self.config.first_sequence = Some(seq);
341        self
342    }
343
344    /// Set a subject transform.
345    pub fn subject_transform(mut self, source: &str, destination: &str) -> Self {
346        self.config.subject_transform = Some(async_nats::jetstream::stream::SubjectTransform {
347            source: source.to_string(),
348            destination: destination.to_string(),
349        });
350        self
351    }
352
353    /// Create the stream.
354    pub async fn create(self) -> Result<Stream<C>> {
355        let inner = self
356            .context
357            .create_stream(self.config)
358            .await?;
359        Ok(Stream::new(inner))
360    }
361
362    /// Create or update the stream.
363    pub async fn create_or_update(self) -> Result<Stream<C>> {
364        let inner = self
365            .context
366            .get_or_create_stream(self.config)
367            .await?;
368        Ok(Stream::new(inner))
369    }
370}
371
/// How a stream decides which messages to keep.
///
/// The default is [`RetentionPolicy::Limits`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RetentionPolicy {
    /// Keep messages until the configured limits are reached.
    #[default]
    Limits,
    /// Interest-based: keep messages until all consumers have acknowledged them.
    Interest,
    /// Work queue: remove a message once it has been acknowledged.
    WorkQueue,
}
383
384impl From<RetentionPolicy> for async_nats::jetstream::stream::RetentionPolicy {
385    fn from(policy: RetentionPolicy) -> Self {
386        match policy {
387            RetentionPolicy::Limits => async_nats::jetstream::stream::RetentionPolicy::Limits,
388            RetentionPolicy::Interest => async_nats::jetstream::stream::RetentionPolicy::Interest,
389            RetentionPolicy::WorkQueue => async_nats::jetstream::stream::RetentionPolicy::WorkQueue,
390        }
391    }
392}
393
/// Where a stream stores its messages.
///
/// The default is [`StorageType::File`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StorageType {
    /// Store messages in files.
    #[default]
    File,
    /// Store messages in memory.
    Memory,
}
403
404impl From<StorageType> for async_nats::jetstream::stream::StorageType {
405    fn from(storage: StorageType) -> Self {
406        match storage {
407            StorageType::File => async_nats::jetstream::stream::StorageType::File,
408            StorageType::Memory => async_nats::jetstream::stream::StorageType::Memory,
409        }
410    }
411}
412
/// Which messages a stream discards once its limits are reached.
///
/// The default is [`DiscardPolicy::Old`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DiscardPolicy {
    /// Drop old messages when the limits are reached.
    #[default]
    Old,
    /// Drop new messages when the limits are reached.
    New,
}
422
423impl From<DiscardPolicy> for async_nats::jetstream::stream::DiscardPolicy {
424    fn from(policy: DiscardPolicy) -> Self {
425        match policy {
426            DiscardPolicy::Old => async_nats::jetstream::stream::DiscardPolicy::Old,
427            DiscardPolicy::New => async_nats::jetstream::stream::DiscardPolicy::New,
428        }
429    }
430}
431
/// Compression applied to a stream.
///
/// The default is [`Compression::None`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Compression {
    /// Do not compress.
    #[default]
    None,
    /// Compress with S2.
    S2,
}
441
442impl From<Compression> for async_nats::jetstream::stream::Compression {
443    fn from(compression: Compression) -> Self {
444        match compression {
445            Compression::None => async_nats::jetstream::stream::Compression::None,
446            Compression::S2 => async_nats::jetstream::stream::Compression::S2,
447        }
448    }
449}
450
451/// Stream source configuration.
452#[derive(Debug, Clone)]
453pub struct StreamSource {
454    /// The source stream name.
455    pub name: String,
456    /// Optional starting sequence.
457    pub start_seq: Option<u64>,
458    /// Optional starting time.
459    pub start_time: Option<time::OffsetDateTime>,
460    /// Optional filter subject.
461    pub filter_subject: Option<String>,
462}
463
464impl StreamSource {
465    /// Create a new stream source.
466    pub fn new(name: impl Into<String>) -> Self {
467        Self {
468            name: name.into(),
469            start_seq: None,
470            start_time: None,
471            filter_subject: None,
472        }
473    }
474
475    /// Set the starting sequence.
476    pub fn start_seq(mut self, seq: u64) -> Self {
477        self.start_seq = Some(seq);
478        self
479    }
480
481    /// Set the starting time.
482    pub fn start_time(mut self, time: time::OffsetDateTime) -> Self {
483        self.start_time = Some(time);
484        self
485    }
486
487    /// Set the filter subject.
488    pub fn filter_subject(mut self, subject: impl Into<String>) -> Self {
489        self.filter_subject = Some(subject.into());
490        self
491    }
492}
493
494impl From<StreamSource> for async_nats::jetstream::stream::Source {
495    fn from(source: StreamSource) -> Self {
496        let mut s = async_nats::jetstream::stream::Source {
497            name: source.name,
498            ..Default::default()
499        };
500        if let Some(seq) = source.start_seq {
501            s.start_sequence = Some(seq);
502        }
503        if let Some(time) = source.start_time {
504            s.start_time = Some(time);
505        }
506        if let Some(subject) = source.filter_subject {
507            s.filter_subject = Some(subject);
508        }
509        s
510    }
511}
512
513/// Stream configuration.
514#[derive(Debug, Clone)]
515pub struct StreamConfig {
516    /// Stream name.
517    pub name: String,
518    /// Stream description.
519    pub description: Option<String>,
520    /// Subjects this stream listens on.
521    pub subjects: Vec<String>,
522    /// Retention policy.
523    pub retention: RetentionPolicy,
524    /// Maximum messages.
525    pub max_messages: i64,
526    /// Maximum bytes.
527    pub max_bytes: i64,
528    /// Maximum age.
529    pub max_age: std::time::Duration,
530    /// Maximum message size.
531    pub max_message_size: i32,
532    /// Storage type.
533    pub storage: StorageType,
534    /// Number of replicas.
535    pub replicas: usize,
536}
537
538impl StreamConfig {
539    /// Convert from native async-nats config.
540    pub(crate) fn from_native(config: &async_nats::jetstream::stream::Config) -> Self {
541        Self {
542            name: config.name.clone(),
543            description: config.description.clone(),
544            subjects: config.subjects.clone(),
545            retention: match config.retention {
546                async_nats::jetstream::stream::RetentionPolicy::Limits => RetentionPolicy::Limits,
547                async_nats::jetstream::stream::RetentionPolicy::Interest => {
548                    RetentionPolicy::Interest
549                }
550                async_nats::jetstream::stream::RetentionPolicy::WorkQueue => {
551                    RetentionPolicy::WorkQueue
552                }
553            },
554            max_messages: config.max_messages,
555            max_bytes: config.max_bytes,
556            max_age: config.max_age,
557            max_message_size: config.max_message_size,
558            storage: match config.storage {
559                async_nats::jetstream::stream::StorageType::File => StorageType::File,
560                async_nats::jetstream::stream::StorageType::Memory => StorageType::Memory,
561            },
562            replicas: config.num_replicas,
563        }
564    }
565}
566
/// Counters describing the current state of a stream.
#[derive(Debug, Clone)]
pub struct StreamState {
    /// Message count.
    pub messages: u64,
    /// Byte total.
    pub bytes: u64,
    /// Sequence number of the first message.
    pub first_sequence: u64,
    /// Sequence number of the last message.
    pub last_sequence: u64,
    /// Consumer count.
    pub consumer_count: usize,
}
581
582/// Stream information.
583#[derive(Debug, Clone)]
584pub struct StreamInfo {
585    /// Stream configuration.
586    pub config: StreamConfig,
587    /// Stream state.
588    pub state: StreamState,
589}