use async_nats::jetstream::kv::Store;
use async_trait::async_trait;
use futures::StreamExt;
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use std::time::Duration;
use tokio::sync::{RwLock, mpsc::Sender};
use tracing::{debug, error, info, warn};
use crate::kv::{
KvEntry, KvError, KvReader, KvUpdate, KvWatcher, KvWriter, VersionToken, WatchCursor,
};
use crate::stores::{Connection, ConnectionCapabilities, KvStore, StoreConfig};
/// Deadline applied to a single NATS KV / JetStream operation: it is the
/// timeout used by `timed` and by the per-message wait while draining an
/// ephemeral consumer.
const KV_OP_TIMEOUT: Duration = Duration::from_secs(30);
/// `inactive_threshold` configured on the ephemeral push consumers created
/// for key listing and scans.
const CONSUMER_INACTIVE_THRESHOLD: Duration = Duration::from_secs(300);
/// Awaits `fut`, giving up once `KV_OP_TIMEOUT` has elapsed.
///
/// Returns `Ok(output)` when the future finishes in time. The output is
/// passed through untouched, so a fallible operation yields a nested
/// `Result` that the caller unwraps separately.
///
/// # Errors
///
/// Returns [`KvError::Timeout`] when the deadline passes first.
async fn timed<F, T>(fut: F) -> Result<T, KvError>
where
    F: std::future::Future<Output = T>,
{
    match tokio::time::timeout(KV_OP_TIMEOUT, fut).await {
        Ok(output) => Ok(output),
        Err(_elapsed) => Err(KvError::Timeout),
    }
}
/// Builds the NATS connect options together with the URL that should be dialed.
///
/// Credential sources are tried in this order and the first one present wins:
///
/// 1. `creds` — credentials content encoded with the standard base64 alphabet;
/// 2. `creds_file` — path handed to `ConnectOptions::with_credentials_file`;
/// 3. a username (and optional password) embedded in `url`; in this case the
///    returned dial URL has the userinfo stripped.
///
/// When none applies, default options and the unchanged `url` are returned.
///
/// # Errors
///
/// Fails when `creds` is not valid base64 or not valid UTF-8 (reported as an
/// `InvalidData` I/O error), or when the credentials content / file is
/// rejected by `async_nats`.
async fn build_connect_options(
    url: &str,
    creds: Option<&str>,
    creds_file: Option<&str>,
) -> Result<(async_nats::ConnectOptions, String), async_nats::ConnectError> {
    use base64::Engine;

    /// Wraps a decoding failure in an `InvalidData` I/O error.
    fn invalid_data<E>(cause: E) -> std::io::Error
    where
        E: Into<Box<dyn std::error::Error + Send + Sync>>,
    {
        std::io::Error::new(std::io::ErrorKind::InvalidData, cause)
    }

    if let Some(encoded) = creds {
        let raw = base64::engine::general_purpose::STANDARD
            .decode(encoded)
            .map_err(invalid_data)?;
        let text = String::from_utf8(raw).map_err(invalid_data)?;
        let options = async_nats::ConnectOptions::with_credentials(&text)?;
        return Ok((options, url.to_string()));
    }

    if let Some(path) = creds_file {
        let options = async_nats::ConnectOptions::with_credentials_file(path).await?;
        return Ok((options, url.to_string()));
    }

    match url::Url::parse(url) {
        Ok(mut parsed) if !parsed.username().is_empty() => {
            // NOTE(review): the `url` crate documents `username()`/`password()`
            // as percent-encoded; credentials containing reserved characters
            // may need decoding before being sent — confirm.
            let user = parsed.username().to_string();
            let pass = parsed.password().unwrap_or("").to_string();

            // The dial URL must not carry the secret any more.
            let strip_failed =
                parsed.set_username("").is_err() || parsed.set_password(None).is_err();
            if strip_failed {
                warn!("could not strip credentials from NATS URL; they may appear in logs");
            }

            let options = async_nats::ConnectOptions::with_user_and_password(user, pass);
            Ok((options, parsed.as_str().to_string()))
        }
        // Unparseable URL or no embedded username: connect without credentials.
        _ => Ok((async_nats::ConnectOptions::new(), url.to_string())),
    }
}
/// Connects to NATS at `url`, resolving credentials through
/// `build_connect_options` (`creds`, then `creds_file`, then userinfo
/// embedded in the URL).
///
/// # Errors
///
/// Propagates any error from building the options or from the connection
/// attempt itself.
pub async fn nats_connect(
    url: &str,
    creds: Option<&str>,
    creds_file: Option<&str>,
) -> Result<async_nats::Client, async_nats::ConnectError> {
    let (options, target) = build_connect_options(url, creds, creds_file).await?;
    let client = options.connect(target).await?;
    Ok(client)
}
/// Renders a raw payload for logging: valid UTF-8 is borrowed as-is, anything
/// else is shown as `0x` followed by its hex encoding.
fn payload_for_log(payload: &[u8]) -> std::borrow::Cow<'_, str> {
    use std::borrow::Cow;

    if let Ok(text) = std::str::from_utf8(payload) {
        return Cow::Borrowed(text);
    }
    Cow::Owned(format!("0x{}", crate::artifact::hex_encode(payload)))
}
/// Settings needed to (re)establish a NATS connection.
#[derive(Clone)]
pub struct NatsConnectionConfig {
/// Server URL to dial; a username/password embedded in it is used for
/// authentication when neither `creds` nor `creds_file` is set.
pub url: String,
/// Base64-encoded credentials content; takes precedence over `creds_file`.
pub creds: Option<String>,
/// Path to a credentials file.
pub creds_file: Option<String>,
}
impl std::fmt::Debug for NatsConnectionConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NatsConnectionConfig")
.field("url", &self.url)
.field("creds", &self.creds.as_ref().map(|_| "[redacted]"))
.field(
"creds_file",
&self.creds_file.as_ref().map(|_| "[redacted]"),
)
.finish()
}
}
/// Creates the JetStream stream backing KV bucket `bucket` by sending a raw
/// `$JS.API.STREAM.CREATE` request instead of going through the typed KV API.
///
/// The stream is named `KV_<bucket>`, captures `$KV.<bucket>.>`, uses file
/// storage and keeps `history` messages per subject. `max_bytes`,
/// `max_age_nanos` and `num_replicas` are forwarded as given.
///
/// "Already exists" and "stream limit reached" responses are logged and
/// treated as success, as is a response that cannot be parsed.
///
/// # Errors
///
/// Returns [`KvError::ConnectionFailed`] when the request cannot be
/// serialized or sent, or when JetStream answers with any other error.
pub(crate) async fn create_kv_bucket_raw(
    client: &async_nats::Client,
    bucket: &str,
    max_bytes: i64,
    history: i64,
    max_age_nanos: i64,
    num_replicas: usize,
) -> Result<(), KvError> {
    let stream_name = format!("KV_{}", bucket);
    let request_body = serde_json::json!({
        "name": stream_name,
        "subjects": [format!("$KV.{}.>", bucket)],
        "max_msgs_per_subject": history,
        "max_bytes": max_bytes,
        "max_age": max_age_nanos,
        "storage": "file",
        "allow_rollup_hdrs": true,
        "deny_delete": false,
        "deny_purge": false,
        "allow_direct": true,
        "discard": "new",
        "num_replicas": num_replicas,
        "retention": "limits"
    });

    let body = serde_json::to_vec(&request_body)
        .map_err(|e| KvError::ConnectionFailed(format!("failed to serialize config: {}", e)))?;

    let api_subject = format!("$JS.API.STREAM.CREATE.{}", stream_name);
    let reply = client
        .request(api_subject, body.into())
        .await
        .map_err(|e| KvError::ConnectionFailed(format!("failed to send create request: {}", e)))?;

    debug!(bucket, response = %payload_for_log(&reply.payload), "raw JetStream response");

    // Only a hard failure aborts; every other outcome is logged and accepted.
    match classify_raw_create_response(&reply.payload) {
        RawCreateOutcome::Failed { code, description } => {
            return Err(KvError::ConnectionFailed(format!(
                "JetStream error {}: {}",
                code, description
            )));
        }
        RawCreateOutcome::Created => info!(bucket, "bucket created successfully via raw API"),
        RawCreateOutcome::AlreadyExists => info!(bucket, "bucket already exists"),
        RawCreateOutcome::StreamLimit => {
            info!(bucket, "stream limit reached, bucket may already exist")
        }
    }
    Ok(())
}
/// Classification of a raw `STREAM.CREATE` response, produced by
/// `classify_raw_create_response`.
#[derive(Debug, PartialEq, Eq)]
enum RawCreateOutcome {
/// The response carried no `error` object (or could not be parsed).
Created,
/// The error `code` or `err_code` was 10058.
AlreadyExists,
/// A code-400 error whose description mentions "maximum number of streams".
StreamLimit,
/// Any other error; carries the reported `code` (0 when absent) and description.
Failed { code: i64, description: String },
}
/// Interprets the JSON body of a raw `STREAM.CREATE` response.
///
/// * Unparseable payload → logged and treated as `Created`.
/// * No `error` object → `Created`.
/// * `code` or `err_code` equal to 10058 → `AlreadyExists`.
/// * `code` 400 with "maximum number of streams" in the description →
///   `StreamLimit`.
/// * Anything else → `Failed`, with `code` defaulting to 0 and the
///   description defaulting to "unknown error".
fn classify_raw_create_response(payload: &[u8]) -> RawCreateOutcome {
    let parsed: serde_json::Value = match serde_json::from_slice(payload) {
        Ok(value) => value,
        Err(_) => {
            warn!(
                response = %payload_for_log(payload),
                "unparseable STREAM.CREATE response; assuming created (caller re-verifies via get_key_value)"
            );
            return RawCreateOutcome::Created;
        }
    };

    let error_obj = match parsed.get("error") {
        Some(obj) => obj,
        None => return RawCreateOutcome::Created,
    };

    // Missing or non-integer fields read as 0.
    let int_field = |name: &str| {
        error_obj
            .get(name)
            .and_then(serde_json::Value::as_i64)
            .unwrap_or(0)
    };
    let code = int_field("code");
    let err_code = int_field("err_code");
    let description = error_obj
        .get("description")
        .and_then(serde_json::Value::as_str)
        .unwrap_or("unknown error");

    if [code, err_code].contains(&10058) {
        RawCreateOutcome::AlreadyExists
    } else if code == 400 && description.contains("maximum number of streams") {
        RawCreateOutcome::StreamLimit
    } else {
        RawCreateOutcome::Failed {
            code,
            description: description.to_string(),
        }
    }
}
/// Live connection state held by `NatsConnection` once connected.
struct NatsHandle {
    /// Core NATS client; cloned into every store handed out by the connection.
    client: async_nats::Client,
    /// JetStream context created from `client`.
    jetstream: async_nats::jetstream::Context,
}
/// A NATS connection that hands out JetStream KV stores.
pub struct NatsConnection {
/// Settings used by `connect`; empty when built via `from_client`.
config: NatsConnectionConfig,
/// Client + JetStream context; `None` until connected and after `shutdown`.
handle: RwLock<Option<NatsHandle>>,
/// Health flag, updated by `connect`/`shutdown` and by the client's
/// connected/disconnected event callback.
healthy: Arc<AtomicBool>,
/// Set only by `from_client`: the adopted client, queried for its live
/// connection state in `is_healthy`.
state_probe: Option<async_nats::Client>,
}
impl NatsConnection {
    /// Creates a connection in the disconnected state; call `connect` before
    /// requesting stores.
    pub fn new(config: NatsConnectionConfig) -> Self {
        let healthy = Arc::new(AtomicBool::new(false));
        let handle = RwLock::new(None);
        Self {
            config,
            handle,
            healthy,
            state_probe: None,
        }
    }

