use async_nats::jetstream::kv::Store;
use async_trait::async_trait;
use futures::StreamExt;
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use std::time::Duration;
use tokio::sync::{RwLock, mpsc::Sender};
use tracing::{debug, error, info, warn};
use crate::kv::{
KvEntry, KvError, KvPurge, KvReader, KvUpdate, KvWatcher, KvWriter, VersionToken, WatchCursor,
};
use crate::stores::{Connection, ConnectionCapabilities, DiscardPolicy, KvStore, StoreConfig};
/// Deadline applied to each KV/JetStream call wrapped by `timed`, and to each
/// wait for the next message while draining a listing consumer.
const KV_OP_TIMEOUT: Duration = Duration::from_secs(30);
/// `inactive_threshold` set on the ephemeral push consumers created for key
/// listing and scanning (presumably so the server reclaims one whose explicit
/// delete was missed; confirm against JetStream consumer semantics).
const CONSUMER_INACTIVE_THRESHOLD: Duration = Duration::from_secs(300);
/// Awaits `fut`, giving up once [`KV_OP_TIMEOUT`] has elapsed.
///
/// Returns the future's output untouched when it completes in time, and
/// [`KvError::Timeout`] when the deadline fires first.
async fn timed<F, T>(fut: F) -> Result<T, KvError>
where
    F: std::future::Future<Output = T>,
{
    match tokio::time::timeout(KV_OP_TIMEOUT, fut).await {
        Ok(output) => Ok(output),
        Err(_elapsed) => Err(KvError::Timeout),
    }
}
/// Builds the NATS connect options together with the URL to dial.
///
/// Credential sources are tried in this order and the first one present wins:
/// 1. `creds`: base64-encoded credentials content,
/// 2. `creds_file`: path to a credentials file,
/// 3. a username (and optional password) embedded in `url`.
///
/// Only in case 3 is the returned URL different from `url`: the embedded
/// credentials are stripped from it when the URL type allows that.
///
/// # Errors
/// Fails when `creds` is not valid base64 / UTF-8, or when `async_nats`
/// rejects the inline credentials or cannot load the credentials file.
async fn build_connect_options(
    url: &str,
    creds: Option<&str>,
    creds_file: Option<&str>,
) -> Result<(async_nats::ConnectOptions, String), async_nats::ConnectError> {
    /// Wraps a decoding failure as an `InvalidData` I/O error.
    fn invalid_data<E>(source: E) -> std::io::Error
    where
        E: Into<Box<dyn std::error::Error + Send + Sync>>,
    {
        std::io::Error::new(std::io::ErrorKind::InvalidData, source)
    }

    if let Some(encoded) = creds {
        use base64::Engine;
        let raw = base64::engine::general_purpose::STANDARD
            .decode(encoded)
            .map_err(invalid_data)?;
        let text = String::from_utf8(raw).map_err(invalid_data)?;
        let options = async_nats::ConnectOptions::with_credentials(&text)?;
        return Ok((options, url.to_string()));
    }

    if let Some(path) = creds_file {
        let options = async_nats::ConnectOptions::with_credentials_file(path).await?;
        return Ok((options, url.to_string()));
    }

    match url::Url::parse(url) {
        Ok(parsed) if !parsed.username().is_empty() => {
            let user = parsed.username().to_string();
            let pass = parsed.password().unwrap_or("").to_string();
            let mut stripped = parsed.clone();
            // The password is only cleared when clearing the username succeeded.
            let strip_failed =
                stripped.set_username("").is_err() || stripped.set_password(None).is_err();
            if strip_failed {
                warn!("could not strip credentials from NATS URL; they may appear in logs");
            }
            let options = async_nats::ConnectOptions::with_user_and_password(user, pass);
            Ok((options, stripped.as_str().to_string()))
        }
        // Unparseable URL or no embedded username: connect without credentials.
        _ => Ok((async_nats::ConnectOptions::new(), url.to_string())),
    }
}
/// Connects to the NATS server at `url`.
///
/// Credentials are resolved by `build_connect_options` from `creds`,
/// `creds_file`, or the URL itself, and the resulting options are used to
/// dial the URL that helper returns.
///
/// # Errors
/// Returns the `async_nats::ConnectError` produced while building the
/// options or while connecting.
pub async fn nats_connect(
    url: &str,
    creds: Option<&str>,
    creds_file: Option<&str>,
) -> Result<async_nats::Client, async_nats::ConnectError> {
    let (options, address) = build_connect_options(url, creds, creds_file).await?;
    let client = options.connect(address).await?;
    Ok(client)
}
/// Renders a message payload for logging.
///
/// Valid UTF-8 is borrowed as-is; anything else is rendered as `0x` followed
/// by the output of `crate::artifact::hex_encode`.
fn payload_for_log(payload: &[u8]) -> std::borrow::Cow<'_, str> {
    use std::borrow::Cow;

    if let Ok(text) = std::str::from_utf8(payload) {
        return Cow::Borrowed(text);
    }
    Cow::Owned(format!("0x{}", crate::artifact::hex_encode(payload)))
}
/// Connection settings for a NATS server.
#[derive(Clone)]
pub struct NatsConnectionConfig {
    /// Server URL to dial. May embed `user:password@` credentials.
    pub url: String,
    /// Base64-encoded credentials content, if any.
    pub creds: Option<String>,
    /// Path to a credentials file, if any.
    pub creds_file: Option<String>,
}

