hdm_am/seq.rs
1//! Request sequence-number provider (spec §4.4.5).
2//!
3//! The HDM enforces strictly-increasing sequence numbers per session as an anti-replay defence.
4//! The crate models this as a trait so consumers can plug in their own persistence backend
5//! (`Redis`, `SQLite`, etc.); a basic in-memory and atomic file-backed implementation ship in-tree.
6//!
7//! # Choosing an implementation
8//!
9//! - For single-process, single-session use, `InMemorySeq` is sufficient.
10//! - For agents that must survive restarts without colliding with a previously-sent number,
11//! use `FileSeq`. Cost: one atomic file write per operation.
12//! - For multi-process / clustered consumers, implement [`SequenceProvider`] yourself against
13//! your shared store. Take an exclusive lock around the increment-and-persist sequence.
14
15use std::fs;
16use std::io;
17use std::io::Write;
18use std::path::{Path, PathBuf};
19
20/// Provider of monotonically-increasing sequence numbers for HDM requests.
21pub trait SequenceProvider: Send {
22 /// Advance and return the next sequence number.
23 ///
24 /// The returned value must be strictly greater than every previously-returned value across
25 /// the lifetime of the provider. Successive calls need not produce contiguous values;
26 /// "wasted" numbers caused by client-side failures are acceptable.
27 ///
28 /// # Errors
29 /// Implementations may surface I/O errors from their persistence backend. The default
30 /// in-memory implementation never fails.
31 fn next(&mut self) -> Result<i64, io::Error>;
32}
33
34/// In-memory sequence counter.
35///
36/// Resets on process restart — use only when the consumer either performs a fresh `login`
37/// after restart (which establishes a new HDM session) or doesn't care about cross-restart
38/// continuity.
39#[derive(Debug)]
40pub struct InMemorySeq {
41 current: i64,
42}
43
44impl InMemorySeq {
45 /// Start the counter so that the first call to [`Self::next`] returns `initial + 1`.
46 #[must_use]
47 pub const fn starting_at(initial: i64) -> Self {
48 Self { current: initial }
49 }
50
51 /// Start the counter so that the first call to [`Self::next`] returns `1`.
52 #[must_use]
53 pub const fn from_zero() -> Self {
54 Self::starting_at(0)
55 }
56}
57
58impl Default for InMemorySeq {
59 fn default() -> Self {
60 Self::from_zero()
61 }
62}
63
64impl SequenceProvider for InMemorySeq {
65 fn next(&mut self) -> Result<i64, io::Error> {
66 self.current = self
67 .current
68 .checked_add(1)
69 .ok_or_else(|| io::Error::other("sequence counter overflowed i64"))?;
70 Ok(self.current)
71 }
72}
73
74/// File-backed sequence counter that survives restarts, including power loss.
75///
76/// Each write goes to a sibling temp file that is `fsync`'d, atomically renamed over the target,
77/// and (on Unix) followed by an `fsync` of the parent directory so the rename itself is durable.
78/// This matters because the counter is a fiscal anti-replay guard — a value that silently reverted
79/// after a crash would re-issue a sequence number the device already saw (rejected as
80/// `BadSequenceNumber`).
81///
82/// The file's content is a single decimal integer (UTF-8, optionally followed by whitespace).
83/// On open, missing files initialise to `0`; corrupted contents are surfaced as
84/// [`io::ErrorKind::InvalidData`] rather than silently reset.
85///
86/// **This is not safe to share between processes.** The crate does not take an OS-level file
87/// lock. Consumers running multiple processes against the same counter file must coordinate
88/// externally.
89#[derive(Debug)]
90pub struct FileSeq {
91 path: PathBuf,
92 current: i64,
93}
94
95impl FileSeq {
96 /// Open or create a counter file. If the file exists, its contents are parsed as the last-
97 /// returned value; otherwise the counter starts at zero.
98 ///
99 /// # Errors
100 /// Surfaces filesystem failures and parsing failures (corrupted file contents).
101 pub fn open_or_create(path: impl AsRef<Path>) -> io::Result<Self> {
102 let path = path.as_ref().to_path_buf();
103 let current = if path.exists() {
104 let raw = fs::read_to_string(&path)?;
105 raw.trim().parse::<i64>().map_err(|err| {
106 io::Error::new(
107 io::ErrorKind::InvalidData,
108 format!("seq counter file is corrupted: {err}"),
109 )
110 })?
111 } else {
112 0
113 };
114 Ok(Self { path, current })
115 }
116
117 /// Most-recently-issued value (without advancing the counter).
118 #[must_use]
119 pub const fn current(&self) -> i64 {
120 self.current
121 }
122
123 fn persist(&self) -> io::Result<()> {
124 // Append ".tmp" to the *full* file name (not `with_extension`, which would strip a real
125 // extension and could collide, e.g. `seq.dat` and `seq.bak` both -> `seq.seq.tmp`).
126 let mut tmp = self.path.clone().into_os_string();
127 tmp.push(".tmp");
128 let tmp = PathBuf::from(tmp);
129
130 // Write + fsync the data before swapping it in.
131 {
132 let mut file = fs::File::create(&tmp)?;
133 file.write_all(self.current.to_string().as_bytes())?;
134 file.sync_all()?;
135 }
136 fs::rename(&tmp, &self.path)?;
137
138 // fsync the parent directory so the rename survives a crash. Directory fsync is a Unix
139 // facility; on other platforms the file fsync + atomic rename is the best portable effort.
140 #[cfg(unix)]
141 {
142 let dir = self
143 .path
144 .parent()
145 .filter(|p| !p.as_os_str().is_empty())
146 .unwrap_or_else(|| Path::new("."));
147 fs::File::open(dir)?.sync_all()?;
148 }
149 Ok(())
150 }
151}
152
153impl SequenceProvider for FileSeq {
154 fn next(&mut self) -> io::Result<i64> {
155 let next = self
156 .current
157 .checked_add(1)
158 .ok_or_else(|| io::Error::other("sequence counter overflowed i64"))?;
159 self.current = next;
160 self.persist()?;
161 Ok(next)
162 }
163}
164
165#[cfg(test)]
166mod tests {
167 use super::*;
168 use std::env;
169 use std::path::PathBuf;
170
171 /// Unique temp path for each test run — uses thread ID + nanos to avoid collisions when
172 /// `cargo test` runs in parallel.
173 fn unique_tempfile(suffix: &str) -> PathBuf {
174 let nanos = std::time::SystemTime::now()
175 .duration_since(std::time::UNIX_EPOCH)
176 .map(|d| d.as_nanos())
177 .unwrap_or_default();
178 let tid = std::thread::current().id();
179 env::temp_dir().join(format!("hdm-am-seq-test-{tid:?}-{nanos}-{suffix}"))
180 }
181
182 /// In-memory: first call returns `initial + 1`, then increments monotonically.
183 #[test]
184 fn in_memory_seq_starts_at_initial_plus_one() {
185 let mut seq = InMemorySeq::starting_at(42);
186 assert_eq!(seq.next().unwrap(), 43);
187 assert_eq!(seq.next().unwrap(), 44);
188 assert_eq!(seq.next().unwrap(), 45);
189 }
190
191 /// In-memory default starts at 0 → first next is 1.
192 #[test]
193 fn in_memory_seq_default_starts_at_one() {
194 let mut seq = InMemorySeq::default();
195 assert_eq!(seq.next().unwrap(), 1);
196 }
197
198 /// Overflow at `i64::MAX` surfaces as `io::Error`, not a silent wrap or panic.
199 #[test]
200 fn in_memory_seq_overflows_safely() {
201 let mut seq = InMemorySeq::starting_at(i64::MAX);
202 let err = seq.next().expect_err("expected overflow");
203 assert_eq!(err.kind(), io::ErrorKind::Other);
204 }
205
206 /// File-backed: persists the counter across re-opens.
207 #[test]
208 fn file_seq_persists_across_reopen() {
209 let path = unique_tempfile("persist");
210 let _ = fs::remove_file(&path);
211
212 {
213 let mut seq = FileSeq::open_or_create(&path).unwrap();
214 assert_eq!(seq.next().unwrap(), 1);
215 assert_eq!(seq.next().unwrap(), 2);
216 assert_eq!(seq.next().unwrap(), 3);
217 }
218
219 // Re-open: counter must continue from 3, not reset.
220 {
221 let mut seq = FileSeq::open_or_create(&path).unwrap();
222 assert_eq!(seq.current(), 3);
223 assert_eq!(seq.next().unwrap(), 4);
224 }
225
226 let _ = fs::remove_file(&path);
227 }
228
229 /// File-backed: missing file starts the counter at zero (first next = 1).
230 #[test]
231 fn file_seq_starts_at_zero_when_missing() {
232 let path = unique_tempfile("missing");
233 let _ = fs::remove_file(&path);
234
235 let mut seq = FileSeq::open_or_create(&path).unwrap();
236 assert_eq!(seq.current(), 0);
237 assert_eq!(seq.next().unwrap(), 1);
238
239 let _ = fs::remove_file(&path);
240 }
241
242 /// Corrupted file contents surface as `InvalidData` — never a silent reset.
243 #[test]
244 fn file_seq_refuses_to_open_corrupted_file() {
245 let path = unique_tempfile("corrupt");
246 fs::write(&path, "this is not a number").unwrap();
247
248 let err = FileSeq::open_or_create(&path).expect_err("expected parse error");
249 assert_eq!(err.kind(), io::ErrorKind::InvalidData);
250
251 let _ = fs::remove_file(&path);
252 }
253
254 /// File-backed: after `next()`, the on-disk content reflects the new value.
255 #[test]
256 fn file_seq_persists_to_disk_each_call() {
257 let path = unique_tempfile("disk");
258 let _ = fs::remove_file(&path);
259
260 let mut seq = FileSeq::open_or_create(&path).unwrap();
261 seq.next().unwrap();
262 seq.next().unwrap();
263 seq.next().unwrap();
264
265 let raw = fs::read_to_string(&path).unwrap();
266 assert_eq!(raw.trim(), "3");
267
268 let _ = fs::remove_file(&path);
269 }
270}