Skip to main content

slipstream/
nats.rs

1use async_nats::jetstream::kv::Store;
2use async_trait::async_trait;
3use futures::StreamExt;
4use std::sync::{
5    Arc,
6    atomic::{AtomicBool, Ordering},
7};
8use std::time::Duration;
9use tokio::sync::{RwLock, mpsc::Sender};
10use tracing::{debug, error, info, warn};
11
12use crate::kv::{
13    KvEntry, KvError, KvReader, KvUpdate, KvWatcher, KvWriter, VersionToken, WatchCursor,
14};
15use crate::stores::{Connection, ConnectionCapabilities, KvStore, StoreConfig};
16
/// Default per-operation timeout for NATS KV ops, applied by wrapping each
/// call in [`timed`].
///
/// async-nats's request/response futures don't fail in-flight requests when
/// the underlying TCP connection goes half-dead (CLOSE_WAIT) — they just await
/// forever. Without a timeout here, ANY hung NATS connection translates into a
/// tokio runtime deadlock as soon as enough callers queue behind the dead
/// connection.
///
/// 30s is generous for legitimate slow ops (cold JetStream stream sync, leader
/// election under load) and short enough that a dead connection recovers
/// within reasonable human-debug latency.
const KV_OP_TIMEOUT: Duration = Duration::from_secs(30);
26
/// Server-side inactivity reaper for the ephemeral consumers `scan()`/`keys()`
/// create.
///
/// Our code deletes each consumer explicitly when the drain finishes, but that
/// delete is best-effort: on a half-dead (CLOSE_WAIT) connection it times out,
/// orphaning the consumer server-side where it counts against the per-stream
/// consumer limit. Setting `inactive_threshold` makes JetStream reap any
/// consumer that sees no activity for this long, so a failed explicit delete
/// self-heals instead of accumulating until the limit is hit.
///
/// At 300s this is ten times [`KV_OP_TIMEOUT`], comfortably long enough that
/// it never reaps a legitimately slow in-flight drain (each delivery resets
/// the inactivity timer).
const CONSUMER_INACTIVE_THRESHOLD: Duration = Duration::from_secs(300);
37
38/// Run a future under [`KV_OP_TIMEOUT`], returning [`KvError::Timeout`] if it
39/// doesn't complete in time. Preserves the inner future's `Result` so callers
40/// keep their existing error-mapping logic.
41async fn timed<F, T>(fut: F) -> Result<T, KvError>
42where
43    F: std::future::Future<Output = T>,
44{
45    tokio::time::timeout(KV_OP_TIMEOUT, fut)
46        .await
47        .map_err(|_| KvError::Timeout)
48}
49
50/// Build NATS connect options (with auth applied) and resolve the URL to dial.
51///
52/// Split out from [`nats_connect`] so the connection lifecycle can attach an
53/// event callback (for health tracking) to the *same* options before dialing,
54/// without duplicating the auth-priority logic. Returns the options plus the URL
55/// to connect to (which may differ from `url` when credentials are stripped out
56/// of a `user:pass@host` URL).
57///
58/// Auth priority (first match wins):
59/// 1. Inline credentials (base64-encoded .creds content)
60/// 2. Credentials file (if provided)
61/// 3. URL-embedded credentials (user:pass@host)
62/// 4. No authentication
63async fn build_connect_options(
64    url: &str,
65    creds: Option<&str>,
66    creds_file: Option<&str>,
67) -> Result<(async_nats::ConnectOptions, String), async_nats::ConnectError> {
68    // Priority 1: Inline credentials (base64-encoded)
69    if let Some(encoded) = creds {
70        use base64::Engine;
71        let decoded = base64::engine::general_purpose::STANDARD
72            .decode(encoded)
73            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
74        let content = String::from_utf8(decoded)
75            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
76        return Ok((
77            async_nats::ConnectOptions::with_credentials(&content)?,
78            url.to_string(),
79        ));
80    }
81
82    // Priority 2: Credentials file
83    if let Some(path) = creds_file {
84        return Ok((
85            async_nats::ConnectOptions::with_credentials_file(path).await?,
86            url.to_string(),
87        ));
88    }
89
90    // Priority 3: URL-embedded credentials
91    if let Ok(parsed) = url::Url::parse(url)
92        && !parsed.username().is_empty()
93    {
94        let user = parsed.username().to_string();
95        let pass = parsed.password().unwrap_or("").to_string();
96        // Rebuild URL without credentials. If the scheme doesn't support
97        // userinfo, these calls fail and the credentials would remain embedded
98        // in the URL we later log — warn loudly rather than silently leak them.
99        let mut clean_url = parsed.clone();
100        if clean_url.set_username("").is_err() || clean_url.set_password(None).is_err() {
101            warn!("could not strip credentials from NATS URL; they may appear in logs");
102        }
103        return Ok((
104            async_nats::ConnectOptions::with_user_and_password(user, pass),
105            clean_url.as_str().to_string(),
106        ));
107    }
108
109    // Priority 4: No authentication
110    Ok((async_nats::ConnectOptions::new(), url.to_string()))
111}
112
113/// Connect to NATS with various authentication methods.
114///
115/// Supports the auth-priority order documented on [`build_connect_options`].
116/// This is the standalone helper; the [`Connection`] impl builds options the
117/// same way but also installs a health-tracking event callback.
118pub async fn nats_connect(
119    url: &str,
120    creds: Option<&str>,
121    creds_file: Option<&str>,
122) -> Result<async_nats::Client, async_nats::ConnectError> {
123    let (opts, dial_url) = build_connect_options(url, creds, creds_file).await?;
124    opts.connect(dial_url).await
125}
126
127/// Render an untrusted server payload for logging: borrowed as-is when valid
128/// UTF-8, lowercase hex otherwise. `from_utf8_lossy` would mash every invalid
129/// byte into U+FFFD — exactly the bytes an incident needs to see — so the
130/// fallback preserves them losslessly instead.
131fn payload_for_log(payload: &[u8]) -> std::borrow::Cow<'_, str> {
132    match std::str::from_utf8(payload) {
133        Ok(s) => std::borrow::Cow::Borrowed(s),
134        Err(_) => std::borrow::Cow::Owned(format!("0x{}", crate::artifact::hex_encode(payload))),
135    }
136}
137
/// Configuration for NATS connection.
///
/// `Debug` is hand-written, not derived. Three fields can carry secrets:
/// `creds` holds base64-encoded credential material, `creds_file` a
/// filesystem path to secrets, and `url` may embed `user:pass@host` userinfo
/// (or a bare token as the username). A derived `Debug` would print all of
/// them verbatim the moment anyone `{:?}`-formats the config (a `tracing`
/// span, an error context, a test dump), leaking credentials into logs. The
/// redacting impl below keeps that from being one careless format string away.
#[derive(Clone)]
pub struct NatsConnectionConfig {
    /// Server URL to dial. May embed credentials as `user:pass@host`.
    pub url: String,
    /// Base64-encoded .creds file content (for ECS / containerized environments).
    pub creds: Option<String>,
    /// Path to .creds file on disk (for bare-metal / local development).
    pub creds_file: Option<String>,
}

impl std::fmt::Debug for NatsConnectionConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NatsConnectionConfig")
            // The URL is useful for debugging, but any userinfo in it is a
            // secret: show scheme/host/port and redact the rest.
            .field("url", &redact_url_userinfo(&self.url))
            // Presence, never content: enough to debug "are creds set?" without
            // ever rendering the secret itself. The same applies to `creds_file`:
            // a path like `/run/secrets/prod.creds` leaks the secrets layout into
            // logs/traces, so redact it to presence too.
            .field("creds", &self.creds.as_ref().map(|_| "[redacted]"))
            .field(
                "creds_file",
                &self.creds_file.as_ref().map(|_| "[redacted]"),
            )
            .finish()
    }
}

