sparkplug_b/sequence.rs
1//! Sequence (`seq`) and birth/death sequence (`bdSeq`) counters, plus bdSeq
2//! persistence (spec §6.4 sequence rules).
3//!
4//! Both counters are `0..=255` and wrap `255 -> 0`; `u8::wrapping_add` gives the
5//! spec's wrap for free (ADR-6), avoiding the `== 256` sentinel of the Java/Python
6//! references.
7
8use std::path::PathBuf;
9use std::sync::atomic::{AtomicU8, Ordering};
10
11/// The payload sequence number (`seq`), reset to 0 on every (re)birth and
12/// incremented by one (mod 256) on every subsequent Edge Node message
13/// (`tck-id-payloads-sequence-num-incrementing`).
14#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
15pub struct Seq(u8);
16
17impl Seq {
18 /// A counter starting at 0 (the value an NBIRTH carries).
19 #[must_use]
20 pub const fn new() -> Self {
21 Self(0)
22 }
23
24 /// The current value (without advancing).
25 #[must_use]
26 pub const fn get(self) -> u8 {
27 self.0
28 }
29
30 /// Return the current value, then advance (wrapping `255 -> 0`). Use this
31 /// to stamp a message's `seq` and move the counter forward.
32 pub fn next_value(&mut self) -> u8 {
33 let current = self.0;
34 self.0 = self.0.wrapping_add(1);
35 current
36 }
37
38 /// Reset to 0 (on NBIRTH / rebirth).
39 pub fn reset(&mut self) {
40 self.0 = 0;
41 }
42}
43
44/// The birth/death sequence number (`bdSeq`), incremented once per MQTT CONNECT
45/// (not per message) and persisted across restarts so the Host can correlate an
46/// NDEATH with its NBIRTH (`tck-id-payloads-nbirth-bdseq*`).
47#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
48pub struct BdSeq(u8);
49
50impl BdSeq {
51 /// A counter starting at `start`.
52 #[must_use]
53 pub const fn new(start: u8) -> Self {
54 Self(start)
55 }
56
57 /// The current value.
58 #[must_use]
59 pub const fn get(self) -> u8 {
60 self.0
61 }
62
63 /// Advance by one (wrapping `255 -> 0`).
64 pub fn advance(&mut self) {
65 self.0 = self.0.wrapping_add(1);
66 }
67}
68
69/// Persistence for the bdSeq counter so it survives process restarts.
70///
71/// `load_next_death` returns the bdSeq value to use for the next connection's
72/// NDEATH (and matching NBIRTH); `store_next_death` persists the *next* value.
73pub trait BdSeqStore {
74 /// Load the bdSeq value to use for the next death/will (0 if none stored).
75 ///
76 /// # Errors
77 /// Returns an I/O error if a stored value exists but cannot be read/parsed.
78 fn load_next_death(&self) -> std::io::Result<u8>;
79
80 /// Persist the bdSeq value to use for the next connection.
81 ///
82 /// # Errors
83 /// Returns an I/O error if the value cannot be written.
84 fn store_next_death(&self, value: u8) -> std::io::Result<()>;
85}
86
87/// A shared reference to a store is itself a store (the methods take `&self`).
88impl<B: BdSeqStore + ?Sized> BdSeqStore for &B {
89 fn load_next_death(&self) -> std::io::Result<u8> {
90 (**self).load_next_death()
91 }
92 fn store_next_death(&self, value: u8) -> std::io::Result<()> {
93 (**self).store_next_death(value)
94 }
95}
96
97/// An in-memory bdSeq store (non-persistent; useful for tests and ephemeral nodes).
98#[derive(Debug, Default)]
99pub struct InMemoryBdSeqStore {
100 value: AtomicU8,
101}
102
103impl InMemoryBdSeqStore {
104 /// A store seeded with `start`.
105 #[must_use]
106 pub fn new(start: u8) -> Self {
107 Self {
108 value: AtomicU8::new(start),
109 }
110 }
111}
112
113impl BdSeqStore for InMemoryBdSeqStore {
114 fn load_next_death(&self) -> std::io::Result<u8> {
115 Ok(self.value.load(Ordering::SeqCst))
116 }
117
118 fn store_next_death(&self, value: u8) -> std::io::Result<()> {
119 self.value.store(value, Ordering::SeqCst);
120 Ok(())
121 }
122}
123
124/// A file-backed bdSeq store. Writes atomically (temp file + rename) to a
125/// **durable** path (unlike Tahu's OS temp dir).
126#[derive(Clone, Debug)]
127pub struct FileBdSeqStore {
128 path: PathBuf,
129}
130
131impl FileBdSeqStore {
132 /// A store backed by `path`. The parent directory must exist.
133 #[must_use]
134 pub fn new(path: impl Into<PathBuf>) -> Self {
135 Self { path: path.into() }
136 }
137}
138
139impl BdSeqStore for FileBdSeqStore {
140 fn load_next_death(&self) -> std::io::Result<u8> {
141 match std::fs::read_to_string(&self.path) {
142 Ok(s) => s.trim().parse::<u8>().map_err(|e| {
143 std::io::Error::new(
144 std::io::ErrorKind::InvalidData,
145 format!("corrupt bdSeq file {:?}: {e}", self.path),
146 )
147 }),
148 // A missing file means "never connected"; start at 0.
149 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(0),
150 Err(e) => Err(e),
151 }
152 }
153
154 fn store_next_death(&self, value: u8) -> std::io::Result<()> {
155 // Derive the temp name from the full file name plus the PID so sibling
156 // stores (or processes) never collide on a shared `.tmp` path.
157 let file_name = self
158 .path
159 .file_name()
160 .map(|n| n.to_string_lossy().into_owned())
161 .unwrap_or_else(|| "bdseq".to_owned());
162 let tmp = self
163 .path
164 .with_file_name(format!("{file_name}.tmp.{}", std::process::id()));
165 std::fs::write(&tmp, value.to_string())?;
166 std::fs::rename(&tmp, &self.path)
167 }
168}