slipstream/nats.rs
1use async_nats::jetstream::kv::Store;
2use async_trait::async_trait;
3use futures::StreamExt;
4use std::sync::{
5 Arc,
6 atomic::{AtomicBool, Ordering},
7};
8use std::time::Duration;
9use tokio::sync::{RwLock, mpsc::Sender};
10use tracing::{debug, error, info, warn};
11
12use crate::kv::{
13 KvEntry, KvError, KvPurge, KvReader, KvUpdate, KvWatcher, KvWriter, VersionToken, WatchCursor,
14};
15use crate::stores::{Connection, ConnectionCapabilities, DiscardPolicy, KvStore, StoreConfig};
16
/// Default per-operation timeout for NATS KV ops; [`timed`] applies it to
/// every future it wraps.
///
/// async-nats's request/response futures don't fail in-flight requests when
/// the underlying TCP connection goes half-dead (CLOSE_WAIT) — they just await
/// forever. Without a timeout here, ANY hung NATS connection translates into a
/// tokio runtime deadlock as soon as enough callers queue behind the dead
/// connection. 30s is generous for legitimate slow ops (cold JetStream stream
/// sync, leader election under load) and short enough that a dead connection
/// recovers within reasonable human-debug latency.
const KV_OP_TIMEOUT: Duration = Duration::from_secs(30);
26
/// Server-side inactivity reaper for the ephemeral consumers `scan()`/`keys()`
/// create.
///
/// Our code deletes each consumer explicitly when the drain finishes, but that
/// delete is best-effort: on a half-dead (CLOSE_WAIT) connection it times out,
/// orphaning the consumer server-side where it counts against the per-stream
/// consumer limit. Setting `inactive_threshold` makes JetStream reap any
/// consumer that sees no activity for this long, so a failed explicit delete
/// self-heals instead of accumulating until the limit is hit.
///
/// 300s is comfortably longer than [`KV_OP_TIMEOUT`] (30s) so it never reaps a
/// legitimately slow in-flight drain (each delivery resets the inactivity
/// timer).
const CONSUMER_INACTIVE_THRESHOLD: Duration = Duration::from_secs(300);
37
38/// Run a future under [`KV_OP_TIMEOUT`], returning [`KvError::Timeout`] if it
39/// doesn't complete in time. Preserves the inner future's `Result` so callers
40/// keep their existing error-mapping logic.
41async fn timed<F, T>(fut: F) -> Result<T, KvError>
42where
43 F: std::future::Future<Output = T>,
44{
45 tokio::time::timeout(KV_OP_TIMEOUT, fut)
46 .await
47 .map_err(|_| KvError::Timeout)
48}
49
50/// Build NATS connect options (with auth applied) and resolve the URL to dial.
51///
52/// Split out from [`nats_connect`] so the connection lifecycle can attach an
53/// event callback (for health tracking) to the *same* options before dialing,
54/// without duplicating the auth-priority logic. Returns the options plus the URL
55/// to connect to (which may differ from `url` when credentials are stripped out
56/// of a `user:pass@host` URL).
57///
58/// Auth priority (first match wins):
59/// 1. Inline credentials (base64-encoded .creds content)
60/// 2. Credentials file (if provided)
61/// 3. URL-embedded credentials (user:pass@host)
62/// 4. No authentication
63async fn build_connect_options(
64 url: &str,
65 creds: Option<&str>,
66 creds_file: Option<&str>,
67) -> Result<(async_nats::ConnectOptions, String), async_nats::ConnectError> {
68 // Priority 1: Inline credentials (base64-encoded)
69 if let Some(encoded) = creds {
70 use base64::Engine;
71 let decoded = base64::engine::general_purpose::STANDARD
72 .decode(encoded)
73 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
74 let content = String::from_utf8(decoded)
75 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
76 return Ok((
77 async_nats::ConnectOptions::with_credentials(&content)?,
78 url.to_string(),
79 ));
80 }
81
82 // Priority 2: Credentials file
83 if let Some(path) = creds_file {
84 return Ok((
85 async_nats::ConnectOptions::with_credentials_file(path).await?,
86 url.to_string(),
87 ));
88 }
89
90 // Priority 3: URL-embedded credentials
91 if let Ok(parsed) = url::Url::parse(url)
92 && !parsed.username().is_empty()
93 {
94 let user = parsed.username().to_string();
95 let pass = parsed.password().unwrap_or("").to_string();
96 // Rebuild URL without credentials. If the scheme doesn't support
97 // userinfo, these calls fail and the credentials would remain embedded
98 // in the URL we later log — warn loudly rather than silently leak them.
99 let mut clean_url = parsed.clone();
100 if clean_url.set_username("").is_err() || clean_url.set_password(None).is_err() {
101 warn!("could not strip credentials from NATS URL; they may appear in logs");
102 }
103 return Ok((
104 async_nats::ConnectOptions::with_user_and_password(user, pass),
105 clean_url.as_str().to_string(),
106 ));
107 }
108
109 // Priority 4: No authentication
110 Ok((async_nats::ConnectOptions::new(), url.to_string()))
111}
112
113/// Connect to NATS with various authentication methods.
114///
115/// Supports the auth-priority order documented on [`build_connect_options`].
116/// This is the standalone helper; the [`Connection`] impl builds options the
117/// same way but also installs a health-tracking event callback.
118pub async fn nats_connect(
119 url: &str,
120 creds: Option<&str>,
121 creds_file: Option<&str>,
122) -> Result<async_nats::Client, async_nats::ConnectError> {
123 let (opts, dial_url) = build_connect_options(url, creds, creds_file).await?;
124 opts.connect(dial_url).await
125}
126
127/// Render an untrusted server payload for logging: borrowed as-is when valid
128/// UTF-8, lowercase hex otherwise. `from_utf8_lossy` would mash every invalid
129/// byte into U+FFFD — exactly the bytes an incident needs to see — so the
130/// fallback preserves them losslessly instead.
131fn payload_for_log(payload: &[u8]) -> std::borrow::Cow<'_, str> {
132 match std::str::from_utf8(payload) {
133 Ok(s) => std::borrow::Cow::Borrowed(s),
134 Err(_) => std::borrow::Cow::Owned(format!("0x{}", crate::artifact::hex_encode(payload))),
135 }
136}
137
/// Configuration for NATS connection.
///
/// `Debug` is hand-written, not derived: `creds` holds base64-encoded
/// credential material, `creds_file` a filesystem path to secrets, and `url`
/// may embed `user:pass@host` credentials. A derived `Debug` would print all
/// of that verbatim the moment anyone `{:?}`-formats the config (a `tracing`
/// span, an error context, a test dump), leaking credentials into logs. The
/// redacting impl below keeps that from being one careless format string away.
#[derive(Clone)]
pub struct NatsConnectionConfig {
    /// Server URL to dial. May carry `user:pass@host` credentials, which are
    /// used for authentication when neither `creds` nor `creds_file` is set.
    pub url: String,
    /// Base64-encoded .creds file content (for ECS / containerized environments).
    pub creds: Option<String>,
    /// Path to .creds file on disk (for bare-metal / local development).
    pub creds_file: Option<String>,
}

/// Replace the userinfo (`user[:pass]`) of a URL's authority with
/// `[redacted]`, leaving scheme, host, port, path, query and fragment intact.
///
/// Works on the raw string (no URL parser) so it also covers scheme-less
/// addresses such as `user:pass@host:4222`. The authority is taken to end at
/// the first `/`, `?` or `#` after the scheme separator, and the userinfo to
/// end at the *last* `@` inside it, so a password containing a raw `@` is
/// still fully hidden. URLs without userinfo are returned borrowed, unchanged.
fn redact_url_userinfo(url: &str) -> std::borrow::Cow<'_, str> {
    use std::borrow::Cow;

    let authority_start = url.find("://").map_or(0, |scheme_end| scheme_end + 3);
    let after_scheme = &url[authority_start..];
    let authority_len = after_scheme
        .find(['/', '?', '#'])
        .unwrap_or(after_scheme.len());

    match after_scheme[..authority_len].rfind('@') {
        Some(at) => Cow::Owned(format!(
            "{}[redacted]{}",
            &url[..authority_start],
            &after_scheme[at..]
        )),
        None => Cow::Borrowed(url),
    }
}