    /// Wraps an already-connected client.
    ///
    /// The result starts out healthy and keeps a clone of the client as a
    /// state probe. No URL or credentials are retained (the stored config is
    /// empty).
    pub fn from_client(client: async_nats::Client) -> Self {
        let placeholder_config = NatsConnectionConfig {
            url: String::new(),
            creds: None,
            creds_file: None,
        };
        let probe = client.clone();
        let jetstream = async_nats::jetstream::new(client.clone());
        Self {
            config: placeholder_config,
            handle: RwLock::new(Some(NatsHandle { client, jetstream })),
            healthy: Arc::new(AtomicBool::new(true)),
            state_probe: Some(probe),
        }
    }

    /// Returns the KV bucket named in `config`, creating it when the lookup
    /// fails or times out.
    ///
    /// Creation first goes through the typed KV API; if that is rejected, the
    /// stream is created with a raw JetStream request and the bucket is then
    /// looked up again. Defaults: 1 replica, history of 1, 10 MiB `max_bytes`,
    /// no max age.
    ///
    /// # Errors
    ///
    /// [`KvError::Timeout`] when the typed create or the final lookup times
    /// out, [`KvError::ConnectionFailed`] when the raw create fails or the
    /// bucket still cannot be fetched afterwards.
    async fn get_or_create_bucket(
        client: &async_nats::Client,
        js: &async_nats::jetstream::Context,
        config: &StoreConfig,
    ) -> Result<Store, KvError> {
        // Fast path: the bucket already exists.
        let lookup = timed(js.get_key_value(&config.name)).await;
        match lookup {
            Ok(Ok(existing)) => return Ok(existing),
            Ok(Err(e)) => {
                debug!(bucket = %config.name, error = ?e, "get_key_value failed; attempting create");
            }
            Err(_) => {
                warn!(bucket = %config.name, timeout = ?KV_OP_TIMEOUT, "get_key_value timed out; attempting create");
            }
        }

        let replicas = config.num_replicas.unwrap_or(1);
        let max_bytes = config.max_bytes.unwrap_or(10 * 1024 * 1024);
        let mut kv_config = async_nats::jetstream::kv::Config {
            bucket: config.name.clone(),
            num_replicas: replicas,
            max_bytes,
            ..Default::default()
        };

        // The typed config takes a Duration, the raw API nanoseconds.
        let max_age_nanos = match config.max_age {
            Some(age) => {
                kv_config.max_age = age;
                i64::try_from(age.as_nanos()).unwrap_or(i64::MAX)
            }
            None => 0,
        };
        let history = match config.max_history {
            Some(depth) => {
                let depth = i64::from(depth);
                kv_config.history = depth;
                depth
            }
            None => 1,
        };

        let create_err = match timed(js.create_key_value(kv_config)).await? {
            Ok(created) => return Ok(created),
            Err(e) => e,
        };
        warn!(
            bucket = config.name,
            error = ?create_err,
            "create_key_value failed, trying raw JetStream API"
        );

        create_kv_bucket_raw(
            client,
            &config.name,
            max_bytes,
            history,
            max_age_nanos,
            replicas,
        )
        .await?;

        // The raw create may have been a no-op ("already exists"), so the
        // bucket is always fetched again to confirm it is usable.
        timed(js.get_key_value(&config.name)).await?.map_err(|e| {
            error!(bucket = config.name, error = ?e, "failed to get bucket after raw create");
            KvError::ConnectionFailed(format!("get bucket after raw create: {:?}", e))
        })
    }
}
#[async_trait]
impl Connection for NatsConnection {
/// Establishes the connection if it is not already marked healthy.
///
/// Connections built with `from_client` cannot reconnect and return
/// `KvError::ConnectionFailed` once they are unhealthy.
async fn connect(&self) -> Result<(), KvError> {
// Already healthy: nothing to do.
if self.healthy.load(Ordering::Acquire) {
return Ok(());
}
// `state_probe` is only set by `from_client`, which keeps no URL or credentials.
if self.state_probe.is_some() {
return Err(KvError::ConnectionFailed(
"connection was built via NatsConnection::from_client and cannot \
reconnect (no URL or credentials retained); construct \
NatsConnection::new(config) for a reconnectable connection"
.to_string(),
));
}
let (opts, dial_url) = build_connect_options(
&self.config.url,
self.config.creds.as_deref(),
self.config.creds_file.as_deref(),
)
.await
.map_err(|e| KvError::ConnectionFailed(e.to_string()))?;
// `installed` gates the event callback: until this client is actually
// stored in `self.handle`, its events must not touch the shared health flag.
let installed = Arc::new(AtomicBool::new(false));
let cb_healthy = Arc::clone(&self.healthy);
let cb_installed = Arc::clone(&installed);
let opts = opts.event_callback(move |event| {
let cb_healthy = Arc::clone(&cb_healthy);
let cb_installed = Arc::clone(&cb_installed);
async move {
if !cb_installed.load(Ordering::Acquire) {
return;
}
match event {
async_nats::Event::Connected => cb_healthy.store(true, Ordering::Release),
async_nats::Event::Disconnected => cb_healthy.store(false, Ordering::Release),
_ => {}
}
}
});
let client = opts
.connect(dial_url)
.await
.map_err(|e| KvError::ConnectionFailed(e.to_string()))?;
let jetstream = async_nats::jetstream::new(client.clone());
let conn = NatsHandle { client, jetstream };
let mut handle = self.handle.write().await;
// A handle is already installed: keep it and discard the client just
// created (its callback stays gated off because `installed` is never set).
if handle.is_some() {
return Ok(());
}
installed.store(true, Ordering::Release);
*handle = Some(conn);
self.healthy.store(true, Ordering::Release);
Ok(())
}
/// Marks the connection unhealthy and drops the stored handle.
async fn shutdown(&self) -> Result<(), KvError> {
self.healthy.store(false, Ordering::Release);
*self.handle.write().await = None;
Ok(())
}
/// Reports health: the flag must be set and, for `from_client`
/// connections, the probed client must currently be in the `Connected` state.
fn is_healthy(&self) -> bool {
if !self.healthy.load(Ordering::Acquire) {
return false;
}
match &self.state_probe {
Some(client) => matches!(
client.connection_state(),
async_nats::connection::State::Connected
),
None => true,
}
}
/// Opens (or creates) the bucket `name` with default store settings.
async fn store(&self, name: &str) -> Result<Arc<dyn KvStore>, KvError> {
let config = StoreConfig {
name: name.to_string(),
..Default::default()
};
self.store_with_config(config).await
}
/// Opens (or creates) the bucket described by `config`.
///
/// Returns `KvError::NotConnected` when no handle is installed.
async fn store_with_config(&self, config: StoreConfig) -> Result<Arc<dyn KvStore>, KvError> {
// Clone the client and context inside a short scope so the read lock
// is released before any bucket I/O.
let (client, js) = {
let conn = self.handle.read().await;
let conn = conn.as_ref().ok_or(KvError::NotConnected)?;
(conn.client.clone(), conn.jetstream.clone())
};
let kv = Self::get_or_create_bucket(&client, &js, &config).await?;
Ok(Arc::new(NatsKvStore {
name: config.name,
client,
js,
kv,
}))
}
/// Static feature set advertised by this backend.
fn capabilities(&self) -> ConnectionCapabilities {
ConnectionCapabilities {
streaming_watch: true,
prefix_watch: true,
ttl: false,
cas: true,
transactions: false,
max_value_size: 0,
global_ordering: false,
}
}
}
/// One KV bucket opened on a NATS connection; factory for its reader,
/// watcher and writer.
struct NatsKvStore {
/// Bucket name, returned by `name()` and passed to readers/watchers.
name: String,
/// Handle to the KV bucket.
kv: Store,
/// Core client, cloned into readers and watchers.
client: async_nats::Client,
/// JetStream context, cloned into readers and watchers.
js: async_nats::jetstream::Context,
}
impl KvStore for NatsKvStore {
    /// Name of the underlying bucket.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Builds a fresh reader sharing this store's bucket handle and client.
    fn reader(&self) -> Arc<dyn KvReader> {
        let reader = NatsKvReader {
            kv: self.kv.clone(),
            client: self.client.clone(),
            js: self.js.clone(),
            bucket: self.name.clone(),
        };
        Arc::new(reader)
    }

