slipstream/nats.rs
1use async_nats::jetstream::kv::Store;
2use async_trait::async_trait;
3use futures::StreamExt;
4use std::sync::{
5 Arc,
6 atomic::{AtomicBool, Ordering},
7};
8use std::time::Duration;
9use tokio::sync::{RwLock, mpsc::Sender};
10use tracing::{debug, error, info, warn};
11
12use crate::kv::{
13 KvEntry, KvError, KvReader, KvUpdate, KvWatcher, KvWriter, VersionToken, WatchCursor,
14};
15use crate::stores::{Connection, ConnectionCapabilities, KvStore, StoreConfig};
16
/// Default per-operation timeout for NATS KV ops. async-nats's request/response
/// futures don't fail in-flight requests when the underlying TCP connection
/// goes half-dead (CLOSE_WAIT) — they just await forever. Without a timeout
/// here, ANY hung NATS connection translates into a tokio runtime deadlock as
/// soon as enough callers queue behind the dead connection. 30s is generous
/// for legitimate slow ops (cold JetStream stream sync, leader election under
/// load) and short enough that a dead connection recovers within reasonable
/// human-debug latency.
///
/// Applied through [`timed`], which reports an elapsed deadline as
/// [`KvError::Timeout`].
const KV_OP_TIMEOUT: Duration = Duration::from_secs(30);
26
/// Server-side inactivity reaper for the ephemeral consumers `scan()`/`keys()`
/// create. Our code deletes each consumer explicitly when the drain finishes,
/// but that delete is best-effort: on a half-dead (CLOSE_WAIT) connection it
/// times out, orphaning the consumer server-side where it counts against the
/// per-stream consumer limit. Setting `inactive_threshold` makes JetStream reap
/// any consumer that sees no activity for this long, so a failed explicit delete
/// self-heals instead of accumulating until the limit is hit. Comfortably longer
/// than [`KV_OP_TIMEOUT`] so it never reaps a legitimately slow in-flight drain
/// (each delivery resets the inactivity timer).
///
/// The value is 300 seconds (five minutes), ten times [`KV_OP_TIMEOUT`].
const CONSUMER_INACTIVE_THRESHOLD: Duration = Duration::from_secs(300);
37
38/// Run a future under [`KV_OP_TIMEOUT`], returning [`KvError::Timeout`] if it
39/// doesn't complete in time. Preserves the inner future's `Result` so callers
40/// keep their existing error-mapping logic.
41async fn timed<F, T>(fut: F) -> Result<T, KvError>
42where
43 F: std::future::Future<Output = T>,
44{
45 tokio::time::timeout(KV_OP_TIMEOUT, fut)
46 .await
47 .map_err(|_| KvError::Timeout)
48}
49
50/// Build NATS connect options (with auth applied) and resolve the URL to dial.
51///
52/// Split out from [`nats_connect`] so the connection lifecycle can attach an
53/// event callback (for health tracking) to the *same* options before dialing,
54/// without duplicating the auth-priority logic. Returns the options plus the URL
55/// to connect to (which may differ from `url` when credentials are stripped out
56/// of a `user:pass@host` URL).
57///
58/// Auth priority (first match wins):
59/// 1. Inline credentials (base64-encoded .creds content)
60/// 2. Credentials file (if provided)
61/// 3. URL-embedded credentials (user:pass@host)
62/// 4. No authentication
63async fn build_connect_options(
64 url: &str,
65 creds: Option<&str>,
66 creds_file: Option<&str>,
67) -> Result<(async_nats::ConnectOptions, String), async_nats::ConnectError> {
68 // Priority 1: Inline credentials (base64-encoded)
69 if let Some(encoded) = creds {
70 use base64::Engine;
71 let decoded = base64::engine::general_purpose::STANDARD
72 .decode(encoded)
73 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
74 let content = String::from_utf8(decoded)
75 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
76 return Ok((
77 async_nats::ConnectOptions::with_credentials(&content)?,
78 url.to_string(),
79 ));
80 }
81
82 // Priority 2: Credentials file
83 if let Some(path) = creds_file {
84 return Ok((
85 async_nats::ConnectOptions::with_credentials_file(path).await?,
86 url.to_string(),
87 ));
88 }
89
90 // Priority 3: URL-embedded credentials
91 if let Ok(parsed) = url::Url::parse(url)
92 && !parsed.username().is_empty()
93 {
94 let user = parsed.username().to_string();
95 let pass = parsed.password().unwrap_or("").to_string();
96 // Rebuild URL without credentials. If the scheme doesn't support
97 // userinfo, these calls fail and the credentials would remain embedded
98 // in the URL we later log — warn loudly rather than silently leak them.
99 let mut clean_url = parsed.clone();
100 if clean_url.set_username("").is_err() || clean_url.set_password(None).is_err() {
101 warn!("could not strip credentials from NATS URL; they may appear in logs");
102 }
103 return Ok((
104 async_nats::ConnectOptions::with_user_and_password(user, pass),
105 clean_url.as_str().to_string(),
106 ));
107 }
108
109 // Priority 4: No authentication
110 Ok((async_nats::ConnectOptions::new(), url.to_string()))
111}
112
113/// Connect to NATS with various authentication methods.
114///
115/// Supports the auth-priority order documented on [`build_connect_options`].
116/// This is the standalone helper; the [`Connection`] impl builds options the
117/// same way but also installs a health-tracking event callback.
118pub async fn nats_connect(
119 url: &str,
120 creds: Option<&str>,
121 creds_file: Option<&str>,
122) -> Result<async_nats::Client, async_nats::ConnectError> {
123 let (opts, dial_url) = build_connect_options(url, creds, creds_file).await?;
124 opts.connect(dial_url).await
125}
126
127/// Render an untrusted server payload for logging: borrowed as-is when valid
128/// UTF-8, lowercase hex otherwise. `from_utf8_lossy` would mash every invalid
129/// byte into U+FFFD — exactly the bytes an incident needs to see — so the
130/// fallback preserves them losslessly instead.
131fn payload_for_log(payload: &[u8]) -> std::borrow::Cow<'_, str> {
132 match std::str::from_utf8(payload) {
133 Ok(s) => std::borrow::Cow::Borrowed(s),
134 Err(_) => std::borrow::Cow::Owned(format!("0x{}", crate::artifact::hex_encode(payload))),
135 }
136}
137
/// Configuration for NATS connection.
///
/// `Debug` is hand-written, not derived: `creds` holds base64-encoded
/// credential material, `creds_file` a filesystem path to secrets, and `url`
/// may embed `user:pass@host` credentials. A derived `Debug` would print all
/// of that verbatim the moment anyone `{:?}`-formats the config (a `tracing`
/// span, an error context, a test dump), leaking credentials into logs. The
/// redacting impl below keeps that from being one careless format string away.
#[derive(Clone)]
pub struct NatsConnectionConfig {
    /// Server URL to dial. May carry `user:pass@` userinfo, which is used for
    /// authentication when neither `creds` nor `creds_file` is set.
    pub url: String,
    /// Base64-encoded .creds file content (for ECS / containerized environments).
    pub creds: Option<String>,
    /// Path to .creds file on disk (for bare-metal / local development).
    pub creds_file: Option<String>,
}

impl std::fmt::Debug for NatsConnectionConfig {
    /// Render the config with every secret-bearing part redacted.
    ///
    /// * `url` — printed with any `user:pass@` userinfo replaced by
    ///   `[redacted]@`; host, port and path stay visible for debugging.
    /// * `creds` / `creds_file` — presence only (`Some("[redacted]")` or
    ///   `None`), never content. A path like `/run/secrets/prod.creds` would
    ///   leak the secrets layout, so it is reduced to presence too.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use std::borrow::Cow;

