Skip to main content

slipstream/
nats.rs

1use async_nats::jetstream::kv::Store;
2use async_trait::async_trait;
3use futures::StreamExt;
4use std::sync::{
5    Arc,
6    atomic::{AtomicBool, Ordering},
7};
8use std::time::Duration;
9use tokio::sync::{RwLock, mpsc::Sender};
10use tracing::{debug, error, info, warn};
11
12use crate::kv::{
13    KvEntry, KvError, KvPurge, KvReader, KvUpdate, KvWatcher, KvWriter, VersionToken, WatchCursor,
14};
15use crate::stores::{Connection, ConnectionCapabilities, DiscardPolicy, KvStore, StoreConfig};
16
/// Default per-operation timeout for NATS KV ops, applied by [`timed`].
///
/// async-nats's request/response futures don't fail in-flight requests when
/// the underlying TCP connection goes half-dead (CLOSE_WAIT) — they just await
/// forever. Without a timeout here, ANY hung NATS connection translates into a
/// tokio runtime deadlock as soon as enough callers queue behind the dead
/// connection. 30s is generous for legitimate slow ops (cold JetStream stream
/// sync, leader election under load) and short enough that a dead connection
/// recovers within reasonable human-debug latency.
const KV_OP_TIMEOUT: Duration = Duration::from_secs(30);
26
/// Server-side inactivity reaper for the ephemeral consumers `scan()`/`keys()`
/// create.
///
/// Our code deletes each consumer explicitly when the drain finishes, but that
/// delete is best-effort: on a half-dead (CLOSE_WAIT) connection it times out,
/// orphaning the consumer server-side where it counts against the per-stream
/// consumer limit. Setting `inactive_threshold` makes JetStream reap any
/// consumer that sees no activity for this long, so a failed explicit delete
/// self-heals instead of accumulating until the limit is hit.
///
/// 300s is comfortably longer than [`KV_OP_TIMEOUT`] (30s) so it never reaps a
/// legitimately slow in-flight drain (each delivery resets the inactivity
/// timer).
const CONSUMER_INACTIVE_THRESHOLD: Duration = Duration::from_secs(300);
37
38/// Run a future under [`KV_OP_TIMEOUT`], returning [`KvError::Timeout`] if it
39/// doesn't complete in time. Preserves the inner future's `Result` so callers
40/// keep their existing error-mapping logic.
41async fn timed<F, T>(fut: F) -> Result<T, KvError>
42where
43    F: std::future::Future<Output = T>,
44{
45    tokio::time::timeout(KV_OP_TIMEOUT, fut)
46        .await
47        .map_err(|_| KvError::Timeout)
48}
49
50/// Build NATS connect options (with auth applied) and resolve the URL to dial.
51///
52/// Split out from [`nats_connect`] so the connection lifecycle can attach an
53/// event callback (for health tracking) to the *same* options before dialing,
54/// without duplicating the auth-priority logic. Returns the options plus the URL
55/// to connect to (which may differ from `url` when credentials are stripped out
56/// of a `user:pass@host` URL).
57///
58/// Auth priority (first match wins):
59/// 1. Inline credentials (base64-encoded .creds content)
60/// 2. Credentials file (if provided)
61/// 3. URL-embedded credentials (user:pass@host)
62/// 4. No authentication
63async fn build_connect_options(
64    url: &str,
65    creds: Option<&str>,
66    creds_file: Option<&str>,
67) -> Result<(async_nats::ConnectOptions, String), async_nats::ConnectError> {
68    // Priority 1: Inline credentials (base64-encoded)
69    if let Some(encoded) = creds {
70        use base64::Engine;
71        let decoded = base64::engine::general_purpose::STANDARD
72            .decode(encoded)
73            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
74        let content = String::from_utf8(decoded)
75            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
76        return Ok((
77            async_nats::ConnectOptions::with_credentials(&content)?,
78            url.to_string(),
79        ));
80    }
81
82    // Priority 2: Credentials file
83    if let Some(path) = creds_file {
84        return Ok((
85            async_nats::ConnectOptions::with_credentials_file(path).await?,
86            url.to_string(),
87        ));
88    }
89
90    // Priority 3: URL-embedded credentials
91    if let Ok(parsed) = url::Url::parse(url)
92        && !parsed.username().is_empty()
93    {
94        let user = parsed.username().to_string();
95        let pass = parsed.password().unwrap_or("").to_string();
96        // Rebuild URL without credentials. If the scheme doesn't support
97        // userinfo, these calls fail and the credentials would remain embedded
98        // in the URL we later log — warn loudly rather than silently leak them.
99        let mut clean_url = parsed.clone();
100        if clean_url.set_username("").is_err() || clean_url.set_password(None).is_err() {
101            warn!("could not strip credentials from NATS URL; they may appear in logs");
102        }
103        return Ok((
104            async_nats::ConnectOptions::with_user_and_password(user, pass),
105            clean_url.as_str().to_string(),
106        ));
107    }
108
109    // Priority 4: No authentication
110    Ok((async_nats::ConnectOptions::new(), url.to_string()))
111}
112
113/// Connect to NATS with various authentication methods.
114///
115/// Supports the auth-priority order documented on [`build_connect_options`].
116/// This is the standalone helper; the [`Connection`] impl builds options the
117/// same way but also installs a health-tracking event callback.
118pub async fn nats_connect(
119    url: &str,
120    creds: Option<&str>,
121    creds_file: Option<&str>,
122) -> Result<async_nats::Client, async_nats::ConnectError> {
123    let (opts, dial_url) = build_connect_options(url, creds, creds_file).await?;
124    opts.connect(dial_url).await
125}
126
127/// Render an untrusted server payload for logging: borrowed as-is when valid
128/// UTF-8, lowercase hex otherwise. `from_utf8_lossy` would mash every invalid
129/// byte into U+FFFD — exactly the bytes an incident needs to see — so the
130/// fallback preserves them losslessly instead.
131fn payload_for_log(payload: &[u8]) -> std::borrow::Cow<'_, str> {
132    match std::str::from_utf8(payload) {
133        Ok(s) => std::borrow::Cow::Borrowed(s),
134        Err(_) => std::borrow::Cow::Owned(format!("0x{}", crate::artifact::hex_encode(payload))),
135    }
136}
137
/// Configuration for NATS connection.
///
/// `Debug` is hand-written, not derived: `creds` holds base64-encoded
/// credential material, `creds_file` a filesystem path to secrets, and `url`
/// may embed `user:pass@host` credentials. A derived `Debug` would print all
/// of them verbatim the moment anyone `{:?}`-formats the config (a `tracing`
/// span, an error context, a test dump), leaking credentials into logs. The
/// redacting impl below keeps that from being one careless format string away.
#[derive(Clone)]
pub struct NatsConnectionConfig {
    /// Server URL to dial; may carry `user:pass@` userinfo.
    pub url: String,
    /// Base64-encoded .creds file content (for ECS / containerized environments).
    pub creds: Option<String>,
    /// Path to .creds file on disk (for bare-metal / local development).
    pub creds_file: Option<String>,
}

/// Replace any userinfo in `url` with `[redacted]`, keeping scheme and host.
///
/// Deliberately conservative: everything between the start of the authority
/// (just after `://`, or the start of the string when there is no scheme) and
/// the LAST `@` is hidden. That may over-redact an unusual URL, but it never
/// lets a username or password through. URLs without an `@` are borrowed
/// unchanged.
fn redact_url_userinfo(url: &str) -> std::borrow::Cow<'_, str> {
    use std::borrow::Cow;

    let authority_start = url.find("://").map_or(0, |scheme_end| scheme_end + 3);
    match url[authority_start..].rfind('@') {
        Some(at) => Cow::Owned(format!(
            "{}[redacted]{}",
            &url[..authority_start],
            &url[authority_start + at..]
        )),
        None => Cow::Borrowed(url),
    }
}