    /// Builds a fresh watcher; this backend always provides one.
    fn watcher(&self) -> Option<Arc<dyn KvWatcher>> {
        let watcher: Arc<dyn KvWatcher> = Arc::new(NatsKvWatcher {
            kv: self.kv.clone(),
            client: self.client.clone(),
            js: self.js.clone(),
            bucket: self.name.clone(),
        });
        Some(watcher)
    }

    /// Builds a fresh writer; this backend always provides one.
    fn writer(&self) -> Option<Arc<dyn KvWriter>> {
        let writer: Arc<dyn KvWriter> = Arc::new(NatsKvWriterImpl {
            kv: self.kv.clone(),
        });
        Some(writer)
    }
}
/// Read-side access to one KV bucket.
struct NatsKvReader {
/// Bucket handle used for single-key lookups.
kv: Store,
/// Core client, used to subscribe to the delivery inbox for listings.
client: async_nats::Client,
/// JetStream context, used to reach the `KV_<bucket>` stream.
js: async_nats::jetstream::Context,
/// Bucket name, used to build `$KV.<bucket>.` subjects.
bucket: String,
}
#[async_trait]
impl KvReader for NatsKvReader {
    /// Like `entry`, but an entry whose value is empty is reported as absent.
    async fn get(&self, key: &str) -> Result<Option<KvEntry>, KvError> {
        let found = self.entry(key).await?;
        Ok(found.filter(|e| !e.value.is_empty()))
    }

