pub struct WorkQueue<T, C: CodecType> { /* private fields */ }Expand description
A typed work queue backed by JetStream with configurable codec.
Work queues provide at-least-once delivery with automatic acknowledgment tracking. Messages are removed from the queue once acknowledged.
§Type Parameters
T- The message type for this queueC- The codec type used for serialization
§Example
use intercom::{Client, MsgPackCodec, jetstream::queue::WorkQueue};
use serde::{Deserialize, Serialize};
use futures::StreamExt;
#[derive(Serialize, Deserialize, Debug)]
struct Job {
id: u64,
payload: String,
}
let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
let jetstream = client.jetstream();
// Create a work queue
let queue = WorkQueue::<Job, MsgPackCodec>::builder(&jetstream, "jobs")
.max_messages(10_000)
.create()
.await?;
// Push a job
queue.push(&Job { id: 1, payload: "do work".into() }).await?;
// Option 1: Use as a Stream (pulls one job at a time)
let mut queue = queue.into_stream().await?;
while let Some(result) = queue.next().await {
let job = result?;
println!("Processing: {:?}", job.payload);
job.ack().await?;
}
// Option 2: Pull a batch of jobs
// let mut jobs = queue.pull(10).await?;
// while let Some(result) = jobs.next().await {
// let job = result?;
// job.ack().await?;
// }Implementations§
Source§impl<T, C: CodecType> WorkQueue<T, C>
impl<T, C: CodecType> WorkQueue<T, C>
Sourcepub fn builder(
context: &JetStreamContext<C>,
name: &str,
) -> WorkQueueBuilder<T, C>
pub fn builder( context: &JetStreamContext<C>, name: &str, ) -> WorkQueueBuilder<T, C>
Create a work queue builder.
Sourcepub fn consumer(&self) -> &PullConsumer<T, C>
pub fn consumer(&self) -> &PullConsumer<T, C>
Get the consumer for this queue.
Source§impl<T: DeserializeOwned, C: CodecType> WorkQueue<T, C>
impl<T: DeserializeOwned, C: CodecType> WorkQueue<T, C>
Sourcepub async fn pull(&self, batch_size: usize) -> Result<PullBatch<T, C>>
pub async fn pull(&self, batch_size: usize) -> Result<PullBatch<T, C>>
Pull a batch of messages from the queue.
Sourcepub async fn messages(&self) -> Result<PullMessages<T, C>>
pub async fn messages(&self) -> Result<PullMessages<T, C>>
Get a continuous stream of messages.
Sourcepub async fn into_stream(self) -> Result<StreamingWorkQueue<T, C>>
pub async fn into_stream(self) -> Result<StreamingWorkQueue<T, C>>
Convert this queue into a Stream that continuously pulls messages one at a time.
This is the most ergonomic way to process queue messages when you want to handle them one at a time in a loop.
§Example
use intercom::{Client, MsgPackCodec, jetstream::queue::WorkQueue};
use serde::{Deserialize, Serialize};
use futures::StreamExt;
#[derive(Serialize, Deserialize, Debug)]
struct Job { id: u64 }
let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
let jetstream = client.jetstream();
let queue = WorkQueue::<Job, MsgPackCodec>::builder(&jetstream, "jobs").create().await?;
// Convert to a Stream and iterate
let mut queue = queue.into_stream().await?;
while let Some(job) = queue.next().await {
let job = job?;
println!("Processing job: {:?}", job.payload);
job.ack().await?;
}Auto Trait Implementations§
impl<T, C> Freeze for WorkQueue<T, C>
impl<T, C> !RefUnwindSafe for WorkQueue<T, C>
impl<T, C> Send for WorkQueue<T, C>where
T: Send,
impl<T, C> Sync for WorkQueue<T, C>where
T: Sync,
impl<T, C> Unpin for WorkQueue<T, C>
impl<T, C> !UnwindSafe for WorkQueue<T, C>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more