impl std::fmt::Debug for NatsConnectionConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const REDACTED: &str = "[redacted]";

        f.debug_struct("NatsConnectionConfig")
            // Host/port are useful for debugging; embedded credentials are not
            // safe to print, so only the userinfo portion is masked.
            .field("url", &redact_url_userinfo(&self.url))
            // Presence, never content: enough to debug "are creds set?" without
            // ever rendering the secret itself. The same applies to `creds_file`:
            // a path like `/run/secrets/prod.creds` leaks the secrets layout into
            // logs/traces, so redact it to presence too.
            .field("creds", &self.creds.as_ref().map(|_| REDACTED))
            .field("creds_file", &self.creds_file.as_ref().map(|_| REDACTED))
            .finish()
    }
}
171
172/// Create a KV bucket using raw JetStream API (bypasses async-nats response parsing issues).
173///
174/// Synadia Cloud returns responses that `async_nats` can't parse. This function
175/// uses the raw JetStream API directly, bypassing the client's response deserialization.
176///
177/// `pub(crate)`: this is an internal Synadia Cloud workaround invoked by
178/// `get_or_create_bucket`, not a stable entry point. Exposing it would pin a
179/// vendor quirk into the crate's semver surface.
180pub(crate) async fn create_kv_bucket_raw(
181    client: &async_nats::Client,
182    bucket: &str,
183    max_bytes: i64,
184    history: i64,
185    max_age_nanos: i64,
186    discard: DiscardPolicy,
187    num_replicas: usize,
188) -> Result<(), KvError> {
189    let stream_name = format!("KV_{}", bucket);
190    let subject = format!("$KV.{}.>", bucket);
191
192    // JetStream stream config for KV bucket
193    let config = serde_json::json!({
194        "name": stream_name,
195        "subjects": [subject],
196        "max_msgs_per_subject": history,
197        "max_bytes": max_bytes,
198        "max_age": max_age_nanos,
199        "storage": "file",
200        "allow_rollup_hdrs": true,
201        "deny_delete": false,
202        "deny_purge": false,
203        "allow_direct": true,
204        "discard": discard.as_nats(),
205        "num_replicas": num_replicas,
206        "retention": "limits"
207    });
208
209    let payload = serde_json::to_vec(&config)
210        .map_err(|e| KvError::ConnectionFailed(format!("failed to serialize config: {}", e)))?;
211
212    let response = client
213        .request(
214            format!("$JS.API.STREAM.CREATE.{}", stream_name),
215            payload.into(),
216        )
217        .await
218        .map_err(|e| KvError::ConnectionFailed(format!("failed to send create request: {}", e)))?;
219
220    debug!(bucket, response = %payload_for_log(&response.payload), "raw JetStream response");
221
222    match classify_raw_create_response(&response.payload) {
223        RawCreateOutcome::AlreadyExists => {
224            info!(bucket, "bucket already exists");
225            Ok(())
226        }
227        RawCreateOutcome::StreamLimit => {
228            info!(bucket, "stream limit reached, bucket may already exist");
229            Ok(())
230        }
231        RawCreateOutcome::Created => {
232            info!(bucket, "bucket created successfully via raw API");
233            Ok(())
234        }
235        RawCreateOutcome::Failed { code, description } => Err(KvError::ConnectionFailed(format!(
236            "JetStream error {}: {}",
237            code, description
238        ))),
239    }
240}
241
/// Classification of a raw `$JS.API.STREAM.CREATE` response payload.
///
/// Separated from the I/O in [`create_kv_bucket_raw`] so the Synadia Cloud
/// error-code logic — the reason this raw path exists — is unit-testable
/// without a live NATS server. Produced by [`classify_raw_create_response`].
#[derive(Debug, PartialEq, Eq)]
enum RawCreateOutcome {
    /// No error in the response: the bucket was created. Also returned for an
    /// unparseable payload, on the assumption that the caller re-verifies.
    Created,
    /// `10058` — stream name already in use; the bucket exists. Non-fatal.
    AlreadyExists,
    /// `400` "maximum number of streams"; Synadia Cloud returns this at the
    /// stream limit even when the bucket already exists. Non-fatal.
    StreamLimit,
    /// Any other JetStream error — fatal. `code` is the response's `code`
    /// field (0 when absent), not `err_code`; `description` falls back to
    /// "unknown error" when the response carries none.
    Failed { code: i64, description: String },
}
259
260fn classify_raw_create_response(payload: &[u8]) -> RawCreateOutcome {
261    // Unparseable payloads are treated as success: the caller re-verifies the
262    // bucket with a follow-up `get_key_value`, so a garbled body here does not
263    // mask a real failure. Warn so the fallback assumption is auditable — if the
264    // re-verify step is ever refactored away, this log is the breadcrumb.
265    //
266    // INVARIANT: this `Created`-on-garbage path is only sound because every
267    // caller re-verifies the bucket exists after `create_kv_bucket_raw` returns
268    // Ok. The sole caller — `get_or_create_bucket` — does so via the
269    // `timed(js.get_key_value(...))` immediately after the raw-create fallback.
270    // Do not remove that re-verify without making this path return `Failed`.
271    let Ok(json) = serde_json::from_slice::<serde_json::Value>(payload) else {
272        warn!(
273            response = %payload_for_log(payload),
274            "unparseable STREAM.CREATE response; assuming created (caller re-verifies via get_key_value)"
275        );
276        return RawCreateOutcome::Created;
277    };
278
279    let Some(err) = json.get("error") else {
280        return RawCreateOutcome::Created;
281    };
282
283    // JetStream splits its error codes: `code` is the HTTP-style status (400,
284    // 404, 500) while `err_code` carries the granular code (e.g. 10058). The
285    // already-exists code can surface in either field depending on the server
286    // (standard NATS puts 10058 in `err_code` with `code` = 400; some managed
287    // deployments echo it in `code`), so we accept it in either.
288    let code = err.get("code").and_then(|c| c.as_i64()).unwrap_or(0);
289    let err_code = err.get("err_code").and_then(|c| c.as_i64()).unwrap_or(0);
290    let description = err
291        .get("description")
292        .and_then(|d| d.as_str())
293        .unwrap_or("unknown error");
294
295    // 10058 = stream name already in use (bucket exists) - that's OK
296    if code == 10058 || err_code == 10058 {
297        return RawCreateOutcome::AlreadyExists;
298    }
299
300    // 400 "maximum number of streams reached" may also mean bucket exists
301    // (Synadia Cloud returns this when at stream limit but bucket exists)
302    if code == 400 && description.contains("maximum number of streams") {
303        return RawCreateOutcome::StreamLimit;
304    }
305
306    RawCreateOutcome::Failed {
307        code,
308        description: description.to_string(),
309    }
310}
311
/// The live client plus the JetStream context derived from it; stored behind
/// `NatsConnection::handle` and cleared on `shutdown()`.
struct NatsHandle {
    // Held to keep the NATS connection alive for the lifetime of the handle.
    // The `jetstream` context clones an internal reference, but this field is
    // the authoritative owner held by the connection object.
    // Also read by `store_with_config`, which clones it out from under the
    // handle lock.
    // NOTE(review): because `store_with_config` hands clones to each store,
    // dropping the handle presumably does not close the socket while such
    // clones are still alive — confirm against async-nats' drop semantics.
    client: async_nats::Client,
    // JetStream context built from a clone of `client`.
    jetstream: async_nats::jetstream::Context,
}
321
/// NATS JetStream KV connection.
///
/// Built either with `new()` (dialed later by `connect()`) or with
/// `from_client()` (wrapping an already-connected client).
pub struct NatsConnection {
    // URL and credentials used by `connect()`; empty for `from_client`.
    config: NatsConnectionConfig,
    // `None` until `connect()` installs a handle (or `from_client` supplies
    // one); reset to `None` by `shutdown()`.
    handle: RwLock<Option<NatsHandle>>,
    // Shared with the installed client's event callback so `is_healthy()`
    // tracks real connection state (Connected/Disconnected) rather than staying
    // pinned at its connect-time value. `Arc` because the callback outlives this
    // struct's borrow — it runs on the client's event-loop task.
    healthy: Arc<AtomicBool>,
    // Set only for connections built via `from_client`, where no health-tracking
    // event callback could be installed (the client was already connected).
    // `is_healthy()` consults this client's live `connection_state()` instead of
    // the callback-driven `healthy` flag. `None` for the `new()` + `connect()`
    // path, whose flag is kept current by the installed event callback.
    //
    // `Some(_)` is also the marker that this connection borrows a caller-owned
    // client: it carries no URL or credentials of its own (see `from_client`),
    // so it cannot redial. `connect()` refuses to reconnect such a connection
    // rather than dialing the empty config URL and surfacing a cryptic error.
    state_probe: Option<async_nats::Client>,
}
343
344impl NatsConnection {
345    pub fn new(config: NatsConnectionConfig) -> Self {
346        Self {
347            config,
348            handle: RwLock::new(None),
349            healthy: Arc::new(AtomicBool::new(false)),
350            state_probe: None,
351        }
352    }
353
354    /// Create a NatsConnection from an existing NATS client.
355    ///
356    /// This is useful when the caller already has a NATS connection and wants
357    /// to reuse it for KV stores instead of creating a new connection.
358    pub fn from_client(client: async_nats::Client) -> Self {
359        let jetstream = async_nats::jetstream::new(client.clone());
360        let config = NatsConnectionConfig {
361            url: String::new(), // Not used when pre-connected
362            creds: None,
363            creds_file: None,
364        };
365
366        // Clone a probe handle before the client moves into `NatsHandle`.
367        // `async_nats::Client` is cheap to clone (internally an `Arc`), and
368        // `connection_state()` just reads a watch channel — no I/O.
369        let state_probe = Some(client.clone());
370        let handle = NatsHandle { client, jetstream };
371
372        Self {
373            config,
374            handle: RwLock::new(Some(handle)),
375            // A pre-connected client carries no health-tracking callback (we
376            // didn't build its options), so `is_healthy()` reads the client's
377            // live `connection_state()` via `state_probe`. The flag below only
378            // gates explicit `shutdown()`.
379            healthy: Arc::new(AtomicBool::new(true)),
380            state_probe,
381        }
382    }
383
384    async fn get_or_create_bucket(
385        client: &async_nats::Client,
386        js: &async_nats::jetstream::Context,
387        config: &StoreConfig,
388    ) -> Result<Store, KvError> {
389        // Try to get existing bucket first. Bound the call so a slow/dead
390        // NATS connection at startup can't park the daemon's init thread
391        // forever — the rest of startup (HTTP listener bind, etc.) happens
392        // after this. Without the timeout, a single bad NATS round-trip
393        // here held HTTP bind for 30s+ in observed cases.
394        //
395        // A failure here (permission denied, JetStream disabled, timeout) is not
396        // necessarily fatal — the bucket may simply not exist yet, so we fall
397        // through to create. But surface the original error first: otherwise a
398        // later create failure (e.g. "permission denied on STREAM.CREATE") masks
399        // the real cause ("permission denied on STREAM.INFO") and makes the
400        // failure undebuggable under load.
401        match timed(js.get_key_value(&config.name)).await {
402            Ok(Ok(kv)) => return Ok(kv),
403            Ok(Err(e)) => {
404                debug!(bucket = %config.name, error = ?e, "get_key_value failed; attempting create");
405            }
406            Err(_) => {
407                warn!(bucket = %config.name, timeout = ?KV_OP_TIMEOUT, "get_key_value timed out; attempting create");
408            }
409        }
410
411        // Bucket doesn't exist, create it
412        let mut kv_config = async_nats::jetstream::kv::Config {
413            bucket: config.name.clone(),
414            num_replicas: config.num_replicas.unwrap_or(1),
415            ..Default::default()
416        };
417
418        // Apply max_age (bucket-level TTL) if specified. `as_nanos()` is u128;
419        // saturate to i64::MAX rather than `as i64`, which would silently wrap a
420        // >292-year duration into a negative (and thus meaningless) TTL.
421        let max_age_nanos = if let Some(max_age) = config.max_age {
422            kv_config.max_age = max_age;
423            i64::try_from(max_age.as_nanos()).unwrap_or(i64::MAX)
424        } else {
425            0
426        };
427
428        // Apply max_history if specified. `i64::from` is lossless for a u32 and
429        // states the widening intent, where `as i64` would quietly mask a future
430        // type change that no longer fits.
431        let history = if let Some(history) = config.max_history {
432            let history = i64::from(history);
433            kv_config.history = history;
434            history
435        } else {
436            1
437        };
438
439        // Apply max_bytes if specified (required by Synadia Cloud)
440        let max_bytes = config.max_bytes.unwrap_or(10 * 1024 * 1024); // Default 10MB for Synadia Cloud
441        kv_config.max_bytes = max_bytes;
442
443        // Try normal create first, fall back to raw API if it fails (Synadia Cloud compatibility).
444        //
445        // A TIMEOUT also takes the fallback, not an early return: the raw path
446        // exists for Synadia Cloud, which is the deployment most likely to be
447        // slow or distant — `?`-propagating the timeout here would skip the
448        // exact workaround built for it. If the connection is genuinely dead,
449        // the raw path's own `timed()` bounds surface that promptly anyway.
450        //
451        // EXCEPTION: `discard:old` cannot go through the normal path —
452        // async-nats' `kv::Config` exposes no discard field, so `create_key_value`
453        // would silently produce a `discard:new` bucket. Skip straight to the raw
454        // stream API (the only place the policy is expressible) for that case.
455        if config.discard == DiscardPolicy::New {
456            match timed(js.create_key_value(kv_config)).await {
457                Ok(Ok(kv)) => return Ok(kv),
458                Ok(Err(e)) => {
459                    warn!(
460                        bucket = config.name,
461                        error = ?e,
462                        "create_key_value failed, trying raw JetStream API"
463                    );
464                }
465                Err(_) => {
466                    warn!(
467                        bucket = config.name,
468                        timeout = ?KV_OP_TIMEOUT,
469                        "create_key_value timed out, trying raw JetStream API"
470                    );
471                }
472            }
473        }
474
475        // Raw JetStream API: the fallback for `discard:new`, and the ONLY path for
476        // `discard:old`.
477        create_kv_bucket_raw(
478            client,
479            &config.name,
480            max_bytes,
481            history,
482            max_age_nanos,
483            config.discard,
484            config.num_replicas.unwrap_or(1),
485        )
486        .await?;
487
488        // Re-verify the bucket exists. This upholds the INVARIANT in
489        // `classify_raw_create_response`: the raw path reports `Created`
490        // on an unparseable response, so this round-trip is what actually
491        // confirms the bucket — do not remove it.
492        timed(js.get_key_value(&config.name)).await?.map_err(|e| {
493            error!(bucket = config.name, error = ?e, "failed to get bucket after raw create");
494            KvError::ConnectionFailed(format!("get bucket after raw create: {:?}", e))
495        })
496    }
497}
498
499#[async_trait]
500impl Connection for NatsConnection {
501    async fn connect(&self) -> Result<(), KvError> {
502        // Fast path: skip if already connected.
503        if self.healthy.load(Ordering::Acquire) {
504            return Ok(());
505        }
506
507        // A `from_client` connection borrows a caller-owned client and kept no
508        // URL or credentials, so it cannot redial. Refuse here with an actionable
509        // message instead of dialing the empty config URL (which fails with an
510        // opaque parse/connect error). This is reachable only after `shutdown()`
511        // cleared the fast-path flag above — a live borrowed client short-circuits
512        // there. The caller must construct a `NatsConnection::new(config)` if it
513        // needs reconnect semantics.
514        if self.state_probe.is_some() {
515            return Err(KvError::ConnectionFailed(
516                "connection was built via NatsConnection::from_client and cannot \
517                 reconnect (no URL or credentials retained); construct \
518                 NatsConnection::new(config) for a reconnectable connection"
519                    .to_string(),
520            ));
521        }
522
523        let (opts, dial_url) = build_connect_options(
524            &self.config.url,
525            self.config.creds.as_deref(),
526            self.config.creds_file.as_deref(),
527        )
528        .await
529        .map_err(|e| KvError::ConnectionFailed(e.to_string()))?;
530
531        // Drive `healthy` from the client's own connection events so it reflects
532        // reality through async-nats's transparent reconnects — without this the
533        // flag stays `true` straight through a NATS outage, and a readiness probe
534        // built on `is_healthy()` keeps routing traffic to a node that can't
535        // reach NATS.
536        //
537        // `installed` gates the callback: a caller that loses the connect race
538        // (see the double-check below) tears down its freshly built client, and
539        // that teardown fires `Disconnected`. Without the gate, the loser's drop
540        // would clobber the *winner's* `healthy` flag. Only the client we
541        // actually install ever flips `installed` to `true`, so the losers'
542        // callbacks are inert.
543        let installed = Arc::new(AtomicBool::new(false));
544        let cb_healthy = Arc::clone(&self.healthy);
545        let cb_installed = Arc::clone(&installed);
546        let opts = opts.event_callback(move |event| {
547            let cb_healthy = Arc::clone(&cb_healthy);
548            let cb_installed = Arc::clone(&cb_installed);
549            async move {
550                if !cb_installed.load(Ordering::Acquire) {
551                    return;
552                }
553                match event {
554                    async_nats::Event::Connected => cb_healthy.store(true, Ordering::Release),
555                    async_nats::Event::Disconnected => cb_healthy.store(false, Ordering::Release),
556                    _ => {}
557                }
558            }
559        });
560
561        let client = opts
562            .connect(dial_url)
563            .await
564            .map_err(|e| KvError::ConnectionFailed(e.to_string()))?;
565
566        let jetstream = async_nats::jetstream::new(client.clone());
567
568        let conn = NatsHandle { client, jetstream };
569
570        // Re-check under the write lock: a concurrent caller may have connected
571        // while we were awaiting the dial. If so, drop our freshly built handle
572        // (closing its connection) instead of replacing the live one, which would
573        // orphan a connection the first caller still believes is installed.
574        // Leaving `installed = false` keeps our about-to-drop client's teardown
575        // events from touching `healthy`.
576        let mut handle = self.handle.write().await;
577        if handle.is_some() {
578            return Ok(());
579        }
580        installed.store(true, Ordering::Release);
581        *handle = Some(conn);
582        self.healthy.store(true, Ordering::Release);
583
584        Ok(())
585    }
586
587    async fn shutdown(&self) -> Result<(), KvError> {
588        self.healthy.store(false, Ordering::Release);
589        *self.handle.write().await = None;
590        Ok(())
591    }
592
593    fn is_healthy(&self) -> bool {
594        // `healthy` is the shutdown gate for both construction paths: once
595        // `shutdown()` clears it the connection is down regardless of socket
596        // state, so check it first.
597        if !self.healthy.load(Ordering::Acquire) {
598            return false;
599        }
600        match &self.state_probe {
601            // `from_client`: no event callback could be installed, so consult the
602            // client's live connection state instead of a stale connect-time
603            // value. A dead or reconnecting socket reports Pending/Disconnected,
604            // so a readiness probe correctly sees the node as unhealthy. A
605            // borrowed client is never replaced (connect() refuses to reconnect
606            // it), so this probe never goes stale.
607            Some(client) => matches!(
608                client.connection_state(),
609                async_nats::connection::State::Connected
610            ),
611            // `new()` + `connect()`: `healthy` is kept current by the installed
612            // Connected/Disconnected event callback.
613            None => true,
614        }
615    }
616
617    async fn store(&self, name: &str) -> Result<Arc<dyn KvStore>, KvError> {
618        let config = StoreConfig {
619            name: name.to_string(),
620            ..Default::default()
621        };
622        self.store_with_config(config).await
623    }
624
625    async fn store_with_config(&self, config: StoreConfig) -> Result<Arc<dyn KvStore>, KvError> {
626        // Clone the client/jetstream out from under the read lock before the
627        // (up to 60s) bucket get-or-create. Holding the read guard across that
628        // await would block `shutdown()`'s `write().await` for the full
629        // duration, stalling graceful shutdown behind an in-flight store call.
630        let (client, js) = {
631            let conn = self.handle.read().await;
632            let conn = conn.as_ref().ok_or(KvError::NotConnected)?;
633            (conn.client.clone(), conn.jetstream.clone())
634        };
635
636        let kv = Self::get_or_create_bucket(&client, &js, &config).await?;
637
638        Ok(Arc::new(NatsKvStore {
639            name: config.name,
640            client,
641            js,
642            kv,
643        }))
644    }
645
646    fn capabilities(&self) -> ConnectionCapabilities {
647        ConnectionCapabilities {
648            streaming_watch: true,
649            prefix_watch: true,
650            // `KvTtl` is not implemented for the NATS backend yet (only `KvWriter`
651            // is vended by `writer()`), so advertising `ttl: true` would lead
652            // callers that branch on this flag down a path that can never
653            // succeed. Flip to `true` together with the `KvTtl` impl.
654            ttl: false,
655            // Byte-reclaiming purge IS implemented for NATS (rollup delete) and
656            // vended via `KvStore::purge_writer`.
657            purge: true,
658            cas: true,
659            transactions: false,
660            // 0 = unlimited from this layer's perspective: we impose no cap, but
661            // the NATS server still enforces its own max payload (~1MB by
662            // default). Callers that branch on this must not read 0 as "any size
663            // is safe" — an oversized value is rejected server-side at write time.
664            max_value_size: 0,
665            global_ordering: false,
666        }
667    }
668}
669
/// A single NATS KV bucket, built by `store_with_config` and exposed through
/// the `KvStore` trait.
struct NatsKvStore {
    // Bucket name; returned by `name()` and passed to readers/watchers as
    // their `bucket`.
    name: String,
    // Bucket handle; cloned into every reader, watcher and writer.
    kv: Store,
    // Client and JetStream context the bucket was opened with; cloned into
    // readers and watchers.
    client: async_nats::Client,
    js: async_nats::jetstream::Context,
}
676
677impl KvStore for NatsKvStore {
678    fn name(&self) -> &str {
679        &self.name
680    }
681
682    fn reader(&self) -> Arc<dyn KvReader> {
683        Arc::new(NatsKvReader {
684            kv: self.kv.clone(),
685            client: self.client.clone(),
686            js: self.js.clone(),
687            bucket: self.name.clone(),
688        })
689    }
690
691    fn watcher(&self) -> Option<Arc<dyn KvWatcher>> {
692        Some(Arc::new(NatsKvWatcher {
693            kv: self.kv.clone(),
694            client: self.client.clone(),
695            js: self.js.clone(),
696            bucket: self.name.clone(),
697        }))
698    }
699
700    fn writer(&self) -> Option<Arc<dyn KvWriter>> {
701        Some(Arc::new(NatsKvWriterImpl {
702            kv: self.kv.clone(),
703        }))
704    }
705
706    fn purge_writer(&self) -> Option<Arc<dyn KvPurge>> {
707        Some(Arc::new(NatsKvWriterImpl {
708            kv: self.kv.clone(),
709        }))
710    }
711}
712
/// Read-side handle for one bucket, vended by `NatsKvStore::reader`.
struct NatsKvReader {
    // Bucket handle used for point lookups (`entry`).
    kv: Store,
    // Client and JetStream context cloned from the owning store.
    client: async_nats::Client,
    js: async_nats::jetstream::Context,
    // The bucket name is known at construction (it's the store's name), so
    // `consume_last_per_subject` builds its subject filters from this field
    // instead of issuing a `kv.status()` round-trip per `scan()`/`keys()` call
    // just to read it back from the server.
    bucket: String,
}
723
724#[async_trait]
725impl KvReader for NatsKvReader {
726    async fn get(&self, key: &str) -> Result<Option<KvEntry>, KvError> {
727        // Empty value → treat as absent. This unifies a real stored `b""` and a
728        // `delete_with_version` tombstone (empty-value Put) under one "absent =
729        // None" contract, consistent with `scan()`/`keys()`. Callers needing
730        // zero-length semantics use `entry()`. See the `KvReader::get` trait doc.
731        match self.entry(key).await? {
732            Some(entry) if entry.value.is_empty() => Ok(None),
733            other => Ok(other),
734        }
735    }
736
737    async fn entry(&self, key: &str) -> Result<Option<KvEntry>, KvError> {
738        use async_nats::jetstream::kv::Operation;
739        // Use entry() instead of get() to access revision.
740        // Return Put entries even with empty values — delete_with_version
741        // writes empty bytes as a tombstone and callers need the version
742        // for CAS conflict detection. Only filter real Delete/Purge markers.
743        match timed(self.kv.entry(key)).await? {
744            Ok(Some(entry)) if entry.operation == Operation::Put => Ok(Some(KvEntry {
745                key: key.to_string(),
746                value: entry.value.to_vec(),
747                version: VersionToken::from_u64(entry.revision),
748            })),
749            Ok(Some(_)) => Ok(None), // Delete/Purge marker
750            Ok(None) => Ok(None),
751            Err(e) => Err(KvError::OperationFailed(e.to_string())),
752        }
753    }
754
755    async fn keys(&self, prefix: &str) -> Result<Vec<String>, KvError> {
756        debug!(prefix = %prefix, "listing keys with prefix");
757
758        let mut keys = Vec::new();
759        self.consume_last_per_subject(prefix, true, |msg, key| {
760            // Skip both real KV deletes and CAS tombstones (empty-value Puts
761            // written by delete_with_version). get()/scan() hide the latter, so
762            // keys() must too — otherwise a list-then-get returns phantom keys.
763            // With headers_only the payload is stripped, but NATS adds a
764            // `Nats-Msg-Size` header we use to detect the empty value.
765            if !is_kv_delete(&msg) && !is_empty_value(&msg) {
766                keys.push(key);
767            }
768        })
769        .await?;
770
771        debug!(prefix = %prefix, keys = keys.len(), "keys listing complete");
772        Ok(keys)
773    }
774
775    async fn scan(&self, prefix: &str) -> Result<Vec<KvEntry>, KvError> {
776        let mut entries = Vec::new();
777        self.consume_last_per_subject(prefix, false, |msg, key| {
778            if !is_kv_delete(&msg) && !msg.payload.is_empty() {
779                // The KV revision is the stream sequence, carried in the JetStream
780                // ACK subject (the message's reply subject). A revision of 0 means
781                // we couldn't parse it; callers treat that as "unknown version".
782                let revision = msg
783                    .reply
784                    .as_deref()
785                    .and_then(stream_sequence_from_ack)
786                    .unwrap_or(0);
787
788                entries.push(KvEntry {
789                    key,
790                    value: msg.payload.to_vec(),
791                    version: VersionToken::from_u64(revision),
792                });
793            }
794        })
795        .await?;
796
797        debug!(prefix = %prefix, entries = entries.len(), "scan complete");
798        Ok(entries)
799    }
800}
801
/// Extract the stream sequence (== KV revision) from a JetStream ACK subject.
///
/// The ACK subject — delivered as a push message's reply subject — comes in two
/// shapes, and the stream sequence sits at different offsets in each:
///
/// ```text
/// legacy (9 tokens):  $JS.ACK.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>
/// modern (11–12):     $JS.ACK.<domain>.<account>.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>[.<token>]
/// ```
///
/// We parse from the front (the *last* token is `num_pending`, not the
/// sequence), accounting for the optional `<domain>.<account>` prefix that
/// modern servers prepend.
///
/// Returns `None` when `reply` is not a recognizable ACK subject: it does not
/// start with `$JS.ACK`, it has fewer than 9 tokens, it has exactly 10 tokens
/// (neither the legacy nor the modern shape, so no offset can be trusted), or
/// the stream-sequence token is not a valid `u64`. Subjects with 11 or more
/// tokens are read as the modern shape, tolerating extra trailing tokens.
fn stream_sequence_from_ack(reply: &str) -> Option<u64> {
    /// Exact token count of the legacy ACK subject.
    const LEGACY_TOKENS: usize = 9;
    /// Minimum token count of the modern ACK subject (the trailing random
    /// token is optional).
    const MODERN_MIN_TOKENS: usize = 11;

    // The stream-seq field sits at index 5 (legacy) or 7 (modern), so we only
    // ever read the first 8 tokens. Keep those in a stack array and count the
    // remainder with the iterator — no heap `Vec`, which on a large `scan()`
    // would be one allocation per delivered message.
    let mut head = [""; 8];
    let mut count = 0usize;
    for (i, token) in reply.split('.').enumerate() {
        if let Some(slot) = head.get_mut(i) {
            *slot = token;
        }
        count += 1;
    }
    if head[0] != "$JS" || head[1] != "ACK" {
        return None;
    }
    // Legacy has exactly 9 tokens with no domain/account; the modern form
    // carries the two-token `<domain>.<account>` prefix, shifting fields right
    // by two. A 10-token subject matches neither layout: reading index 7 from
    // it would fabricate a revision out of an unrelated field, so reject it.
    let stream_seq_idx = match count {
        LEGACY_TOKENS => 5,
        n if n >= MODERN_MIN_TOKENS => 7,
        _ => return None,
    };
    head[stream_seq_idx].parse::<u64>().ok()
}
837
838/// Check if a NATS message represents a KV delete/purge operation.
839fn is_kv_delete(msg: &async_nats::Message) -> bool {
840    msg.headers
841        .as_ref()
842        .and_then(|h| h.get("KV-Operation"))
843        .is_some()
844}
845
846/// Check if a `headers_only` delivery carries an empty value (a CAS tombstone
847/// written by `delete_with_version`).
848///
849/// When a consumer is created with `headers_only`, NATS strips the body and adds
850/// a `Nats-Msg-Size` header with the original payload length. Size 0 means the
851/// stored value is empty, which `get()`/`scan()` treat as absent. Messages
852/// without the header (e.g. non-`headers_only` deliveries) are not classified as
853/// empty here — callers on that path inspect the payload directly instead.
854fn is_empty_value(msg: &async_nats::Message) -> bool {
855    msg.headers
856        .as_ref()
857        .and_then(|h| h.get("Nats-Msg-Size"))
858        .map(|v| v.as_str() == "0")
859        .unwrap_or(false)
860}
861
862impl NatsKvReader {
863    /// Subscribe to last-per-subject messages for a KV prefix, calling `on_msg`
864    /// for each delivered message. Handles the subscribe-first race workaround,
865    /// consumer lifecycle, and cleanup.
866    async fn consume_last_per_subject(
867        &self,
868        prefix: &str,
869        headers_only: bool,
870        mut on_msg: impl FnMut(async_nats::Message, String),
871    ) -> Result<(), KvError> {
872        use async_nats::jetstream::consumer::push;
873        use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy};
874
875        // The bucket name is known at construction, so the subject filters are
876        // built directly from `self.bucket` — no `kv.status()` round-trip just
877        // to read it back. Every *remaining* setup await below is still bounded
878        // by `timed()`: a half-dead NATS connection (CLOSE_WAIT) would otherwise
879        // park here before the per-message drain timer downstream ever starts,
880        // hanging scan()/keys() indefinitely — the same failure `timed()` guards
881        // on the write path.
882        let bucket = self.bucket.as_str();
883
884        let nats_filter = if prefix.is_empty() {
885            format!("$KV.{bucket}.>")
886        } else {
887            format!("$KV.{bucket}.{prefix}>")
888        };
889
890        // Work around async-nats <=0.46 subscribe-after-create race:
891        // subscribe to the inbox FIRST, then create the consumer.
892        let inbox = self.client.new_inbox();
893        let mut sub = timed(self.client.subscribe(inbox.clone()))
894            .await?
895            .map_err(|e| KvError::OperationFailed(format!("subscribe inbox: {e}")))?;
896
897        let stream = timed(self.js.get_stream(format!("KV_{bucket}")))
898            .await?
899            .map_err(|e| KvError::OperationFailed(format!("get KV stream: {e}")))?;
900
901        let consumer = timed(stream.create_consumer(push::Config {
902            deliver_subject: inbox,
903            deliver_policy: DeliverPolicy::LastPerSubject,
904            filter_subject: nats_filter,
905            headers_only,
906            // This is a one-shot point-in-time drain — we never ack. Under
907            // the default `AckPolicy::Explicit`, JetStream stops delivering
908            // once `max_ack_pending` (default 1000) messages sit unacked,
909            // which would silently truncate scan()/keys() to the first ~1000
910            // keys (or stall waiting for deliveries that never come) on any
911            // larger bucket. `None` removes the ack-pending gate entirely.
912            ack_policy: AckPolicy::None,
913            // Safety net for the best-effort `delete_consumer` below: if that
914            // cleanup times out on a half-dead connection, JetStream still reaps
915            // this consumer after `CONSUMER_INACTIVE_THRESHOLD` of inactivity, so
916            // repeated timed-out scans can't pile orphaned consumers up against
917            // the per-stream limit.
918            inactive_threshold: CONSUMER_INACTIVE_THRESHOLD,
919            ..Default::default()
920        }))
921        .await?
922        .map_err(|e| KvError::OperationFailed(format!("create consumer: {e}")))?;
923
924        let num_pending = consumer.cached_info().num_pending;
925
926        // Drain exactly `num_pending` messages, but bound each await: a half-dead
927        // connection (CLOSE_WAIT) would otherwise park this loop forever, the same
928        // failure `timed()` guards on the write path. On timeout we still fall
929        // through to consumer cleanup, then surface `Timeout`.
930        let mut timed_out = false;
931        if num_pending > 0 {
932            let mut delivered = 0u64;
933            let kv_prefix = format!("$KV.{bucket}.");
934
935            while delivered < num_pending {
936                match tokio::time::timeout(KV_OP_TIMEOUT, sub.next()).await {
937                    Ok(Some(msg)) => {
938                        let key = msg
939                            .subject
940                            .strip_prefix(&kv_prefix)
941                            .unwrap_or(msg.subject.as_str())
942                            .to_string();
943
944                        on_msg(msg, key);
945                        delivered += 1;
946                    }
947                    Ok(None) => break, // subscription closed early
948                    Err(_) => {
949                        timed_out = true;
950                        break;
951                    }
952                }
953            }
954        }
955
956        // Clean up ephemeral consumer (best-effort), even on timeout — a stalled
957        // scan shouldn't also leak a server-side consumer. A leaked consumer
958        // lingers on the server and counts against per-stream limits, so surface
959        // failures in observability without failing the operation. Bound the
960        // delete with `timed()`: on the same half-dead (CLOSE_WAIT) connection
961        // that tripped the drain timeout above, an unbounded delete would re-park
962        // here forever, defeating the timeout recovery we just performed.
963        match timed(stream.delete_consumer(&consumer.cached_info().name)).await {
964            Ok(Ok(_)) => {}
965            Ok(Err(e)) => {
966                // `warn!`, not `debug!`: a leaked ephemeral consumer lingers on
967                // the server and counts against per-stream limits. Under a flaky
968                // NATS connection every scan()/keys() leaks one, so this must be
969                // visible in default spans before the pile-up hits the limit.
970                warn!(error = %e, "failed to delete ephemeral consumer (best-effort)");
971            }
972            Err(_) => {
973                warn!("timed out deleting ephemeral consumer (best-effort)");
974            }
975        }
976
977        if timed_out {
978            return Err(KvError::Timeout);
979        }
980        Ok(())
981    }
982}
983
984/// Convert a NATS KV entry to a KvUpdate.
985///
986/// Takes the entry by value so the key `String` moves into the `KvUpdate`
987/// instead of allocating a fresh copy per watch event.
988fn nats_entry_to_kv_update(entry: async_nats::jetstream::kv::Entry) -> KvUpdate {
989    use async_nats::jetstream::kv::Operation;
990    let version = VersionToken::from_u64(entry.revision);
991    match entry.operation {
992        Operation::Put => KvUpdate::Put(KvEntry {
993            key: entry.key,
994            value: entry.value.to_vec(),
995            version,
996        }),
997        Operation::Delete => KvUpdate::Delete {
998            key: entry.key,
999            version,
1000        },
1001        Operation::Purge => KvUpdate::Purge {
1002            key: entry.key,
1003            version,
1004        },
1005    }
1006}
1007
1008/// Stream updates from a NATS Watch into a channel until it ends or the receiver drops.
1009async fn stream_watch(
1010    mut watcher: async_nats::jetstream::kv::Watch,
1011    tx: &Sender<KvUpdate>,
1012) -> Result<(), KvError> {
1013    while let Some(entry) = watcher.next().await {
1014        match entry {
1015            Ok(entry) => {
1016                let update = nats_entry_to_kv_update(entry);
1017                if tx.send(update).await.is_err() {
1018                    debug!("watch receiver closed");
1019                    break;
1020                }
1021            }
1022            Err(e) => {
1023                error!(error = %e, "NATS KV watch error");
1024                return Err(KvError::WatchError(e.to_string()));
1025            }
1026        }
1027    }
1028    Ok(())
1029}
1030
/// Period of the floor guard's no-traffic backstop probe: each guarded watch
/// issues one stream lookup per interval to read the stream's
/// `first_sequence`.
///
/// The PRIMARY detection is in-band — the gapped-delivery check fires the
/// moment evidence surfaces — so this interval only bounds detection latency
/// when NOTHING is being delivered; it is not load-bearing for eventual
/// detection.
const FLOOR_GUARD_INTERVAL: Duration = Duration::from_secs(30);
1037
1038/// [`stream_watch`] for the dense ALL-scope resume path, with the LIVE
1039/// retention floor guard (`tests/model_live_watch.rs` — the live twin of
1040/// [`NatsKvWatcher::check_resume_window`]).
1041///
1042/// The hazard: retention overrunning a live consumer makes JetStream
1043/// silently skip evicted messages — delete markers included — with no error
1044/// anywhere (the same clamp behavior as resumes, mid-stream). Unguarded,
1045/// that is PERMANENT silent fold divergence; the model proves it reachable.
1046///
1047/// Detection is primarily **in-band**: an unfiltered `ByStartSequence`
1048/// consumer sees every retained message, so a delivered revision that jumps
1049/// the frontier by more than one is evidence of eviction inside the gap.
1050/// The model checker REJECTED a periodic-only design with exactly the trace
1051/// this closes — deliveries can catch the frontier up past the gap between
1052/// probes, erasing the evidence — so the check runs AT the gapped delivery,
1053/// before the entry is processed: fetch `first_sequence` and apply the
1054/// shared kernel (`protocol::resume_window_ok`) to the frontier. A benign
1055/// gap (interior per-subject eviction with the floor still at or below the
1056/// frontier) passes; head eviction past the frontier fails the watch, and
1057/// the caller's restart routes into the verified resume → `CursorExpired` →
1058/// resync repair path. The periodic probe backstops the no-traffic case.
1059///
1060/// Scope: sound only where density holds — the unfiltered resume watch.
1061/// Prefix-scoped watches deliver sparse revisions by design and cannot
1062/// distinguish benign from hazardous eviction client-side; they retain the
1063/// (narrowed) retention-outlives-lag operating axiom plus the resume-time
1064/// check on every restart (model axiom 5).
1065///
1066/// The guarantee split, precisely: the SAFETY half — never folding past
1067/// unexamined evidence of loss — is unconditional in this loop (the gap
1068/// check precedes processing, and a stalled downstream stalls folding too).
1069/// The REPAIR half is conditional on the caller restarting the failed watch
1070/// (standard supervision; same posture as the resync fail-stop): a trip
1071/// with no restart is a loudly dead watch, never a silently wrong one.
1072async fn stream_watch_floor_guarded(
1073    mut watcher: async_nats::jetstream::kv::Watch,
1074    tx: &Sender<KvUpdate>,
1075    resume_revision: u64,
1076    js: &async_nats::jetstream::Context,
1077    bucket: &str,
1078) -> Result<(), KvError> {
1079    let stream_name = format!("KV_{bucket}");
1080    let first_sequence = || async {
1081        let stream = timed(js.get_stream(&stream_name))
1082            .await?
1083            .map_err(|e| KvError::OperationFailed(format!("floor guard stream lookup: {e}")))?;
1084        Ok::<u64, KvError>(stream.cached_info().state.first_sequence)
1085    };
1086    fn trip(frontier: u64, first: u64, bucket: &str) -> KvError {
1087        warn!(
1088            frontier,
1089            first_sequence = first,
1090            bucket,
1091            "stream retention overran this live watch; failing so the restart can resync \
1092             (messages in the gap were evicted unseen)"
1093        );
1094        KvError::WatchError(format!(
1095            "stream retention overran live watch (first_sequence {first} > delivered \
1096             frontier {frontier} + 1); restart will resync"
1097        ))
1098    }
1099
1100    let mut frontier = resume_revision;
1101    let mut backstop = tokio::time::interval(FLOOR_GUARD_INTERVAL);
1102    backstop.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1103    backstop.tick().await; // consume the immediate first tick
1104
1105    loop {
1106        tokio::select! {
1107            entry = watcher.next() => {
1108                let Some(entry) = entry else { break };
1109                match entry {
1110                    Ok(entry) => {
1111                        let revision = entry.revision;
1112                        // In-band gap check BEFORE processing: never fold
1113                        // past unexamined evidence of eviction.
1114                        if revision > frontier.saturating_add(1) {
1115                            let first = first_sequence().await?;
1116                            if !crate::protocol::resume_window_ok(frontier, first) {
1117                                return Err(trip(frontier, first, bucket));
1118                            }
1119                            // Benign interior gap: every evicted revision
1120                            // below a still-low floor was a per-subject
1121                            // overwrite, whose later revision the fold will
1122                            // see — safe for last-write-wins.
1123                        }
1124                        frontier = frontier.max(revision);
1125                        let update = nats_entry_to_kv_update(entry);
1126                        if tx.send(update).await.is_err() {
1127                            debug!("watch receiver closed");
1128                            break;
1129                        }
1130                    }
1131                    Err(e) => {
1132                        error!(error = %e, "NATS KV watch error");
1133                        return Err(KvError::WatchError(e.to_string()));
1134                    }
1135                }
1136            }
1137            _ = backstop.tick() => {
1138                // No-traffic backstop: nothing is being delivered, so the
1139                // in-band check has no evidence to act on; probe the floor
1140                // directly.
1141                let first = first_sequence().await?;
1142                if !crate::protocol::resume_window_ok(frontier, first) {
1143                    return Err(trip(frontier, first, bucket));
1144                }
1145            }
1146        }
1147    }
1148    Ok(())
1149}
1150
1151/// Check if a NATS watch error indicates the requested start sequence is
1152/// too old (compacted), meaning callers should fall back to a full watch.
1153///
1154/// SECOND line of defense only: live nats-server (2.14) does not error on a
1155/// below-head start sequence at all — it silently clamps to the first
1156/// retained message (pinned by `tests/resync.rs`), so the PRIMARY expiry
1157/// detection is [`NatsKvWatcher::check_resume_window`]'s proactive
1158/// `first_sequence` comparison. This matcher remains for server versions or
1159/// paths that do error, where mapping to [`KvError::CursorExpired`] keeps
1160/// the fallback reachable instead of stranding the caller.
1161///
1162/// async-nats has no granular error kind for this: `WatchErrorKind` is only
1163/// `InvalidKey`/`TimedOut`/`ConsumerCreate`/`Other`, and "start sequence too old"
1164/// arrives as `ConsumerCreate`/`Other` with the real reason buried in the source
1165/// error's *message*. So we substring-match the full error string — which already
1166/// includes the source, since `Error`'s `Display` renders `"{kind}: {source}"`.
1167///
1168/// Two deliberate choices make this robust to wording drift:
1169/// - We lowercase first, so a capitalization change in NATS/async-nats can't slip
1170///   past.
1171/// - Detection is biased toward `true`. A false positive only costs an
1172///   unnecessary (but always-safe) full `watch_all()` replay; a false negative
1173///   propagates `WatchError` and strands a caller that would otherwise recover.
1174///
1175/// If these messages ever change, `cursor_expired_matches_known_nats_error_strings`
1176/// is the canary that fails loudly on the next dependency bump.
1177fn is_cursor_expired_error(err: &str) -> bool {
1178    use std::sync::OnceLock;
1179    // One Aho-Corasick automaton over all needles: a single pass over the error
1180    // string regardless of how many needles accumulate as NATS versions reword
1181    // their messages, vs. one `windows()` scan per needle. Case-insensitivity is
1182    // baked into the automaton, so no lowercased copy is allocated either.
1183    static MATCHER: OnceLock<aho_corasick::AhoCorasick> = OnceLock::new();
1184    MATCHER
1185        .get_or_init(|| {
1186            aho_corasick::AhoCorasick::builder()
1187                .ascii_case_insensitive(true)
1188                .build([
1189                    "start sequence",
1190                    "first sequence",
1191                    "sequence not found",
1192                    "too old",
1193                ])
1194                .expect("static needle set always compiles")
1195        })
1196        .is_match(err)
1197}
1198
/// Watch-side handle for one NATS KV bucket; implements [`KvWatcher`].
struct NatsKvWatcher {
    /// KV bucket handle used to open the `watch_*` streams.
    kv: Store,
    // `watch_prefixes_from` has no async-nats equivalent (there is no
    // `watch_many_from_revision`), so it hand-builds the multi-filter ordered
    // consumer itself — which needs the raw client (inbox allocation), the
    // JetStream context (stream lookup), and the bucket name (subject filters),
    // same as the reader's scan path.
    client: async_nats::Client,
    /// JetStream context; also used by `check_resume_window` and the live
    /// floor guard to look up the `KV_{bucket}` stream.
    js: async_nats::jetstream::Context,
    /// Bucket name, used to derive the backing stream name `KV_{bucket}`.
    bucket: String,
}
1210
1211/// Decode a raw KV stream message (as delivered by a hand-built ordered push
1212/// consumer) into a [`KvUpdate`] — the same mapping `async-nats`'s `kv::Watch`
1213/// performs internally for the `watch_*` paths: key from the subject (stripping
1214/// the `$KV.{bucket}.` prefix), operation from the `KV-Operation` header
1215/// (absent = Put), revision from the stream sequence in the ACK reply subject.
1216///
1217/// Returns `None` for a subject outside the bucket's keyspace, which a
1218/// subject-filtered consumer should never deliver — skipped rather than
1219/// surfaced, matching `kv::Watch`'s behavior.
1220fn kv_message_to_update(msg: &async_nats::Message, kv_prefix: &str) -> Option<KvUpdate> {
1221    let key = msg.subject.strip_prefix(kv_prefix)?.to_string();
1222    // An unparseable (or absent) ACK subject yields the UNKNOWN version, not a
1223    // fabricated revision 0: `from_u64(0)` is a *parseable* position that
1224    // `watch_applied` would adopt as its batch high-water — regressing the
1225    // persisted cursor to 0 and forcing a full replay on the next restart.
1226    // `unknown()` is the honest value; the cursor-authority loop skips it.
1227    let version = msg
1228        .reply
1229        .as_deref()
1230        .and_then(stream_sequence_from_ack)
1231        .map(VersionToken::from_u64)
1232        .unwrap_or_else(VersionToken::unknown);
1233    let operation = msg
1234        .headers
1235        .as_ref()
1236        .and_then(|h| h.get("KV-Operation"))
1237        .map(|v| v.as_str());
1238    Some(match operation {
1239        Some("DEL") => KvUpdate::Delete { key, version },
1240        Some("PURGE") => KvUpdate::Purge { key, version },
1241        // No header (or an explicit "PUT") is a put — the common case carries
1242        // no KV-Operation header at all.
1243        _ => KvUpdate::Put(KvEntry {
1244            key,
1245            value: msg.payload.to_vec(),
1246            version,
1247        }),
1248    })
1249}
1250
1251impl NatsKvWatcher {
1252    /// Proactive cursor-expiry detection, REQUIRED before any `*_from` resume.
1253    ///
1254    /// NATS does **not** error when an ordered consumer's `ByStartSequence`
1255    /// falls below the stream's first retained sequence — it silently delivers
1256    /// from the first available message (pinned against a live nats-server by
1257    /// `tests/resync.rs::nats_silently_clamps_resume_below_first_seq`). A
1258    /// silent clamp skips the gap's evicted messages — delete markers
1259    /// included — without ever taking the `CursorExpired` → resync path, so
1260    /// expiry MUST be detected by comparing the stream's `first_sequence`
1261    /// against the resume point before trusting the consumer. The
1262    /// error-string matching at the consumer-create sites stays as a second
1263    /// line of defense for server versions that do error.
1264    ///
1265    /// Why `first_sequence` is the right boundary: interior (per-subject
1266    /// history) eviction inside the gap is safe for a last-write-wins fold —
1267    /// an overwrite-evicted revision implies a LATER revision of the same
1268    /// subject exists and will be delivered. Lost *deletes* come from head
1269    /// eviction (stream limits/age), which is exactly what advances
1270    /// `first_sequence`. (An admin interior purge of a subject can also
1271    /// destroy a delete marker without moving the head — that is a manual
1272    /// destructive operation, same trust class as deleting the stream.)
1273    ///
1274    /// Head eviction racing the window between this check and consumer
1275    /// creation is the same exposure any live consumer has against
1276    /// aggressive retention; the check bounds the silent gap to that
1277    /// milliseconds-scale window, where the prior behavior left it unbounded.
1278    async fn check_resume_window(&self, revision: u64) -> Result<(), KvError> {
1279        let stream = timed(self.js.get_stream(format!("KV_{}", self.bucket)))
1280            .await?
1281            .map_err(|e| {
1282                KvError::OperationFailed(format!("get KV stream for resume check: {e}"))
1283            })?;
1284        let first = stream.cached_info().state.first_sequence;
1285        // The shared protocol kernel — the same guard the model checker's
1286        // Resume transition executes (`crate::protocol::resume_window_ok`).
1287        if !crate::protocol::resume_window_ok(revision, first) {
1288            warn!(
1289                revision,
1290                first_sequence = first,
1291                "resume cursor is below the stream's first retained sequence; cursor expired"
1292            );
1293            return Err(KvError::CursorExpired);
1294        }
1295        Ok(())
1296    }
1297}
1298
1299#[async_trait]
1300impl KvWatcher for NatsKvWatcher {
1301    async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
1302        // `watch_with_history` (DeliverPolicy::LastPerSubject), NOT `watch_all`
1303        // (DeliverPolicy::New): the trait contract is state-sync — current value
1304        // of every key first, then live updates. async-nats's `watch_all` only
1305        // delivers messages published AFTER the consumer exists, which would
1306        // leave a no-cursor consumer empty until keys happen to change.
1307        //
1308        // Bound the watch *setup* with `timed()` for the same reason every KV op
1309        // is bounded: a half-dead (CLOSE_WAIT) NATS connection parks this await
1310        // forever instead of failing. The streaming drain in `stream_watch` is
1311        // intentionally unbounded (a watch is long-lived), but establishing it
1312        // must not be able to hang a reconnecting caller.
1313        let watcher = timed(self.kv.watch_with_history(">"))
1314            .await?
1315            .map_err(|e| KvError::WatchError(e.to_string()))?;
1316        stream_watch(watcher, &tx).await
1317    }
1318
1319    async fn watch_prefix(&self, prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
1320        // Use native NATS subject-based filtering. KV key "node.abc" maps to
1321        // subject "$KV.BUCKET.node.abc", and ">" is the multi-level wildcard.
1322        // `_with_history` for the same state-sync contract as `watch_all`.
1323        let nats_key = format!("{prefix}>");
1324        let watcher = timed(self.kv.watch_with_history(&nats_key))
1325            .await?
1326            .map_err(|e| KvError::WatchError(e.to_string()))?;
1327        stream_watch(watcher, &tx).await
1328    }
1329
1330    async fn watch_prefixes(&self, prefixes: &[&str], tx: Sender<KvUpdate>) -> Result<(), KvError> {
1331        if prefixes.is_empty() {
1332            // Nothing to watch. Critically, do NOT fall through to `watch_many`
1333            // with an empty filter set — an unfiltered ordered consumer would
1334            // watch the WHOLE bucket, the opposite of a scoped watch.
1335            return Ok(());
1336        }
1337        // ONE multi-filter consumer for every prefix (NATS 2.10 `filter_subjects`)
1338        // rather than one consumer per prefix. `watch_many_with_history` builds a
1339        // single ordered push consumer with `filter_subjects = [{p}> ...]` and
1340        // yields the same `Entry` stream as `watch`, so `stream_watch` is reused
1341        // verbatim. This is the per-stream-consumer-count fix: a node scoped to N
1342        // prefixes costs 1 consumer, not N. `_with_history` for the same
1343        // state-sync contract as `watch_all`.
1344        let keys: Vec<String> = prefixes.iter().map(|p| format!("{p}>")).collect();
1345        let watcher = timed(self.kv.watch_many_with_history(keys))
1346            .await?
1347            .map_err(|e| KvError::WatchError(e.to_string()))?;
1348        stream_watch(watcher, &tx).await
1349    }
1350
1351    async fn watch_all_from(
1352        &self,
1353        cursor: &WatchCursor,
1354        tx: Sender<KvUpdate>,
1355    ) -> Result<(), KvError> {
1356        let revision = match cursor.as_u64() {
1357            Some(rev) if rev > 0 => rev,
1358            _ => return self.watch_all(tx).await,
1359        };
1360        self.check_resume_window(revision).await?;
1361
1362        let watcher = match timed(self.kv.watch_all_from_revision(revision + 1)).await? {
1363            Ok(w) => w,
1364            Err(e) => {
1365                let err_str = e.to_string();
1366                if is_cursor_expired_error(&err_str) {
1367                    warn!(revision, error = %err_str, "cursor expired, caller should fall back to full watch");
1368                    return Err(KvError::CursorExpired);
1369                }
1370                return Err(KvError::WatchError(err_str));
1371            }
1372        };
1373        // Re-check AFTER the consumer exists: head eviction in the window
1374        // between the pre-flight check and consumer creation would otherwise
1375        // clamp silently.
1376        self.check_resume_window(revision).await?;
1377
1378        info!(revision, "resumed watch from cursor");
1379        // The LIVE floor guard takes over from here: in-band gapped-delivery
1380        // checks plus a no-traffic backstop, so retention overrunning this
1381        // watch mid-stream fail-stops into the restart→resync repair path
1382        // instead of silently skipping evicted deletes (model:
1383        // tests/model_live_watch.rs).
1384        stream_watch_floor_guarded(watcher, &tx, revision, &self.js, &self.bucket).await
1385    }
1386
1387    async fn watch_prefix_from(
1388        &self,
1389        prefix: &str,
1390        cursor: &WatchCursor,
1391        tx: Sender<KvUpdate>,
1392    ) -> Result<(), KvError> {
1393        let revision = match cursor.as_u64() {
1394            Some(rev) if rev > 0 => rev,
1395            _ => return self.watch_prefix(prefix, tx).await,
1396        };
1397        self.check_resume_window(revision).await?;
1398
1399        let nats_key = format!("{prefix}>");
1400        let watcher = match timed(self.kv.watch_from_revision(&nats_key, revision + 1)).await? {
1401            Ok(w) => w,
1402            Err(e) => {
1403                let err_str = e.to_string();
1404                if is_cursor_expired_error(&err_str) {
1405                    warn!(revision, prefix, error = %err_str, "cursor expired for prefix watch, caller should fall back");
1406                    return Err(KvError::CursorExpired);
1407                }
1408                return Err(KvError::WatchError(err_str));
1409            }
1410        };
1411        // Same post-create re-check as watch_all_from: close the
1412        // check→create eviction window.
1413        self.check_resume_window(revision).await?;
1414
1415        info!(revision, prefix, "resumed prefix watch from cursor");
1416        stream_watch(watcher, &tx).await
1417    }
1418
1419    async fn watch_prefixes_from(
1420        &self,
1421        prefixes: &[&str],
1422        cursor: &WatchCursor,
1423        tx: Sender<KvUpdate>,
1424    ) -> Result<(), KvError> {
1425        use async_nats::jetstream::consumer::{DeliverPolicy, ReplayPolicy, push};
1426
1427        if prefixes.is_empty() {
1428            // Same guard as watch_prefixes: an empty filter set must not become
1429            // an unfiltered whole-bucket consumer.
1430            return Ok(());
1431        }
1432        let revision = match cursor.as_u64() {
1433            Some(rev) if rev > 0 => rev,
1434            _ => return self.watch_prefixes(prefixes, tx).await,
1435        };
1436
1437        // async-nats has `watch_many` (multi-filter) and `watch_from_revision`
1438        // (seek) but no combination of the two, so build the multi-filter
1439        // ordered push consumer ourselves — the exact consumer
1440        // `watch_many_with_deliver_policy` would build, with
1441        // `ByStartSequence(cursor+1)` for the delta seek. The ordered-consumer
1442        // machinery (gap detection, auto-recreate from the last delivered
1443        // sequence) comes with `OrderedConfig` for free.
1444        let bucket = self.bucket.as_str();
1445        let kv_prefix = format!("$KV.{bucket}.");
1446        let filter_subjects: Vec<String> = prefixes
1447            .iter()
1448            .map(|p| format!("{kv_prefix}{p}>"))
1449            .collect();
1450
1451        let stream = timed(self.js.get_stream(format!("KV_{bucket}")))
1452            .await?
1453            .map_err(|e| KvError::WatchError(format!("get KV stream: {e}")))?;
1454
1455        // Same proactive expiry detection as `check_resume_window` (NATS
1456        // silently clamps a below-head ByStartSequence; see that method's
1457        // docs) — checked on the stream handle this path already fetched,
1458        // via the shared protocol kernel.
1459        let first = stream.cached_info().state.first_sequence;
1460        if !crate::protocol::resume_window_ok(revision, first) {
1461            warn!(
1462                revision,
1463                first_sequence = first,
1464                ?prefixes,
1465                "resume cursor is below the stream's first retained sequence; cursor expired"
1466            );
1467            return Err(KvError::CursorExpired);
1468        }
1469
1470        let consumer = match timed(stream.create_consumer(push::OrderedConfig {
1471            deliver_subject: self.client.new_inbox(),
1472            description: Some("kv multi-prefix resume consumer".to_string()),
1473            filter_subjects,
1474            replay_policy: ReplayPolicy::Instant,
1475            deliver_policy: DeliverPolicy::ByStartSequence {
1476                start_sequence: revision + 1,
1477            },
1478            ..Default::default()
1479        }))
1480        .await?
1481        {
1482            Ok(c) => c,
1483            Err(e) => {
1484                // Same expiry classification as watch_all_from: a start sequence
1485                // the stream has compacted past surfaces as a consumer-create
1486                // error whose message names the sequence problem.
1487                let err_str = e.to_string();
1488                if is_cursor_expired_error(&err_str) {
1489                    warn!(revision, ?prefixes, error = %err_str, "cursor expired for multi-prefix watch, caller should fall back");
1490                    return Err(KvError::CursorExpired);
1491                }
1492                return Err(KvError::WatchError(err_str));
1493            }
1494        };
1495
1496        // Re-check AFTER the consumer exists (fresh stream info, not the
1497        // handle's cached copy): closes the check→create eviction window,
1498        // same as the single-filter resume paths.
1499        self.check_resume_window(revision).await?;
1500
1501        let mut messages = timed(consumer.messages())
1502            .await?
1503            .map_err(|e| KvError::WatchError(e.to_string()))?;
1504
1505        info!(
1506            revision,
1507            ?prefixes,
1508            "resumed multi-prefix watch from cursor"
1509        );
1510        while let Some(msg) = messages.next().await {
1511            match msg {
1512                Ok(msg) => {
1513                    // A subject-filtered consumer only delivers in-keyspace
1514                    // subjects; `None` here would be a server bug, skipped to
1515                    // match kv::Watch's tolerance.
1516                    let Some(update) = kv_message_to_update(&msg, &kv_prefix) else {
1517                        continue;
1518                    };
1519                    if tx.send(update).await.is_err() {
1520                        debug!("watch receiver closed");
1521                        break;
1522                    }
1523                }
1524                Err(e) => {
1525                    error!(error = %e, "NATS KV multi-prefix watch error");
1526                    return Err(KvError::WatchError(e.to_string()));
1527                }
1528            }
1529        }
1530        Ok(())
1531    }
1532}
1533
/// Write-side handle for a single NATS KV bucket.
///
/// Implements [`KvWriter`] (put / delete / create / compare-and-set update)
/// and [`KvPurge`]; every operation is issued against the wrapped `kv` store.
struct NatsKvWriterImpl {
    /// The async-nats KV bucket handle all writes go through.
    kv: Store,
}
1537
1538#[async_trait]
1539impl KvWriter for NatsKvWriterImpl {
1540    async fn put(&self, key: &str, value: &[u8]) -> Result<VersionToken, KvError> {
1541        let rev = timed(self.kv.put(key, value.to_vec().into()))
1542            .await?
1543            .map_err(|e| KvError::OperationFailed(e.to_string()))?;
1544        Ok(VersionToken::from_u64(rev))
1545    }
1546
1547    async fn delete(&self, key: &str) -> Result<bool, KvError> {
1548        // NATS delete doesn't tell us if key existed, so we always return true
1549        timed(self.kv.delete(key))
1550            .await?
1551            .map_err(|e| KvError::OperationFailed(e.to_string()))?;
1552        Ok(true)
1553    }
1554
1555    async fn create(&self, key: &str, value: &[u8]) -> Result<VersionToken, KvError> {
1556        use async_nats::jetstream::kv::CreateErrorKind;
1557        timed(self.kv.create(key, value.to_vec().into()))
1558            .await?
1559            .map(VersionToken::from_u64)
1560            .map_err(|e| {
1561                if e.kind() == CreateErrorKind::AlreadyExists {
1562                    KvError::AlreadyExists
1563                } else {
1564                    KvError::OperationFailed(e.to_string())
1565                }
1566            })
1567    }
1568
1569    async fn update(
1570        &self,
1571        key: &str,
1572        value: &[u8],
1573        expected: &VersionToken,
1574    ) -> Result<VersionToken, KvError> {
1575        use async_nats::jetstream::kv::UpdateErrorKind;
1576        let rev = expected.as_u64().ok_or_else(|| {
1577            KvError::OperationFailed("invalid version token for NATS update".into())
1578        })?;
1579        timed(self.kv.update(key, value.to_vec().into(), rev))
1580            .await?
1581            .map(VersionToken::from_u64)
1582            .map_err(|e| {
1583                if e.kind() == UpdateErrorKind::WrongLastRevision {
1584                    KvError::RevisionMismatch
1585                } else {
1586                    KvError::OperationFailed(e.to_string())
1587                }
1588            })
1589    }
1590
1591    async fn delete_with_version(
1592        &self,
1593        key: &str,
1594        expected: &VersionToken,
1595    ) -> Result<bool, KvError> {
1596        use async_nats::jetstream::kv::UpdateErrorKind;
1597        let rev = expected.as_u64().ok_or_else(|| {
1598            KvError::OperationFailed("invalid version token for NATS delete".into())
1599        })?;
1600        // Write empty value with CAS — logically deletes while preserving conflict detection
1601        timed(self.kv.update(key, Vec::new().into(), rev))
1602            .await?
1603            .map(|_| true)
1604            .map_err(|e| {
1605                if e.kind() == UpdateErrorKind::WrongLastRevision {
1606                    KvError::RevisionMismatch
1607                } else {
1608                    KvError::OperationFailed(e.to_string())
1609                }
1610            })
1611    }
1612}
1613
1614#[async_trait]
1615impl KvPurge for NatsKvWriterImpl {
1616    async fn purge(&self, key: &str) -> Result<(), KvError> {
1617        // Rollup purge (`Nats-Rollup: sub`): drops all prior revisions of the
1618        // subject, reclaiming bytes against `max_bytes` — unlike `delete`, which
1619        // only appends a marker. Idempotent: purging an absent key is a no-op.
1620        timed(self.kv.purge(key))
1621            .await?
1622            .map_err(|e| KvError::OperationFailed(e.to_string()))?;
1623        Ok(())
1624    }
1625}
1626
1627impl std::fmt::Debug for NatsConnection {
1628    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1629        f.debug_struct("NatsConnection")
1630            .field("url", &self.config.url)
1631            // `Acquire` to match every other read of `healthy` — a `Relaxed`
1632            // outlier here reads like a deliberate exception during an atomics
1633            // audit, and the fmt path is far too cold for the ordering to cost
1634            // anything.
1635            .field("healthy", &self.healthy.load(Ordering::Acquire))
1636            .finish()
1637    }
1638}
1639
/// Pure unit tests (no server): raw stream-create response classification,
/// ACK-subject sequence parsing, cursor-expiry error matching, and raw KV
/// message decoding.
#[cfg(test)]
mod tests {
    use super::*;