    /// Fetches the latest revision of `key`.
    ///
    /// Returns `None` when the key is unknown or its latest operation is not
    /// a put (delete / purge markers).
    async fn entry(&self, key: &str) -> Result<Option<KvEntry>, KvError> {
        use async_nats::jetstream::kv::Operation;

        let latest = timed(self.kv.entry(key))
            .await?
            .map_err(|e| KvError::OperationFailed(e.to_string()))?;

        Ok(latest
            .filter(|raw| raw.operation == Operation::Put)
            .map(|raw| KvEntry {
                key: key.to_string(),
                value: raw.value.to_vec(),
                version: VersionToken::from_u64(raw.revision),
            }))
    }

    /// Lists the live keys under `prefix`, using a headers-only consumer so
    /// no values are transferred. Deleted keys and keys whose latest value is
    /// empty are skipped.
    async fn keys(&self, prefix: &str) -> Result<Vec<String>, KvError> {
        debug!(prefix = %prefix, "listing keys with prefix");
        let mut live_keys = Vec::new();
        self.consume_last_per_subject(prefix, true, |msg, key| {
            let tombstone = is_kv_delete(&msg) || is_empty_value(&msg);
            if !tombstone {
                live_keys.push(key);
            }
        })
        .await?;
        debug!(prefix = %prefix, keys = live_keys.len(), "keys listing complete");
        Ok(live_keys)
    }

    /// Returns the latest live entry for every key under `prefix`.
    ///
    /// The revision is taken from the message's ack reply subject and falls
    /// back to 0 when it cannot be parsed.
    async fn scan(&self, prefix: &str) -> Result<Vec<KvEntry>, KvError> {
        let mut found = Vec::new();
        self.consume_last_per_subject(prefix, false, |msg, key| {
            if is_kv_delete(&msg) || msg.payload.is_empty() {
                return;
            }
            let revision = msg
                .reply
                .as_deref()
                .and_then(stream_sequence_from_ack)
                .unwrap_or(0);
            found.push(KvEntry {
                key,
                value: msg.payload.to_vec(),
                version: VersionToken::from_u64(revision),
            });
        })
        .await?;
        debug!(prefix = %prefix, entries = found.len(), "scan complete");
        Ok(found)
    }
}
/// Extracts the stream sequence from a JetStream ack reply subject.
///
/// Two layouts are accepted:
///
/// * legacy, exactly 9 tokens:
///   `$JS.ACK.<stream>.<consumer>.<delivered>.<stream seq>.<consumer seq>.<ts>.<pending>`
/// * modern, 11 or more tokens:
///   `$JS.ACK.<domain>.<account>.<stream>.<consumer>.<delivered>.<stream seq>.…`
///
/// Returns `None` when the subject does not start with `$JS.ACK.`, has any
/// other token count (including 10, which matches neither layout and would
/// otherwise yield an unrelated token as the revision), or when the sequence
/// token is not a valid `u64`.
fn stream_sequence_from_ack(reply: &str) -> Option<u64> {
    // Token counts and positions are relative to the text after `$JS.ACK.`.
    const LEGACY_TOKENS: usize = 7;
    const MODERN_MIN_TOKENS: usize = 9;
    const LEGACY_SEQ_IDX: usize = 3;
    const MODERN_SEQ_IDX: usize = 5;

    let rest = reply.strip_prefix("$JS.ACK.")?;
    let seq_idx = match rest.split('.').count() {
        LEGACY_TOKENS => LEGACY_SEQ_IDX,
        n if n >= MODERN_MIN_TOKENS => MODERN_SEQ_IDX,
        _ => return None,
    };
    rest.split('.').nth(seq_idx)?.parse::<u64>().ok()
}
/// Returns `true` when the message is a KV delete or purge marker, i.e. its
/// `KV-Operation` header is `DEL` or `PURGE`.
///
/// A missing header, or any other header value, is treated as a put — the
/// same rule `kv_message_to_update` applies — so a put that carries an
/// explicit operation header is not mistaken for a tombstone.
fn is_kv_delete(msg: &async_nats::Message) -> bool {
    msg.headers
        .as_ref()
        .and_then(|h| h.get("KV-Operation"))
        .is_some_and(|op| matches!(op.as_str(), "DEL" | "PURGE"))
}
/// Returns `true` when the message's `Nats-Msg-Size` header is exactly `"0"`.
/// A message without headers or without that header is not considered empty.
fn is_empty_value(msg: &async_nats::Message) -> bool {
    let Some(headers) = msg.headers.as_ref() else {
        return false;
    };
    match headers.get("Nats-Msg-Size") {
        Some(size) => size.as_str() == "0",
        None => false,
    }
}
impl NatsKvReader {
    /// Feeds `on_msg` the latest message of every subject under `prefix`.
    ///
    /// An ephemeral push consumer with `DeliverPolicy::LastPerSubject` is
    /// created on the bucket's stream and delivers to a private inbox. The
    /// consumer's `num_pending` at creation time is the number of messages
    /// that are read. `on_msg` receives each message together with its key
    /// (the subject with the `$KV.<bucket>.` prefix removed). With
    /// `headers_only` set, the consumer is asked to omit payloads.
    ///
    /// The consumer is deleted on a best-effort basis before returning. If
    /// the inbox subscription ends before all messages arrived, the messages
    /// seen so far are kept and `Ok(())` is returned.
    ///
    /// # Errors
    ///
    /// [`KvError::Timeout`] when a setup call or the wait for a message
    /// exceeds `KV_OP_TIMEOUT`; [`KvError::OperationFailed`] when
    /// subscribing, fetching the stream, or creating the consumer fails.
    async fn consume_last_per_subject(
        &self,
        prefix: &str,
        headers_only: bool,
        mut on_msg: impl FnMut(async_nats::Message, String),
    ) -> Result<(), KvError> {
        use async_nats::jetstream::consumer::push;
        use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy};