/// Replace any userinfo in `url` with `[redacted]` for safe logging.
///
/// Everything between the scheme separator (`://`, or the start of the string
/// when there is none) and the *last* `@` is replaced. Anchoring on the last
/// `@` deliberately errs toward over-redaction: a URL with an `@` in its path,
/// or a comma-separated server list, loses some non-secret text, but no
/// username, password or token can survive. URLs without an `@` are returned
/// borrowed and unchanged.
fn redact_url_userinfo(url: &str) -> std::borrow::Cow<'_, str> {
    use std::borrow::Cow;

    let last_at = match url.rfind('@') {
        Some(idx) => idx,
        None => return Cow::Borrowed(url),
    };
    // Keep the scheme prefix only when it really precedes the userinfo.
    let start = match url.find("://") {
        Some(idx) if idx + 3 <= last_at => idx + 3,
        _ => 0,
    };
    // Both offsets sit on ASCII bytes, so the slices are on char boundaries.
    Cow::Owned(format!("{}[redacted]{}", &url[..start], &url[last_at..]))
}
171
172/// Create a KV bucket using raw JetStream API (bypasses async-nats response parsing issues).
173///
174/// Synadia Cloud returns responses that `async_nats` can't parse. This function
175/// uses the raw JetStream API directly, bypassing the client's response deserialization.
176///
177/// `pub(crate)`: this is an internal Synadia Cloud workaround invoked by
178/// `get_or_create_bucket`, not a stable entry point. Exposing it would pin a
179/// vendor quirk into the crate's semver surface.
180pub(crate) async fn create_kv_bucket_raw(
181    client: &async_nats::Client,
182    bucket: &str,
183    max_bytes: i64,
184    history: i64,
185    max_age_nanos: i64,
186    num_replicas: usize,
187) -> Result<(), KvError> {
188    let stream_name = format!("KV_{}", bucket);
189    let subject = format!("$KV.{}.>", bucket);
190
191    // JetStream stream config for KV bucket
192    let config = serde_json::json!({
193        "name": stream_name,
194        "subjects": [subject],
195        "max_msgs_per_subject": history,
196        "max_bytes": max_bytes,
197        "max_age": max_age_nanos,
198        "storage": "file",
199        "allow_rollup_hdrs": true,
200        "deny_delete": false,
201        "deny_purge": false,
202        "allow_direct": true,
203        "discard": "new",
204        "num_replicas": num_replicas,
205        "retention": "limits"
206    });
207
208    let payload = serde_json::to_vec(&config)
209        .map_err(|e| KvError::ConnectionFailed(format!("failed to serialize config: {}", e)))?;
210
211    let response = client
212        .request(
213            format!("$JS.API.STREAM.CREATE.{}", stream_name),
214            payload.into(),
215        )
216        .await
217        .map_err(|e| KvError::ConnectionFailed(format!("failed to send create request: {}", e)))?;
218
219    debug!(bucket, response = %payload_for_log(&response.payload), "raw JetStream response");
220
221    match classify_raw_create_response(&response.payload) {
222        RawCreateOutcome::AlreadyExists => {
223            info!(bucket, "bucket already exists");
224            Ok(())
225        }
226        RawCreateOutcome::StreamLimit => {
227            info!(bucket, "stream limit reached, bucket may already exist");
228            Ok(())
229        }
230        RawCreateOutcome::Created => {
231            info!(bucket, "bucket created successfully via raw API");
232            Ok(())
233        }
234        RawCreateOutcome::Failed { code, description } => Err(KvError::ConnectionFailed(format!(
235            "JetStream error {}: {}",
236            code, description
237        ))),
238    }
239}
240
/// Classification of a raw `$JS.API.STREAM.CREATE` response payload, as
/// produced by [`classify_raw_create_response`].
///
/// Separated from the I/O in [`create_kv_bucket_raw`] so the Synadia Cloud
/// error-code logic — the reason this raw path exists — is unit-testable
/// without a live NATS server.
///
/// Only [`RawCreateOutcome::Failed`] is treated as an error by
/// [`create_kv_bucket_raw`]; the other three variants all map to `Ok(())`.
#[derive(Debug, PartialEq, Eq)]
enum RawCreateOutcome {
    /// No `error` object in the response (or the response was unparseable):
    /// the bucket is assumed created.
    Created,
    /// `10058` (in either `code` or `err_code`) — stream name already in use;
    /// the bucket exists. Non-fatal.
    AlreadyExists,
    /// `400` "maximum number of streams"; Synadia Cloud returns this at the
    /// stream limit even when the bucket already exists. Non-fatal.
    StreamLimit,
    /// Any other JetStream error — fatal. `code` is the response's `code`
    /// field (0 when absent); `description` falls back to "unknown error".
    Failed { code: i64, description: String },
}
258
259fn classify_raw_create_response(payload: &[u8]) -> RawCreateOutcome {
260    // Unparseable payloads are treated as success: the caller re-verifies the
261    // bucket with a follow-up `get_key_value`, so a garbled body here does not
262    // mask a real failure. Warn so the fallback assumption is auditable — if the
263    // re-verify step is ever refactored away, this log is the breadcrumb.
264    //
265    // INVARIANT: this `Created`-on-garbage path is only sound because every
266    // caller re-verifies the bucket exists after `create_kv_bucket_raw` returns
267    // Ok. The sole caller — `get_or_create_bucket` — does so via the
268    // `timed(js.get_key_value(...))` immediately after the raw-create fallback.
269    // Do not remove that re-verify without making this path return `Failed`.
270    let Ok(json) = serde_json::from_slice::<serde_json::Value>(payload) else {
271        warn!(
272            response = %payload_for_log(payload),
273            "unparseable STREAM.CREATE response; assuming created (caller re-verifies via get_key_value)"
274        );
275        return RawCreateOutcome::Created;
276    };
277
278    let Some(err) = json.get("error") else {
279        return RawCreateOutcome::Created;
280    };
281
282    // JetStream splits its error codes: `code` is the HTTP-style status (400,
283    // 404, 500) while `err_code` carries the granular code (e.g. 10058). The
284    // already-exists code can surface in either field depending on the server
285    // (standard NATS puts 10058 in `err_code` with `code` = 400; some managed
286    // deployments echo it in `code`), so we accept it in either.
287    let code = err.get("code").and_then(|c| c.as_i64()).unwrap_or(0);
288    let err_code = err.get("err_code").and_then(|c| c.as_i64()).unwrap_or(0);
289    let description = err
290        .get("description")
291        .and_then(|d| d.as_str())
292        .unwrap_or("unknown error");
293
294    // 10058 = stream name already in use (bucket exists) - that's OK
295    if code == 10058 || err_code == 10058 {
296        return RawCreateOutcome::AlreadyExists;
297    }
298
299    // 400 "maximum number of streams reached" may also mean bucket exists
300    // (Synadia Cloud returns this when at stream limit but bucket exists)
301    if code == 400 && description.contains("maximum number of streams") {
302        return RawCreateOutcome::StreamLimit;
303    }
304
305    RawCreateOutcome::Failed {
306        code,
307        description: description.to_string(),
308    }
309}
310
311struct NatsHandle {
312    // Held to keep the NATS connection alive for the lifetime of the handle.
313    // The `jetstream` context clones an internal reference, but this field is
314    // the authoritative owner — dropping the handle drops the connection.
315    // `dead_code` because we never read it directly after construction.
316    #[allow(dead_code)]
317    client: async_nats::Client,
318    jetstream: async_nats::jetstream::Context,
319}
320
/// NATS JetStream KV connection.
///
/// Constructed either unconnected via `new()` (and dialed later by
/// `connect()`), or around an already-connected, caller-owned client via
/// `from_client()`.
pub struct NatsConnection {
    // URL and credential settings `connect()` dials with. For `from_client`
    // connections this is an empty placeholder (empty URL, no credentials).
    config: NatsConnectionConfig,
    // The installed client + JetStream context. `None` until `connect()`
    // installs one, and `None` again after `shutdown()`.
    handle: RwLock<Option<NatsHandle>>,
    // Shared with the installed client's event callback so `is_healthy()`
    // tracks real connection state (Connected/Disconnected) rather than staying
    // pinned at its connect-time value. `Arc` because the callback outlives this
    // struct's borrow — it runs on the client's event-loop task.
    //
    // Also doubles as the shutdown gate: `shutdown()` clears it, and both
    // `connect()`'s fast path and `is_healthy()` read it first.
    healthy: Arc<AtomicBool>,
    // Set only for connections built via `from_client`, where no health-tracking
    // event callback could be installed (the client was already connected).
    // Once the `healthy` gate passes, `is_healthy()` consults this client's live
    // `connection_state()` instead of relying on the callback-driven flag.
    // `None` for the `new()` + `connect()` path, whose flag is kept current by
    // the installed event callback.
    //
    // `Some(_)` is also the marker that this connection borrows a caller-owned
    // client: it carries no URL or credentials of its own (see `from_client`),
    // so it cannot redial. `connect()` refuses to reconnect such a connection
    // rather than dialing the empty config URL and surfacing a cryptic error.
    state_probe: Option<async_nats::Client>,
}
342
343impl NatsConnection {
344    pub fn new(config: NatsConnectionConfig) -> Self {
345        Self {
346            config,
347            handle: RwLock::new(None),
348            healthy: Arc::new(AtomicBool::new(false)),
349            state_probe: None,
350        }
351    }
352
353    /// Create a NatsConnection from an existing NATS client.
354    ///
355    /// This is useful when the caller already has a NATS connection and wants
356    /// to reuse it for KV stores instead of creating a new connection.
357    pub fn from_client(client: async_nats::Client) -> Self {
358        let jetstream = async_nats::jetstream::new(client.clone());
359        let config = NatsConnectionConfig {
360            url: String::new(), // Not used when pre-connected
361            creds: None,
362            creds_file: None,
363        };
364
365        // Clone a probe handle before the client moves into `NatsHandle`.
366        // `async_nats::Client` is cheap to clone (internally an `Arc`), and
367        // `connection_state()` just reads a watch channel — no I/O.
368        let state_probe = Some(client.clone());
369        let handle = NatsHandle { client, jetstream };
370
371        Self {
372            config,
373            handle: RwLock::new(Some(handle)),
374            // A pre-connected client carries no health-tracking callback (we
375            // didn't build its options), so `is_healthy()` reads the client's
376            // live `connection_state()` via `state_probe`. The flag below only
377            // gates explicit `shutdown()`.
378            healthy: Arc::new(AtomicBool::new(true)),
379            state_probe,
380        }
381    }
382
383    async fn get_or_create_bucket(
384        client: &async_nats::Client,
385        js: &async_nats::jetstream::Context,
386        config: &StoreConfig,
387    ) -> Result<Store, KvError> {
388        // Try to get existing bucket first. Bound the call so a slow/dead
389        // NATS connection at startup can't park the daemon's init thread
390        // forever — the rest of startup (HTTP listener bind, etc.) happens
391        // after this. Without the timeout, a single bad NATS round-trip
392        // here held HTTP bind for 30s+ in observed cases.
393        //
394        // A failure here (permission denied, JetStream disabled, timeout) is not
395        // necessarily fatal — the bucket may simply not exist yet, so we fall
396        // through to create. But surface the original error first: otherwise a
397        // later create failure (e.g. "permission denied on STREAM.CREATE") masks
398        // the real cause ("permission denied on STREAM.INFO") and makes the
399        // failure undebuggable under load.
400        match timed(js.get_key_value(&config.name)).await {
401            Ok(Ok(kv)) => return Ok(kv),
402            Ok(Err(e)) => {
403                debug!(bucket = %config.name, error = ?e, "get_key_value failed; attempting create");
404            }
405            Err(_) => {
406                warn!(bucket = %config.name, timeout = ?KV_OP_TIMEOUT, "get_key_value timed out; attempting create");
407            }
408        }
409
410        // Bucket doesn't exist, create it
411        let mut kv_config = async_nats::jetstream::kv::Config {
412            bucket: config.name.clone(),
413            num_replicas: config.num_replicas.unwrap_or(1),
414            ..Default::default()
415        };
416
417        // Apply max_age (bucket-level TTL) if specified. `as_nanos()` is u128;
418        // saturate to i64::MAX rather than `as i64`, which would silently wrap a
419        // >292-year duration into a negative (and thus meaningless) TTL.
420        let max_age_nanos = if let Some(max_age) = config.max_age {
421            kv_config.max_age = max_age;
422            i64::try_from(max_age.as_nanos()).unwrap_or(i64::MAX)
423        } else {
424            0
425        };
426
427        // Apply max_history if specified. `i64::from` is lossless for a u32 and
428        // states the widening intent, where `as i64` would quietly mask a future
429        // type change that no longer fits.
430        let history = if let Some(history) = config.max_history {
431            let history = i64::from(history);
432            kv_config.history = history;
433            history
434        } else {
435            1
436        };
437
438        // Apply max_bytes if specified (required by Synadia Cloud)
439        let max_bytes = config.max_bytes.unwrap_or(10 * 1024 * 1024); // Default 10MB for Synadia Cloud
440        kv_config.max_bytes = max_bytes;
441
442        // Try normal create first, fall back to raw API if it fails (Synadia Cloud compatibility)
443        match timed(js.create_key_value(kv_config)).await? {
444            Ok(kv) => Ok(kv),
445            Err(e) => {
446                warn!(
447                    bucket = config.name,
448                    error = ?e,
449                    "create_key_value failed, trying raw JetStream API"
450                );
451
452                // Try raw JetStream API as fallback
453                create_kv_bucket_raw(
454                    client,
455                    &config.name,
456                    max_bytes,
457                    history,
458                    max_age_nanos,
459                    config.num_replicas.unwrap_or(1),
460                )
461                .await?;
462
463                // Re-verify the bucket exists. This upholds the INVARIANT in
464                // `classify_raw_create_response`: the raw path reports `Created`
465                // on an unparseable response, so this round-trip is what actually
466                // confirms the bucket — do not remove it.
467                timed(js.get_key_value(&config.name))
468                    .await?
469                    .map_err(|e| {
470                        error!(bucket = config.name, error = ?e, "failed to get bucket after raw create");
471                        KvError::ConnectionFailed(format!("get bucket after raw create: {:?}", e))
472                    })
473            }
474        }
475    }
476}
477
#[async_trait]
impl Connection for NatsConnection {
    /// Dial NATS and install the resulting client, unless already connected.
    ///
    /// Returns `Ok(())` immediately when `healthy` is set, and also when a
    /// concurrent caller installed a handle while this call was dialing (the
    /// freshly dialed client is then discarded).
    ///
    /// # Errors
    /// [`KvError::ConnectionFailed`] when this connection was built via
    /// `from_client` (it cannot redial), when the connect options cannot be
    /// built, or when the dial itself fails.
    async fn connect(&self) -> Result<(), KvError> {
        // Fast path: skip if already connected.
        if self.healthy.load(Ordering::Acquire) {
            return Ok(());
        }

        // A `from_client` connection borrows a caller-owned client and kept no
        // URL or credentials, so it cannot redial. Refuse here with an actionable
        // message instead of dialing the empty config URL (which fails with an
        // opaque parse/connect error). This is reachable only after `shutdown()`
        // cleared the fast-path flag above — a live borrowed client short-circuits
        // there. The caller must construct a `NatsConnection::new(config)` if it
        // needs reconnect semantics.
        if self.state_probe.is_some() {
            return Err(KvError::ConnectionFailed(
                "connection was built via NatsConnection::from_client and cannot \
                 reconnect (no URL or credentials retained); construct \
                 NatsConnection::new(config) for a reconnectable connection"
                    .to_string(),
            ));
        }

        let (opts, dial_url) = build_connect_options(
            &self.config.url,
            self.config.creds.as_deref(),
            self.config.creds_file.as_deref(),
        )
        .await
        .map_err(|e| KvError::ConnectionFailed(e.to_string()))?;

        // Drive `healthy` from the client's own connection events so it reflects
        // reality through async-nats's transparent reconnects — without this the
        // flag stays `true` straight through a NATS outage, and a readiness probe
        // built on `is_healthy()` keeps routing traffic to a node that can't
        // reach NATS.
        //
        // `installed` gates the callback: a caller that loses the connect race
        // (see the double-check below) tears down its freshly built client, and
        // that teardown fires `Disconnected`. Without the gate, the loser's drop
        // would clobber the *winner's* `healthy` flag. Only the client we
        // actually install ever flips `installed` to `true`, so the losers'
        // callbacks are inert.
        //
        // NOTE(review): `installed` is local to this call and is never cleared
        // again — `shutdown()` has no access to it. If a shut-down client keeps
        // running (e.g. a vended store still holds a clone of it), its callback
        // can presumably still write `healthy` after `shutdown()` or after a
        // later `connect()` installed a different client. Confirm whether that
        // needs its own gate.
        let installed = Arc::new(AtomicBool::new(false));
        let cb_healthy = Arc::clone(&self.healthy);
        let cb_installed = Arc::clone(&installed);
        let opts = opts.event_callback(move |event| {
            // The callback is invoked once per event and must hand back an
            // owned future, so each invocation takes its own `Arc` clones.
            let cb_healthy = Arc::clone(&cb_healthy);
            let cb_installed = Arc::clone(&cb_installed);
            async move {
                if !cb_installed.load(Ordering::Acquire) {
                    return;
                }
                match event {
                    async_nats::Event::Connected => cb_healthy.store(true, Ordering::Release),
                    async_nats::Event::Disconnected => cb_healthy.store(false, Ordering::Release),
                    // Other events do not change connection health.
                    _ => {}
                }
            }
        });

        // Dial without holding the handle lock; the race this opens is
        // resolved by the re-check below.
        let client = opts
            .connect(dial_url)
            .await
            .map_err(|e| KvError::ConnectionFailed(e.to_string()))?;

        let jetstream = async_nats::jetstream::new(client.clone());

        let conn = NatsHandle { client, jetstream };

        // Re-check under the write lock: a concurrent caller may have connected
        // while we were awaiting the dial. If so, drop our freshly built handle
        // (closing its connection) instead of replacing the live one, which would
        // orphan a connection the first caller still believes is installed.
        // Leaving `installed = false` keeps our about-to-drop client's teardown
        // events from touching `healthy`.
        let mut handle = self.handle.write().await;
        if handle.is_some() {
            return Ok(());
        }
        // Arm the callback before publishing the handle, then set `healthy`
        // by hand: events emitted while `installed` was still false (including
        // the initial connect) were ignored by the gate above.
        installed.store(true, Ordering::Release);
        *handle = Some(conn);
        self.healthy.store(true, Ordering::Release);

        Ok(())
    }

