Skip to main content

slipstream/
nats.rs

1use async_nats::jetstream::kv::Store;
2use async_trait::async_trait;
3use futures::StreamExt;
4use std::sync::{
5    Arc,
6    atomic::{AtomicBool, Ordering},
7};
8use std::time::Duration;
9use tokio::sync::{RwLock, mpsc::Sender};
10use tracing::{debug, error, info, warn};
11
12use crate::kv::{
13    KvEntry, KvError, KvReader, KvUpdate, KvWatcher, KvWriter, VersionToken, WatchCursor,
14};
15use crate::stores::{Connection, ConnectionCapabilities, KvStore, StoreConfig};
16
/// Default per-operation timeout for NATS KV ops, enforced by [`timed`].
///
/// async-nats's request/response futures don't fail in-flight requests when
/// the underlying TCP connection goes half-dead (CLOSE_WAIT) — they just await
/// forever. Without a timeout here, ANY hung NATS connection translates into a
/// tokio runtime deadlock as soon as enough callers queue behind the dead
/// connection.
///
/// 30s is generous for legitimate slow ops (cold JetStream stream sync, leader
/// election under load) and short enough that a dead connection recovers
/// within reasonable human-debug latency.
const KV_OP_TIMEOUT: Duration = Duration::from_secs(30);
26
/// Server-side inactivity reaper for the ephemeral consumers `scan()`/`keys()`
/// create.
///
/// Our code deletes each consumer explicitly when the drain finishes, but that
/// delete is best-effort: on a half-dead (CLOSE_WAIT) connection it times out,
/// orphaning the consumer server-side where it counts against the per-stream
/// consumer limit. Setting `inactive_threshold` makes JetStream reap any
/// consumer that sees no activity for this long, so a failed explicit delete
/// self-heals instead of accumulating until the limit is hit.
///
/// Comfortably longer than [`KV_OP_TIMEOUT`] so it never reaps a legitimately
/// slow in-flight drain (each delivery resets the inactivity timer).
const CONSUMER_INACTIVE_THRESHOLD: Duration = Duration::from_secs(300);
37
38/// Run a future under [`KV_OP_TIMEOUT`], returning [`KvError::Timeout`] if it
39/// doesn't complete in time. Preserves the inner future's `Result` so callers
40/// keep their existing error-mapping logic.
41async fn timed<F, T>(fut: F) -> Result<T, KvError>
42where
43    F: std::future::Future<Output = T>,
44{
45    tokio::time::timeout(KV_OP_TIMEOUT, fut)
46        .await
47        .map_err(|_| KvError::Timeout)
48}
49
50/// Build NATS connect options (with auth applied) and resolve the URL to dial.
51///
52/// Split out from [`nats_connect`] so the connection lifecycle can attach an
53/// event callback (for health tracking) to the *same* options before dialing,
54/// without duplicating the auth-priority logic. Returns the options plus the URL
55/// to connect to (which may differ from `url` when credentials are stripped out
56/// of a `user:pass@host` URL).
57///
58/// Auth priority (first match wins):
59/// 1. Inline credentials (base64-encoded .creds content)
60/// 2. Credentials file (if provided)
61/// 3. URL-embedded credentials (user:pass@host)
62/// 4. No authentication
63async fn build_connect_options(
64    url: &str,
65    creds: Option<&str>,
66    creds_file: Option<&str>,
67) -> Result<(async_nats::ConnectOptions, String), async_nats::ConnectError> {
68    // Priority 1: Inline credentials (base64-encoded)
69    if let Some(encoded) = creds {
70        use base64::Engine;
71        let decoded = base64::engine::general_purpose::STANDARD
72            .decode(encoded)
73            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
74        let content = String::from_utf8(decoded)
75            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
76        return Ok((
77            async_nats::ConnectOptions::with_credentials(&content)?,
78            url.to_string(),
79        ));
80    }
81
82    // Priority 2: Credentials file
83    if let Some(path) = creds_file {
84        return Ok((
85            async_nats::ConnectOptions::with_credentials_file(path).await?,
86            url.to_string(),
87        ));
88    }
89
90    // Priority 3: URL-embedded credentials
91    if let Ok(parsed) = url::Url::parse(url)
92        && !parsed.username().is_empty()
93    {
94        let user = parsed.username().to_string();
95        let pass = parsed.password().unwrap_or("").to_string();
96        // Rebuild URL without credentials. If the scheme doesn't support
97        // userinfo, these calls fail and the credentials would remain embedded
98        // in the URL we later log — warn loudly rather than silently leak them.
99        let mut clean_url = parsed.clone();
100        if clean_url.set_username("").is_err() || clean_url.set_password(None).is_err() {
101            warn!("could not strip credentials from NATS URL; they may appear in logs");
102        }
103        return Ok((
104            async_nats::ConnectOptions::with_user_and_password(user, pass),
105            clean_url.as_str().to_string(),
106        ));
107    }
108
109    // Priority 4: No authentication
110    Ok((async_nats::ConnectOptions::new(), url.to_string()))
111}
112
113/// Connect to NATS with various authentication methods.
114///
115/// Supports the auth-priority order documented on [`build_connect_options`].
116/// This is the standalone helper; the [`Connection`] impl builds options the
117/// same way but also installs a health-tracking event callback.
118pub async fn nats_connect(
119    url: &str,
120    creds: Option<&str>,
121    creds_file: Option<&str>,
122) -> Result<async_nats::Client, async_nats::ConnectError> {
123    let (opts, dial_url) = build_connect_options(url, creds, creds_file).await?;
124    opts.connect(dial_url).await
125}
126
/// Configuration for NATS connection.
///
/// `Debug` is hand-written, not derived: `creds` holds base64-encoded
/// credential material, `creds_file` a filesystem path to secrets, and `url`
/// may embed `user:pass@host` credentials. A derived `Debug` would print all of
/// them verbatim the moment anyone `{:?}`-formats the config (a `tracing` span,
/// an error context, a test dump), leaking credentials into logs. The redacting
/// impl below keeps that from being one careless format string away.
#[derive(Clone)]
pub struct NatsConnectionConfig {
    /// Server URL to dial. May carry `user:pass@` userinfo, which is used for
    /// authentication when neither `creds` nor `creds_file` is set.
    pub url: String,
    /// Base64-encoded .creds file content (for ECS / containerized environments).
    pub creds: Option<String>,
    /// Path to .creds file on disk (for bare-metal / local development).
    pub creds_file: Option<String>,
}

/// Replace any userinfo in `url` with a `[redacted]` placeholder.
///
/// Everything between the scheme separator (`://`, or the start of the string
/// when there is none) and the *last* `@` is replaced. Using the last `@` in
/// the whole string deliberately over-redacts unusual inputs (e.g. an `@` in a
/// path or in a second comma-separated server): for log output, hiding too much
/// is safe and hiding too little is a credential leak. URLs without an `@` are
/// returned unchanged and unallocated.
fn redact_url_userinfo(url: &str) -> std::borrow::Cow<'_, str> {
    let Some(at) = url.rfind('@') else {
        return std::borrow::Cow::Borrowed(url);
    };
    // Keep the scheme visible when it precedes the userinfo; otherwise redact
    // from the very beginning. Both markers are ASCII, so the byte offsets
    // used for slicing are always on character boundaries.
    let start = match url.find("://") {
        Some(sep) if sep + 3 <= at => sep + 3,
        _ => 0,
    };
    std::borrow::Cow::Owned(format!("{}[redacted]{}", &url[..start], &url[at..]))
}

