Skip to main content

kanade_shared/
bootstrap.rs

//! Idempotent JetStream bootstrap (Sprint 6.x follow-up).
//!
//! Lists every NATS JetStream resource the kanade fleet expects —
//! streams, KV buckets, Object Stores — and asks the broker to
//! create-or-update them. v0.25.0 switched from `create_*` to
//! `create_or_update_*`: the old form returned error 10058 ("name
//! already in use with a different configuration") when a release
//! widened a stream's subjects or changed its retention policy on
//! a broker that still held the older config. With the new form the
//! broker reconciles its definition to the one in this file, so
//! version bumps no longer require operator-side data wipes.
//! (Object Stores are the exception: async-nats 0.49 has no
//! create-or-update form for them, so `ensure_object_store` creates
//! them or falls back to the existing store as-is.)
//!
//! Centralising the list here means a future "we added a new
//! bucket" change touches one place and both the operator CLI +
//! the auto-bootstrap path pick it up.
16
17use std::time::Duration;
18
19use anyhow::{Context, Result};
20use async_nats::jetstream::{
21    self,
22    kv::Config as KvConfig,
23    object_store::Config as ObjectStoreConfig,
24    stream::{Config as StreamConfig, DiscardPolicy},
25};
26use tracing::{info, warn};
27
28use crate::kv::{
29    BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_FLEET_CONFIG,
30    BUCKET_GROUP_CONTACTS, BUCKET_JOBS, BUCKET_JOBS_YAML, BUCKET_NOTIFICATIONS_READ,
31    BUCKET_SCHEDULES, BUCKET_SCHEDULES_YAML, BUCKET_SCRIPT_CURRENT, BUCKET_SCRIPT_STATUS,
32    OBJECT_AGENT_RELEASES, OBJECT_APP_PACKAGES, OBJECT_COLLECTIONS, OBJECT_RESULT_OUTPUT,
33    OBJECT_SCRIPTS, STREAM_AUDIT, STREAM_EVENTS, STREAM_EXEC, STREAM_INVENTORY,
34    STREAM_NOTIFICATIONS, STREAM_OBS_EVENTS, STREAM_RESULTS,
35};
36
37/// Create-or-update an Object Store, but never let it wedge backend
38/// startup. `create_object_store` neither reconciles an existing
39/// store's config nor has a `create_or_update` form in async-nats
40/// 0.49, so a store whose desired config drifted — e.g. the #518
41/// `max_bytes` cap added after the bucket was first created uncapped,
42/// which the broker then rejects with error 10058 ("stream name
43/// already in use with a different configuration") — would otherwise
44/// fail `ensure_jetstream_resources` and crash the backend on boot
45/// (production outage 2026-06-11). Fall back to the existing store
46/// (uncapped, as it already was) and warn. #506 tracks real
47/// reconciliation of object-store config.
48async fn ensure_object_store(js: &jetstream::Context, cfg: ObjectStoreConfig) -> Result<()> {
49    let name = cfg.bucket.clone();
50    if let Err(e) = js.create_object_store(cfg).await {
51        // The fallback is deliberately broad — any create error is
52        // tolerated AS LONG AS the store already exists, because the
53        // alternative is a wedged backend and "never crash on boot"
54        // wins over "surface this specific error". The expected error
55        // is 10058 (config drift, the incident), but auth/network
56        // blips on an already-bootstrapped broker take this path too;
57        // they remain visible via the `warn!`. Only a genuine
58        // "can't create AND doesn't exist" is fatal.
59        if js.get_object_store(&name).await.is_err() {
60            return Err(e).with_context(|| {
61                format!("create_object_store {name} (and no existing store to fall back to)")
62            });
63        }
64        warn!(
65            store = %name, error = %e,
66            "object store exists with a different config; using it as-is (cap not reconciled)",
67        );
68    }
69    info!(store = %name, "ready");
70    Ok(())
71}
72
73/// Idempotently create every NATS JetStream resource the kanade
74/// fleet relies on. Calling repeatedly is safe — `create_*` returns
75/// the existing resource if it's already configured.
76///
77/// Returns once every resource is in place. The function is async
78/// so backends can `await` it as part of their startup sequence
79/// (one round-trip per resource — ~10 RTTs total).
80pub async fn ensure_jetstream_resources(js: &jetstream::Context) -> Result<()> {
81    // ── Streams ──────────────────────────────────────────────────
82    // #518: every stream carries a `max_bytes` cap with
83    // `Discard::Old` on top of its `max_age` window. Within their
84    // age windows the streams used to be unbounded by size, and
85    // JetStream's file store shares a disk with SQLite on the
86    // backend host — one job printing 200 KB per run fleet-wide
87    // could exhaust the store, at which point EVERY publish fails
88    // (results, obs, audit, KV puts). With the caps, worst-case
89    // degradation is "shorter history on the offending stream"
90    // instead of "broker down".
91    //
92    // Sizing: JetStream RESERVES each `max_bytes` against its
93    // available storage (min of max_file_store and free disk) at
94    // create/update time and fails with error 10047 when the sum
95    // doesn't fit, so these must stay small enough for modest
96    // hosts. That's fine: every stream here is a transport +
97    // replay buffer — the durable record is the backend's SQLite
98    // (results/inventory/obs/audit are all projected within
99    // seconds) — so the caps are runaway-output backstops, not
100    // history budgets. Total reservation ≈ 5.3 GiB including the
101    // result_output object store below.
102    const MIB: i64 = 1024 * 1024;
103    const GIB: i64 = 1024 * MIB;
104
105    // INVENTORY — 90-day rolling history (spec §2.3.1).
106    js.create_or_update_stream(StreamConfig {
107        name: STREAM_INVENTORY.into(),
108        subjects: vec!["inventory.>".into()],
109        max_age: Duration::from_secs(90 * 24 * 60 * 60),
110        max_bytes: GIB,
111        discard: DiscardPolicy::Old,
112        ..Default::default()
113    })
114    .await
115    .with_context(|| format!("create_or_update_stream {STREAM_INVENTORY}"))?;
116    info!(stream = STREAM_INVENTORY, "ready");
117
118    // RESULTS — 30-day rolling history. The biggest producer by
119    // far (every job run on every PC, with up to 256 KB of inline
120    // stdout/stderr per message), so it gets the largest slice of
121    // the disk budget.
122    js.create_or_update_stream(StreamConfig {
123        name: STREAM_RESULTS.into(),
124        subjects: vec!["results.>".into()],
125        max_age: Duration::from_secs(30 * 24 * 60 * 60),
126        max_bytes: 2 * GIB,
127        discard: DiscardPolicy::Old,
128        ..Default::default()
129    })
130    .await
131    .with_context(|| format!("create_or_update_stream {STREAM_RESULTS}"))?;
132    info!(stream = STREAM_RESULTS, "ready");
133
134    // EXEC — latest-per-subject only (spec §2.6 Layer 1). v0.22.1:
135    // catch the existing `commands.{all,group.X,pc.Y}` subjects so a
136    // single backend publish lands in BOTH the agent's live core
137    // subscription AND the stream's retention store. Reconnecting
138    // agents catch up via a durable consumer with
139    // `DeliverPolicy::LastPerSubject` — they receive the most
140    // recent Command per subject they care about, no matter how
141    // long they were offline (within `max_age`).
142    js.create_or_update_stream(StreamConfig {
143        name: STREAM_EXEC.into(),
144        subjects: vec!["commands.>".into()],
145        max_messages_per_subject: 1,
146        max_age: Duration::from_secs(7 * 24 * 60 * 60),
147        // Latest-per-subject keeps this tiny (one Command per
148        // group/pc subject); the cap is a backstop against subject
149        // cardinality bugs, not a working budget.
150        max_bytes: 64 * MIB,
151        discard: DiscardPolicy::Old,
152        ..Default::default()
153    })
154    .await
155    .with_context(|| format!("create_or_update_stream {STREAM_EXEC}"))?;
156    info!(stream = STREAM_EXEC, "ready");
157
158    // EVENTS — short-lived broadcast bus for kill / revoke / etc.
159    // 7-day window matches the EXEC spec window.
160    js.create_or_update_stream(StreamConfig {
161        name: STREAM_EVENTS.into(),
162        subjects: vec!["events.>".into()],
163        max_age: Duration::from_secs(7 * 24 * 60 * 60),
164        max_bytes: 256 * MIB,
165        discard: DiscardPolicy::Old,
166        ..Default::default()
167    })
168    .await
169    .with_context(|| format!("create_or_update_stream {STREAM_EVENTS}"))?;
170    info!(stream = STREAM_EVENTS, "ready");
171
172    // AUDIT — operator-action record (spec §2.3.1). The DURABLE
173    // copy is the backend's SQLite `audit_log` table (the projector
174    // INSERTs each message, idempotently since #501; 365-day
175    // retention since #486) — the stream is transport + replay
176    // buffer, not the archive, so it can be bounded like the rest.
177    // 90 days / 512 MiB is far more than the projector ever lags;
178    // previously this stream had NO limits at all, making it an
179    // unbounded disk leak on the broker host.
180    js.create_or_update_stream(StreamConfig {
181        name: STREAM_AUDIT.into(),
182        subjects: vec!["audit.>".into()],
183        max_age: Duration::from_secs(90 * 24 * 60 * 60),
184        max_bytes: 512 * MIB,
185        discard: DiscardPolicy::Old,
186        ..Default::default()
187    })
188    .await
189    .with_context(|| format!("create_or_update_stream {STREAM_AUDIT}"))?;
190    info!(stream = STREAM_AUDIT, "ready");
191
192    // OBS_EVENTS — per-PC observability timeline (Issue #246). The
193    // 90-day window matches `obs_events` table retention so a
194    // backend bootstrapping after long downtime can catch up but
195    // doesn't carry data the table will discard anyway. Subject
196    // filter `obs.>` catches every PC without a per-PC subscription.
197    //
198    // Days-to-seconds is spelt out once instead of `90 * 24 * 60 *
199    // 60` open-coded across bootstrap + cleanup; the matching prune
200    // window in `kanade-backend::cleanup` quotes the same number
201    // separately (SQLite-relative string syntax there, not a
202    // duration), so it can't share a constant — but a single
203    // arithmetic spell-out here makes the relationship grep-able.
204    const SECS_PER_DAY: u64 = 24 * 60 * 60;
205    const OBS_EVENTS_RETENTION_DAYS: u64 = 90;
206    js.create_or_update_stream(StreamConfig {
207        name: STREAM_OBS_EVENTS.into(),
208        subjects: vec!["obs.>".into()],
209        max_age: Duration::from_secs(OBS_EVENTS_RETENTION_DAYS * SECS_PER_DAY),
210        max_bytes: 512 * MIB,
211        discard: DiscardPolicy::Old,
212        ..Default::default()
213    })
214    .await
215    .with_context(|| format!("create_or_update_stream {STREAM_OBS_EVENTS}"))?;
216    info!(stream = STREAM_OBS_EVENTS, "ready");
217
218    // NOTIFICATIONS — end-user notification history (SPEC §2.3.1 /
219    // Phase E). 90-day window matches INVENTORY: a Client App that
220    // connects after a notification was sent fetches the missed ones
221    // via KLP `notifications.list`. Subject filter `notifications.>`
222    // catches every fan-out target (`all` / `group.X` / `pc.Y`) with
223    // one stream. Retains all messages per subject — each notification
224    // is its own history entry, not a latest-only state like EXEC.
225    // #518: 512 MiB cap + DiscardPolicy::Old, matching the other
226    // 90-day streams (AUDIT / OBS_EVENTS) — notification payloads are
227    // small, so this is generous headroom while still bounding the
228    // broker's disk lease.
229    js.create_or_update_stream(StreamConfig {
230        name: STREAM_NOTIFICATIONS.into(),
231        subjects: vec!["notifications.>".into()],
232        max_age: Duration::from_secs(90 * 24 * 60 * 60),
233        max_bytes: 512 * MIB,
234        discard: DiscardPolicy::Old,
235        ..Default::default()
236    })
237    .await
238    .with_context(|| format!("create_or_update_stream {STREAM_NOTIFICATIONS}"))?;
239    info!(stream = STREAM_NOTIFICATIONS, "ready");
240
241    // ── KV buckets ───────────────────────────────────────────────
242    // script_current — cmd_id → version (spec §2.6 Layer 2).
243    js.create_or_update_key_value(KvConfig {
244        bucket: BUCKET_SCRIPT_CURRENT.into(),
245        history: 5,
246        ..Default::default()
247    })
248    .await
249    .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_CURRENT}"))?;
250    info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");
251
252    // script_status — cmd_id → ACTIVE / REVOKED.
253    js.create_or_update_key_value(KvConfig {
254        bucket: BUCKET_SCRIPT_STATUS.into(),
255        history: 5,
256        ..Default::default()
257    })
258    .await
259    .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_STATUS}"))?;
260    info!(bucket = BUCKET_SCRIPT_STATUS, "ready");
261
262    // agents_state — pc_id → latest hw snapshot (history=1).
263    js.create_or_update_key_value(KvConfig {
264        bucket: BUCKET_AGENTS_STATE.into(),
265        history: 1,
266        ..Default::default()
267    })
268    .await
269    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENTS_STATE}"))?;
270    info!(bucket = BUCKET_AGENTS_STATE, "ready");
271
272    // agent_config — Sprint 6 layered scopes (global / groups.* /
273    // pcs.*) plus the legacy target_version key.
274    js.create_or_update_key_value(KvConfig {
275        bucket: BUCKET_AGENT_CONFIG.into(),
276        history: 5,
277        ..Default::default()
278    })
279    .await
280    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_CONFIG}"))?;
281    info!(bucket = BUCKET_AGENT_CONFIG, "ready");
282
283    // agent_groups — Sprint 5 per-pc group membership.
284    js.create_or_update_key_value(KvConfig {
285        bucket: BUCKET_AGENT_GROUPS.into(),
286        history: 5,
287        ..Default::default()
288    })
289    .await
290    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_GROUPS}"))?;
291    info!(bucket = BUCKET_AGENT_GROUPS, "ready");
292
293    // group_contacts — per-group notification email addresses
294    // (operator-managed via the SPA Groups page).
295    js.create_or_update_key_value(KvConfig {
296        bucket: BUCKET_GROUP_CONTACTS.into(),
297        history: 5,
298        ..Default::default()
299    })
300    .await
301    .with_context(|| format!("create_or_update_key_value {BUCKET_GROUP_CONTACTS}"))?;
302    info!(bucket = BUCKET_GROUP_CONTACTS, "ready");
303
304    // schedules — admin-API CRUD'd cron table (spec §2.5.3).
305    // Backend's scheduler.rs also creates this on startup; calling
306    // twice is harmless.
307    js.create_or_update_key_value(KvConfig {
308        bucket: BUCKET_SCHEDULES.into(),
309        history: 5,
310        ..Default::default()
311    })
312    .await
313    .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES}"))?;
314    info!(bucket = BUCKET_SCHEDULES, "ready");
315
316    // jobs — v0.15 operator-registered Manifest catalog. Schedules
317    // reference rows here by id; editing a job rewrites what future
318    // schedule fires exec.
319    js.create_or_update_key_value(KvConfig {
320        bucket: BUCKET_JOBS.into(),
321        history: 5,
322        ..Default::default()
323    })
324    .await
325    .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS}"))?;
326    info!(bucket = BUCKET_JOBS, "ready");
327
328    // fleet_config — #418 Phase 5 fleet-wide singletons (the global
329    // change-freeze under KEY_FREEZE). history: 1 — only the current
330    // state matters; both schedulers watch it.
331    js.create_or_update_key_value(KvConfig {
332        bucket: BUCKET_FLEET_CONFIG.into(),
333        history: 1,
334        ..Default::default()
335    })
336    .await
337    .with_context(|| format!("create_or_update_key_value {BUCKET_FLEET_CONFIG}"))?;
338    info!(bucket = BUCKET_FLEET_CONFIG, "ready");
339
340    // notifications_read — per-(pc, user, notification) read/ack state
341    // (SPEC §2.3.2 / Phase E). The agent writes here on KLP
342    // `notifications.ack`; `notifications.list` reads it back to filter
343    // the unread bucket. history: 1 — only the latest ack per key
344    // matters.
345    js.create_or_update_key_value(KvConfig {
346        bucket: BUCKET_NOTIFICATIONS_READ.into(),
347        history: 1,
348        ..Default::default()
349    })
350    .await
351    .with_context(|| format!("create_or_update_key_value {BUCKET_NOTIFICATIONS_READ}"))?;
352    info!(bucket = BUCKET_NOTIFICATIONS_READ, "ready");
353
354    // jobs_yaml / schedules_yaml — operator source-of-truth YAML
355    // alongside the JSON catalogs above. Same key shape (manifest id
356    // / schedule id), but the value is the raw YAML bytes so the
357    // SPA's YAML editor preserves comments + script block-scalar
358    // indentation across edits. Agents/scheduler don't read these.
359    js.create_or_update_key_value(KvConfig {
360        bucket: BUCKET_JOBS_YAML.into(),
361        history: 5,
362        ..Default::default()
363    })
364    .await
365    .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS_YAML}"))?;
366    info!(bucket = BUCKET_JOBS_YAML, "ready");
367
368    js.create_or_update_key_value(KvConfig {
369        bucket: BUCKET_SCHEDULES_YAML.into(),
370        history: 5,
371        ..Default::default()
372    })
373    .await
374    .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES_YAML}"))?;
375    info!(bucket = BUCKET_SCHEDULES_YAML, "ready");
376
377    // ── Object Store ─────────────────────────────────────────────
378    // agent_releases — one object per version, raw exe bytes.
379    ensure_object_store(
380        js,
381        ObjectStoreConfig {
382            bucket: OBJECT_AGENT_RELEASES.into(),
383            ..Default::default()
384        },
385    )
386    .await?;
387
388    // app_packages — generic operator-uploaded binary distribution
389    // (kanade-client today; third-party installers like Webex /
390    // Teams once those flows land). Object keys are
391    // `<name>/<version>`; see `kanade-shared::kv::OBJECT_APP_PACKAGES`
392    // for the full rationale.
393    ensure_object_store(
394        js,
395        ObjectStoreConfig {
396            bucket: OBJECT_APP_PACKAGES.into(),
397            ..Default::default()
398        },
399    )
400    .await?;
401
402    // scripts — manifest script bodies referenced by
403    // `Execute::script_object` (SPEC §2.4.1). Sibling of
404    // `app_packages`; see `kanade-shared::kv::OBJECT_SCRIPTS` for
405    // the bucket-split rationale (smaller payloads + manifest-
406    // coupled lifecycle vs operator-curated installers).
407    ensure_object_store(
408        js,
409        ObjectStoreConfig {
410            bucket: OBJECT_SCRIPTS.into(),
411            ..Default::default()
412        },
413    )
414    .await?;
415
416    // result_output — overflow stdout / stderr blobs for the
417    // `ExecResult` wire kind (#227). Anything larger than the agent's
418    // 256 KB inline threshold gets uploaded here under
419    // `<request_id>/{stdout,stderr}`; the backend's results
420    // projector derefs the pointer fields before INSERT so SQLite
421    // + the SPA see the full text inline. 30-day max_age matches
422    // STREAM_RESULTS so the lifetimes stay in lockstep — a row still
423    // resolvable in execution_results never points at a missing
424    // blob.
425    // #518: capped like the streams — a job whose output overflows
426    // the inline threshold writes blobs HERE instead of
427    // STREAM_RESULTS, so without its own cap this store bypasses
428    // the stream budget entirely and can still fill the file store.
429    // The projector derefs blobs within seconds of publish, so
430    // eviction only ever hits already-projected (or expired)
431    // output.
432    ensure_object_store(
433        js,
434        ObjectStoreConfig {
435            bucket: OBJECT_RESULT_OUTPUT.into(),
436            max_age: Duration::from_secs(SECS_PER_DAY * 30),
437            max_bytes: GIB,
438            ..Default::default()
439        },
440    )
441    .await?;
442
443    // #219: collected file bundles. A `collect:` job's agent zips the
444    // script's listed files and uploads the archive here under
445    // `<pc_id>/<job_id>/<rfc3339>.zip`; the SPA Collect page lists /
446    // downloads them. 30-day max_age — bundles are debugging / audit
447    // artifacts (not curated config like app_packages / scripts), so
448    // they auto-expire and the bucket doesn't grow unbounded. Capped
449    // at 5 GiB (DiscardPolicy::Old evicts oldest first) so a fleet's
450    // worth of bundles can't fill the file store.
451    ensure_object_store(
452        js,
453        ObjectStoreConfig {
454            bucket: OBJECT_COLLECTIONS.into(),
455            max_age: Duration::from_secs(SECS_PER_DAY * 30),
456            max_bytes: 5 * GIB,
457            ..Default::default()
458        },
459    )
460    .await?;
461
462    Ok(())
463}
464
#[cfg(test)]
mod tests {
    use super::*;
    use std::process::Stdio;