    /// Mark the connection down and drop the installed handle.
    ///
    /// Clears `healthy` first so `is_healthy()` reports `false` even while the
    /// write lock is still being acquired. Always returns `Ok(())`.
    async fn shutdown(&self) -> Result<(), KvError> {
        self.healthy.store(false, Ordering::Release);
        *self.handle.write().await = None;
        Ok(())
    }

    /// Report whether the connection is currently usable. Synchronous and
    /// lock-free: reads an atomic flag and, for `from_client` connections, the
    /// client's `connection_state()`.
    fn is_healthy(&self) -> bool {
        // `healthy` is the shutdown gate for both construction paths: once
        // `shutdown()` clears it the connection is down regardless of socket
        // state, so check it first.
        if !self.healthy.load(Ordering::Acquire) {
            return false;
        }
        match &self.state_probe {
            // `from_client`: no event callback could be installed, so consult the
            // client's live connection state instead of a stale connect-time
            // value. A dead or reconnecting socket reports Pending/Disconnected,
            // so a readiness probe correctly sees the node as unhealthy. A
            // borrowed client is never replaced (connect() refuses to reconnect
            // it), so this probe never goes stale.
            Some(client) => matches!(
                client.connection_state(),
                async_nats::connection::State::Connected
            ),
            // `new()` + `connect()`: `healthy` is kept current by the installed
            // Connected/Disconnected event callback.
            None => true,
        }
    }

    /// Open (or create) the bucket `name` with default [`StoreConfig`]
    /// settings. Shorthand for [`Connection::store_with_config`].
    async fn store(&self, name: &str) -> Result<Arc<dyn KvStore>, KvError> {
        let config = StoreConfig {
            name: name.to_string(),
            ..Default::default()
        };
        self.store_with_config(config).await
    }

    /// Open (or create) the bucket described by `config` and wrap it in a
    /// [`KvStore`].
    ///
    /// # Errors
    /// [`KvError::NotConnected`] when no handle is installed; otherwise any
    /// error from `get_or_create_bucket`.
    async fn store_with_config(&self, config: StoreConfig) -> Result<Arc<dyn KvStore>, KvError> {
        // Clone the client/jetstream out from under the read lock before the
        // bucket get-or-create, which makes several sequential NATS round-trips
        // and can therefore take well over a minute in the worst case. Holding
        // the read guard across that await would block `shutdown()`'s
        // `write().await` for the full duration, stalling graceful shutdown
        // behind an in-flight store call.
        let (client, js) = {
            let conn = self.handle.read().await;
            let conn = conn.as_ref().ok_or(KvError::NotConnected)?;
            (conn.client.clone(), conn.jetstream.clone())
        };

        let kv = Self::get_or_create_bucket(&client, &js, &config).await?;

        Ok(Arc::new(NatsKvStore {
            name: config.name,
            client,
            js,
            kv,
        }))
    }