/// Debug output that never reveals secrets: `creds` and `creds_file` are
/// replaced by a placeholder, and any `userinfo@` part of `url` is redacted.
impl std::fmt::Debug for NatsConnectionConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        /// Replaces the `userinfo@` part of the URL authority (if present)
        /// with `[redacted]@`; everything else is left untouched.
        fn redact_userinfo(raw: &str) -> std::borrow::Cow<'_, str> {
            use std::borrow::Cow;
            // The authority starts after "scheme://", or at 0 for scheme-less input.
            let authority_start = raw.find("://").map_or(0, |idx| idx + 3);
            let tail = &raw[authority_start..];
            let authority_len = tail
                .find(|c: char| matches!(c, '/' | '?' | '#'))
                .unwrap_or(tail.len());
            // The last '@' inside the authority separates userinfo from host.
            match tail[..authority_len].rfind('@') {
                Some(at) => Cow::Owned(format!(
                    "{}[redacted]@{}",
                    &raw[..authority_start],
                    &tail[at + 1..]
                )),
                None => Cow::Borrowed(raw),
            }
        }

        f.debug_struct("NatsConnectionConfig")
            .field("url", &redact_userinfo(&self.url))
            .field("creds", &self.creds.as_ref().map(|_| "[redacted]"))
            .field(
                "creds_file",
                &self.creds_file.as_ref().map(|_| "[redacted]"),
            )
            .finish()
    }
}
/// Creates the JetStream stream backing KV bucket `bucket` by sending a raw
/// `$JS.API.STREAM.CREATE` request.
///
/// The stream is named `KV_<bucket>`, captures `$KV.<bucket>.>`, uses file
/// storage and limits retention, and takes its per-subject history, byte
/// limit, max age, discard policy and replica count from the arguments.
///
/// The reply is classified by `classify_raw_create_response`; "already
/// exists" and "stream limit reached" are both reported as success.
///
/// # Errors
/// `KvError::ConnectionFailed` when the config cannot be serialised, the
/// request fails, or JetStream answers with any other error.
pub(crate) async fn create_kv_bucket_raw(
    client: &async_nats::Client,
    bucket: &str,
    max_bytes: i64,
    history: i64,
    max_age_nanos: i64,
    discard: DiscardPolicy,
    num_replicas: usize,
) -> Result<(), KvError> {
    let stream_name = format!("KV_{}", bucket);
    let subject_filter = format!("$KV.{}.>", bucket);

    let request_body = serde_json::to_vec(&serde_json::json!({
        "name": stream_name,
        "subjects": [subject_filter],
        "max_msgs_per_subject": history,
        "max_bytes": max_bytes,
        "max_age": max_age_nanos,
        "storage": "file",
        "allow_rollup_hdrs": true,
        "deny_delete": false,
        "deny_purge": false,
        "allow_direct": true,
        "discard": discard.as_nats(),
        "num_replicas": num_replicas,
        "retention": "limits"
    }))
    .map_err(|e| KvError::ConnectionFailed(format!("failed to serialize config: {}", e)))?;

    let api_subject = format!("$JS.API.STREAM.CREATE.{}", stream_name);
    let reply = client
        .request(api_subject, request_body.into())
        .await
        .map_err(|e| KvError::ConnectionFailed(format!("failed to send create request: {}", e)))?;
    debug!(bucket, response = %payload_for_log(&reply.payload), "raw JetStream response");

    // Only a hard failure aborts; every other outcome is logged and accepted.
    match classify_raw_create_response(&reply.payload) {
        RawCreateOutcome::Failed { code, description } => {
            return Err(KvError::ConnectionFailed(format!(
                "JetStream error {}: {}",
                code, description
            )));
        }
        RawCreateOutcome::Created => {
            info!(bucket, "bucket created successfully via raw API");
        }
        RawCreateOutcome::AlreadyExists => {
            info!(bucket, "bucket already exists");
        }
        RawCreateOutcome::StreamLimit => {
            info!(bucket, "stream limit reached, bucket may already exist");
        }
    }
    Ok(())
}
/// Classification of a raw `STREAM.CREATE` reply, as produced by
/// `classify_raw_create_response`.
#[derive(Debug, PartialEq, Eq)]
enum RawCreateOutcome {
/// The reply carried no `error` object (or could not be parsed as JSON).
Created,
/// The error's `code` or `err_code` was 10058.
AlreadyExists,
/// A code-400 error whose description mentions "maximum number of streams".
StreamLimit,
/// Any other error; carries the reported `code` and `description`.
Failed { code: i64, description: String },
}
/// Classifies the payload of a raw `STREAM.CREATE` reply.
///
/// - Not JSON, or JSON without an `error` object: `Created`.
/// - `code` or `err_code` equal to 10058: `AlreadyExists`.
/// - `code` 400 with a description mentioning "maximum number of streams":
///   `StreamLimit`.
/// - Anything else: `Failed` with the reported code and description.
///
/// Missing numeric fields count as 0; a missing description becomes
/// "unknown error".
fn classify_raw_create_response(payload: &[u8]) -> RawCreateOutcome {
    let parsed: serde_json::Value = match serde_json::from_slice(payload) {
        Ok(value) => value,
        Err(_) => {
            warn!(
                response = %payload_for_log(payload),
                "unparseable STREAM.CREATE response; assuming created (caller re-verifies via get_key_value)"
            );
            return RawCreateOutcome::Created;
        }
    };

    let error = match parsed.get("error") {
        Some(error) => error,
        None => return RawCreateOutcome::Created,
    };

    let int_field = |name: &str| {
        error
            .get(name)
            .and_then(serde_json::Value::as_i64)
            .unwrap_or(0)
    };
    let code = int_field("code");
    let err_code = int_field("err_code");
    let description = error
        .get("description")
        .and_then(serde_json::Value::as_str)
        .unwrap_or("unknown error");

    let already_exists = [code, err_code].contains(&10058);
    let stream_limit = code == 400 && description.contains("maximum number of streams");

    if already_exists {
        RawCreateOutcome::AlreadyExists
    } else if stream_limit {
        RawCreateOutcome::StreamLimit
    } else {
        RawCreateOutcome::Failed {
            code,
            description: description.to_string(),
        }
    }
}
/// A live NATS client paired with the JetStream context created from it.
struct NatsHandle {
/// The underlying NATS client.
client: async_nats::Client,
/// JetStream context built from a clone of `client`.
jetstream: async_nats::jetstream::Context,
}
/// A NATS-backed connection that hands out KV stores.
pub struct NatsConnection {
/// Settings used by `connect` to dial the server (empty for `from_client`).
config: NatsConnectionConfig,
/// The installed client/JetStream pair; `None` until connected or after shutdown.
handle: RwLock<Option<NatsHandle>>,
/// Health flag; set on install and updated by the client's event callback.
healthy: Arc<AtomicBool>,
/// Client clone kept only for connections built with `from_client`; used to
/// query the live connection state and to refuse reconnect attempts.
state_probe: Option<async_nats::Client>,
}
impl NatsConnection {
    /// Creates a connection in the disconnected state; nothing is dialled
    /// until `connect` is called.
    pub fn new(config: NatsConnectionConfig) -> Self {
        let handle = RwLock::new(None);
        let healthy = Arc::new(AtomicBool::new(false));
        Self {
            config,
            handle,
            healthy,
            state_probe: None,
        }
    }

    /// Wraps an already-connected client.
    ///
    /// The connection starts out healthy, keeps a clone of the client as a
    /// state probe, and carries an empty config (no URL or credentials).
    pub fn from_client(client: async_nats::Client) -> Self {
        let handle = NatsHandle {
            jetstream: async_nats::jetstream::new(client.clone()),
            client: client.clone(),
        };
        Self {
            config: NatsConnectionConfig {
                url: String::new(),
                creds: None,
                creds_file: None,
            },
            handle: RwLock::new(Some(handle)),
            healthy: Arc::new(AtomicBool::new(true)),
            state_probe: Some(client),
        }
    }

