kanade_shared/bootstrap.rs
1//! Idempotent JetStream bootstrap (Sprint 6.x follow-up).
2//!
3//! Lists every NATS JetStream resource the kanade fleet expects —
4//! streams, KV buckets, Object Stores — and asks the broker to
5//! create-or-update them. v0.25.0 switched from `create_*` to
6//! `create_or_update_*`: the old form returned error 10058 ("name
7//! already in use with a different configuration") when a release
8//! widened a stream's subjects or changed its retention policy on
9//! a broker that still held the older config. With the new form the
10//! broker reconciles its definition to the one in this file, so
11//! version bumps no longer require operator-side data wipes.
12//!
13//! Centralising the list here means a future "we added a new
14//! bucket" change touches one place and both the operator CLI +
15//! the auto-bootstrap path pick it up.
16
17use std::time::Duration;
18
19use anyhow::{Context, Result};
20use async_nats::jetstream::{
21 self,
22 kv::Config as KvConfig,
23 object_store::Config as ObjectStoreConfig,
24 stream::{Config as StreamConfig, DiscardPolicy},
25};
26use tracing::{info, warn};
27
28use crate::kv::{
29 BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_FLEET_CONFIG,
30 BUCKET_JOBS, BUCKET_JOBS_YAML, BUCKET_NOTIFICATIONS_READ, BUCKET_SCHEDULES,
31 BUCKET_SCHEDULES_YAML, BUCKET_SCRIPT_CURRENT, BUCKET_SCRIPT_STATUS, OBJECT_AGENT_RELEASES,
32 OBJECT_APP_PACKAGES, OBJECT_COLLECTIONS, OBJECT_RESULT_OUTPUT, OBJECT_SCRIPTS, STREAM_AUDIT,
33 STREAM_EVENTS, STREAM_EXEC, STREAM_INVENTORY, STREAM_NOTIFICATIONS, STREAM_OBS_EVENTS,
34 STREAM_RESULTS,
35};
36
37/// Create-or-update an Object Store, but never let it wedge backend
38/// startup. `create_object_store` neither reconciles an existing
39/// store's config nor has a `create_or_update` form in async-nats
40/// 0.49, so a store whose desired config drifted — e.g. the #518
41/// `max_bytes` cap added after the bucket was first created uncapped,
42/// which the broker then rejects with error 10058 ("stream name
43/// already in use with a different configuration") — would otherwise
44/// fail `ensure_jetstream_resources` and crash the backend on boot
45/// (production outage 2026-06-11). Fall back to the existing store
46/// (uncapped, as it already was) and warn. #506 tracks real
47/// reconciliation of object-store config.
48async fn ensure_object_store(js: &jetstream::Context, cfg: ObjectStoreConfig) -> Result<()> {
49 let name = cfg.bucket.clone();
50 if let Err(e) = js.create_object_store(cfg).await {
51 // The fallback is deliberately broad — any create error is
52 // tolerated AS LONG AS the store already exists, because the
53 // alternative is a wedged backend and "never crash on boot"
54 // wins over "surface this specific error". The expected error
55 // is 10058 (config drift, the incident), but auth/network
56 // blips on an already-bootstrapped broker take this path too;
57 // they remain visible via the `warn!`. Only a genuine
58 // "can't create AND doesn't exist" is fatal.
59 if js.get_object_store(&name).await.is_err() {
60 return Err(e).with_context(|| {
61 format!("create_object_store {name} (and no existing store to fall back to)")
62 });
63 }
64 warn!(
65 store = %name, error = %e,
66 "object store exists with a different config; using it as-is (cap not reconciled)",
67 );
68 }
69 info!(store = %name, "ready");
70 Ok(())
71}
72
73/// Idempotently create every NATS JetStream resource the kanade
74/// fleet relies on. Calling repeatedly is safe — `create_*` returns
75/// the existing resource if it's already configured.
76///
77/// Returns once every resource is in place. The function is async
78/// so backends can `await` it as part of their startup sequence
79/// (one round-trip per resource — ~10 RTTs total).
80pub async fn ensure_jetstream_resources(js: &jetstream::Context) -> Result<()> {
81 // ── Streams ──────────────────────────────────────────────────
82 // #518: every stream carries a `max_bytes` cap with
83 // `Discard::Old` on top of its `max_age` window. Within their
84 // age windows the streams used to be unbounded by size, and
85 // JetStream's file store shares a disk with SQLite on the
86 // backend host — one job printing 200 KB per run fleet-wide
87 // could exhaust the store, at which point EVERY publish fails
88 // (results, obs, audit, KV puts). With the caps, worst-case
89 // degradation is "shorter history on the offending stream"
90 // instead of "broker down".
91 //
92 // Sizing: JetStream RESERVES each `max_bytes` against its
93 // available storage (min of max_file_store and free disk) at
94 // create/update time and fails with error 10047 when the sum
95 // doesn't fit, so these must stay small enough for modest
96 // hosts. That's fine: every stream here is a transport +
97 // replay buffer — the durable record is the backend's SQLite
98 // (results/inventory/obs/audit are all projected within
99 // seconds) — so the caps are runaway-output backstops, not
100 // history budgets. Total reservation ≈ 5.3 GiB including the
101 // result_output object store below.
102 const MIB: i64 = 1024 * 1024;
103 const GIB: i64 = 1024 * MIB;
104
105 // INVENTORY — 90-day rolling history (spec §2.3.1).
106 js.create_or_update_stream(StreamConfig {
107 name: STREAM_INVENTORY.into(),
108 subjects: vec!["inventory.>".into()],
109 max_age: Duration::from_secs(90 * 24 * 60 * 60),
110 max_bytes: GIB,
111 discard: DiscardPolicy::Old,
112 ..Default::default()
113 })
114 .await
115 .with_context(|| format!("create_or_update_stream {STREAM_INVENTORY}"))?;
116 info!(stream = STREAM_INVENTORY, "ready");
117
118 // RESULTS — 30-day rolling history. The biggest producer by
119 // far (every job run on every PC, with up to 256 KB of inline
120 // stdout/stderr per message), so it gets the largest slice of
121 // the disk budget.
122 js.create_or_update_stream(StreamConfig {
123 name: STREAM_RESULTS.into(),
124 subjects: vec!["results.>".into()],
125 max_age: Duration::from_secs(30 * 24 * 60 * 60),
126 max_bytes: 2 * GIB,
127 discard: DiscardPolicy::Old,
128 ..Default::default()
129 })
130 .await
131 .with_context(|| format!("create_or_update_stream {STREAM_RESULTS}"))?;
132 info!(stream = STREAM_RESULTS, "ready");
133
134 // EXEC — latest-per-subject only (spec §2.6 Layer 1). v0.22.1:
135 // catch the existing `commands.{all,group.X,pc.Y}` subjects so a
136 // single backend publish lands in BOTH the agent's live core
137 // subscription AND the stream's retention store. Reconnecting
138 // agents catch up via a durable consumer with
139 // `DeliverPolicy::LastPerSubject` — they receive the most
140 // recent Command per subject they care about, no matter how
141 // long they were offline (within `max_age`).
142 js.create_or_update_stream(StreamConfig {
143 name: STREAM_EXEC.into(),
144 subjects: vec!["commands.>".into()],
145 max_messages_per_subject: 1,
146 max_age: Duration::from_secs(7 * 24 * 60 * 60),
147 // Latest-per-subject keeps this tiny (one Command per
148 // group/pc subject); the cap is a backstop against subject
149 // cardinality bugs, not a working budget.
150 max_bytes: 64 * MIB,
151 discard: DiscardPolicy::Old,
152 ..Default::default()
153 })
154 .await
155 .with_context(|| format!("create_or_update_stream {STREAM_EXEC}"))?;
156 info!(stream = STREAM_EXEC, "ready");
157
158 // EVENTS — short-lived broadcast bus for kill / revoke / etc.
159 // 7-day window matches the EXEC spec window.
160 js.create_or_update_stream(StreamConfig {
161 name: STREAM_EVENTS.into(),
162 subjects: vec!["events.>".into()],
163 max_age: Duration::from_secs(7 * 24 * 60 * 60),
164 max_bytes: 256 * MIB,
165 discard: DiscardPolicy::Old,
166 ..Default::default()
167 })
168 .await
169 .with_context(|| format!("create_or_update_stream {STREAM_EVENTS}"))?;
170 info!(stream = STREAM_EVENTS, "ready");
171
172 // AUDIT — operator-action record (spec §2.3.1). The DURABLE
173 // copy is the backend's SQLite `audit_log` table (the projector
174 // INSERTs each message, idempotently since #501; 365-day
175 // retention since #486) — the stream is transport + replay
176 // buffer, not the archive, so it can be bounded like the rest.
177 // 90 days / 512 MiB is far more than the projector ever lags;
178 // previously this stream had NO limits at all, making it an
179 // unbounded disk leak on the broker host.
180 js.create_or_update_stream(StreamConfig {
181 name: STREAM_AUDIT.into(),
182 subjects: vec!["audit.>".into()],
183 max_age: Duration::from_secs(90 * 24 * 60 * 60),
184 max_bytes: 512 * MIB,
185 discard: DiscardPolicy::Old,
186 ..Default::default()
187 })
188 .await
189 .with_context(|| format!("create_or_update_stream {STREAM_AUDIT}"))?;
190 info!(stream = STREAM_AUDIT, "ready");
191
192 // OBS_EVENTS — per-PC observability timeline (Issue #246). The
193 // 90-day window matches `obs_events` table retention so a
194 // backend bootstrapping after long downtime can catch up but
195 // doesn't carry data the table will discard anyway. Subject
196 // filter `obs.>` catches every PC without a per-PC subscription.
197 //
198 // Days-to-seconds is spelt out once instead of `90 * 24 * 60 *
199 // 60` open-coded across bootstrap + cleanup; the matching prune
200 // window in `kanade-backend::cleanup` quotes the same number
201 // separately (SQLite-relative string syntax there, not a
202 // duration), so it can't share a constant — but a single
203 // arithmetic spell-out here makes the relationship grep-able.
204 const SECS_PER_DAY: u64 = 24 * 60 * 60;
205 const OBS_EVENTS_RETENTION_DAYS: u64 = 90;
206 js.create_or_update_stream(StreamConfig {
207 name: STREAM_OBS_EVENTS.into(),
208 subjects: vec!["obs.>".into()],
209 max_age: Duration::from_secs(OBS_EVENTS_RETENTION_DAYS * SECS_PER_DAY),
210 max_bytes: 512 * MIB,
211 discard: DiscardPolicy::Old,
212 ..Default::default()
213 })
214 .await
215 .with_context(|| format!("create_or_update_stream {STREAM_OBS_EVENTS}"))?;
216 info!(stream = STREAM_OBS_EVENTS, "ready");
217
218 // NOTIFICATIONS — end-user notification history (SPEC §2.3.1 /
219 // Phase E). 90-day window matches INVENTORY: a Client App that
220 // connects after a notification was sent fetches the missed ones
221 // via KLP `notifications.list`. Subject filter `notifications.>`
222 // catches every fan-out target (`all` / `group.X` / `pc.Y`) with
223 // one stream. Retains all messages per subject — each notification
224 // is its own history entry, not a latest-only state like EXEC.
225 // #518: 512 MiB cap + DiscardPolicy::Old, matching the other
226 // 90-day streams (AUDIT / OBS_EVENTS) — notification payloads are
227 // small, so this is generous headroom while still bounding the
228 // broker's disk lease.
229 js.create_or_update_stream(StreamConfig {
230 name: STREAM_NOTIFICATIONS.into(),
231 subjects: vec!["notifications.>".into()],
232 max_age: Duration::from_secs(90 * 24 * 60 * 60),
233 max_bytes: 512 * MIB,
234 discard: DiscardPolicy::Old,
235 ..Default::default()
236 })
237 .await
238 .with_context(|| format!("create_or_update_stream {STREAM_NOTIFICATIONS}"))?;
239 info!(stream = STREAM_NOTIFICATIONS, "ready");
240
241 // ── KV buckets ───────────────────────────────────────────────
242 // script_current — cmd_id → version (spec §2.6 Layer 2).
243 js.create_or_update_key_value(KvConfig {
244 bucket: BUCKET_SCRIPT_CURRENT.into(),
245 history: 5,
246 ..Default::default()
247 })
248 .await
249 .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_CURRENT}"))?;
250 info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");
251
252 // script_status — cmd_id → ACTIVE / REVOKED.
253 js.create_or_update_key_value(KvConfig {
254 bucket: BUCKET_SCRIPT_STATUS.into(),
255 history: 5,
256 ..Default::default()
257 })
258 .await
259 .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_STATUS}"))?;
260 info!(bucket = BUCKET_SCRIPT_STATUS, "ready");
261
262 // agents_state — pc_id → latest hw snapshot (history=1).
263 js.create_or_update_key_value(KvConfig {
264 bucket: BUCKET_AGENTS_STATE.into(),
265 history: 1,
266 ..Default::default()
267 })
268 .await
269 .with_context(|| format!("create_or_update_key_value {BUCKET_AGENTS_STATE}"))?;
270 info!(bucket = BUCKET_AGENTS_STATE, "ready");
271
272 // agent_config — Sprint 6 layered scopes (global / groups.* /
273 // pcs.*) plus the legacy target_version key.
274 js.create_or_update_key_value(KvConfig {
275 bucket: BUCKET_AGENT_CONFIG.into(),
276 history: 5,
277 ..Default::default()
278 })
279 .await
280 .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_CONFIG}"))?;
281 info!(bucket = BUCKET_AGENT_CONFIG, "ready");
282
283 // agent_groups — Sprint 5 per-pc group membership.
284 js.create_or_update_key_value(KvConfig {
285 bucket: BUCKET_AGENT_GROUPS.into(),
286 history: 5,
287 ..Default::default()
288 })
289 .await
290 .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_GROUPS}"))?;
291 info!(bucket = BUCKET_AGENT_GROUPS, "ready");
292
293 // schedules — admin-API CRUD'd cron table (spec §2.5.3).
294 // Backend's scheduler.rs also creates this on startup; calling
295 // twice is harmless.
296 js.create_or_update_key_value(KvConfig {
297 bucket: BUCKET_SCHEDULES.into(),
298 history: 5,
299 ..Default::default()
300 })
301 .await
302 .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES}"))?;
303 info!(bucket = BUCKET_SCHEDULES, "ready");
304
305 // jobs — v0.15 operator-registered Manifest catalog. Schedules
306 // reference rows here by id; editing a job rewrites what future
307 // schedule fires exec.
308 js.create_or_update_key_value(KvConfig {
309 bucket: BUCKET_JOBS.into(),
310 history: 5,
311 ..Default::default()
312 })
313 .await
314 .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS}"))?;
315 info!(bucket = BUCKET_JOBS, "ready");
316
317 // fleet_config — #418 Phase 5 fleet-wide singletons (the global
318 // change-freeze under KEY_FREEZE). history: 1 — only the current
319 // state matters; both schedulers watch it.
320 js.create_or_update_key_value(KvConfig {
321 bucket: BUCKET_FLEET_CONFIG.into(),
322 history: 1,
323 ..Default::default()
324 })
325 .await
326 .with_context(|| format!("create_or_update_key_value {BUCKET_FLEET_CONFIG}"))?;
327 info!(bucket = BUCKET_FLEET_CONFIG, "ready");
328
329 // notifications_read — per-(pc, user, notification) read/ack state
330 // (SPEC §2.3.2 / Phase E). The agent writes here on KLP
331 // `notifications.ack`; `notifications.list` reads it back to filter
332 // the unread bucket. history: 1 — only the latest ack per key
333 // matters.
334 js.create_or_update_key_value(KvConfig {
335 bucket: BUCKET_NOTIFICATIONS_READ.into(),
336 history: 1,
337 ..Default::default()
338 })
339 .await
340 .with_context(|| format!("create_or_update_key_value {BUCKET_NOTIFICATIONS_READ}"))?;
341 info!(bucket = BUCKET_NOTIFICATIONS_READ, "ready");
342
343 // jobs_yaml / schedules_yaml — operator source-of-truth YAML
344 // alongside the JSON catalogs above. Same key shape (manifest id
345 // / schedule id), but the value is the raw YAML bytes so the
346 // SPA's YAML editor preserves comments + script block-scalar
347 // indentation across edits. Agents/scheduler don't read these.
348 js.create_or_update_key_value(KvConfig {
349 bucket: BUCKET_JOBS_YAML.into(),
350 history: 5,
351 ..Default::default()
352 })
353 .await
354 .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS_YAML}"))?;
355 info!(bucket = BUCKET_JOBS_YAML, "ready");
356
357 js.create_or_update_key_value(KvConfig {
358 bucket: BUCKET_SCHEDULES_YAML.into(),
359 history: 5,
360 ..Default::default()
361 })
362 .await
363 .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES_YAML}"))?;
364 info!(bucket = BUCKET_SCHEDULES_YAML, "ready");
365
366 // ── Object Store ─────────────────────────────────────────────
367 // agent_releases — one object per version, raw exe bytes.
368 ensure_object_store(
369 js,
370 ObjectStoreConfig {
371 bucket: OBJECT_AGENT_RELEASES.into(),
372 ..Default::default()
373 },
374 )
375 .await?;
376
377 // app_packages — generic operator-uploaded binary distribution
378 // (kanade-client today; third-party installers like Webex /
379 // Teams once those flows land). Object keys are
380 // `<name>/<version>`; see `kanade-shared::kv::OBJECT_APP_PACKAGES`
381 // for the full rationale.
382 ensure_object_store(
383 js,
384 ObjectStoreConfig {
385 bucket: OBJECT_APP_PACKAGES.into(),
386 ..Default::default()
387 },
388 )
389 .await?;
390
391 // scripts — manifest script bodies referenced by
392 // `Execute::script_object` (SPEC §2.4.1). Sibling of
393 // `app_packages`; see `kanade-shared::kv::OBJECT_SCRIPTS` for
394 // the bucket-split rationale (smaller payloads + manifest-
395 // coupled lifecycle vs operator-curated installers).
396 ensure_object_store(
397 js,
398 ObjectStoreConfig {
399 bucket: OBJECT_SCRIPTS.into(),
400 ..Default::default()
401 },
402 )
403 .await?;
404
405 // result_output — overflow stdout / stderr blobs for the
406 // `ExecResult` wire kind (#227). Anything larger than the agent's
407 // 256 KB inline threshold gets uploaded here under
408 // `<request_id>/{stdout,stderr}`; the backend's results
409 // projector derefs the pointer fields before INSERT so SQLite
410 // + the SPA see the full text inline. 30-day max_age matches
411 // STREAM_RESULTS so the lifetimes stay in lockstep — a row still
412 // resolvable in execution_results never points at a missing
413 // blob.
414 // #518: capped like the streams — a job whose output overflows
415 // the inline threshold writes blobs HERE instead of
416 // STREAM_RESULTS, so without its own cap this store bypasses
417 // the stream budget entirely and can still fill the file store.
418 // The projector derefs blobs within seconds of publish, so
419 // eviction only ever hits already-projected (or expired)
420 // output.
421 ensure_object_store(
422 js,
423 ObjectStoreConfig {
424 bucket: OBJECT_RESULT_OUTPUT.into(),
425 max_age: Duration::from_secs(SECS_PER_DAY * 30),
426 max_bytes: GIB,
427 ..Default::default()
428 },
429 )
430 .await?;
431
432 // #219: collected file bundles. A `collect:` job's agent zips the
433 // script's listed files and uploads the archive here under
434 // `<pc_id>/<job_id>/<rfc3339>.zip`; the SPA Collect page lists /
435 // downloads them. 30-day max_age — bundles are debugging / audit
436 // artifacts (not curated config like app_packages / scripts), so
437 // they auto-expire and the bucket doesn't grow unbounded. Capped
438 // at 5 GiB (DiscardPolicy::Old evicts oldest first) so a fleet's
439 // worth of bundles can't fill the file store.
440 ensure_object_store(
441 js,
442 ObjectStoreConfig {
443 bucket: OBJECT_COLLECTIONS.into(),
444 max_age: Duration::from_secs(SECS_PER_DAY * 30),
445 max_bytes: 5 * GIB,
446 ..Default::default()
447 },
448 )
449 .await?;
450
451 Ok(())
452}
453
#[cfg(test)]
mod tests {
    use super::*;
    use std::process::Stdio;