    /// Static feature flags describing what this backend supports.
    fn capabilities(&self) -> ConnectionCapabilities {
        ConnectionCapabilities {
            streaming_watch: true,
            prefix_watch: true,
            // `KvTtl` is not implemented for the NATS backend yet (only `KvWriter`
            // is vended by `writer()`), so advertising `ttl: true` would lead
            // callers that branch on this flag down a path that can never
            // succeed. Flip to `true` together with the `KvTtl` impl.
            ttl: false,
            cas: true,
            transactions: false,
            // 0 = unlimited from this layer's perspective: we impose no cap, but
            // the NATS server still enforces its own max payload (~1MB by
            // default). Callers that branch on this must not read 0 as "any size
            // is safe" — an oversized value is rejected server-side at write time.
            max_value_size: 0,
            global_ordering: false,
        }
    }
}
645
/// A single NATS KV bucket, vended by `NatsConnection::store_with_config`.
///
/// Acts as a factory: `reader()`, `watcher()` and `writer()` each hand out a
/// fresh accessor built from clones of these fields.
struct NatsKvStore {
    // Bucket name, as given in the `StoreConfig`.
    name: String,
    // KV handle for the bucket, obtained from `get_or_create_bucket`.
    kv: Store,
    // Clones of the connection's client and JetStream context, passed on to
    // the reader and watcher.
    client: async_nats::Client,
    js: async_nats::jetstream::Context,
}
652
653impl KvStore for NatsKvStore {
654    fn name(&self) -> &str {
655        &self.name
656    }
657
658    fn reader(&self) -> Arc<dyn KvReader> {
659        Arc::new(NatsKvReader {
660            kv: self.kv.clone(),
661            client: self.client.clone(),
662            js: self.js.clone(),
663            bucket: self.name.clone(),
664        })
665    }
666
667    fn watcher(&self) -> Option<Arc<dyn KvWatcher>> {
668        Some(Arc::new(NatsKvWatcher {
669            kv: self.kv.clone(),
670            client: self.client.clone(),
671            js: self.js.clone(),
672            bucket: self.name.clone(),
673        }))
674    }
675
676    fn writer(&self) -> Option<Arc<dyn KvWriter>> {
677        Some(Arc::new(NatsKvWriterImpl {
678            kv: self.kv.clone(),
679        }))
680    }
681}
682
/// Read-side accessor for one NATS KV bucket, created by
/// `NatsKvStore::reader()`.
struct NatsKvReader {
    // KV handle used for point lookups (`entry()`).
    kv: Store,
    // Client and JetStream context cloned from the owning store.
    // NOTE(review): presumably used by `consume_last_per_subject` to drive the
    // ephemeral consumers behind `scan()`/`keys()` — confirm against that
    // helper.
    client: async_nats::Client,
    js: async_nats::jetstream::Context,
    // The bucket name is known at construction (it's the store's name), so
    // `consume_last_per_subject` builds its subject filters from this field
    // instead of issuing a `kv.status()` round-trip per `scan()`/`keys()` call
    // just to read it back from the server.
    bucket: String,
}
693
694#[async_trait]
695impl KvReader for NatsKvReader {
696    async fn get(&self, key: &str) -> Result<Option<KvEntry>, KvError> {
697        // Empty value → treat as absent. This unifies a real stored `b""` and a
698        // `delete_with_version` tombstone (empty-value Put) under one "absent =
699        // None" contract, consistent with `scan()`/`keys()`. Callers needing
700        // zero-length semantics use `entry()`. See the `KvReader::get` trait doc.
701        match self.entry(key).await? {
702            Some(entry) if entry.value.is_empty() => Ok(None),
703            other => Ok(other),
704        }
705    }
706
707    async fn entry(&self, key: &str) -> Result<Option<KvEntry>, KvError> {
708        use async_nats::jetstream::kv::Operation;
709        // Use entry() instead of get() to access revision.
710        // Return Put entries even with empty values — delete_with_version
711        // writes empty bytes as a tombstone and callers need the version
712        // for CAS conflict detection. Only filter real Delete/Purge markers.
713        match timed(self.kv.entry(key)).await? {
714            Ok(Some(entry)) if entry.operation == Operation::Put => Ok(Some(KvEntry {
715                key: key.to_string(),
716                value: entry.value.to_vec(),
717                version: VersionToken::from_u64(entry.revision),
718            })),
719            Ok(Some(_)) => Ok(None), // Delete/Purge marker
720            Ok(None) => Ok(None),
721            Err(e) => Err(KvError::OperationFailed(e.to_string())),
722        }
723    }
724
725    async fn keys(&self, prefix: &str) -> Result<Vec<String>, KvError> {
726        debug!(prefix = %prefix, "listing keys with prefix");
727
728        let mut keys = Vec::new();
729        self.consume_last_per_subject(prefix, true, |msg, key| {
730            // Skip both real KV deletes and CAS tombstones (empty-value Puts
731            // written by delete_with_version). get()/scan() hide the latter, so
732            // keys() must too — otherwise a list-then-get returns phantom keys.
733            // With headers_only the payload is stripped, but NATS adds a
734            // `Nats-Msg-Size` header we use to detect the empty value.
735            if !is_kv_delete(&msg) && !is_empty_value(&msg) {
736                keys.push(key);
737            }
738        })
739        .await?;
740
741        debug!(prefix = %prefix, keys = keys.len(), "keys listing complete");
742        Ok(keys)
743    }
744
745    async fn scan(&self, prefix: &str) -> Result<Vec<KvEntry>, KvError> {
746        let mut entries = Vec::new();
747        self.consume_last_per_subject(prefix, false, |msg, key| {
748            if !is_kv_delete(&msg) && !msg.payload.is_empty() {
749                // The KV revision is the stream sequence, carried in the JetStream
750                // ACK subject (the message's reply subject). A revision of 0 means
751                // we couldn't parse it; callers treat that as "unknown version".
752                let revision = msg
753                    .reply
754                    .as_deref()
755                    .and_then(stream_sequence_from_ack)
756                    .unwrap_or(0);
757
758                entries.push(KvEntry {
759                    key,
760                    value: msg.payload.to_vec(),
761                    version: VersionToken::from_u64(revision),
762                });
763            }
764        })
765        .await?;
766
767        debug!(prefix = %prefix, entries = entries.len(), "scan complete");
768        Ok(entries)
769    }
770}
771
/// Extract the stream sequence (== KV revision) from a JetStream ACK subject.
///
/// The ACK subject — delivered as a push message's reply subject — comes in two
/// shapes, and the stream sequence sits at different offsets in each:
///
/// ```text
/// legacy (9 tokens):  $JS.ACK.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>
/// modern (11–12):     $JS.ACK.<domain>.<account>.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>[.<token>]
/// ```
///
/// The previous implementation took the *last* token, which is `num_pending`
/// (typically 0 on the final delivery), not the sequence — corrupting the
/// version on every scanned entry. We instead parse from the front, accounting
/// for the optional `<domain>.<account>` prefix that modern servers prepend.
///
/// Returns `None` when the subject does not start with `$JS.ACK`, when the
/// sequence token is not a `u64`, or when the token count matches neither
/// shape. That includes exactly 10 tokens: it is not the legacy form and is too
/// short to carry the full `<domain>.<account>` prefix, so there is no offset
/// at which the sequence can be trusted.
fn stream_sequence_from_ack(reply: &str) -> Option<u64> {
    // The stream-seq field sits at index 5 (legacy) or 7 (modern), so we only
    // ever read the first 8 tokens. Keep those in a stack array and count the
    // remainder with the iterator — no heap `Vec`, which on a large `scan()`
    // would be one allocation per delivered message.
    let mut head = [""; 8];
    let mut count = 0usize;
    for (i, token) in reply.split('.').enumerate() {
        if let Some(slot) = head.get_mut(i) {
            *slot = token;
        }
        count += 1;
    }
    if head[0] != "$JS" || head[1] != "ACK" {
        return None;
    }
    // Legacy form has exactly 9 tokens with no domain/account; 11 or more
    // carries the two-token `<domain>.<account>` prefix, shifting fields right.
    // Anything else (too short, or the in-between 10) is not a known layout.
    let stream_seq_idx = match count {
        9 => 5,
        n if n >= 11 => 7,
        _ => return None,
    };
    head[stream_seq_idx].parse::<u64>().ok()
}
807
808/// Check if a NATS message represents a KV delete/purge operation.
809fn is_kv_delete(msg: &async_nats::Message) -> bool {
810    msg.headers
811        .as_ref()
812        .and_then(|h| h.get("KV-Operation"))
813        .is_some()
814}
815
816/// Check if a `headers_only` delivery carries an empty value (a CAS tombstone
817/// written by `delete_with_version`).
818///
819/// When a consumer is created with `headers_only`, NATS strips the body and adds
820/// a `Nats-Msg-Size` header with the original payload length. Size 0 means the
821/// stored value is empty, which `get()`/`scan()` treat as absent. Messages
822/// without the header (e.g. non-`headers_only` deliveries) are not classified as
823/// empty here — callers on that path inspect the payload directly instead.
824fn is_empty_value(msg: &async_nats::Message) -> bool {
825    msg.headers
826        .as_ref()
827        .and_then(|h| h.get("Nats-Msg-Size"))
828        .map(|v| v.as_str() == "0")
829        .unwrap_or(false)
830}
831
832impl NatsKvReader {
833    /// Subscribe to last-per-subject messages for a KV prefix, calling `on_msg`
834    /// for each delivered message. Handles the subscribe-first race workaround,
835    /// consumer lifecycle, and cleanup.
836    async fn consume_last_per_subject(
837        &self,
838        prefix: &str,
839        headers_only: bool,
840        mut on_msg: impl FnMut(async_nats::Message, String),
841    ) -> Result<(), KvError> {
842        use async_nats::jetstream::consumer::push;
843        use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy};
844
845        // The bucket name is known at construction, so the subject filters are
846        // built directly from `self.bucket` — no `kv.status()` round-trip just
847        // to read it back. Every *remaining* setup await below is still bounded
848        // by `timed()`: a half-dead NATS connection (CLOSE_WAIT) would otherwise
849        // park here before the per-message drain timer downstream ever starts,
850        // hanging scan()/keys() indefinitely — the same failure `timed()` guards
851        // on the write path.
852        let bucket = self.bucket.as_str();
853
854        let nats_filter = if prefix.is_empty() {
855            format!("$KV.{bucket}.>")
856        } else {
857            format!("$KV.{bucket}.{prefix}>")
858        };
859
860        // Work around async-nats <=0.46 subscribe-after-create race:
861        // subscribe to the inbox FIRST, then create the consumer.
862        let inbox = self.client.new_inbox();
863        let mut sub = timed(self.client.subscribe(inbox.clone()))
864            .await?
865            .map_err(|e| KvError::OperationFailed(format!("subscribe inbox: {e}")))?;
866
867        let stream = timed(self.js.get_stream(format!("KV_{bucket}")))
868            .await?
869            .map_err(|e| KvError::OperationFailed(format!("get KV stream: {e}")))?;
870
871        let consumer = timed(stream.create_consumer(push::Config {
872            deliver_subject: inbox,
873            deliver_policy: DeliverPolicy::LastPerSubject,
874            filter_subject: nats_filter,
875            headers_only,
876            // This is a one-shot point-in-time drain — we never ack. Under
877            // the default `AckPolicy::Explicit`, JetStream stops delivering
878            // once `max_ack_pending` (default 1000) messages sit unacked,
879            // which would silently truncate scan()/keys() to the first ~1000
880            // keys (or stall waiting for deliveries that never come) on any
881            // larger bucket. `None` removes the ack-pending gate entirely.
882            ack_policy: AckPolicy::None,
883            // Safety net for the best-effort `delete_consumer` below: if that
884            // cleanup times out on a half-dead connection, JetStream still reaps
885            // this consumer after `CONSUMER_INACTIVE_THRESHOLD` of inactivity, so
886            // repeated timed-out scans can't pile orphaned consumers up against
887            // the per-stream limit.
888            inactive_threshold: CONSUMER_INACTIVE_THRESHOLD,
889            ..Default::default()
890        }))
891        .await?
892        .map_err(|e| KvError::OperationFailed(format!("create consumer: {e}")))?;
893
894        let num_pending = consumer.cached_info().num_pending;
895
896        // Drain exactly `num_pending` messages, but bound each await: a half-dead
897        // connection (CLOSE_WAIT) would otherwise park this loop forever, the same
898        // failure `timed()` guards on the write path. On timeout we still fall
899        // through to consumer cleanup, then surface `Timeout`.
900        let mut timed_out = false;
901        if num_pending > 0 {
902            let mut delivered = 0u64;
903            let kv_prefix = format!("$KV.{bucket}.");
904
905            while delivered < num_pending {
906                match tokio::time::timeout(KV_OP_TIMEOUT, sub.next()).await {
907                    Ok(Some(msg)) => {
908                        let key = msg
909                            .subject
910                            .strip_prefix(&kv_prefix)
911                            .unwrap_or(msg.subject.as_str())
912                            .to_string();
913
914                        on_msg(msg, key);
915                        delivered += 1;
916                    }
917                    Ok(None) => break, // subscription closed early
918                    Err(_) => {
919                        timed_out = true;
920                        break;
921                    }
922                }
923            }
924        }
925
926        // Clean up ephemeral consumer (best-effort), even on timeout — a stalled
927        // scan shouldn't also leak a server-side consumer. A leaked consumer
928        // lingers on the server and counts against per-stream limits, so surface
929        // failures in observability without failing the operation. Bound the
930        // delete with `timed()`: on the same half-dead (CLOSE_WAIT) connection
931        // that tripped the drain timeout above, an unbounded delete would re-park
932        // here forever, defeating the timeout recovery we just performed.
933        match timed(stream.delete_consumer(&consumer.cached_info().name)).await {
934            Ok(Ok(_)) => {}
935            Ok(Err(e)) => {
936                // `warn!`, not `debug!`: a leaked ephemeral consumer lingers on
937                // the server and counts against per-stream limits. Under a flaky
938                // NATS connection every scan()/keys() leaks one, so this must be
939                // visible in default spans before the pile-up hits the limit.
940                warn!(error = %e, "failed to delete ephemeral consumer (best-effort)");
941            }
942            Err(_) => {
943                warn!("timed out deleting ephemeral consumer (best-effort)");
944            }
945        }
946
947        if timed_out {
948            return Err(KvError::Timeout);
949        }
950        Ok(())
951    }
952}
953
954/// Convert a NATS KV entry to a KvUpdate.
955///
956/// Takes the entry by value so the key `String` moves into the `KvUpdate`
957/// instead of allocating a fresh copy per watch event.
958fn nats_entry_to_kv_update(entry: async_nats::jetstream::kv::Entry) -> KvUpdate {
959    use async_nats::jetstream::kv::Operation;
960    let version = VersionToken::from_u64(entry.revision);
961    match entry.operation {
962        Operation::Put => KvUpdate::Put(KvEntry {
963            key: entry.key,
964            value: entry.value.to_vec(),
965            version,
966        }),
967        Operation::Delete => KvUpdate::Delete {
968            key: entry.key,
969            version,
970        },
971        Operation::Purge => KvUpdate::Purge {
972            key: entry.key,
973            version,
974        },
975    }
976}
977
978/// Stream updates from a NATS Watch into a channel until it ends or the receiver drops.
979async fn stream_watch(
980    mut watcher: async_nats::jetstream::kv::Watch,
981    tx: &Sender<KvUpdate>,
982) -> Result<(), KvError> {
983    while let Some(entry) = watcher.next().await {
984        match entry {
985            Ok(entry) => {
986                let update = nats_entry_to_kv_update(entry);
987                if tx.send(update).await.is_err() {
988                    debug!("watch receiver closed");
989                    break;
990                }
991            }
992            Err(e) => {
993                error!(error = %e, "NATS KV watch error");
994                return Err(KvError::WatchError(e.to_string()));
995            }
996        }
997    }
998    Ok(())
999}
1000
/// Cadence of the floor guard's no-traffic backstop probe (one stream-info
/// RPC per interval per guarded watch); currently 30 seconds. The PRIMARY
/// detection is in-band — the gapped-delivery check fires the moment evidence
/// surfaces — so this interval only bounds detection latency when NOTHING is
/// being delivered; it is not load-bearing for eventual detection.
const FLOOR_GUARD_INTERVAL: Duration = Duration::from_secs(30);
1007
/// [`stream_watch`] for the dense ALL-scope resume path, with the LIVE
/// retention floor guard (`tests/model_live_watch.rs` — the live twin of
/// [`NatsKvWatcher::check_resume_window`]).
///
/// The hazard: retention overrunning a live consumer makes JetStream
/// silently skip evicted messages — delete markers included — with no error
/// anywhere (the same clamp behavior as resumes, mid-stream). Unguarded,
/// that is PERMANENT silent fold divergence; the model proves it reachable.
///
/// Detection is primarily **in-band**: an unfiltered `ByStartSequence`
/// consumer sees every retained message, so a delivered revision that jumps
/// the frontier by more than one is evidence of eviction inside the gap.
/// The model checker REJECTED a periodic-only design with exactly the trace
/// this closes — deliveries can catch the frontier up past the gap between
/// probes, erasing the evidence — so the check runs AT the gapped delivery,
/// before the entry is processed: fetch `first_sequence` and apply the
/// shared kernel (`protocol::resume_window_ok`) to the frontier. A benign
/// gap (interior per-subject eviction with the floor still at or below the
/// frontier) passes; head eviction past the frontier fails the watch, and
/// the caller's restart routes into the verified resume → `CursorExpired` →
/// resync repair path. The periodic probe backstops the no-traffic case.
///
/// Scope: sound only where density holds — the unfiltered resume watch.
/// Prefix-scoped watches deliver sparse revisions by design and cannot
/// distinguish benign from hazardous eviction client-side; they retain the
/// (narrowed) retention-outlives-lag operating axiom plus the resume-time
/// check on every restart (model axiom 5).
///
/// The guarantee split, precisely: the SAFETY half — never folding past
/// unexamined evidence of loss — is unconditional in this loop (the gap
/// check precedes processing, and a stalled downstream stalls folding too).
/// The REPAIR half is conditional on the caller restarting the failed watch
/// (standard supervision; same posture as the resync fail-stop): a trip
/// with no restart is a loudly dead watch, never a silently wrong one.
///
/// Parameters: `watcher` is the already-established watch to drain; `tx`
/// receives each update; `resume_revision` seeds the delivered frontier;
/// `js` and `bucket` are used to look up the `KV_{bucket}` stream whenever
/// the floor has to be probed.
///
/// Returns `Ok(())` when the watch stream ends or the receiver is dropped.
/// Returns [`KvError::WatchError`] on a watch error or a tripped floor
/// guard, and propagates any error from the floor probe itself (including
/// its `timed()` bound) — a failed probe ends the watch too.
async fn stream_watch_floor_guarded(
    mut watcher: async_nats::jetstream::kv::Watch,
    tx: &Sender<KvUpdate>,
    resume_revision: u64,
    js: &async_nats::jetstream::Context,
    bucket: &str,
) -> Result<(), KvError> {
    let stream_name = format!("KV_{bucket}");
    // Floor probe: look the stream up again (bounded by `timed()`) and read
    // `first_sequence` from the info attached to that fresh handle.
    let first_sequence = || async {
        let stream = timed(js.get_stream(&stream_name))
            .await?
            .map_err(|e| KvError::OperationFailed(format!("floor guard stream lookup: {e}")))?;
        Ok::<u64, KvError>(stream.cached_info().state.first_sequence)
    };
    // Log the overrun and build the error that fails the watch.
    fn trip(frontier: u64, first: u64, bucket: &str) -> KvError {
        warn!(
            frontier,
            first_sequence = first,
            bucket,
            "stream retention overran this live watch; failing so the restart can resync \
             (messages in the gap were evicted unseen)"
        );
        KvError::WatchError(format!(
            "stream retention overran live watch (first_sequence {first} > delivered \
             frontier {frontier} + 1); restart will resync"
        ))
    }

    // Highest revision delivered so far, seeded with the resume point.
    let mut frontier = resume_revision;
    let mut backstop = tokio::time::interval(FLOOR_GUARD_INTERVAL);
    backstop.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    backstop.tick().await; // consume the immediate first tick

    loop {
        tokio::select! {
            entry = watcher.next() => {
                // Watch stream ended: leave the loop and return `Ok(())`.
                let Some(entry) = entry else { break };
                match entry {
                    Ok(entry) => {
                        let revision = entry.revision;
                        // In-band gap check BEFORE processing: never fold
                        // past unexamined evidence of eviction.
                        if revision > frontier.saturating_add(1) {
                            let first = first_sequence().await?;
                            if !crate::protocol::resume_window_ok(frontier, first) {
                                return Err(trip(frontier, first, bucket));
                            }
                            // Benign interior gap: every evicted revision
                            // below a still-low floor was a per-subject
                            // overwrite, whose later revision the fold will
                            // see — safe for last-write-wins.
                        }
                        // `max` keeps the frontier monotonic even if a
                        // delivered revision is not above it.
                        frontier = frontier.max(revision);
                        let update = nats_entry_to_kv_update(entry);
                        if tx.send(update).await.is_err() {
                            debug!("watch receiver closed");
                            break;
                        }
                    }
                    Err(e) => {
                        error!(error = %e, "NATS KV watch error");
                        return Err(KvError::WatchError(e.to_string()));
                    }
                }
            }
            _ = backstop.tick() => {
                // No-traffic backstop: nothing is being delivered, so the
                // in-band check has no evidence to act on; probe the floor
                // directly.
                let first = first_sequence().await?;
                if !crate::protocol::resume_window_ok(frontier, first) {
                    return Err(trip(frontier, first, bucket));
                }
            }
        }
    }
    Ok(())
}
1120
1121/// Check if a NATS watch error indicates the requested start sequence is
1122/// too old (compacted), meaning callers should fall back to a full watch.
1123///
1124/// SECOND line of defense only: live nats-server (2.14) does not error on a
1125/// below-head start sequence at all — it silently clamps to the first
1126/// retained message (pinned by `tests/resync.rs`), so the PRIMARY expiry
1127/// detection is [`NatsKvWatcher::check_resume_window`]'s proactive
1128/// `first_sequence` comparison. This matcher remains for server versions or
1129/// paths that do error, where mapping to [`KvError::CursorExpired`] keeps
1130/// the fallback reachable instead of stranding the caller.
1131///
1132/// async-nats has no granular error kind for this: `WatchErrorKind` is only
1133/// `InvalidKey`/`TimedOut`/`ConsumerCreate`/`Other`, and "start sequence too old"
1134/// arrives as `ConsumerCreate`/`Other` with the real reason buried in the source
1135/// error's *message*. So we substring-match the full error string — which already
1136/// includes the source, since `Error`'s `Display` renders `"{kind}: {source}"`.
1137///
1138/// Two deliberate choices make this robust to wording drift:
1139/// - We lowercase first, so a capitalization change in NATS/async-nats can't slip
1140///   past.
1141/// - Detection is biased toward `true`. A false positive only costs an
1142///   unnecessary (but always-safe) full `watch_all()` replay; a false negative
1143///   propagates `WatchError` and strands a caller that would otherwise recover.
1144///
1145/// If these messages ever change, `cursor_expired_matches_known_nats_error_strings`
1146/// is the canary that fails loudly on the next dependency bump.
1147fn is_cursor_expired_error(err: &str) -> bool {
1148    use std::sync::OnceLock;
1149    // One Aho-Corasick automaton over all needles: a single pass over the error
1150    // string regardless of how many needles accumulate as NATS versions reword
1151    // their messages, vs. one `windows()` scan per needle. Case-insensitivity is
1152    // baked into the automaton, so no lowercased copy is allocated either.
1153    static MATCHER: OnceLock<aho_corasick::AhoCorasick> = OnceLock::new();
1154    MATCHER
1155        .get_or_init(|| {
1156            aho_corasick::AhoCorasick::builder()
1157                .ascii_case_insensitive(true)
1158                .build([
1159                    "start sequence",
1160                    "first sequence",
1161                    "sequence not found",
1162                    "too old",
1163                ])
1164                .expect("static needle set always compiles")
1165        })
1166        .is_match(err)
1167}
1168
/// Watch-side handle for a single NATS KV bucket; implements [`KvWatcher`].
struct NatsKvWatcher {
    // KV store handle behind the `watch_*` calls that async-nats provides
    // directly (`watch_with_history`, `watch_from_revision`, ...).
    kv: Store,
    // `watch_prefixes_from` has no async-nats equivalent (there is no
    // `watch_many_from_revision`), so it hand-builds the multi-filter ordered
    // consumer itself — which needs the raw client (inbox allocation), the
    // JetStream context (stream lookup), and the bucket name (subject filters),
    // same as the reader's scan path.
    client: async_nats::Client,
    // Also used by the resume-window check and the live floor guard to look
    // up the `KV_{bucket}` stream.
    js: async_nats::jetstream::Context,
    bucket: String,
}
1180
1181/// Decode a raw KV stream message (as delivered by a hand-built ordered push
1182/// consumer) into a [`KvUpdate`] — the same mapping `async-nats`'s `kv::Watch`
1183/// performs internally for the `watch_*` paths: key from the subject (stripping
1184/// the `$KV.{bucket}.` prefix), operation from the `KV-Operation` header
1185/// (absent = Put), revision from the stream sequence in the ACK reply subject.
1186///
1187/// Returns `None` for a subject outside the bucket's keyspace, which a
1188/// subject-filtered consumer should never deliver — skipped rather than
1189/// surfaced, matching `kv::Watch`'s behavior.
1190fn kv_message_to_update(msg: &async_nats::Message, kv_prefix: &str) -> Option<KvUpdate> {
1191    let key = msg.subject.strip_prefix(kv_prefix)?.to_string();
1192    let revision = msg
1193        .reply
1194        .as_deref()
1195        .and_then(stream_sequence_from_ack)
1196        .unwrap_or(0);
1197    let version = VersionToken::from_u64(revision);
1198    let operation = msg
1199        .headers
1200        .as_ref()
1201        .and_then(|h| h.get("KV-Operation"))
1202        .map(|v| v.as_str());
1203    Some(match operation {
1204        Some("DEL") => KvUpdate::Delete { key, version },
1205        Some("PURGE") => KvUpdate::Purge { key, version },
1206        // No header (or an explicit "PUT") is a put — the common case carries
1207        // no KV-Operation header at all.
1208        _ => KvUpdate::Put(KvEntry {
1209            key,
1210            value: msg.payload.to_vec(),
1211            version,
1212        }),
1213    })
1214}
1215
1216impl NatsKvWatcher {
1217    /// Proactive cursor-expiry detection, REQUIRED before any `*_from` resume.
1218    ///
1219    /// NATS does **not** error when an ordered consumer's `ByStartSequence`
1220    /// falls below the stream's first retained sequence — it silently delivers
1221    /// from the first available message (pinned against a live nats-server by
1222    /// `tests/resync.rs::nats_silently_clamps_resume_below_first_seq`). A
1223    /// silent clamp skips the gap's evicted messages — delete markers
1224    /// included — without ever taking the `CursorExpired` → resync path, so
1225    /// expiry MUST be detected by comparing the stream's `first_sequence`
1226    /// against the resume point before trusting the consumer. The
1227    /// error-string matching at the consumer-create sites stays as a second
1228    /// line of defense for server versions that do error.
1229    ///
1230    /// Why `first_sequence` is the right boundary: interior (per-subject
1231    /// history) eviction inside the gap is safe for a last-write-wins fold —
1232    /// an overwrite-evicted revision implies a LATER revision of the same
1233    /// subject exists and will be delivered. Lost *deletes* come from head
1234    /// eviction (stream limits/age), which is exactly what advances
1235    /// `first_sequence`. (An admin interior purge of a subject can also
1236    /// destroy a delete marker without moving the head — that is a manual
1237    /// destructive operation, same trust class as deleting the stream.)
1238    ///
1239    /// Head eviction racing the window between this check and consumer
1240    /// creation is the same exposure any live consumer has against
1241    /// aggressive retention; the check bounds the silent gap to that
1242    /// milliseconds-scale window, where the prior behavior left it unbounded.
1243    async fn check_resume_window(&self, revision: u64) -> Result<(), KvError> {
1244        let stream = timed(self.js.get_stream(format!("KV_{}", self.bucket)))
1245            .await?
1246            .map_err(|e| {
1247                KvError::OperationFailed(format!("get KV stream for resume check: {e}"))
1248            })?;
1249        let first = stream.cached_info().state.first_sequence;
1250        // The shared protocol kernel — the same guard the model checker's
1251        // Resume transition executes (`crate::protocol::resume_window_ok`).
1252        if !crate::protocol::resume_window_ok(revision, first) {
1253            warn!(
1254                revision,
1255                first_sequence = first,
1256                "resume cursor is below the stream's first retained sequence; cursor expired"
1257            );
1258            return Err(KvError::CursorExpired);
1259        }
1260        Ok(())
1261    }
1262}
1263
1264#[async_trait]
1265impl KvWatcher for NatsKvWatcher {
1266    async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
1267        // `watch_with_history` (DeliverPolicy::LastPerSubject), NOT `watch_all`
1268        // (DeliverPolicy::New): the trait contract is state-sync — current value
1269        // of every key first, then live updates. async-nats's `watch_all` only
1270        // delivers messages published AFTER the consumer exists, which would
1271        // leave a no-cursor consumer empty until keys happen to change.
1272        //
1273        // Bound the watch *setup* with `timed()` for the same reason every KV op
1274        // is bounded: a half-dead (CLOSE_WAIT) NATS connection parks this await
1275        // forever instead of failing. The streaming drain in `stream_watch` is
1276        // intentionally unbounded (a watch is long-lived), but establishing it
1277        // must not be able to hang a reconnecting caller.
1278        let watcher = timed(self.kv.watch_with_history(">"))
1279            .await?
1280            .map_err(|e| KvError::WatchError(e.to_string()))?;
1281        stream_watch(watcher, &tx).await
1282    }
1283
1284    async fn watch_prefix(&self, prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
1285        // Use native NATS subject-based filtering. KV key "node.abc" maps to
1286        // subject "$KV.BUCKET.node.abc", and ">" is the multi-level wildcard.
1287        // `_with_history` for the same state-sync contract as `watch_all`.
1288        let nats_key = format!("{prefix}>");
1289        let watcher = timed(self.kv.watch_with_history(&nats_key))
1290            .await?
1291            .map_err(|e| KvError::WatchError(e.to_string()))?;
1292        stream_watch(watcher, &tx).await
1293    }
1294
1295    async fn watch_prefixes(&self, prefixes: &[&str], tx: Sender<KvUpdate>) -> Result<(), KvError> {
1296        if prefixes.is_empty() {
1297            // Nothing to watch. Critically, do NOT fall through to `watch_many`
1298            // with an empty filter set — an unfiltered ordered consumer would
1299            // watch the WHOLE bucket, the opposite of a scoped watch.
1300            return Ok(());
1301        }
1302        // ONE multi-filter consumer for every prefix (NATS 2.10 `filter_subjects`)
1303        // rather than one consumer per prefix. `watch_many_with_history` builds a
1304        // single ordered push consumer with `filter_subjects = [{p}> ...]` and
1305        // yields the same `Entry` stream as `watch`, so `stream_watch` is reused
1306        // verbatim. This is the per-stream-consumer-count fix: a node scoped to N
1307        // prefixes costs 1 consumer, not N. `_with_history` for the same
1308        // state-sync contract as `watch_all`.
1309        let keys: Vec<String> = prefixes.iter().map(|p| format!("{p}>")).collect();
1310        let watcher = timed(self.kv.watch_many_with_history(keys))
1311            .await?
1312            .map_err(|e| KvError::WatchError(e.to_string()))?;
1313        stream_watch(watcher, &tx).await
1314    }
1315
1316    async fn watch_all_from(
1317        &self,
1318        cursor: &WatchCursor,
1319        tx: Sender<KvUpdate>,
1320    ) -> Result<(), KvError> {
1321        let revision = match cursor.as_u64() {
1322            Some(rev) if rev > 0 => rev,
1323            _ => return self.watch_all(tx).await,
1324        };
1325        self.check_resume_window(revision).await?;
1326
1327        let watcher = match timed(self.kv.watch_all_from_revision(revision + 1)).await? {
1328            Ok(w) => w,
1329            Err(e) => {
1330                let err_str = e.to_string();
1331                if is_cursor_expired_error(&err_str) {
1332                    warn!(revision, error = %err_str, "cursor expired, caller should fall back to full watch");
1333                    return Err(KvError::CursorExpired);
1334                }
1335                return Err(KvError::WatchError(err_str));
1336            }
1337        };
1338        // Re-check AFTER the consumer exists: head eviction in the window
1339        // between the pre-flight check and consumer creation would otherwise
1340        // clamp silently.
1341        self.check_resume_window(revision).await?;
1342
1343        info!(revision, "resumed watch from cursor");
1344        // The LIVE floor guard takes over from here: in-band gapped-delivery
1345        // checks plus a no-traffic backstop, so retention overrunning this
1346        // watch mid-stream fail-stops into the restart→resync repair path
1347        // instead of silently skipping evicted deletes (model:
1348        // tests/model_live_watch.rs).
1349        stream_watch_floor_guarded(watcher, &tx, revision, &self.js, &self.bucket).await
1350    }
1351
1352    async fn watch_prefix_from(
1353        &self,
1354        prefix: &str,
1355        cursor: &WatchCursor,
1356        tx: Sender<KvUpdate>,
1357    ) -> Result<(), KvError> {
1358        let revision = match cursor.as_u64() {
1359            Some(rev) if rev > 0 => rev,
1360            _ => return self.watch_prefix(prefix, tx).await,
1361        };
1362        self.check_resume_window(revision).await?;
1363
1364        let nats_key = format!("{prefix}>");
1365        let watcher = match timed(self.kv.watch_from_revision(&nats_key, revision + 1)).await? {
1366            Ok(w) => w,
1367            Err(e) => {
1368                let err_str = e.to_string();
1369                if is_cursor_expired_error(&err_str) {
1370                    warn!(revision, prefix, error = %err_str, "cursor expired for prefix watch, caller should fall back");
1371                    return Err(KvError::CursorExpired);
1372                }
1373                return Err(KvError::WatchError(err_str));
1374            }
1375        };
1376        // Same post-create re-check as watch_all_from: close the
1377        // check→create eviction window.
1378        self.check_resume_window(revision).await?;
1379
1380        info!(revision, prefix, "resumed prefix watch from cursor");
1381        stream_watch(watcher, &tx).await
1382    }
1383
1384    async fn watch_prefixes_from(
1385        &self,
1386        prefixes: &[&str],
1387        cursor: &WatchCursor,
1388        tx: Sender<KvUpdate>,
1389    ) -> Result<(), KvError> {
1390        use async_nats::jetstream::consumer::{DeliverPolicy, ReplayPolicy, push};
1391
1392        if prefixes.is_empty() {
1393            // Same guard as watch_prefixes: an empty filter set must not become
1394            // an unfiltered whole-bucket consumer.
1395            return Ok(());
1396        }
1397        let revision = match cursor.as_u64() {
1398            Some(rev) if rev > 0 => rev,
1399            _ => return self.watch_prefixes(prefixes, tx).await,
1400        };
1401
1402        // async-nats has `watch_many` (multi-filter) and `watch_from_revision`
1403        // (seek) but no combination of the two, so build the multi-filter
1404        // ordered push consumer ourselves — the exact consumer
1405        // `watch_many_with_deliver_policy` would build, with
1406        // `ByStartSequence(cursor+1)` for the delta seek. The ordered-consumer
1407        // machinery (gap detection, auto-recreate from the last delivered
1408        // sequence) comes with `OrderedConfig` for free.
1409        let bucket = self.bucket.as_str();
1410        let kv_prefix = format!("$KV.{bucket}.");
1411        let filter_subjects: Vec<String> = prefixes
1412            .iter()
1413            .map(|p| format!("{kv_prefix}{p}>"))
1414            .collect();
1415
1416        let stream = timed(self.js.get_stream(format!("KV_{bucket}")))
1417            .await?
1418            .map_err(|e| KvError::WatchError(format!("get KV stream: {e}")))?;
1419
1420        // Same proactive expiry detection as `check_resume_window` (NATS
1421        // silently clamps a below-head ByStartSequence; see that method's
1422        // docs) — checked on the stream handle this path already fetched,
1423        // via the shared protocol kernel.
1424        let first = stream.cached_info().state.first_sequence;
1425        if !crate::protocol::resume_window_ok(revision, first) {
1426            warn!(
1427                revision,
1428                first_sequence = first,
1429                ?prefixes,
1430                "resume cursor is below the stream's first retained sequence; cursor expired"
1431            );
1432            return Err(KvError::CursorExpired);
1433        }
1434
1435        let consumer = match timed(stream.create_consumer(push::OrderedConfig {
1436            deliver_subject: self.client.new_inbox(),
1437            description: Some("kv multi-prefix resume consumer".to_string()),
1438            filter_subjects,
1439            replay_policy: ReplayPolicy::Instant,
1440            deliver_policy: DeliverPolicy::ByStartSequence {
1441                start_sequence: revision + 1,
1442            },
1443            ..Default::default()
1444        }))
1445        .await?
1446        {
1447            Ok(c) => c,
1448            Err(e) => {
1449                // Same expiry classification as watch_all_from: a start sequence
1450                // the stream has compacted past surfaces as a consumer-create
1451                // error whose message names the sequence problem.
1452                let err_str = e.to_string();
1453                if is_cursor_expired_error(&err_str) {
1454                    warn!(revision, ?prefixes, error = %err_str, "cursor expired for multi-prefix watch, caller should fall back");
1455                    return Err(KvError::CursorExpired);
1456                }
1457                return Err(KvError::WatchError(err_str));
1458            }
1459        };
1460
1461        // Re-check AFTER the consumer exists (fresh stream info, not the
1462        // handle's cached copy): closes the check→create eviction window,
1463        // same as the single-filter resume paths.
1464        self.check_resume_window(revision).await?;
1465
1466        let mut messages = timed(consumer.messages())
1467            .await?
1468            .map_err(|e| KvError::WatchError(e.to_string()))?;
1469
1470        info!(
1471            revision,
1472            ?prefixes,
1473            "resumed multi-prefix watch from cursor"
1474        );
1475        while let Some(msg) = messages.next().await {
1476            match msg {
1477                Ok(msg) => {
1478                    // A subject-filtered consumer only delivers in-keyspace
1479                    // subjects; `None` here would be a server bug, skipped to
1480                    // match kv::Watch's tolerance.
1481                    let Some(update) = kv_message_to_update(&msg, &kv_prefix) else {
1482                        continue;
1483                    };
1484                    if tx.send(update).await.is_err() {
1485                        debug!("watch receiver closed");
1486                        break;
1487                    }
1488                }
1489                Err(e) => {
1490                    error!(error = %e, "NATS KV multi-prefix watch error");
1491                    return Err(KvError::WatchError(e.to_string()));
1492                }
1493            }
1494        }
1495        Ok(())
1496    }
1497}
1498
1499struct NatsKvWriterImpl {
1500    kv: Store,
1501}
1502
1503#[async_trait]
1504impl KvWriter for NatsKvWriterImpl {
1505    async fn put(&self, key: &str, value: &[u8]) -> Result<VersionToken, KvError> {
1506        let rev = timed(self.kv.put(key, value.to_vec().into()))
1507            .await?
1508            .map_err(|e| KvError::OperationFailed(e.to_string()))?;
1509        Ok(VersionToken::from_u64(rev))
1510    }
1511
1512    async fn delete(&self, key: &str) -> Result<bool, KvError> {
1513        // NATS delete doesn't tell us if key existed, so we always return true
1514        timed(self.kv.delete(key))
1515            .await?
1516            .map_err(|e| KvError::OperationFailed(e.to_string()))?;
1517        Ok(true)
1518    }
1519
1520    async fn create(&self, key: &str, value: &[u8]) -> Result<VersionToken, KvError> {
1521        use async_nats::jetstream::kv::CreateErrorKind;
1522        timed(self.kv.create(key, value.to_vec().into()))
1523            .await?
1524            .map(VersionToken::from_u64)
1525            .map_err(|e| {
1526                if e.kind() == CreateErrorKind::AlreadyExists {
1527                    KvError::AlreadyExists
1528                } else {
1529                    KvError::OperationFailed(e.to_string())
1530                }
1531            })
1532    }
1533
1534    async fn update(
1535        &self,
1536        key: &str,
1537        value: &[u8],
1538        expected: &VersionToken,
1539    ) -> Result<VersionToken, KvError> {
1540        use async_nats::jetstream::kv::UpdateErrorKind;
1541        let rev = expected.as_u64().ok_or_else(|| {
1542            KvError::OperationFailed("invalid version token for NATS update".into())
1543        })?;
1544        timed(self.kv.update(key, value.to_vec().into(), rev))
1545            .await?
1546            .map(VersionToken::from_u64)
1547            .map_err(|e| {
1548                if e.kind() == UpdateErrorKind::WrongLastRevision {
1549                    KvError::RevisionMismatch
1550                } else {
1551                    KvError::OperationFailed(e.to_string())
1552                }
1553            })
1554    }
1555
1556    async fn delete_with_version(
1557        &self,
1558        key: &str,
1559        expected: &VersionToken,
1560    ) -> Result<bool, KvError> {
1561        use async_nats::jetstream::kv::UpdateErrorKind;
1562        let rev = expected.as_u64().ok_or_else(|| {
1563            KvError::OperationFailed("invalid version token for NATS delete".into())
1564        })?;
1565        // Write empty value with CAS — logically deletes while preserving conflict detection
1566        timed(self.kv.update(key, Vec::new().into(), rev))
1567            .await?
1568            .map(|_| true)
1569            .map_err(|e| {
1570                if e.kind() == UpdateErrorKind::WrongLastRevision {
1571                    KvError::RevisionMismatch
1572                } else {
1573                    KvError::OperationFailed(e.to_string())
1574                }
1575            })
1576    }
1577}
1578
1579impl std::fmt::Debug for NatsConnection {
1580    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1581        f.debug_struct("NatsConnection")
1582            .field("url", &self.config.url)
1583            // `Acquire` to match every other read of `healthy` — a `Relaxed`
1584            // outlier here reads like a deliberate exception during an atomics
1585            // audit, and the fmt path is far too cold for the ordering to cost
1586            // anything.
1587            .field("healthy", &self.healthy.load(Ordering::Acquire))
1588            .finish()
1589    }
1590}
1591
1592#[cfg(test)]
1593mod tests {
1594    use super::*;
1595
1596    #[test]
1597    fn raw_create_success_has_no_error() {
1598        // A successful STREAM.CREATE echoes back the stream config, no "error".
1599        let payload = br#"{"type":"io.nats.jetstream.api.v1.stream_create_response","config":{"name":"KV_certs"}}"#;
1600        assert_eq!(
1601            classify_raw_create_response(payload),
1602            RawCreateOutcome::Created
1603        );
1604    }
1605
1606    #[test]
1607    fn raw_create_swallows_stream_already_exists() {
1608        // 10058 = stream name already in use → the bucket already exists, OK.
1609        let payload =
1610            br#"{"error":{"code":400,"err_code":10058,"description":"stream name already in use"}}"#;
1611        assert_eq!(
1612            classify_raw_create_response(payload),
1613            RawCreateOutcome::AlreadyExists
1614        );
1615    }
1616
1617    #[test]
1618    fn raw_create_swallows_stream_limit() {
1619        // Synadia Cloud returns 400 + "maximum number of streams" at the limit,
1620        // but the bucket may already exist — treat as non-fatal.
1621        let payload =
1622            br#"{"error":{"code":400,"description":"maximum number of streams reached"}}"#;
1623        assert_eq!(
1624            classify_raw_create_response(payload),
1625            RawCreateOutcome::StreamLimit
1626        );
1627    }
1628
1629    #[test]
1630    fn raw_create_propagates_unknown_error() {
1631        // Any other JetStream error is fatal and must surface code + description.
1632        let payload = br#"{"error":{"code":403,"description":"insufficient permissions"}}"#;
1633        match classify_raw_create_response(payload) {
1634            RawCreateOutcome::Failed { code, description } => {
1635                assert_eq!(code, 403);
1636                assert_eq!(description, "insufficient permissions");
1637            }
1638            other => panic!("expected Failed, got {other:?}"),
1639        }
1640    }
1641
1642    #[test]
1643    fn raw_create_400_without_stream_limit_is_fatal() {
1644        // A bare 400 that isn't the stream-limit message must NOT be swallowed,
1645        // otherwise a genuine bad-config rejection would masquerade as success.
1646        let payload = br#"{"error":{"code":400,"description":"invalid stream config"}}"#;
1647        match classify_raw_create_response(payload) {
1648            RawCreateOutcome::Failed { code, description } => {
1649                assert_eq!(code, 400);
1650                assert!(description.contains("invalid stream config"));
1651            }
1652            other => panic!("expected Failed, got {other:?}"),
1653        }
1654    }
1655
1656    #[test]
1657    fn raw_create_unparseable_payload_is_treated_as_success() {
1658        // The caller re-verifies with get_key_value, so a garbled body must not
1659        // be reported as a hard failure here.
1660        assert_eq!(
1661            classify_raw_create_response(b"not json at all"),
1662            RawCreateOutcome::Created
1663        );
1664    }
1665
1666    #[test]
1667    fn ack_subject_legacy_format() {
1668        // $JS.ACK.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>
1669        let reply = "$JS.ACK.KV_certs.cons.1.42.7.1700000000000000000.0";
1670        assert_eq!(stream_sequence_from_ack(reply), Some(42));
1671    }
1672
1673    #[test]
1674    fn ack_subject_modern_format_with_domain_and_account() {
1675        // $JS.ACK.<domain>.<account>.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>
1676        let reply = "$JS.ACK.hub.AABBCC.KV_certs.cons.1.42.7.1700000000000000000.0";
1677        assert_eq!(stream_sequence_from_ack(reply), Some(42));
1678    }
1679
1680    #[test]
1681    fn ack_subject_modern_format_with_trailing_token() {
1682        // Some servers append a random trailing token (12 tokens total).
1683        let reply = "$JS.ACK.hub.AABBCC.KV_certs.cons.1.99.7.1700000000000000000.0.rng";
1684        assert_eq!(stream_sequence_from_ack(reply), Some(99));
1685    }
1686
1687    #[test]
1688    fn ack_subject_last_token_is_not_the_sequence() {
1689        // Regression guard: the final token is num_pending, never the sequence.
1690        // The old code returned this (0), corrupting every scanned entry's version.
1691        let reply = "$JS.ACK.KV_certs.cons.1.42.7.1700000000000000000.0";
1692        assert_ne!(stream_sequence_from_ack(reply), Some(0));
1693    }
1694
1695    #[test]
1696    fn ack_subject_rejects_garbage() {
1697        assert_eq!(stream_sequence_from_ack(""), None);
1698        assert_eq!(stream_sequence_from_ack("not.an.ack.subject"), None);
1699        assert_eq!(stream_sequence_from_ack("$JS.ACK.too.few.tokens"), None);
1700        // Right shape, non-numeric sequence field.
1701        assert_eq!(stream_sequence_from_ack("$JS.ACK.s.c.1.notnum.7.0.0"), None);
1702    }
1703
1704    #[test]
1705    fn cursor_expired_matches_known_nats_error_strings() {
1706        // These substrings come from async-nats error messages. If the library
1707        // rewrites them, watch_all_from would return WatchError instead of
1708        // CursorExpired, breaking callers that fall back to watch_all() on expiry.
1709        assert!(is_cursor_expired_error(
1710            "consumer start sequence is too old"
1711        ));
1712        assert!(is_cursor_expired_error("first sequence is 42, requested 1"));
1713        assert!(is_cursor_expired_error("sequence not found in stream"));
1714        // "too old" on its own (no "sequence" wording) must still be caught.
1715        assert!(is_cursor_expired_error("requested revision is too old"));
1716        // Case-insensitive: a capitalization change upstream must not slip past.
1717        assert!(is_cursor_expired_error("Consumer Start Sequence Too Old"));
1718        assert!(!is_cursor_expired_error("connection refused"));
1719        assert!(!is_cursor_expired_error("permission denied"));
1720        assert!(!is_cursor_expired_error("stream not found"));
1721    }
1722
1723    fn raw_kv_msg(
1724        subject: &str,
1725        reply: Option<&str>,
1726        payload: &[u8],
1727        op: Option<&str>,
1728    ) -> async_nats::Message {
1729        let headers = op.map(|op| {
1730            let mut h = async_nats::HeaderMap::new();
1731            h.insert("KV-Operation", op);
1732            h
1733        });
1734        async_nats::Message {
1735            subject: subject.to_string().into(),
1736            reply: reply.map(|r| r.to_string().into()),
1737            payload: payload.to_vec().into(),
1738            headers,
1739            status: None,
1740            description: None,
1741            length: 0,
1742        }
1743    }
1744
    /// Legacy-format ACK reply subject shared by the decode tests; its
    /// stream-sequence token is 42 (the same subject
    /// `ack_subject_legacy_format` parses to `Some(42)`).
    const ACK_42: &str = "$JS.ACK.KV_certs.cons.1.42.7.1700000000000000000.0";
1746
1747    #[test]
1748    fn kv_message_decodes_put_without_operation_header() {
1749        // The common case: a put carries no KV-Operation header at all.
1750        let msg = raw_kv_msg("$KV.certs.node.a", Some(ACK_42), b"v1", None);
1751        match kv_message_to_update(&msg, "$KV.certs.").expect("in keyspace") {
1752            KvUpdate::Put(e) => {
1753                assert_eq!(e.key, "node.a");
1754                assert_eq!(e.value, b"v1");
1755                assert_eq!(e.version.as_u64(), Some(42));
1756            }
1757            other => panic!("expected Put, got {other:?}"),
1758        }
1759    }
1760
1761    #[test]
1762    fn kv_message_decodes_delete_and_purge_markers() {
1763        let msg = raw_kv_msg("$KV.certs.node.a", Some(ACK_42), b"", Some("DEL"));
1764        assert!(matches!(
1765            kv_message_to_update(&msg, "$KV.certs.").expect("in keyspace"),
1766            KvUpdate::Delete { ref key, ref version } if key == "node.a" && version.as_u64() == Some(42)
1767        ));
1768
1769        let msg = raw_kv_msg("$KV.certs.node.a", Some(ACK_42), b"", Some("PURGE"));
1770        assert!(matches!(
1771            kv_message_to_update(&msg, "$KV.certs.").expect("in keyspace"),
1772            KvUpdate::Purge { ref key, .. } if key == "node.a"
1773        ));
1774    }
1775
1776    #[test]
1777    fn kv_message_outside_keyspace_is_skipped() {
1778        // A subject-filtered consumer should never deliver this; the decode
1779        // skips rather than mis-keys it.
1780        let msg = raw_kv_msg("$KV.other.node.a", Some(ACK_42), b"v", None);
1781        assert!(kv_message_to_update(&msg, "$KV.certs.").is_none());
1782    }
1783
1784    #[test]
1785    fn kv_message_without_reply_gets_revision_zero() {
1786        // No ACK reply subject → revision unparseable → 0, the same "unknown
1787        // version" convention scan() uses.
1788        let msg = raw_kv_msg("$KV.certs.node.a", None, b"v", None);
1789        match kv_message_to_update(&msg, "$KV.certs.").expect("in keyspace") {
1790            KvUpdate::Put(e) => assert_eq!(e.version.as_u64(), Some(0)),
1791            other => panic!("expected Put, got {other:?}"),
1792        }
1793    }
1794
1795    #[test]
1796    fn raw_create_already_exists_when_10058_in_code_field() {
1797        // Some Synadia Cloud deployments echo 10058 in `code` rather than
1798        // `err_code`. Both paths must return AlreadyExists, not Failed.
1799        let payload = br#"{"error":{"code":10058,"description":"stream name already in use"}}"#;
1800        assert_eq!(
1801            classify_raw_create_response(payload),
1802            RawCreateOutcome::AlreadyExists
1803        );
1804    }
1805
1806    #[test]
1807    fn raw_create_error_without_code_defaults_to_zero() {
1808        // Defensive: a malformed error object still classifies as Failed rather
1809        // than silently passing, with code defaulting to 0.
1810        let payload = br#"{"error":{"description":"mystery"}}"#;
1811        match classify_raw_create_response(payload) {
1812            RawCreateOutcome::Failed { code, description } => {
1813                assert_eq!(code, 0);
1814                assert_eq!(description, "mystery");
1815            }
1816            other => panic!("expected Failed, got {other:?}"),
1817        }
1818    }
1819}
1820
1821/// Live-server conformance tests for the floor guard
1822/// ([`stream_watch_floor_guarded`]) — these drive the guarded loop DIRECTLY
1823/// with a deliberately clamped `Watch`, which reproduces exactly the state
1824/// retention leaves behind when it overruns a live consumer (the watcher
1825/// methods' resume-time checks can't be raced deterministically from
1826/// outside, but the guarded loop neither knows nor cares how its watch got
1827/// clamped). Spawns a throwaway `nats-server` (mise-installed, same pattern
1828/// as tests/common).
1829#[cfg(test)]
1830mod floor_guard_tests {
1831    use super::*;
1832    use std::process::{Child, Command, Stdio};
1833
    /// Handle to a throwaway `nats-server` child process started by
    /// `start_server`. Dropping the handle kills and reaps the process.
    struct TestServer {
        /// The spawned server process.
        child: Child,
        /// Client URL of the server, in the form `nats://127.0.0.1:<port>`.
        url: String,
        /// Temporary directory passed to the server as `--store_dir`; held
        /// here so it lives as long as the handle does.
        _dir: tempfile::TempDir,
    }
1839
1840    impl Drop for TestServer {
1841        fn drop(&mut self) {
1842            let _ = self.child.kill();
1843            let _ = self.child.wait();
1844        }
1845    }
1846
1847    async fn start_server() -> TestServer {
1848        let bin = std::env::var("NATS_SERVER_BIN").unwrap_or_else(|_| "nats-server".into());
1849        let port = std::net::TcpListener::bind("127.0.0.1:0")
1850            .unwrap()
1851            .local_addr()
1852            .unwrap()
1853            .port();
1854        let dir = tempfile::tempdir().unwrap();
1855        let child = Command::new(&bin)
1856            .args([
1857                "--jetstream",
1858                "--addr",
1859                "127.0.0.1",
1860                "--port",
1861                &port.to_string(),
1862                "--store_dir",
1863                dir.path().to_str().unwrap(),
1864            ])
1865            .stdout(Stdio::null())
1866            .stderr(Stdio::null())
1867            .spawn()
1868            .unwrap_or_else(|e| panic!("spawn {bin}: {e}; run `mise install`"));
1869        let server = TestServer {
1870            child,
1871            url: format!("nats://127.0.0.1:{port}"),
1872            _dir: dir,
1873        };
1874        for _ in 0..100 {
1875            if async_nats::connect(&server.url).await.is_ok() {
1876                return server;
1877            }
1878            tokio::time::sleep(Duration::from_millis(100)).await;
1879        }
1880        panic!("nats-server never became ready");
1881    }
1882
1883    /// `(js, kv store)` with five revisions across five subjects (history 1).
1884    async fn seeded_bucket(
1885        url: &str,
1886    ) -> (
1887        async_nats::jetstream::Context,
1888        async_nats::jetstream::kv::Store,
1889    ) {
1890        let client = async_nats::connect(url).await.unwrap();
1891        let js = async_nats::jetstream::new(client);
1892        let kv = js
1893            .create_key_value(async_nats::jetstream::kv::Config {
1894                bucket: "guard".into(),
1895                history: 1,
1896                ..Default::default()
1897            })
1898            .await
1899            .unwrap();
1900        for i in 1..=5u8 {
1901            kv.put(format!("k{i}"), vec![i].into()).await.unwrap();
1902        }
1903        (js, kv)
1904    }
1905
1906    /// TRUE POSITIVE: the watch was clamped past evicted revisions (purge
1907    /// advanced first_seq beyond the frontier) — the first gapped delivery
1908    /// must trip the guard BEFORE the entry is processed, never silently
1909    /// folding past the lost range. This is the live twin of the model's
1910    /// `GuardRepair`-only-progress gate.
1911    #[tokio::test(flavor = "multi_thread")]
1912    async fn gapped_delivery_with_advanced_floor_trips() {
1913        let server = start_server().await;
1914        let (js, kv) = seeded_bucket(&server.url).await;
1915
1916        // Evict revisions 1-3 outright: first_sequence becomes 4.
1917        let mut stream = js.get_stream("KV_guard").await.unwrap();
1918        stream.purge().sequence(4).await.unwrap();
1919        assert_eq!(stream.info().await.unwrap().state.first_sequence, 4);
1920
1921        // A consumer resuming from revision 1 gets CLAMPED to revision 4
1922        // (NATS's silent skip, pinned by tests/resync.rs). Hand that watch
1923        // to the guarded loop as a live consumer whose retention just
1924        // overran it.
1925        let watch = kv.watch_all_from_revision(2).await.unwrap();
1926        let (tx, mut rx) = tokio::sync::mpsc::channel(64);
1927        let drain = tokio::spawn(async move { while rx.recv().await.is_some() {} });
1928
1929        // The 5s bound pins IN-BAND detection: the trip must come from the
1930        // gapped-delivery check itself, not the 30s no-traffic backstop. A
1931        // regression to periodic-only detection (the design the model
1932        // checker rejected — catch-up between probes erases the evidence)
1933        // fails this bound.
1934        let err = tokio::time::timeout(
1935            Duration::from_secs(5),
1936            stream_watch_floor_guarded(watch, &tx, 1, &js, "guard"),
1937        )
1938        .await
1939        .expect("the trip must be IN-BAND (immediate), not backstop-paced")
1940        .expect_err("a gapped delivery over an advanced floor must trip");
1941        assert!(
1942            err.to_string().contains("retention overran live watch"),
1943            "{err}"
1944        );
1945        drop(tx);
1946        let _ = drain.await;
1947    }
1948
1949    /// NO FALSE POSITIVE: interior (per-subject) eviction also gaps the
1950    /// delivered revisions, but the floor stays at or below the frontier —
1951    /// benign for a last-write-wins fold, and the guard must let it
1952    /// through. (Every existing bootstrap e2e also rides this path on its
1953    /// resume; this pins the discrimination explicitly.)
1954    #[tokio::test(flavor = "multi_thread")]
1955    async fn benign_interior_gap_passes() {
1956        let server = start_server().await;
1957        let (js, kv) = seeded_bucket(&server.url).await;
1958
1959        // Overwrite k2 and k3: revisions 2 and 3 are interior-evicted
1960        // (history 1), revisions 6 and 7 replace them. first_sequence stays
1961        // 1 (k1's revision is retained).
1962        kv.put("k2", vec![22].into()).await.unwrap();
1963        kv.put("k3", vec![33].into()).await.unwrap();
1964        let mut stream = js.get_stream("KV_guard").await.unwrap();
1965        assert_eq!(stream.info().await.unwrap().state.first_sequence, 1);
1966
1967        // Resume from revision 1: deliveries jump 2 and 3 — gapped, benign.
1968        let watch = kv.watch_all_from_revision(2).await.unwrap();
1969        let (tx, mut rx) = tokio::sync::mpsc::channel(64);
1970        let guard =
1971            tokio::spawn(
1972                async move { stream_watch_floor_guarded(watch, &tx, 1, &js, "guard").await },
1973            );
1974
1975        let mut got = Vec::new();
1976        while got.len() < 4 {
1977            let update = tokio::time::timeout(Duration::from_secs(5), rx.recv())
1978                .await
1979                .expect("deliveries continue past benign gaps")
1980                .expect("watch alive");
1981            got.push(update.version().as_u64().unwrap());
1982        }
1983        assert_eq!(got, vec![4, 5, 6, 7], "interior gaps jumped, tail dense");
1984        guard.abort(); // endless live watch; the assertion above is the test
1985    }
1986}