    /// Returns the KV bucket named in `config`, creating it when the lookup
    /// fails or times out.
    ///
    /// Creation goes through `create_key_value` only for
    /// `DiscardPolicy::New`; for other policies, or when that call fails or
    /// times out, the bucket is created via `create_kv_bucket_raw` and then
    /// looked up again.
    ///
    /// Defaults: 1 replica, history of 1, 10 MiB max bytes, no max age.
    async fn get_or_create_bucket(
        client: &async_nats::Client,
        js: &async_nats::jetstream::Context,
        config: &StoreConfig,
    ) -> Result<Store, KvError> {
        // Fast path: the bucket already exists.
        match timed(js.get_key_value(&config.name)).await {
            Ok(Ok(existing)) => return Ok(existing),
            Ok(Err(e)) => {
                debug!(bucket = %config.name, error = ?e, "get_key_value failed; attempting create");
            }
            Err(_) => {
                warn!(bucket = %config.name, timeout = ?KV_OP_TIMEOUT, "get_key_value timed out; attempting create");
            }
        }

        let replicas = config.num_replicas.unwrap_or(1);
        let max_bytes = config.max_bytes.unwrap_or(10 * 1024 * 1024);
        let mut kv_config = async_nats::jetstream::kv::Config {
            bucket: config.name.clone(),
            num_replicas: replicas,
            max_bytes,
            ..Default::default()
        };

        // Mirror the optional limits into both the typed config and the raw values.
        let max_age_nanos = match config.max_age {
            Some(max_age) => {
                kv_config.max_age = max_age;
                i64::try_from(max_age.as_nanos()).unwrap_or(i64::MAX)
            }
            None => 0,
        };
        let history = match config.max_history {
            Some(max_history) => {
                let wide = i64::from(max_history);
                kv_config.history = wide;
                wide
            }
            None => 1,
        };

        if config.discard == DiscardPolicy::New {
            match timed(js.create_key_value(kv_config)).await {
                Ok(Ok(created)) => return Ok(created),
                Ok(Err(e)) => {
                    warn!(
                        bucket = config.name,
                        error = ?e,
                        "create_key_value failed, trying raw JetStream API"
                    );
                }
                Err(_) => {
                    warn!(
                        bucket = config.name,
                        timeout = ?KV_OP_TIMEOUT,
                        "create_key_value timed out, trying raw JetStream API"
                    );
                }
            }
        }

        create_kv_bucket_raw(
            client,
            &config.name,
            max_bytes,
            history,
            max_age_nanos,
            config.discard,
            replicas,
        )
        .await?;

        // The raw API yields no store handle, so look the bucket up again.
        match timed(js.get_key_value(&config.name)).await? {
            Ok(store) => Ok(store),
            Err(e) => {
                error!(bucket = config.name, error = ?e, "failed to get bucket after raw create");
                Err(KvError::ConnectionFailed(format!(
                    "get bucket after raw create: {:?}",
                    e
                )))
            }
        }
    }
}
// `Connection` implementation backed by a NATS client and its JetStream context.
#[async_trait]
impl Connection for NatsConnection {
/// Establishes the connection unless the health flag is already set.
///
/// Connections built with `from_client` (those carrying a `state_probe`)
/// cannot reconnect and return `KvError::ConnectionFailed`. Option-building
/// and dial failures are reported as `KvError::ConnectionFailed` as well.
async fn connect(&self) -> Result<(), KvError> {
// Already marked healthy: nothing to do.
if self.healthy.load(Ordering::Acquire) {
return Ok(());
}
if self.state_probe.is_some() {
return Err(KvError::ConnectionFailed(
"connection was built via NatsConnection::from_client and cannot \
reconnect (no URL or credentials retained); construct \
NatsConnection::new(config) for a reconnectable connection"
.to_string(),
));
}
let (opts, dial_url) = build_connect_options(
&self.config.url,
self.config.creds.as_deref(),
self.config.creds_file.as_deref(),
)
.await
.map_err(|e| KvError::ConnectionFailed(e.to_string()))?;
// `installed` gates the event callback: it only becomes true once this
// call's client has been stored in `self.handle`, so a client that is
// never installed cannot change the shared health flag.
let installed = Arc::new(AtomicBool::new(false));
let cb_healthy = Arc::clone(&self.healthy);
let cb_installed = Arc::clone(&installed);
let opts = opts.event_callback(move |event| {
let cb_healthy = Arc::clone(&cb_healthy);
let cb_installed = Arc::clone(&cb_installed);
async move {
// Ignore every event until the client has been installed.
if !cb_installed.load(Ordering::Acquire) {
return;
}
match event {
async_nats::Event::Connected => cb_healthy.store(true, Ordering::Release),
async_nats::Event::Disconnected => cb_healthy.store(false, Ordering::Release),
_ => {}
}
}
});
let client = opts
.connect(dial_url)
.await
.map_err(|e| KvError::ConnectionFailed(e.to_string()))?;
let jetstream = async_nats::jetstream::new(client.clone());
let conn = NatsHandle { client, jetstream };
let mut handle = self.handle.write().await;
// A handle is already present: keep it and return without installing
// the client created above (its callback stays gated off).
if handle.is_some() {
return Ok(());
}
installed.store(true, Ordering::Release);
*handle = Some(conn);
self.healthy.store(true, Ordering::Release);
Ok(())
}
/// Clears the health flag and drops the stored handle.
async fn shutdown(&self) -> Result<(), KvError> {
self.healthy.store(false, Ordering::Release);
*self.handle.write().await = None;
Ok(())
}
/// Reports health: the flag must be set and, when a `state_probe` client
/// exists, that client must also report the `Connected` state.
fn is_healthy(&self) -> bool {
if !self.healthy.load(Ordering::Acquire) {
return false;
}
match &self.state_probe {
Some(client) => matches!(
client.connection_state(),
async_nats::connection::State::Connected
),
// No probe available: rely on the flag alone.
None => true,
}
}
/// Opens the store called `name` with otherwise default `StoreConfig`.
async fn store(&self, name: &str) -> Result<Arc<dyn KvStore>, KvError> {
let config = StoreConfig {
name: name.to_string(),
..Default::default()
};
self.store_with_config(config).await
}
/// Opens (creating if needed) the bucket described by `config`.
///
/// Returns `KvError::NotConnected` when no handle is installed.
async fn store_with_config(&self, config: StoreConfig) -> Result<Arc<dyn KvStore>, KvError> {
// Clone what is needed inside a short scope so the read guard is
// released before the bucket lookup is awaited.
let (client, js) = {
let conn = self.handle.read().await;
let conn = conn.as_ref().ok_or(KvError::NotConnected)?;
(conn.client.clone(), conn.jetstream.clone())
};
let kv = Self::get_or_create_bucket(&client, &js, &config).await?;
Ok(Arc::new(NatsKvStore {
name: config.name,
client,
js,
kv,
}))
}
/// Static capability set advertised for this backend.
fn capabilities(&self) -> ConnectionCapabilities {
ConnectionCapabilities {
streaming_watch: true,
prefix_watch: true,
ttl: false,
purge: true,
cas: true,
transactions: false,
// NOTE(review): 0 presumably means "no declared limit"; confirm
// against the `ConnectionCapabilities` definition.
max_value_size: 0,
global_ordering: false,
}
}
}
/// A single KV bucket plus the client handles needed to read and watch it.
struct NatsKvStore {
/// Bucket name, as given in the store config.
name: String,
/// Handle to the KV bucket.
kv: Store,
/// NATS client, handed on to readers and watchers.
client: async_nats::Client,
/// JetStream context, handed on to readers and watchers.
js: async_nats::jetstream::Context,
}
/// Hands out reader, watcher, writer and purge facades over the bucket.
/// Each call builds a fresh facade from clones of the store's handles.
impl KvStore for NatsKvStore {
    /// Bucket name.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Reader facade for point reads, key listing and scans.
    fn reader(&self) -> Arc<dyn KvReader> {
        let reader = NatsKvReader {
            bucket: self.name.clone(),
            js: self.js.clone(),
            client: self.client.clone(),
            kv: self.kv.clone(),
        };
        Arc::new(reader)
    }

    /// Watcher facade; always available for this backend.
    fn watcher(&self) -> Option<Arc<dyn KvWatcher>> {
        let watcher: Arc<dyn KvWatcher> = Arc::new(NatsKvWatcher {
            bucket: self.name.clone(),
            js: self.js.clone(),
            client: self.client.clone(),
            kv: self.kv.clone(),
        });
        Some(watcher)
    }

    /// Writer facade; always available for this backend.
    fn writer(&self) -> Option<Arc<dyn KvWriter>> {
        let writer: Arc<dyn KvWriter> = Arc::new(NatsKvWriterImpl {
            kv: self.kv.clone(),
        });
        Some(writer)
    }

    /// Purge facade; backed by the same writer type.
    fn purge_writer(&self) -> Option<Arc<dyn KvPurge>> {
        let purger: Arc<dyn KvPurge> = Arc::new(NatsKvWriterImpl {
            kv: self.kv.clone(),
        });
        Some(purger)
    }
}
/// Read-side facade over one KV bucket.
struct NatsKvReader {
/// Handle to the KV bucket, used for point lookups.
kv: Store,
/// NATS client, used to subscribe to the listing inbox.
client: async_nats::Client,
/// JetStream context, used to reach the bucket's backing stream.
js: async_nats::jetstream::Context,
/// Bucket name; the backing stream is `KV_<bucket>`.
bucket: String,
}
#[async_trait]
impl KvReader for NatsKvReader {
    /// Looks up `key`, treating an entry with an empty value as absent.
    async fn get(&self, key: &str) -> Result<Option<KvEntry>, KvError> {
        let found = self.entry(key).await?;
        Ok(found.filter(|entry| !entry.value.is_empty()))
    }

    /// Looks up the latest entry for `key`.
    ///
    /// Only `Put` entries are returned; delete/purge markers and missing keys
    /// both yield `None`. Lookup failures become `KvError::OperationFailed`.
    async fn entry(&self, key: &str) -> Result<Option<KvEntry>, KvError> {
        use async_nats::jetstream::kv::Operation;

        let latest = timed(self.kv.entry(key))
            .await?
            .map_err(|e| KvError::OperationFailed(e.to_string()))?;
        let live = latest.filter(|entry| entry.operation == Operation::Put);
        Ok(live.map(|entry| KvEntry {
            key: key.to_string(),
            value: entry.value.to_vec(),
            version: VersionToken::from_u64(entry.revision),
        }))
    }

    /// Lists the keys under `prefix`, skipping delete markers and keys whose
    /// latest value is empty. Only headers are requested from the server.
    async fn keys(&self, prefix: &str) -> Result<Vec<String>, KvError> {
        debug!(prefix = %prefix, "listing keys with prefix");
        let mut keys = Vec::new();
        self.consume_last_per_subject(prefix, true, |msg, key| {
            let tombstone = is_kv_delete(&msg) || is_empty_value(&msg);
            if !tombstone {
                keys.push(key);
            }
        })
        .await?;
        debug!(prefix = %prefix, keys = keys.len(), "keys listing complete");
        Ok(keys)
    }