impl std::fmt::Debug for NatsConnectionConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Presence, never content: enough to debug "are creds set?" without
        // ever rendering the secret itself. The same applies to `creds_file`:
        // a path like `/run/secrets/prod.creds` leaks the secrets layout into
        // logs/traces, so redact it to presence too.
        let presence = |set: bool| set.then_some("[redacted]");

        f.debug_struct("NatsConnectionConfig")
            // The host part stays visible for debugging; only URL-embedded
            // credentials are masked.
            .field("url", &redact_url_userinfo(&self.url))
            .field("creds", &presence(self.creds.is_some()))
            .field("creds_file", &presence(self.creds_file.is_some()))
            .finish()
    }
}
171
172/// Create a KV bucket using raw JetStream API (bypasses async-nats response parsing issues).
173///
174/// Synadia Cloud returns responses that `async_nats` can't parse. This function
175/// uses the raw JetStream API directly, bypassing the client's response deserialization.
176///
177/// `pub(crate)`: this is an internal Synadia Cloud workaround invoked by
178/// `get_or_create_bucket`, not a stable entry point. Exposing it would pin a
179/// vendor quirk into the crate's semver surface.
180pub(crate) async fn create_kv_bucket_raw(
181 client: &async_nats::Client,
182 bucket: &str,
183 max_bytes: i64,
184 history: i64,
185 max_age_nanos: i64,
186 discard: DiscardPolicy,
187 num_replicas: usize,
188) -> Result<(), KvError> {
189 let stream_name = format!("KV_{}", bucket);
190 let subject = format!("$KV.{}.>", bucket);
191
192 // JetStream stream config for KV bucket
193 let config = serde_json::json!({
194 "name": stream_name,
195 "subjects": [subject],
196 "max_msgs_per_subject": history,
197 "max_bytes": max_bytes,
198 "max_age": max_age_nanos,
199 "storage": "file",
200 "allow_rollup_hdrs": true,
201 "deny_delete": false,
202 "deny_purge": false,
203 "allow_direct": true,
204 "discard": discard.as_nats(),
205 "num_replicas": num_replicas,
206 "retention": "limits"
207 });
208
209 let payload = serde_json::to_vec(&config)
210 .map_err(|e| KvError::ConnectionFailed(format!("failed to serialize config: {}", e)))?;
211
212 let response = client
213 .request(
214 format!("$JS.API.STREAM.CREATE.{}", stream_name),
215 payload.into(),
216 )
217 .await
218 .map_err(|e| KvError::ConnectionFailed(format!("failed to send create request: {}", e)))?;
219
220 debug!(bucket, response = %payload_for_log(&response.payload), "raw JetStream response");
221
222 match classify_raw_create_response(&response.payload) {
223 RawCreateOutcome::AlreadyExists => {
224 info!(bucket, "bucket already exists");
225 Ok(())
226 }
227 RawCreateOutcome::StreamLimit => {
228 info!(bucket, "stream limit reached, bucket may already exist");
229 Ok(())
230 }
231 RawCreateOutcome::Created => {
232 info!(bucket, "bucket created successfully via raw API");
233 Ok(())
234 }
235 RawCreateOutcome::Failed { code, description } => Err(KvError::ConnectionFailed(format!(
236 "JetStream error {}: {}",
237 code, description
238 ))),
239 }
240}
241
/// Classification of a raw `$JS.API.STREAM.CREATE` response payload.
///
/// Separated from the I/O in [`create_kv_bucket_raw`] so the Synadia Cloud
/// error-code logic — the reason this raw path exists — is unit-testable
/// without a live NATS server. Produced by [`classify_raw_create_response`];
/// every variant except `Failed` is mapped to `Ok(())` by the caller.
#[derive(Debug, PartialEq, Eq)]
enum RawCreateOutcome {
    /// No error in the response: the bucket was created. Also returned for a
    /// payload that is not valid JSON (the caller re-verifies the bucket).
    Created,
    /// `10058` — stream name already in use; the bucket exists. Non-fatal.
    AlreadyExists,
    /// `400` "maximum number of streams"; Synadia Cloud returns this at the
    /// stream limit even when the bucket already exists. Non-fatal.
    StreamLimit,
    /// Any other JetStream error — fatal. `code` is the response's
    /// `error.code` field (0 when absent); `description` is
    /// `error.description` ("unknown error" when absent).
    Failed { code: i64, description: String },
}
259
260fn classify_raw_create_response(payload: &[u8]) -> RawCreateOutcome {
261 // Unparseable payloads are treated as success: the caller re-verifies the
262 // bucket with a follow-up `get_key_value`, so a garbled body here does not
263 // mask a real failure. Warn so the fallback assumption is auditable — if the
264 // re-verify step is ever refactored away, this log is the breadcrumb.
265 //
266 // INVARIANT: this `Created`-on-garbage path is only sound because every
267 // caller re-verifies the bucket exists after `create_kv_bucket_raw` returns
268 // Ok. The sole caller — `get_or_create_bucket` — does so via the
269 // `timed(js.get_key_value(...))` immediately after the raw-create fallback.
270 // Do not remove that re-verify without making this path return `Failed`.
271 let Ok(json) = serde_json::from_slice::<serde_json::Value>(payload) else {
272 warn!(
273 response = %payload_for_log(payload),
274 "unparseable STREAM.CREATE response; assuming created (caller re-verifies via get_key_value)"
275 );
276 return RawCreateOutcome::Created;
277 };
278
279 let Some(err) = json.get("error") else {
280 return RawCreateOutcome::Created;
281 };
282
283 // JetStream splits its error codes: `code` is the HTTP-style status (400,
284 // 404, 500) while `err_code` carries the granular code (e.g. 10058). The
285 // already-exists code can surface in either field depending on the server
286 // (standard NATS puts 10058 in `err_code` with `code` = 400; some managed
287 // deployments echo it in `code`), so we accept it in either.
288 let code = err.get("code").and_then(|c| c.as_i64()).unwrap_or(0);
289 let err_code = err.get("err_code").and_then(|c| c.as_i64()).unwrap_or(0);
290 let description = err
291 .get("description")
292 .and_then(|d| d.as_str())
293 .unwrap_or("unknown error");
294
295 // 10058 = stream name already in use (bucket exists) - that's OK
296 if code == 10058 || err_code == 10058 {
297 return RawCreateOutcome::AlreadyExists;
298 }
299
300 // 400 "maximum number of streams reached" may also mean bucket exists
301 // (Synadia Cloud returns this when at stream limit but bucket exists)
302 if code == 400 && description.contains("maximum number of streams") {
303 return RawCreateOutcome::StreamLimit;
304 }
305
306 RawCreateOutcome::Failed {
307 code,
308 description: description.to_string(),
309 }
310}
311
/// The live pieces of an established NATS connection: the core client and the
/// JetStream context derived from it. Stored as `Option<NatsHandle>` inside
/// `NatsConnection::handle`; `None` there means "not connected".
struct NatsHandle {
    // Held to keep the NATS connection alive for the lifetime of the handle.
    // The `jetstream` context clones an internal reference, but this field is
    // the authoritative owner — dropping the handle drops the connection.
    // Also read by `store_with_config`, which clones it out from under the
    // handle lock.
    client: async_nats::Client,
    // JetStream context built from a clone of `client`; cloned into each store
    // by `store_with_config`.
    jetstream: async_nats::jetstream::Context,
}
321
/// NATS JetStream KV connection.
///
/// Built either with `new()` (dials lazily in `connect()`, using `config`) or
/// with `from_client()` (wraps an already-connected client and cannot redial).
pub struct NatsConnection {
    // URL and credentials used by `connect()`. Left empty by `from_client`.
    config: NatsConnectionConfig,
    // `Some` while a client is installed; cleared by `shutdown()`.
    handle: RwLock<Option<NatsHandle>>,
    // Shared with the installed client's event callback so `is_healthy()`
    // tracks real connection state (Connected/Disconnected) rather than staying
    // pinned at its connect-time value. `Arc` because the callback outlives this
    // struct's borrow — it runs on the client's event-loop task.
    healthy: Arc<AtomicBool>,
    // Set only for connections built via `from_client`, where no health-tracking
    // event callback could be installed (the client was already connected).
    // `is_healthy()` consults this client's live `connection_state()` instead of
    // the callback-driven `healthy` flag. `None` for the `new()` + `connect()`
    // path, whose flag is kept current by the installed event callback.
    //
    // `Some(_)` is also the marker that this connection borrows a caller-owned
    // client: it carries no URL or credentials of its own (see `from_client`),
    // so it cannot redial. `connect()` refuses to reconnect such a connection
    // rather than dialing the empty config URL and surfacing a cryptic error.
    state_probe: Option<async_nats::Client>,
}
343
344impl NatsConnection {
345 pub fn new(config: NatsConnectionConfig) -> Self {
346 Self {
347 config,
348 handle: RwLock::new(None),
349 healthy: Arc::new(AtomicBool::new(false)),
350 state_probe: None,
351 }
352 }
353
354 /// Create a NatsConnection from an existing NATS client.
355 ///
356 /// This is useful when the caller already has a NATS connection and wants
357 /// to reuse it for KV stores instead of creating a new connection.
358 pub fn from_client(client: async_nats::Client) -> Self {
359 let jetstream = async_nats::jetstream::new(client.clone());
360 let config = NatsConnectionConfig {
361 url: String::new(), // Not used when pre-connected
362 creds: None,
363 creds_file: None,
364 };
365
366 // Clone a probe handle before the client moves into `NatsHandle`.
367 // `async_nats::Client` is cheap to clone (internally an `Arc`), and
368 // `connection_state()` just reads a watch channel — no I/O.
369 let state_probe = Some(client.clone());
370 let handle = NatsHandle { client, jetstream };
371
372 Self {
373 config,
374 handle: RwLock::new(Some(handle)),
375 // A pre-connected client carries no health-tracking callback (we
376 // didn't build its options), so `is_healthy()` reads the client's
377 // live `connection_state()` via `state_probe`. The flag below only
378 // gates explicit `shutdown()`.
379 healthy: Arc::new(AtomicBool::new(true)),
380 state_probe,
381 }
382 }
383
384 async fn get_or_create_bucket(
385 client: &async_nats::Client,
386 js: &async_nats::jetstream::Context,
387 config: &StoreConfig,
388 ) -> Result<Store, KvError> {
389 // Try to get existing bucket first. Bound the call so a slow/dead
390 // NATS connection at startup can't park the daemon's init thread
391 // forever — the rest of startup (HTTP listener bind, etc.) happens
392 // after this. Without the timeout, a single bad NATS round-trip
393 // here held HTTP bind for 30s+ in observed cases.
394 //
395 // A failure here (permission denied, JetStream disabled, timeout) is not
396 // necessarily fatal — the bucket may simply not exist yet, so we fall
397 // through to create. But surface the original error first: otherwise a
398 // later create failure (e.g. "permission denied on STREAM.CREATE") masks
399 // the real cause ("permission denied on STREAM.INFO") and makes the
400 // failure undebuggable under load.
401 match timed(js.get_key_value(&config.name)).await {
402 Ok(Ok(kv)) => return Ok(kv),
403 Ok(Err(e)) => {
404 debug!(bucket = %config.name, error = ?e, "get_key_value failed; attempting create");
405 }
406 Err(_) => {
407 warn!(bucket = %config.name, timeout = ?KV_OP_TIMEOUT, "get_key_value timed out; attempting create");
408 }
409 }
410
411 // Bucket doesn't exist, create it
412 let mut kv_config = async_nats::jetstream::kv::Config {
413 bucket: config.name.clone(),
414 num_replicas: config.num_replicas.unwrap_or(1),
415 ..Default::default()
416 };
417
418 // Apply max_age (bucket-level TTL) if specified. `as_nanos()` is u128;
419 // saturate to i64::MAX rather than `as i64`, which would silently wrap a
420 // >292-year duration into a negative (and thus meaningless) TTL.
421 let max_age_nanos = if let Some(max_age) = config.max_age {
422 kv_config.max_age = max_age;
423 i64::try_from(max_age.as_nanos()).unwrap_or(i64::MAX)
424 } else {
425 0
426 };
427
428 // Apply max_history if specified. `i64::from` is lossless for a u32 and
429 // states the widening intent, where `as i64` would quietly mask a future
430 // type change that no longer fits.
431 let history = if let Some(history) = config.max_history {
432 let history = i64::from(history);
433 kv_config.history = history;
434 history
435 } else {
436 1
437 };
438
439 // Apply max_bytes if specified (required by Synadia Cloud)
440 let max_bytes = config.max_bytes.unwrap_or(10 * 1024 * 1024); // Default 10MB for Synadia Cloud
441 kv_config.max_bytes = max_bytes;
442
443 // Try normal create first, fall back to raw API if it fails (Synadia Cloud compatibility).
444 //
445 // A TIMEOUT also takes the fallback, not an early return: the raw path
446 // exists for Synadia Cloud, which is the deployment most likely to be
447 // slow or distant — `?`-propagating the timeout here would skip the
448 // exact workaround built for it. If the connection is genuinely dead,
449 // the raw path's own `timed()` bounds surface that promptly anyway.
450 //
451 // EXCEPTION: `discard:old` cannot go through the normal path —
452 // async-nats' `kv::Config` exposes no discard field, so `create_key_value`
453 // would silently produce a `discard:new` bucket. Skip straight to the raw
454 // stream API (the only place the policy is expressible) for that case.
455 if config.discard == DiscardPolicy::New {
456 match timed(js.create_key_value(kv_config)).await {
457 Ok(Ok(kv)) => return Ok(kv),
458 Ok(Err(e)) => {
459 warn!(
460 bucket = config.name,
461 error = ?e,
462 "create_key_value failed, trying raw JetStream API"
463 );
464 }
465 Err(_) => {
466 warn!(
467 bucket = config.name,
468 timeout = ?KV_OP_TIMEOUT,
469 "create_key_value timed out, trying raw JetStream API"
470 );
471 }
472 }
473 }
474
475 // Raw JetStream API: the fallback for `discard:new`, and the ONLY path for
476 // `discard:old`.
477 create_kv_bucket_raw(
478 client,
479 &config.name,
480 max_bytes,
481 history,
482 max_age_nanos,
483 config.discard,
484 config.num_replicas.unwrap_or(1),
485 )
486 .await?;
487
488 // Re-verify the bucket exists. This upholds the INVARIANT in
489 // `classify_raw_create_response`: the raw path reports `Created`
490 // on an unparseable response, so this round-trip is what actually
491 // confirms the bucket — do not remove it.
492 timed(js.get_key_value(&config.name)).await?.map_err(|e| {
493 error!(bucket = config.name, error = ?e, "failed to get bucket after raw create");
494 KvError::ConnectionFailed(format!("get bucket after raw create: {:?}", e))
495 })
496 }
497}
498
499#[async_trait]
500impl Connection for NatsConnection {
501 async fn connect(&self) -> Result<(), KvError> {
502 // Fast path: skip if already connected.
503 if self.healthy.load(Ordering::Acquire) {
504 return Ok(());
505 }
506
507 // A `from_client` connection borrows a caller-owned client and kept no
508 // URL or credentials, so it cannot redial. Refuse here with an actionable
509 // message instead of dialing the empty config URL (which fails with an
510 // opaque parse/connect error). This is reachable only after `shutdown()`
511 // cleared the fast-path flag above — a live borrowed client short-circuits
512 // there. The caller must construct a `NatsConnection::new(config)` if it
513 // needs reconnect semantics.
514 if self.state_probe.is_some() {
515 return Err(KvError::ConnectionFailed(
516 "connection was built via NatsConnection::from_client and cannot \
517 reconnect (no URL or credentials retained); construct \
518 NatsConnection::new(config) for a reconnectable connection"
519 .to_string(),
520 ));
521 }
522
523 let (opts, dial_url) = build_connect_options(
524 &self.config.url,
525 self.config.creds.as_deref(),
526 self.config.creds_file.as_deref(),
527 )
528 .await
529 .map_err(|e| KvError::ConnectionFailed(e.to_string()))?;
530
531 // Drive `healthy` from the client's own connection events so it reflects
532 // reality through async-nats's transparent reconnects — without this the
533 // flag stays `true` straight through a NATS outage, and a readiness probe
534 // built on `is_healthy()` keeps routing traffic to a node that can't
535 // reach NATS.
536 //
537 // `installed` gates the callback: a caller that loses the connect race
538 // (see the double-check below) tears down its freshly built client, and
539 // that teardown fires `Disconnected`. Without the gate, the loser's drop
540 // would clobber the *winner's* `healthy` flag. Only the client we
541 // actually install ever flips `installed` to `true`, so the losers'
542 // callbacks are inert.
543 let installed = Arc::new(AtomicBool::new(false));
544 let cb_healthy = Arc::clone(&self.healthy);
545 let cb_installed = Arc::clone(&installed);
546 let opts = opts.event_callback(move |event| {
547 let cb_healthy = Arc::clone(&cb_healthy);
548 let cb_installed = Arc::clone(&cb_installed);
549 async move {
550 if !cb_installed.load(Ordering::Acquire) {
551 return;
552 }
553 match event {
554 async_nats::Event::Connected => cb_healthy.store(true, Ordering::Release),
555 async_nats::Event::Disconnected => cb_healthy.store(false, Ordering::Release),
556 _ => {}
557 }
558 }
559 });
560
561 let client = opts
562 .connect(dial_url)
563 .await
564 .map_err(|e| KvError::ConnectionFailed(e.to_string()))?;
565
566 let jetstream = async_nats::jetstream::new(client.clone());
567
568 let conn = NatsHandle { client, jetstream };
569
570 // Re-check under the write lock: a concurrent caller may have connected
571 // while we were awaiting the dial. If so, drop our freshly built handle
572 // (closing its connection) instead of replacing the live one, which would
573 // orphan a connection the first caller still believes is installed.
574 // Leaving `installed = false` keeps our about-to-drop client's teardown
575 // events from touching `healthy`.
576 let mut handle = self.handle.write().await;
577 if handle.is_some() {
578 return Ok(());
579 }
580 installed.store(true, Ordering::Release);
581 *handle = Some(conn);
582 self.healthy.store(true, Ordering::Release);
583
584 Ok(())
585 }
586
587 async fn shutdown(&self) -> Result<(), KvError> {
588 self.healthy.store(false, Ordering::Release);
589 *self.handle.write().await = None;
590 Ok(())
591 }
592
593 fn is_healthy(&self) -> bool {
594 // `healthy` is the shutdown gate for both construction paths: once
595 // `shutdown()` clears it the connection is down regardless of socket
596 // state, so check it first.
597 if !self.healthy.load(Ordering::Acquire) {
598 return false;
599 }
600 match &self.state_probe {
601 // `from_client`: no event callback could be installed, so consult the
602 // client's live connection state instead of a stale connect-time
603 // value. A dead or reconnecting socket reports Pending/Disconnected,
604 // so a readiness probe correctly sees the node as unhealthy. A
605 // borrowed client is never replaced (connect() refuses to reconnect
606 // it), so this probe never goes stale.
607 Some(client) => matches!(
608 client.connection_state(),
609 async_nats::connection::State::Connected
610 ),
611 // `new()` + `connect()`: `healthy` is kept current by the installed
612 // Connected/Disconnected event callback.
613 None => true,
614 }
615 }
616
617 async fn store(&self, name: &str) -> Result<Arc<dyn KvStore>, KvError> {
618 let config = StoreConfig {
619 name: name.to_string(),
620 ..Default::default()
621 };
622 self.store_with_config(config).await
623 }
624
625 async fn store_with_config(&self, config: StoreConfig) -> Result<Arc<dyn KvStore>, KvError> {
626 // Clone the client/jetstream out from under the read lock before the
627 // (up to 60s) bucket get-or-create. Holding the read guard across that
628 // await would block `shutdown()`'s `write().await` for the full
629 // duration, stalling graceful shutdown behind an in-flight store call.
630 let (client, js) = {
631 let conn = self.handle.read().await;
632 let conn = conn.as_ref().ok_or(KvError::NotConnected)?;
633 (conn.client.clone(), conn.jetstream.clone())
634 };
635
636 let kv = Self::get_or_create_bucket(&client, &js, &config).await?;
637
638 Ok(Arc::new(NatsKvStore {
639 name: config.name,
640 client,
641 js,
642 kv,
643 }))
644 }
645
646 fn capabilities(&self) -> ConnectionCapabilities {
647 ConnectionCapabilities {
648 streaming_watch: true,
649 prefix_watch: true,
650 // `KvTtl` is not implemented for the NATS backend yet (only `KvWriter`
651 // is vended by `writer()`), so advertising `ttl: true` would lead
652 // callers that branch on this flag down a path that can never
653 // succeed. Flip to `true` together with the `KvTtl` impl.
654 ttl: false,
655 // Byte-reclaiming purge IS implemented for NATS (rollup delete) and
656 // vended via `KvStore::purge_writer`.
657 purge: true,
658 cas: true,
659 transactions: false,
660 // 0 = unlimited from this layer's perspective: we impose no cap, but
661 // the NATS server still enforces its own max payload (~1MB by
662 // default). Callers that branch on this must not read 0 as "any size
663 // is safe" — an oversized value is rejected server-side at write time.
664 max_value_size: 0,
665 global_ordering: false,
666 }
667 }
668}
669
/// A single NATS KV bucket, as handed out by `store_with_config`. Acts as a
/// factory for the reader / watcher / writer views, each of which receives
/// clones of the handles below.
struct NatsKvStore {
    // Bucket name; also returned by `KvStore::name` and passed to the reader
    // and watcher as their `bucket`.
    name: String,
    // async-nats KV store handle for the bucket.
    kv: Store,
    // Core client and JetStream context cloned from the connection handle;
    // forwarded to the reader and watcher.
    client: async_nats::Client,
    js: async_nats::jetstream::Context,
}
676
677impl KvStore for NatsKvStore {
678 fn name(&self) -> &str {
679 &self.name
680 }
681
682 fn reader(&self) -> Arc<dyn KvReader> {
683 Arc::new(NatsKvReader {
684 kv: self.kv.clone(),
685 client: self.client.clone(),
686 js: self.js.clone(),
687 bucket: self.name.clone(),
688 })
689 }
690
691 fn watcher(&self) -> Option<Arc<dyn KvWatcher>> {
692 Some(Arc::new(NatsKvWatcher {
693 kv: self.kv.clone(),
694 client: self.client.clone(),
695 js: self.js.clone(),
696 bucket: self.name.clone(),
697 }))
698 }
699
700 fn writer(&self) -> Option<Arc<dyn KvWriter>> {
701 Some(Arc::new(NatsKvWriterImpl {
702 kv: self.kv.clone(),
703 }))
704 }
705
706 fn purge_writer(&self) -> Option<Arc<dyn KvPurge>> {
707 Some(Arc::new(NatsKvWriterImpl {
708 kv: self.kv.clone(),
709 }))
710 }
711}
712
/// Read-side view of one NATS KV bucket; implements `KvReader`.
struct NatsKvReader {
    // async-nats KV handle used by `entry()` (and therefore `get()`).
    kv: Store,
    // Core client and JetStream context cloned from the owning store.
    // NOTE(review): presumably used by `consume_last_per_subject`, whose body
    // is not visible from this part of the file — confirm.
    client: async_nats::Client,
    js: async_nats::jetstream::Context,
    // The bucket name is known at construction (it's the store's name), so
    // `consume_last_per_subject` builds its subject filters from this field
    // instead of issuing a `kv.status()` round-trip per `scan()`/`keys()` call
    // just to read it back from the server.
    bucket: String,
}
723
724#[async_trait]
725impl KvReader for NatsKvReader {
726 async fn get(&self, key: &str) -> Result<Option<KvEntry>, KvError> {
727 // Empty value → treat as absent. This unifies a real stored `b""` and a
728 // `delete_with_version` tombstone (empty-value Put) under one "absent =
729 // None" contract, consistent with `scan()`/`keys()`. Callers needing
730 // zero-length semantics use `entry()`. See the `KvReader::get` trait doc.
731 match self.entry(key).await? {
732 Some(entry) if entry.value.is_empty() => Ok(None),
733 other => Ok(other),
734 }
735 }
736
737 async fn entry(&self, key: &str) -> Result<Option<KvEntry>, KvError> {
738 use async_nats::jetstream::kv::Operation;
739 // Use entry() instead of get() to access revision.
740 // Return Put entries even with empty values — delete_with_version
741 // writes empty bytes as a tombstone and callers need the version
742 // for CAS conflict detection. Only filter real Delete/Purge markers.
743 match timed(self.kv.entry(key)).await? {
744 Ok(Some(entry)) if entry.operation == Operation::Put => Ok(Some(KvEntry {
745 key: key.to_string(),
746 value: entry.value.to_vec(),
747 version: VersionToken::from_u64(entry.revision),
748 })),
749 Ok(Some(_)) => Ok(None), // Delete/Purge marker
750 Ok(None) => Ok(None),
751 Err(e) => Err(KvError::OperationFailed(e.to_string())),
752 }
753 }
754
755 async fn keys(&self, prefix: &str) -> Result<Vec<String>, KvError> {
756 debug!(prefix = %prefix, "listing keys with prefix");
757
758 let mut keys = Vec::new();
759 self.consume_last_per_subject(prefix, true, |msg, key| {
760 // Skip both real KV deletes and CAS tombstones (empty-value Puts
761 // written by delete_with_version). get()/scan() hide the latter, so
762 // keys() must too — otherwise a list-then-get returns phantom keys.
763 // With headers_only the payload is stripped, but NATS adds a
764 // `Nats-Msg-Size` header we use to detect the empty value.
765 if !is_kv_delete(&msg) && !is_empty_value(&msg) {
766 keys.push(key);
767 }
768 })
769 .await?;
770
771 debug!(prefix = %prefix, keys = keys.len(), "keys listing complete");
772 Ok(keys)
773 }
774
775 async fn scan(&self, prefix: &str) -> Result<Vec<KvEntry>, KvError> {
776 let mut entries = Vec::new();
777 self.consume_last_per_subject(prefix, false, |msg, key| {
778 if !is_kv_delete(&msg) && !msg.payload.is_empty() {
779 // The KV revision is the stream sequence, carried in the JetStream
780 // ACK subject (the message's reply subject). A revision of 0 means
781 // we couldn't parse it; callers treat that as "unknown version".
782 let revision = msg
783 .reply
784 .as_deref()
785 .and_then(stream_sequence_from_ack)
786 .unwrap_or(0);
787
788 entries.push(KvEntry {
789 key,
790 value: msg.payload.to_vec(),
791 version: VersionToken::from_u64(revision),
792 });
793 }
794 })
795 .await?;
796
797 debug!(prefix = %prefix, entries = entries.len(), "scan complete");
798 Ok(entries)
799 }
800}
801
/// Extract the stream sequence (== KV revision) from a JetStream ACK subject.
///
/// The ACK subject — delivered as a push message's reply subject — comes in two
/// shapes, and the stream sequence sits at different offsets in each:
///
/// ```text
/// legacy (9 tokens): $JS.ACK.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>
/// modern (11+):      $JS.ACK.<domain>.<account>.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>[.<token>]
/// ```
///
/// Parsing is done from the front, accounting for the optional
/// `<domain>.<account>` prefix of the modern form. (Taking the *last* token
/// would be wrong: in the legacy form that is `num_pending`, typically 0 on the
/// final delivery.)
///
/// Returns `None` when the subject does not start with `$JS.ACK`, when its
/// token count matches neither shape (fewer than 9, or exactly 10), or when
/// the stream-sequence token is not a valid `u64`. A 10-token subject is
/// rejected rather than guessed at: it cannot be the modern form, and reading
/// index 7 of a legacy subject with one extra token would return the
/// *timestamp* as the revision.
fn stream_sequence_from_ack(reply: &str) -> Option<u64> {
    // The stream-seq field sits at index 5 (legacy) or 7 (modern), so only the
    // first 8 tokens are ever read. They live in a stack array while the
    // iterator merely counts the rest — no heap `Vec`, which on a large
    // `scan()` would be one allocation per delivered message.
    let mut head = [""; 8];
    let mut count = 0usize;
    for token in reply.split('.') {
        if let Some(slot) = head.get_mut(count) {
            *slot = token;
        }
        count += 1;
    }

    if head[0] != "$JS" || head[1] != "ACK" {
        return None;
    }

    let stream_seq_idx = match count {
        // Legacy: no domain/account prefix.
        9 => 5,
        // Modern: two-token `<domain>.<account>` prefix shifts fields right.
        n if n >= 11 => 7,
        // Too short, or the ambiguous 10-token shape.
        _ => return None,
    };
    head[stream_seq_idx].parse::<u64>().ok()
}
837
838/// Check if a NATS message represents a KV delete/purge operation.
839fn is_kv_delete(msg: &async_nats::Message) -> bool {
840 msg.headers
841 .as_ref()
842 .and_then(|h| h.get("KV-Operation"))
843 .is_some()
844}
845
846/// Check if a `headers_only` delivery carries an empty value (a CAS tombstone
847/// written by `delete_with_version`).
848///
849/// When a consumer is created with `headers_only`, NATS strips the body and adds
850/// a `Nats-Msg-Size` header with the original payload length. Size 0 means the
851/// stored value is empty, which `get()`/`scan()` treat as absent. Messages
852/// without the header (e.g. non-`headers_only` deliveries) are not classified as
853/// empty here — callers on that path inspect the payload directly instead.
854fn is_empty_value(msg: &async_nats::Message) -> bool {
855 msg.headers
856 .as_ref()
857 .and_then(|h| h.get("Nats-Msg-Size"))
858 .map(|v| v.as_str() == "0")
859 .unwrap_or(false)
860}
861
862impl NatsKvReader {
863 /// Subscribe to last-per-subject messages for a KV prefix, calling `on_msg`
864 /// for each delivered message. Handles the subscribe-first race workaround,
865 /// consumer lifecycle, and cleanup.
866 async fn consume_last_per_subject(
867 &self,
868 prefix: &str,
869 headers_only: bool,
870 mut on_msg: impl FnMut(async_nats::Message, String),
871 ) -> Result<(), KvError> {
872 use async_nats::jetstream::consumer::push;
873 use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy};
874
875 // The bucket name is known at construction, so the subject filters are
876 // built directly from `self.bucket` — no `kv.status()` round-trip just
877 // to read it back. Every *remaining* setup await below is still bounded
878 // by `timed()`: a half-dead NATS connection (CLOSE_WAIT) would otherwise
879 // park here before the per-message drain timer downstream ever starts,
880 // hanging scan()/keys() indefinitely — the same failure `timed()` guards
881 // on the write path.
882 let bucket = self.bucket.as_str();
883
884 let nats_filter = if prefix.is_empty() {
885 format!("$KV.{bucket}.>")
886 } else {
887 format!("$KV.{bucket}.{prefix}>")
888 };
889
890 // Work around async-nats <=0.46 subscribe-after-create race:
891 // subscribe to the inbox FIRST, then create the consumer.
892 let inbox = self.client.new_inbox();
893 let mut sub = timed(self.client.subscribe(inbox.clone()))
894 .await?
895 .map_err(|e| KvError::OperationFailed(format!("subscribe inbox: {e}")))?;
896
897 let stream = timed(self.js.get_stream(format!("KV_{bucket}")))
898 .await?
899 .map_err(|e| KvError::OperationFailed(format!("get KV stream: {e}")))?;
900
901 let consumer = timed(stream.create_consumer(push::Config {
902 deliver_subject: inbox,
903 deliver_policy: DeliverPolicy::LastPerSubject,
904 filter_subject: nats_filter,
905 headers_only,
906 // This is a one-shot point-in-time drain — we never ack. Under
907 // the default `AckPolicy::Explicit`, JetStream stops delivering
908 // once `max_ack_pending` (default 1000) messages sit unacked,
909 // which would silently truncate scan()/keys() to the first ~1000
910 // keys (or stall waiting for deliveries that never come) on any
911 // larger bucket. `None` removes the ack-pending gate entirely.
912 ack_policy: AckPolicy::None,
913 // Safety net for the best-effort `delete_consumer` below: if that
914 // cleanup times out on a half-dead connection, JetStream still reaps
915 // this consumer after `CONSUMER_INACTIVE_THRESHOLD` of inactivity, so
916 // repeated timed-out scans can't pile orphaned consumers up against
917 // the per-stream limit.
918 inactive_threshold: CONSUMER_INACTIVE_THRESHOLD,
919 ..Default::default()
920 }))
921 .await?
922 .map_err(|e| KvError::OperationFailed(format!("create consumer: {e}")))?;
923
924 let num_pending = consumer.cached_info().num_pending;
925
926 // Drain exactly `num_pending` messages, but bound each await: a half-dead
927 // connection (CLOSE_WAIT) would otherwise park this loop forever, the same
928 // failure `timed()` guards on the write path. On timeout we still fall
929 // through to consumer cleanup, then surface `Timeout`.
930 let mut timed_out = false;
931 if num_pending > 0 {
932 let mut delivered = 0u64;
933 let kv_prefix = format!("$KV.{bucket}.");
934
935 while delivered < num_pending {
936 match tokio::time::timeout(KV_OP_TIMEOUT, sub.next()).await {
937 Ok(Some(msg)) => {
938 let key = msg
939 .subject
940 .strip_prefix(&kv_prefix)
941 .unwrap_or(msg.subject.as_str())
942 .to_string();
943
944 on_msg(msg, key);
945 delivered += 1;
946 }
947 Ok(None) => break, // subscription closed early
948 Err(_) => {
949 timed_out = true;
950 break;
951 }
952 }
953 }
954 }
955
956 // Clean up ephemeral consumer (best-effort), even on timeout — a stalled
957 // scan shouldn't also leak a server-side consumer. A leaked consumer
958 // lingers on the server and counts against per-stream limits, so surface
959 // failures in observability without failing the operation. Bound the
960 // delete with `timed()`: on the same half-dead (CLOSE_WAIT) connection
961 // that tripped the drain timeout above, an unbounded delete would re-park
962 // here forever, defeating the timeout recovery we just performed.
963 match timed(stream.delete_consumer(&consumer.cached_info().name)).await {
964 Ok(Ok(_)) => {}
965 Ok(Err(e)) => {
966 // `warn!`, not `debug!`: a leaked ephemeral consumer lingers on
967 // the server and counts against per-stream limits. Under a flaky
968 // NATS connection every scan()/keys() leaks one, so this must be
969 // visible in default spans before the pile-up hits the limit.
970 warn!(error = %e, "failed to delete ephemeral consumer (best-effort)");
971 }
972 Err(_) => {
973 warn!("timed out deleting ephemeral consumer (best-effort)");
974 }
975 }
976
977 if timed_out {
978 return Err(KvError::Timeout);
979 }
980 Ok(())
981 }
982}
983
984/// Convert a NATS KV entry to a KvUpdate.
985///
986/// Takes the entry by value so the key `String` moves into the `KvUpdate`
987/// instead of allocating a fresh copy per watch event.
988fn nats_entry_to_kv_update(entry: async_nats::jetstream::kv::Entry) -> KvUpdate {
989 use async_nats::jetstream::kv::Operation;
990 let version = VersionToken::from_u64(entry.revision);
991 match entry.operation {
992 Operation::Put => KvUpdate::Put(KvEntry {
993 key: entry.key,
994 value: entry.value.to_vec(),
995 version,
996 }),
997 Operation::Delete => KvUpdate::Delete {
998 key: entry.key,
999 version,
1000 },
1001 Operation::Purge => KvUpdate::Purge {
1002 key: entry.key,
1003 version,
1004 },
1005 }
1006}
1007
1008/// Stream updates from a NATS Watch into a channel until it ends or the receiver drops.
1009async fn stream_watch(
1010 mut watcher: async_nats::jetstream::kv::Watch,
1011 tx: &Sender<KvUpdate>,
1012) -> Result<(), KvError> {
1013 while let Some(entry) = watcher.next().await {
1014 match entry {
1015 Ok(entry) => {
1016 let update = nats_entry_to_kv_update(entry);
1017 if tx.send(update).await.is_err() {
1018 debug!("watch receiver closed");
1019 break;
1020 }
1021 }
1022 Err(e) => {
1023 error!(error = %e, "NATS KV watch error");
1024 return Err(KvError::WatchError(e.to_string()));
1025 }
1026 }
1027 }
1028 Ok(())
1029}
1030
1031/// Cadence of the floor guard's no-traffic backstop probe (one stream-info
1032/// RPC per interval per guarded watch). The PRIMARY detection is in-band —
1033/// the gapped-delivery check fires the moment evidence surfaces — so this
1034/// interval only bounds detection latency when NOTHING is being delivered;
1035/// it is not load-bearing for eventual detection.
1036const FLOOR_GUARD_INTERVAL: Duration = Duration::from_secs(30);
1037
1038/// [`stream_watch`] for the dense ALL-scope resume path, with the LIVE
1039/// retention floor guard (`tests/model_live_watch.rs` — the live twin of
1040/// [`NatsKvWatcher::check_resume_window`]).
1041///
1042/// The hazard: retention overrunning a live consumer makes JetStream
1043/// silently skip evicted messages — delete markers included — with no error
1044/// anywhere (the same clamp behavior as resumes, mid-stream). Unguarded,
1045/// that is PERMANENT silent fold divergence; the model proves it reachable.
1046///
1047/// Detection is primarily **in-band**: an unfiltered `ByStartSequence`
1048/// consumer sees every retained message, so a delivered revision that jumps
1049/// the frontier by more than one is evidence of eviction inside the gap.
1050/// The model checker REJECTED a periodic-only design with exactly the trace
1051/// this closes — deliveries can catch the frontier up past the gap between
1052/// probes, erasing the evidence — so the check runs AT the gapped delivery,
1053/// before the entry is processed: fetch `first_sequence` and apply the
1054/// shared kernel (`protocol::resume_window_ok`) to the frontier. A benign
1055/// gap (interior per-subject eviction with the floor still at or below the
1056/// frontier) passes; head eviction past the frontier fails the watch, and
1057/// the caller's restart routes into the verified resume → `CursorExpired` →
1058/// resync repair path. The periodic probe backstops the no-traffic case.
1059///
1060/// Scope: sound only where density holds — the unfiltered resume watch.
1061/// Prefix-scoped watches deliver sparse revisions by design and cannot
1062/// distinguish benign from hazardous eviction client-side; they retain the
1063/// (narrowed) retention-outlives-lag operating axiom plus the resume-time
1064/// check on every restart (model axiom 5).
1065///
1066/// The guarantee split, precisely: the SAFETY half — never folding past
1067/// unexamined evidence of loss — is unconditional in this loop (the gap
1068/// check precedes processing, and a stalled downstream stalls folding too).
1069/// The REPAIR half is conditional on the caller restarting the failed watch
1070/// (standard supervision; same posture as the resync fail-stop): a trip
1071/// with no restart is a loudly dead watch, never a silently wrong one.
1072async fn stream_watch_floor_guarded(
1073 mut watcher: async_nats::jetstream::kv::Watch,
1074 tx: &Sender<KvUpdate>,
1075 resume_revision: u64,
1076 js: &async_nats::jetstream::Context,
1077 bucket: &str,
1078) -> Result<(), KvError> {
1079 let stream_name = format!("KV_{bucket}");
1080 let first_sequence = || async {
1081 let stream = timed(js.get_stream(&stream_name))
1082 .await?
1083 .map_err(|e| KvError::OperationFailed(format!("floor guard stream lookup: {e}")))?;
1084 Ok::<u64, KvError>(stream.cached_info().state.first_sequence)
1085 };
1086 fn trip(frontier: u64, first: u64, bucket: &str) -> KvError {
1087 warn!(
1088 frontier,
1089 first_sequence = first,
1090 bucket,
1091 "stream retention overran this live watch; failing so the restart can resync \
1092 (messages in the gap were evicted unseen)"
1093 );
1094 KvError::WatchError(format!(
1095 "stream retention overran live watch (first_sequence {first} > delivered \
1096 frontier {frontier} + 1); restart will resync"
1097 ))
1098 }
1099
1100 let mut frontier = resume_revision;
1101 let mut backstop = tokio::time::interval(FLOOR_GUARD_INTERVAL);
1102 backstop.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1103 backstop.tick().await; // consume the immediate first tick
1104
1105 loop {
1106 tokio::select! {
1107 entry = watcher.next() => {
1108 let Some(entry) = entry else { break };
1109 match entry {
1110 Ok(entry) => {
1111 let revision = entry.revision;
1112 // In-band gap check BEFORE processing: never fold
1113 // past unexamined evidence of eviction.
1114 if revision > frontier.saturating_add(1) {
1115 let first = first_sequence().await?;
1116 if !crate::protocol::resume_window_ok(frontier, first) {
1117 return Err(trip(frontier, first, bucket));
1118 }
1119 // Benign interior gap: every evicted revision
1120 // below a still-low floor was a per-subject
1121 // overwrite, whose later revision the fold will
1122 // see — safe for last-write-wins.
1123 }
1124 frontier = frontier.max(revision);
1125 let update = nats_entry_to_kv_update(entry);
1126 if tx.send(update).await.is_err() {
1127 debug!("watch receiver closed");
1128 break;
1129 }
1130 }
1131 Err(e) => {
1132 error!(error = %e, "NATS KV watch error");
1133 return Err(KvError::WatchError(e.to_string()));
1134 }
1135 }
1136 }
1137 _ = backstop.tick() => {
1138 // No-traffic backstop: nothing is being delivered, so the
1139 // in-band check has no evidence to act on; probe the floor
1140 // directly.
1141 let first = first_sequence().await?;
1142 if !crate::protocol::resume_window_ok(frontier, first) {
1143 return Err(trip(frontier, first, bucket));
1144 }
1145 }
1146 }
1147 }
1148 Ok(())
1149}
1150
1151/// Check if a NATS watch error indicates the requested start sequence is
1152/// too old (compacted), meaning callers should fall back to a full watch.
1153///
1154/// SECOND line of defense only: live nats-server (2.14) does not error on a
1155/// below-head start sequence at all — it silently clamps to the first
1156/// retained message (pinned by `tests/resync.rs`), so the PRIMARY expiry
1157/// detection is [`NatsKvWatcher::check_resume_window`]'s proactive
1158/// `first_sequence` comparison. This matcher remains for server versions or
1159/// paths that do error, where mapping to [`KvError::CursorExpired`] keeps
1160/// the fallback reachable instead of stranding the caller.
1161///
1162/// async-nats has no granular error kind for this: `WatchErrorKind` is only
1163/// `InvalidKey`/`TimedOut`/`ConsumerCreate`/`Other`, and "start sequence too old"
1164/// arrives as `ConsumerCreate`/`Other` with the real reason buried in the source
1165/// error's *message*. So we substring-match the full error string — which already
1166/// includes the source, since `Error`'s `Display` renders `"{kind}: {source}"`.
1167///
1168/// Two deliberate choices make this robust to wording drift:
1169/// - We lowercase first, so a capitalization change in NATS/async-nats can't slip
1170/// past.
1171/// - Detection is biased toward `true`. A false positive only costs an
1172/// unnecessary (but always-safe) full `watch_all()` replay; a false negative
1173/// propagates `WatchError` and strands a caller that would otherwise recover.
1174///
1175/// If these messages ever change, `cursor_expired_matches_known_nats_error_strings`
1176/// is the canary that fails loudly on the next dependency bump.
1177fn is_cursor_expired_error(err: &str) -> bool {
1178 use std::sync::OnceLock;
1179 // One Aho-Corasick automaton over all needles: a single pass over the error
1180 // string regardless of how many needles accumulate as NATS versions reword
1181 // their messages, vs. one `windows()` scan per needle. Case-insensitivity is
1182 // baked into the automaton, so no lowercased copy is allocated either.
1183 static MATCHER: OnceLock<aho_corasick::AhoCorasick> = OnceLock::new();
1184 MATCHER
1185 .get_or_init(|| {
1186 aho_corasick::AhoCorasick::builder()
1187 .ascii_case_insensitive(true)
1188 .build([
1189 "start sequence",
1190 "first sequence",
1191 "sequence not found",
1192 "too old",
1193 ])
1194 .expect("static needle set always compiles")
1195 })
1196 .is_match(err)
1197}
1198
/// Watch-side handle for one NATS KV bucket; implements [`KvWatcher`].
struct NatsKvWatcher {
    /// async-nats KV store handle; backs every watch that async-nats provides
    /// directly (`watch_with_history`, `watch_from_revision`, ...).
    kv: Store,
    // `watch_prefixes_from` has no async-nats equivalent (there is no
    // `watch_many_from_revision`), so it hand-builds the multi-filter ordered
    // consumer itself — which needs the raw client (inbox allocation), the
    // JetStream context (stream lookup), and the bucket name (subject filters),
    // same as the reader's scan path.
    /// Raw client, used to allocate the deliver inbox for the hand-built
    /// consumer.
    client: async_nats::Client,
    /// JetStream context, used to look up the bucket's backing stream.
    js: async_nats::jetstream::Context,
    /// Bucket name; the backing stream is `KV_{bucket}` and key subjects live
    /// under `$KV.{bucket}.`.
    bucket: String,
}
1210
1211/// Decode a raw KV stream message (as delivered by a hand-built ordered push
1212/// consumer) into a [`KvUpdate`] — the same mapping `async-nats`'s `kv::Watch`
1213/// performs internally for the `watch_*` paths: key from the subject (stripping
1214/// the `$KV.{bucket}.` prefix), operation from the `KV-Operation` header
1215/// (absent = Put), revision from the stream sequence in the ACK reply subject.
1216///
1217/// Returns `None` for a subject outside the bucket's keyspace, which a
1218/// subject-filtered consumer should never deliver — skipped rather than
1219/// surfaced, matching `kv::Watch`'s behavior.
1220fn kv_message_to_update(msg: &async_nats::Message, kv_prefix: &str) -> Option<KvUpdate> {
1221 let key = msg.subject.strip_prefix(kv_prefix)?.to_string();
1222 // An unparseable (or absent) ACK subject yields the UNKNOWN version, not a
1223 // fabricated revision 0: `from_u64(0)` is a *parseable* position that
1224 // `watch_applied` would adopt as its batch high-water — regressing the
1225 // persisted cursor to 0 and forcing a full replay on the next restart.
1226 // `unknown()` is the honest value; the cursor-authority loop skips it.
1227 let version = msg
1228 .reply
1229 .as_deref()
1230 .and_then(stream_sequence_from_ack)
1231 .map(VersionToken::from_u64)
1232 .unwrap_or_else(VersionToken::unknown);
1233 let operation = msg
1234 .headers
1235 .as_ref()
1236 .and_then(|h| h.get("KV-Operation"))
1237 .map(|v| v.as_str());
1238 Some(match operation {
1239 Some("DEL") => KvUpdate::Delete { key, version },
1240 Some("PURGE") => KvUpdate::Purge { key, version },
1241 // No header (or an explicit "PUT") is a put — the common case carries
1242 // no KV-Operation header at all.
1243 _ => KvUpdate::Put(KvEntry {
1244 key,
1245 value: msg.payload.to_vec(),
1246 version,
1247 }),
1248 })
1249}
1250
1251impl NatsKvWatcher {
1252 /// Proactive cursor-expiry detection, REQUIRED before any `*_from` resume.
1253 ///
1254 /// NATS does **not** error when an ordered consumer's `ByStartSequence`
1255 /// falls below the stream's first retained sequence — it silently delivers
1256 /// from the first available message (pinned against a live nats-server by
1257 /// `tests/resync.rs::nats_silently_clamps_resume_below_first_seq`). A
1258 /// silent clamp skips the gap's evicted messages — delete markers
1259 /// included — without ever taking the `CursorExpired` → resync path, so
1260 /// expiry MUST be detected by comparing the stream's `first_sequence`
1261 /// against the resume point before trusting the consumer. The
1262 /// error-string matching at the consumer-create sites stays as a second
1263 /// line of defense for server versions that do error.
1264 ///
1265 /// Why `first_sequence` is the right boundary: interior (per-subject
1266 /// history) eviction inside the gap is safe for a last-write-wins fold —
1267 /// an overwrite-evicted revision implies a LATER revision of the same
1268 /// subject exists and will be delivered. Lost *deletes* come from head
1269 /// eviction (stream limits/age), which is exactly what advances
1270 /// `first_sequence`. (An admin interior purge of a subject can also
1271 /// destroy a delete marker without moving the head — that is a manual
1272 /// destructive operation, same trust class as deleting the stream.)
1273 ///
1274 /// Head eviction racing the window between this check and consumer
1275 /// creation is the same exposure any live consumer has against
1276 /// aggressive retention; the check bounds the silent gap to that
1277 /// milliseconds-scale window, where the prior behavior left it unbounded.
1278 async fn check_resume_window(&self, revision: u64) -> Result<(), KvError> {
1279 let stream = timed(self.js.get_stream(format!("KV_{}", self.bucket)))
1280 .await?
1281 .map_err(|e| {
1282 KvError::OperationFailed(format!("get KV stream for resume check: {e}"))
1283 })?;
1284 let first = stream.cached_info().state.first_sequence;
1285 // The shared protocol kernel — the same guard the model checker's
1286 // Resume transition executes (`crate::protocol::resume_window_ok`).
1287 if !crate::protocol::resume_window_ok(revision, first) {
1288 warn!(
1289 revision,
1290 first_sequence = first,
1291 "resume cursor is below the stream's first retained sequence; cursor expired"
1292 );
1293 return Err(KvError::CursorExpired);
1294 }
1295 Ok(())
1296 }
1297}
1298
1299#[async_trait]
1300impl KvWatcher for NatsKvWatcher {
1301 async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
1302 // `watch_with_history` (DeliverPolicy::LastPerSubject), NOT `watch_all`
1303 // (DeliverPolicy::New): the trait contract is state-sync — current value
1304 // of every key first, then live updates. async-nats's `watch_all` only
1305 // delivers messages published AFTER the consumer exists, which would
1306 // leave a no-cursor consumer empty until keys happen to change.
1307 //
1308 // Bound the watch *setup* with `timed()` for the same reason every KV op
1309 // is bounded: a half-dead (CLOSE_WAIT) NATS connection parks this await
1310 // forever instead of failing. The streaming drain in `stream_watch` is
1311 // intentionally unbounded (a watch is long-lived), but establishing it
1312 // must not be able to hang a reconnecting caller.
1313 let watcher = timed(self.kv.watch_with_history(">"))
1314 .await?
1315 .map_err(|e| KvError::WatchError(e.to_string()))?;
1316 stream_watch(watcher, &tx).await
1317 }
1318
1319 async fn watch_prefix(&self, prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
1320 // Use native NATS subject-based filtering. KV key "node.abc" maps to
1321 // subject "$KV.BUCKET.node.abc", and ">" is the multi-level wildcard.
1322 // `_with_history` for the same state-sync contract as `watch_all`.
1323 let nats_key = format!("{prefix}>");
1324 let watcher = timed(self.kv.watch_with_history(&nats_key))
1325 .await?
1326 .map_err(|e| KvError::WatchError(e.to_string()))?;
1327 stream_watch(watcher, &tx).await
1328 }
1329
1330 async fn watch_prefixes(&self, prefixes: &[&str], tx: Sender<KvUpdate>) -> Result<(), KvError> {
1331 if prefixes.is_empty() {
1332 // Nothing to watch. Critically, do NOT fall through to `watch_many`
1333 // with an empty filter set — an unfiltered ordered consumer would
1334 // watch the WHOLE bucket, the opposite of a scoped watch.
1335 return Ok(());
1336 }
1337 // ONE multi-filter consumer for every prefix (NATS 2.10 `filter_subjects`)
1338 // rather than one consumer per prefix. `watch_many_with_history` builds a
1339 // single ordered push consumer with `filter_subjects = [{p}> ...]` and
1340 // yields the same `Entry` stream as `watch`, so `stream_watch` is reused
1341 // verbatim. This is the per-stream-consumer-count fix: a node scoped to N
1342 // prefixes costs 1 consumer, not N. `_with_history` for the same
1343 // state-sync contract as `watch_all`.
1344 let keys: Vec<String> = prefixes.iter().map(|p| format!("{p}>")).collect();
1345 let watcher = timed(self.kv.watch_many_with_history(keys))
1346 .await?
1347 .map_err(|e| KvError::WatchError(e.to_string()))?;
1348 stream_watch(watcher, &tx).await
1349 }
1350
1351 async fn watch_all_from(
1352 &self,
1353 cursor: &WatchCursor,
1354 tx: Sender<KvUpdate>,
1355 ) -> Result<(), KvError> {
1356 let revision = match cursor.as_u64() {
1357 Some(rev) if rev > 0 => rev,
1358 _ => return self.watch_all(tx).await,
1359 };
1360 self.check_resume_window(revision).await?;
1361
1362 let watcher = match timed(self.kv.watch_all_from_revision(revision + 1)).await? {
1363 Ok(w) => w,
1364 Err(e) => {
1365 let err_str = e.to_string();
1366 if is_cursor_expired_error(&err_str) {
1367 warn!(revision, error = %err_str, "cursor expired, caller should fall back to full watch");
1368 return Err(KvError::CursorExpired);
1369 }
1370 return Err(KvError::WatchError(err_str));
1371 }
1372 };
1373 // Re-check AFTER the consumer exists: head eviction in the window
1374 // between the pre-flight check and consumer creation would otherwise
1375 // clamp silently.
1376 self.check_resume_window(revision).await?;
1377
1378 info!(revision, "resumed watch from cursor");
1379 // The LIVE floor guard takes over from here: in-band gapped-delivery
1380 // checks plus a no-traffic backstop, so retention overrunning this
1381 // watch mid-stream fail-stops into the restart→resync repair path
1382 // instead of silently skipping evicted deletes (model:
1383 // tests/model_live_watch.rs).
1384 stream_watch_floor_guarded(watcher, &tx, revision, &self.js, &self.bucket).await
1385 }
1386
1387 async fn watch_prefix_from(
1388 &self,
1389 prefix: &str,
1390 cursor: &WatchCursor,
1391 tx: Sender<KvUpdate>,
1392 ) -> Result<(), KvError> {
1393 let revision = match cursor.as_u64() {
1394 Some(rev) if rev > 0 => rev,
1395 _ => return self.watch_prefix(prefix, tx).await,
1396 };
1397 self.check_resume_window(revision).await?;
1398
1399 let nats_key = format!("{prefix}>");
1400 let watcher = match timed(self.kv.watch_from_revision(&nats_key, revision + 1)).await? {
1401 Ok(w) => w,
1402 Err(e) => {
1403 let err_str = e.to_string();
1404 if is_cursor_expired_error(&err_str) {
1405 warn!(revision, prefix, error = %err_str, "cursor expired for prefix watch, caller should fall back");
1406 return Err(KvError::CursorExpired);
1407 }
1408 return Err(KvError::WatchError(err_str));
1409 }
1410 };
1411 // Same post-create re-check as watch_all_from: close the
1412 // check→create eviction window.
1413 self.check_resume_window(revision).await?;
1414
1415 info!(revision, prefix, "resumed prefix watch from cursor");
1416 stream_watch(watcher, &tx).await
1417 }
1418
1419 async fn watch_prefixes_from(
1420 &self,
1421 prefixes: &[&str],
1422 cursor: &WatchCursor,
1423 tx: Sender<KvUpdate>,
1424 ) -> Result<(), KvError> {
1425 use async_nats::jetstream::consumer::{DeliverPolicy, ReplayPolicy, push};
1426
1427 if prefixes.is_empty() {
1428 // Same guard as watch_prefixes: an empty filter set must not become
1429 // an unfiltered whole-bucket consumer.
1430 return Ok(());
1431 }
1432 let revision = match cursor.as_u64() {
1433 Some(rev) if rev > 0 => rev,
1434 _ => return self.watch_prefixes(prefixes, tx).await,
1435 };
1436
1437 // async-nats has `watch_many` (multi-filter) and `watch_from_revision`
1438 // (seek) but no combination of the two, so build the multi-filter
1439 // ordered push consumer ourselves — the exact consumer
1440 // `watch_many_with_deliver_policy` would build, with
1441 // `ByStartSequence(cursor+1)` for the delta seek. The ordered-consumer
1442 // machinery (gap detection, auto-recreate from the last delivered
1443 // sequence) comes with `OrderedConfig` for free.
1444 let bucket = self.bucket.as_str();
1445 let kv_prefix = format!("$KV.{bucket}.");
1446 let filter_subjects: Vec<String> = prefixes
1447 .iter()
1448 .map(|p| format!("{kv_prefix}{p}>"))
1449 .collect();
1450
1451 let stream = timed(self.js.get_stream(format!("KV_{bucket}")))
1452 .await?
1453 .map_err(|e| KvError::WatchError(format!("get KV stream: {e}")))?;
1454
1455 // Same proactive expiry detection as `check_resume_window` (NATS
1456 // silently clamps a below-head ByStartSequence; see that method's
1457 // docs) — checked on the stream handle this path already fetched,
1458 // via the shared protocol kernel.
1459 let first = stream.cached_info().state.first_sequence;
1460 if !crate::protocol::resume_window_ok(revision, first) {
1461 warn!(
1462 revision,
1463 first_sequence = first,
1464 ?prefixes,
1465 "resume cursor is below the stream's first retained sequence; cursor expired"
1466 );
1467 return Err(KvError::CursorExpired);
1468 }
1469
1470 let consumer = match timed(stream.create_consumer(push::OrderedConfig {
1471 deliver_subject: self.client.new_inbox(),
1472 description: Some("kv multi-prefix resume consumer".to_string()),
1473 filter_subjects,
1474 replay_policy: ReplayPolicy::Instant,
1475 deliver_policy: DeliverPolicy::ByStartSequence {
1476 start_sequence: revision + 1,
1477 },
1478 ..Default::default()
1479 }))
1480 .await?
1481 {
1482 Ok(c) => c,
1483 Err(e) => {
1484 // Same expiry classification as watch_all_from: a start sequence
1485 // the stream has compacted past surfaces as a consumer-create
1486 // error whose message names the sequence problem.
1487 let err_str = e.to_string();
1488 if is_cursor_expired_error(&err_str) {
1489 warn!(revision, ?prefixes, error = %err_str, "cursor expired for multi-prefix watch, caller should fall back");
1490 return Err(KvError::CursorExpired);
1491 }
1492 return Err(KvError::WatchError(err_str));
1493 }
1494 };
1495
1496 // Re-check AFTER the consumer exists (fresh stream info, not the
1497 // handle's cached copy): closes the check→create eviction window,
1498 // same as the single-filter resume paths.
1499 self.check_resume_window(revision).await?;
1500
1501 let mut messages = timed(consumer.messages())
1502 .await?
1503 .map_err(|e| KvError::WatchError(e.to_string()))?;
1504
1505 info!(
1506 revision,
1507 ?prefixes,
1508 "resumed multi-prefix watch from cursor"
1509 );
1510 while let Some(msg) = messages.next().await {
1511 match msg {
1512 Ok(msg) => {
1513 // A subject-filtered consumer only delivers in-keyspace
1514 // subjects; `None` here would be a server bug, skipped to
1515 // match kv::Watch's tolerance.
1516 let Some(update) = kv_message_to_update(&msg, &kv_prefix) else {
1517 continue;
1518 };
1519 if tx.send(update).await.is_err() {
1520 debug!("watch receiver closed");
1521 break;
1522 }
1523 }
1524 Err(e) => {
1525 error!(error = %e, "NATS KV multi-prefix watch error");
1526 return Err(KvError::WatchError(e.to_string()));
1527 }
1528 }
1529 }
1530 Ok(())
1531 }
1532}
1533
1534struct NatsKvWriterImpl {
1535 kv: Store,
1536}
1537
1538#[async_trait]
1539impl KvWriter for NatsKvWriterImpl {
1540 async fn put(&self, key: &str, value: &[u8]) -> Result<VersionToken, KvError> {
1541 let rev = timed(self.kv.put(key, value.to_vec().into()))
1542 .await?
1543 .map_err(|e| KvError::OperationFailed(e.to_string()))?;
1544 Ok(VersionToken::from_u64(rev))
1545 }
1546
1547 async fn delete(&self, key: &str) -> Result<bool, KvError> {
1548 // NATS delete doesn't tell us if key existed, so we always return true
1549 timed(self.kv.delete(key))
1550 .await?
1551 .map_err(|e| KvError::OperationFailed(e.to_string()))?;
1552 Ok(true)
1553 }
1554
1555 async fn create(&self, key: &str, value: &[u8]) -> Result<VersionToken, KvError> {
1556 use async_nats::jetstream::kv::CreateErrorKind;
1557 timed(self.kv.create(key, value.to_vec().into()))
1558 .await?
1559 .map(VersionToken::from_u64)
1560 .map_err(|e| {
1561 if e.kind() == CreateErrorKind::AlreadyExists {
1562 KvError::AlreadyExists
1563 } else {
1564 KvError::OperationFailed(e.to_string())
1565 }
1566 })
1567 }
1568
1569 async fn update(
1570 &self,
1571 key: &str,
1572 value: &[u8],
1573 expected: &VersionToken,
1574 ) -> Result<VersionToken, KvError> {
1575 use async_nats::jetstream::kv::UpdateErrorKind;
1576 let rev = expected.as_u64().ok_or_else(|| {
1577 KvError::OperationFailed("invalid version token for NATS update".into())
1578 })?;
1579 timed(self.kv.update(key, value.to_vec().into(), rev))
1580 .await?
1581 .map(VersionToken::from_u64)
1582 .map_err(|e| {
1583 if e.kind() == UpdateErrorKind::WrongLastRevision {
1584 KvError::RevisionMismatch
1585 } else {
1586 KvError::OperationFailed(e.to_string())
1587 }
1588 })
1589 }
1590
1591 async fn delete_with_version(
1592 &self,
1593 key: &str,
1594 expected: &VersionToken,
1595 ) -> Result<bool, KvError> {
1596 use async_nats::jetstream::kv::UpdateErrorKind;
1597 let rev = expected.as_u64().ok_or_else(|| {
1598 KvError::OperationFailed("invalid version token for NATS delete".into())
1599 })?;
1600 // Write empty value with CAS — logically deletes while preserving conflict detection
1601 timed(self.kv.update(key, Vec::new().into(), rev))
1602 .await?
1603 .map(|_| true)
1604 .map_err(|e| {
1605 if e.kind() == UpdateErrorKind::WrongLastRevision {
1606 KvError::RevisionMismatch
1607 } else {
1608 KvError::OperationFailed(e.to_string())
1609 }
1610 })
1611 }
1612}
1613
1614#[async_trait]
1615impl KvPurge for NatsKvWriterImpl {
1616 async fn purge(&self, key: &str) -> Result<(), KvError> {
1617 // Rollup purge (`Nats-Rollup: sub`): drops all prior revisions of the
1618 // subject, reclaiming bytes against `max_bytes` — unlike `delete`, which
1619 // only appends a marker. Idempotent: purging an absent key is a no-op.
1620 timed(self.kv.purge(key))
1621 .await?
1622 .map_err(|e| KvError::OperationFailed(e.to_string()))?;
1623 Ok(())
1624 }
1625}
1626
1627impl std::fmt::Debug for NatsConnection {
1628 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1629 f.debug_struct("NatsConnection")
1630 .field("url", &self.config.url)
1631 // `Acquire` to match every other read of `healthy` — a `Relaxed`
1632 // outlier here reads like a deliberate exception during an atomics
1633 // audit, and the fmt path is far too cold for the ordering to cost
1634 // anything.
1635 .field("healthy", &self.healthy.load(Ordering::Acquire))
1636 .finish()
1637 }
1638}
1639
#[cfg(test)]
mod tests {
    use super::*;