    // ---- classify_raw_create_response -----------------------------------

    #[test]
    fn raw_create_success_has_no_error() {
        // A successful STREAM.CREATE echoes the stream config back and has no
        // "error" object.
        let payload = br#"{"type":"io.nats.jetstream.api.v1.stream_create_response","config":{"name":"KV_certs"}}"#;
        let outcome = classify_raw_create_response(payload);
        assert_eq!(outcome, RawCreateOutcome::Created);
    }

    #[test]
    fn raw_create_swallows_stream_already_exists() {
        // err_code 10058 = stream name already in use → the bucket is already
        // there, which is fine.
        let payload =
            br#"{"error":{"code":400,"err_code":10058,"description":"stream name already in use"}}"#;
        let outcome = classify_raw_create_response(payload);
        assert_eq!(outcome, RawCreateOutcome::AlreadyExists);
    }

    #[test]
    fn raw_create_already_exists_when_10058_in_code_field() {
        // Some Synadia Cloud deployments echo 10058 in `code` instead of
        // `err_code`. Both shapes must classify as AlreadyExists, not Failed.
        let payload = br#"{"error":{"code":10058,"description":"stream name already in use"}}"#;
        let outcome = classify_raw_create_response(payload);
        assert_eq!(outcome, RawCreateOutcome::AlreadyExists);
    }

