intercom_rs/jetstream/
queue.rs

1//! Work queue support for JetStream.
2//!
3//! This module provides a convenient interface for creating and using work queues
4//! with JetStream, including support for interest-based consumers.
5
6use std::{
7    marker::PhantomData,
8    pin::Pin,
9    task::{Context, Poll},
10};
11
12use futures::{Sink, Stream};
13use pin_project_lite::pin_project;
14use serde::{de::DeserializeOwned, Serialize};
15
16use crate::{
17    codec::CodecType,
18    error::{Error, Result},
19    jetstream::{
20        consumer::{JetStreamMessage, PullBatch, PullConsumer, PullMessages},
21        stream::{RetentionPolicy, Stream as JsStream, StreamBuilder},
22    },
23};
24
25/// A typed work queue backed by JetStream with configurable codec.
26///
27/// Work queues provide at-least-once delivery with automatic acknowledgment tracking.
28/// Messages are removed from the queue once acknowledged.
29///
30/// # Type Parameters
31///
32/// * `T` - The message type for this queue
33/// * `C` - The codec type used for serialization
34///
35/// # Example
36///
37/// ```no_run
38/// use intercom::{Client, MsgPackCodec, jetstream::queue::WorkQueue};
39/// use serde::{Deserialize, Serialize};
40/// use futures::StreamExt;
41///
42/// #[derive(Serialize, Deserialize, Debug)]
43/// struct Job {
44///     id: u64,
45///     payload: String,
46/// }
47///
48/// # async fn example() -> intercom::Result<()> {
49/// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
50/// let jetstream = client.jetstream();
51///
52/// // Create a work queue
53/// let queue = WorkQueue::<Job, MsgPackCodec>::builder(&jetstream, "jobs")
54///     .max_messages(10_000)
55///     .create()
56///     .await?;
57///
58/// // Push a job
59/// queue.push(&Job { id: 1, payload: "do work".into() }).await?;
60///
61/// // Option 1: Use as a Stream (pulls one job at a time)
62/// let mut queue = queue.into_stream().await?;
63/// while let Some(result) = queue.next().await {
64///     let job = result?;
65///     println!("Processing: {:?}", job.payload);
66///     job.ack().await?;
67/// }
68///
69/// // Option 2: Pull a batch of jobs
70/// // let mut jobs = queue.pull(10).await?;
71/// // while let Some(result) = jobs.next().await {
72/// //     let job = result?;
73/// //     job.ack().await?;
74/// // }
75/// # Ok(())
76/// # }
77/// ```
78pub struct WorkQueue<T, C: CodecType> {
79    stream: JsStream<C>,
80    consumer: PullConsumer<T, C>,
81    subject: String,
82    context: async_nats::jetstream::Context,
83    _marker: PhantomData<(T, C)>,
84}
85
86impl<T, C: CodecType> WorkQueue<T, C> {
87    /// Create a work queue builder.
88    pub fn builder(
89        context: &crate::jetstream::context::JetStreamContext<C>,
90        name: &str,
91    ) -> WorkQueueBuilder<T, C> {
92        WorkQueueBuilder::new(context.inner().clone(), name.to_string())
93    }
94
95    /// Get the stream backing this queue.
96    pub fn stream(&self) -> &JsStream<C> {
97        &self.stream
98    }
99
100    /// Get the consumer for this queue.
101    pub fn consumer(&self) -> &PullConsumer<T, C> {
102        &self.consumer
103    }
104
105    /// Get the subject for this queue.
106    pub fn subject(&self) -> &str {
107        &self.subject
108    }
109}
110
111impl<T: Serialize, C: CodecType> WorkQueue<T, C> {
112    /// Push a message to the queue.
113    pub async fn push(&self, message: &T) -> Result<u64> {
114        let data = C::encode(message)?;
115        let ack = self
116            .context
117            .publish(self.subject.clone(), data.into())
118            .await
119            .map_err(|e| Error::JetStream(e.to_string()))?
120            .await
121            .map_err(|e| Error::JetStream(e.to_string()))?;
122        Ok(ack.sequence)
123    }
124
125    /// Create a sink for pushing messages.
126    pub fn sink(&self) -> WorkQueueSink<T, C> {
127        WorkQueueSink::new(self.context.clone(), self.subject.clone())
128    }
129}
130
131impl<T: DeserializeOwned, C: CodecType> WorkQueue<T, C> {
132    /// Pull a batch of messages from the queue.
133    pub async fn pull(&self, batch_size: usize) -> Result<PullBatch<T, C>> {
134        self.consumer.fetch(batch_size).await
135    }
136
137    /// Get a continuous stream of messages.
138    pub async fn messages(&self) -> Result<crate::jetstream::consumer::PullMessages<T, C>> {
139        self.consumer.messages().await
140    }
141
142    /// Convert this queue into a [`Stream`] that continuously pulls messages one at a time.
143    ///
144    /// This is the most ergonomic way to process queue messages when you want to handle
145    /// them one at a time in a loop.
146    ///
147    /// # Example
148    ///
149    /// ```no_run
150    /// use intercom::{Client, MsgPackCodec, jetstream::queue::WorkQueue};
151    /// use serde::{Deserialize, Serialize};
152    /// use futures::StreamExt;
153    ///
154    /// #[derive(Serialize, Deserialize, Debug)]
155    /// struct Job { id: u64 }
156    ///
157    /// # async fn example() -> intercom::Result<()> {
158    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
159    /// let jetstream = client.jetstream();
160    /// let queue = WorkQueue::<Job, MsgPackCodec>::builder(&jetstream, "jobs").create().await?;
161    ///
162    /// // Convert to a Stream and iterate
163    /// let mut queue = queue.into_stream().await?;
164    /// while let Some(job) = queue.next().await {
165    ///     let job = job?;
166    ///     println!("Processing job: {:?}", job.payload);
167    ///     job.ack().await?;
168    /// }
169    /// # Ok(())
170    /// # }
171    /// ```
172    pub async fn into_stream(self) -> Result<StreamingWorkQueue<T, C>> {
173        let messages = self.consumer.messages().await?;
174        Ok(StreamingWorkQueue {
175            messages,
176            context: self.context,
177            subject: self.subject,
178            _marker: PhantomData,
179        })
180    }
181}
182
183pin_project! {
184    /// A work queue that implements [`Stream`] for continuous message processing.
185    ///
186    /// Created from [`WorkQueue::into_stream`]. This type allows using the queue
187    /// directly with `StreamExt` methods like `next()`.
188    ///
189    /// # Example
190    ///
191    /// ```no_run
192    /// use intercom::{Client, MsgPackCodec, jetstream::queue::WorkQueue};
193    /// use serde::{Deserialize, Serialize};
194    /// use futures::StreamExt;
195    ///
196    /// #[derive(Serialize, Deserialize, Debug)]
197    /// struct Job { id: u64 }
198    ///
199    /// # async fn example() -> intercom::Result<()> {
200    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
201    /// let jetstream = client.jetstream();
202    /// let queue = WorkQueue::<Job, MsgPackCodec>::builder(&jetstream, "jobs").create().await?;
203    ///
204    /// let mut streaming = queue.into_stream().await?;
205    /// while let Some(job) = streaming.next().await {
206    ///     let job = job?;
207    ///     job.ack().await?;
208    /// }
209    /// # Ok(())
210    /// # }
211    /// ```
212    pub struct StreamingWorkQueue<T, C: CodecType> {
213        #[pin]
214        messages: PullMessages<T, C>,
215        context: async_nats::jetstream::Context,
216        subject: String,
217        _marker: PhantomData<(T, C)>,
218    }
219}
220
221impl<T: Serialize, C: CodecType> StreamingWorkQueue<T, C> {
222    /// Push a message to the queue.
223    ///
224    /// Note: This is available even while streaming, allowing you to add work
225    /// to the queue while processing existing messages.
226    pub async fn push(&self, message: &T) -> Result<u64> {
227        let data = C::encode(message)?;
228        let ack = self
229            .context
230            .publish(self.subject.clone(), data.into())
231            .await
232            .map_err(|e| Error::JetStream(e.to_string()))?
233            .await
234            .map_err(|e| Error::JetStream(e.to_string()))?;
235        Ok(ack.sequence)
236    }
237}
238
239impl<T: DeserializeOwned, C: CodecType> Stream for StreamingWorkQueue<T, C> {
240    type Item = Result<JetStreamMessage<T>>;
241
242    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
243        self.project().messages.poll_next(cx)
244    }
245}
246
247/// Builder for work queues.
248pub struct WorkQueueBuilder<T, C: CodecType> {
249    context: async_nats::jetstream::Context,
250    name: String,
251    subject: Option<String>,
252    max_messages: i64,
253    max_bytes: i64,
254    max_age: std::time::Duration,
255    replicas: usize,
256    _marker: PhantomData<(T, C)>,
257}
258
259impl<T, C: CodecType> WorkQueueBuilder<T, C> {
260    /// Create a new work queue builder.
261    fn new(context: async_nats::jetstream::Context, name: String) -> Self {
262        Self {
263            context,
264            name,
265            subject: None,
266            max_messages: -1,
267            max_bytes: -1,
268            max_age: std::time::Duration::ZERO,
269            replicas: 1,
270            _marker: PhantomData,
271        }
272    }
273
274    /// Set the subject for the queue.
275    ///
276    /// If not set, defaults to the queue name.
277    pub fn subject(mut self, subject: impl Into<String>) -> Self {
278        self.subject = Some(subject.into());
279        self
280    }
281
282    /// Set the maximum number of messages.
283    pub fn max_messages(mut self, max: i64) -> Self {
284        self.max_messages = max;
285        self
286    }
287
288    /// Set the maximum bytes.
289    pub fn max_bytes(mut self, max: i64) -> Self {
290        self.max_bytes = max;
291        self
292    }
293
294    /// Set the maximum age for messages.
295    pub fn max_age(mut self, age: std::time::Duration) -> Self {
296        self.max_age = age;
297        self
298    }
299
300    /// Set the number of replicas.
301    pub fn replicas(mut self, replicas: usize) -> Self {
302        self.replicas = replicas;
303        self
304    }
305
306    /// Create the work queue.
307    pub async fn create(self) -> Result<WorkQueue<T, C>> {
308        let subject = self.subject.unwrap_or_else(|| self.name.clone());
309        let consumer_name = format!("{}-worker", self.name);
310
311        // Create the stream with work queue retention
312        let stream_builder = StreamBuilder::<C>::new(self.context.clone(), self.name.clone())
313            .subjects(vec![subject.clone()])
314            .retention(RetentionPolicy::WorkQueue)
315            .max_messages(self.max_messages)
316            .max_bytes(self.max_bytes)
317            .max_age(self.max_age)
318            .replicas(self.replicas);
319
320        let stream = stream_builder.create_or_update().await?;
321
322        // Create a durable pull consumer
323        let consumer = stream
324            .pull_consumer_builder::<T>(&consumer_name)
325            .durable()
326            .create_or_update()
327            .await?;
328
329        Ok(WorkQueue {
330            stream,
331            consumer,
332            subject,
333            context: self.context,
334            _marker: PhantomData,
335        })
336    }
337}
338
339/// A sink for pushing messages to a work queue.
340pub struct WorkQueueSink<T, C: CodecType> {
341    context: async_nats::jetstream::Context,
342    subject: String,
343    _marker: PhantomData<(T, C)>,
344}
345
346impl<T, C: CodecType> WorkQueueSink<T, C> {
347    fn new(context: async_nats::jetstream::Context, subject: String) -> Self {
348        Self {
349            context,
350            subject,
351            _marker: PhantomData,
352        }
353    }
354}
355
356impl<T: Serialize, C: CodecType> WorkQueueSink<T, C> {
357    /// Publish a message directly with error handling.
358    ///
359    /// Unlike the [`Sink`] implementation, this method returns any publish errors
360    /// and waits for JetStream acknowledgment.
361    ///
362    /// # Example
363    ///
364    /// ```no_run
365    /// use intercom::{Client, MsgPackCodec, WorkQueue};
366    /// use serde::{Deserialize, Serialize};
367    ///
368    /// #[derive(Serialize, Deserialize)]
369    /// struct Job { id: u64 }
370    ///
371    /// # async fn example() -> intercom::Result<()> {
372    /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
373    /// let jetstream = client.jetstream();
374    /// let queue = WorkQueue::<Job, MsgPackCodec>::builder(&jetstream, "jobs").create().await?;
375    /// let sink = queue.sink();
376    ///
377    /// // Direct publish with acknowledgment
378    /// let seq = sink.publish(&Job { id: 1 }).await?;
379    /// println!("Published at sequence: {}", seq);
380    /// # Ok(())
381    /// # }
382    /// ```
383    pub async fn publish(&self, message: &T) -> Result<u64> {
384        let data = C::encode(message)?;
385        let ack = self
386            .context
387            .publish(self.subject.clone(), data.into())
388            .await
389            .map_err(|e| Error::JetStream(e.to_string()))?
390            .await
391            .map_err(|e| Error::JetStream(e.to_string()))?;
392        Ok(ack.sequence)
393    }
394}
395
396impl<T, C: CodecType> Clone for WorkQueueSink<T, C> {
397    fn clone(&self) -> Self {
398        Self {
399            context: self.context.clone(),
400            subject: self.subject.clone(),
401            _marker: PhantomData,
402        }
403    }
404}
405
406impl<T: Serialize, C: CodecType> Sink<T> for WorkQueueSink<T, C> {
407    type Error = Error;
408
409    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
410        Poll::Ready(Ok(()))
411    }
412
413    fn start_send(self: Pin<&mut Self>, item: T) -> Result<()> {
414        let data = C::encode(&item)?;
415        // Note: Uses fire-and-forget for Sink compatibility.
416        // Use WorkQueueSink::publish() for error handling and acknowledgment.
417        let context = self.context.clone();
418        let subject = self.subject.clone();
419        tokio::spawn(async move {
420            let _ = context.publish(subject, data.into()).await;
421        });
422        Ok(())
423    }
424
425    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
426        Poll::Ready(Ok(()))
427    }
428
429    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
430        self.poll_flush(cx)
431    }
432}
433
434// ============================================================================
435// Interest-Based Queue
436// ============================================================================
437
438/// A typed queue with interest-based retention and configurable codec.
439///
440/// Messages are retained until acknowledged by all consumers that were
441/// active when the message was published.
442///
443/// # Type Parameters
444///
445/// * `T` - The message type for this queue
446/// * `C` - The codec type used for serialization
447///
448/// # Example
449///
450/// ```no_run
451/// use intercom::{Client, MsgPackCodec, jetstream::queue::InterestQueue};
452/// use serde::{Deserialize, Serialize};
453///
454/// #[derive(Serialize, Deserialize, Debug)]
455/// struct Event {
456///     id: u64,
457///     data: String,
458/// }
459///
460/// # async fn example() -> intercom::Result<()> {
461/// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
462/// let jetstream = client.jetstream();
463///
464/// // Create an interest-based queue
465/// let queue = InterestQueue::<Event, MsgPackCodec>::builder(&jetstream, "events")
466///     .subject("events.>")
467///     .create()
468///     .await?;
469///
470/// // Add consumers
471/// let consumer1 = queue.add_consumer("service-a").await?;
472/// let consumer2 = queue.add_consumer("service-b").await?;
473///
474/// // Both consumers must acknowledge for message removal
475/// # Ok(())
476/// # }
477/// ```
478pub struct InterestQueue<T, C: CodecType> {
479    stream: JsStream<C>,
480    subject: String,
481    context: async_nats::jetstream::Context,
482    _marker: PhantomData<(T, C)>,
483}
484
485impl<T, C: CodecType> InterestQueue<T, C> {
486    /// Create an interest queue builder.
487    pub fn builder(
488        context: &crate::jetstream::context::JetStreamContext<C>,
489        name: &str,
490    ) -> InterestQueueBuilder<T, C> {
491        InterestQueueBuilder::new(context.inner().clone(), name.to_string())
492    }
493
494    /// Get the stream backing this queue.
495    pub fn stream(&self) -> &JsStream<C> {
496        &self.stream
497    }
498
499    /// Get the subject for this queue.
500    pub fn subject(&self) -> &str {
501        &self.subject
502    }
503
504    /// Add a durable consumer to this queue.
505    pub async fn add_consumer(&self, name: &str) -> Result<PullConsumer<T, C>> {
506        self.stream
507            .pull_consumer_builder::<T>(name)
508            .durable()
509            .create_or_update()
510            .await
511    }
512
513    /// Add a consumer with a filter subject.
514    pub async fn add_consumer_filtered(
515        &self,
516        name: &str,
517        filter: &str,
518    ) -> Result<PullConsumer<T, C>> {
519        self.stream
520            .pull_consumer_builder::<T>(name)
521            .durable()
522            .filter_subject(filter)
523            .create_or_update()
524            .await
525    }
526}
527
528impl<T: Serialize, C: CodecType> InterestQueue<T, C> {
529    /// Publish a message to the queue.
530    pub async fn publish(&self, message: &T) -> Result<u64> {
531        let data = C::encode(message)?;
532        let ack = self
533            .context
534            .publish(self.subject.clone(), data.into())
535            .await
536            .map_err(|e| Error::JetStream(e.to_string()))?
537            .await
538            .map_err(|e| Error::JetStream(e.to_string()))?;
539        Ok(ack.sequence)
540    }
541
542    /// Publish a message to a specific subject within the queue's subject space.
543    pub async fn publish_to(&self, subject: &str, message: &T) -> Result<u64> {
544        let data = C::encode(message)?;
545        let ack = self
546            .context
547            .publish(subject.to_string(), data.into())
548            .await
549            .map_err(|e| Error::JetStream(e.to_string()))?
550            .await
551            .map_err(|e| Error::JetStream(e.to_string()))?;
552        Ok(ack.sequence)
553    }
554}
555
556/// Builder for interest-based queues.
557pub struct InterestQueueBuilder<T, C: CodecType> {
558    context: async_nats::jetstream::Context,
559    name: String,
560    subject: Option<String>,
561    max_messages: i64,
562    max_bytes: i64,
563    max_age: std::time::Duration,
564    replicas: usize,
565    _marker: PhantomData<(T, C)>,
566}
567
568impl<T, C: CodecType> InterestQueueBuilder<T, C> {
569    fn new(context: async_nats::jetstream::Context, name: String) -> Self {
570        Self {
571            context,
572            name,
573            subject: None,
574            max_messages: -1,
575            max_bytes: -1,
576            max_age: std::time::Duration::ZERO,
577            replicas: 1,
578            _marker: PhantomData,
579        }
580    }
581
582    /// Set the subject for the queue.
583    pub fn subject(mut self, subject: impl Into<String>) -> Self {
584        self.subject = Some(subject.into());
585        self
586    }
587
588    /// Set the maximum number of messages.
589    pub fn max_messages(mut self, max: i64) -> Self {
590        self.max_messages = max;
591        self
592    }
593
594    /// Set the maximum bytes.
595    pub fn max_bytes(mut self, max: i64) -> Self {
596        self.max_bytes = max;
597        self
598    }
599
600    /// Set the maximum age for messages.
601    pub fn max_age(mut self, age: std::time::Duration) -> Self {
602        self.max_age = age;
603        self
604    }
605
606    /// Set the number of replicas.
607    pub fn replicas(mut self, replicas: usize) -> Self {
608        self.replicas = replicas;
609        self
610    }
611
612    /// Create the interest queue.
613    pub async fn create(self) -> Result<InterestQueue<T, C>> {
614        let subject = self.subject.unwrap_or_else(|| self.name.clone());
615
616        // Create the stream with interest-based retention
617        let stream_builder = StreamBuilder::<C>::new(self.context.clone(), self.name.clone())
618            .subjects(vec![subject.clone()])
619            .retention(RetentionPolicy::Interest)
620            .max_messages(self.max_messages)
621            .max_bytes(self.max_bytes)
622            .max_age(self.max_age)
623            .replicas(self.replicas);
624
625        let stream = stream_builder.create_or_update().await?;
626
627        Ok(InterestQueue {
628            stream,
629            subject,
630            context: self.context,
631            _marker: PhantomData,
632        })
633    }
634}
635
636// ============================================================================
637// Queue Messages (alias for convenience)
638// ============================================================================
639
640/// A message from a work queue or interest queue.
641///
642/// This is an alias for [`JetStreamMessage`] for convenience.
643pub type QueueMessage<T> = JetStreamMessage<T>;