    /// A successful STREAM.CREATE reply echoes the stream config back and has
    /// no `error` object.
    #[test]
    fn raw_create_success_has_no_error() {
        let body = br#"{"type":"io.nats.jetstream.api.v1.stream_create_response","config":{"name":"KV_certs"}}"#;
        let outcome = classify_raw_create_response(body);
        assert_eq!(outcome, RawCreateOutcome::Created);
    }

    /// `err_code` 10058 means "stream name already in use": the bucket is
    /// already there, which is fine.
    #[test]
    fn raw_create_swallows_stream_already_exists() {
        let body =
            br#"{"error":{"code":400,"err_code":10058,"description":"stream name already in use"}}"#;
        let outcome = classify_raw_create_response(body);
        assert_eq!(outcome, RawCreateOutcome::AlreadyExists);
    }

    /// At the stream limit Synadia Cloud answers 400 + "maximum number of
    /// streams", yet the bucket may already exist — so this is non-fatal.
    #[test]
    fn raw_create_swallows_stream_limit() {
        let body = br#"{"error":{"code":400,"description":"maximum number of streams reached"}}"#;
        let outcome = classify_raw_create_response(body);
        assert_eq!(outcome, RawCreateOutcome::StreamLimit);
    }

    /// Every other JetStream error is fatal and must carry its code and
    /// description through.
    #[test]
    fn raw_create_propagates_unknown_error() {
        let body = br#"{"error":{"code":403,"description":"insufficient permissions"}}"#;
        match classify_raw_create_response(body) {
            RawCreateOutcome::Failed {
                code: status,
                description: detail,
            } => {
                assert_eq!(status, 403);
                assert_eq!(detail, "insufficient permissions");
            }
            other => panic!("expected Failed, got {other:?}"),
        }
    }