    /// A disposable `nats-server -js` listening on a random port, in
    /// the same spirit as the kv_cas_live / offline_boot harnesses.
    /// Only the `#[ignore]`d tests below use it.
    ///
    /// `_server` and `_storage` are never read; they are held so the
    /// child process and its storage directory live as long as the
    /// `Broker` does.
    struct Broker {
        js: jetstream::Context,
        _server: tokio::process::Child,
        _storage: tempfile::TempDir,
    }

    /// Start a throwaway broker and connect to it, retrying the
    /// connection up to 50 times at 100 ms intervals. Panics when the
    /// server cannot be spawned or never accepts a connection.
    async fn spawn_broker() -> Broker {
        let port = portpicker::pick_unused_port().expect("pick port");
        let storage = tempfile::TempDir::new().expect("storage tempdir");

        let mut command = tokio::process::Command::new("nats-server");
        command
            .arg("-js")
            .arg("-p")
            .arg(port.to_string())
            .arg("-sd")
            .arg(storage.path())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .kill_on_drop(true);
        let server = command
            .spawn()
            .expect("spawn nats-server (is it in PATH?)");

        let url = format!("nats://127.0.0.1:{port}");
        let mut attempts_left = 50;
        let client = loop {
            if attempts_left == 0 {
                break None;
            }
            attempts_left -= 1;
            if let Ok(connected) = async_nats::connect(&url).await {
                break Some(connected);
            }
            // Not accepting connections yet — give it a moment.
            tokio::time::sleep(Duration::from_millis(100)).await;
        };
        let client = client.expect("nats-server did not come up in 5s");

        Broker {
            js: jetstream::new(client),
            _server: server,
            _storage: storage,
        }
    }