    /// A disposable `nats-server -js` listening on a random port,
    /// in the same spirit as the kv_cas_live / offline_boot
    /// harnesses. Ignored tests only.
    ///
    /// The child process and the storage directory are held only so
    /// they live as long as the JetStream context that talks to them.
    struct Broker {
        js: jetstream::Context,
        _server: tokio::process::Child,
        _storage: tempfile::TempDir,
    }

    /// Start a throwaway broker and connect to it, polling every
    /// 100 ms for up to 50 attempts while the server comes up.
    async fn spawn_broker() -> Broker {
        let port = portpicker::pick_unused_port().expect("pick port");
        let storage = tempfile::TempDir::new().expect("storage tempdir");

        let mut command = tokio::process::Command::new("nats-server");
        command
            .arg("-js")
            .arg("-p")
            .arg(port.to_string())
            .arg("-sd")
            .arg(storage.path())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .kill_on_drop(true);
        let server = command
            .spawn()
            .expect("spawn nats-server (is it in PATH?)");

        let url = format!("nats://127.0.0.1:{port}");
        let mut client = None;
        let mut attempts = 0;
        while client.is_none() && attempts < 50 {
            match async_nats::connect(&url).await {
                Ok(connected) => client = Some(connected),
                Err(_) => tokio::time::sleep(Duration::from_millis(100)).await,
            }
            attempts += 1;
        }
        let client = client.expect("nats-server did not come up in 5s");

        Broker {
            js: jetstream::new(client),
            _server: server,
            _storage: storage,
        }
    }