    /// Returns the latest entry of every live key under `prefix`.
    ///
    /// Delete markers and empty payloads are skipped. The version comes from
    /// the message's ack subject and falls back to 0 when it cannot be parsed.
    async fn scan(&self, prefix: &str) -> Result<Vec<KvEntry>, KvError> {
        let mut entries = Vec::new();
        self.consume_last_per_subject(prefix, false, |msg, key| {
            if is_kv_delete(&msg) || msg.payload.is_empty() {
                return;
            }
            let revision = msg
                .reply
                .as_deref()
                .and_then(stream_sequence_from_ack)
                .unwrap_or(0);
            entries.push(KvEntry {
                key,
                value: msg.payload.to_vec(),
                version: VersionToken::from_u64(revision),
            });
        })
        .await?;
        debug!(prefix = %prefix, entries = entries.len(), "scan complete");
        Ok(entries)
    }
}
/// Extracts the stream sequence from a JetStream `$JS.ACK` reply subject.
///
/// Two layouts are recognised:
/// - legacy, exactly 9 tokens:
///   `$JS.ACK.<stream>.<consumer>.<delivered>.<stream seq>.<consumer seq>.<ts>.<pending>`
///   (sequence at token 5);
/// - modern, 11 or more tokens, with `<domain>.<account hash>` inserted
///   after `$JS.ACK` (sequence at token 7).
///
/// Returns `None` for anything else, including a 10-token subject, which
/// matches neither layout, and for a sequence token that is not a `u64`.
fn stream_sequence_from_ack(reply: &str) -> Option<u64> {
    const LEGACY_TOKENS: usize = 9;
    const MODERN_MIN_TOKENS: usize = 11;

    // Only the first 8 tokens are ever inspected; the rest are just counted.
    let mut head = [""; 8];
    let mut count = 0usize;
    for token in reply.split('.') {
        if let Some(slot) = head.get_mut(count) {
            *slot = token;
        }
        count += 1;
    }

    if head[0] != "$JS" || head[1] != "ACK" {
        return None;
    }
    let stream_seq_idx = match count {
        LEGACY_TOKENS => 5,
        n if n >= MODERN_MIN_TOKENS => 7,
        // Too short, or the ambiguous 10-token case.
        _ => return None,
    };
    head[stream_seq_idx].parse::<u64>().ok()
}
/// Returns `true` when the message carries a `KV-Operation` header.
///
/// Any header value counts; the value itself is not inspected here.
fn is_kv_delete(msg: &async_nats::Message) -> bool {
    match &msg.headers {
        Some(headers) => headers.get("KV-Operation").is_some(),
        None => false,
    }
}
/// Returns `true` when the message's `Nats-Msg-Size` header is exactly `"0"`.
///
/// A message without headers, or without that header, is not considered empty.
fn is_empty_value(msg: &async_nats::Message) -> bool {
    let Some(headers) = msg.headers.as_ref() else {
        return false;
    };
    headers
        .get("Nats-Msg-Size")
        .is_some_and(|size| size.as_str() == "0")
}
impl NatsKvReader {
    /// Feeds the latest message of every subject under `prefix` to `on_msg`.
    ///
    /// An ephemeral push consumer with `DeliverPolicy::LastPerSubject` is
    /// created on the bucket's stream, delivering to a fresh inbox. Exactly
    /// `num_pending` messages (as reported when the consumer was created) are
    /// read, then the consumer is deleted on a best-effort basis.
    ///
    /// `on_msg` receives each message together with its key (the subject with
    /// the `$KV.<bucket>.` prefix removed). With `headers_only` set, the
    /// consumer is asked to deliver headers without payloads.
    ///
    /// # Errors
    /// - `KvError::Timeout` when a setup call or the wait for the next message
    ///   exceeds `KV_OP_TIMEOUT`.
    /// - `KvError::OperationFailed` when subscribing, fetching the stream or
    ///   creating the consumer fails, or when the inbox subscription ends
    ///   before all pending messages arrived. In that last case the data
    ///   already passed to `on_msg` is incomplete, so success must not be
    ///   reported.
    async fn consume_last_per_subject(
        &self,
        prefix: &str,
        headers_only: bool,
        mut on_msg: impl FnMut(async_nats::Message, String),
    ) -> Result<(), KvError> {
        use async_nats::jetstream::consumer::push;
        use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy};

        let bucket = self.bucket.as_str();
        let nats_filter = if prefix.is_empty() {
            format!("$KV.{bucket}.>")
        } else {
            format!("$KV.{bucket}.{prefix}>")
        };

        // Subscribe before creating the consumer so no delivery is missed.
        let inbox = self.client.new_inbox();
        let mut sub = timed(self.client.subscribe(inbox.clone()))
            .await?
            .map_err(|e| KvError::OperationFailed(format!("subscribe inbox: {e}")))?;
        let stream = timed(self.js.get_stream(format!("KV_{bucket}")))
            .await?
            .map_err(|e| KvError::OperationFailed(format!("get KV stream: {e}")))?;
        let consumer = timed(stream.create_consumer(push::Config {
            deliver_subject: inbox,
            deliver_policy: DeliverPolicy::LastPerSubject,
            filter_subject: nats_filter,
            headers_only,
            ack_policy: AckPolicy::None,
            inactive_threshold: CONSUMER_INACTIVE_THRESHOLD,
            ..Default::default()
        }))
        .await?
        .map_err(|e| KvError::OperationFailed(format!("create consumer: {e}")))?;

        let num_pending = consumer.cached_info().num_pending;
        // Remember a drain failure but still fall through to consumer cleanup.
        let mut failure: Option<KvError> = None;
        if num_pending > 0 {
            let mut delivered = 0u64;
            let kv_prefix = format!("$KV.{bucket}.");
            while delivered < num_pending {
                match tokio::time::timeout(KV_OP_TIMEOUT, sub.next()).await {
                    Ok(Some(msg)) => {
                        let key = msg
                            .subject
                            .strip_prefix(&kv_prefix)
                            .unwrap_or(msg.subject.as_str())
                            .to_string();
                        on_msg(msg, key);
                        delivered += 1;
                    }
                    Ok(None) => {
                        // The subscription ended early: the listing is truncated.
                        failure = Some(KvError::OperationFailed(format!(
                            "listing subscription closed after {delivered} of {num_pending} messages"
                        )));
                        break;
                    }
                    Err(_) => {
                        failure = Some(KvError::Timeout);
                        break;
                    }
                }
            }
        }

        match timed(stream.delete_consumer(&consumer.cached_info().name)).await {
            Ok(Ok(_)) => {}
            Ok(Err(e)) => {
                warn!(error = %e, "failed to delete ephemeral consumer (best-effort)");
            }
            Err(_) => {
                warn!("timed out deleting ephemeral consumer (best-effort)");
            }
        }