        let bucket = self.bucket.as_str();
        let kv_prefix = format!("$KV.{bucket}.");
        let filter_subject = match prefix {
            "" => format!("$KV.{bucket}.>"),
            _ => format!("$KV.{bucket}.{prefix}>"),
        };

        // Subscribe before the consumer exists so no delivery can be missed.
        let deliver_subject = self.client.new_inbox();
        let mut inbox_sub = timed(self.client.subscribe(deliver_subject.clone()))
            .await?
            .map_err(|e| KvError::OperationFailed(format!("subscribe inbox: {e}")))?;

        let stream = timed(self.js.get_stream(format!("KV_{bucket}")))
            .await?
            .map_err(|e| KvError::OperationFailed(format!("get KV stream: {e}")))?;

        let consumer_config = push::Config {
            deliver_subject,
            deliver_policy: DeliverPolicy::LastPerSubject,
            filter_subject,
            headers_only,
            ack_policy: AckPolicy::None,
            inactive_threshold: CONSUMER_INACTIVE_THRESHOLD,
            ..Default::default()
        };
        let consumer = timed(stream.create_consumer(consumer_config))
            .await?
            .map_err(|e| KvError::OperationFailed(format!("create consumer: {e}")))?;

        let expected = consumer.cached_info().num_pending;
        let mut timed_out = false;
        for _ in 0..expected {
            let msg = match tokio::time::timeout(KV_OP_TIMEOUT, inbox_sub.next()).await {
                Ok(Some(msg)) => msg,
                // Subscription closed early: stop with what has been received.
                Ok(None) => break,
                Err(_) => {
                    timed_out = true;
                    break;
                }
            };
            let key = msg
                .subject
                .strip_prefix(&kv_prefix)
                .unwrap_or(msg.subject.as_str())
                .to_string();
            on_msg(msg, key);
        }

        // Clean up even after a timeout; failures here are only logged.
        match timed(stream.delete_consumer(&consumer.cached_info().name)).await {
            Ok(Ok(_)) => {}
            Ok(Err(e)) => {
                warn!(error = %e, "failed to delete ephemeral consumer (best-effort)");
            }
            Err(_) => {
                warn!("timed out deleting ephemeral consumer (best-effort)");
            }
        }

