net/adapter/dedup_state.rs
1//! Persistent producer identity for cross-process dedup.
2//!
3//! Adapters that rely on backend-side dedup keyed on
4//! `(producer_nonce, shard, sequence_start, i)` (today: JetStream's
5//! `Nats-Msg-Id` header) need a `producer_nonce` that survives
6//! process restart. Without that, a producer that crashes mid-batch
7//! and restarts gets a fresh nonce, the post-restart retry writes
8//! new msg-ids, and JetStream's dedup window can't recognize them
9//! as duplicates of the pre-crash partial — the accepted half ends
10//! up persisted twice.
11//!
12//! `PersistentProducerNonce` provides exactly that: a u64 sampled
13//! once and stored on disk. On startup, callers `load_or_create` it
14//! against a known path; the second + Nth process loads the same
15//! nonce, so retries' msg-ids match the pre-crash incarnation's.
16//! Atomic write (`tempfile + rename`) so a crash between the
17//! random-sample and the final rename leaves either no file (next
18//! load creates fresh) or the complete file — never a partial
19//! write.
20//!
21//! When the bus is configured WITHOUT a path
22//! (`EventBusConfig::producer_nonce_path = None`), the existing
23//! per-process nonce is used. That keeps the behavior of every
24//! pre-fix caller unchanged and is documented as
25//! "at-most-once-across-restarts."
26
27use std::fs;
28use std::io;
29use std::path::{Path, PathBuf};
30
/// On-disk size of a v1 nonce file, laid out as
/// `[VERSION:u8 = 1][nonce:u64 LE]` — 9 bytes in total.
///
/// Leading with a version byte means a later format (for example an
/// HMAC-keyed nonce, or a 16-byte `(epoch, nonce)` pair) can be rolled
/// out without a separate file migration: a loader only has to look at
/// `data[0]` and hand the rest to the parser for that version.
///
/// Bare 8-byte files without the version prefix are **not supported**;
/// loading one fails with `InvalidData`. An operator who still has such
/// a legacy file can delete it: the next start writes a fresh v1 file,
/// at the cost of a one-time loss of cross-restart dedup that is bounded
/// by the JetStream / Redis dedup window.
const NONCE_FILE_LEN_V1: usize = std::mem::size_of::<u8>() + std::mem::size_of::<u64>();
45
/// Value of the leading version byte in the current (v1) wire format.
const NONCE_FORMAT_V1: u8 = 0x01;
48
/// A `u64` producer nonce together with the stable path it was loaded
/// from (or created at).
///
/// Obtain an instance with [`Self::load_or_create`] and read the value
/// back with [`Self::nonce`]. Cloning copies the `u64` and the
/// `PathBuf`; the path is carried along only for debugging / logging.
#[derive(Clone, Debug)]
pub struct PersistentProducerNonce {
    nonce: u64,
    #[allow(dead_code)] // kept only so diagnostic (`Debug`) output shows it
    path: PathBuf,
}
61
62impl PersistentProducerNonce {
63 /// Load (or create) the persistent nonce at `path`.
64 ///
65 /// On first call: samples a fresh u64 from `getrandom`, writes
66 /// it to `path` atomically (write to `<path>.tmp`, fsync, rename
67 /// to `path`), and returns the value.
68 ///
69 /// On subsequent calls (post-restart, same path): reads the
70 /// existing 8-byte file and returns its little-endian u64.
71 ///
72 /// Errors:
73 /// - `io::ErrorKind::NotFound` if the parent directory doesn't
74 /// exist. We don't auto-create the parent — that's a
75 /// configuration decision the caller should make explicitly.
76 /// - `io::ErrorKind::InvalidData` if the file exists but has
77 /// length other than 8 bytes (corrupt or someone else's file
78 /// at this path).
79 /// - Other `io::Error` from filesystem operations.
80 pub fn load_or_create(path: impl AsRef<Path>) -> io::Result<Self> {
81 let path = path.as_ref().to_path_buf();
82
83 // Fast path: file exists.
84 match fs::read(&path) {
85 Ok(bytes) => {
86 if bytes.len() != NONCE_FILE_LEN_V1 {
87 return Err(io::Error::new(
88 io::ErrorKind::InvalidData,
89 format!(
90 "producer-nonce file at {} has length {} (expected {} for v1)",
91 path.display(),
92 bytes.len(),
93 NONCE_FILE_LEN_V1,
94 ),
95 ));
96 }
97 if bytes[0] != NONCE_FORMAT_V1 {
98 return Err(io::Error::new(
99 io::ErrorKind::InvalidData,
100 format!(
101 "producer-nonce file at {} has unknown version byte 0x{:02x} \
102 (expected 0x{:02x} for v1)",
103 path.display(),
104 bytes[0],
105 NONCE_FORMAT_V1,
106 ),
107 ));
108 }
109 let mut buf = [0u8; 8];
110 buf.copy_from_slice(&bytes[1..]);
111 let nonce = u64::from_le_bytes(buf);
112 Ok(Self { nonce, path })
113 }
114 Err(e) if e.kind() == io::ErrorKind::NotFound => {
115 // First-load path: sample, write atomically, return.
116 Self::create_new(path)
117 }
118 Err(e) => Err(e),
119 }
120 }
121
122 fn create_new(path: PathBuf) -> io::Result<Self> {
123 // Sample a fresh nonce. We can't depend on `getrandom` here
124 // — it's gated behind the `net` feature, but this module is
125 // unconditional (the bus uses it whether `net` is on or
126 // off, e.g. for JetStream/Redis-only deployments). Mix the
127 // same set of entropy sources `event::batch_process_nonce`
128 // uses, but DON'T share its `OnceLock` cache — distinct
129 // create_new calls in the same process must produce distinct
130 // nonces (e.g. two buses configured against different
131 // nonce paths should not silently collide). The OnceLock
132 // semantic is right for the per-process fallback nonce; it
133 // would be wrong here.
134 //
135 // The mix is identical in spirit to `batch_process_nonce`:
136 // wall-clock nanos + monotonic-clock marker + pid +
137 // ASLR-derived stack address + thread id, all hashed
138 // through xxh3. Adequate for a startup-time nonce — the
139 // collision risk we care about is two-processes-on-the-
140 // same-machine within a single nanosecond tick, which the
141 // pid + stack marker covers.
142 //
143 // Refuse `0` to keep parity with `batch_process_nonce` —
144 // some downstream consumers use 0 as a sentinel.
145 use std::hash::{Hash, Hasher};
146 use std::time::Instant;
147
148 let wall_nanos = std::time::SystemTime::now()
149 .duration_since(std::time::UNIX_EPOCH)
150 .map(|d| d.as_nanos() as u64)
151 .unwrap_or(0);
152 let mono_marker = format!("{:?}", Instant::now());
153 let pid = std::process::id() as u64;
154 let stack_local: u64 = wall_nanos;
155 let stack_marker = (&stack_local as *const u64) as usize;
156 let mut tid_hasher = std::collections::hash_map::DefaultHasher::new();
157 std::thread::current().id().hash(&mut tid_hasher);
158 let tid = tid_hasher.finish();
159
160 // Pull 16 bytes of OS-random entropy via the standard
161 // library's `RandomState`, which is itself seeded from
162 // platform-secure RNG (getrandom on Linux/macOS, BCrypt on
163 // Windows). Each `RandomState::new()` call draws a fresh
164 // SipHash key (16 bytes of OS entropy), and finishing a
165 // hasher built from that key against a fixed byte yields
166 // a 64-bit value derived from those 16 bytes — i.e. 64
167 // bits of OS-randomness folded into 64. Two independent
168 // samples gives us a full 128 bits of OS-derived entropy
169 // mixed into the nonce, on top of the existing
170 // pid/tid/wall/stack/mono inputs that are mostly
171 // predictable.
172 //
173 // Pre-fix the mix relied entirely on `(pid, tid, wall,
174 // stack_marker as usize, mono)`. On 32-bit targets
175 // `stack_marker as u64` is zero-extended from 32 bits,
176 // halving its entropy contribution; on 64-bit targets
177 // ASLR gives ~30 bits. Combined with predictable pid /
178 // wall-time, the total OS-independent entropy was
179 // ~50-60 bits — below the 64-bit nonce's stated promise.
180 // The OS-random samples below dominate the predictable
181 // sources and restore the security margin.
182 use std::hash::BuildHasher;
183 let os_entropy_a = std::collections::hash_map::RandomState::new().hash_one(0u64);
184 let os_entropy_b = std::collections::hash_map::RandomState::new().hash_one(0u64);
185
186 let mut hash_input = [0u8; 64];
187 hash_input[..8].copy_from_slice(&wall_nanos.to_le_bytes());
188 hash_input[8..16].copy_from_slice(&pid.to_le_bytes());
189 hash_input[16..24].copy_from_slice(&(stack_marker as u64).to_le_bytes());
190 hash_input[24..32].copy_from_slice(&tid.to_le_bytes());
191 // Trim the mono_marker slot to 16 bytes (was 32) and
192 // claim the trailing 16 bytes for the two OS-random
193 // samples. The mono marker's first 16 bytes still tie-
194 // break two same-instant calls within the same process;
195 // its longer tail was largely wall-time-correlated text
196 // that didn't add meaningful entropy.
197 let mono_bytes = mono_marker.as_bytes();
198 let n = mono_bytes.len().min(16);
199 hash_input[32..32 + n].copy_from_slice(&mono_bytes[..n]);
200 hash_input[48..56].copy_from_slice(&os_entropy_a.to_le_bytes());
201 hash_input[56..64].copy_from_slice(&os_entropy_b.to_le_bytes());
202
203 let mut nonce = xxhash_rust::xxh3::xxh3_64(&hash_input);
204 if nonce == 0 {
205 nonce = 1;
206 }
207 // v1 wire format — `[VERSION:u8 = 1][nonce:u64 LE]`.
208 // Versioning lets a future format change (HMAC-keyed nonce,
209 // 16-byte epoch+nonce, etc.) deploy without an out-of-band
210 // migration — the loader matches on length + version byte.
211 let mut buf = [0u8; NONCE_FILE_LEN_V1];
212 buf[0] = NONCE_FORMAT_V1;
213 buf[1..].copy_from_slice(&nonce.to_le_bytes());
214
215 // Atomic write: create a per-call-unique sibling tempfile,
216 // fsync it, rename over the target.
217 //
218 // Stamp the tempfile name with `pid + tid + nanos` so each
219 // caller writes to its own file. A fixed sibling like
220 // `<path>.tmp` would let concurrent first-loaders racing on
221 // the same path (two threads in one process, OR two
222 // daemons misconfigured to point at the same nonce file)
223 // interleave their writes at the OS layer and produce a
224 // corrupted 8-byte sequence, or one rename would `ENOENT`
225 // because the other already moved the tempfile, surfacing
226 // as a load_or_create failure. Last rename still wins
227 // (intended semantic — the first-loader race is rare and
228 // the cap on nonce divergence is "different per call"
229 // anyway, since each call samples fresh entropy), but each
230 // renamed file is now a complete, valid 8-byte nonce — no
231 // interleaved-write corruption.
232 let tmp_path = {
233 use std::hash::{Hash, Hasher};
234 let mut p = path.clone();
235 let mut name = p.file_name().map(|n| n.to_os_string()).unwrap_or_default();
236 let pid = std::process::id();
237 let nanos = std::time::SystemTime::now()
238 .duration_since(std::time::UNIX_EPOCH)
239 .map(|d| d.as_nanos())
240 .unwrap_or(0);
241 let mut tid_hasher = std::collections::hash_map::DefaultHasher::new();
242 std::thread::current().id().hash(&mut tid_hasher);
243 let tid = tid_hasher.finish();
244 // Mix in the freshly-sampled nonce too so the tempfile
245 // name is unique even if the wall clock tick is shared
246 // and the same thread retries (e.g., after a stale
247 // tempfile cleanup). The nonce is the load-bearing
248 // entropy source; this just borrows it for naming.
249 name.push(format!(".{pid}.{tid:x}.{nanos}.{nonce:x}.tmp"));
250 p.set_file_name(name);
251 p
252 };
253 // Pre-fix, the write/sync split was
254 // fs::write(&tmp_path, buf)?; // (a) write + close
255 // if let Ok(f) = fs::File::open(&tmp_path) {
256 // let _ = f.sync_all(); // (b) sync_all on
257 // // a read-only handle
258 // }
259 // Two distinct hazards:
260 // #40 — `let _ = f.sync_all()` swallowed disk-full / I/O
261 // errors; the producer-nonce file was reported as
262 // "saved" while still being only in the kernel
263 // page cache. A power loss between rename and the
264 // OS's own background flush left the nonce file
265 // partial / undurable, breaking cross-restart
266 // dedup on next start.
267 // #68 — On Windows, `fs::File::open(&path)` opens
268 // read-only. `File::sync_all` calls
269 // `FlushFileBuffers`, which returns
270 // `ERROR_ACCESS_DENIED` on a read-only handle —
271 // the entire fsync was a silent no-op on every
272 // Windows install.
273 //
274 // Post-fix uses a single writable handle for write+sync
275 // and propagates both errors. `OpenOptions` with
276 // `create_new(true)` matches the per-call-unique tmp_path
277 // contract.
278 //
279 // Pre-emptively remove any zombie tempfile at this exact
280 // path. The path hash mixes pid + tid + nanos + freshly-
281 // sampled nonce, so a same-named file can only be a
282 // crashed prior run of the SAME process+thread that
283 // happened to land on the identical nanos+nonce — vanishingly
284 // unlikely, but observable in practice if a system clock
285 // rewinds across a crash. Without this, `create_new` fails
286 // with `AlreadyExists` and there is no retry path; every
287 // subsequent save then errors out and the producer nonce
288 // never persists. `remove_file().ok()` is safe because no
289 // concurrent caller can be holding this exact path (the
290 // hash is unique per-call by construction).
291 let _ = fs::remove_file(&tmp_path);
292 {
293 use std::io::Write;
294 let mut f = fs::OpenOptions::new()
295 .write(true)
296 .create_new(true)
297 .open(&tmp_path)?;
298 f.write_all(&buf)?;
299 f.sync_all()?;
300 }
301 // `fs::rename` is `MoveFileEx(MOVEFILE_REPLACE_EXISTING)` on
302 // Windows / `rename(2)` on Unix — atomic replace on POSIX,
303 // best-effort on Windows. Per-call-unique source means the
304 // rename can't race against a sibling's rename (each
305 // `tmp_path` is its own file).
306 fs::rename(&tmp_path, &path)?;
307
308 Ok(Self { nonce, path })
309 }
310
311 /// The loaded (or freshly created) nonce.
312 #[inline]
313 pub fn nonce(&self) -> u64 {
314 self.nonce
315 }
316}
317
318#[cfg(test)]
319mod tests {
320 use super::*;
321
322 fn temp_path(suffix: &str) -> PathBuf {
323 let mut p = std::env::temp_dir();
324 // Combine pid + nanos + suffix so concurrent test runs don't
325 // collide on a shared `temp_dir()`.
326 let pid = std::process::id();
327 let nanos = std::time::SystemTime::now()
328 .duration_since(std::time::UNIX_EPOCH)
329 .map(|d| d.as_nanos())
330 .unwrap_or(0);
331 p.push(format!("net-test-nonce-{pid}-{nanos}-{suffix}"));
332 p
333 }
334
335 #[test]
336 fn first_load_creates_a_random_nonzero_nonce() {
337 let path = temp_path("first");
338 let nonce = PersistentProducerNonce::load_or_create(&path)
339 .unwrap()
340 .nonce();
341 assert_ne!(nonce, 0, "first-load must sample a nonzero nonce");
342 // Cleanup.
343 let _ = fs::remove_file(&path);
344 }
345
346 #[test]
347 fn second_load_returns_the_same_nonce() {
348 let path = temp_path("second");
349 let first = PersistentProducerNonce::load_or_create(&path)
350 .unwrap()
351 .nonce();
352 let second = PersistentProducerNonce::load_or_create(&path)
353 .unwrap()
354 .nonce();
355 assert_eq!(
356 first, second,
357 "second load against same path must return the same nonce — \
358 this is the load-bearing cross-restart property",
359 );
360 let _ = fs::remove_file(&path);
361 }
362
363 #[test]
364 fn corrupt_file_surfaces_invalid_data_error() {
365 let path = temp_path("corrupt");
366 // Write 7 bytes (one short of NONCE_FILE_LEN).
367 fs::write(&path, b"shorty!").unwrap();
368
369 let err = PersistentProducerNonce::load_or_create(&path).unwrap_err();
370 assert_eq!(err.kind(), io::ErrorKind::InvalidData);
371 assert!(
372 err.to_string().contains("length 7"),
373 "error message should pin the actual length; got: {err}",
374 );
375 let _ = fs::remove_file(&path);
376 }
377
378 #[test]
379 fn missing_parent_directory_surfaces_not_found_error() {
380 let mut path = temp_path("missing-parent");
381 path.push("subdir-that-does-not-exist");
382 path.push("nonce");
383
384 let err = PersistentProducerNonce::load_or_create(&path).unwrap_err();
385 // Either NotFound (Unix-y) or other kinds depending on platform;
386 // we just need a clear failure rather than silent success.
387 assert!(
388 err.kind() == io::ErrorKind::NotFound
389 || err.kind() == io::ErrorKind::PermissionDenied
390 || err.kind() == io::ErrorKind::Other,
391 "expected a clear filesystem error; got {err:?}",
392 );
393 }
394
395 /// Regression: the startup nonce mix must include OS-derived
396 /// entropy (via `RandomState`-keyed hashing) on top of the
397 /// pid/tid/wall/stack/mono inputs. Pre-fix the mix relied
398 /// entirely on those predictable sources (~50-60 bits of
399 /// effective entropy on 64-bit, ~30-40 bits on 32-bit due
400 /// to `as usize` zero-extending the stack address). Two
401 /// co-located pods restarting from the same checkpoint at
402 /// the same wall-clock instant carried tighter collision
403 /// margins than the 64-bit nonce promise implied.
404 ///
405 /// The strict "two co-located pods at the same wall-clock
406 /// instant produce different nonces" property is hard to
407 /// pin in a unit test (we'd need to fake all the system
408 /// inputs identically). Instead this test pins a weaker but
409 /// observable property: rapid back-to-back `create_new`
410 /// calls in the same process — where wall_nanos is nearly
411 /// identical, pid is the same, mono_marker is nearly
412 /// identical, and tid is identical for sequential calls —
413 /// must still produce distinct nonces. Without OS entropy,
414 /// the SipHash randomization of `tid_hasher` is the only
415 /// remaining variation, and that's per-process not per-call.
416 /// With OS entropy mixed in, every call samples fresh
417 /// `RandomState` keys.
418 #[test]
419 fn back_to_back_nonces_in_same_thread_differ_via_os_entropy() {
420 // Hammer 32 nonces from one thread; with OS entropy
421 // mixed in, every one should be unique. Pre-fix this
422 // would fail because pid/tid/wall_nanos/stack_marker
423 // were nearly identical across rapid calls and the
424 // hash output collided.
425 let mut nonces = std::collections::HashSet::new();
426 for i in 0..32 {
427 let path = temp_path(&format!("os_entropy_{i}"));
428 let nonce = PersistentProducerNonce::load_or_create(&path)
429 .unwrap()
430 .nonce();
431 assert!(
432 nonces.insert(nonce),
433 "regression: back-to-back nonces must differ — same-thread \
434 same-instant calls have identical predictable inputs, so \
435 OS-random entropy is the only thing that varies. \
436 collision at i={i}, nonce={nonce}",
437 );
438 let _ = fs::remove_file(&path);
439 }
440 }
441
442 #[test]
443 fn two_distinct_paths_produce_two_distinct_nonces() {
444 let a = temp_path("a");
445 let b = temp_path("b");
446 let n_a = PersistentProducerNonce::load_or_create(&a).unwrap().nonce();
447 let n_b = PersistentProducerNonce::load_or_create(&b).unwrap().nonce();
448 assert_ne!(
449 n_a, n_b,
450 "two distinct nonce paths must produce distinct nonces (collision \
451 probability is ~2^-63 — if this fires twice, suspect getrandom)",
452 );
453 let _ = fs::remove_file(&a);
454 let _ = fs::remove_file(&b);
455 }
456
457 /// Cubic-ai P1: concurrent first-loaders against the SAME path
458 /// must not corrupt the on-disk nonce or fail startup. Pre-fix
459 /// every caller wrote to `<path>.tmp`, so two threads racing
460 /// the first-create could either:
461 /// - interleave writes at the OS layer (resulting in a
462 /// corrupted 8-byte sequence — our `from_le_bytes` would
463 /// decode garbage, or a future length check would reject),
464 /// - or one's `fs::rename` would ENOENT because the other
465 /// already moved the tempfile (surfacing as
466 /// `load_or_create` failure → `EventBus::new` failure).
467 ///
468 /// The test races N threads on a single path. Each MUST return
469 /// successfully; the resulting on-disk file MUST be exactly 8
470 /// bytes (no corruption); and a subsequent `load_or_create`
471 /// MUST decode a non-zero u64 (cross-thread last-rename-wins
472 /// stable state). Any pre-fix interleave or ENOENT would surface
473 /// as a panic in one of the threads.
474 #[test]
475 fn concurrent_first_load_does_not_corrupt_or_fail() {
476 use std::sync::Arc;
477 use std::thread;
478
479 const N: usize = 16;
480 let path = Arc::new(temp_path("concurrent-first-load"));
481
482 let barrier = Arc::new(std::sync::Barrier::new(N));
483 let mut handles = Vec::with_capacity(N);
484 for _ in 0..N {
485 let path = Arc::clone(&path);
486 let barrier = Arc::clone(&barrier);
487 handles.push(thread::spawn(move || {
488 barrier.wait();
489 // Pre-fix this could panic on `fs::rename` ENOENT
490 // when another thread already moved the shared
491 // tempfile. Post-fix every thread owns its own
492 // tempfile, so every load_or_create returns Ok.
493 PersistentProducerNonce::load_or_create(&*path)
494 .expect("concurrent first-load must succeed")
495 .nonce()
496 }));
497 }
498 let nonces: Vec<u64> = handles
499 .into_iter()
500 .map(|h| h.join().expect("worker must not panic"))
501 .collect();
502
503 // Every thread got a non-zero nonce.
504 assert!(
505 nonces.iter().all(|&n| n != 0),
506 "every concurrent first-loader must observe a non-zero nonce, \
507 got: {nonces:?}",
508 );
509
510 // CR-28: the on-disk file is exactly NONCE_FILE_LEN_V1 = 9
511 // bytes (1 version byte + 8 LE nonce bytes) — no
512 // interleaved-write corruption. (Pre-fix two threads could
513 // write to the same tempfile and the OS could split their
514 // writes mid-byte; the resulting file might be 4 + 4 bytes
515 // from different nonces.)
516 let on_disk = fs::read(&*path).expect("path must exist after concurrent first-load");
517 assert_eq!(
518 on_disk.len(),
519 NONCE_FILE_LEN_V1,
520 "on-disk nonce must be exactly {} bytes (no interleaved-write corruption)",
521 NONCE_FILE_LEN_V1,
522 );
523
524 // A subsequent load returns the nonce of whichever thread
525 // won the last rename — and it MUST equal one of the
526 // observed nonces. (If we got a value none of the threads
527 // produced, the file is corrupt.)
528 let post_load = PersistentProducerNonce::load_or_create(&*path)
529 .unwrap()
530 .nonce();
531 assert!(
532 nonces.contains(&post_load),
533 "post-load nonce {post_load:#x} must match one of the in-race \
534 samples {nonces:?} — anything else implies corruption",
535 );
536
537 let _ = fs::remove_file(&*path);
538 }
539
540 /// CR-28: legacy 8-byte (pre-versioning) files are NOT
541 /// supported. The feature shipped along with CR-28 itself, so
542 /// no production deployments of the legacy format exist;
543 /// loaders surface `InvalidData` and operators delete the
544 /// stale file to recover (next start writes a fresh v1, with
545 /// a one-time loss of cross-restart dedup bounded by the
546 /// JetStream/Redis dedup window). Pin the rejection so a
547 /// future refactor can't silently re-introduce the legacy
548 /// path.
549 #[test]
550 fn cr28_legacy_8_byte_file_is_rejected() {
551 let path = temp_path("legacy-8byte");
552 // Write 8 raw LE bytes — the pre-CR-28 wire format.
553 let stale: u64 = 0xDEAD_BEEF_CAFE_F00D;
554 fs::write(&path, stale.to_le_bytes()).unwrap();
555
556 let err = PersistentProducerNonce::load_or_create(&path).unwrap_err();
557 assert_eq!(
558 err.kind(),
559 io::ErrorKind::InvalidData,
560 "legacy 8-byte file must surface InvalidData (CR-28 dropped v0 support)"
561 );
562 assert!(
563 err.to_string().contains("length 8"),
564 "error message should pin the rejected length; got: {err}"
565 );
566 let _ = fs::remove_file(&path);
567 }
568
569 /// CR-28 v1 round-trip: the new versioned file format is
570 /// `[VERSION = 1][8 LE bytes]`. Pin the wire shape so a future
571 /// refactor can't silently break it.
572 #[test]
573 fn cr28_v1_versioned_9_byte_file_round_trip() {
574 let path = temp_path("v1-roundtrip");
575 let expected: u64 = 0xDEAD_BEEF_CAFE_F00D;
576 // Write [VERSION=1][8 LE bytes] by hand — the CR-28 wire
577 // format.
578 let mut bytes = Vec::with_capacity(9);
579 bytes.push(NONCE_FORMAT_V1);
580 bytes.extend_from_slice(&expected.to_le_bytes());
581 fs::write(&path, &bytes).unwrap();
582
583 let loaded = PersistentProducerNonce::load_or_create(&path)
584 .unwrap()
585 .nonce();
586 assert_eq!(
587 loaded, expected,
588 "CR-28: v1 file format is [VERSION=1][8 LE bytes]. Pin so a \
589 future refactor that flips byte order or drops the version \
590 byte doesn't silently produce a different nonce."
591 );
592 let _ = fs::remove_file(&path);
593 }
594
595 /// CR-28: a 9-byte file with an UNKNOWN version byte must
596 /// surface InvalidData. This is the forward-compat tripwire —
597 /// when v2 is introduced, a v2-aware reader will accept it,
598 /// but until then we refuse to silently misinterpret a v2
599 /// file as v1.
600 #[test]
601 fn cr28_unknown_version_byte_surfaces_invalid_data() {
602 let path = temp_path("v-unknown");
603 // 9 bytes with version byte = 0xFF (reserved future).
604 let mut bytes = Vec::with_capacity(9);
605 bytes.push(0xFF);
606 bytes.extend_from_slice(&0xDEAD_BEEFu64.to_le_bytes());
607 fs::write(&path, &bytes).unwrap();
608
609 let err = PersistentProducerNonce::load_or_create(&path).unwrap_err();
610 assert_eq!(err.kind(), io::ErrorKind::InvalidData);
611 assert!(
612 err.to_string().contains("0xff") || err.to_string().contains("0xFF"),
613 "error message must name the unknown version byte; got: {err}"
614 );
615 let _ = fs::remove_file(&path);
616 }
617
618 /// CR-28: a freshly-created nonce file MUST be the v1 shape
619 /// (9 bytes, version byte = 1). Pin so a regression that
620 /// reverts to a legacy unversioned write is caught.
621 #[test]
622 fn cr28_create_new_writes_v1_format() {
623 let path = temp_path("v1-fresh");
624 let _ = PersistentProducerNonce::load_or_create(&path).unwrap();
625
626 let on_disk = fs::read(&path).unwrap();
627 assert_eq!(
628 on_disk.len(),
629 NONCE_FILE_LEN_V1,
630 "CR-28: freshly-created nonce file must be v1 (9 bytes); got {} bytes",
631 on_disk.len()
632 );
633 assert_eq!(
634 on_disk[0], NONCE_FORMAT_V1,
635 "CR-28: freshly-created nonce file must carry version byte 0x{:02x}; got 0x{:02x}",
636 NONCE_FORMAT_V1, on_disk[0]
637 );
638 let _ = fs::remove_file(&path);
639 }
640}