    #[test]
    fn raw_create_swallows_stream_limit() {
        // At the stream limit Synadia Cloud answers 400 + "maximum number of
        // streams", yet the bucket may already exist — non-fatal.
        let payload =
            br#"{"error":{"code":400,"description":"maximum number of streams reached"}}"#;
        let outcome = classify_raw_create_response(payload);
        assert_eq!(outcome, RawCreateOutcome::StreamLimit);
    }

    #[test]
    fn raw_create_propagates_unknown_error() {
        // Every other JetStream error is fatal and must carry code +
        // description through.
        let payload = br#"{"error":{"code":403,"description":"insufficient permissions"}}"#;
        let outcome = classify_raw_create_response(payload);
        match outcome {
            RawCreateOutcome::Failed { code, description } => {
                assert_eq!(code, 403);
                assert_eq!(description, "insufficient permissions");
            }
            other => panic!("expected Failed, got {other:?}"),
        }
    }

    #[test]
    fn raw_create_400_without_stream_limit_is_fatal() {
        // A bare 400 that is NOT the stream-limit message must not be
        // swallowed; otherwise a genuine bad-config rejection would pass for
        // success.
        let payload = br#"{"error":{"code":400,"description":"invalid stream config"}}"#;
        let outcome = classify_raw_create_response(payload);
        match outcome {
            RawCreateOutcome::Failed { code, description } => {
                assert_eq!(code, 400);
                assert!(description.contains("invalid stream config"));
            }
            other => panic!("expected Failed, got {other:?}"),
        }
    }