        if timed_out {
            Err(KvError::Timeout)
        } else {
            Ok(())
        }
    }
}
/// Converts a NATS KV entry into the backend-neutral [`KvUpdate`], mapping
/// put / delete / purge one-to-one and carrying the entry revision as the
/// version token.
fn nats_entry_to_kv_update(entry: async_nats::jetstream::kv::Entry) -> KvUpdate {
    use async_nats::jetstream::kv::Operation;

    let version = VersionToken::from_u64(entry.revision);
    let key = entry.key;
    match entry.operation {
        Operation::Put => {
            let value = entry.value.to_vec();
            KvUpdate::Put(KvEntry {
                key,
                value,
                version,
            })
        }
        Operation::Delete => KvUpdate::Delete { key, version },
        Operation::Purge => KvUpdate::Purge { key, version },
    }
}
/// Forwards every entry produced by `watcher` to `tx` until the watch ends.
///
/// Returns `Ok(())` when the watch stream finishes or the receiving side of
/// `tx` has been dropped.
///
/// # Errors
///
/// Returns [`KvError::WatchError`] on the first error yielded by the watch.
async fn stream_watch(
    mut watcher: async_nats::jetstream::kv::Watch,
    tx: &Sender<KvUpdate>,
) -> Result<(), KvError> {
    while let Some(item) = watcher.next().await {
        let entry = item.map_err(|e| {
            error!(error = %e, "NATS KV watch error");
            KvError::WatchError(e.to_string())
        })?;

        let delivered = tx.send(nats_entry_to_kv_update(entry)).await;
        if delivered.is_err() {
            debug!("watch receiver closed");
            break;
        }
    }
    Ok(())
}
fn is_cursor_expired_error(err: &str) -> bool {
use std::sync::OnceLock;
static MATCHER: OnceLock<aho_corasick::AhoCorasick> = OnceLock::new();
MATCHER
.get_or_init(|| {
aho_corasick::AhoCorasick::builder()
.ascii_case_insensitive(true)
.build([
"start sequence",
"first sequence",
"sequence not found",
"too old",
])
.expect("static needle set always compiles")
})
.is_match(err)
}
/// Watch-side access to one KV bucket.
struct NatsKvWatcher {
/// Bucket handle used for the KV-level watches.
kv: Store,
/// Core client, used to allocate the inbox for the multi-prefix resume consumer.
client: async_nats::Client,
/// JetStream context, used to inspect the `KV_<bucket>` stream.
js: async_nats::jetstream::Context,
/// Bucket name, used to build stream and subject names.
bucket: String,
}
fn kv_message_to_update(msg: &async_nats::Message, kv_prefix: &str) -> Option<KvUpdate> {
let key = msg.subject.strip_prefix(kv_prefix)?.to_string();
let revision = msg
.reply
.as_deref()
.and_then(stream_sequence_from_ack)
.unwrap_or(0);
let version = VersionToken::from_u64(revision);
let operation = msg
.headers
.as_ref()
.and_then(|h| h.get("KV-Operation"))
.map(|v| v.as_str());
Some(match operation {
Some("DEL") => KvUpdate::Delete { key, version },
Some("PURGE") => KvUpdate::Purge { key, version },
_ => KvUpdate::Put(KvEntry {
key,
value: msg.payload.to_vec(),
version,
}),
})
}
impl NatsKvWatcher {
    /// Verifies that `revision` can still be resumed from, by comparing it
    /// (via `crate::protocol::resume_window_ok`) with the first sequence
    /// currently reported for the bucket's stream.
    ///
    /// # Errors
    ///
    /// [`KvError::CursorExpired`] when the check fails,
    /// [`KvError::Timeout`] / [`KvError::OperationFailed`] when the stream
    /// cannot be fetched.
    async fn check_resume_window(&self, revision: u64) -> Result<(), KvError> {
        let stream_name = format!("KV_{}", self.bucket);
        let stream = timed(self.js.get_stream(stream_name))
            .await?
            .map_err(|e| {
                KvError::OperationFailed(format!("get KV stream for resume check: {e}"))
            })?;

        let first = stream.cached_info().state.first_sequence;
        if crate::protocol::resume_window_ok(revision, first) {
            return Ok(());
        }

        warn!(
            revision,
            first_sequence = first,
            "resume cursor is below the stream's first retained sequence; cursor expired"
        );
        Err(KvError::CursorExpired)
    }
}
#[async_trait]
impl KvWatcher for NatsKvWatcher {
    /// Watches every key in the bucket, including history, forwarding
    /// updates to `tx`.
    async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
        let opened = timed(self.kv.watch_with_history(">")).await?;
        let watch = opened.map_err(|e| KvError::WatchError(e.to_string()))?;
        stream_watch(watch, &tx).await
    }

    /// Watches all keys under `prefix` (subject `<prefix>>`), including history.
    async fn watch_prefix(&self, prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
        let subject = format!("{prefix}>");
        let opened = timed(self.kv.watch_with_history(&subject)).await?;
        let watch = opened.map_err(|e| KvError::WatchError(e.to_string()))?;
        stream_watch(watch, &tx).await
    }

    /// Watches all keys under any of `prefixes`, including history.
    /// An empty prefix list returns immediately.
    async fn watch_prefixes(&self, prefixes: &[&str], tx: Sender<KvUpdate>) -> Result<(), KvError> {
        if prefixes.is_empty() {
            return Ok(());
        }
        let mut subjects: Vec<String> = Vec::with_capacity(prefixes.len());
        for prefix in prefixes {
            subjects.push(format!("{prefix}>"));
        }
        let opened = timed(self.kv.watch_many_with_history(subjects)).await?;
        let watch = opened.map_err(|e| KvError::WatchError(e.to_string()))?;
        stream_watch(watch, &tx).await
    }

    /// Resumes a whole-bucket watch just after the revision in `cursor`.
    ///
    /// A cursor without a positive revision falls back to `watch_all`.
    /// The resume window is checked both before and after the watch is
    /// opened; [`KvError::CursorExpired`] tells the caller to restart with a
    /// full watch.
    async fn watch_all_from(
        &self,
        cursor: &WatchCursor,
        tx: Sender<KvUpdate>,
    ) -> Result<(), KvError> {
        let Some(revision) = cursor.as_u64().filter(|rev| *rev > 0) else {
            return self.watch_all(tx).await;
        };

        self.check_resume_window(revision).await?;

        let opened = timed(self.kv.watch_all_from_revision(revision + 1)).await?;
        let watch = opened.map_err(|e| {
            let err_str = e.to_string();
            if is_cursor_expired_error(&err_str) {
                warn!(revision, error = %err_str, "cursor expired, caller should fall back to full watch");
                KvError::CursorExpired
            } else {
                KvError::WatchError(err_str)
            }
        })?;

        // Re-check after the watch exists, before anything is streamed.
        self.check_resume_window(revision).await?;
        info!(revision, "resumed watch from cursor");
        stream_watch(watch, &tx).await
    }

    /// Resumes a single-prefix watch just after the revision in `cursor`.
    ///
    /// A cursor without a positive revision falls back to `watch_prefix`.
    async fn watch_prefix_from(
        &self,
        prefix: &str,
        cursor: &WatchCursor,
        tx: Sender<KvUpdate>,
    ) -> Result<(), KvError> {
        let Some(revision) = cursor.as_u64().filter(|rev| *rev > 0) else {
            return self.watch_prefix(prefix, tx).await;
        };

        self.check_resume_window(revision).await?;

        let subject = format!("{prefix}>");
        let opened = timed(self.kv.watch_from_revision(&subject, revision + 1)).await?;
        let watch = opened.map_err(|e| {
            let err_str = e.to_string();
            if is_cursor_expired_error(&err_str) {
                warn!(revision, prefix, error = %err_str, "cursor expired for prefix watch, caller should fall back");
                KvError::CursorExpired
            } else {
                KvError::WatchError(err_str)
            }
        })?;

        // Re-check after the watch exists, before anything is streamed.
        self.check_resume_window(revision).await?;
        info!(revision, prefix, "resumed prefix watch from cursor");
        stream_watch(watch, &tx).await
    }

    /// Resumes a multi-prefix watch just after the revision in `cursor`.
    ///
    /// Uses an ordered push consumer on the bucket's stream, filtered to the
    /// given prefixes and starting at `revision + 1`. An empty prefix list
    /// returns immediately; a cursor without a positive revision falls back
    /// to `watch_prefixes`.
    async fn watch_prefixes_from(
        &self,
        prefixes: &[&str],
        cursor: &WatchCursor,
        tx: Sender<KvUpdate>,
    ) -> Result<(), KvError> {
        use async_nats::jetstream::consumer::{DeliverPolicy, ReplayPolicy, push};

        if prefixes.is_empty() {
            return Ok(());
        }
        let Some(revision) = cursor.as_u64().filter(|rev| *rev > 0) else {
            return self.watch_prefixes(prefixes, tx).await;
        };

        let bucket = self.bucket.as_str();
        let kv_prefix = format!("$KV.{bucket}.");
        let mut filter_subjects: Vec<String> = Vec::with_capacity(prefixes.len());
        for prefix in prefixes {
            filter_subjects.push(format!("{kv_prefix}{prefix}>"));
        }

        let stream = timed(self.js.get_stream(format!("KV_{bucket}")))
            .await?
            .map_err(|e| KvError::WatchError(format!("get KV stream: {e}")))?;

        let first = stream.cached_info().state.first_sequence;
        let window_ok = crate::protocol::resume_window_ok(revision, first);
        if !window_ok {
            warn!(
                revision,
                first_sequence = first,
                ?prefixes,
                "resume cursor is below the stream's first retained sequence; cursor expired"
            );
            return Err(KvError::CursorExpired);
        }

        let consumer_config = push::OrderedConfig {
            deliver_subject: self.client.new_inbox(),
            description: Some("kv multi-prefix resume consumer".to_string()),
            filter_subjects,
            replay_policy: ReplayPolicy::Instant,
            deliver_policy: DeliverPolicy::ByStartSequence {
                start_sequence: revision + 1,
            },
            ..Default::default()
        };
        let consumer = timed(stream.create_consumer(consumer_config))
            .await?
            .map_err(|e| {
                let err_str = e.to_string();
                if is_cursor_expired_error(&err_str) {
                    warn!(revision, ?prefixes, error = %err_str, "cursor expired for multi-prefix watch, caller should fall back");
                    KvError::CursorExpired
                } else {
                    KvError::WatchError(err_str)
                }
            })?;

        // Re-check after the consumer exists, before anything is streamed.
        self.check_resume_window(revision).await?;

        let mut messages = timed(consumer.messages())
            .await?
            .map_err(|e| KvError::WatchError(e.to_string()))?;
        info!(
            revision,
            ?prefixes,
            "resumed multi-prefix watch from cursor"
        );

        while let Some(item) = messages.next().await {
            let msg = item.map_err(|e| {
                error!(error = %e, "NATS KV multi-prefix watch error");
                KvError::WatchError(e.to_string())
            })?;

            // Messages outside this bucket's keyspace are skipped.
            let Some(update) = kv_message_to_update(&msg, &kv_prefix) else {
                continue;
            };
            if tx.send(update).await.is_err() {
                debug!("watch receiver closed");
                break;
            }
        }
        Ok(())
    }
}
/// Write-side access to one KV bucket.
struct NatsKvWriterImpl {
/// Bucket handle all writes go through.
kv: Store,
}
#[async_trait]
impl KvWriter for NatsKvWriterImpl {
    /// Unconditionally writes `value` under `key` and returns the new revision.
    async fn put(&self, key: &str, value: &[u8]) -> Result<VersionToken, KvError> {
        match timed(self.kv.put(key, value.to_vec().into())).await? {
            Ok(revision) => Ok(VersionToken::from_u64(revision)),
            Err(e) => Err(KvError::OperationFailed(e.to_string())),
        }
    }