        /// Replace the userinfo of one URL with `[redacted]`, leaving the rest
        /// intact. URLs without userinfo are returned unchanged.
        fn redact_one(url: &str) -> Cow<'_, str> {
            // The authority runs from just after "://" (or the start, when
            // there is no scheme) up to the first path/query/fragment marker.
            let authority_start = url.find("://").map_or(0, |i| i + 3);
            let rest = &url[authority_start..];
            let authority_len = rest
                .find(|c: char| matches!(c, '/' | '?' | '#'))
                .unwrap_or(rest.len());
            match rest[..authority_len].rfind('@') {
                Some(at) => Cow::Owned(format!(
                    "{}[redacted]{}",
                    &url[..authority_start],
                    &rest[at..]
                )),
                None => Cow::Borrowed(url),
            }
        }

        /// Redact userinfo in a URL, or in each entry of a comma-separated
        /// server list.
        fn redact_userinfo(url: &str) -> Cow<'_, str> {
            if !url.contains('@') {
                return Cow::Borrowed(url);
            }
            let parts: Vec<Cow<'_, str>> = url.split(',').map(redact_one).collect();
            Cow::Owned(parts.join(","))
        }

        /// Presence marker for a secret-bearing optional field.
        fn presence(secret: &Option<String>) -> Option<&'static str> {
            secret.as_ref().map(|_| "[redacted]")
        }

        f.debug_struct("NatsConnectionConfig")
            .field("url", &redact_userinfo(&self.url))
            .field("creds", &presence(&self.creds))
            .field("creds_file", &presence(&self.creds_file))
            .finish()
    }
}
171
172/// Create a KV bucket using raw JetStream API (bypasses async-nats response parsing issues).
173///
174/// Synadia Cloud returns responses that `async_nats` can't parse. This function
175/// uses the raw JetStream API directly, bypassing the client's response deserialization.
176///
177/// `pub(crate)`: this is an internal Synadia Cloud workaround invoked by
178/// `get_or_create_bucket`, not a stable entry point. Exposing it would pin a
179/// vendor quirk into the crate's semver surface.
180pub(crate) async fn create_kv_bucket_raw(
181 client: &async_nats::Client,
182 bucket: &str,
183 max_bytes: i64,
184 history: i64,
185 max_age_nanos: i64,
186 num_replicas: usize,
187) -> Result<(), KvError> {
188 let stream_name = format!("KV_{}", bucket);
189 let subject = format!("$KV.{}.>", bucket);
190
191 // JetStream stream config for KV bucket
192 let config = serde_json::json!({
193 "name": stream_name,
194 "subjects": [subject],
195 "max_msgs_per_subject": history,
196 "max_bytes": max_bytes,
197 "max_age": max_age_nanos,
198 "storage": "file",
199 "allow_rollup_hdrs": true,
200 "deny_delete": false,
201 "deny_purge": false,
202 "allow_direct": true,
203 "discard": "new",
204 "num_replicas": num_replicas,
205 "retention": "limits"
206 });
207
208 let payload = serde_json::to_vec(&config)
209 .map_err(|e| KvError::ConnectionFailed(format!("failed to serialize config: {}", e)))?;
210
211 let response = client
212 .request(
213 format!("$JS.API.STREAM.CREATE.{}", stream_name),
214 payload.into(),
215 )
216 .await
217 .map_err(|e| KvError::ConnectionFailed(format!("failed to send create request: {}", e)))?;
218
219 debug!(bucket, response = %payload_for_log(&response.payload), "raw JetStream response");
220
221 match classify_raw_create_response(&response.payload) {
222 RawCreateOutcome::AlreadyExists => {
223 info!(bucket, "bucket already exists");
224 Ok(())
225 }
226 RawCreateOutcome::StreamLimit => {
227 info!(bucket, "stream limit reached, bucket may already exist");
228 Ok(())
229 }
230 RawCreateOutcome::Created => {
231 info!(bucket, "bucket created successfully via raw API");
232 Ok(())
233 }
234 RawCreateOutcome::Failed { code, description } => Err(KvError::ConnectionFailed(format!(
235 "JetStream error {}: {}",
236 code, description
237 ))),
238 }
239}
240
/// Classification of a raw `$JS.API.STREAM.CREATE` response payload.
///
/// Separated from the I/O in [`create_kv_bucket_raw`] so the Synadia Cloud
/// error-code logic — the reason this raw path exists — is unit-testable
/// without a live NATS server.
///
/// Produced by [`classify_raw_create_response`].
#[derive(Debug, PartialEq, Eq)]
enum RawCreateOutcome {
    /// No error in the response: the bucket was created. Also returned for a
    /// payload that is not valid JSON, on the assumption that the caller
    /// re-verifies the bucket afterwards.
    Created,
    /// `10058` — stream name already in use; the bucket exists. Non-fatal.
    /// Matched against either the `code` or the `err_code` field.
    AlreadyExists,
    /// `400` "maximum number of streams"; Synadia Cloud returns this at the
    /// stream limit even when the bucket already exists. Non-fatal.
    StreamLimit,
    /// Any other JetStream error — fatal. `code` is the response's
    /// `error.code` value (0 when absent) and `description` its
    /// `error.description` ("unknown error" when absent).
    Failed { code: i64, description: String },
}
258
259fn classify_raw_create_response(payload: &[u8]) -> RawCreateOutcome {
260 // Unparseable payloads are treated as success: the caller re-verifies the
261 // bucket with a follow-up `get_key_value`, so a garbled body here does not
262 // mask a real failure. Warn so the fallback assumption is auditable — if the
263 // re-verify step is ever refactored away, this log is the breadcrumb.
264 //
265 // INVARIANT: this `Created`-on-garbage path is only sound because every
266 // caller re-verifies the bucket exists after `create_kv_bucket_raw` returns
267 // Ok. The sole caller — `get_or_create_bucket` — does so via the
268 // `timed(js.get_key_value(...))` immediately after the raw-create fallback.
269 // Do not remove that re-verify without making this path return `Failed`.
270 let Ok(json) = serde_json::from_slice::<serde_json::Value>(payload) else {
271 warn!(
272 response = %payload_for_log(payload),
273 "unparseable STREAM.CREATE response; assuming created (caller re-verifies via get_key_value)"
274 );
275 return RawCreateOutcome::Created;
276 };
277
278 let Some(err) = json.get("error") else {
279 return RawCreateOutcome::Created;
280 };
281
282 // JetStream splits its error codes: `code` is the HTTP-style status (400,
283 // 404, 500) while `err_code` carries the granular code (e.g. 10058). The
284 // already-exists code can surface in either field depending on the server
285 // (standard NATS puts 10058 in `err_code` with `code` = 400; some managed
286 // deployments echo it in `code`), so we accept it in either.
287 let code = err.get("code").and_then(|c| c.as_i64()).unwrap_or(0);
288 let err_code = err.get("err_code").and_then(|c| c.as_i64()).unwrap_or(0);
289 let description = err
290 .get("description")
291 .and_then(|d| d.as_str())
292 .unwrap_or("unknown error");
293
294 // 10058 = stream name already in use (bucket exists) - that's OK
295 if code == 10058 || err_code == 10058 {
296 return RawCreateOutcome::AlreadyExists;
297 }
298
299 // 400 "maximum number of streams reached" may also mean bucket exists
300 // (Synadia Cloud returns this when at stream limit but bucket exists)
301 if code == 400 && description.contains("maximum number of streams") {
302 return RawCreateOutcome::StreamLimit;
303 }
304
305 RawCreateOutcome::Failed {
306 code,
307 description: description.to_string(),
308 }
309}
310
311struct NatsHandle {
312 // Held to keep the NATS connection alive for the lifetime of the handle.
313 // The `jetstream` context clones an internal reference, but this field is
314 // the authoritative owner — dropping the handle drops the connection.
315 // `dead_code` because we never read it directly after construction.
316 #[allow(dead_code)]
317 client: async_nats::Client,
318 jetstream: async_nats::jetstream::Context,
319}
320
/// NATS JetStream KV connection.
///
/// Built either unconnected from a [`NatsConnectionConfig`] (`new`, then
/// `connect()`), or around an already-connected client (`from_client`). The
/// two paths differ in how health is tracked; see the field notes below.
pub struct NatsConnection {
    // URL and credential sources used by `connect()` to dial. Empty for a
    // `from_client` connection.
    config: NatsConnectionConfig,
    // The installed client + JetStream context; `None` until connected and
    // again after `shutdown()`.
    handle: RwLock<Option<NatsHandle>>,
    // Shared with the installed client's event callback so `is_healthy()`
    // tracks real connection state (Connected/Disconnected) rather than staying
    // pinned at its connect-time value. `Arc` because the callback outlives this
    // struct's borrow — it runs on the client's event-loop task.
    healthy: Arc<AtomicBool>,
    // Set only for connections built via `from_client`, where no health-tracking
    // event callback could be installed (the client was already connected).
    // `is_healthy()` consults this client's live `connection_state()` instead of
    // the callback-driven `healthy` flag. `None` for the `new()` + `connect()`
    // path, whose flag is kept current by the installed event callback.
    //
    // `Some(_)` is also the marker that this connection borrows a caller-owned
    // client: it carries no URL or credentials of its own (see `from_client`),
    // so it cannot redial. `connect()` refuses to reconnect such a connection
    // rather than dialing the empty config URL and surfacing a cryptic error.
    state_probe: Option<async_nats::Client>,
}
342
343impl NatsConnection {
344 pub fn new(config: NatsConnectionConfig) -> Self {
345 Self {
346 config,
347 handle: RwLock::new(None),
348 healthy: Arc::new(AtomicBool::new(false)),
349 state_probe: None,
350 }
351 }
352
353 /// Create a NatsConnection from an existing NATS client.
354 ///
355 /// This is useful when the caller already has a NATS connection and wants
356 /// to reuse it for KV stores instead of creating a new connection.
357 pub fn from_client(client: async_nats::Client) -> Self {
358 let jetstream = async_nats::jetstream::new(client.clone());
359 let config = NatsConnectionConfig {
360 url: String::new(), // Not used when pre-connected
361 creds: None,
362 creds_file: None,
363 };
364
365 // Clone a probe handle before the client moves into `NatsHandle`.
366 // `async_nats::Client` is cheap to clone (internally an `Arc`), and
367 // `connection_state()` just reads a watch channel — no I/O.
368 let state_probe = Some(client.clone());
369 let handle = NatsHandle { client, jetstream };
370
371 Self {
372 config,
373 handle: RwLock::new(Some(handle)),
374 // A pre-connected client carries no health-tracking callback (we
375 // didn't build its options), so `is_healthy()` reads the client's
376 // live `connection_state()` via `state_probe`. The flag below only
377 // gates explicit `shutdown()`.
378 healthy: Arc::new(AtomicBool::new(true)),
379 state_probe,
380 }
381 }
382
383 async fn get_or_create_bucket(
384 client: &async_nats::Client,
385 js: &async_nats::jetstream::Context,
386 config: &StoreConfig,
387 ) -> Result<Store, KvError> {
388 // Try to get existing bucket first. Bound the call so a slow/dead
389 // NATS connection at startup can't park the daemon's init thread
390 // forever — the rest of startup (HTTP listener bind, etc.) happens
391 // after this. Without the timeout, a single bad NATS round-trip
392 // here held HTTP bind for 30s+ in observed cases.
393 //
394 // A failure here (permission denied, JetStream disabled, timeout) is not
395 // necessarily fatal — the bucket may simply not exist yet, so we fall
396 // through to create. But surface the original error first: otherwise a
397 // later create failure (e.g. "permission denied on STREAM.CREATE") masks
398 // the real cause ("permission denied on STREAM.INFO") and makes the
399 // failure undebuggable under load.
400 match timed(js.get_key_value(&config.name)).await {
401 Ok(Ok(kv)) => return Ok(kv),
402 Ok(Err(e)) => {
403 debug!(bucket = %config.name, error = ?e, "get_key_value failed; attempting create");
404 }
405 Err(_) => {
406 warn!(bucket = %config.name, timeout = ?KV_OP_TIMEOUT, "get_key_value timed out; attempting create");
407 }
408 }
409
410 // Bucket doesn't exist, create it
411 let mut kv_config = async_nats::jetstream::kv::Config {
412 bucket: config.name.clone(),
413 num_replicas: config.num_replicas.unwrap_or(1),
414 ..Default::default()
415 };
416
417 // Apply max_age (bucket-level TTL) if specified. `as_nanos()` is u128;
418 // saturate to i64::MAX rather than `as i64`, which would silently wrap a
419 // >292-year duration into a negative (and thus meaningless) TTL.
420 let max_age_nanos = if let Some(max_age) = config.max_age {
421 kv_config.max_age = max_age;
422 i64::try_from(max_age.as_nanos()).unwrap_or(i64::MAX)
423 } else {
424 0
425 };
426
427 // Apply max_history if specified. `i64::from` is lossless for a u32 and
428 // states the widening intent, where `as i64` would quietly mask a future
429 // type change that no longer fits.
430 let history = if let Some(history) = config.max_history {
431 let history = i64::from(history);
432 kv_config.history = history;
433 history
434 } else {
435 1
436 };
437
438 // Apply max_bytes if specified (required by Synadia Cloud)
439 let max_bytes = config.max_bytes.unwrap_or(10 * 1024 * 1024); // Default 10MB for Synadia Cloud
440 kv_config.max_bytes = max_bytes;
441
442 // Try normal create first, fall back to raw API if it fails (Synadia Cloud compatibility)
443 match timed(js.create_key_value(kv_config)).await? {
444 Ok(kv) => Ok(kv),
445 Err(e) => {
446 warn!(
447 bucket = config.name,
448 error = ?e,
449 "create_key_value failed, trying raw JetStream API"
450 );
451
452 // Try raw JetStream API as fallback
453 create_kv_bucket_raw(
454 client,
455 &config.name,
456 max_bytes,
457 history,
458 max_age_nanos,
459 config.num_replicas.unwrap_or(1),
460 )
461 .await?;
462
463 // Re-verify the bucket exists. This upholds the INVARIANT in
464 // `classify_raw_create_response`: the raw path reports `Created`
465 // on an unparseable response, so this round-trip is what actually
466 // confirms the bucket — do not remove it.
467 timed(js.get_key_value(&config.name))
468 .await?
469 .map_err(|e| {
470 error!(bucket = config.name, error = ?e, "failed to get bucket after raw create");
471 KvError::ConnectionFailed(format!("get bucket after raw create: {:?}", e))
472 })
473 }
474 }
475 }
476}
477
478#[async_trait]
479impl Connection for NatsConnection {
    /// Dial NATS and install the resulting client as this connection's handle.
    ///
    /// Returns `Ok(())` immediately when the `healthy` flag is already set. A
    /// connection built via `from_client` (marked by `state_probe` being
    /// `Some`) is refused with [`KvError::ConnectionFailed`] because it kept
    /// nothing to redial with. Failures while building the connect options or
    /// dialing are also reported as [`KvError::ConnectionFailed`], carrying
    /// the underlying error's text.
    ///
    /// If a handle is already installed by the time the dial finishes, the
    /// freshly built client is dropped and `Ok(())` is returned without
    /// touching `healthy`.
    async fn connect(&self) -> Result<(), KvError> {
        // Fast path: skip if already connected.
        if self.healthy.load(Ordering::Acquire) {
            return Ok(());
        }

        // A `from_client` connection borrows a caller-owned client and kept no
        // URL or credentials, so it cannot redial. Refuse here with an actionable
        // message instead of dialing the empty config URL (which fails with an
        // opaque parse/connect error). This is reachable only after `shutdown()`
        // cleared the fast-path flag above — a live borrowed client short-circuits
        // there. The caller must construct a `NatsConnection::new(config)` if it
        // needs reconnect semantics.
        if self.state_probe.is_some() {
            return Err(KvError::ConnectionFailed(
                "connection was built via NatsConnection::from_client and cannot \
                 reconnect (no URL or credentials retained); construct \
                 NatsConnection::new(config) for a reconnectable connection"
                    .to_string(),
            ));
        }

        let (opts, dial_url) = build_connect_options(
            &self.config.url,
            self.config.creds.as_deref(),
            self.config.creds_file.as_deref(),
        )
        .await
        .map_err(|e| KvError::ConnectionFailed(e.to_string()))?;

        // Drive `healthy` from the client's own connection events so it reflects
        // reality through async-nats's transparent reconnects — without this the
        // flag stays `true` straight through a NATS outage, and a readiness probe
        // built on `is_healthy()` keeps routing traffic to a node that can't
        // reach NATS.
        //
        // `installed` gates the callback: a caller that loses the connect race
        // (see the double-check below) tears down its freshly built client, and
        // that teardown fires `Disconnected`. Without the gate, the loser's drop
        // would clobber the *winner's* `healthy` flag. Only the client we
        // actually install ever flips `installed` to `true`, so the losers'
        // callbacks are inert.
        let installed = Arc::new(AtomicBool::new(false));
        let cb_healthy = Arc::clone(&self.healthy);
        let cb_installed = Arc::clone(&installed);
        let opts = opts.event_callback(move |event| {
            // Each invocation gets its own clones so the returned future owns
            // everything it touches.
            let cb_healthy = Arc::clone(&cb_healthy);
            let cb_installed = Arc::clone(&cb_installed);
            async move {
                if !cb_installed.load(Ordering::Acquire) {
                    return;
                }
                match event {
                    async_nats::Event::Connected => cb_healthy.store(true, Ordering::Release),
                    async_nats::Event::Disconnected => cb_healthy.store(false, Ordering::Release),
                    _ => {}
                }
            }
        });

        let client = opts
            .connect(dial_url)
            .await
            .map_err(|e| KvError::ConnectionFailed(e.to_string()))?;

        let jetstream = async_nats::jetstream::new(client.clone());

        let conn = NatsHandle { client, jetstream };

        // Re-check under the write lock: a concurrent caller may have connected
        // while we were awaiting the dial. If so, drop our freshly built handle
        // (closing its connection) instead of replacing the live one, which would
        // orphan a connection the first caller still believes is installed.
        // Leaving `installed = false` keeps our about-to-drop client's teardown
        // events from touching `healthy`.
        let mut handle = self.handle.write().await;
        if handle.is_some() {
            return Ok(());
        }
        // Arm the callback before publishing the handle and the flag, so from
        // here on this client's events drive `healthy`.
        installed.store(true, Ordering::Release);
        *handle = Some(conn);
        self.healthy.store(true, Ordering::Release);

        Ok(())
    }
565
566 async fn shutdown(&self) -> Result<(), KvError> {
567 self.healthy.store(false, Ordering::Release);
568 *self.handle.write().await = None;
569 Ok(())
570 }
571
572 fn is_healthy(&self) -> bool {
573 // `healthy` is the shutdown gate for both construction paths: once
574 // `shutdown()` clears it the connection is down regardless of socket
575 // state, so check it first.
576 if !self.healthy.load(Ordering::Acquire) {
577 return false;
578 }
579 match &self.state_probe {
580 // `from_client`: no event callback could be installed, so consult the
581 // client's live connection state instead of a stale connect-time
582 // value. A dead or reconnecting socket reports Pending/Disconnected,
583 // so a readiness probe correctly sees the node as unhealthy. A
584 // borrowed client is never replaced (connect() refuses to reconnect
585 // it), so this probe never goes stale.
586 Some(client) => matches!(
587 client.connection_state(),
588 async_nats::connection::State::Connected
589 ),
590 // `new()` + `connect()`: `healthy` is kept current by the installed
591 // Connected/Disconnected event callback.
592 None => true,
593 }
594 }
595
596 async fn store(&self, name: &str) -> Result<Arc<dyn KvStore>, KvError> {
597 let config = StoreConfig {
598 name: name.to_string(),
599 ..Default::default()
600 };
601 self.store_with_config(config).await
602 }
603
604 async fn store_with_config(&self, config: StoreConfig) -> Result<Arc<dyn KvStore>, KvError> {
605 // Clone the client/jetstream out from under the read lock before the
606 // (up to 60s) bucket get-or-create. Holding the read guard across that
607 // await would block `shutdown()`'s `write().await` for the full
608 // duration, stalling graceful shutdown behind an in-flight store call.
609 let (client, js) = {
610 let conn = self.handle.read().await;
611 let conn = conn.as_ref().ok_or(KvError::NotConnected)?;
612 (conn.client.clone(), conn.jetstream.clone())
613 };
614
615 let kv = Self::get_or_create_bucket(&client, &js, &config).await?;
616
617 Ok(Arc::new(NatsKvStore {
618 name: config.name,
619 client,
620 js,
621 kv,
622 }))
623 }
624
    /// Describe what this backend supports.
    ///
    /// Advertises streaming and prefix watches plus CAS; TTL, transactions
    /// and global ordering are reported as unsupported, and no value-size cap
    /// is imposed at this layer.
    fn capabilities(&self) -> ConnectionCapabilities {
        ConnectionCapabilities {
            streaming_watch: true,
            prefix_watch: true,
            // `KvTtl` is not implemented for the NATS backend yet (only `KvWriter`
            // is vended by `writer()`), so advertising `ttl: true` would lead
            // callers that branch on this flag down a path that can never
            // succeed. Flip to `true` together with the `KvTtl` impl.
            ttl: false,
            cas: true,
            transactions: false,
            // 0 = unlimited from this layer's perspective: we impose no cap, but
            // the NATS server still enforces its own max payload (~1MB by
            // default). Callers that branch on this must not read 0 as "any size
            // is safe" — an oversized value is rejected server-side at write time.
            max_value_size: 0,
            global_ordering: false,
        }
    }
644}
645
/// A single NATS KV bucket exposed through the [`KvStore`] trait.
///
/// Holds the pieces each reader, watcher and writer needs; the accessors
/// hand out clones of them.
struct NatsKvStore {
    // Bucket name; also passed to readers and watchers as their `bucket`.
    name: String,
    // The bucket handle returned by `get_or_create_bucket`.
    kv: Store,
    // Client and JetStream context cloned from the connection's handle.
    client: async_nats::Client,
    js: async_nats::jetstream::Context,
}
652
653impl KvStore for NatsKvStore {
654 fn name(&self) -> &str {
655 &self.name
656 }
657
658 fn reader(&self) -> Arc<dyn KvReader> {
659 Arc::new(NatsKvReader {
660 kv: self.kv.clone(),
661 client: self.client.clone(),
662 js: self.js.clone(),
663 bucket: self.name.clone(),
664 })
665 }
666
667 fn watcher(&self) -> Option<Arc<dyn KvWatcher>> {
668 Some(Arc::new(NatsKvWatcher {
669 kv: self.kv.clone(),
670 client: self.client.clone(),
671 js: self.js.clone(),
672 bucket: self.name.clone(),
673 }))
674 }
675
676 fn writer(&self) -> Option<Arc<dyn KvWriter>> {
677 Some(Arc::new(NatsKvWriterImpl {
678 kv: self.kv.clone(),
679 }))
680 }
681}
682
/// Read-side view of one NATS KV bucket, implementing [`KvReader`].
struct NatsKvReader {
    // Bucket handle used for single-key lookups (`entry()`).
    kv: Store,
    // Used by `consume_last_per_subject` to allocate the delivery inbox.
    client: async_nats::Client,
    // JetStream context — presumably used for the consumer lifecycle in
    // `consume_last_per_subject`; confirm against that method's body.
    js: async_nats::jetstream::Context,
    // The bucket name is known at construction (it's the store's name), so
    // `consume_last_per_subject` builds its subject filters from this field
    // instead of issuing a `kv.status()` round-trip per `scan()`/`keys()` call
    // just to read it back from the server.
    bucket: String,
}
693
694#[async_trait]
695impl KvReader for NatsKvReader {
696 async fn get(&self, key: &str) -> Result<Option<KvEntry>, KvError> {
697 // Empty value → treat as absent. This unifies a real stored `b""` and a
698 // `delete_with_version` tombstone (empty-value Put) under one "absent =
699 // None" contract, consistent with `scan()`/`keys()`. Callers needing
700 // zero-length semantics use `entry()`. See the `KvReader::get` trait doc.
701 match self.entry(key).await? {
702 Some(entry) if entry.value.is_empty() => Ok(None),
703 other => Ok(other),
704 }
705 }
706
707 async fn entry(&self, key: &str) -> Result<Option<KvEntry>, KvError> {
708 use async_nats::jetstream::kv::Operation;
709 // Use entry() instead of get() to access revision.
710 // Return Put entries even with empty values — delete_with_version
711 // writes empty bytes as a tombstone and callers need the version
712 // for CAS conflict detection. Only filter real Delete/Purge markers.
713 match timed(self.kv.entry(key)).await? {
714 Ok(Some(entry)) if entry.operation == Operation::Put => Ok(Some(KvEntry {
715 key: key.to_string(),
716 value: entry.value.to_vec(),
717 version: VersionToken::from_u64(entry.revision),
718 })),
719 Ok(Some(_)) => Ok(None), // Delete/Purge marker
720 Ok(None) => Ok(None),
721 Err(e) => Err(KvError::OperationFailed(e.to_string())),
722 }
723 }
724
725 async fn keys(&self, prefix: &str) -> Result<Vec<String>, KvError> {
726 debug!(prefix = %prefix, "listing keys with prefix");
727
728 let mut keys = Vec::new();
729 self.consume_last_per_subject(prefix, true, |msg, key| {
730 // Skip both real KV deletes and CAS tombstones (empty-value Puts
731 // written by delete_with_version). get()/scan() hide the latter, so
732 // keys() must too — otherwise a list-then-get returns phantom keys.
733 // With headers_only the payload is stripped, but NATS adds a
734 // `Nats-Msg-Size` header we use to detect the empty value.
735 if !is_kv_delete(&msg) && !is_empty_value(&msg) {
736 keys.push(key);
737 }
738 })
739 .await?;
740
741 debug!(prefix = %prefix, keys = keys.len(), "keys listing complete");
742 Ok(keys)
743 }
744
745 async fn scan(&self, prefix: &str) -> Result<Vec<KvEntry>, KvError> {
746 let mut entries = Vec::new();
747 self.consume_last_per_subject(prefix, false, |msg, key| {
748 if !is_kv_delete(&msg) && !msg.payload.is_empty() {
749 // The KV revision is the stream sequence, carried in the JetStream
750 // ACK subject (the message's reply subject). A revision of 0 means
751 // we couldn't parse it; callers treat that as "unknown version".
752 let revision = msg
753 .reply
754 .as_deref()
755 .and_then(stream_sequence_from_ack)
756 .unwrap_or(0);
757
758 entries.push(KvEntry {
759 key,
760 value: msg.payload.to_vec(),
761 version: VersionToken::from_u64(revision),
762 });
763 }
764 })
765 .await?;
766
767 debug!(prefix = %prefix, entries = entries.len(), "scan complete");
768 Ok(entries)
769 }
770}
771
/// Extract the stream sequence (== KV revision) from a JetStream ACK subject.
///
/// The ACK subject — delivered as a push message's reply subject — comes in two
/// shapes, and the stream sequence sits at different offsets in each:
///
/// ```text
/// legacy (9 tokens): $JS.ACK.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>
/// modern (11–12):    $JS.ACK.<domain>.<account>.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>[.<token>]
/// ```
///
/// The subject is parsed from the front, accounting for the optional
/// `<domain>.<account>` prefix that modern servers prepend. (Taking the *last*
/// token would read `num_pending`, not the sequence.)
///
/// Returns `None` when the subject does not start with `$JS.ACK`, when its
/// token count matches neither shape — fewer than 9, or exactly 10 — or when
/// the stream-sequence token is not a valid `u64`. A 10-token subject is
/// rejected rather than guessed at: reading index 7 there would return an
/// unrelated field as the revision.
fn stream_sequence_from_ack(reply: &str) -> Option<u64> {
    const LEGACY_TOKENS: usize = 9;
    const MODERN_MIN_TOKENS: usize = 11;

    // The stream-seq field sits at index 5 (legacy) or 7 (modern), so only the
    // first 8 tokens are ever read. Keep those in a stack array and merely
    // count the remainder — no heap `Vec`, which on a large `scan()` would be
    // one allocation per delivered message.
    let mut head = [""; 8];
    let mut count = 0usize;
    for (i, token) in reply.split('.').enumerate() {
        if let Some(slot) = head.get_mut(i) {
            *slot = token;
        }
        count += 1;
    }

    if head[0] != "$JS" || head[1] != "ACK" {
        return None;
    }

    // Legacy has exactly 9 tokens with no domain/account; the modern form
    // carries the two-token prefix, shifting every field right by two.
    let stream_seq_idx = match count {
        LEGACY_TOKENS => 5,
        n if n >= MODERN_MIN_TOKENS => 7,
        _ => return None,
    };
    head[stream_seq_idx].parse::<u64>().ok()
}
807
808/// Check if a NATS message represents a KV delete/purge operation.
809fn is_kv_delete(msg: &async_nats::Message) -> bool {
810 msg.headers
811 .as_ref()
812 .and_then(|h| h.get("KV-Operation"))
813 .is_some()
814}
815
816/// Check if a `headers_only` delivery carries an empty value (a CAS tombstone
817/// written by `delete_with_version`).
818///
819/// When a consumer is created with `headers_only`, NATS strips the body and adds
820/// a `Nats-Msg-Size` header with the original payload length. Size 0 means the
821/// stored value is empty, which `get()`/`scan()` treat as absent. Messages
822/// without the header (e.g. non-`headers_only` deliveries) are not classified as
823/// empty here — callers on that path inspect the payload directly instead.
824fn is_empty_value(msg: &async_nats::Message) -> bool {
825 msg.headers
826 .as_ref()
827 .and_then(|h| h.get("Nats-Msg-Size"))
828 .map(|v| v.as_str() == "0")
829 .unwrap_or(false)
830}
831
832impl NatsKvReader {
833 /// Subscribe to last-per-subject messages for a KV prefix, calling `on_msg`
834 /// for each delivered message. Handles the subscribe-first race workaround,
835 /// consumer lifecycle, and cleanup.
836 async fn consume_last_per_subject(
837 &self,
838 prefix: &str,
839 headers_only: bool,
840 mut on_msg: impl FnMut(async_nats::Message, String),
841 ) -> Result<(), KvError> {
842 use async_nats::jetstream::consumer::push;
843 use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy};
844
845 // The bucket name is known at construction, so the subject filters are
846 // built directly from `self.bucket` — no `kv.status()` round-trip just
847 // to read it back. Every *remaining* setup await below is still bounded
848 // by `timed()`: a half-dead NATS connection (CLOSE_WAIT) would otherwise
849 // park here before the per-message drain timer downstream ever starts,
850 // hanging scan()/keys() indefinitely — the same failure `timed()` guards
851 // on the write path.
852 let bucket = self.bucket.as_str();
853
854 let nats_filter = if prefix.is_empty() {
855 format!("$KV.{bucket}.>")
856 } else {
857 format!("$KV.{bucket}.{prefix}>")
858 };
859
860 // Work around async-nats <=0.46 subscribe-after-create race:
861 // subscribe to the inbox FIRST, then create the consumer.
862 let inbox = self.client.new_inbox();
863 let mut sub = timed(self.client.subscribe(inbox.clone()))
864 .await?
865 .map_err(|e| KvError::OperationFailed(format!("subscribe inbox: {e}")))?;
866
867 let stream = timed(self.js.get_stream(format!("KV_{bucket}")))
868 .await?
869 .map_err(|e| KvError::OperationFailed(format!("get KV stream: {e}")))?;
870
871 let consumer = timed(stream.create_consumer(push::Config {
872 deliver_subject: inbox,
873 deliver_policy: DeliverPolicy::LastPerSubject,
874 filter_subject: nats_filter,
875 headers_only,
876 // This is a one-shot point-in-time drain — we never ack. Under
877 // the default `AckPolicy::Explicit`, JetStream stops delivering
878 // once `max_ack_pending` (default 1000) messages sit unacked,
879 // which would silently truncate scan()/keys() to the first ~1000
880 // keys (or stall waiting for deliveries that never come) on any
881 // larger bucket. `None` removes the ack-pending gate entirely.
882 ack_policy: AckPolicy::None,
883 // Safety net for the best-effort `delete_consumer` below: if that
884 // cleanup times out on a half-dead connection, JetStream still reaps
885 // this consumer after `CONSUMER_INACTIVE_THRESHOLD` of inactivity, so
886 // repeated timed-out scans can't pile orphaned consumers up against
887 // the per-stream limit.
888 inactive_threshold: CONSUMER_INACTIVE_THRESHOLD,
889 ..Default::default()
890 }))
891 .await?
892 .map_err(|e| KvError::OperationFailed(format!("create consumer: {e}")))?;
893
894 let num_pending = consumer.cached_info().num_pending;
895
896 // Drain exactly `num_pending` messages, but bound each await: a half-dead
897 // connection (CLOSE_WAIT) would otherwise park this loop forever, the same
898 // failure `timed()` guards on the write path. On timeout we still fall
899 // through to consumer cleanup, then surface `Timeout`.
900 let mut timed_out = false;
901 if num_pending > 0 {
902 let mut delivered = 0u64;
903 let kv_prefix = format!("$KV.{bucket}.");
904
905 while delivered < num_pending {
906 match tokio::time::timeout(KV_OP_TIMEOUT, sub.next()).await {
907 Ok(Some(msg)) => {
908 let key = msg
909 .subject
910 .strip_prefix(&kv_prefix)
911 .unwrap_or(msg.subject.as_str())
912 .to_string();
913
914 on_msg(msg, key);
915 delivered += 1;
916 }
917 Ok(None) => break, // subscription closed early
918 Err(_) => {
919 timed_out = true;
920 break;
921 }
922 }
923 }
924 }
925
926 // Clean up ephemeral consumer (best-effort), even on timeout — a stalled
927 // scan shouldn't also leak a server-side consumer. A leaked consumer
928 // lingers on the server and counts against per-stream limits, so surface
929 // failures in observability without failing the operation. Bound the
930 // delete with `timed()`: on the same half-dead (CLOSE_WAIT) connection
931 // that tripped the drain timeout above, an unbounded delete would re-park
932 // here forever, defeating the timeout recovery we just performed.
933 match timed(stream.delete_consumer(&consumer.cached_info().name)).await {
934 Ok(Ok(_)) => {}
935 Ok(Err(e)) => {
936 // `warn!`, not `debug!`: a leaked ephemeral consumer lingers on
937 // the server and counts against per-stream limits. Under a flaky
938 // NATS connection every scan()/keys() leaks one, so this must be
939 // visible in default spans before the pile-up hits the limit.
940 warn!(error = %e, "failed to delete ephemeral consumer (best-effort)");
941 }
942 Err(_) => {
943 warn!("timed out deleting ephemeral consumer (best-effort)");
944 }
945 }
946
947 if timed_out {
948 return Err(KvError::Timeout);
949 }
950 Ok(())
951 }
952}
953
954/// Convert a NATS KV entry to a KvUpdate.
955///
956/// Takes the entry by value so the key `String` moves into the `KvUpdate`
957/// instead of allocating a fresh copy per watch event.
958fn nats_entry_to_kv_update(entry: async_nats::jetstream::kv::Entry) -> KvUpdate {
959 use async_nats::jetstream::kv::Operation;
960 let version = VersionToken::from_u64(entry.revision);
961 match entry.operation {
962 Operation::Put => KvUpdate::Put(KvEntry {
963 key: entry.key,
964 value: entry.value.to_vec(),
965 version,
966 }),
967 Operation::Delete => KvUpdate::Delete {
968 key: entry.key,
969 version,
970 },
971 Operation::Purge => KvUpdate::Purge {
972 key: entry.key,
973 version,
974 },
975 }
976}
977
978/// Stream updates from a NATS Watch into a channel until it ends or the receiver drops.
979async fn stream_watch(
980 mut watcher: async_nats::jetstream::kv::Watch,
981 tx: &Sender<KvUpdate>,
982) -> Result<(), KvError> {
983 while let Some(entry) = watcher.next().await {
984 match entry {
985 Ok(entry) => {
986 let update = nats_entry_to_kv_update(entry);
987 if tx.send(update).await.is_err() {
988 debug!("watch receiver closed");
989 break;
990 }
991 }
992 Err(e) => {
993 error!(error = %e, "NATS KV watch error");
994 return Err(KvError::WatchError(e.to_string()));
995 }
996 }
997 }
998 Ok(())
999}
1000
1001/// Check if a NATS watch error indicates the requested start sequence is
1002/// too old (compacted), meaning callers should fall back to a full watch.
1003///
1004/// async-nats has no granular error kind for this: `WatchErrorKind` is only
1005/// `InvalidKey`/`TimedOut`/`ConsumerCreate`/`Other`, and "start sequence too old"
1006/// arrives as `ConsumerCreate`/`Other` with the real reason buried in the source
1007/// error's *message*. So we substring-match the full error string — which already
1008/// includes the source, since `Error`'s `Display` renders `"{kind}: {source}"`.
1009///
1010/// Two deliberate choices make this robust to wording drift:
1011/// - We lowercase first, so a capitalization change in NATS/async-nats can't slip
1012/// past.
1013/// - Detection is biased toward `true`. A false positive only costs an
1014/// unnecessary (but always-safe) full `watch_all()` replay; a false negative
1015/// propagates `WatchError` and strands a caller that would otherwise recover.
1016///
1017/// If these messages ever change, `cursor_expired_matches_known_nats_error_strings`
1018/// is the canary that fails loudly on the next dependency bump.
1019fn is_cursor_expired_error(err: &str) -> bool {
1020 use std::sync::OnceLock;
1021 // One Aho-Corasick automaton over all needles: a single pass over the error
1022 // string regardless of how many needles accumulate as NATS versions reword
1023 // their messages, vs. one `windows()` scan per needle. Case-insensitivity is
1024 // baked into the automaton, so no lowercased copy is allocated either.
1025 static MATCHER: OnceLock<aho_corasick::AhoCorasick> = OnceLock::new();
1026 MATCHER
1027 .get_or_init(|| {
1028 aho_corasick::AhoCorasick::builder()
1029 .ascii_case_insensitive(true)
1030 .build([
1031 "start sequence",
1032 "first sequence",
1033 "sequence not found",
1034 "too old",
1035 ])
1036 .expect("static needle set always compiles")
1037 })
1038 .is_match(err)
1039}
1040
/// NATS-backed [`KvWatcher`]: streams KV changes for a single bucket.
struct NatsKvWatcher {
    /// KV store handle; every `watch_*` method that async-nats supports
    /// natively goes through it.
    kv: Store,
    // `watch_prefixes_from` has no async-nats equivalent (there is no
    // `watch_many_from_revision`), so it hand-builds the multi-filter ordered
    // consumer itself — which needs the raw client (inbox allocation), the
    // JetStream context (stream lookup), and the bucket name (subject filters),
    // same as the reader's scan path.
    /// Raw client, used to allocate the deliver inbox for the hand-built consumer.
    client: async_nats::Client,
    /// JetStream context, used to look up the bucket's `KV_{bucket}` stream.
    js: async_nats::jetstream::Context,
    /// Bucket name, used to build the `$KV.{bucket}.` subject filters.
    bucket: String,
}
1052
1053/// Decode a raw KV stream message (as delivered by a hand-built ordered push
1054/// consumer) into a [`KvUpdate`] — the same mapping `async-nats`'s `kv::Watch`
1055/// performs internally for the `watch_*` paths: key from the subject (stripping
1056/// the `$KV.{bucket}.` prefix), operation from the `KV-Operation` header
1057/// (absent = Put), revision from the stream sequence in the ACK reply subject.
1058///
1059/// Returns `None` for a subject outside the bucket's keyspace, which a
1060/// subject-filtered consumer should never deliver — skipped rather than
1061/// surfaced, matching `kv::Watch`'s behavior.
1062fn kv_message_to_update(msg: &async_nats::Message, kv_prefix: &str) -> Option<KvUpdate> {
1063 let key = msg.subject.strip_prefix(kv_prefix)?.to_string();
1064 let revision = msg
1065 .reply
1066 .as_deref()
1067 .and_then(stream_sequence_from_ack)
1068 .unwrap_or(0);
1069 let version = VersionToken::from_u64(revision);
1070 let operation = msg
1071 .headers
1072 .as_ref()
1073 .and_then(|h| h.get("KV-Operation"))
1074 .map(|v| v.as_str());
1075 Some(match operation {
1076 Some("DEL") => KvUpdate::Delete { key, version },
1077 Some("PURGE") => KvUpdate::Purge { key, version },
1078 // No header (or an explicit "PUT") is a put — the common case carries
1079 // no KV-Operation header at all.
1080 _ => KvUpdate::Put(KvEntry {
1081 key,
1082 value: msg.payload.to_vec(),
1083 version,
1084 }),
1085 })
1086}
1087
1088#[async_trait]
1089impl KvWatcher for NatsKvWatcher {
1090 async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
1091 // `watch_with_history` (DeliverPolicy::LastPerSubject), NOT `watch_all`
1092 // (DeliverPolicy::New): the trait contract is state-sync — current value
1093 // of every key first, then live updates. async-nats's `watch_all` only
1094 // delivers messages published AFTER the consumer exists, which would
1095 // leave a no-cursor consumer empty until keys happen to change.
1096 //
1097 // Bound the watch *setup* with `timed()` for the same reason every KV op
1098 // is bounded: a half-dead (CLOSE_WAIT) NATS connection parks this await
1099 // forever instead of failing. The streaming drain in `stream_watch` is
1100 // intentionally unbounded (a watch is long-lived), but establishing it
1101 // must not be able to hang a reconnecting caller.
1102 let watcher = timed(self.kv.watch_with_history(">"))
1103 .await?
1104 .map_err(|e| KvError::WatchError(e.to_string()))?;
1105 stream_watch(watcher, &tx).await
1106 }
1107
1108 async fn watch_prefix(&self, prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
1109 // Use native NATS subject-based filtering. KV key "node.abc" maps to
1110 // subject "$KV.BUCKET.node.abc", and ">" is the multi-level wildcard.
1111 // `_with_history` for the same state-sync contract as `watch_all`.
1112 let nats_key = format!("{prefix}>");
1113 let watcher = timed(self.kv.watch_with_history(&nats_key))
1114 .await?
1115 .map_err(|e| KvError::WatchError(e.to_string()))?;
1116 stream_watch(watcher, &tx).await
1117 }
1118
1119 async fn watch_prefixes(&self, prefixes: &[&str], tx: Sender<KvUpdate>) -> Result<(), KvError> {
1120 if prefixes.is_empty() {
1121 // Nothing to watch. Critically, do NOT fall through to `watch_many`
1122 // with an empty filter set — an unfiltered ordered consumer would
1123 // watch the WHOLE bucket, the opposite of a scoped watch.
1124 return Ok(());
1125 }
1126 // ONE multi-filter consumer for every prefix (NATS 2.10 `filter_subjects`)
1127 // rather than one consumer per prefix. `watch_many_with_history` builds a
1128 // single ordered push consumer with `filter_subjects = [{p}> ...]` and
1129 // yields the same `Entry` stream as `watch`, so `stream_watch` is reused
1130 // verbatim. This is the per-stream-consumer-count fix: a node scoped to N
1131 // prefixes costs 1 consumer, not N. `_with_history` for the same
1132 // state-sync contract as `watch_all`.
1133 let keys: Vec<String> = prefixes.iter().map(|p| format!("{p}>")).collect();
1134 let watcher = timed(self.kv.watch_many_with_history(keys))
1135 .await?
1136 .map_err(|e| KvError::WatchError(e.to_string()))?;
1137 stream_watch(watcher, &tx).await
1138 }
1139
1140 async fn watch_all_from(
1141 &self,
1142 cursor: &WatchCursor,
1143 tx: Sender<KvUpdate>,
1144 ) -> Result<(), KvError> {
1145 let revision = match cursor.as_u64() {
1146 Some(rev) if rev > 0 => rev,
1147 _ => return self.watch_all(tx).await,
1148 };
1149
1150 let watcher = match timed(self.kv.watch_all_from_revision(revision + 1)).await? {
1151 Ok(w) => w,
1152 Err(e) => {
1153 let err_str = e.to_string();
1154 if is_cursor_expired_error(&err_str) {
1155 warn!(revision, error = %err_str, "cursor expired, caller should fall back to full watch");
1156 return Err(KvError::CursorExpired);
1157 }
1158 return Err(KvError::WatchError(err_str));
1159 }
1160 };
1161
1162 info!(revision, "resumed watch from cursor");
1163 stream_watch(watcher, &tx).await
1164 }
1165
1166 async fn watch_prefix_from(
1167 &self,
1168 prefix: &str,
1169 cursor: &WatchCursor,
1170 tx: Sender<KvUpdate>,
1171 ) -> Result<(), KvError> {
1172 let revision = match cursor.as_u64() {
1173 Some(rev) if rev > 0 => rev,
1174 _ => return self.watch_prefix(prefix, tx).await,
1175 };
1176
1177 let nats_key = format!("{prefix}>");
1178 let watcher = match timed(self.kv.watch_from_revision(&nats_key, revision + 1)).await? {
1179 Ok(w) => w,
1180 Err(e) => {
1181 let err_str = e.to_string();
1182 if is_cursor_expired_error(&err_str) {
1183 warn!(revision, prefix, error = %err_str, "cursor expired for prefix watch, caller should fall back");
1184 return Err(KvError::CursorExpired);
1185 }
1186 return Err(KvError::WatchError(err_str));
1187 }
1188 };
1189
1190 info!(revision, prefix, "resumed prefix watch from cursor");
1191 stream_watch(watcher, &tx).await
1192 }
1193
1194 async fn watch_prefixes_from(
1195 &self,
1196 prefixes: &[&str],
1197 cursor: &WatchCursor,
1198 tx: Sender<KvUpdate>,
1199 ) -> Result<(), KvError> {
1200 use async_nats::jetstream::consumer::{DeliverPolicy, ReplayPolicy, push};
1201
1202 if prefixes.is_empty() {
1203 // Same guard as watch_prefixes: an empty filter set must not become
1204 // an unfiltered whole-bucket consumer.
1205 return Ok(());
1206 }
1207 let revision = match cursor.as_u64() {
1208 Some(rev) if rev > 0 => rev,
1209 _ => return self.watch_prefixes(prefixes, tx).await,
1210 };
1211
1212 // async-nats has `watch_many` (multi-filter) and `watch_from_revision`
1213 // (seek) but no combination of the two, so build the multi-filter
1214 // ordered push consumer ourselves — the exact consumer
1215 // `watch_many_with_deliver_policy` would build, with
1216 // `ByStartSequence(cursor+1)` for the delta seek. The ordered-consumer
1217 // machinery (gap detection, auto-recreate from the last delivered
1218 // sequence) comes with `OrderedConfig` for free.
1219 let bucket = self.bucket.as_str();
1220 let kv_prefix = format!("$KV.{bucket}.");
1221 let filter_subjects: Vec<String> = prefixes
1222 .iter()
1223 .map(|p| format!("{kv_prefix}{p}>"))
1224 .collect();
1225
1226 let stream = timed(self.js.get_stream(format!("KV_{bucket}")))
1227 .await?
1228 .map_err(|e| KvError::WatchError(format!("get KV stream: {e}")))?;
1229
1230 let consumer = match timed(stream.create_consumer(push::OrderedConfig {
1231 deliver_subject: self.client.new_inbox(),
1232 description: Some("kv multi-prefix resume consumer".to_string()),
1233 filter_subjects,
1234 replay_policy: ReplayPolicy::Instant,
1235 deliver_policy: DeliverPolicy::ByStartSequence {
1236 start_sequence: revision + 1,
1237 },
1238 ..Default::default()
1239 }))
1240 .await?
1241 {
1242 Ok(c) => c,
1243 Err(e) => {
1244 // Same expiry classification as watch_all_from: a start sequence
1245 // the stream has compacted past surfaces as a consumer-create
1246 // error whose message names the sequence problem.
1247 let err_str = e.to_string();
1248 if is_cursor_expired_error(&err_str) {
1249 warn!(revision, ?prefixes, error = %err_str, "cursor expired for multi-prefix watch, caller should fall back");
1250 return Err(KvError::CursorExpired);
1251 }
1252 return Err(KvError::WatchError(err_str));
1253 }
1254 };
1255
1256 let mut messages = timed(consumer.messages())
1257 .await?
1258 .map_err(|e| KvError::WatchError(e.to_string()))?;
1259
1260 info!(
1261 revision,
1262 ?prefixes,
1263 "resumed multi-prefix watch from cursor"
1264 );
1265 while let Some(msg) = messages.next().await {
1266 match msg {
1267 Ok(msg) => {
1268 // A subject-filtered consumer only delivers in-keyspace
1269 // subjects; `None` here would be a server bug, skipped to
1270 // match kv::Watch's tolerance.
1271 let Some(update) = kv_message_to_update(&msg, &kv_prefix) else {
1272 continue;
1273 };
1274 if tx.send(update).await.is_err() {
1275 debug!("watch receiver closed");
1276 break;
1277 }
1278 }
1279 Err(e) => {
1280 error!(error = %e, "NATS KV multi-prefix watch error");
1281 return Err(KvError::WatchError(e.to_string()));
1282 }
1283 }
1284 }
1285 Ok(())
1286 }
1287}
1288
/// NATS-backed [`KvWriter`]: performs every write through a single KV store
/// handle.
struct NatsKvWriterImpl {
    /// KV store handle that `put`/`delete`/`create`/`update` are issued against.
    kv: Store,
}
1292
1293#[async_trait]
1294impl KvWriter for NatsKvWriterImpl {
1295 async fn put(&self, key: &str, value: &[u8]) -> Result<VersionToken, KvError> {
1296 let rev = timed(self.kv.put(key, value.to_vec().into()))
1297 .await?
1298 .map_err(|e| KvError::OperationFailed(e.to_string()))?;
1299 Ok(VersionToken::from_u64(rev))
1300 }
1301
1302 async fn delete(&self, key: &str) -> Result<bool, KvError> {
1303 // NATS delete doesn't tell us if key existed, so we always return true
1304 timed(self.kv.delete(key))
1305 .await?
1306 .map_err(|e| KvError::OperationFailed(e.to_string()))?;
1307 Ok(true)
1308 }
1309
1310 async fn create(&self, key: &str, value: &[u8]) -> Result<VersionToken, KvError> {
1311 use async_nats::jetstream::kv::CreateErrorKind;
1312 timed(self.kv.create(key, value.to_vec().into()))
1313 .await?
1314 .map(VersionToken::from_u64)
1315 .map_err(|e| {
1316 if e.kind() == CreateErrorKind::AlreadyExists {
1317 KvError::AlreadyExists
1318 } else {
1319 KvError::OperationFailed(e.to_string())
1320 }
1321 })
1322 }
1323
1324 async fn update(
1325 &self,
1326 key: &str,
1327 value: &[u8],
1328 expected: &VersionToken,
1329 ) -> Result<VersionToken, KvError> {
1330 use async_nats::jetstream::kv::UpdateErrorKind;
1331 let rev = expected.as_u64().ok_or_else(|| {
1332 KvError::OperationFailed("invalid version token for NATS update".into())
1333 })?;
1334 timed(self.kv.update(key, value.to_vec().into(), rev))
1335 .await?
1336 .map(VersionToken::from_u64)
1337 .map_err(|e| {
1338 if e.kind() == UpdateErrorKind::WrongLastRevision {
1339 KvError::RevisionMismatch
1340 } else {
1341 KvError::OperationFailed(e.to_string())
1342 }
1343 })
1344 }
1345
1346 async fn delete_with_version(
1347 &self,
1348 key: &str,
1349 expected: &VersionToken,
1350 ) -> Result<bool, KvError> {
1351 use async_nats::jetstream::kv::UpdateErrorKind;
1352 let rev = expected.as_u64().ok_or_else(|| {
1353 KvError::OperationFailed("invalid version token for NATS delete".into())
1354 })?;
1355 // Write empty value with CAS — logically deletes while preserving conflict detection
1356 timed(self.kv.update(key, Vec::new().into(), rev))
1357 .await?
1358 .map(|_| true)
1359 .map_err(|e| {
1360 if e.kind() == UpdateErrorKind::WrongLastRevision {
1361 KvError::RevisionMismatch
1362 } else {
1363 KvError::OperationFailed(e.to_string())
1364 }
1365 })
1366 }
1367}
1368
1369impl std::fmt::Debug for NatsConnection {
1370 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1371 f.debug_struct("NatsConnection")
1372 .field("url", &self.config.url)
1373 // `Acquire` to match every other read of `healthy` — a `Relaxed`
1374 // outlier here reads like a deliberate exception during an atomics
1375 // audit, and the fmt path is far too cold for the ordering to cost
1376 // anything.
1377 .field("healthy", &self.healthy.load(Ordering::Acquire))
1378 .finish()
1379 }
1380}
1381
/// Unit tests for the pure helpers in this file: raw stream-create response
/// classification, ACK-subject parsing, cursor-expiry detection, and raw KV
/// message decoding. None of them needs a NATS server.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn raw_create_success_has_no_error() {
        // A successful STREAM.CREATE echoes back the stream config, no "error".
        let payload = br#"{"type":"io.nats.jetstream.api.v1.stream_create_response","config":{"name":"KV_certs"}}"#;
        assert_eq!(
            classify_raw_create_response(payload),
            RawCreateOutcome::Created
        );
    }

    #[test]
    fn raw_create_swallows_stream_already_exists() {
        // 10058 = stream name already in use → the bucket already exists, OK.
        let payload =
            br#"{"error":{"code":400,"err_code":10058,"description":"stream name already in use"}}"#;
        assert_eq!(
            classify_raw_create_response(payload),
            RawCreateOutcome::AlreadyExists
        );
    }

    #[test]
    fn raw_create_swallows_stream_limit() {
        // Synadia Cloud returns 400 + "maximum number of streams" at the limit,
        // but the bucket may already exist — treat as non-fatal.
        let payload =
            br#"{"error":{"code":400,"description":"maximum number of streams reached"}}"#;
        assert_eq!(
            classify_raw_create_response(payload),
            RawCreateOutcome::StreamLimit
        );
    }

    #[test]
    fn raw_create_propagates_unknown_error() {
        // Any other JetStream error is fatal and must surface code + description.
        let payload = br#"{"error":{"code":403,"description":"insufficient permissions"}}"#;
        match classify_raw_create_response(payload) {
            RawCreateOutcome::Failed { code, description } => {
                assert_eq!(code, 403);
                assert_eq!(description, "insufficient permissions");
            }
            other => panic!("expected Failed, got {other:?}"),
        }
    }

    #[test]
    fn raw_create_400_without_stream_limit_is_fatal() {
        // A bare 400 that isn't the stream-limit message must NOT be swallowed,
        // otherwise a genuine bad-config rejection would masquerade as success.
        let payload = br#"{"error":{"code":400,"description":"invalid stream config"}}"#;
        match classify_raw_create_response(payload) {
            RawCreateOutcome::Failed { code, description } => {
                assert_eq!(code, 400);
                assert!(description.contains("invalid stream config"));
            }
            other => panic!("expected Failed, got {other:?}"),
        }
    }

    #[test]
    fn raw_create_unparseable_payload_is_treated_as_success() {
        // The caller re-verifies with get_key_value, so a garbled body must not
        // be reported as a hard failure here.
        assert_eq!(
            classify_raw_create_response(b"not json at all"),
            RawCreateOutcome::Created
        );
    }

    #[test]
    fn ack_subject_legacy_format() {
        // $JS.ACK.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>
        let reply = "$JS.ACK.KV_certs.cons.1.42.7.1700000000000000000.0";
        assert_eq!(stream_sequence_from_ack(reply), Some(42));
    }

    #[test]
    fn ack_subject_modern_format_with_domain_and_account() {
        // $JS.ACK.<domain>.<account>.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>
        let reply = "$JS.ACK.hub.AABBCC.KV_certs.cons.1.42.7.1700000000000000000.0";
        assert_eq!(stream_sequence_from_ack(reply), Some(42));
    }

    #[test]
    fn ack_subject_modern_format_with_trailing_token() {
        // Some servers append a random trailing token (12 tokens total).
        let reply = "$JS.ACK.hub.AABBCC.KV_certs.cons.1.99.7.1700000000000000000.0.rng";
        assert_eq!(stream_sequence_from_ack(reply), Some(99));
    }

    #[test]
    fn ack_subject_last_token_is_not_the_sequence() {
        // Regression guard: the final token is num_pending, never the sequence.
        // The old code returned this (0), corrupting every scanned entry's version.
        let reply = "$JS.ACK.KV_certs.cons.1.42.7.1700000000000000000.0";
        let seq = stream_sequence_from_ack(reply);
        assert_ne!(seq, Some(0));
        // `assert_ne!` alone would also pass if parsing failed outright (`None`),
        // so pin the value the sequence field actually holds.
        assert_eq!(seq, Some(42));
    }

    #[test]
    fn ack_subject_rejects_garbage() {
        assert_eq!(stream_sequence_from_ack(""), None);
        assert_eq!(stream_sequence_from_ack("not.an.ack.subject"), None);
        assert_eq!(stream_sequence_from_ack("$JS.ACK.too.few.tokens"), None);
        // Right shape, non-numeric sequence field.
        assert_eq!(stream_sequence_from_ack("$JS.ACK.s.c.1.notnum.7.0.0"), None);
    }

    #[test]
    fn cursor_expired_matches_known_nats_error_strings() {
        // These substrings come from async-nats error messages. If the library
        // rewrites them, watch_all_from would return WatchError instead of
        // CursorExpired, breaking callers that fall back to watch_all() on expiry.
        assert!(is_cursor_expired_error(
            "consumer start sequence is too old"
        ));
        assert!(is_cursor_expired_error("first sequence is 42, requested 1"));
        assert!(is_cursor_expired_error("sequence not found in stream"));
        // "too old" on its own (no "sequence" wording) must still be caught.
        assert!(is_cursor_expired_error("requested revision is too old"));
        // Case-insensitive: a capitalization change upstream must not slip past.
        assert!(is_cursor_expired_error("Consumer Start Sequence Too Old"));
        assert!(!is_cursor_expired_error("connection refused"));
        assert!(!is_cursor_expired_error("permission denied"));
        assert!(!is_cursor_expired_error("stream not found"));
    }

    /// Build a bare `async_nats::Message` for the decode tests: the given
    /// subject, optional reply (ACK) subject, payload, and — when `op` is
    /// `Some` — a `KV-Operation` header carrying that value.
    fn raw_kv_msg(
        subject: &str,
        reply: Option<&str>,
        payload: &[u8],
        op: Option<&str>,
    ) -> async_nats::Message {
        let headers = op.map(|op| {
            let mut h = async_nats::HeaderMap::new();
            h.insert("KV-Operation", op);
            h
        });
        async_nats::Message {
            subject: subject.to_string().into(),
            reply: reply.map(|r| r.to_string().into()),
            payload: payload.to_vec().into(),
            headers,
            status: None,
            description: None,
            length: 0,
        }
    }

    /// Legacy-form ACK subject whose stream sequence is 42.
    const ACK_42: &str = "$JS.ACK.KV_certs.cons.1.42.7.1700000000000000000.0";

    #[test]
    fn kv_message_decodes_put_without_operation_header() {
        // The common case: a put carries no KV-Operation header at all.
        let msg = raw_kv_msg("$KV.certs.node.a", Some(ACK_42), b"v1", None);
        match kv_message_to_update(&msg, "$KV.certs.").expect("in keyspace") {
            KvUpdate::Put(e) => {
                assert_eq!(e.key, "node.a");
                assert_eq!(e.value, b"v1");
                assert_eq!(e.version.as_u64(), Some(42));
            }
            other => panic!("expected Put, got {other:?}"),
        }
    }

    #[test]
    fn kv_message_explicit_put_header_is_a_put() {
        // An explicit "PUT" header must decode exactly like a header-less put.
        let msg = raw_kv_msg("$KV.certs.node.a", Some(ACK_42), b"v2", Some("PUT"));
        match kv_message_to_update(&msg, "$KV.certs.").expect("in keyspace") {
            KvUpdate::Put(e) => {
                assert_eq!(e.key, "node.a");
                assert_eq!(e.value, b"v2");
                assert_eq!(e.version.as_u64(), Some(42));
            }
            other => panic!("expected Put, got {other:?}"),
        }
    }

    #[test]
    fn kv_message_decodes_delete_and_purge_markers() {
        let msg = raw_kv_msg("$KV.certs.node.a", Some(ACK_42), b"", Some("DEL"));
        assert!(matches!(
            kv_message_to_update(&msg, "$KV.certs.").expect("in keyspace"),
            KvUpdate::Delete { ref key, ref version } if key == "node.a" && version.as_u64() == Some(42)
        ));

        let msg = raw_kv_msg("$KV.certs.node.a", Some(ACK_42), b"", Some("PURGE"));
        assert!(matches!(
            kv_message_to_update(&msg, "$KV.certs.").expect("in keyspace"),
            KvUpdate::Purge { ref key, .. } if key == "node.a"
        ));
    }

    #[test]
    fn kv_message_outside_keyspace_is_skipped() {
        // A subject-filtered consumer should never deliver this; the decode
        // skips rather than mis-keys it.
        let msg = raw_kv_msg("$KV.other.node.a", Some(ACK_42), b"v", None);
        assert!(kv_message_to_update(&msg, "$KV.certs.").is_none());
    }

    #[test]
    fn kv_message_without_reply_gets_revision_zero() {
        // No ACK reply subject → revision unparseable → 0, the same "unknown
        // version" convention scan() uses.
        let msg = raw_kv_msg("$KV.certs.node.a", None, b"v", None);
        match kv_message_to_update(&msg, "$KV.certs.").expect("in keyspace") {
            KvUpdate::Put(e) => assert_eq!(e.version.as_u64(), Some(0)),
            other => panic!("expected Put, got {other:?}"),
        }
    }

    #[test]
    fn raw_create_already_exists_when_10058_in_code_field() {
        // Some Synadia Cloud deployments echo 10058 in `code` rather than
        // `err_code`. Both paths must return AlreadyExists, not Failed.
        let payload = br#"{"error":{"code":10058,"description":"stream name already in use"}}"#;
        assert_eq!(
            classify_raw_create_response(payload),
            RawCreateOutcome::AlreadyExists
        );
    }

    #[test]
    fn raw_create_error_without_code_defaults_to_zero() {
        // Defensive: a malformed error object still classifies as Failed rather
        // than silently passing, with code defaulting to 0.
        let payload = br#"{"error":{"description":"mystery"}}"#;
        match classify_raw_create_response(payload) {
            RawCreateOutcome::Failed { code, description } => {
                assert_eq!(code, 0);
                assert_eq!(description, "mystery");
            }
            other => panic!("expected Failed, got {other:?}"),
        }
    }
}