impl std::fmt::Debug for NatsConnectionConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NatsConnectionConfig")
            // Host and port stay visible for debugging; embedded credentials
            // (`user:pass@`) never do.
            .field("url", &redact_url_userinfo(&self.url))
            // Presence, never content: enough to debug "are creds set?" without
            // ever rendering the secret itself. The same applies to `creds_file`:
            // a path like `/run/secrets/prod.creds` leaks the secrets layout into
            // logs/traces, so redact it to presence too.
            .field("creds", &self.creds.as_ref().map(|_| "[redacted]"))
            .field(
                "creds_file",
                &self.creds_file.as_ref().map(|_| "[redacted]"),
            )
            .finish()
    }
}
160
161/// Create a KV bucket using raw JetStream API (bypasses async-nats response parsing issues).
162///
163/// Synadia Cloud returns responses that `async_nats` can't parse. This function
164/// uses the raw JetStream API directly, bypassing the client's response deserialization.
165///
166/// `pub(crate)`: this is an internal Synadia Cloud workaround invoked by
167/// `get_or_create_bucket`, not a stable entry point. Exposing it would pin a
168/// vendor quirk into the crate's semver surface.
169pub(crate) async fn create_kv_bucket_raw(
170    client: &async_nats::Client,
171    bucket: &str,
172    max_bytes: i64,
173    history: i64,
174    max_age_nanos: i64,
175    num_replicas: usize,
176) -> Result<(), KvError> {
177    let stream_name = format!("KV_{}", bucket);
178    let subject = format!("$KV.{}.>", bucket);
179
180    // JetStream stream config for KV bucket
181    let config = serde_json::json!({
182        "name": stream_name,
183        "subjects": [subject],
184        "max_msgs_per_subject": history,
185        "max_bytes": max_bytes,
186        "max_age": max_age_nanos,
187        "storage": "file",
188        "allow_rollup_hdrs": true,
189        "deny_delete": false,
190        "deny_purge": false,
191        "allow_direct": true,
192        "discard": "new",
193        "num_replicas": num_replicas,
194        "retention": "limits"
195    });
196
197    let payload = serde_json::to_vec(&config)
198        .map_err(|e| KvError::ConnectionFailed(format!("failed to serialize config: {}", e)))?;
199
200    let response = client
201        .request(
202            format!("$JS.API.STREAM.CREATE.{}", stream_name),
203            payload.into(),
204        )
205        .await
206        .map_err(|e| KvError::ConnectionFailed(format!("failed to send create request: {}", e)))?;
207
208    let response_str = String::from_utf8_lossy(&response.payload);
209    debug!(bucket, response = %response_str, "raw JetStream response");
210
211    match classify_raw_create_response(&response.payload) {
212        RawCreateOutcome::AlreadyExists => {
213            info!(bucket, "bucket already exists");
214            Ok(())
215        }
216        RawCreateOutcome::StreamLimit => {
217            info!(bucket, "stream limit reached, bucket may already exist");
218            Ok(())
219        }
220        RawCreateOutcome::Created => {
221            info!(bucket, "bucket created successfully via raw API");
222            Ok(())
223        }
224        RawCreateOutcome::Failed { code, description } => Err(KvError::ConnectionFailed(format!(
225            "JetStream error {}: {}",
226            code, description
227        ))),
228    }
229}
230
/// Classification of a raw `$JS.API.STREAM.CREATE` response payload.
///
/// Separated from the I/O in [`create_kv_bucket_raw`] so the Synadia Cloud
/// error-code logic — the reason this raw path exists — is unit-testable
/// without a live NATS server. Produced by [`classify_raw_create_response`].
#[derive(Debug, PartialEq, Eq)]
enum RawCreateOutcome {
    /// No error in the response: the bucket was created.
    Created,
    /// `10058` — stream name already in use; the bucket exists. Non-fatal.
    AlreadyExists,
    /// `400` "maximum number of streams"; Synadia Cloud returns this at the
    /// stream limit even when the bucket already exists. Non-fatal.
    StreamLimit,
    /// Any other JetStream error — fatal. `code` is the error object's `code`
    /// field (0 when absent) and `description` its `description` field
    /// ("unknown error" when absent).
    Failed { code: i64, description: String },
}
248
249fn classify_raw_create_response(payload: &[u8]) -> RawCreateOutcome {
250    // Unparseable payloads are treated as success: the caller re-verifies the
251    // bucket with a follow-up `get_key_value`, so a garbled body here does not
252    // mask a real failure. Warn so the fallback assumption is auditable — if the
253    // re-verify step is ever refactored away, this log is the breadcrumb.
254    //
255    // INVARIANT: this `Created`-on-garbage path is only sound because every
256    // caller re-verifies the bucket exists after `create_kv_bucket_raw` returns
257    // Ok. The sole caller — `get_or_create_bucket` — does so via the
258    // `timed(js.get_key_value(...))` immediately after the raw-create fallback.
259    // Do not remove that re-verify without making this path return `Failed`.
260    let Ok(json) = serde_json::from_slice::<serde_json::Value>(payload) else {
261        warn!(
262            response = %String::from_utf8_lossy(payload),
263            "unparseable STREAM.CREATE response; assuming created (caller re-verifies via get_key_value)"
264        );
265        return RawCreateOutcome::Created;
266    };
267
268    let Some(err) = json.get("error") else {
269        return RawCreateOutcome::Created;
270    };
271
272    // JetStream splits its error codes: `code` is the HTTP-style status (400,
273    // 404, 500) while `err_code` carries the granular code (e.g. 10058). The
274    // already-exists code can surface in either field depending on the server
275    // (standard NATS puts 10058 in `err_code` with `code` = 400; some managed
276    // deployments echo it in `code`), so we accept it in either.
277    let code = err.get("code").and_then(|c| c.as_i64()).unwrap_or(0);
278    let err_code = err.get("err_code").and_then(|c| c.as_i64()).unwrap_or(0);
279    let description = err
280        .get("description")
281        .and_then(|d| d.as_str())
282        .unwrap_or("unknown error");
283
284    // 10058 = stream name already in use (bucket exists) - that's OK
285    if code == 10058 || err_code == 10058 {
286        return RawCreateOutcome::AlreadyExists;
287    }
288
289    // 400 "maximum number of streams reached" may also mean bucket exists
290    // (Synadia Cloud returns this when at stream limit but bucket exists)
291    if code == 400 && description.contains("maximum number of streams") {
292        return RawCreateOutcome::StreamLimit;
293    }
294
295    RawCreateOutcome::Failed {
296        code,
297        description: description.to_string(),
298    }
299}
300
301struct NatsHandle {
302    // Held to keep the NATS connection alive for the lifetime of the handle.
303    // The `jetstream` context clones an internal reference, but this field is
304    // the authoritative owner — dropping the handle drops the connection.
305    // `dead_code` because we never read it directly after construction.
306    #[allow(dead_code)]
307    client: async_nats::Client,
308    jetstream: async_nats::jetstream::Context,
309}
310
/// NATS JetStream KV connection.
///
/// Built either with [`NatsConnection::new`] (dials on `connect()`) or with
/// [`NatsConnection::from_client`] (wraps an already-connected client).
pub struct NatsConnection {
    // URL and credentials used by `connect()` to dial. Empty for connections
    // built via `from_client`.
    config: NatsConnectionConfig,
    // `Some` while connected; `shutdown()` resets it to `None`.
    handle: RwLock<Option<NatsHandle>>,
    // Shared with the installed client's event callback so `is_healthy()`
    // tracks real connection state (Connected/Disconnected) rather than staying
    // pinned at its connect-time value. `Arc` because the callback outlives this
    // struct's borrow — it runs on the client's event-loop task.
    healthy: Arc<AtomicBool>,
    // Set only for connections built via `from_client`, where no health-tracking
    // event callback could be installed (the client was already connected).
    // `is_healthy()` consults this client's live `connection_state()` instead of
    // the callback-driven `healthy` flag. `None` for the `new()` + `connect()`
    // path, whose flag is kept current by the installed event callback.
    //
    // `Some(_)` is also the marker that this connection borrows a caller-owned
    // client: it carries no URL or credentials of its own (see `from_client`),
    // so it cannot redial. `connect()` refuses to reconnect such a connection
    // rather than dialing the empty config URL and surfacing a cryptic error.
    state_probe: Option<async_nats::Client>,
}
332
333impl NatsConnection {
334    pub fn new(config: NatsConnectionConfig) -> Self {
335        Self {
336            config,
337            handle: RwLock::new(None),
338            healthy: Arc::new(AtomicBool::new(false)),
339            state_probe: None,
340        }
341    }
342
343    /// Create a NatsConnection from an existing NATS client.
344    ///
345    /// This is useful when the caller already has a NATS connection and wants
346    /// to reuse it for KV stores instead of creating a new connection.
347    pub fn from_client(client: async_nats::Client) -> Self {
348        let jetstream = async_nats::jetstream::new(client.clone());
349        let config = NatsConnectionConfig {
350            url: String::new(), // Not used when pre-connected
351            creds: None,
352            creds_file: None,
353        };
354
355        // Clone a probe handle before the client moves into `NatsHandle`.
356        // `async_nats::Client` is cheap to clone (internally an `Arc`), and
357        // `connection_state()` just reads a watch channel — no I/O.
358        let state_probe = Some(client.clone());
359        let handle = NatsHandle { client, jetstream };
360
361        Self {
362            config,
363            handle: RwLock::new(Some(handle)),
364            // A pre-connected client carries no health-tracking callback (we
365            // didn't build its options), so `is_healthy()` reads the client's
366            // live `connection_state()` via `state_probe`. The flag below only
367            // gates explicit `shutdown()`.
368            healthy: Arc::new(AtomicBool::new(true)),
369            state_probe,
370        }
371    }
372
373    async fn get_or_create_bucket(
374        client: &async_nats::Client,
375        js: &async_nats::jetstream::Context,
376        config: &StoreConfig,
377    ) -> Result<Store, KvError> {
378        // Try to get existing bucket first. Bound the call so a slow/dead
379        // NATS connection at startup can't park the daemon's init thread
380        // forever — the rest of startup (HTTP listener bind, etc.) happens
381        // after this. Without the timeout, a single bad NATS round-trip
382        // here held HTTP bind for 30s+ in observed cases.
383        //
384        // A failure here (permission denied, JetStream disabled, timeout) is not
385        // necessarily fatal — the bucket may simply not exist yet, so we fall
386        // through to create. But surface the original error first: otherwise a
387        // later create failure (e.g. "permission denied on STREAM.CREATE") masks
388        // the real cause ("permission denied on STREAM.INFO") and makes the
389        // failure undebuggable under load.
390        match timed(js.get_key_value(&config.name)).await {
391            Ok(Ok(kv)) => return Ok(kv),
392            Ok(Err(e)) => {
393                debug!(bucket = %config.name, error = ?e, "get_key_value failed; attempting create");
394            }
395            Err(_) => {
396                warn!(bucket = %config.name, timeout = ?KV_OP_TIMEOUT, "get_key_value timed out; attempting create");
397            }
398        }
399
400        // Bucket doesn't exist, create it
401        let mut kv_config = async_nats::jetstream::kv::Config {
402            bucket: config.name.clone(),
403            num_replicas: config.num_replicas.unwrap_or(1),
404            ..Default::default()
405        };
406
407        // Apply max_age (bucket-level TTL) if specified. `as_nanos()` is u128;
408        // saturate to i64::MAX rather than `as i64`, which would silently wrap a
409        // >292-year duration into a negative (and thus meaningless) TTL.
410        let max_age_nanos = if let Some(max_age) = config.max_age {
411            kv_config.max_age = max_age;
412            i64::try_from(max_age.as_nanos()).unwrap_or(i64::MAX)
413        } else {
414            0
415        };
416
417        // Apply max_history if specified. `i64::from` is lossless for a u32 and
418        // states the widening intent, where `as i64` would quietly mask a future
419        // type change that no longer fits.
420        let history = if let Some(history) = config.max_history {
421            let history = i64::from(history);
422            kv_config.history = history;
423            history
424        } else {
425            1
426        };
427
428        // Apply max_bytes if specified (required by Synadia Cloud)
429        let max_bytes = config.max_bytes.unwrap_or(10 * 1024 * 1024); // Default 10MB for Synadia Cloud
430        kv_config.max_bytes = max_bytes;
431
432        // Try normal create first, fall back to raw API if it fails (Synadia Cloud compatibility)
433        match timed(js.create_key_value(kv_config)).await? {
434            Ok(kv) => Ok(kv),
435            Err(e) => {
436                warn!(
437                    bucket = config.name,
438                    error = ?e,
439                    "create_key_value failed, trying raw JetStream API"
440                );
441
442                // Try raw JetStream API as fallback
443                create_kv_bucket_raw(
444                    client,
445                    &config.name,
446                    max_bytes,
447                    history,
448                    max_age_nanos,
449                    config.num_replicas.unwrap_or(1),
450                )
451                .await?;
452
453                // Re-verify the bucket exists. This upholds the INVARIANT in
454                // `classify_raw_create_response`: the raw path reports `Created`
455                // on an unparseable response, so this round-trip is what actually
456                // confirms the bucket — do not remove it.
457                timed(js.get_key_value(&config.name))
458                    .await?
459                    .map_err(|e| {
460                        error!(bucket = config.name, error = ?e, "failed to get bucket after raw create");
461                        KvError::ConnectionFailed(format!("get bucket after raw create: {:?}", e))
462                    })
463            }
464        }
465    }
466}
467
#[async_trait]
impl Connection for NatsConnection {
    /// Establish the NATS session if this connection is not already healthy.
    ///
    /// Returns `Ok(())` immediately when the `healthy` flag is set, and also
    /// when a concurrent caller installed a handle while this call was
    /// dialing.
    ///
    /// # Errors
    ///
    /// [`KvError::ConnectionFailed`] when the connection was built via
    /// `from_client` (it cannot redial), when the connect options cannot be
    /// built, or when the dial fails.
    async fn connect(&self) -> Result<(), KvError> {
        // Fast path: skip if already connected.
        if self.healthy.load(Ordering::Acquire) {
            return Ok(());
        }

        // A `from_client` connection borrows a caller-owned client and kept no
        // URL or credentials, so it cannot redial. Refuse here with an actionable
        // message instead of dialing the empty config URL (which fails with an
        // opaque parse/connect error). This is reachable only after `shutdown()`
        // cleared the fast-path flag above — a live borrowed client short-circuits
        // there. The caller must construct a `NatsConnection::new(config)` if it
        // needs reconnect semantics.
        if self.state_probe.is_some() {
            return Err(KvError::ConnectionFailed(
                "connection was built via NatsConnection::from_client and cannot \
                 reconnect (no URL or credentials retained); construct \
                 NatsConnection::new(config) for a reconnectable connection"
                    .to_string(),
            ));
        }

        let (opts, dial_url) = build_connect_options(
            &self.config.url,
            self.config.creds.as_deref(),
            self.config.creds_file.as_deref(),
        )
        .await
        .map_err(|e| KvError::ConnectionFailed(e.to_string()))?;

        // Drive `healthy` from the client's own connection events so it reflects
        // reality through async-nats's transparent reconnects — without this the
        // flag stays `true` straight through a NATS outage, and a readiness probe
        // built on `is_healthy()` keeps routing traffic to a node that can't
        // reach NATS.
        //
        // `installed` gates the callback: a caller that loses the connect race
        // (see the double-check below) tears down its freshly built client, and
        // that teardown fires `Disconnected`. Without the gate, the loser's drop
        // would clobber the *winner's* `healthy` flag. Only the client we
        // actually install ever flips `installed` to `true`, so the losers'
        // callbacks are inert.
        let installed = Arc::new(AtomicBool::new(false));
        let cb_healthy = Arc::clone(&self.healthy);
        let cb_installed = Arc::clone(&installed);
        let opts = opts.event_callback(move |event| {
            // Each invocation gets its own clones so the returned future owns
            // everything it touches.
            let cb_healthy = Arc::clone(&cb_healthy);
            let cb_installed = Arc::clone(&cb_installed);
            async move {
                if !cb_installed.load(Ordering::Acquire) {
                    return;
                }
                match event {
                    async_nats::Event::Connected => cb_healthy.store(true, Ordering::Release),
                    async_nats::Event::Disconnected => cb_healthy.store(false, Ordering::Release),
                    // Other events do not affect health.
                    _ => {}
                }
            }
        });

        let client = opts
            .connect(dial_url)
            .await
            .map_err(|e| KvError::ConnectionFailed(e.to_string()))?;

        let jetstream = async_nats::jetstream::new(client.clone());

        let conn = NatsHandle { client, jetstream };

        // Re-check under the write lock: a concurrent caller may have connected
        // while we were awaiting the dial. If so, drop our freshly built handle
        // (closing its connection) instead of replacing the live one, which would
        // orphan a connection the first caller still believes is installed.
        // Leaving `installed = false` keeps our about-to-drop client's teardown
        // events from touching `healthy`.
        let mut handle = self.handle.write().await;
        if handle.is_some() {
            return Ok(());
        }
        // Arm the callback before publishing the handle and the flag, all while
        // still holding the write lock.
        installed.store(true, Ordering::Release);
        *handle = Some(conn);
        self.healthy.store(true, Ordering::Release);

        Ok(())
    }