    /// Deletes `key`; returns `true` whenever the delete call itself succeeds.
    async fn delete(&self, key: &str) -> Result<bool, KvError> {
        match timed(self.kv.delete(key)).await? {
            Ok(_) => Ok(true),
            Err(e) => Err(KvError::OperationFailed(e.to_string())),
        }
    }

    /// Writes `value` only if `key` does not exist yet.
    ///
    /// # Errors
    ///
    /// [`KvError::AlreadyExists`] when the key is already present.
    async fn create(&self, key: &str, value: &[u8]) -> Result<VersionToken, KvError> {
        use async_nats::jetstream::kv::CreateErrorKind;

        let outcome = timed(self.kv.create(key, value.to_vec().into())).await?;
        match outcome {
            Ok(revision) => Ok(VersionToken::from_u64(revision)),
            Err(e) if e.kind() == CreateErrorKind::AlreadyExists => Err(KvError::AlreadyExists),
            Err(e) => Err(KvError::OperationFailed(e.to_string())),
        }
    }

    /// Compare-and-set: writes `value` only if the key's latest revision is
    /// `expected`.
    ///
    /// # Errors
    ///
    /// [`KvError::RevisionMismatch`] when the revision differs;
    /// [`KvError::OperationFailed`] when `expected` is not a numeric token.
    async fn update(
        &self,
        key: &str,
        value: &[u8],
        expected: &VersionToken,
    ) -> Result<VersionToken, KvError> {
        use async_nats::jetstream::kv::UpdateErrorKind;

        let Some(rev) = expected.as_u64() else {
            return Err(KvError::OperationFailed(
                "invalid version token for NATS update".into(),
            ));
        };

        let outcome = timed(self.kv.update(key, value.to_vec().into(), rev)).await?;
        match outcome {
            Ok(revision) => Ok(VersionToken::from_u64(revision)),
            Err(e) if e.kind() == UpdateErrorKind::WrongLastRevision => {
                Err(KvError::RevisionMismatch)
            }
            Err(e) => Err(KvError::OperationFailed(e.to_string())),
        }
    }