    #[test]
    fn raw_create_error_without_code_defaults_to_zero() {
        // Defensive: a malformed error object still classifies as Failed
        // (rather than silently passing), with the code defaulting to 0.
        let payload = br#"{"error":{"description":"mystery"}}"#;
        let outcome = classify_raw_create_response(payload);
        match outcome {
            RawCreateOutcome::Failed { code, description } => {
                assert_eq!(code, 0);
                assert_eq!(description, "mystery");
            }
            other => panic!("expected Failed, got {other:?}"),
        }
    }

    #[test]
    fn raw_create_unparseable_payload_is_treated_as_success() {
        // The caller re-verifies with get_key_value, so a garbled body must
        // not be reported as a hard failure at this layer.
        let outcome = classify_raw_create_response(b"not json at all");
        assert_eq!(outcome, RawCreateOutcome::Created);
    }

    // ---- stream_sequence_from_ack ----------------------------------------

    #[test]
    fn ack_subject_legacy_format() {
        // $JS.ACK.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>
        let reply = "$JS.ACK.KV_certs.cons.1.42.7.1700000000000000000.0";
        let sequence = stream_sequence_from_ack(reply);
        assert_eq!(sequence, Some(42));
    }

    #[test]
    fn ack_subject_modern_format_with_domain_and_account() {
        // $JS.ACK.<domain>.<account>.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>
        let reply = "$JS.ACK.hub.AABBCC.KV_certs.cons.1.42.7.1700000000000000000.0";
        let sequence = stream_sequence_from_ack(reply);
        assert_eq!(sequence, Some(42));
    }