    /// Object-store config for `bucket`; `max_bytes` is only set when
    /// a cap is given, everything else stays at the defaults.
    fn store_cfg(bucket: &str, max_bytes: Option<i64>) -> ObjectStoreConfig {
        let mut cfg = ObjectStoreConfig {
            bucket: bucket.into(),
            ..Default::default()
        };
        if let Some(cap) = max_bytes {
            cfg.max_bytes = cap;
        }
        cfg
    }

    /// #506 / 2026-06-11 incident: `create_object_store` has no
    /// create-or-update form and does not reconcile config, so once
    /// the #518 `max_bytes` cap was added to a store that had first
    /// been created uncapped, the broker rejected the create (error
    /// 10058 "name already in use with a different configuration")
    /// and the backend crashed on boot. `ensure_object_store` has to
    /// accept the existing store so startup can proceed.
    #[tokio::test]
    #[ignore = "requires nats-server in PATH; cargo test -- --ignored"]
    async fn ensure_object_store_accepts_config_drift() {
        let broker = spawn_broker().await;

        // Step 1: the pre-#518 shape — no cap at all.
        let uncapped = store_cfg("result_output", None);
        ensure_object_store(&broker.js, uncapped)
            .await
            .expect("fresh create");

        // Step 2: same bucket, conflicting (capped) config. This must
        // succeed by accepting the store that already exists.
        let capped = store_cfg("result_output", Some(1024 * 1024 * 1024));
        ensure_object_store(&broker.js, capped)
            .await
            .expect("config drift must not wedge startup");

        // Step 3: the surviving store still takes writes.
        let store = broker
            .js
            .get_object_store("result_output")
            .await
            .expect("store");
        let mut payload = &b"hi"[..];
        store.put("k", &mut payload).await.expect("put after drift");
    }

