intercom_rs/jetstream/queue.rs
1//! Work queue support for JetStream.
2//!
3//! This module provides a convenient interface for creating and using work queues
4//! with JetStream, including support for interest-based consumers.
5
6use std::{
7 marker::PhantomData,
8 pin::Pin,
9 task::{Context, Poll},
10};
11
12use futures::{Sink, Stream};
13use pin_project_lite::pin_project;
14use serde::{de::DeserializeOwned, Serialize};
15
16use crate::{
17 codec::CodecType,
18 error::{Error, Result},
19 jetstream::{
20 consumer::{JetStreamMessage, PullBatch, PullConsumer, PullMessages},
21 stream::{RetentionPolicy, Stream as JsStream, StreamBuilder},
22 },
23};
24
25/// A typed work queue backed by JetStream with configurable codec.
26///
27/// Work queues provide at-least-once delivery with automatic acknowledgment tracking.
28/// Messages are removed from the queue once acknowledged.
29///
30/// # Type Parameters
31///
32/// * `T` - The message type for this queue
33/// * `C` - The codec type used for serialization
34///
35/// # Example
36///
37/// ```no_run
38/// use intercom::{Client, MsgPackCodec, jetstream::queue::WorkQueue};
39/// use serde::{Deserialize, Serialize};
40/// use futures::StreamExt;
41///
42/// #[derive(Serialize, Deserialize, Debug)]
43/// struct Job {
44/// id: u64,
45/// payload: String,
46/// }
47///
48/// # async fn example() -> intercom::Result<()> {
49/// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
50/// let jetstream = client.jetstream();
51///
52/// // Create a work queue
53/// let queue = WorkQueue::<Job, MsgPackCodec>::builder(&jetstream, "jobs")
54/// .max_messages(10_000)
55/// .create()
56/// .await?;
57///
58/// // Push a job
59/// queue.push(&Job { id: 1, payload: "do work".into() }).await?;
60///
61/// // Option 1: Use as a Stream (pulls one job at a time)
62/// let mut queue = queue.into_stream().await?;
63/// while let Some(result) = queue.next().await {
64/// let job = result?;
65/// println!("Processing: {:?}", job.payload);
66/// job.ack().await?;
67/// }
68///
69/// // Option 2: Pull a batch of jobs
70/// // let mut jobs = queue.pull(10).await?;
71/// // while let Some(result) = jobs.next().await {
72/// // let job = result?;
73/// // job.ack().await?;
74/// // }
75/// # Ok(())
76/// # }
77/// ```
78pub struct WorkQueue<T, C: CodecType> {
79 stream: JsStream<C>,
80 consumer: PullConsumer<T, C>,
81 subject: String,
82 context: async_nats::jetstream::Context,
83 _marker: PhantomData<(T, C)>,
84}
85
86impl<T, C: CodecType> WorkQueue<T, C> {
87 /// Create a work queue builder.
88 pub fn builder(
89 context: &crate::jetstream::context::JetStreamContext<C>,
90 name: &str,
91 ) -> WorkQueueBuilder<T, C> {
92 WorkQueueBuilder::new(context.inner().clone(), name.to_string())
93 }
94
95 /// Get the stream backing this queue.
96 pub fn stream(&self) -> &JsStream<C> {
97 &self.stream
98 }
99
100 /// Get the consumer for this queue.
101 pub fn consumer(&self) -> &PullConsumer<T, C> {
102 &self.consumer
103 }
104
105 /// Get the subject for this queue.
106 pub fn subject(&self) -> &str {
107 &self.subject
108 }
109}
110
111impl<T: Serialize, C: CodecType> WorkQueue<T, C> {
112 /// Push a message to the queue.
113 pub async fn push(&self, message: &T) -> Result<u64> {
114 let data = C::encode(message)?;
115 let ack = self
116 .context
117 .publish(self.subject.clone(), data.into())
118 .await
119 .map_err(|e| Error::JetStream(e.to_string()))?
120 .await
121 .map_err(|e| Error::JetStream(e.to_string()))?;
122 Ok(ack.sequence)
123 }
124
125 /// Create a sink for pushing messages.
126 pub fn sink(&self) -> WorkQueueSink<T, C> {
127 WorkQueueSink::new(self.context.clone(), self.subject.clone())
128 }
129}
130
131impl<T: DeserializeOwned, C: CodecType> WorkQueue<T, C> {
132 /// Pull a batch of messages from the queue.
133 pub async fn pull(&self, batch_size: usize) -> Result<PullBatch<T, C>> {
134 self.consumer.fetch(batch_size).await
135 }
136
137 /// Get a continuous stream of messages.
138 pub async fn messages(&self) -> Result<crate::jetstream::consumer::PullMessages<T, C>> {
139 self.consumer.messages().await
140 }
141
142 /// Convert this queue into a [`Stream`] that continuously pulls messages one at a time.
143 ///
144 /// This is the most ergonomic way to process queue messages when you want to handle
145 /// them one at a time in a loop.
146 ///
147 /// # Example
148 ///
149 /// ```no_run
150 /// use intercom::{Client, MsgPackCodec, jetstream::queue::WorkQueue};
151 /// use serde::{Deserialize, Serialize};
152 /// use futures::StreamExt;
153 ///
154 /// #[derive(Serialize, Deserialize, Debug)]
155 /// struct Job { id: u64 }
156 ///
157 /// # async fn example() -> intercom::Result<()> {
158 /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
159 /// let jetstream = client.jetstream();
160 /// let queue = WorkQueue::<Job, MsgPackCodec>::builder(&jetstream, "jobs").create().await?;
161 ///
162 /// // Convert to a Stream and iterate
163 /// let mut queue = queue.into_stream().await?;
164 /// while let Some(job) = queue.next().await {
165 /// let job = job?;
166 /// println!("Processing job: {:?}", job.payload);
167 /// job.ack().await?;
168 /// }
169 /// # Ok(())
170 /// # }
171 /// ```
172 pub async fn into_stream(self) -> Result<StreamingWorkQueue<T, C>> {
173 let messages = self.consumer.messages().await?;
174 Ok(StreamingWorkQueue {
175 messages,
176 context: self.context,
177 subject: self.subject,
178 _marker: PhantomData,
179 })
180 }
181}
182
183pin_project! {
184 /// A work queue that implements [`Stream`] for continuous message processing.
185 ///
186 /// Created from [`WorkQueue::into_stream`]. This type allows using the queue
187 /// directly with `StreamExt` methods like `next()`.
188 ///
189 /// # Example
190 ///
191 /// ```no_run
192 /// use intercom::{Client, MsgPackCodec, jetstream::queue::WorkQueue};
193 /// use serde::{Deserialize, Serialize};
194 /// use futures::StreamExt;
195 ///
196 /// #[derive(Serialize, Deserialize, Debug)]
197 /// struct Job { id: u64 }
198 ///
199 /// # async fn example() -> intercom::Result<()> {
200 /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
201 /// let jetstream = client.jetstream();
202 /// let queue = WorkQueue::<Job, MsgPackCodec>::builder(&jetstream, "jobs").create().await?;
203 ///
204 /// let mut streaming = queue.into_stream().await?;
205 /// while let Some(job) = streaming.next().await {
206 /// let job = job?;
207 /// job.ack().await?;
208 /// }
209 /// # Ok(())
210 /// # }
211 /// ```
212 pub struct StreamingWorkQueue<T, C: CodecType> {
213 #[pin]
214 messages: PullMessages<T, C>,
215 context: async_nats::jetstream::Context,
216 subject: String,
217 _marker: PhantomData<(T, C)>,
218 }
219}
220
221impl<T: Serialize, C: CodecType> StreamingWorkQueue<T, C> {
222 /// Push a message to the queue.
223 ///
224 /// Note: This is available even while streaming, allowing you to add work
225 /// to the queue while processing existing messages.
226 pub async fn push(&self, message: &T) -> Result<u64> {
227 let data = C::encode(message)?;
228 let ack = self
229 .context
230 .publish(self.subject.clone(), data.into())
231 .await
232 .map_err(|e| Error::JetStream(e.to_string()))?
233 .await
234 .map_err(|e| Error::JetStream(e.to_string()))?;
235 Ok(ack.sequence)
236 }
237}
238
239impl<T: DeserializeOwned, C: CodecType> Stream for StreamingWorkQueue<T, C> {
240 type Item = Result<JetStreamMessage<T>>;
241
242 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
243 self.project().messages.poll_next(cx)
244 }
245}
246
247/// Builder for work queues.
248pub struct WorkQueueBuilder<T, C: CodecType> {
249 context: async_nats::jetstream::Context,
250 name: String,
251 subject: Option<String>,
252 max_messages: i64,
253 max_bytes: i64,
254 max_age: std::time::Duration,
255 replicas: usize,
256 _marker: PhantomData<(T, C)>,
257}
258
259impl<T, C: CodecType> WorkQueueBuilder<T, C> {
260 /// Create a new work queue builder.
261 fn new(context: async_nats::jetstream::Context, name: String) -> Self {
262 Self {
263 context,
264 name,
265 subject: None,
266 max_messages: -1,
267 max_bytes: -1,
268 max_age: std::time::Duration::ZERO,
269 replicas: 1,
270 _marker: PhantomData,
271 }
272 }
273
274 /// Set the subject for the queue.
275 ///
276 /// If not set, defaults to the queue name.
277 pub fn subject(mut self, subject: impl Into<String>) -> Self {
278 self.subject = Some(subject.into());
279 self
280 }
281
282 /// Set the maximum number of messages.
283 pub fn max_messages(mut self, max: i64) -> Self {
284 self.max_messages = max;
285 self
286 }
287
288 /// Set the maximum bytes.
289 pub fn max_bytes(mut self, max: i64) -> Self {
290 self.max_bytes = max;
291 self
292 }
293
294 /// Set the maximum age for messages.
295 pub fn max_age(mut self, age: std::time::Duration) -> Self {
296 self.max_age = age;
297 self
298 }
299
300 /// Set the number of replicas.
301 pub fn replicas(mut self, replicas: usize) -> Self {
302 self.replicas = replicas;
303 self
304 }
305
306 /// Create the work queue.
307 pub async fn create(self) -> Result<WorkQueue<T, C>> {
308 let subject = self.subject.unwrap_or_else(|| self.name.clone());
309 let consumer_name = format!("{}-worker", self.name);
310
311 // Create the stream with work queue retention
312 let stream_builder = StreamBuilder::<C>::new(self.context.clone(), self.name.clone())
313 .subjects(vec![subject.clone()])
314 .retention(RetentionPolicy::WorkQueue)
315 .max_messages(self.max_messages)
316 .max_bytes(self.max_bytes)
317 .max_age(self.max_age)
318 .replicas(self.replicas);
319
320 let stream = stream_builder.create_or_update().await?;
321
322 // Create a durable pull consumer
323 let consumer = stream
324 .pull_consumer_builder::<T>(&consumer_name)
325 .durable()
326 .create_or_update()
327 .await?;
328
329 Ok(WorkQueue {
330 stream,
331 consumer,
332 subject,
333 context: self.context,
334 _marker: PhantomData,
335 })
336 }
337}
338
339/// A sink for pushing messages to a work queue.
340pub struct WorkQueueSink<T, C: CodecType> {
341 context: async_nats::jetstream::Context,
342 subject: String,
343 _marker: PhantomData<(T, C)>,
344}
345
346impl<T, C: CodecType> WorkQueueSink<T, C> {
347 fn new(context: async_nats::jetstream::Context, subject: String) -> Self {
348 Self {
349 context,
350 subject,
351 _marker: PhantomData,
352 }
353 }
354}
355
356impl<T: Serialize, C: CodecType> WorkQueueSink<T, C> {
357 /// Publish a message directly with error handling.
358 ///
359 /// Unlike the [`Sink`] implementation, this method returns any publish errors
360 /// and waits for JetStream acknowledgment.
361 ///
362 /// # Example
363 ///
364 /// ```no_run
365 /// use intercom::{Client, MsgPackCodec, WorkQueue};
366 /// use serde::{Deserialize, Serialize};
367 ///
368 /// #[derive(Serialize, Deserialize)]
369 /// struct Job { id: u64 }
370 ///
371 /// # async fn example() -> intercom::Result<()> {
372 /// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
373 /// let jetstream = client.jetstream();
374 /// let queue = WorkQueue::<Job, MsgPackCodec>::builder(&jetstream, "jobs").create().await?;
375 /// let sink = queue.sink();
376 ///
377 /// // Direct publish with acknowledgment
378 /// let seq = sink.publish(&Job { id: 1 }).await?;
379 /// println!("Published at sequence: {}", seq);
380 /// # Ok(())
381 /// # }
382 /// ```
383 pub async fn publish(&self, message: &T) -> Result<u64> {
384 let data = C::encode(message)?;
385 let ack = self
386 .context
387 .publish(self.subject.clone(), data.into())
388 .await
389 .map_err(|e| Error::JetStream(e.to_string()))?
390 .await
391 .map_err(|e| Error::JetStream(e.to_string()))?;
392 Ok(ack.sequence)
393 }
394}
395
396impl<T, C: CodecType> Clone for WorkQueueSink<T, C> {
397 fn clone(&self) -> Self {
398 Self {
399 context: self.context.clone(),
400 subject: self.subject.clone(),
401 _marker: PhantomData,
402 }
403 }
404}
405
406impl<T: Serialize, C: CodecType> Sink<T> for WorkQueueSink<T, C> {
407 type Error = Error;
408
409 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
410 Poll::Ready(Ok(()))
411 }
412
413 fn start_send(self: Pin<&mut Self>, item: T) -> Result<()> {
414 let data = C::encode(&item)?;
415 // Note: Uses fire-and-forget for Sink compatibility.
416 // Use WorkQueueSink::publish() for error handling and acknowledgment.
417 let context = self.context.clone();
418 let subject = self.subject.clone();
419 tokio::spawn(async move {
420 let _ = context.publish(subject, data.into()).await;
421 });
422 Ok(())
423 }
424
425 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
426 Poll::Ready(Ok(()))
427 }
428
429 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
430 self.poll_flush(cx)
431 }
432}
433
434// ============================================================================
435// Interest-Based Queue
436// ============================================================================
437
438/// A typed queue with interest-based retention and configurable codec.
439///
440/// Messages are retained until acknowledged by all consumers that were
441/// active when the message was published.
442///
443/// # Type Parameters
444///
445/// * `T` - The message type for this queue
446/// * `C` - The codec type used for serialization
447///
448/// # Example
449///
450/// ```no_run
451/// use intercom::{Client, MsgPackCodec, jetstream::queue::InterestQueue};
452/// use serde::{Deserialize, Serialize};
453///
454/// #[derive(Serialize, Deserialize, Debug)]
455/// struct Event {
456/// id: u64,
457/// data: String,
458/// }
459///
460/// # async fn example() -> intercom::Result<()> {
461/// let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
462/// let jetstream = client.jetstream();
463///
464/// // Create an interest-based queue
465/// let queue = InterestQueue::<Event, MsgPackCodec>::builder(&jetstream, "events")
466/// .subject("events.>")
467/// .create()
468/// .await?;
469///
470/// // Add consumers
471/// let consumer1 = queue.add_consumer("service-a").await?;
472/// let consumer2 = queue.add_consumer("service-b").await?;
473///
474/// // Both consumers must acknowledge for message removal
475/// # Ok(())
476/// # }
477/// ```
478pub struct InterestQueue<T, C: CodecType> {
479 stream: JsStream<C>,
480 subject: String,
481 context: async_nats::jetstream::Context,
482 _marker: PhantomData<(T, C)>,
483}
484
485impl<T, C: CodecType> InterestQueue<T, C> {
486 /// Create an interest queue builder.
487 pub fn builder(
488 context: &crate::jetstream::context::JetStreamContext<C>,
489 name: &str,
490 ) -> InterestQueueBuilder<T, C> {
491 InterestQueueBuilder::new(context.inner().clone(), name.to_string())
492 }
493
494 /// Get the stream backing this queue.
495 pub fn stream(&self) -> &JsStream<C> {
496 &self.stream
497 }
498
499 /// Get the subject for this queue.
500 pub fn subject(&self) -> &str {
501 &self.subject
502 }
503
504 /// Add a durable consumer to this queue.
505 pub async fn add_consumer(&self, name: &str) -> Result<PullConsumer<T, C>> {
506 self.stream
507 .pull_consumer_builder::<T>(name)
508 .durable()
509 .create_or_update()
510 .await
511 }
512
513 /// Add a consumer with a filter subject.
514 pub async fn add_consumer_filtered(
515 &self,
516 name: &str,
517 filter: &str,
518 ) -> Result<PullConsumer<T, C>> {
519 self.stream
520 .pull_consumer_builder::<T>(name)
521 .durable()
522 .filter_subject(filter)
523 .create_or_update()
524 .await
525 }
526}
527
528impl<T: Serialize, C: CodecType> InterestQueue<T, C> {
529 /// Publish a message to the queue.
530 pub async fn publish(&self, message: &T) -> Result<u64> {
531 let data = C::encode(message)?;
532 let ack = self
533 .context
534 .publish(self.subject.clone(), data.into())
535 .await
536 .map_err(|e| Error::JetStream(e.to_string()))?
537 .await
538 .map_err(|e| Error::JetStream(e.to_string()))?;
539 Ok(ack.sequence)
540 }
541
542 /// Publish a message to a specific subject within the queue's subject space.
543 pub async fn publish_to(&self, subject: &str, message: &T) -> Result<u64> {
544 let data = C::encode(message)?;
545 let ack = self
546 .context
547 .publish(subject.to_string(), data.into())
548 .await
549 .map_err(|e| Error::JetStream(e.to_string()))?
550 .await
551 .map_err(|e| Error::JetStream(e.to_string()))?;
552 Ok(ack.sequence)
553 }
554}
555
556/// Builder for interest-based queues.
557pub struct InterestQueueBuilder<T, C: CodecType> {
558 context: async_nats::jetstream::Context,
559 name: String,
560 subject: Option<String>,
561 max_messages: i64,
562 max_bytes: i64,
563 max_age: std::time::Duration,
564 replicas: usize,
565 _marker: PhantomData<(T, C)>,
566}
567
568impl<T, C: CodecType> InterestQueueBuilder<T, C> {
569 fn new(context: async_nats::jetstream::Context, name: String) -> Self {
570 Self {
571 context,
572 name,
573 subject: None,
574 max_messages: -1,
575 max_bytes: -1,
576 max_age: std::time::Duration::ZERO,
577 replicas: 1,
578 _marker: PhantomData,
579 }
580 }
581
582 /// Set the subject for the queue.
583 pub fn subject(mut self, subject: impl Into<String>) -> Self {
584 self.subject = Some(subject.into());
585 self
586 }
587
588 /// Set the maximum number of messages.
589 pub fn max_messages(mut self, max: i64) -> Self {
590 self.max_messages = max;
591 self
592 }
593
594 /// Set the maximum bytes.
595 pub fn max_bytes(mut self, max: i64) -> Self {
596 self.max_bytes = max;
597 self
598 }
599
600 /// Set the maximum age for messages.
601 pub fn max_age(mut self, age: std::time::Duration) -> Self {
602 self.max_age = age;
603 self
604 }
605
606 /// Set the number of replicas.
607 pub fn replicas(mut self, replicas: usize) -> Self {
608 self.replicas = replicas;
609 self
610 }
611
612 /// Create the interest queue.
613 pub async fn create(self) -> Result<InterestQueue<T, C>> {
614 let subject = self.subject.unwrap_or_else(|| self.name.clone());
615
616 // Create the stream with interest-based retention
617 let stream_builder = StreamBuilder::<C>::new(self.context.clone(), self.name.clone())
618 .subjects(vec![subject.clone()])
619 .retention(RetentionPolicy::Interest)
620 .max_messages(self.max_messages)
621 .max_bytes(self.max_bytes)
622 .max_age(self.max_age)
623 .replicas(self.replicas);
624
625 let stream = stream_builder.create_or_update().await?;
626
627 Ok(InterestQueue {
628 stream,
629 subject,
630 context: self.context,
631 _marker: PhantomData,
632 })
633 }
634}
635
636// ============================================================================
637// Queue Messages (alias for convenience)
638// ============================================================================
639
640/// A message from a work queue or interest queue.
641///
642/// This is an alias for [`JetStreamMessage`] for convenience.
643pub type QueueMessage<T> = JetStreamMessage<T>;