    /// Mark the connection unhealthy and drop the stored handle.
    ///
    /// Always returns `Ok(())`.
    ///
    /// NOTE(review): this cannot reset the `installed` gate created in
    /// `connect()` (it is not stored anywhere reachable from here). Stores
    /// vended by `store_with_config` keep their own clone of the client, so the
    /// old client's event callback looks able to keep writing `healthy` after
    /// shutdown — confirm whether a late `Connected` event can re-arm the fast
    /// path in `connect()` while `handle` is `None`.
    async fn shutdown(&self) -> Result<(), KvError> {
        // Clear the flag first so `is_healthy()` reports down before the handle
        // is released.
        self.healthy.store(false, Ordering::Release);
        *self.handle.write().await = None;
        Ok(())
    }

    /// Report whether the connection is currently usable.
    fn is_healthy(&self) -> bool {
        // `healthy` is the shutdown gate for both construction paths: once
        // `shutdown()` clears it the connection is down regardless of socket
        // state, so check it first.
        if !self.healthy.load(Ordering::Acquire) {
            return false;
        }
        match &self.state_probe {
            // `from_client`: no event callback could be installed, so consult the
            // client's live connection state instead of a stale connect-time
            // value. A dead or reconnecting socket reports Pending/Disconnected,
            // so a readiness probe correctly sees the node as unhealthy. A
            // borrowed client is never replaced (connect() refuses to reconnect
            // it), so this probe never goes stale.
            Some(client) => matches!(
                client.connection_state(),
                async_nats::connection::State::Connected
            ),
            // `new()` + `connect()`: `healthy` is kept current by the installed
            // Connected/Disconnected event callback.
            None => true,
        }
    }

