// net/adapter/dedup_state.rs

//! Persistent producer identity for cross-process dedup.
//!
//! Adapters that rely on backend-side dedup keyed on
//! `(producer_nonce, shard, sequence_start, i)` (today: JetStream's
//! `Nats-Msg-Id` header) need a `producer_nonce` that survives
//! process restart. Without that, a producer that crashes mid-batch
//! and restarts gets a fresh nonce, the post-restart retry writes
//! new msg-ids, and JetStream's dedup window can't recognize them
//! as duplicates of the pre-crash partial batch — the accepted half
//! ends up persisted twice.
//!
//! `PersistentProducerNonce` provides exactly that: a u64 sampled
//! once and stored on disk. On startup, callers `load_or_create` it
//! against a known path; the second and every later process loads
//! the same nonce, so retries' msg-ids match the pre-crash
//! incarnation's. The file is written atomically (tempfile +
//! rename), so a crash between sampling the nonce and the final
//! rename leaves either no file (the next load creates a fresh one)
//! or the complete file — never a partial write.
//!
//! When the bus is configured WITHOUT a path
//! (`EventBusConfig::producer_nonce_path = None`), the existing
//! per-process nonce is used. That keeps the behavior of every
//! pre-fix caller unchanged and is documented as
//! "at-most-once-across-restarts."

use std::fs;
use std::hash::{BuildHasher, Hash, Hasher};
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::time::{Instant, SystemTime, UNIX_EPOCH};

/// Length in bytes of a version-1 nonce file.
///
/// Wire format: `[VERSION:u8 = 1][nonce:u64 LE]` = 9 bytes.
///
/// The version prefix lets a future format change (e.g.
/// HMAC-keyed nonce, extended to 16 bytes for `(epoch, nonce)`)
/// deploy without an out-of-band file migration — loaders just
/// match on `data[0]` and dispatch to the matching parser.
///
/// A raw 8-byte format with no version prefix is **not
/// supported**: a loader against such a file will surface
/// `InvalidData`. Operators with legacy unversioned files can
/// simply delete the existing nonce file — the next start will
/// create a fresh v1, with a one-time loss of cross-restart
/// dedup that's bounded by the JetStream / Redis dedup window.
const NONCE_FILE_LEN_V1: usize = 1 + std::mem::size_of::<u64>();

/// Version byte stored as the first byte of a nonce file written in
/// the current (v1) wire format.
const NONCE_FORMAT_V1: u8 = 1;

/// Persistent u64 nonce loaded from (or created at) a stable path.
///
/// Callers construct via [`Self::load_or_create`] and read the value
/// via [`Self::nonce`]. The struct itself is cheap to clone — the
/// nonce is a `u64` and the path is a `PathBuf` retained for
/// debugging / logging.
#[derive(Debug, Clone)]
pub struct PersistentProducerNonce {
    /// The nonce value that was read from, or written to, `path`.
    nonce: u64,
    /// Location of the backing file. Only surfaced through the
    /// derived `Debug` output.
    #[allow(dead_code)] // retained for diagnostic output
    path: PathBuf,
}