    #[test]
    fn ack_subject_modern_format_with_trailing_token() {
        // Some servers append a random trailing token (12 tokens in total).
        let reply = "$JS.ACK.hub.AABBCC.KV_certs.cons.1.99.7.1700000000000000000.0.rng";
        let sequence = stream_sequence_from_ack(reply);
        assert_eq!(sequence, Some(99));
    }

    #[test]
    fn ack_subject_last_token_is_not_the_sequence() {
        // Regression guard: the final token is num_pending, never the
        // sequence. The old code returned it (0), corrupting the version of
        // every scanned entry.
        let reply = "$JS.ACK.KV_certs.cons.1.42.7.1700000000000000000.0";
        let sequence = stream_sequence_from_ack(reply);
        assert_ne!(sequence, Some(0));
    }

    #[test]
    fn ack_subject_rejects_garbage() {
        let rejected = [
            "",
            "not.an.ack.subject",
            "$JS.ACK.too.few.tokens",
            // Right shape, but the sequence field is not numeric.
            "$JS.ACK.s.c.1.notnum.7.0.0",
        ];
        for reply in rejected {
            assert_eq!(stream_sequence_from_ack(reply), None);
        }
    }

    // ---- is_cursor_expired_error -------------------------------------------

    #[test]
    fn cursor_expired_matches_known_nats_error_strings() {
        // These substrings come from async-nats error messages. Should the
        // library reword them, watch_all_from would return WatchError instead
        // of CursorExpired and break callers that fall back to watch_all() on
        // expiry.
        let expired = [
            "consumer start sequence is too old",
            "first sequence is 42, requested 1",
            "sequence not found in stream",
            // "too old" alone (no "sequence" wording) must still be caught.
            "requested revision is too old",
            // Case-insensitive: an upstream capitalization change must not
            // slip past.
            "Consumer Start Sequence Too Old",
        ];
        for message in expired {
            assert!(is_cursor_expired_error(message));
        }

        let unrelated = ["connection refused", "permission denied", "stream not found"];
        for message in unrelated {
            assert!(!is_cursor_expired_error(message));
        }
    }