    /// Regression test for #506 / the 2026-06-11 incident. Because
    /// `create_object_store` neither reconciles config nor has a
    /// create-or-update form, adding the #518 `max_bytes` cap to a
    /// store that was first created uncapped made the broker reject
    /// the create (error 10058 "name already in use with a different
    /// configuration") and crashed the backend on boot.
    /// `ensure_object_store` must accept the existing store instead
    /// and let startup proceed.
    #[tokio::test]
    #[ignore = "requires nats-server in PATH; cargo test -- --ignored"]
    async fn ensure_object_store_accepts_config_drift() {
        let broker = spawn_broker().await;

        // Step 1: create the store uncapped, the way the pre-#518
        // backend did.
        let uncapped = ObjectStoreConfig {
            bucket: "result_output".into(),
            ..Default::default()
        };
        ensure_object_store(&broker.js, uncapped)
            .await
            .expect("fresh create");

        // Step 2: ask for the same bucket with a conflicting (capped)
        // config. This must NOT error — the existing store is
        // accepted.
        let capped = ObjectStoreConfig {
            bucket: "result_output".into(),
            max_bytes: 1024 * 1024 * 1024,
            ..Default::default()
        };
        ensure_object_store(&broker.js, capped)
            .await
            .expect("config drift must not wedge startup");

        // Step 3: the store can still be written to.
        let store = broker
            .js
            .get_object_store("result_output")
            .await
            .expect("store");
        store
            .put("k", &mut &b"hi"[..])
            .await
            .expect("put after drift");
    }