    /// Open (or create) the bucket `name` with default [`StoreConfig`] settings.
    async fn store(&self, name: &str) -> Result<Arc<dyn KvStore>, KvError> {
        let config = StoreConfig {
            name: name.to_string(),
            ..Default::default()
        };
        self.store_with_config(config).await
    }

    /// Open (or create) the bucket described by `config`.
    ///
    /// # Errors
    ///
    /// [`KvError::NotConnected`] when no handle is installed; otherwise any
    /// error from `get_or_create_bucket`.
    async fn store_with_config(&self, config: StoreConfig) -> Result<Arc<dyn KvStore>, KvError> {
        // Clone the client/jetstream out from under the read lock before the
        // bucket get-or-create, which can span several `KV_OP_TIMEOUT`-bounded
        // round-trips. Holding the read guard across that await would block
        // `shutdown()`'s `write().await` for the full duration, stalling
        // graceful shutdown behind an in-flight store call.
        let (client, js) = {
            let conn = self.handle.read().await;
            let conn = conn.as_ref().ok_or(KvError::NotConnected)?;
            (conn.client.clone(), conn.jetstream.clone())
        };

        let kv = Self::get_or_create_bucket(&client, &js, &config).await?;

        Ok(Arc::new(NatsKvStore {
            name: config.name,
            client,
            js,
            kv,
        }))
    }