    /// Conditional delete, implemented as a compare-and-set that writes an
    /// empty value at revision `expected`.
    ///
    /// # Errors
    ///
    /// [`KvError::RevisionMismatch`] when the revision differs;
    /// [`KvError::OperationFailed`] when `expected` is not a numeric token.
    async fn delete_with_version(
        &self,
        key: &str,
        expected: &VersionToken,
    ) -> Result<bool, KvError> {
        use async_nats::jetstream::kv::UpdateErrorKind;

        let Some(rev) = expected.as_u64() else {
            return Err(KvError::OperationFailed(
                "invalid version token for NATS delete".into(),
            ));
        };

        let outcome = timed(self.kv.update(key, Vec::new().into(), rev)).await?;
        match outcome {
            Ok(_) => Ok(true),
            Err(e) if e.kind() == UpdateErrorKind::WrongLastRevision => {
                Err(KvError::RevisionMismatch)
            }
            Err(e) => Err(KvError::OperationFailed(e.to_string())),
        }
    }
}
impl std::fmt::Debug for NatsConnection {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NatsConnection")
.field("url", &self.config.url)
.field("healthy", &self.healthy.load(Ordering::Acquire))
.finish()
}
}
// Unit tests for the pure helpers in this file: raw STREAM.CREATE response
// classification, ack-subject parsing, cursor-expiry detection and raw
// message decoding. None of them needs a NATS server.
#[cfg(test)]
mod tests {
use super::*;
// --- classify_raw_create_response ---
#[test]
fn raw_create_success_has_no_error() {
let payload = br#"{"type":"io.nats.jetstream.api.v1.stream_create_response","config":{"name":"KV_certs"}}"#;
assert_eq!(
classify_raw_create_response(payload),
RawCreateOutcome::Created
);
}
#[test]
fn raw_create_swallows_stream_already_exists() {
let payload =
br#"{"error":{"code":400,"err_code":10058,"description":"stream name already in use"}}"#;
assert_eq!(
classify_raw_create_response(payload),
RawCreateOutcome::AlreadyExists
);
}
#[test]
fn raw_create_swallows_stream_limit() {
let payload =
br#"{"error":{"code":400,"description":"maximum number of streams reached"}}"#;
assert_eq!(
classify_raw_create_response(payload),
RawCreateOutcome::StreamLimit
);
}
#[test]
fn raw_create_propagates_unknown_error() {
let payload = br#"{"error":{"code":403,"description":"insufficient permissions"}}"#;
match classify_raw_create_response(payload) {
RawCreateOutcome::Failed { code, description } => {
assert_eq!(code, 403);
assert_eq!(description, "insufficient permissions");
}
other => panic!("expected Failed, got {other:?}"),
}
}
// A 400 is only tolerated when the description names the stream limit.
#[test]
fn raw_create_400_without_stream_limit_is_fatal() {
let payload = br#"{"error":{"code":400,"description":"invalid stream config"}}"#;
match classify_raw_create_response(payload) {
RawCreateOutcome::Failed { code, description } => {
assert_eq!(code, 400);
assert!(description.contains("invalid stream config"));
}
other => panic!("expected Failed, got {other:?}"),
}
}
#[test]
fn raw_create_unparseable_payload_is_treated_as_success() {
assert_eq!(
classify_raw_create_response(b"not json at all"),
RawCreateOutcome::Created
);
}
// --- stream_sequence_from_ack ---
// Legacy layout: 9 tokens, stream sequence at index 5.
#[test]
fn ack_subject_legacy_format() {
let reply = "$JS.ACK.KV_certs.cons.1.42.7.1700000000000000000.0";
assert_eq!(stream_sequence_from_ack(reply), Some(42));
}
// Modern layout: domain and account tokens shift the sequence to index 7.
#[test]
fn ack_subject_modern_format_with_domain_and_account() {
let reply = "$JS.ACK.hub.AABBCC.KV_certs.cons.1.42.7.1700000000000000000.0";
assert_eq!(stream_sequence_from_ack(reply), Some(42));
}
#[test]
fn ack_subject_modern_format_with_trailing_token() {
let reply = "$JS.ACK.hub.AABBCC.KV_certs.cons.1.99.7.1700000000000000000.0.rng";
assert_eq!(stream_sequence_from_ack(reply), Some(99));
}
// Guards against reading the last token (pending count) as the sequence.
#[test]
fn ack_subject_last_token_is_not_the_sequence() {
let reply = "$JS.ACK.KV_certs.cons.1.42.7.1700000000000000000.0";
assert_ne!(stream_sequence_from_ack(reply), Some(0));
}
#[test]
fn ack_subject_rejects_garbage() {
assert_eq!(stream_sequence_from_ack(""), None);
assert_eq!(stream_sequence_from_ack("not.an.ack.subject"), None);
assert_eq!(stream_sequence_from_ack("$JS.ACK.too.few.tokens"), None);
assert_eq!(stream_sequence_from_ack("$JS.ACK.s.c.1.notnum.7.0.0"), None);
}
// --- is_cursor_expired_error ---
#[test]
fn cursor_expired_matches_known_nats_error_strings() {
assert!(is_cursor_expired_error(
"consumer start sequence is too old"
));
assert!(is_cursor_expired_error("first sequence is 42, requested 1"));
assert!(is_cursor_expired_error("sequence not found in stream"));
assert!(is_cursor_expired_error("requested revision is too old"));
assert!(is_cursor_expired_error("Consumer Start Sequence Too Old"));
assert!(!is_cursor_expired_error("connection refused"));
assert!(!is_cursor_expired_error("permission denied"));
assert!(!is_cursor_expired_error("stream not found"));
}
// --- kv_message_to_update ---
// Builds a raw message; `op`, when given, becomes the `KV-Operation` header.
fn raw_kv_msg(
subject: &str,
reply: Option<&str>,
payload: &[u8],
op: Option<&str>,
) -> async_nats::Message {
let headers = op.map(|op| {
let mut h = async_nats::HeaderMap::new();
h.insert("KV-Operation", op);
h
});
async_nats::Message {
subject: subject.to_string().into(),
reply: reply.map(|r| r.to_string().into()),
payload: payload.to_vec().into(),
headers,
status: None,
description: None,
length: 0,
}
}
// Legacy-format ack subject whose stream sequence is 42.
const ACK_42: &str = "$JS.ACK.KV_certs.cons.1.42.7.1700000000000000000.0";
#[test]
fn kv_message_decodes_put_without_operation_header() {
let msg = raw_kv_msg("$KV.certs.node.a", Some(ACK_42), b"v1", None);
match kv_message_to_update(&msg, "$KV.certs.").expect("in keyspace") {
KvUpdate::Put(e) => {
assert_eq!(e.key, "node.a");
assert_eq!(e.value, b"v1");
assert_eq!(e.version.as_u64(), Some(42));
}
other => panic!("expected Put, got {other:?}"),
}
}
#[test]
fn kv_message_decodes_delete_and_purge_markers() {
let msg = raw_kv_msg("$KV.certs.node.a", Some(ACK_42), b"", Some("DEL"));
assert!(matches!(
kv_message_to_update(&msg, "$KV.certs.").expect("in keyspace"),
KvUpdate::Delete { ref key, ref version } if key == "node.a" && version.as_u64() == Some(42)
));
let msg = raw_kv_msg("$KV.certs.node.a", Some(ACK_42), b"", Some("PURGE"));
assert!(matches!(
kv_message_to_update(&msg, "$KV.certs.").expect("in keyspace"),
KvUpdate::Purge { ref key, .. } if key == "node.a"
));
}
#[test]
fn kv_message_outside_keyspace_is_skipped() {
let msg = raw_kv_msg("$KV.other.node.a", Some(ACK_42), b"v", None);
assert!(kv_message_to_update(&msg, "$KV.certs.").is_none());
}
#[test]
fn kv_message_without_reply_gets_revision_zero() {
let msg = raw_kv_msg("$KV.certs.node.a", None, b"v", None);
match kv_message_to_update(&msg, "$KV.certs.").expect("in keyspace") {
KvUpdate::Put(e) => assert_eq!(e.version.as_u64(), Some(0)),
other => panic!("expected Put, got {other:?}"),
}
}
// --- classify_raw_create_response, additional edge cases ---
#[test]
fn raw_create_already_exists_when_10058_in_code_field() {
let payload = br#"{"error":{"code":10058,"description":"stream name already in use"}}"#;
assert_eq!(
classify_raw_create_response(payload),
RawCreateOutcome::AlreadyExists
);
}
#[test]
fn raw_create_error_without_code_defaults_to_zero() {
let payload = br#"{"error":{"description":"mystery"}}"#;
match classify_raw_create_response(payload) {
RawCreateOutcome::Failed { code, description } => {
assert_eq!(code, 0);
assert_eq!(description, "mystery");
}
other => panic!("expected Failed, got {other:?}"),
}
}
}