    /// A plain 400 without the stream-limit wording must NOT be swallowed;
    /// otherwise a real bad-config rejection would look like success.
    #[test]
    fn raw_create_400_without_stream_limit_is_fatal() {
        let body = br#"{"error":{"code":400,"description":"invalid stream config"}}"#;
        match classify_raw_create_response(body) {
            RawCreateOutcome::Failed {
                code: status,
                description: detail,
            } => {
                assert_eq!(status, 400);
                assert!(detail.contains("invalid stream config"));
            }
            other => panic!("expected Failed, got {other:?}"),
        }
    }

    /// A garbled body is not a hard failure at this layer: the caller
    /// re-verifies with get_key_value afterwards.
    #[test]
    fn raw_create_unparseable_payload_is_treated_as_success() {
        let outcome = classify_raw_create_response(b"not json at all");
        assert_eq!(outcome, RawCreateOutcome::Created);
    }

    /// Legacy layout:
    /// `$JS.ACK.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>`
    #[test]
    fn ack_subject_legacy_format() {
        let subject = "$JS.ACK.KV_certs.cons.1.42.7.1700000000000000000.0";
        let seq = stream_sequence_from_ack(subject);
        assert_eq!(seq, Some(42));
    }

    /// Modern layout:
    /// `$JS.ACK.<domain>.<account>.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>`
    #[test]
    fn ack_subject_modern_format_with_domain_and_account() {
        let subject = "$JS.ACK.hub.AABBCC.KV_certs.cons.1.42.7.1700000000000000000.0";
        let seq = stream_sequence_from_ack(subject);
        assert_eq!(seq, Some(42));
    }