    /// Normal first-boot path: creating a capped store from scratch
    /// works on a broker that has room for it.
    #[tokio::test]
    #[ignore = "requires nats-server in PATH; cargo test -- --ignored"]
    async fn ensure_object_store_fresh_create_with_cap() {
        let broker = spawn_broker().await;
        let cfg = store_cfg("fresh", Some(64 * 1024 * 1024));
        ensure_object_store(&broker.js, cfg)
            .await
            .expect("fresh capped create");
        broker.js.get_object_store("fresh").await.expect("exists");
    }

    /// Fatal path: a create failure for a store that does NOT exist
    /// either has nothing to fall back to, so the error must surface.
    /// An invalid bucket name is rejected by create's charset
    /// validation and therefore never leaves a store behind.
    #[tokio::test]
    #[ignore = "requires nats-server in PATH; cargo test -- --ignored"]
    async fn ensure_object_store_propagates_when_no_fallback() {
        let broker = spawn_broker().await;
        // Spaces / '!' violate the object-store name rules: create
        // fails and the follow-up get finds nothing.
        let invalid = store_cfg("bad name!", None);
        let err = ensure_object_store(&broker.js, invalid)
            .await
            .expect_err("a create failure with no existing store must be fatal");
        let rendered = err.to_string();
        assert!(
            rendered.contains("no existing store to fall back to"),
            "unexpected error: {err:#}",
        );
    }
}