    /// The normal first-boot path: creating a brand-new store with a
    /// cap succeeds on a broker that has room for it.
    #[tokio::test]
    #[ignore = "requires nats-server in PATH; cargo test -- --ignored"]
    async fn ensure_object_store_fresh_create_with_cap() {
        let broker = spawn_broker().await;
        let capped = ObjectStoreConfig {
            bucket: "fresh".into(),
            max_bytes: 64 * 1024 * 1024,
            ..Default::default()
        };
        ensure_object_store(&broker.js, capped)
            .await
            .expect("fresh capped create");
        broker.js.get_object_store("fresh").await.expect("exists");
    }

    /// The fatal path: if create fails for a store that ALSO does not
    /// exist, the error has to propagate — only errors with a
    /// fallback are swallowed. An invalid bucket name fails create's
    /// charset validation and never leaves a store behind to fall
    /// back to.
    #[tokio::test]
    #[ignore = "requires nats-server in PATH; cargo test -- --ignored"]
    async fn ensure_object_store_propagates_when_no_fallback() {
        let broker = spawn_broker().await;
        let invalid = ObjectStoreConfig {
            // The object-store name rules reject spaces / '!', so
            // create fails and the follow-up get finds nothing.
            bucket: "bad name!".into(),
            ..Default::default()
        };
        let err = ensure_object_store(&broker.js, invalid)
            .await
            .expect_err("a create failure with no existing store must be fatal");
        let rendered = err.to_string();
        assert!(
            rendered.contains("no existing store to fall back to"),
            "unexpected error: {err:#}",
        );
    }
}