    /// Describe what the NATS backend supports.
    fn capabilities(&self) -> ConnectionCapabilities {
        ConnectionCapabilities {
            streaming_watch: true,
            prefix_watch: true,
            // `KvTtl` is not implemented for the NATS backend yet (only `KvWriter`
            // is vended by `writer()`), so advertising `ttl: true` would lead
            // callers that branch on this flag down a path that can never
            // succeed. Flip to `true` together with the `KvTtl` impl.
            ttl: false,
            cas: true,
            transactions: false,
            // 0 = unlimited from this layer's perspective: we impose no cap, but
            // the NATS server still enforces its own max payload (~1MB by
            // default). Callers that branch on this must not read 0 as "any size
            // is safe" — an oversized value is rejected server-side at write time.
            max_value_size: 0,
            global_ordering: false,
        }
    }
}
635
/// A single NATS KV bucket, as vended by `NatsConnection::store_with_config`.
struct NatsKvStore {
    // Bucket name; returned by `name()` and handed to readers as their bucket.
    name: String,
    // Bucket handle shared (by clone) with every reader, watcher and writer.
    kv: Store,
    // Client and JetStream context cloned from the connection's handle;
    // passed on to readers.
    client: async_nats::Client,
    js: async_nats::jetstream::Context,
}
642
643impl KvStore for NatsKvStore {
644    fn name(&self) -> &str {
645        &self.name
646    }
647
648    fn reader(&self) -> Arc<dyn KvReader> {
649        Arc::new(NatsKvReader {
650            kv: self.kv.clone(),
651            client: self.client.clone(),
652            js: self.js.clone(),
653            bucket: self.name.clone(),
654        })
655    }
656
657    fn watcher(&self) -> Option<Arc<dyn KvWatcher>> {
658        Some(Arc::new(NatsKvWatcher {
659            kv: self.kv.clone(),
660        }))
661    }
662
663    fn writer(&self) -> Option<Arc<dyn KvWriter>> {
664        Some(Arc::new(NatsKvWriterImpl {
665            kv: self.kv.clone(),
666        }))
667    }
668}
669
/// Read-side view of one NATS KV bucket, created by `NatsKvStore::reader`.
struct NatsKvReader {
    // Bucket handle used for single-key lookups (`entry()`).
    kv: Store,
    // Clones of the owning store's client and JetStream context.
    client: async_nats::Client,
    js: async_nats::jetstream::Context,
    // The bucket name is known at construction (it's the store's name), so
    // `consume_last_per_subject` builds its subject filters from this field
    // instead of issuing a `kv.status()` round-trip per `scan()`/`keys()` call
    // just to read it back from the server.
    bucket: String,
}
680
681#[async_trait]
682impl KvReader for NatsKvReader {
683    async fn get(&self, key: &str) -> Result<Option<KvEntry>, KvError> {
684        // Empty value → treat as absent. This unifies a real stored `b""` and a
685        // `delete_with_version` tombstone (empty-value Put) under one "absent =
686        // None" contract, consistent with `scan()`/`keys()`. Callers needing
687        // zero-length semantics use `entry()`. See the `KvReader::get` trait doc.
688        match self.entry(key).await? {
689            Some(entry) if entry.value.is_empty() => Ok(None),
690            other => Ok(other),
691        }
692    }
693
694    async fn entry(&self, key: &str) -> Result<Option<KvEntry>, KvError> {
695        use async_nats::jetstream::kv::Operation;
696        // Use entry() instead of get() to access revision.
697        // Return Put entries even with empty values — delete_with_version
698        // writes empty bytes as a tombstone and callers need the version
699        // for CAS conflict detection. Only filter real Delete/Purge markers.
700        match timed(self.kv.entry(key)).await? {
701            Ok(Some(entry)) if entry.operation == Operation::Put => Ok(Some(KvEntry {
702                key: key.to_string(),
703                value: entry.value.to_vec(),
704                version: VersionToken::from_u64(entry.revision),
705            })),
706            Ok(Some(_)) => Ok(None), // Delete/Purge marker
707            Ok(None) => Ok(None),
708            Err(e) => Err(KvError::OperationFailed(e.to_string())),
709        }
710    }
711
712    async fn keys(&self, prefix: &str) -> Result<Vec<String>, KvError> {
713        debug!(prefix = %prefix, "listing keys with prefix");
714
715        let mut keys = Vec::new();
716        self.consume_last_per_subject(prefix, true, |msg, key| {
717            // Skip both real KV deletes and CAS tombstones (empty-value Puts
718            // written by delete_with_version). get()/scan() hide the latter, so
719            // keys() must too — otherwise a list-then-get returns phantom keys.
720            // With headers_only the payload is stripped, but NATS adds a
721            // `Nats-Msg-Size` header we use to detect the empty value.
722            if !is_kv_delete(&msg) && !is_empty_value(&msg) {
723                keys.push(key);
724            }
725        })
726        .await?;
727
728        debug!(prefix = %prefix, keys = keys.len(), "keys listing complete");
729        Ok(keys)
730    }
731
732    async fn scan(&self, prefix: &str) -> Result<Vec<KvEntry>, KvError> {
733        let mut entries = Vec::new();
734        self.consume_last_per_subject(prefix, false, |msg, key| {
735            if !is_kv_delete(&msg) && !msg.payload.is_empty() {
736                // The KV revision is the stream sequence, carried in the JetStream
737                // ACK subject (the message's reply subject). A revision of 0 means
738                // we couldn't parse it; callers treat that as "unknown version".
739                let revision = msg
740                    .reply
741                    .as_deref()
742                    .and_then(stream_sequence_from_ack)
743                    .unwrap_or(0);
744
745                entries.push(KvEntry {
746                    key,
747                    value: msg.payload.to_vec(),
748                    version: VersionToken::from_u64(revision),
749                });
750            }
751        })
752        .await?;
753
754        debug!(prefix = %prefix, entries = entries.len(), "scan complete");
755        Ok(entries)
756    }
757}
758
/// Extract the stream sequence (== KV revision) from a JetStream ACK subject.
///
/// The ACK subject — delivered as a push message's reply subject — comes in two
/// shapes, and the stream sequence sits at different offsets in each:
///
/// ```text
/// legacy (9 tokens):  $JS.ACK.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>
/// modern (11–12):     $JS.ACK.<domain>.<account>.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>[.<token>]
/// ```
///
/// The *last* token is `num_pending` (typically 0 on the final delivery), not
/// the sequence, so the field is located from the front, accounting for the
/// optional `<domain>.<account>` prefix that modern servers prepend.
///
/// Returns `None` when the subject does not start with `$JS.ACK`, when its
/// token count fits neither shape (fewer than 9, or exactly 10), or when the
/// stream-sequence token is not a valid `u64`. A 10-token subject is rejected
/// rather than guessed at: reading it as "modern" would return the
/// consumer-sequence token as the revision, which is worse than reporting
/// "unknown".
fn stream_sequence_from_ack(reply: &str) -> Option<u64> {
    // The stream-seq field sits at index 5 (legacy) or 7 (modern), so only the
    // first 8 tokens are ever read. Keep those in a stack array and merely
    // count the rest — no heap `Vec`, which on a large `scan()` would be one
    // allocation per delivered message.
    let mut head = [""; 8];
    let mut count = 0usize;
    for token in reply.split('.') {
        if let Some(slot) = head.get_mut(count) {
            *slot = token;
        }
        count += 1;
    }

    if head[0] != "$JS" || head[1] != "ACK" {
        return None;
    }

    // Exactly 9 tokens is the legacy form; 11 or more carries the two-token
    // `<domain>.<account>` prefix, shifting every field right by two. Anything
    // else (too short, or the in-between 10) is not a recognised layout.
    let stream_seq_idx = match count {
        9 => 5,
        n if n >= 11 => 7,
        _ => return None,
    };
    head[stream_seq_idx].parse::<u64>().ok()
}
794
795/// Check if a NATS message represents a KV delete/purge operation.
796fn is_kv_delete(msg: &async_nats::Message) -> bool {
797    msg.headers
798        .as_ref()
799        .and_then(|h| h.get("KV-Operation"))
800        .is_some()
801}
802
803/// Check if a `headers_only` delivery carries an empty value (a CAS tombstone
804/// written by `delete_with_version`).
805///
806/// When a consumer is created with `headers_only`, NATS strips the body and adds
807/// a `Nats-Msg-Size` header with the original payload length. Size 0 means the
808/// stored value is empty, which `get()`/`scan()` treat as absent. Messages
809/// without the header (e.g. non-`headers_only` deliveries) are not classified as
810/// empty here — callers on that path inspect the payload directly instead.
811fn is_empty_value(msg: &async_nats::Message) -> bool {
812    msg.headers
813        .as_ref()
814        .and_then(|h| h.get("Nats-Msg-Size"))
815        .map(|v| v.as_str() == "0")
816        .unwrap_or(false)
817}
818
819impl NatsKvReader {
820    /// Subscribe to last-per-subject messages for a KV prefix, calling `on_msg`
821    /// for each delivered message. Handles the subscribe-first race workaround,
822    /// consumer lifecycle, and cleanup.
823    async fn consume_last_per_subject(
824        &self,
825        prefix: &str,
826        headers_only: bool,
827        mut on_msg: impl FnMut(async_nats::Message, String),
828    ) -> Result<(), KvError> {
829        use async_nats::jetstream::consumer::push;
830        use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy};
831
832        // The bucket name is known at construction, so the subject filters are
833        // built directly from `self.bucket` — no `kv.status()` round-trip just
834        // to read it back. Every *remaining* setup await below is still bounded
835        // by `timed()`: a half-dead NATS connection (CLOSE_WAIT) would otherwise
836        // park here before the per-message drain timer downstream ever starts,
837        // hanging scan()/keys() indefinitely — the same failure `timed()` guards
838        // on the write path.
839        let bucket = self.bucket.as_str();
840
841        let nats_filter = if prefix.is_empty() {
842            format!("$KV.{bucket}.>")
843        } else {
844            format!("$KV.{bucket}.{prefix}>")
845        };
846
847        // Work around async-nats <=0.46 subscribe-after-create race:
848        // subscribe to the inbox FIRST, then create the consumer.
849        let inbox = self.client.new_inbox();
850        let mut sub = timed(self.client.subscribe(inbox.clone()))
851            .await?
852            .map_err(|e| KvError::OperationFailed(format!("subscribe inbox: {e}")))?;
853
854        let stream = timed(self.js.get_stream(format!("KV_{bucket}")))
855            .await?
856            .map_err(|e| KvError::OperationFailed(format!("get KV stream: {e}")))?;
857
858        let consumer = timed(stream.create_consumer(push::Config {
859            deliver_subject: inbox,
860            deliver_policy: DeliverPolicy::LastPerSubject,
861            filter_subject: nats_filter,
862            headers_only,
863            // This is a one-shot point-in-time drain — we never ack. Under
864            // the default `AckPolicy::Explicit`, JetStream stops delivering
865            // once `max_ack_pending` (default 1000) messages sit unacked,
866            // which would silently truncate scan()/keys() to the first ~1000
867            // keys (or stall waiting for deliveries that never come) on any
868            // larger bucket. `None` removes the ack-pending gate entirely.
869            ack_policy: AckPolicy::None,
870            // Safety net for the best-effort `delete_consumer` below: if that
871            // cleanup times out on a half-dead connection, JetStream still reaps
872            // this consumer after `CONSUMER_INACTIVE_THRESHOLD` of inactivity, so
873            // repeated timed-out scans can't pile orphaned consumers up against
874            // the per-stream limit.
875            inactive_threshold: CONSUMER_INACTIVE_THRESHOLD,
876            ..Default::default()
877        }))
878        .await?
879        .map_err(|e| KvError::OperationFailed(format!("create consumer: {e}")))?;
880
881        let num_pending = consumer.cached_info().num_pending;
882
883        // Drain exactly `num_pending` messages, but bound each await: a half-dead
884        // connection (CLOSE_WAIT) would otherwise park this loop forever, the same
885        // failure `timed()` guards on the write path. On timeout we still fall
886        // through to consumer cleanup, then surface `Timeout`.
887        let mut timed_out = false;
888        if num_pending > 0 {
889            let mut delivered = 0u64;
890            let kv_prefix = format!("$KV.{bucket}.");
891
892            while delivered < num_pending {
893                match tokio::time::timeout(KV_OP_TIMEOUT, sub.next()).await {
894                    Ok(Some(msg)) => {
895                        let key = msg
896                            .subject
897                            .strip_prefix(&kv_prefix)
898                            .unwrap_or(msg.subject.as_str())
899                            .to_string();
900
901                        on_msg(msg, key);
902                        delivered += 1;
903                    }
904                    Ok(None) => break, // subscription closed early
905                    Err(_) => {
906                        timed_out = true;
907                        break;
908                    }
909                }
910            }
911        }
912
913        // Clean up ephemeral consumer (best-effort), even on timeout — a stalled
914        // scan shouldn't also leak a server-side consumer. A leaked consumer
915        // lingers on the server and counts against per-stream limits, so surface
916        // failures in observability without failing the operation. Bound the
917        // delete with `timed()`: on the same half-dead (CLOSE_WAIT) connection
918        // that tripped the drain timeout above, an unbounded delete would re-park
919        // here forever, defeating the timeout recovery we just performed.
920        match timed(stream.delete_consumer(&consumer.cached_info().name)).await {
921            Ok(Ok(_)) => {}
922            Ok(Err(e)) => {
923                // `warn!`, not `debug!`: a leaked ephemeral consumer lingers on
924                // the server and counts against per-stream limits. Under a flaky
925                // NATS connection every scan()/keys() leaks one, so this must be
926                // visible in default spans before the pile-up hits the limit.
927                warn!(error = %e, "failed to delete ephemeral consumer (best-effort)");
928            }
929            Err(_) => {
930                warn!("timed out deleting ephemeral consumer (best-effort)");
931            }
932        }
933
934        if timed_out {
935            return Err(KvError::Timeout);
936        }
937        Ok(())
938    }
939}
940
941/// Convert a NATS KV entry to a KvUpdate.
942///
943/// Takes the entry by value so the key `String` moves into the `KvUpdate`
944/// instead of allocating a fresh copy per watch event.
945fn nats_entry_to_kv_update(entry: async_nats::jetstream::kv::Entry) -> KvUpdate {
946    use async_nats::jetstream::kv::Operation;
947    let version = VersionToken::from_u64(entry.revision);
948    match entry.operation {
949        Operation::Put => KvUpdate::Put(KvEntry {
950            key: entry.key,
951            value: entry.value.to_vec(),
952            version,
953        }),
954        Operation::Delete => KvUpdate::Delete {
955            key: entry.key,
956            version,
957        },
958        Operation::Purge => KvUpdate::Purge {
959            key: entry.key,
960            version,
961        },
962    }
963}
964
965/// Stream updates from a NATS Watch into a channel until it ends or the receiver drops.
966async fn stream_watch(
967    mut watcher: async_nats::jetstream::kv::Watch,
968    tx: &Sender<KvUpdate>,
969) -> Result<(), KvError> {
970    while let Some(entry) = watcher.next().await {
971        match entry {
972            Ok(entry) => {
973                let update = nats_entry_to_kv_update(entry);
974                if tx.send(update).await.is_err() {
975                    debug!("watch receiver closed");
976                    break;
977                }
978            }
979            Err(e) => {
980                error!(error = %e, "NATS KV watch error");
981                return Err(KvError::WatchError(e.to_string()));
982            }
983        }
984    }
985    Ok(())
986}
987
/// Decide whether a NATS watch error means the requested start sequence is
/// too old (compacted away), in which case callers should fall back to a full
/// watch.
///
/// async-nats has no granular error kind for this: `WatchErrorKind` is only
/// `InvalidKey`/`TimedOut`/`ConsumerCreate`/`Other`, and "start sequence too old"
/// arrives as `ConsumerCreate`/`Other` with the real reason buried in the source
/// error's *message*. So the full error string — which already includes the
/// source, since `Error`'s `Display` renders `"{kind}: {source}"` — is searched
/// for a handful of known phrases.
///
/// Two deliberate choices make this robust to wording drift:
/// - The comparison ignores ASCII case, so a capitalization change in
///   NATS/async-nats can't slip past.
/// - Detection is biased toward `true`. A false positive only costs an
///   unnecessary (but always-safe) full `watch_all()` replay; a false negative
///   propagates `WatchError` and strands a caller that would otherwise recover.
///
/// If these messages ever change, `cursor_expired_matches_known_nats_error_strings`
/// is the canary that fails loudly on the next dependency bump.
fn is_cursor_expired_error(err: &str) -> bool {
    // All needles are lowercase ASCII; matching is done byte-window by
    // byte-window so the (cold-path) error string is never copied.
    const NEEDLES: [&str; 4] = [
        "start sequence",
        "first sequence",
        "sequence not found",
        "too old",
    ];

    let haystack = err.as_bytes();
    for needle in NEEDLES.iter() {
        let needle = needle.as_bytes();
        // `windows` yields nothing when the haystack is shorter than the needle.
        let found = haystack
            .windows(needle.len())
            .any(|window| window.eq_ignore_ascii_case(needle));
        if found {
            return true;
        }
    }
    false
}
1021
1022struct NatsKvWatcher {
1023    kv: Store,
1024}
1025
1026#[async_trait]
1027impl KvWatcher for NatsKvWatcher {
1028    async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
1029        // Bound the watch *setup* with `timed()` for the same reason every KV op
1030        // is bounded: a half-dead (CLOSE_WAIT) NATS connection parks this await
1031        // forever instead of failing. The streaming drain in `stream_watch` is
1032        // intentionally unbounded (a watch is long-lived), but establishing it
1033        // must not be able to hang a reconnecting caller.
1034        let watcher = timed(self.kv.watch_all())
1035            .await?
1036            .map_err(|e| KvError::WatchError(e.to_string()))?;
1037        stream_watch(watcher, &tx).await
1038    }
1039
1040    async fn watch_prefix(&self, prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
1041        // Use native NATS subject-based filtering. KV key "node.abc" maps to
1042        // subject "$KV.BUCKET.node.abc", and ">" is the multi-level wildcard.
1043        let nats_key = format!("{prefix}>");
1044        let watcher = timed(self.kv.watch(&nats_key))
1045            .await?
1046            .map_err(|e| KvError::WatchError(e.to_string()))?;
1047        stream_watch(watcher, &tx).await
1048    }
1049
1050    async fn watch_all_from(
1051        &self,
1052        cursor: &WatchCursor,
1053        tx: Sender<KvUpdate>,
1054    ) -> Result<(), KvError> {
1055        let revision = match cursor.as_u64() {
1056            Some(rev) if rev > 0 => rev,
1057            _ => return self.watch_all(tx).await,
1058        };
1059
1060        let watcher = match timed(self.kv.watch_all_from_revision(revision + 1)).await? {
1061            Ok(w) => w,
1062            Err(e) => {
1063                let err_str = e.to_string();
1064                if is_cursor_expired_error(&err_str) {
1065                    warn!(revision, error = %err_str, "cursor expired, caller should fall back to full watch");
1066                    return Err(KvError::CursorExpired);
1067                }
1068                return Err(KvError::WatchError(err_str));
1069            }
1070        };
1071
1072        info!(revision, "resumed watch from cursor");
1073        stream_watch(watcher, &tx).await
1074    }
1075
1076    async fn watch_prefix_from(
1077        &self,
1078        prefix: &str,
1079        cursor: &WatchCursor,
1080        tx: Sender<KvUpdate>,
1081    ) -> Result<(), KvError> {
1082        let revision = match cursor.as_u64() {
1083            Some(rev) if rev > 0 => rev,
1084            _ => return self.watch_prefix(prefix, tx).await,
1085        };
1086
1087        let nats_key = format!("{prefix}>");
1088        let watcher = match timed(self.kv.watch_from_revision(&nats_key, revision + 1)).await? {
1089            Ok(w) => w,
1090            Err(e) => {
1091                let err_str = e.to_string();
1092                if is_cursor_expired_error(&err_str) {
1093                    warn!(revision, prefix, error = %err_str, "cursor expired for prefix watch, caller should fall back");
1094                    return Err(KvError::CursorExpired);
1095                }
1096                return Err(KvError::WatchError(err_str));
1097            }
1098        };
1099
1100        info!(revision, prefix, "resumed prefix watch from cursor");
1101        stream_watch(watcher, &tx).await
1102    }
1103}
1104
1105struct NatsKvWriterImpl {
1106    kv: Store,
1107}
1108
1109#[async_trait]
1110impl KvWriter for NatsKvWriterImpl {
1111    async fn put(&self, key: &str, value: &[u8]) -> Result<VersionToken, KvError> {
1112        let rev = timed(self.kv.put(key, value.to_vec().into()))
1113            .await?
1114            .map_err(|e| KvError::OperationFailed(e.to_string()))?;
1115        Ok(VersionToken::from_u64(rev))
1116    }
1117
1118    async fn delete(&self, key: &str) -> Result<bool, KvError> {
1119        // NATS delete doesn't tell us if key existed, so we always return true
1120        timed(self.kv.delete(key))
1121            .await?
1122            .map_err(|e| KvError::OperationFailed(e.to_string()))?;
1123        Ok(true)
1124    }
1125
1126    async fn create(&self, key: &str, value: &[u8]) -> Result<VersionToken, KvError> {
1127        use async_nats::jetstream::kv::CreateErrorKind;
1128        timed(self.kv.create(key, value.to_vec().into()))
1129            .await?
1130            .map(VersionToken::from_u64)
1131            .map_err(|e| {
1132                if e.kind() == CreateErrorKind::AlreadyExists {
1133                    KvError::AlreadyExists
1134                } else {
1135                    KvError::OperationFailed(e.to_string())
1136                }
1137            })
1138    }
1139
1140    async fn update(
1141        &self,
1142        key: &str,
1143        value: &[u8],
1144        expected: &VersionToken,
1145    ) -> Result<VersionToken, KvError> {
1146        use async_nats::jetstream::kv::UpdateErrorKind;
1147        let rev = expected.as_u64().ok_or_else(|| {
1148            KvError::OperationFailed("invalid version token for NATS update".into())
1149        })?;
1150        timed(self.kv.update(key, value.to_vec().into(), rev))
1151            .await?
1152            .map(VersionToken::from_u64)
1153            .map_err(|e| {
1154                if e.kind() == UpdateErrorKind::WrongLastRevision {
1155                    KvError::RevisionMismatch
1156                } else {
1157                    KvError::OperationFailed(e.to_string())
1158                }
1159            })
1160    }
1161
1162    async fn delete_with_version(
1163        &self,
1164        key: &str,
1165        expected: &VersionToken,
1166    ) -> Result<bool, KvError> {
1167        use async_nats::jetstream::kv::UpdateErrorKind;
1168        let rev = expected.as_u64().ok_or_else(|| {
1169            KvError::OperationFailed("invalid version token for NATS delete".into())
1170        })?;
1171        // Write empty value with CAS — logically deletes while preserving conflict detection
1172        timed(self.kv.update(key, Vec::new().into(), rev))
1173            .await?
1174            .map(|_| true)
1175            .map_err(|e| {
1176                if e.kind() == UpdateErrorKind::WrongLastRevision {
1177                    KvError::RevisionMismatch
1178                } else {
1179                    KvError::OperationFailed(e.to_string())
1180                }
1181            })
1182    }
1183}
1184
1185impl std::fmt::Debug for NatsConnection {
1186    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1187        f.debug_struct("NatsConnection")
1188            .field("url", &self.config.url)
1189            .field("healthy", &self.healthy.load(Ordering::Relaxed))
1190            .finish()
1191    }
1192}
1193
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn raw_create_success_has_no_error() {
        // No "error" object: a successful STREAM.CREATE just echoes the config.
        let outcome = classify_raw_create_response(
            br#"{"type":"io.nats.jetstream.api.v1.stream_create_response","config":{"name":"KV_certs"}}"#,
        );
        assert_eq!(outcome, RawCreateOutcome::Created);
    }

    #[test]
    fn raw_create_swallows_stream_already_exists() {
        // err_code 10058 ("stream name already in use") means the bucket is
        // already there, which is fine.
        let outcome = classify_raw_create_response(
            br#"{"error":{"code":400,"err_code":10058,"description":"stream name already in use"}}"#,
        );
        assert_eq!(outcome, RawCreateOutcome::AlreadyExists);
    }

    #[test]
    fn raw_create_swallows_stream_limit() {
        // At the stream limit Synadia Cloud answers 400 + "maximum number of
        // streams"; the bucket may still exist, so this is non-fatal.
        let outcome = classify_raw_create_response(
            br#"{"error":{"code":400,"description":"maximum number of streams reached"}}"#,
        );
        assert_eq!(outcome, RawCreateOutcome::StreamLimit);
    }

    #[test]
    fn raw_create_propagates_unknown_error() {
        // Every other JetStream error is fatal and must carry through both the
        // code and the description.
        let outcome = classify_raw_create_response(
            br#"{"error":{"code":403,"description":"insufficient permissions"}}"#,
        );
        match outcome {
            RawCreateOutcome::Failed { code, description } => {
                assert_eq!(code, 403);
                assert_eq!(description, "insufficient permissions");
            }
            other => panic!("expected Failed, got {other:?}"),
        }
    }

    #[test]
    fn raw_create_400_without_stream_limit_is_fatal() {
        // A plain 400 without the stream-limit wording must surface as a
        // failure; swallowing it would let a real bad-config rejection pass as
        // success.
        let outcome = classify_raw_create_response(
            br#"{"error":{"code":400,"description":"invalid stream config"}}"#,
        );
        match outcome {
            RawCreateOutcome::Failed { code, description } => {
                assert_eq!(code, 400);
                assert!(description.contains("invalid stream config"));
            }
            other => panic!("expected Failed, got {other:?}"),
        }
    }

    #[test]
    fn raw_create_unparseable_payload_is_treated_as_success() {
        // The caller double-checks with get_key_value afterwards, so a garbled
        // body is not reported as a hard failure at this layer.
        let outcome = classify_raw_create_response(b"not json at all");
        assert_eq!(outcome, RawCreateOutcome::Created);
    }

    #[test]
    fn ack_subject_legacy_format() {
        // $JS.ACK.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>
        let parsed = stream_sequence_from_ack("$JS.ACK.KV_certs.cons.1.42.7.1700000000000000000.0");
        assert_eq!(parsed, Some(42));
    }

    #[test]
    fn ack_subject_modern_format_with_domain_and_account() {
        // $JS.ACK.<domain>.<account>.<stream>.<consumer>.<delivered>.<stream_seq>.<consumer_seq>.<ts>.<pending>
        let parsed = stream_sequence_from_ack(
            "$JS.ACK.hub.AABBCC.KV_certs.cons.1.42.7.1700000000000000000.0",
        );
        assert_eq!(parsed, Some(42));
    }

    #[test]
    fn ack_subject_modern_format_with_trailing_token() {
        // 12 tokens: some servers tack a random token onto the end.
        let parsed = stream_sequence_from_ack(
            "$JS.ACK.hub.AABBCC.KV_certs.cons.1.99.7.1700000000000000000.0.rng",
        );
        assert_eq!(parsed, Some(99));
    }

    #[test]
    fn ack_subject_last_token_is_not_the_sequence() {
        // Regression guard: the trailing token is num_pending, not the
        // sequence. Returning it (0 here) corrupted every scanned entry's
        // version in the old code.
        let parsed = stream_sequence_from_ack("$JS.ACK.KV_certs.cons.1.42.7.1700000000000000000.0");
        assert_ne!(parsed, Some(0));
    }

    #[test]
    fn ack_subject_rejects_garbage() {
        let rejected = [
            "",
            "not.an.ack.subject",
            "$JS.ACK.too.few.tokens",
            // Right shape, non-numeric sequence field.
            "$JS.ACK.s.c.1.notnum.7.0.0",
        ];
        for subject in rejected.iter() {
            assert_eq!(stream_sequence_from_ack(subject), None);
        }
    }

    #[test]
    fn cursor_expired_matches_known_nats_error_strings() {
        // These phrases originate in async-nats error messages. Should the
        // library reword them, watch_all_from would yield WatchError rather
        // than CursorExpired and callers relying on the watch_all() fallback
        // would break.
        let expired = [
            "consumer start sequence is too old",
            "first sequence is 42, requested 1",
            "sequence not found in stream",
            // "too old" alone, without any "sequence" wording, still counts.
            "requested revision is too old",
            // Matching is case-insensitive, so upstream recapitalization is
            // harmless.
            "Consumer Start Sequence Too Old",
        ];
        for message in expired.iter() {
            assert!(is_cursor_expired_error(message));
        }

        let unrelated = ["connection refused", "permission denied", "stream not found"];
        for message in unrelated.iter() {
            assert!(!is_cursor_expired_error(message));
        }
    }

    #[test]
    fn raw_create_already_exists_when_10058_in_code_field() {
        // Some Synadia Cloud deployments put 10058 in `code` instead of
        // `err_code`; either placement must classify as AlreadyExists.
        let outcome = classify_raw_create_response(
            br#"{"error":{"code":10058,"description":"stream name already in use"}}"#,
        );
        assert_eq!(outcome, RawCreateOutcome::AlreadyExists);
    }

    #[test]
    fn raw_create_error_without_code_defaults_to_zero() {
        // Defensive: an error object lacking `code` is still a failure (never
        // a silent pass), reported with code 0.
        let outcome = classify_raw_create_response(br#"{"error":{"description":"mystery"}}"#);
        match outcome {
            RawCreateOutcome::Failed { code, description } => {
                assert_eq!(code, 0);
                assert_eq!(description, "mystery");
            }
            other => panic!("expected Failed, got {other:?}"),
        }
    }
}