    /// Some servers tack a random token onto the end (12 tokens in total).
    #[test]
    fn ack_subject_modern_format_with_trailing_token() {
        let subject = "$JS.ACK.hub.AABBCC.KV_certs.cons.1.99.7.1700000000000000000.0.rng";
        let seq = stream_sequence_from_ack(subject);
        assert_eq!(seq, Some(99));
    }

    /// Regression guard: the last token is num_pending, never the sequence.
    /// The old code returned it (0) and corrupted every scanned entry's
    /// version.
    #[test]
    fn ack_subject_last_token_is_not_the_sequence() {
        let subject = "$JS.ACK.KV_certs.cons.1.42.7.1700000000000000000.0";
        let seq = stream_sequence_from_ack(subject);
        assert_ne!(seq, Some(0));
    }

    /// Anything that is not a well-formed ACK subject yields `None`.
    #[test]
    fn ack_subject_rejects_garbage() {
        let bogus_subjects = [
            "",
            "not.an.ack.subject",
            "$JS.ACK.too.few.tokens",
            // Right shape, non-numeric sequence field.
            "$JS.ACK.s.c.1.notnum.7.0.0",
        ];
        for subject in bogus_subjects {
            assert_eq!(stream_sequence_from_ack(subject), None);
        }
    }

    /// The matched substrings come from async-nats error messages. Should the
    /// library reword them, watch_all_from would return WatchError instead of
    /// CursorExpired and break callers that fall back to watch_all() on
    /// expiry.
    #[test]
    fn cursor_expired_matches_known_nats_error_strings() {
        let expired = [
            "consumer start sequence is too old",
            "first sequence is 42, requested 1",
            "sequence not found in stream",
            // "too old" on its own (no "sequence" wording) must still be caught.
            "requested revision is too old",
            // Case-insensitive: a capitalization change upstream must not slip past.
            "Consumer Start Sequence Too Old",
        ];
        for text in expired {
            assert!(is_cursor_expired_error(text));
        }

        let unrelated = ["connection refused", "permission denied", "stream not found"];
        for text in unrelated {
            assert!(!is_cursor_expired_error(text));
        }
    }

