use async_nats::jetstream::{self, consumer::PullConsumer, stream::StorageType};
use futures::StreamExt;
/// Demonstrates a JetStream round trip against a local NATS server.
///
/// The program recreates the `ORDERS` stream (file storage, subjects `orders.>`),
/// publishes three order events, then pulls them through the durable
/// `order-processor` pull consumer, printing and explicitly acknowledging each one.
///
/// # Errors
///
/// Returns an error if connecting, creating the stream or consumer, publishing
/// (including waiting for the server's publish acknowledgement), fetching, or
/// acknowledging a message fails.
#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
    let nc = async_nats::connect("nats://localhost:4222").await?;
    let js = jetstream::new(nc);

    // Best-effort cleanup so every run starts from an empty stream; the result is
    // deliberately discarded (e.g. the stream may not exist on the first run).
    js.delete_stream("ORDERS").await.ok();

    let stream = js
        .create_stream(jetstream::stream::Config {
            name: "ORDERS".to_string(),
            subjects: vec!["orders.>".into()],
            storage: StorageType::File,
            ..Default::default()
        })
        .await?;

    // `publish` resolves to a publish-ack future: the first `.await` sends the
    // message, the second waits for the server's acknowledgement. Awaiting the ack
    // surfaces publish failures and ensures the message has been accepted by the
    // stream before we try to fetch it below.
    js.publish("orders.new", "Order #1001".into())
        .await?
        .await?;
    js.publish("orders.new", "Order #1002".into())
        .await?
        .await?;
    js.publish("orders.shipped", "Order #1001 shipped".into())
        .await?
        .await?;

    let consumer: PullConsumer = stream
        .create_consumer(jetstream::consumer::pull::Config {
            durable_name: Some("order-processor".to_string()),
            // Explicit acks: each message must be acknowledged individually.
            ack_policy: jetstream::consumer::AckPolicy::Explicit,
            ..Default::default()
        })
        .await?;

    // Request a single batch of at most three messages (the number published above).
    let mut messages = consumer.fetch().max_messages(3).messages().await?;
    while let Some(message) = messages.next().await {
        let message = message?;
        println!(
            "Received on {}: {}",
            message.subject,
            // Payload is raw bytes; invalid UTF-8 is replaced rather than rejected.
            String::from_utf8_lossy(&message.payload)
        );
        message.ack().await?;
    }

    Ok(())
}