    // ---- kv_message_to_update ----------------------------------------------

    /// Builds a raw NATS message as a KV consumer would receive it: the given
    /// subject, optional ACK reply subject, payload, and — when `op` is
    /// `Some` — a `KV-Operation` header carrying that value.
    fn raw_kv_msg(
        subject: &str,
        reply: Option<&str>,
        payload: &[u8],
        op: Option<&str>,
    ) -> async_nats::Message {
        let headers = match op {
            Some(op) => {
                let mut h = async_nats::HeaderMap::new();
                h.insert("KV-Operation", op);
                Some(h)
            }
            None => None,
        };
        async_nats::Message {
            subject: subject.to_string().into(),
            reply: reply.map(|r| r.to_string().into()),
            payload: payload.to_vec().into(),
            headers,
            status: None,
            description: None,
            length: 0,
        }
    }

    /// Legacy-format ACK reply subject whose stream sequence is 42.
    const ACK_42: &str = "$JS.ACK.KV_certs.cons.1.42.7.1700000000000000000.0";

    #[test]
    fn kv_message_decodes_put_without_operation_header() {
        // The common case: a put has no KV-Operation header at all.
        let msg = raw_kv_msg("$KV.certs.node.a", Some(ACK_42), b"v1", None);
        let update = kv_message_to_update(&msg, "$KV.certs.").expect("in keyspace");
        match update {
            KvUpdate::Put(e) => {
                assert_eq!(e.key, "node.a");
                assert_eq!(e.value, b"v1");
                assert_eq!(e.version.as_u64(), Some(42));
            }
            other => panic!("expected Put, got {other:?}"),
        }
    }