        match failure {
            Some(err) => Err(err),
            None => Ok(()),
        }
    }
}
/// Converts a NATS KV watch entry into the backend-neutral `KvUpdate`.
///
/// The entry's revision becomes the version token; `Put` carries the value,
/// while `Delete` and `Purge` carry only key and version.
fn nats_entry_to_kv_update(entry: async_nats::jetstream::kv::Entry) -> KvUpdate {
    use async_nats::jetstream::kv::{Entry, Operation};

    let Entry {
        key,
        value,
        revision,
        operation,
        ..
    } = entry;
    let version = VersionToken::from_u64(revision);

    match operation {
        Operation::Purge => KvUpdate::Purge { key, version },
        Operation::Delete => KvUpdate::Delete { key, version },
        Operation::Put => KvUpdate::Put(KvEntry {
            key,
            value: value.to_vec(),
            version,
        }),
    }
}
/// Forwards every entry produced by `watcher` to `tx` as a `KvUpdate`.
///
/// Returns `Ok(())` when the watch stream ends or the receiver side of `tx`
/// has been dropped, and `KvError::WatchError` on the first watch error.
async fn stream_watch(
    mut watcher: async_nats::jetstream::kv::Watch,
    tx: &Sender<KvUpdate>,
) -> Result<(), KvError> {
    loop {
        let entry = match watcher.next().await {
            None => return Ok(()),
            Some(Err(e)) => {
                error!(error = %e, "NATS KV watch error");
                return Err(KvError::WatchError(e.to_string()));
            }
            Some(Ok(entry)) => entry,
        };

        if tx.send(nats_entry_to_kv_update(entry)).await.is_err() {
            debug!("watch receiver closed");
            return Ok(());
        }
    }
}
/// Period of the backstop check in `stream_watch_floor_guarded`.
const FLOOR_GUARD_INTERVAL: Duration = Duration::from_secs(30);
/// Like `stream_watch`, but fails the watch when the stream's first retained
/// sequence has moved past what this watch has delivered.
///
/// `frontier` starts at `resume_revision` and tracks the highest revision
/// seen. The stream's `first_sequence` is checked against it (via
/// `crate::protocol::resume_window_ok`) whenever an entry skips ahead of
/// `frontier + 1`, and on every `FLOOR_GUARD_INTERVAL` tick.
///
/// Returns `Ok(())` when the watch ends or the receiver is dropped, and
/// `KvError::WatchError` on a watch error or a tripped guard. Errors from the
/// stream lookup are propagated.
async fn stream_watch_floor_guarded(
mut watcher: async_nats::jetstream::kv::Watch,
tx: &Sender<KvUpdate>,
resume_revision: u64,
js: &async_nats::jetstream::Context,
bucket: &str,
) -> Result<(), KvError> {
let stream_name = format!("KV_{bucket}");
// Fetches the stream and reads `first_sequence` from the info attached to
// that freshly fetched handle.
let first_sequence = || async {
let stream = timed(js.get_stream(&stream_name))
.await?
.map_err(|e| KvError::OperationFailed(format!("floor guard stream lookup: {e}")))?;
Ok::<u64, KvError>(stream.cached_info().state.first_sequence)
};
// Logs the overrun and builds the error that ends the watch.
fn trip(frontier: u64, first: u64, bucket: &str) -> KvError {
warn!(
frontier,
first_sequence = first,
bucket,
"stream retention overran this live watch; failing so the restart can resync \
(messages in the gap were evicted unseen)"
);
KvError::WatchError(format!(
"stream retention overran live watch (first_sequence {first} > delivered \
frontier {frontier} + 1); restart will resync"
))
}
let mut frontier = resume_revision;
let mut backstop = tokio::time::interval(FLOOR_GUARD_INTERVAL);
backstop.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
// Consume the interval's initial tick (tokio intervals complete their first
// tick immediately) so the first backstop check runs one interval from now.
backstop.tick().await;
loop {
tokio::select! {
entry = watcher.next() => {
let Some(entry) = entry else { break };
match entry {
Ok(entry) => {
let revision = entry.revision;
// A jump past frontier + 1 triggers a check that the
// skipped range has not been evicted from the stream.
if revision > frontier.saturating_add(1) {
let first = first_sequence().await?;
if !crate::protocol::resume_window_ok(frontier, first) {
return Err(trip(frontier, first, bucket));
}
}
frontier = frontier.max(revision);
let update = nats_entry_to_kv_update(entry);
if tx.send(update).await.is_err() {
debug!("watch receiver closed");
break;
}
}
Err(e) => {
error!(error = %e, "NATS KV watch error");
return Err(KvError::WatchError(e.to_string()));
}
}
}
// Periodic check, which also runs while the watch is idle.
_ = backstop.tick() => {
let first = first_sequence().await?;
if !crate::protocol::resume_window_ok(frontier, first) {
return Err(trip(frontier, first, bucket));
}
}
}
}
Ok(())
}
/// Heuristically decides whether an error message means the requested start
/// sequence is no longer available.
///
/// Matches, ignoring ASCII case, any of the phrases "start sequence",
/// "first sequence", "sequence not found" or "too old". The matcher is built
/// once and reused.
fn is_cursor_expired_error(err: &str) -> bool {
    use std::sync::OnceLock;

    const NEEDLES: [&str; 4] = [
        "start sequence",
        "first sequence",
        "sequence not found",
        "too old",
    ];
    static MATCHER: OnceLock<aho_corasick::AhoCorasick> = OnceLock::new();

    let matcher = MATCHER.get_or_init(|| {
        let mut builder = aho_corasick::AhoCorasick::builder();
        builder.ascii_case_insensitive(true);
        builder
            .build(NEEDLES)
            .expect("static needle set always compiles")
    });
    matcher.is_match(err)
}
/// Watch-side facade over one KV bucket.
struct NatsKvWatcher {
/// Handle to the KV bucket, used for the store-level watch calls.
kv: Store,
/// NATS client, used to allocate the inbox for multi-prefix resume.
client: async_nats::Client,
/// JetStream context, used to inspect the bucket's backing stream.
js: async_nats::jetstream::Context,
/// Bucket name; the backing stream is `KV_<bucket>`.
bucket: String,
}
/// Decodes a raw stream message into a `KvUpdate`.
///
/// Returns `None` when the subject does not start with `kv_prefix`. The
/// version is parsed from the ack reply subject and is "unknown" when the
/// reply is missing or unparseable. A `KV-Operation` header of `DEL` or
/// `PURGE` selects the matching marker; anything else is a put carrying the
/// payload.
fn kv_message_to_update(msg: &async_nats::Message, kv_prefix: &str) -> Option<KvUpdate> {
    let key = msg.subject.strip_prefix(kv_prefix).map(str::to_owned)?;

    let version = match msg.reply.as_deref().and_then(stream_sequence_from_ack) {
        Some(sequence) => VersionToken::from_u64(sequence),
        None => VersionToken::unknown(),
    };

    let marker = msg
        .headers
        .as_ref()
        .and_then(|headers| headers.get("KV-Operation"))
        .map(|value| value.as_str());

    let update = match marker {
        Some("PURGE") => KvUpdate::Purge { key, version },
        Some("DEL") => KvUpdate::Delete { key, version },
        _ => KvUpdate::Put(KvEntry {
            key,
            value: msg.payload.to_vec(),
            version,
        }),
    };
    Some(update)
}
impl NatsKvWatcher {
    /// Verifies that a watch can still resume after `revision`.
    ///
    /// Fetches the bucket's stream and compares its first retained sequence
    /// with `revision` using `crate::protocol::resume_window_ok`.
    ///
    /// # Errors
    /// `KvError::CursorExpired` when the window check fails,
    /// `KvError::OperationFailed` when the stream lookup fails, and
    /// `KvError::Timeout` when it times out.
    async fn check_resume_window(&self, revision: u64) -> Result<(), KvError> {
        let stream_name = format!("KV_{}", self.bucket);
        let stream = match timed(self.js.get_stream(stream_name)).await? {
            Ok(stream) => stream,
            Err(e) => {
                return Err(KvError::OperationFailed(format!(
                    "get KV stream for resume check: {e}"
                )));
            }
        };

        let first = stream.cached_info().state.first_sequence;
        if crate::protocol::resume_window_ok(revision, first) {
            return Ok(());
        }

        warn!(
            revision,
            first_sequence = first,
            "resume cursor is below the stream's first retained sequence; cursor expired"
        );
        Err(KvError::CursorExpired)
    }
}
#[async_trait]
impl KvWatcher for NatsKvWatcher {
    /// Watches every key in the bucket (via `watch_with_history(">")`) and
    /// forwards the updates to `tx`.
    async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
        let watcher = timed(self.kv.watch_with_history(">"))
            .await?
            .map_err(|e| KvError::WatchError(e.to_string()))?;
        stream_watch(watcher, &tx).await
    }

    /// Watches the keys matching `<prefix>>` and forwards the updates to `tx`.
    async fn watch_prefix(&self, prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
        let nats_key = format!("{prefix}>");
        let watcher = timed(self.kv.watch_with_history(&nats_key))
            .await?
            .map_err(|e| KvError::WatchError(e.to_string()))?;
        stream_watch(watcher, &tx).await
    }

    /// Watches several prefixes at once. An empty `prefixes` slice returns
    /// immediately without watching anything.
    async fn watch_prefixes(&self, prefixes: &[&str], tx: Sender<KvUpdate>) -> Result<(), KvError> {
        if prefixes.is_empty() {
            return Ok(());
        }
        let keys: Vec<String> = prefixes.iter().map(|p| format!("{p}>")).collect();
        let watcher = timed(self.kv.watch_many_with_history(keys))
            .await?
            .map_err(|e| KvError::WatchError(e.to_string()))?;
        stream_watch(watcher, &tx).await
    }

    /// Resumes a whole-bucket watch just after `cursor`.
    ///
    /// A cursor without a positive revision falls back to `watch_all`. The
    /// resume window is checked both before and after the watch is created,
    /// and the live stream is floor-guarded.
    ///
    /// # Errors
    /// `KvError::CursorExpired` when the cursor is no longer retained;
    /// `KvError::WatchError` for other watch failures.
    async fn watch_all_from(
        &self,
        cursor: &WatchCursor,
        tx: Sender<KvUpdate>,
    ) -> Result<(), KvError> {
        let revision = match cursor.as_u64() {
            Some(rev) if rev > 0 => rev,
            _ => return self.watch_all(tx).await,
        };
        self.check_resume_window(revision).await?;
        // Saturate so a cursor of u64::MAX cannot overflow the start sequence.
        let start_sequence = revision.saturating_add(1);
        let watcher = match timed(self.kv.watch_all_from_revision(start_sequence)).await? {
            Ok(w) => w,
            Err(e) => {
                let err_str = e.to_string();
                if is_cursor_expired_error(&err_str) {
                    warn!(revision, error = %err_str, "cursor expired, caller should fall back to full watch");
                    return Err(KvError::CursorExpired);
                }
                return Err(KvError::WatchError(err_str));
            }
        };
        // Re-check: retention may have advanced while the watch was being set up.
        self.check_resume_window(revision).await?;
        info!(revision, "resumed watch from cursor");
        stream_watch_floor_guarded(watcher, &tx, revision, &self.js, &self.bucket).await
    }

    /// Resumes a single-prefix watch just after `cursor`.
    ///
    /// A cursor without a positive revision falls back to `watch_prefix`.
    ///
    /// # Errors
    /// `KvError::CursorExpired` when the cursor is no longer retained;
    /// `KvError::WatchError` for other watch failures.
    async fn watch_prefix_from(
        &self,
        prefix: &str,
        cursor: &WatchCursor,
        tx: Sender<KvUpdate>,
    ) -> Result<(), KvError> {
        let revision = match cursor.as_u64() {
            Some(rev) if rev > 0 => rev,
            _ => return self.watch_prefix(prefix, tx).await,
        };
        self.check_resume_window(revision).await?;
        let nats_key = format!("{prefix}>");
        // Saturate so a cursor of u64::MAX cannot overflow the start sequence.
        let start_sequence = revision.saturating_add(1);
        let watcher = match timed(self.kv.watch_from_revision(&nats_key, start_sequence)).await? {
            Ok(w) => w,
            Err(e) => {
                let err_str = e.to_string();
                if is_cursor_expired_error(&err_str) {
                    warn!(revision, prefix, error = %err_str, "cursor expired for prefix watch, caller should fall back");
                    return Err(KvError::CursorExpired);
                }
                return Err(KvError::WatchError(err_str));
            }
        };
        // Re-check: retention may have advanced while the watch was being set up.
        self.check_resume_window(revision).await?;
        info!(revision, prefix, "resumed prefix watch from cursor");
        stream_watch(watcher, &tx).await
    }

    /// Resumes a multi-prefix watch just after `cursor`.
    ///
    /// Uses an ordered push consumer on the bucket's stream, filtered to the
    /// given prefixes and starting at the sequence after the cursor. An empty
    /// `prefixes` slice returns immediately; a cursor without a positive
    /// revision falls back to `watch_prefixes`.
    ///
    /// # Errors
    /// `KvError::CursorExpired` when the cursor is no longer retained;
    /// `KvError::WatchError` for stream, consumer or delivery failures.
    async fn watch_prefixes_from(
        &self,
        prefixes: &[&str],
        cursor: &WatchCursor,
        tx: Sender<KvUpdate>,
    ) -> Result<(), KvError> {
        use async_nats::jetstream::consumer::{DeliverPolicy, ReplayPolicy, push};
        if prefixes.is_empty() {
            return Ok(());
        }
        let revision = match cursor.as_u64() {
            Some(rev) if rev > 0 => rev,
            _ => return self.watch_prefixes(prefixes, tx).await,
        };
        let bucket = self.bucket.as_str();
        let kv_prefix = format!("$KV.{bucket}.");
        let filter_subjects: Vec<String> = prefixes
            .iter()
            .map(|p| format!("{kv_prefix}{p}>"))
            .collect();
        let stream = timed(self.js.get_stream(format!("KV_{bucket}")))
            .await?
            .map_err(|e| KvError::WatchError(format!("get KV stream: {e}")))?;
        let first = stream.cached_info().state.first_sequence;
        if !crate::protocol::resume_window_ok(revision, first) {
            warn!(
                revision,
                first_sequence = first,
                ?prefixes,
                "resume cursor is below the stream's first retained sequence; cursor expired"
            );
            return Err(KvError::CursorExpired);
        }
        // Saturate so a cursor of u64::MAX cannot overflow the start sequence.
        let start_sequence = revision.saturating_add(1);
        let consumer = match timed(stream.create_consumer(push::OrderedConfig {
            deliver_subject: self.client.new_inbox(),
            description: Some("kv multi-prefix resume consumer".to_string()),
            filter_subjects,
            replay_policy: ReplayPolicy::Instant,
            deliver_policy: DeliverPolicy::ByStartSequence { start_sequence },
            ..Default::default()
        }))
        .await?
        {
            Ok(c) => c,
            Err(e) => {
                let err_str = e.to_string();
                if is_cursor_expired_error(&err_str) {
                    warn!(revision, ?prefixes, error = %err_str, "cursor expired for multi-prefix watch, caller should fall back");
                    return Err(KvError::CursorExpired);
                }
                return Err(KvError::WatchError(err_str));
            }
        };
        // Re-check: retention may have advanced while the consumer was being set up.
        self.check_resume_window(revision).await?;
        let mut messages = timed(consumer.messages())
            .await?
            .map_err(|e| KvError::WatchError(e.to_string()))?;
        info!(
            revision,
            ?prefixes,
            "resumed multi-prefix watch from cursor"
        );
        while let Some(msg) = messages.next().await {
            match msg {
                Ok(msg) => {
                    // Messages outside this bucket's keyspace are skipped.
                    let Some(update) = kv_message_to_update(&msg, &kv_prefix) else {
                        continue;
                    };
                    if tx.send(update).await.is_err() {
                        debug!("watch receiver closed");
                        break;
                    }
                }
                Err(e) => {
                    error!(error = %e, "NATS KV multi-prefix watch error");
                    return Err(KvError::WatchError(e.to_string()));
                }
            }
        }
        Ok(())
    }
}
/// Write-side facade over one KV bucket; implements both `KvWriter` and `KvPurge`.
struct NatsKvWriterImpl {
/// Handle to the KV bucket that all writes go through.
kv: Store,
}
#[async_trait]
impl KvWriter for NatsKvWriterImpl {
    /// Writes `value` under `key` unconditionally and returns the new revision.
    async fn put(&self, key: &str, value: &[u8]) -> Result<VersionToken, KvError> {
        match timed(self.kv.put(key, value.to_vec().into())).await? {
            Ok(revision) => Ok(VersionToken::from_u64(revision)),
            Err(e) => Err(KvError::OperationFailed(e.to_string())),
        }
    }

