use std::{env, str::from_utf8};
use async_nats::jetstream;
use futures_util::{StreamExt, TryStreamExt};
/// Walks through the JetStream key-value API against a live NATS server.
///
/// The server address is read from the `NATS_URL` environment variable and
/// falls back to `nats://localhost:4222`. The program creates (or binds to)
/// the `profiles` bucket, writes and reads `sue.color`, demonstrates a
/// revision-guarded update, lists keys, reads the bucket's backing stream
/// through a pull consumer, and finally watches `sue.*` for changes.
///
/// # Errors
///
/// Returns an error when any NATS operation fails, when a stored value is not
/// valid UTF-8, or when one of the streams ends before yielding the item the
/// walkthrough is waiting for.
///
/// # Panics
///
/// Panics if the update made with a stale revision unexpectedly succeeds.
#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
    let nats_url = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());
    let client = async_nats::connect(nats_url).await?;
    let jetstream = jetstream::new(client);

    let kv = jetstream
        .create_key_value(async_nats::jetstream::kv::Config {
            bucket: "profiles".to_string(),
            ..Default::default()
        })
        .await?;

    // Keep the revision each `put` reports instead of assuming 1 and 2, so the
    // guarded updates below also behave when the bucket already holds data
    // from an earlier run.
    let blue_revision = kv.put("sue.color", "blue".into()).await?;
    if let Some(entry) = kv.entry("sue.color").await? {
        println!(
            "{}",
            format_record(&entry.key, entry.revision, &entry.value)?
        );
    }

    let green_revision = kv.put("sue.color", "green".into()).await?;
    if let Some(entry) = kv.entry("sue.color").await? {
        println!(
            "{}",
            format_record(&entry.key, entry.revision, &entry.value)?
        );
    }

    // Updating against the superseded revision must be rejected...
    kv.update("sue.color", "red".into(), blue_revision)
        .await
        .expect_err("expected error");
    // ...while updating against the latest revision goes through.
    kv.update("sue.color", "red".into(), green_revision).await?;
    if let Some(entry) = kv.entry("sue.color").await? {
        println!(
            "{}",
            format_record(&entry.key, entry.revision, &entry.value)?
        );
    }

    let keys = kv.keys().await?.try_collect::<Vec<String>>().await?;
    println!("All keys: {keys:?}");

    // Prints the first stream name the server reports.
    // NOTE(review): presumably this is the bucket's backing stream on a fresh
    // server; with other streams present it may be a different one.
    let name = jetstream
        .stream_names()
        .try_next()
        .await?
        .ok_or("server reported no streams")?;
    println!("KV stream name: {name}");

    // Read the bucket's contents as plain stream messages via a pull consumer.
    let consumer = jetstream
        .get_stream("KV_profiles")
        .await?
        .create_consumer(async_nats::jetstream::consumer::pull::Config {
            ..Default::default()
        })
        .await?;
    let mut messages = consumer.messages().await?;

    let message = messages
        .try_next()
        .await?
        .ok_or("consumer message stream ended unexpectedly")?;
    let metadata = message.info()?;
    println!(
        "{}",
        format_record(&message.subject, metadata.stream_sequence, &message.payload)?
    );

    kv.put("sue.color", "yellow".into()).await?;
    let message = messages
        .try_next()
        .await?
        .ok_or("consumer message stream ended unexpectedly")?;
    let metadata = message.info()?;
    println!(
        "{}",
        format_record(&message.subject, metadata.stream_sequence, &message.payload)?
    );

    kv.delete("sue.color").await?;
    let message = messages
        .try_next()
        .await?
        .ok_or("consumer message stream ended unexpectedly")?;
    let metadata = message.info()?;
    println!(
        "{}",
        format_record(&message.subject, metadata.stream_sequence, &message.payload)?
    );
    // The message read after the delete is expected to carry headers; report
    // their absence as an error rather than panicking.
    let headers = message
        .headers
        .as_ref()
        .ok_or("message following the delete carried no headers")?;
    println!("headers: {:?}", headers);

    // Watch every key matching `sue.*` and print each change as it arrives.
    let mut watch = kv.watch("sue.*").await?;

    kv.put("sue.color", "purple".into()).await?;
    let entry = watch
        .try_next()
        .await?
        .ok_or("watch stream ended unexpectedly")?;
    println!(
        "{} (op: {:?})",
        format_record(&entry.key, entry.revision, &entry.value)?,
        entry.operation
    );

    kv.put("sue.food", "pizza".into()).await?;
    let entry = watch
        .try_next()
        .await?
        .ok_or("watch stream ended unexpectedly")?;
    println!(
        "{} (op: {:?})",
        format_record(&entry.key, entry.revision, &entry.value)?,
        entry.operation
    );

    Ok(())
}

/// Renders one record as `<name> @ <revision> -> <value>`.
///
/// # Errors
///
/// Returns the decoding error when `value` is not valid UTF-8.
fn format_record(
    name: impl std::fmt::Display,
    revision: u64,
    value: &[u8],
) -> Result<String, std::str::Utf8Error> {
    Ok(format!("{} @ {} -> {}", name, revision, from_utf8(value)?))
}