    #[test]
    fn kv_message_decodes_delete_and_purge_markers() {
        let del = raw_kv_msg("$KV.certs.node.a", Some(ACK_42), b"", Some("DEL"));
        let del_update = kv_message_to_update(&del, "$KV.certs.").expect("in keyspace");
        assert!(matches!(
            del_update,
            KvUpdate::Delete { ref key, ref version } if key == "node.a" && version.as_u64() == Some(42)
        ));

        let purge = raw_kv_msg("$KV.certs.node.a", Some(ACK_42), b"", Some("PURGE"));
        let purge_update = kv_message_to_update(&purge, "$KV.certs.").expect("in keyspace");
        assert!(matches!(
            purge_update,
            KvUpdate::Purge { ref key, .. } if key == "node.a"
        ));
    }

    #[test]
    fn kv_message_outside_keyspace_is_skipped() {
        // A subject-filtered consumer should never deliver this; the decoder
        // skips it rather than mis-keying it.
        let msg = raw_kv_msg("$KV.other.node.a", Some(ACK_42), b"v", None);
        let decoded = kv_message_to_update(&msg, "$KV.certs.");
        assert!(decoded.is_none());
    }

    #[test]
    fn kv_message_without_reply_gets_unknown_version() {
        // No ACK reply subject → revision unparseable → the UNKNOWN token,
        // never a fabricated revision 0. A parseable `Some(0)` would be
        // adopted by watch_applied as a real batch high-water and regress the
        // persisted cursor to 0 (full replay on next restart); unknown is
        // skipped by the cursor-authority loop instead.
        let msg = raw_kv_msg("$KV.certs.node.a", None, b"v", None);
        let update = kv_message_to_update(&msg, "$KV.certs.").expect("in keyspace");
        match update {
            KvUpdate::Put(e) => {
                assert!(e.version.is_unknown());
                assert_eq!(e.version.as_u64(), None);
            }
            other => panic!("expected Put, got {other:?}"),
        }
    }
}
1874
/// Live-server conformance tests, run against a throwaway `nats-server`
/// (mise-installed, same pattern as tests/common).
///
/// The floor-guard tests drive [`stream_watch_floor_guarded`] DIRECTLY with a
/// deliberately clamped `Watch`. That reproduces exactly the state retention
/// leaves behind when it overruns a live consumer: the watcher methods'
/// resume-time checks cannot be raced deterministically from outside, but the
/// guarded loop neither knows nor cares how its watch got clamped.
///
/// `purge_reclaims_bytes` also lives here because it needs the same
/// throwaway-server harness.
#[cfg(test)]
mod floor_guard_tests {
    use super::*;
    use std::process::{Child, Command, Stdio};

    /// A spawned `nats-server` process plus the URL to reach it. The process
    /// is killed and reaped on drop; `_dir` keeps the store directory alive
    /// for as long as the server runs.
    struct TestServer {
        child: Child,
        url: String,
        _dir: tempfile::TempDir,
    }

    impl Drop for TestServer {
        fn drop(&mut self) {
            // Best effort: the process may already be gone.
            let _ = self.child.kill();
            let _ = self.child.wait();
        }
    }

    /// Spawns a JetStream-enabled `nats-server` on a free loopback port with a
    /// temporary store directory, and waits until a client can connect.
    ///
    /// The binary comes from `NATS_SERVER_BIN`, defaulting to `nats-server`.
    /// Panics if the binary cannot be spawned or the server is not reachable
    /// after 100 connection attempts spaced 100ms apart.
    async fn start_server() -> TestServer {
        let bin = std::env::var("NATS_SERVER_BIN").unwrap_or_else(|_| "nats-server".into());

        // Ask the OS for a free port, then release it for the server to bind.
        let probe = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        let port = probe.local_addr().unwrap().port();
        drop(probe);

        let dir = tempfile::tempdir().unwrap();
        let port_arg = port.to_string();
        let store_dir = dir.path().to_str().unwrap();
        let child = Command::new(&bin)
            .args([
                "--jetstream",
                "--addr",
                "127.0.0.1",
                "--port",
                port_arg.as_str(),
                "--store_dir",
                store_dir,
            ])
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .spawn()
            .unwrap_or_else(|e| panic!("spawn {bin}: {e}; run `mise install`"));

        // Wrap the child immediately so it is killed even if readiness
        // polling below panics.
        let server = TestServer {
            child,
            url: format!("nats://127.0.0.1:{port}"),
            _dir: dir,
        };

        let mut attempts_left = 100;
        while attempts_left > 0 {
            if async_nats::connect(&server.url).await.is_ok() {
                return server;
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
            attempts_left -= 1;
        }
        panic!("nats-server never became ready");
    }

    /// `(js, kv store)` for a fresh bucket named "guard" (history 1), seeded
    /// with five revisions across five subjects `k1`..`k5`.
    async fn seeded_bucket(
        url: &str,
    ) -> (
        async_nats::jetstream::Context,
        async_nats::jetstream::kv::Store,
    ) {
        let client = async_nats::connect(url).await.unwrap();
        let js = async_nats::jetstream::new(client);
        let config = async_nats::jetstream::kv::Config {
            bucket: "guard".into(),
            history: 1,
            ..Default::default()
        };
        let kv = js.create_key_value(config).await.unwrap();
        for i in 1..=5u8 {
            kv.put(format!("k{i}"), vec![i].into()).await.unwrap();
        }
        (js, kv)
    }

    /// Current byte count of the stream `name`, from freshly fetched stream
    /// info.
    async fn stream_bytes(js: &async_nats::jetstream::Context, name: &str) -> u64 {
        js.get_stream(name)
            .await
            .unwrap()
            .info()
            .await
            .unwrap()
            .state
            .bytes
    }

    /// `KvPurge::purge` must reclaim bytes against `max_bytes` — unlike
    /// `delete`/`delete_with_version`, which only append markers. This is the
    /// in-repo twin of the "does purge actually free bytes?" gate: fill a
    /// bucket, purge half the keys, assert the stream's byte count drops.
    #[tokio::test(flavor = "multi_thread")]
    async fn purge_reclaims_bytes() {
        use crate::kv::KvPurge;

        let server = start_server().await;
        let client = async_nats::connect(&server.url).await.unwrap();
        let js = async_nats::jetstream::new(client);
        let config = async_nats::jetstream::kv::Config {
            bucket: "purge".into(),
            history: 1,
            ..Default::default()
        };
        let kv = js.create_key_value(config).await.unwrap();

        // Fill the bucket: 50 distinct keys, 4KiB each.
        let val = vec![b'x'; 4096];
        for i in 0..50u32 {
            kv.put(format!("k{i}"), val.clone().into()).await.unwrap();
        }
        let before = stream_bytes(&js, "KV_purge").await;

        // Purge half the keys through the KvPurge impl under test.
        let writer = NatsKvWriterImpl { kv: kv.clone() };
        for i in 0..25u32 {
            writer.purge(&format!("k{i}")).await.unwrap();
        }

        // Purge is a rollup: prior revisions of each subject are removed, so
        // the stream's byte count must fall. A residual purge marker may stay
        // behind per subject — far smaller than the 4KiB value — hence a
        // strict-drop assertion rather than zero. Poll briefly in case the
        // server reflects reclamation asynchronously.
        let mut after = before;
        for _ in 0..20 {
            after = stream_bytes(&js, "KV_purge").await;
            if after < before {
                break;
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        assert!(
            after < before,
            "purge must reclaim bytes: before={before} after={after}"
        );

        // Idempotence: re-purging an already-purged key is not an error.
        writer.purge("k0").await.unwrap();
    }

    /// TRUE POSITIVE: the watch was clamped past evicted revisions (purge
    /// advanced first_seq beyond the frontier) — the first gapped delivery
    /// must trip the guard BEFORE the entry is processed, never silently
    /// folding past the lost range. This is the live twin of the model's
    /// `GuardRepair`-only-progress gate.
    #[tokio::test(flavor = "multi_thread")]
    async fn gapped_delivery_with_advanced_floor_trips() {
        let server = start_server().await;
        let (js, kv) = seeded_bucket(&server.url).await;

        // Evict revisions 1-3 outright, so first_sequence becomes 4.
        let mut stream = js.get_stream("KV_guard").await.unwrap();
        stream.purge().sequence(4).await.unwrap();
        let first_sequence = stream.info().await.unwrap().state.first_sequence;
        assert_eq!(first_sequence, 4);

        // A consumer resuming from revision 1 gets CLAMPED to revision 4
        // (NATS's silent skip, pinned by tests/resync.rs). That watch is
        // handed to the guarded loop as if it were a live consumer that
        // retention just overran.
        let watch = kv.watch_all_from_revision(2).await.unwrap();
        let (tx, mut rx) = tokio::sync::mpsc::channel(64);
        let drain = tokio::spawn(async move { while rx.recv().await.is_some() {} });

        // The 5s bound pins IN-BAND detection: the trip has to come from the
        // gapped-delivery check itself, not the 30s no-traffic backstop. A
        // regression to periodic-only detection (the design the model checker
        // rejected — catch-up between probes erases the evidence) fails this
        // bound.
        let guarded = stream_watch_floor_guarded(watch, &tx, 1, &js, "guard");
        let err = tokio::time::timeout(Duration::from_secs(5), guarded)
            .await
            .expect("the trip must be IN-BAND (immediate), not backstop-paced")
            .expect_err("a gapped delivery over an advanced floor must trip");
        assert!(
            err.to_string().contains("retention overran live watch"),
            "{err}"
        );

        // Close the channel so the drain task finishes, then reap it.
        drop(tx);
        let _ = drain.await;
    }

    /// NO FALSE POSITIVE: interior (per-subject) eviction also gaps the
    /// delivered revisions, but the floor stays at or below the frontier —
    /// benign for a last-write-wins fold, and the guard must let it
    /// through. (Every existing bootstrap e2e also rides this path on its
    /// resume; this pins the discrimination explicitly.)
    #[tokio::test(flavor = "multi_thread")]
    async fn benign_interior_gap_passes() {
        let server = start_server().await;
        let (js, kv) = seeded_bucket(&server.url).await;

        // Overwrite k2 and k3: with history 1, revisions 2 and 3 are
        // interior-evicted and revisions 6 and 7 replace them. first_sequence
        // stays 1 because k1's revision is still retained.
        kv.put("k2", vec![22].into()).await.unwrap();
        kv.put("k3", vec![33].into()).await.unwrap();
        let mut stream = js.get_stream("KV_guard").await.unwrap();
        let first_sequence = stream.info().await.unwrap().state.first_sequence;
        assert_eq!(first_sequence, 1);

        // Resume from revision 1: deliveries jump over 2 and 3 — gapped, but
        // benign.
        let watch = kv.watch_all_from_revision(2).await.unwrap();
        let (tx, mut rx) = tokio::sync::mpsc::channel(64);
        let guard = tokio::spawn(async move {
            stream_watch_floor_guarded(watch, &tx, 1, &js, "guard").await
        });

        // Collect exactly four deliveries, each within the 5s bound.
        let mut got = Vec::new();
        for _ in 0..4 {
            let update = tokio::time::timeout(Duration::from_secs(5), rx.recv())
                .await
                .expect("deliveries continue past benign gaps")
                .expect("watch alive");
            got.push(update.version().as_u64().unwrap());
        }
        assert_eq!(got, vec![4, 5, 6, 7], "interior gaps jumped, tail dense");

        // The live watch never ends on its own; the assertion above is the
        // test, so just cancel the task.
        guard.abort();
    }
}