    /// Deletes `key`. Always reports `true` on success, whether or not the
    /// key existed beforehand.
    async fn delete(&self, key: &str) -> Result<bool, KvError> {
        if let Err(e) = timed(self.kv.delete(key)).await? {
            return Err(KvError::OperationFailed(e.to_string()));
        }
        Ok(true)
    }

    /// Creates `key` only if it does not exist yet.
    ///
    /// # Errors
    /// `KvError::AlreadyExists` when the key is present; any other failure is
    /// reported as `KvError::OperationFailed`.
    async fn create(&self, key: &str, value: &[u8]) -> Result<VersionToken, KvError> {
        use async_nats::jetstream::kv::CreateErrorKind;

        match timed(self.kv.create(key, value.to_vec().into())).await? {
            Ok(revision) => Ok(VersionToken::from_u64(revision)),
            Err(e) if e.kind() == CreateErrorKind::AlreadyExists => Err(KvError::AlreadyExists),
            Err(e) => Err(KvError::OperationFailed(e.to_string())),
        }
    }

    /// Compare-and-set: writes `value` only if the key's latest revision
    /// equals `expected`.
    ///
    /// # Errors
    /// `KvError::RevisionMismatch` on a stale `expected`;
    /// `KvError::OperationFailed` when `expected` has no numeric form or the
    /// write fails for another reason.
    async fn update(
        &self,
        key: &str,
        value: &[u8],
        expected: &VersionToken,
    ) -> Result<VersionToken, KvError> {
        use async_nats::jetstream::kv::UpdateErrorKind;

        let Some(rev) = expected.as_u64() else {
            return Err(KvError::OperationFailed(
                "invalid version token for NATS update".into(),
            ));
        };
        match timed(self.kv.update(key, value.to_vec().into(), rev)).await? {
            Ok(revision) => Ok(VersionToken::from_u64(revision)),
            Err(e) if e.kind() == UpdateErrorKind::WrongLastRevision => {
                Err(KvError::RevisionMismatch)
            }
            Err(e) => Err(KvError::OperationFailed(e.to_string())),
        }
    }