    /// Builds a bare `async_nats::Message` for the decode tests, attaching a
    /// `KV-Operation` header only when `op` is given.
    fn raw_kv_msg(
        subject: &str,
        reply: Option<&str>,
        payload: &[u8],
        op: Option<&str>,
    ) -> async_nats::Message {
        let headers = match op {
            Some(op) => {
                let mut map = async_nats::HeaderMap::new();
                map.insert("KV-Operation", op);
                Some(map)
            }
            None => None,
        };
        let reply = reply.map(|r| r.to_string().into());
        async_nats::Message {
            subject: subject.to_string().into(),
            reply,
            payload: payload.to_vec().into(),
            headers,
            status: None,
            description: None,
            length: 0,
        }
    }

    const ACK_42: &str = "$JS.ACK.KV_certs.cons.1.42.7.1700000000000000000.0";

    /// The common case: a put has no KV-Operation header at all.
    #[test]
    fn kv_message_decodes_put_without_operation_header() {
        let msg = raw_kv_msg("$KV.certs.node.a", Some(ACK_42), b"v1", None);
        let update = kv_message_to_update(&msg, "$KV.certs.").expect("in keyspace");
        match update {
            KvUpdate::Put(entry) => {
                assert_eq!(entry.key, "node.a");
                assert_eq!(entry.value, b"v1");
                assert_eq!(entry.version.as_u64(), Some(42));
            }
            other => panic!("expected Put, got {other:?}"),
        }
    }