62impl PersistentProducerNonce {
63    /// Load (or create) the persistent nonce at `path`.
64    ///
65    /// On first call: samples a fresh u64 from `getrandom`, writes
66    /// it to `path` atomically (write to `<path>.tmp`, fsync, rename
67    /// to `path`), and returns the value.
68    ///
69    /// On subsequent calls (post-restart, same path): reads the
70    /// existing 8-byte file and returns its little-endian u64.
71    ///
72    /// Errors:
73    /// - `io::ErrorKind::NotFound` if the parent directory doesn't
74    ///   exist. We don't auto-create the parent — that's a
75    ///   configuration decision the caller should make explicitly.
76    /// - `io::ErrorKind::InvalidData` if the file exists but has
77    ///   length other than 8 bytes (corrupt or someone else's file
78    ///   at this path).
79    /// - Other `io::Error` from filesystem operations.
80    pub fn load_or_create(path: impl AsRef<Path>) -> io::Result<Self> {
81        let path = path.as_ref().to_path_buf();
82
83        // Fast path: file exists.
84        match fs::read(&path) {
85            Ok(bytes) => {
86                if bytes.len() != NONCE_FILE_LEN_V1 {
87                    return Err(io::Error::new(
88                        io::ErrorKind::InvalidData,
89                        format!(
90                            "producer-nonce file at {} has length {} (expected {} for v1)",
91                            path.display(),
92                            bytes.len(),
93                            NONCE_FILE_LEN_V1,
94                        ),
95                    ));
96                }
97                if bytes[0] != NONCE_FORMAT_V1 {
98                    return Err(io::Error::new(
99                        io::ErrorKind::InvalidData,
100                        format!(
101                            "producer-nonce file at {} has unknown version byte 0x{:02x} \
102                             (expected 0x{:02x} for v1)",
103                            path.display(),
104                            bytes[0],
105                            NONCE_FORMAT_V1,
106                        ),
107                    ));
108                }
109                let mut buf = [0u8; 8];
110                buf.copy_from_slice(&bytes[1..]);
111                let nonce = u64::from_le_bytes(buf);
112                Ok(Self { nonce, path })
113            }
114            Err(e) if e.kind() == io::ErrorKind::NotFound => {
115                // First-load path: sample, write atomically, return.
116                Self::create_new(path)
117            }
118            Err(e) => Err(e),
119        }
120    }
121
122    fn create_new(path: PathBuf) -> io::Result<Self> {
123        // Sample a fresh nonce. We can't depend on `getrandom` here
124        // — it's gated behind the `net` feature, but this module is
125        // unconditional (the bus uses it whether `net` is on or
126        // off, e.g. for JetStream/Redis-only deployments). Mix the
127        // same set of entropy sources `event::batch_process_nonce`
128        // uses, but DON'T share its `OnceLock` cache — distinct
129        // create_new calls in the same process must produce distinct
130        // nonces (e.g. two buses configured against different
131        // nonce paths should not silently collide). The OnceLock
132        // semantic is right for the per-process fallback nonce; it
133        // would be wrong here.
134        //
135        // The mix is identical in spirit to `batch_process_nonce`:
136        // wall-clock nanos + monotonic-clock marker + pid +
137        // ASLR-derived stack address + thread id, all hashed
138        // through xxh3. Adequate for a startup-time nonce — the
139        // collision risk we care about is two-processes-on-the-
140        // same-machine within a single nanosecond tick, which the
141        // pid + stack marker covers.
142        //
143        // Refuse `0` to keep parity with `batch_process_nonce` —
144        // some downstream consumers use 0 as a sentinel.
145        use std::hash::{Hash, Hasher};
146        use std::time::Instant;
147
148        let wall_nanos = std::time::SystemTime::now()
149            .duration_since(std::time::UNIX_EPOCH)
150            .map(|d| d.as_nanos() as u64)
151            .unwrap_or(0);
152        let mono_marker = format!("{:?}", Instant::now());
153        let pid = std::process::id() as u64;
154        let stack_local: u64 = wall_nanos;
155        let stack_marker = (&stack_local as *const u64) as usize;
156        let mut tid_hasher = std::collections::hash_map::DefaultHasher::new();
157        std::thread::current().id().hash(&mut tid_hasher);
158        let tid = tid_hasher.finish();
159
160        // Pull 16 bytes of OS-random entropy via the standard
161        // library's `RandomState`, which is itself seeded from
162        // platform-secure RNG (getrandom on Linux/macOS, BCrypt on
163        // Windows). Each `RandomState::new()` call draws a fresh
164        // SipHash key (16 bytes of OS entropy), and finishing a
165        // hasher built from that key against a fixed byte yields
166        // a 64-bit value derived from those 16 bytes — i.e. 64
167        // bits of OS-randomness folded into 64. Two independent
168        // samples gives us a full 128 bits of OS-derived entropy
169        // mixed into the nonce, on top of the existing
170        // pid/tid/wall/stack/mono inputs that are mostly
171        // predictable.
172        //
173        // Pre-fix the mix relied entirely on `(pid, tid, wall,
174        // stack_marker as usize, mono)`. On 32-bit targets
175        // `stack_marker as u64` is zero-extended from 32 bits,
176        // halving its entropy contribution; on 64-bit targets
177        // ASLR gives ~30 bits. Combined with predictable pid /
178        // wall-time, the total OS-independent entropy was
179        // ~50-60 bits — below the 64-bit nonce's stated promise.
180        // The OS-random samples below dominate the predictable
181        // sources and restore the security margin.
182        use std::hash::BuildHasher;
183        let os_entropy_a = std::collections::hash_map::RandomState::new().hash_one(0u64);
184        let os_entropy_b = std::collections::hash_map::RandomState::new().hash_one(0u64);
185
186        let mut hash_input = [0u8; 64];
187        hash_input[..8].copy_from_slice(&wall_nanos.to_le_bytes());
188        hash_input[8..16].copy_from_slice(&pid.to_le_bytes());
189        hash_input[16..24].copy_from_slice(&(stack_marker as u64).to_le_bytes());
190        hash_input[24..32].copy_from_slice(&tid.to_le_bytes());
191        // Trim the mono_marker slot to 16 bytes (was 32) and
192        // claim the trailing 16 bytes for the two OS-random
193        // samples. The mono marker's first 16 bytes still tie-
194        // break two same-instant calls within the same process;
195        // its longer tail was largely wall-time-correlated text
196        // that didn't add meaningful entropy.
197        let mono_bytes = mono_marker.as_bytes();
198        let n = mono_bytes.len().min(16);
199        hash_input[32..32 + n].copy_from_slice(&mono_bytes[..n]);
200        hash_input[48..56].copy_from_slice(&os_entropy_a.to_le_bytes());
201        hash_input[56..64].copy_from_slice(&os_entropy_b.to_le_bytes());
202
203        let mut nonce = xxhash_rust::xxh3::xxh3_64(&hash_input);
204        if nonce == 0 {
205            nonce = 1;
206        }
207        // v1 wire format — `[VERSION:u8 = 1][nonce:u64 LE]`.
208        // Versioning lets a future format change (HMAC-keyed nonce,
209        // 16-byte epoch+nonce, etc.) deploy without an out-of-band
210        // migration — the loader matches on length + version byte.
211        let mut buf = [0u8; NONCE_FILE_LEN_V1];
212        buf[0] = NONCE_FORMAT_V1;
213        buf[1..].copy_from_slice(&nonce.to_le_bytes());
214
215        // Atomic write: create a per-call-unique sibling tempfile,
216        // fsync it, rename over the target.
217        //
218        // Stamp the tempfile name with `pid + tid + nanos` so each
219        // caller writes to its own file. A fixed sibling like
220        // `<path>.tmp` would let concurrent first-loaders racing on
221        // the same path (two threads in one process, OR two
222        // daemons misconfigured to point at the same nonce file)
223        // interleave their writes at the OS layer and produce a
224        // corrupted 8-byte sequence, or one rename would `ENOENT`
225        // because the other already moved the tempfile, surfacing
226        // as a load_or_create failure. Last rename still wins
227        // (intended semantic — the first-loader race is rare and
228        // the cap on nonce divergence is "different per call"
229        // anyway, since each call samples fresh entropy), but each
230        // renamed file is now a complete, valid 8-byte nonce — no
231        // interleaved-write corruption.
232        let tmp_path = {
233            use std::hash::{Hash, Hasher};
234            let mut p = path.clone();
235            let mut name = p.file_name().map(|n| n.to_os_string()).unwrap_or_default();
236            let pid = std::process::id();
237            let nanos = std::time::SystemTime::now()
238                .duration_since(std::time::UNIX_EPOCH)
239                .map(|d| d.as_nanos())
240                .unwrap_or(0);
241            let mut tid_hasher = std::collections::hash_map::DefaultHasher::new();
242            std::thread::current().id().hash(&mut tid_hasher);
243            let tid = tid_hasher.finish();
244            // Mix in the freshly-sampled nonce too so the tempfile
245            // name is unique even if the wall clock tick is shared
246            // and the same thread retries (e.g., after a stale
247            // tempfile cleanup). The nonce is the load-bearing
248            // entropy source; this just borrows it for naming.
249            name.push(format!(".{pid}.{tid:x}.{nanos}.{nonce:x}.tmp"));
250            p.set_file_name(name);
251            p
252        };
253        // Pre-fix, the write/sync split was
254        //   fs::write(&tmp_path, buf)?;        // (a) write + close
255        //   if let Ok(f) = fs::File::open(&tmp_path) {
256        //       let _ = f.sync_all();          // (b) sync_all on
257        //                                      //     a read-only handle
258        //   }
259        // Two distinct hazards:
260        //   #40 — `let _ = f.sync_all()` swallowed disk-full / I/O
261        //         errors; the producer-nonce file was reported as
262        //         "saved" while still being only in the kernel
263        //         page cache. A power loss between rename and the
264        //         OS's own background flush left the nonce file
265        //         partial / undurable, breaking cross-restart
266        //         dedup on next start.
267        //   #68 — On Windows, `fs::File::open(&path)` opens
268        //         read-only. `File::sync_all` calls
269        //         `FlushFileBuffers`, which returns
270        //         `ERROR_ACCESS_DENIED` on a read-only handle —
271        //         the entire fsync was a silent no-op on every
272        //         Windows install.
273        //
274        // Post-fix uses a single writable handle for write+sync
275        // and propagates both errors. `OpenOptions` with
276        // `create_new(true)` matches the per-call-unique tmp_path
277        // contract.
278        //
279        // Pre-emptively remove any zombie tempfile at this exact
280        // path. The path hash mixes pid + tid + nanos + freshly-
281        // sampled nonce, so a same-named file can only be a
282        // crashed prior run of the SAME process+thread that
283        // happened to land on the identical nanos+nonce — vanishingly
284        // unlikely, but observable in practice if a system clock
285        // rewinds across a crash. Without this, `create_new` fails
286        // with `AlreadyExists` and there is no retry path; every
287        // subsequent save then errors out and the producer nonce
288        // never persists. `remove_file().ok()` is safe because no
289        // concurrent caller can be holding this exact path (the
290        // hash is unique per-call by construction).
291        let _ = fs::remove_file(&tmp_path);
292        {
293            use std::io::Write;
294            let mut f = fs::OpenOptions::new()
295                .write(true)
296                .create_new(true)
297                .open(&tmp_path)?;
298            f.write_all(&buf)?;
299            f.sync_all()?;
300        }
301        // `fs::rename` is `MoveFileEx(MOVEFILE_REPLACE_EXISTING)` on
302        // Windows / `rename(2)` on Unix — atomic replace on POSIX,
303        // best-effort on Windows. Per-call-unique source means the
304        // rename can't race against a sibling's rename (each
305        // `tmp_path` is its own file).
306        fs::rename(&tmp_path, &path)?;
307
308        Ok(Self { nonce, path })
309    }
310
311    /// The loaded (or freshly created) nonce.
312    #[inline]
313    pub fn nonce(&self) -> u64 {
314        self.nonce
315    }
316}
317
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a unique path under the system temp directory.
    ///
    /// Combines pid + nanos + `suffix` so concurrent test runs don't
    /// collide on a shared `temp_dir()`. Nothing is created on disk.
    fn temp_path(suffix: &str) -> PathBuf {
        let pid = std::process::id();
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0);
        let mut p = std::env::temp_dir();
        p.push(format!("net-test-nonce-{pid}-{nanos}-{suffix}"));
        p
    }

    #[test]
    fn first_load_creates_a_random_nonzero_nonce() {
        let path = temp_path("first");
        let nonce = PersistentProducerNonce::load_or_create(&path)
            .unwrap()
            .nonce();
        assert_ne!(nonce, 0, "first-load must sample a nonzero nonce");
        // Cleanup.
        let _ = fs::remove_file(&path);
    }

    #[test]
    fn second_load_returns_the_same_nonce() {
        let path = temp_path("second");
        let first = PersistentProducerNonce::load_or_create(&path)
            .unwrap()
            .nonce();
        let second = PersistentProducerNonce::load_or_create(&path)
            .unwrap()
            .nonce();
        assert_eq!(
            first, second,
            "second load against same path must return the same nonce — \
             this is the load-bearing cross-restart property",
        );
        let _ = fs::remove_file(&path);
    }

    #[test]
    fn corrupt_file_surfaces_invalid_data_error() {
        let path = temp_path("corrupt");
        // Write 7 bytes (two short of NONCE_FILE_LEN_V1).
        fs::write(&path, b"shorty!").unwrap();

        let err = PersistentProducerNonce::load_or_create(&path).unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
        assert!(
            err.to_string().contains("length 7"),
            "error message should pin the actual length; got: {err}",
        );
        let _ = fs::remove_file(&path);
    }

    #[test]
    fn missing_parent_directory_surfaces_not_found_error() {
        let mut path = temp_path("missing-parent");
        path.push("subdir-that-does-not-exist");
        path.push("nonce");

        let err = PersistentProducerNonce::load_or_create(&path).unwrap_err();
        // Either NotFound (Unix-y) or other kinds depending on platform;
        // we just need a clear failure rather than silent success.
        assert!(
            err.kind() == io::ErrorKind::NotFound
                || err.kind() == io::ErrorKind::PermissionDenied
                || err.kind() == io::ErrorKind::Other,
            "expected a clear filesystem error; got {err:?}",
        );
    }

    /// A successful first create must leave exactly the nonce file in
    /// its directory — the sibling tempfile is renamed into place,
    /// not left behind.
    #[test]
    fn first_create_leaves_no_sibling_tempfile() {
        let dir = temp_path("no-stray-tmp");
        fs::create_dir(&dir).unwrap();
        let path = dir.join("nonce");

        PersistentProducerNonce::load_or_create(&path).unwrap();

        let entries: Vec<std::ffi::OsString> = fs::read_dir(&dir)
            .unwrap()
            .map(|entry| entry.unwrap().file_name())
            .collect();
        assert_eq!(
            entries,
            vec![std::ffi::OsString::from("nonce")],
            "only the nonce file may remain after a successful create",
        );
        let _ = fs::remove_dir_all(&dir);
    }

    /// Regression: the startup nonce mix must include
    /// `RandomState`-derived entropy on top of the
    /// pid/tid/wall/stack/mono inputs, which are largely predictable.
    ///
    /// The strict "two co-located pods at the same wall-clock
    /// instant produce different nonces" property is hard to
    /// pin in a unit test (we'd need to fake all the system
    /// inputs identically). Instead this test pins a weaker but
    /// observable property: rapid back-to-back creations in the
    /// same process and thread — where wall_nanos and mono_marker
    /// are nearly identical and pid and tid are identical — must
    /// still produce distinct nonces.
    #[test]
    fn back_to_back_nonces_in_same_thread_differ_via_os_entropy() {
        // Hammer 32 nonces from one thread; every one should be
        // unique.
        let mut nonces = std::collections::HashSet::new();
        for i in 0..32 {
            let path = temp_path(&format!("os_entropy_{i}"));
            let nonce = PersistentProducerNonce::load_or_create(&path)
                .unwrap()
                .nonce();
            assert!(
                nonces.insert(nonce),
                "regression: back-to-back nonces must differ — same-thread \
                 same-instant calls have identical predictable inputs, so \
                 OS-random entropy is the only thing that varies. \
                 collision at i={i}, nonce={nonce}",
            );
            let _ = fs::remove_file(&path);
        }
    }

    #[test]
    fn two_distinct_paths_produce_two_distinct_nonces() {
        let a = temp_path("a");
        let b = temp_path("b");
        let n_a = PersistentProducerNonce::load_or_create(&a).unwrap().nonce();
        let n_b = PersistentProducerNonce::load_or_create(&b).unwrap().nonce();
        assert_ne!(
            n_a, n_b,
            "two distinct nonce paths must produce distinct nonces (collision \
             probability is ~2^-63 — if this fires twice, suspect getrandom)",
        );
        let _ = fs::remove_file(&a);
        let _ = fs::remove_file(&b);
    }

    /// Cubic-ai P1: concurrent first-loaders against the SAME path
    /// must not corrupt the on-disk nonce or fail startup. Pre-fix
    /// every caller wrote to `<path>.tmp`, so two threads racing
    /// the first-create could either:
    ///   - interleave writes at the OS layer (resulting in a
    ///     corrupted byte sequence that decodes to garbage or is
    ///     rejected by the length check),
    ///   - or one's `fs::rename` would ENOENT because the other
    ///     already moved the tempfile (surfacing as
    ///     `load_or_create` failure → `EventBus::new` failure).
    ///
    /// The test races N threads on a single path. Each MUST return
    /// successfully; the resulting on-disk file MUST be exactly
    /// `NONCE_FILE_LEN_V1` (9) bytes; and a subsequent
    /// `load_or_create` MUST decode one of the nonces the racing
    /// threads produced (last-rename-wins stable state). Any
    /// interleave or ENOENT would surface as a panic in one of the
    /// threads.
    #[test]
    fn concurrent_first_load_does_not_corrupt_or_fail() {
        use std::sync::Arc;
        use std::thread;

        const N: usize = 16;
        let path = Arc::new(temp_path("concurrent-first-load"));

        // The barrier releases all workers at once to maximise the
        // chance that they overlap in the first-create path.
        let barrier = Arc::new(std::sync::Barrier::new(N));
        let mut handles = Vec::with_capacity(N);
        for _ in 0..N {
            let path = Arc::clone(&path);
            let barrier = Arc::clone(&barrier);
            handles.push(thread::spawn(move || {
                barrier.wait();
                // Every thread owns its own tempfile, so every
                // load_or_create must return Ok.
                PersistentProducerNonce::load_or_create(&*path)
                    .expect("concurrent first-load must succeed")
                    .nonce()
            }));
        }
        let nonces: Vec<u64> = handles
            .into_iter()
            .map(|h| h.join().expect("worker must not panic"))
            .collect();

        // Every thread got a non-zero nonce.
        assert!(
            nonces.iter().all(|&n| n != 0),
            "every concurrent first-loader must observe a non-zero nonce, \
             got: {nonces:?}",
        );

        // CR-28: the on-disk file is exactly NONCE_FILE_LEN_V1 = 9
        // bytes (1 version byte + 8 LE nonce bytes) — no
        // interleaved-write corruption.
        let on_disk = fs::read(&*path).expect("path must exist after concurrent first-load");
        assert_eq!(
            on_disk.len(),
            NONCE_FILE_LEN_V1,
            "on-disk nonce must be exactly {} bytes (no interleaved-write corruption)",
            NONCE_FILE_LEN_V1,
        );

        // A subsequent load returns the nonce of whichever thread
        // won the last rename — and it MUST equal one of the
        // observed nonces. (If we got a value none of the threads
        // produced, the file is corrupt.)
        let post_load = PersistentProducerNonce::load_or_create(&*path)
            .unwrap()
            .nonce();
        assert!(
            nonces.contains(&post_load),
            "post-load nonce {post_load:#x} must match one of the in-race \
             samples {nonces:?} — anything else implies corruption",
        );

        let _ = fs::remove_file(&*path);
    }

    /// CR-28: legacy 8-byte (pre-versioning) files are NOT
    /// supported. The feature shipped along with CR-28 itself, so
    /// no production deployments of the legacy format exist;
    /// loaders surface `InvalidData` and operators delete the
    /// stale file to recover (next start writes a fresh v1, with
    /// a one-time loss of cross-restart dedup bounded by the
    /// JetStream/Redis dedup window). Pin the rejection so a
    /// future refactor can't silently re-introduce the legacy
    /// path.
    #[test]
    fn cr28_legacy_8_byte_file_is_rejected() {
        let path = temp_path("legacy-8byte");
        // Write 8 raw LE bytes — the pre-CR-28 wire format.
        let stale: u64 = 0xDEAD_BEEF_CAFE_F00D;
        fs::write(&path, stale.to_le_bytes()).unwrap();

        let err = PersistentProducerNonce::load_or_create(&path).unwrap_err();
        assert_eq!(
            err.kind(),
            io::ErrorKind::InvalidData,
            "legacy 8-byte file must surface InvalidData (CR-28 dropped v0 support)"
        );
        assert!(
            err.to_string().contains("length 8"),
            "error message should pin the rejected length; got: {err}"
        );
        let _ = fs::remove_file(&path);
    }

    /// CR-28 v1 round-trip: the new versioned file format is
    /// `[VERSION = 1][8 LE bytes]`. Pin the wire shape so a future
    /// refactor can't silently break it.
    #[test]
    fn cr28_v1_versioned_9_byte_file_round_trip() {
        let path = temp_path("v1-roundtrip");
        let expected: u64 = 0xDEAD_BEEF_CAFE_F00D;
        // Write [VERSION=1][8 LE bytes] by hand — the CR-28 wire
        // format.
        let mut bytes = Vec::with_capacity(9);
        bytes.push(NONCE_FORMAT_V1);
        bytes.extend_from_slice(&expected.to_le_bytes());
        fs::write(&path, &bytes).unwrap();

        let loaded = PersistentProducerNonce::load_or_create(&path)
            .unwrap()
            .nonce();
        assert_eq!(
            loaded, expected,
            "CR-28: v1 file format is [VERSION=1][8 LE bytes]. Pin so a \
             future refactor that flips byte order or drops the version \
             byte doesn't silently produce a different nonce."
        );
        let _ = fs::remove_file(&path);
    }

    /// CR-28: a 9-byte file with an UNKNOWN version byte must
    /// surface InvalidData. This is the forward-compat tripwire —
    /// when v2 is introduced, a v2-aware reader will accept it,
    /// but until then we refuse to silently misinterpret a v2
    /// file as v1.
    #[test]
    fn cr28_unknown_version_byte_surfaces_invalid_data() {
        let path = temp_path("v-unknown");
        // 9 bytes with version byte = 0xFF (reserved future).
        let mut bytes = Vec::with_capacity(9);
        bytes.push(0xFF);
        bytes.extend_from_slice(&0xDEAD_BEEFu64.to_le_bytes());
        fs::write(&path, &bytes).unwrap();

        let err = PersistentProducerNonce::load_or_create(&path).unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
        assert!(
            err.to_string().contains("0xff") || err.to_string().contains("0xFF"),
            "error message must name the unknown version byte; got: {err}"
        );
        let _ = fs::remove_file(&path);
    }

    /// CR-28: a freshly-created nonce file MUST be the v1 shape
    /// (9 bytes, version byte = 1). Pin so a regression that
    /// reverts to a legacy unversioned write is caught.
    #[test]
    fn cr28_create_new_writes_v1_format() {
        let path = temp_path("v1-fresh");
        let _ = PersistentProducerNonce::load_or_create(&path).unwrap();

        let on_disk = fs::read(&path).unwrap();
        assert_eq!(
            on_disk.len(),
            NONCE_FILE_LEN_V1,
            "CR-28: freshly-created nonce file must be v1 (9 bytes); got {} bytes",
            on_disk.len()
        );
        assert_eq!(
            on_disk[0], NONCE_FORMAT_V1,
            "CR-28: freshly-created nonce file must carry version byte 0x{:02x}; got 0x{:02x}",
            NONCE_FORMAT_V1, on_disk[0]
        );
        let _ = fs::remove_file(&path);
    }
}