    /// Conditional delete: performs a compare-and-set write of an empty value
    /// rather than publishing a delete marker.
    ///
    /// # Errors
    /// Same mapping as `update`.
    async fn delete_with_version(
        &self,
        key: &str,
        expected: &VersionToken,
    ) -> Result<bool, KvError> {
        use async_nats::jetstream::kv::UpdateErrorKind;

        let Some(rev) = expected.as_u64() else {
            return Err(KvError::OperationFailed(
                "invalid version token for NATS delete".into(),
            ));
        };
        match timed(self.kv.update(key, Vec::new().into(), rev)).await? {
            Ok(_) => Ok(true),
            Err(e) if e.kind() == UpdateErrorKind::WrongLastRevision => {
                Err(KvError::RevisionMismatch)
            }
            Err(e) => Err(KvError::OperationFailed(e.to_string())),
        }
    }
}
#[async_trait]
impl KvPurge for NatsKvWriterImpl {
    /// Purges `key` through the bucket's `purge` call.
    ///
    /// # Errors
    /// `KvError::Timeout` when the call exceeds the operation deadline,
    /// `KvError::OperationFailed` for any other failure.
    async fn purge(&self, key: &str) -> Result<(), KvError> {
        match timed(self.kv.purge(key)).await? {
            Ok(_) => Ok(()),
            Err(e) => Err(KvError::OperationFailed(e.to_string())),
        }
    }
}
/// Debug output showing the target URL and the current health flag.
///
/// The URL may embed `user:password@` credentials, so its userinfo part is
/// redacted before printing.
impl std::fmt::Debug for NatsConnection {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        /// Replaces the `userinfo@` part of the URL authority (if present)
        /// with `[redacted]@`; everything else is left untouched.
        fn redact_userinfo(raw: &str) -> std::borrow::Cow<'_, str> {
            use std::borrow::Cow;
            // The authority starts after "scheme://", or at 0 for scheme-less input.
            let authority_start = raw.find("://").map_or(0, |idx| idx + 3);
            let tail = &raw[authority_start..];
            let authority_len = tail
                .find(|c: char| matches!(c, '/' | '?' | '#'))
                .unwrap_or(tail.len());
            // The last '@' inside the authority separates userinfo from host.
            match tail[..authority_len].rfind('@') {
                Some(at) => Cow::Owned(format!(
                    "{}[redacted]@{}",
                    &raw[..authority_start],
                    &tail[at + 1..]
                )),
                None => Cow::Borrowed(raw),
            }
        }

        f.debug_struct("NatsConnection")
            .field("url", &redact_userinfo(&self.config.url))
            .field("healthy", &self.healthy.load(Ordering::Acquire))
            .finish()
    }
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn raw_create_success_has_no_error() {
let payload = br#"{"type":"io.nats.jetstream.api.v1.stream_create_response","config":{"name":"KV_certs"}}"#;
assert_eq!(
classify_raw_create_response(payload),
RawCreateOutcome::Created
);
}
#[test]
fn raw_create_swallows_stream_already_exists() {
let payload =
br#"{"error":{"code":400,"err_code":10058,"description":"stream name already in use"}}"#;
assert_eq!(
classify_raw_create_response(payload),
RawCreateOutcome::AlreadyExists
);
}
#[test]
fn raw_create_swallows_stream_limit() {
let payload =
br#"{"error":{"code":400,"description":"maximum number of streams reached"}}"#;
assert_eq!(
classify_raw_create_response(payload),
RawCreateOutcome::StreamLimit
);
}
#[test]
fn raw_create_propagates_unknown_error() {
let payload = br#"{"error":{"code":403,"description":"insufficient permissions"}}"#;
match classify_raw_create_response(payload) {
RawCreateOutcome::Failed { code, description } => {
assert_eq!(code, 403);
assert_eq!(description, "insufficient permissions");
}
other => panic!("expected Failed, got {other:?}"),
}
}
#[test]
fn raw_create_400_without_stream_limit_is_fatal() {
let payload = br#"{"error":{"code":400,"description":"invalid stream config"}}"#;
match classify_raw_create_response(payload) {
RawCreateOutcome::Failed { code, description } => {
assert_eq!(code, 400);
assert!(description.contains("invalid stream config"));
}
other => panic!("expected Failed, got {other:?}"),
}
}
#[test]
fn raw_create_unparseable_payload_is_treated_as_success() {
assert_eq!(
classify_raw_create_response(b"not json at all"),
RawCreateOutcome::Created
);
}
#[test]
fn ack_subject_legacy_format() {
let reply = "$JS.ACK.KV_certs.cons.1.42.7.1700000000000000000.0";
assert_eq!(stream_sequence_from_ack(reply), Some(42));
}
#[test]
fn ack_subject_modern_format_with_domain_and_account() {
let reply = "$JS.ACK.hub.AABBCC.KV_certs.cons.1.42.7.1700000000000000000.0";
assert_eq!(stream_sequence_from_ack(reply), Some(42));
}
#[test]
fn ack_subject_modern_format_with_trailing_token() {
let reply = "$JS.ACK.hub.AABBCC.KV_certs.cons.1.99.7.1700000000000000000.0.rng";
assert_eq!(stream_sequence_from_ack(reply), Some(99));
}
#[test]
fn ack_subject_last_token_is_not_the_sequence() {
let reply = "$JS.ACK.KV_certs.cons.1.42.7.1700000000000000000.0";
assert_ne!(stream_sequence_from_ack(reply), Some(0));
}
#[test]
fn ack_subject_rejects_garbage() {
assert_eq!(stream_sequence_from_ack(""), None);
assert_eq!(stream_sequence_from_ack("not.an.ack.subject"), None);
assert_eq!(stream_sequence_from_ack("$JS.ACK.too.few.tokens"), None);
assert_eq!(stream_sequence_from_ack("$JS.ACK.s.c.1.notnum.7.0.0"), None);
}
#[test]
fn cursor_expired_matches_known_nats_error_strings() {
assert!(is_cursor_expired_error(
"consumer start sequence is too old"
));
assert!(is_cursor_expired_error("first sequence is 42, requested 1"));
assert!(is_cursor_expired_error("sequence not found in stream"));
assert!(is_cursor_expired_error("requested revision is too old"));
assert!(is_cursor_expired_error("Consumer Start Sequence Too Old"));
assert!(!is_cursor_expired_error("connection refused"));
assert!(!is_cursor_expired_error("permission denied"));
assert!(!is_cursor_expired_error("stream not found"));
}
fn raw_kv_msg(
subject: &str,
reply: Option<&str>,
payload: &[u8],
op: Option<&str>,
) -> async_nats::Message {
let headers = op.map(|op| {
let mut h = async_nats::HeaderMap::new();
h.insert("KV-Operation", op);
h
});
async_nats::Message {
subject: subject.to_string().into(),
reply: reply.map(|r| r.to_string().into()),
payload: payload.to_vec().into(),
headers,
status: None,
description: None,
length: 0,
}
}
const ACK_42: &str = "$JS.ACK.KV_certs.cons.1.42.7.1700000000000000000.0";
#[test]
fn kv_message_decodes_put_without_operation_header() {
let msg = raw_kv_msg("$KV.certs.node.a", Some(ACK_42), b"v1", None);
match kv_message_to_update(&msg, "$KV.certs.").expect("in keyspace") {
KvUpdate::Put(e) => {
assert_eq!(e.key, "node.a");
assert_eq!(e.value, b"v1");
assert_eq!(e.version.as_u64(), Some(42));
}
other => panic!("expected Put, got {other:?}"),
}
}
#[test]
fn kv_message_decodes_delete_and_purge_markers() {
let msg = raw_kv_msg("$KV.certs.node.a", Some(ACK_42), b"", Some("DEL"));
assert!(matches!(
kv_message_to_update(&msg, "$KV.certs.").expect("in keyspace"),
KvUpdate::Delete { ref key, ref version } if key == "node.a" && version.as_u64() == Some(42)
));
let msg = raw_kv_msg("$KV.certs.node.a", Some(ACK_42), b"", Some("PURGE"));
assert!(matches!(
kv_message_to_update(&msg, "$KV.certs.").expect("in keyspace"),
KvUpdate::Purge { ref key, .. } if key == "node.a"
));
}
/// A subject that does not start with the requested prefix yields no update.
#[test]
fn kv_message_outside_keyspace_is_skipped() {
    // Subject lives under `$KV.other.`, not the `$KV.certs.` prefix being decoded.
    let foreign = raw_kv_msg("$KV.other.node.a", Some(ACK_42), b"v", None);
    let decoded = kv_message_to_update(&foreign, "$KV.certs.");
    assert!(decoded.is_none());
}
/// Without a reply subject there is nothing to read a sequence from, so the
/// decoded `Put` must carry an unknown version.
#[test]
fn kv_message_without_reply_gets_unknown_version() {
    let msg = raw_kv_msg("$KV.certs.node.a", None, b"v", None);
    let update = kv_message_to_update(&msg, "$KV.certs.").expect("in keyspace");

    let entry = match update {
        KvUpdate::Put(entry) => entry,
        other => panic!("expected Put, got {other:?}"),
    };

    assert!(entry.version.is_unknown());
    assert_eq!(entry.version.as_u64(), None);
}
/// An error response whose `code` field is 10058 classifies as `AlreadyExists`.
#[test]
fn raw_create_already_exists_when_10058_in_code_field() {
    let body = br#"{"error":{"code":10058,"description":"stream name already in use"}}"#;
    let outcome = classify_raw_create_response(body);
    assert_eq!(outcome, RawCreateOutcome::AlreadyExists);
}
/// An error response with no `code` field classifies as `Failed` with code 0
/// and the description passed through.
#[test]
fn raw_create_error_without_code_defaults_to_zero() {
    let body = br#"{"error":{"description":"mystery"}}"#;

    let (code, description) = match classify_raw_create_response(body) {
        RawCreateOutcome::Failed { code, description } => (code, description),
        other => panic!("expected Failed, got {other:?}"),
    };

    assert_eq!(code, 0);
    assert_eq!(description, "mystery");
}
}
#[cfg(test)]
/// Integration tests that run against a real `nats-server` child process.
///
/// The binary is taken from `NATS_SERVER_BIN`, falling back to `nats-server`
/// on `PATH`.
mod floor_guard_tests {
    use super::*;
    use std::process::{Child, Command, Stdio};