    /// `DEL` and `PURGE` operation headers decode to their own update kinds.
    #[test]
    fn kv_message_decodes_delete_and_purge_markers() {
        let del_msg = raw_kv_msg("$KV.certs.node.a", Some(ACK_42), b"", Some("DEL"));
        let del_update = kv_message_to_update(&del_msg, "$KV.certs.").expect("in keyspace");
        assert!(matches!(
            del_update,
            KvUpdate::Delete { ref key, ref version } if key == "node.a" && version.as_u64() == Some(42)
        ));

        let purge_msg = raw_kv_msg("$KV.certs.node.a", Some(ACK_42), b"", Some("PURGE"));
        let purge_update = kv_message_to_update(&purge_msg, "$KV.certs.").expect("in keyspace");
        assert!(matches!(
            purge_update,
            KvUpdate::Purge { ref key, .. } if key == "node.a"
        ));
    }

    /// A subject-filtered consumer should never deliver a foreign subject;
    /// if one arrives, the decode skips it instead of mis-keying it.
    #[test]
    fn kv_message_outside_keyspace_is_skipped() {
        let msg = raw_kv_msg("$KV.other.node.a", Some(ACK_42), b"v", None);
        let decoded = kv_message_to_update(&msg, "$KV.certs.");
        assert!(decoded.is_none());
    }