    /// A running `nats-server` child and the URL it listens on.
    ///
    /// `_dir` is held only to keep the temporary store directory alive for as
    /// long as the server runs.
    struct TestServer {
        child: Child,
        url: String,
        _dir: tempfile::TempDir,
    }

    impl Drop for TestServer {
        /// Kills and reaps the child; errors are ignored because the process
        /// may already be gone.
        fn drop(&mut self) {
            let _ = self.child.kill();
            let _ = self.child.wait();
        }
    }

    /// Spawns a JetStream-enabled `nats-server` on a free loopback port and
    /// waits (up to 100 attempts, 100 ms apart) until a client can connect.
    ///
    /// # Panics
    ///
    /// Panics if the binary cannot be spawned, if the child exits before it
    /// accepts a connection, or if it never becomes ready.
    async fn start_server() -> TestServer {
        let bin = std::env::var("NATS_SERVER_BIN").unwrap_or_else(|_| "nats-server".into());

        // Ask the OS for a free port. The listener is dropped right away, so
        // another process could in principle grab the port before the server
        // binds it; the early-exit check below surfaces that case quickly.
        let port = std::net::TcpListener::bind("127.0.0.1:0")
            .unwrap()
            .local_addr()
            .unwrap()
            .port();
        let port_arg = port.to_string();

        let dir = tempfile::tempdir().unwrap();
        let store_dir = dir.path().to_str().unwrap();

        let child = Command::new(&bin)
            .args([
                "--jetstream",
                "--addr",
                "127.0.0.1",
                "--port",
                port_arg.as_str(),
                "--store_dir",
                store_dir,
            ])
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .spawn()
            .unwrap_or_else(|e| panic!("spawn {bin}: {e}; run `mise install`"));

        let mut server = TestServer {
            child,
            url: format!("nats://127.0.0.1:{port}"),
            _dir: dir,
        };

        for _ in 0..100 {
            if async_nats::connect(&server.url).await.is_ok() {
                return server;
            }
            // Fail fast instead of polling a dead process for the full
            // timeout and then reporting a misleading "never became ready".
            if let Ok(Some(status)) = server.child.try_wait() {
                panic!("nats-server exited before becoming ready: {status}");
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        panic!("nats-server never became ready");
    }

    /// Connects to `url` and creates a KV bucket named `bucket` with
    /// `history: 1`, returning the JetStream context and the bucket handle.
    async fn open_bucket(
        url: &str,
        bucket: &str,
    ) -> (
        async_nats::jetstream::Context,
        async_nats::jetstream::kv::Store,
    ) {
        let client = async_nats::connect(url).await.unwrap();
        let js = async_nats::jetstream::new(client);
        let kv = js
            .create_key_value(async_nats::jetstream::kv::Config {
                bucket: bucket.into(),
                history: 1,
                ..Default::default()
            })
            .await
            .unwrap();
        (js, kv)
    }

    /// Returns the `state.bytes` value currently reported for `stream`.
    async fn stream_bytes(js: &async_nats::jetstream::Context, stream: &str) -> u64 {
        let mut handle = js.get_stream(stream).await.unwrap();
        handle.info().await.unwrap().state.bytes
    }

    /// Creates the `guard` bucket and writes keys `k1`..`k5`, one put each.
    async fn seeded_bucket(
        url: &str,
    ) -> (
        async_nats::jetstream::Context,
        async_nats::jetstream::kv::Store,
    ) {
        let (js, kv) = open_bucket(url, "guard").await;
        for i in 1..=5u8 {
            kv.put(format!("k{i}"), vec![i].into()).await.unwrap();
        }
        (js, kv)
    }

    /// Purging half of the keys must shrink the byte count reported for the
    /// backing stream.
    #[tokio::test(flavor = "multi_thread")]
    async fn purge_reclaims_bytes() {
        use crate::kv::KvPurge;

        let server = start_server().await;
        let (js, kv) = open_bucket(&server.url, "purge").await;

        let val = vec![b'x'; 4096];
        for i in 0..50u32 {
            kv.put(format!("k{i}"), val.clone().into()).await.unwrap();
        }
        let before = stream_bytes(&js, "KV_purge").await;

        let writer = NatsKvWriterImpl { kv: kv.clone() };
        for i in 0..25u32 {
            writer.purge(&format!("k{i}")).await.unwrap();
        }

        // Poll for up to ~2 s rather than reading once; presumably the
        // reported size can lag behind the purges.
        let mut after = before;
        for _ in 0..20 {
            after = stream_bytes(&js, "KV_purge").await;
            if after < before {
                break;
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        assert!(
            after < before,
            "purge must reclaim bytes: before={before} after={after}"
        );

        // `k0` was already purged above; purging it again must still succeed.
        writer.purge("k0").await.unwrap();
    }

    /// A watch that starts below an advanced stream floor must fail promptly
    /// with the "retention overran live watch" error.
    #[tokio::test(flavor = "multi_thread")]
    async fn gapped_delivery_with_advanced_floor_trips() {
        let server = start_server().await;
        let (js, kv) = seeded_bucket(&server.url).await;

        // Advance the stream floor past the revision the watch starts from.
        let mut stream = js.get_stream("KV_guard").await.unwrap();
        stream.purge().sequence(4).await.unwrap();
        assert_eq!(stream.info().await.unwrap().state.first_sequence, 4);

        let watch = kv.watch_all_from_revision(2).await.unwrap();
        let (tx, mut rx) = tokio::sync::mpsc::channel(64);
        // Keep the channel drained so the guard is never blocked on a send.
        let drain = tokio::spawn(async move { while rx.recv().await.is_some() {} });

        let err = tokio::time::timeout(
            Duration::from_secs(5),
            stream_watch_floor_guarded(watch, &tx, 1, &js, "guard"),
        )
        .await
        .expect("the trip must be IN-BAND (immediate), not backstop-paced")
        .expect_err("a gapped delivery over an advanced floor must trip");
        assert!(
            err.to_string().contains("retention overran live watch"),
            "{err}"
        );

        // Closing the sender lets the drain task finish.
        drop(tx);
        let _ = drain.await;
    }

    /// Gaps in delivered revisions are tolerated while the stream floor has
    /// not moved: deliveries continue and the guard does not trip.
    #[tokio::test(flavor = "multi_thread")]
    async fn benign_interior_gap_passes() {
        let server = start_server().await;
        let (js, kv) = seeded_bucket(&server.url).await;

        // Overwrite two seeded keys; the floor is asserted to still be 1.
        kv.put("k2", vec![22].into()).await.unwrap();
        kv.put("k3", vec![33].into()).await.unwrap();
        let mut stream = js.get_stream("KV_guard").await.unwrap();
        assert_eq!(stream.info().await.unwrap().state.first_sequence, 1);

        let watch = kv.watch_all_from_revision(2).await.unwrap();
        let (tx, mut rx) = tokio::sync::mpsc::channel(64);
        let guard =
            tokio::spawn(
                async move { stream_watch_floor_guarded(watch, &tx, 1, &js, "guard").await },
            );

        let mut got = Vec::new();
        while got.len() < 4 {
            let update = tokio::time::timeout(Duration::from_secs(5), rx.recv())
                .await
                .expect("deliveries continue past benign gaps")
                .expect("watch alive");
            got.push(update.version().as_u64().unwrap());
        }
        assert_eq!(got, vec![4, 5, 6, 7], "interior gaps jumped, tail dense");

        // The guard task would otherwise keep watching; stop it explicitly.
        guard.abort();
    }
}