    /// No ACK reply subject → revision unparseable → the UNKNOWN token, never
    /// a fabricated revision 0. A parseable `Some(0)` would be adopted by
    /// watch_applied as a real batch high-water and regress the persisted
    /// cursor to 0 (full replay on next restart); unknown is skipped by the
    /// cursor-authority loop instead.
    #[test]
    fn kv_message_without_reply_gets_unknown_version() {
        let msg = raw_kv_msg("$KV.certs.node.a", None, b"v", None);
        let update = kv_message_to_update(&msg, "$KV.certs.").expect("in keyspace");
        match update {
            KvUpdate::Put(entry) => {
                assert!(entry.version.is_unknown());
                assert_eq!(entry.version.as_u64(), None);
            }
            other => panic!("expected Put, got {other:?}"),
        }
    }

    /// Some Synadia Cloud deployments put 10058 in `code` rather than
    /// `err_code`. Both spellings must yield AlreadyExists, not Failed.
    #[test]
    fn raw_create_already_exists_when_10058_in_code_field() {
        let body = br#"{"error":{"code":10058,"description":"stream name already in use"}}"#;
        let outcome = classify_raw_create_response(body);
        assert_eq!(outcome, RawCreateOutcome::AlreadyExists);
    }

    /// Defensive: a malformed error object still classifies as Failed (it
    /// does not silently pass), with the code defaulting to 0.
    #[test]
    fn raw_create_error_without_code_defaults_to_zero() {
        let body = br#"{"error":{"description":"mystery"}}"#;
        match classify_raw_create_response(body) {
            RawCreateOutcome::Failed {
                code: status,
                description: detail,
            } => {
                assert_eq!(status, 0);
                assert_eq!(detail, "mystery");
            }
            other => panic!("expected Failed, got {other:?}"),
        }
    }
}
1874
/// Live-server conformance tests for the floor guard
/// ([`stream_watch_floor_guarded`]) — these drive the guarded loop DIRECTLY
/// with a deliberately clamped `Watch`, which reproduces exactly the state
/// retention leaves behind when it overruns a live consumer (the watcher
/// methods' resume-time checks can't be raced deterministically from
/// outside, but the guarded loop neither knows nor cares how its watch got
/// clamped). Spawns a throwaway `nats-server` (mise-installed, same pattern
/// as tests/common).
///
/// The module also hosts `purge_reclaims_bytes`, which exercises
/// `KvPurge::purge` against the same throwaway server.
#[cfg(test)]
mod floor_guard_tests {
    use super::*;
    use std::process::{Child, Command, Stdio};

    /// A throwaway `nats-server` child process plus the resources tied to it.
    /// Dropping the value kills the process (see the `Drop` impl).
    struct TestServer {
        /// Handle to the spawned `nats-server` process.
        child: Child,
        /// Client URL (`nats://127.0.0.1:<port>`) the server listens on.
        url: String,
        /// Temp directory passed as `--store_dir`; held here so it lives as
        /// long as the server does.
        _dir: tempfile::TempDir,
    }

    impl Drop for TestServer {
        /// Kills the server process and waits for it to exit. Both results
        /// are ignored: this is best-effort cleanup at the end of a test.
        fn drop(&mut self) {
            let _ = self.child.kill();
            let _ = self.child.wait();
        }
    }

    /// Spawns a JetStream-enabled `nats-server` on a free loopback port and
    /// returns once a client connection succeeds.
    ///
    /// The binary comes from `NATS_SERVER_BIN`, falling back to `nats-server`
    /// on `PATH`.
    ///
    /// # Panics
    ///
    /// Panics if the binary cannot be spawned, or if the server accepts no
    /// connection within 100 attempts spaced 100 ms apart.
    async fn start_server() -> TestServer {
        let bin = std::env::var("NATS_SERVER_BIN").unwrap_or_else(|_| "nats-server".into());
        // Probe for a free port by binding port 0. The listener is a
        // temporary that is dropped at the end of this statement, which
        // releases the port again so `nats-server` can bind it below.
        let port = std::net::TcpListener::bind("127.0.0.1:0")
            .unwrap()
            .local_addr()
            .unwrap()
            .port();
        let dir = tempfile::tempdir().unwrap();
        let child = Command::new(&bin)
            .args([
                "--jetstream",
                "--addr",
                "127.0.0.1",
                "--port",
                &port.to_string(),
                "--store_dir",
                dir.path().to_str().unwrap(),
            ])
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .spawn()
            .unwrap_or_else(|e| panic!("spawn {bin}: {e}; run `mise install`"));
        // Build the guard before polling so the child is killed (via `Drop`)
        // even when the readiness loop below ends in a panic.
        let server = TestServer {
            child,
            url: format!("nats://127.0.0.1:{port}"),
            _dir: dir,
        };
        // Readiness poll: retry a client connect until it succeeds.
        for _ in 0..100 {
            if async_nats::connect(&server.url).await.is_ok() {
                return server;
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        panic!("nats-server never became ready");
    }

    /// `(js, kv store)` with five revisions across five subjects (history 1).
    ///
    /// Creates the bucket `guard` and puts keys `k1`..`k5`, one put each.
    async fn seeded_bucket(
        url: &str,
    ) -> (
        async_nats::jetstream::Context,
        async_nats::jetstream::kv::Store,
    ) {
        let client = async_nats::connect(url).await.unwrap();
        let js = async_nats::jetstream::new(client);
        let kv = js
            .create_key_value(async_nats::jetstream::kv::Config {
                bucket: "guard".into(),
                history: 1,
                ..Default::default()
            })
            .await
            .unwrap();
        for i in 1..=5u8 {
            kv.put(format!("k{i}"), vec![i].into()).await.unwrap();
        }
        (js, kv)
    }

    /// `KvPurge::purge` must reclaim bytes against `max_bytes` — unlike
    /// `delete`/`delete_with_version`, which only append markers. This is the
    /// in-repo twin of the "does purge actually free bytes?" gate: fill a
    /// bucket, purge half the keys, assert the stream's byte count drops.
    #[tokio::test(flavor = "multi_thread")]
    async fn purge_reclaims_bytes() {
        use crate::kv::KvPurge;

        let server = start_server().await;
        let client = async_nats::connect(&server.url).await.unwrap();
        let js = async_nats::jetstream::new(client);
        let kv = js
            .create_key_value(async_nats::jetstream::kv::Config {
                bucket: "purge".into(),
                history: 1,
                ..Default::default()
            })
            .await
            .unwrap();

        // Fill with sizable values across many distinct keys.
        let val = vec![b'x'; 4096];
        for i in 0..50u32 {
            kv.put(format!("k{i}"), val.clone().into()).await.unwrap();
        }
        // Baseline: stream byte count with all 50 values stored.
        let before = js
            .get_stream("KV_purge")
            .await
            .unwrap()
            .info()
            .await
            .unwrap()
            .state
            .bytes;

        // Purge half the keys through the KvPurge impl.
        let writer = NatsKvWriterImpl { kv: kv.clone() };
        for i in 0..25u32 {
            writer.purge(&format!("k{i}")).await.unwrap();
        }

        // Purge is a rollup: prior revisions of each subject are removed, so the
        // stream's byte count must fall. (A residual purge marker may remain per
        // subject — far smaller than the 4KiB value — so we assert a strict drop,
        // not zero.) Poll briefly in case the server reflects reclamation async.
        let mut after = before;
        for _ in 0..20 {
            after = js
                .get_stream("KV_purge")
                .await
                .unwrap()
                .info()
                .await
                .unwrap()
                .state
                .bytes;
            if after < before {
                break;
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        assert!(
            after < before,
            "purge must reclaim bytes: before={before} after={after}"
        );

        // Purge is idempotent: re-purging an absent key is not an error.
        writer.purge("k0").await.unwrap();
    }

    /// TRUE POSITIVE: the watch was clamped past evicted revisions (purge
    /// advanced first_seq beyond the frontier) — the first gapped delivery
    /// must trip the guard BEFORE the entry is processed, never silently
    /// folding past the lost range. This is the live twin of the model's
    /// `GuardRepair`-only-progress gate.
    #[tokio::test(flavor = "multi_thread")]
    async fn gapped_delivery_with_advanced_floor_trips() {
        let server = start_server().await;
        let (js, kv) = seeded_bucket(&server.url).await;

        // Evict revisions 1-3 outright: first_sequence becomes 4.
        let mut stream = js.get_stream("KV_guard").await.unwrap();
        stream.purge().sequence(4).await.unwrap();
        assert_eq!(stream.info().await.unwrap().state.first_sequence, 4);

        // A consumer resuming from revision 1 gets CLAMPED to revision 4
        // (NATS's silent skip, pinned by tests/resync.rs). Hand that watch
        // to the guarded loop as a live consumer whose retention just
        // overran it.
        let watch = kv.watch_all_from_revision(2).await.unwrap();
        let (tx, mut rx) = tokio::sync::mpsc::channel(64);
        // Keep the receiving end drained so it stays open while the guarded
        // loop runs.
        let drain = tokio::spawn(async move { while rx.recv().await.is_some() {} });

        // The 5s bound pins IN-BAND detection: the trip must come from the
        // gapped-delivery check itself, not the 30s no-traffic backstop. A
        // regression to periodic-only detection (the design the model
        // checker rejected — catch-up between probes erases the evidence)
        // fails this bound.
        let err = tokio::time::timeout(
            Duration::from_secs(5),
            stream_watch_floor_guarded(watch, &tx, 1, &js, "guard"),
        )
        .await
        .expect("the trip must be IN-BAND (immediate), not backstop-paced")
        .expect_err("a gapped delivery over an advanced floor must trip");
        assert!(
            err.to_string().contains("retention overran live watch"),
            "{err}"
        );
        // Dropping the sender closes the channel, which lets the drain task
        // finish.
        drop(tx);
        let _ = drain.await;
    }

    /// NO FALSE POSITIVE: interior (per-subject) eviction also gaps the
    /// delivered revisions, but the floor stays at or below the frontier —
    /// benign for a last-write-wins fold, and the guard must let it
    /// through. (Every existing bootstrap e2e also rides this path on its
    /// resume; this pins the discrimination explicitly.)
    #[tokio::test(flavor = "multi_thread")]
    async fn benign_interior_gap_passes() {
        let server = start_server().await;
        let (js, kv) = seeded_bucket(&server.url).await;

        // Overwrite k2 and k3: revisions 2 and 3 are interior-evicted
        // (history 1), revisions 6 and 7 replace them. first_sequence stays
        // 1 (k1's revision is retained).
        kv.put("k2", vec![22].into()).await.unwrap();
        kv.put("k3", vec![33].into()).await.unwrap();
        let mut stream = js.get_stream("KV_guard").await.unwrap();
        assert_eq!(stream.info().await.unwrap().state.first_sequence, 1);

        // Resume from revision 1: deliveries jump 2 and 3 — gapped, benign.
        let watch = kv.watch_all_from_revision(2).await.unwrap();
        let (tx, mut rx) = tokio::sync::mpsc::channel(64);
        let guard =
            tokio::spawn(
                async move { stream_watch_floor_guarded(watch, &tx, 1, &js, "guard").await },
            );

        // Collect the next four delivered revisions, each within 5s.
        let mut got = Vec::new();
        while got.len() < 4 {
            let update = tokio::time::timeout(Duration::from_secs(5), rx.recv())
                .await
                .expect("deliveries continue past benign gaps")
                .expect("watch alive");
            got.push(update.version().as_u64().unwrap());
        }
        assert_eq!(got, vec![4, 5, 6, 7], "interior gaps jumped, tail dense");
        guard.abort(); // endless live watch; the assertion above is the test
    }
}