noetl_server/snowflake.rs
1//! Application-side snowflake ID generation.
2//!
3//! Per [`observability.md` Principle 3][rule], `execution_id` /
4//! `event_id` / `command_id` are generated **in the application**
5//! using this module's [`SnowflakeGenerator`], not by the DB-side
6//! `noetl.snowflake_id()` Postgres function. Reasons:
7//!
8//! 1. **Spans need the id at span-creation time.** The previous
9//! path opened tracing spans AFTER the DB round-trip; the id
10//! arriving in `result_id` after the INSERT means the span
11//! for the dispatch can't reference it. Generating in the app
12//! means the id is available before any I/O.
13//! 2. **Retries are idempotent only with a stable id.** If the DB
14//! assigns the id on first try, a network-blip retry creates a
15//! duplicate row OR a NULL id during the failure window.
16//! 3. **Cross-component publish needs the id before publish.**
17//! The orchestrator must put `execution_id` on the NATS message;
18//! if the server's INSERT generates it, the publish has to wait
19//! for the INSERT (doubling the latency).
20//! 4. **Tests need deterministic ids.** App-side generation lets
21//! tests inject a known seed; DB-side `noetl.snowflake_id()`
22//! forces a live DB even for unit tests.
23//! 5. **Sharded deployments can't agree on a single DB-side
24//! sequence.** Phase F of noetl/ai-meta#49 partitions the
25//! `noetl.event` / `noetl.command` tables by `execution_id`.
26//! If each shard's `noetl.snowflake_id()` keeps generating from
27//! its own DB, the natural assignment `shard_for(execution_id)`
28//! always picks that shard (the machine_id portion encodes
29//! the shard). This defeats the routing. App-side
30//! generation puts the machine_id under control of the
31//! deployment manifest (`NOETL_SERVER_MACHINE_ID`).
32//!
33//! [rule]: https://github.com/noetl/ai-meta/blob/main/agents/rules/observability.md
34//!
35//! # ID layout
36//!
37//! ```text
//! 63                                            22       12        0
//! ┌─┬─────────────────────────────────────────────┬────────┬────────┐
//! │0│ timestamp (41 bits, ms since epoch)         │ mid(10)│ seq(12)│
//! └─┴─────────────────────────────────────────────┴────────┴────────┘
42//! ```
43//!
44//! - Sign bit always 0 (so the i64 is non-negative; matches the
45//! `bigint` column type without overflow surprises).
46//! - **Timestamp**: 41 bits of milliseconds since the NoETL epoch
47//! (`2024-01-01T00:00:00Z` UTC). 41 bits gives ~69 years before
48//! wrap-around, so this stays valid through 2093.
49//! - **Machine ID**: 10 bits, so 1024 distinct machines. For
50//! noetl-server, the env var `NOETL_SERVER_MACHINE_ID` sets it
51//! directly; if unset (local dev), it's derived from
52//! `hostname()` hashed to 10 bits.
53//! - **Sequence**: 12 bits, so 4096 ids per machine per ms.
54//! Resets when the timestamp ticks forward.
55//!
56//! Same shape as Twitter's original snowflake spec + the existing
57//! Postgres `noetl.snowflake_id()` function, so ids generated here
58//! and ids generated by the DB are mutually orderable.
59
60use std::sync::Mutex;
61use std::time::{SystemTime, UNIX_EPOCH};
62
/// The NoETL epoch, `2024-01-01T00:00:00Z`, expressed as milliseconds
/// since the Unix epoch. Id timestamps are measured relative to this
/// instant, so the value must never change once ids have been issued.
pub const NOETL_EPOCH_MS: u64 = 1_704_067_200_000;

/// Width, in bits, of the per-millisecond sequence field (the low bits).
const SEQUENCE_BITS: u8 = 12;

/// Width, in bits, of the machine-id field.
const MACHINE_ID_BITS: u8 = 10;

/// Left shift that places the machine id directly above the sequence.
const MACHINE_ID_SHIFT: u8 = SEQUENCE_BITS;

/// Left shift that places the timestamp above the machine id.
const TIMESTAMP_SHIFT: u8 = MACHINE_ID_SHIFT + MACHINE_ID_BITS;

/// Largest machine id that fits the field (inclusive): `2^10 - 1 = 1023`.
pub const MAX_MACHINE_ID: u16 = (1u16 << MACHINE_ID_BITS) - 1;

/// Largest sequence value (inclusive): `2^12 - 1 = 4095`. Also used as
/// the bit mask that wraps the sequence counter.
const SEQUENCE_MASK: u16 = (1u16 << SEQUENCE_BITS) - 1;
84
/// Bookkeeping that the [`SnowflakeGenerator`] keeps behind its mutex.
#[derive(Debug)]
struct State {
    /// Timestamp (ms since the NoETL epoch) stamped on the most
    /// recently issued id.
    last_timestamp: u64,
    /// Position within the `last_timestamp` millisecond; starts again
    /// at 0 whenever the timestamp moves forward.
    sequence: u16,
}
94
95/// Application-side snowflake ID generator.
96///
97/// Construct via [`SnowflakeGenerator::new`] with the machine id
98/// read from `NOETL_SERVER_MACHINE_ID` (see [`derive_machine_id`]).
99/// Call [`SnowflakeGenerator::generate`] to mint a fresh id.
100///
101/// Thread-safe via an internal `Mutex`; the contention window is
102/// tiny (one atomic compare + a few arithmetic ops) so this is
103/// fine for the server's expected concurrency. If a hotter
104/// approach is needed later, switch to lock-free atomics; the
105/// public API stays the same.
106#[derive(Debug)]
107pub struct SnowflakeGenerator {
108 machine_id: u16,
109 state: Mutex<State>,
110}
111
112impl SnowflakeGenerator {
113 /// Construct a generator pinned to `machine_id`.
114 ///
115 /// Returns an error if `machine_id > 1023` (10 bits) — the
116 /// caller should reject the configured `NOETL_SERVER_MACHINE_ID`
117 /// at startup rather than truncate.
118 pub fn new(machine_id: u16) -> Result<Self, SnowflakeError> {
119 if machine_id > MAX_MACHINE_ID {
120 return Err(SnowflakeError::MachineIdOutOfRange { machine_id });
121 }
122 Ok(Self {
123 machine_id,
124 state: Mutex::new(State {
125 last_timestamp: 0,
126 sequence: 0,
127 }),
128 })
129 }
130
131 /// Mint a fresh snowflake id.
132 ///
133 /// Blocks briefly only if the local sequence counter exhausts
134 /// its 12-bit space within the same millisecond (4096 ids in
135 /// one ms — the busy-wait would last sub-millisecond). Never
136 /// fails in practice; returns an error only if the system
137 /// clock falls before [`NOETL_EPOCH_MS`].
138 pub fn generate(&self) -> Result<i64, SnowflakeError> {
139 let mut state = self
140 .state
141 .lock()
142 .map_err(|_| SnowflakeError::StateLockPoisoned)?;
143
144 let mut now = current_noetl_ms()?;
145
146 // Clock went backwards on us (NTP step, machine resumed
147 // from suspend, etc.). Reuse the previous ms with a fresh
148 // sequence rather than emit a backwards-ordered id. This
149 // also covers the within-same-ms case below.
150 if now < state.last_timestamp {
151 now = state.last_timestamp;
152 }
153
154 if now == state.last_timestamp {
155 // Within the same ms — bump the sequence.
156 state.sequence = (state.sequence + 1) & SEQUENCE_MASK;
157 if state.sequence == 0 {
158 // Sequence overflowed — busy-wait until next ms.
159 now = wait_until_next_ms(state.last_timestamp)?;
160 state.last_timestamp = now;
161 }
162 } else {
163 // Timestamp ticked forward — reset sequence.
164 state.last_timestamp = now;
165 state.sequence = 0;
166 }
167
168 let id = ((now as i64) << TIMESTAMP_SHIFT)
169 | ((self.machine_id as i64) << MACHINE_ID_SHIFT)
170 | (state.sequence as i64);
171 Ok(id)
172 }
173
174 /// Return the machine id this generator is pinned to (for
175 /// diagnostics, span fields, and the startup log line).
176 pub fn machine_id(&self) -> u16 {
177 self.machine_id
178 }
179}
180
181/// Derive a 10-bit machine id from a string seed.
182///
183/// Used when `NOETL_SERVER_MACHINE_ID` is unset (local dev).
184/// Hashes the input with a stable hash (FNV-1a, hand-rolled to
185/// avoid `std::collections::hash_map::DefaultHasher`'s release-
186/// to-release instability) and masks to 10 bits.
187pub fn derive_machine_id(seed: &str) -> u16 {
188 let mut hash: u64 = 0xcbf29ce484222325;
189 for b in seed.bytes() {
190 hash ^= b as u64;
191 hash = hash.wrapping_mul(0x100000001b3);
192 }
193 (hash & MAX_MACHINE_ID as u64) as u16
194}
195
196/// Errors the snowflake generator can return.
197#[derive(Debug, thiserror::Error)]
198pub enum SnowflakeError {
199 #[error("machine_id {machine_id} exceeds 10-bit max {MAX_MACHINE_ID}")]
200 MachineIdOutOfRange { machine_id: u16 },
201
202 #[error("system clock is before NoETL epoch (2024-01-01); fix NTP")]
203 ClockBeforeEpoch,
204
205 #[error("snowflake generator state mutex was poisoned")]
206 StateLockPoisoned,
207}
208
209fn current_noetl_ms() -> Result<u64, SnowflakeError> {
210 let unix_ms = SystemTime::now()
211 .duration_since(UNIX_EPOCH)
212 .map_err(|_| SnowflakeError::ClockBeforeEpoch)?
213 .as_millis() as u64;
214 if unix_ms < NOETL_EPOCH_MS {
215 return Err(SnowflakeError::ClockBeforeEpoch);
216 }
217 Ok(unix_ms - NOETL_EPOCH_MS)
218}
219
220fn wait_until_next_ms(last: u64) -> Result<u64, SnowflakeError> {
221 loop {
222 let now = current_noetl_ms()?;
223 if now > last {
224 return Ok(now);
225 }
226 // Busy-wait: we're already within the same ms, so this
227 // loops at most a few hundred iterations before the
228 // clock ticks. std::thread::yield_now() doesn't help on
229 // a clock-bound loop.
230 std::hint::spin_loop();
231 }
232}
233
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;
    use std::sync::Arc;

    /// 1024 needs 11 bits, so construction must fail and report the
    /// offending value.
    #[test]
    fn rejects_machine_id_above_10_bits() {
        match SnowflakeGenerator::new(1024).unwrap_err() {
            SnowflakeError::MachineIdOutOfRange { machine_id } => {
                assert_eq!(machine_id, 1024);
            }
            other => panic!("expected MachineIdOutOfRange, got {other:?}"),
        }
    }

    /// The largest 10-bit value is still a legal machine id.
    #[test]
    fn accepts_machine_id_at_max() {
        SnowflakeGenerator::new(MAX_MACHINE_ID).expect("max machine id is valid");
    }

    /// Ids are stored in a signed `bigint`, so the sign bit must stay
    /// clear on every id.
    #[test]
    fn generated_id_is_non_negative_i64() {
        let generator = SnowflakeGenerator::new(1).unwrap();
        for id in (0..100).map(|_| generator.generate().unwrap()) {
            assert!(id >= 0, "id {id} negative — sign bit leaked");
        }
    }

    /// Back-to-back ids from one thread must be strictly increasing.
    #[test]
    fn ids_are_monotonic_within_a_single_thread() {
        let generator = SnowflakeGenerator::new(7).unwrap();
        let mut previous = generator.generate().unwrap();
        for _ in 0..10_000 {
            let current = generator.generate().unwrap();
            assert!(
                current > previous,
                "ids not monotonic: prev={prev} current={id}",
                prev = previous,
                id = current
            );
            previous = current;
        }
    }

    /// The machine id can be recovered by shifting the sequence bits
    /// away and masking to the machine-id width.
    #[test]
    fn machine_id_is_preserved_in_generated_ids() {
        let id = SnowflakeGenerator::new(42).unwrap().generate().unwrap();
        let machine_bits = ((id >> MACHINE_ID_SHIFT) as u16) & MAX_MACHINE_ID;
        assert_eq!(machine_bits, 42);
    }

    /// Minting well over 4096 ids back to back must never repeat an
    /// id, even if the sequence wraps along the way.
    #[test]
    fn sequence_rolls_over_after_4096_within_one_ms() {
        let generator = SnowflakeGenerator::new(3).unwrap();
        let mut issued = HashSet::with_capacity(10_000);
        for _ in 0..10_000 {
            let id = generator.generate().unwrap();
            assert!(issued.insert(id), "id {id} repeated — sequence overflow not handled");
        }
    }

    /// Stress test: 8 threads x 1000 ids on one shared generator; all
    /// 8000 ids must be distinct.
    #[test]
    fn concurrent_generators_produce_unique_ids() {
        let shared = Arc::new(SnowflakeGenerator::new(5).unwrap());
        let workers: Vec<_> = (0..8)
            .map(|_| {
                let generator = Arc::clone(&shared);
                std::thread::spawn(move || {
                    (0..1000)
                        .map(|_| generator.generate().unwrap())
                        .collect::<Vec<i64>>()
                })
            })
            .collect();

        let mut all = HashSet::new();
        for worker in workers {
            for id in worker.join().unwrap() {
                assert!(all.insert(id), "duplicate id {id} from concurrent generators");
            }
        }
        assert_eq!(all.len(), 8000);
    }

    /// Hashing the same seed twice yields the same machine id.
    #[test]
    fn derive_machine_id_is_stable_for_same_input() {
        let first = derive_machine_id("noetl-server-pod-0");
        let second = derive_machine_id("noetl-server-pod-0");
        assert_eq!(first, second);
    }

    /// Distinctness is not guaranteed in general (only 1024 buckets),
    /// but FNV-1a separates these two particular seeds.
    #[test]
    fn derive_machine_id_differs_for_different_inputs() {
        assert_ne!(derive_machine_id("pod-0"), derive_machine_id("pod-1"));
    }

    /// Whatever the seed, the derived id must fit in 10 bits.
    #[test]
    fn derive_machine_id_stays_within_10_bits() {
        let seeds = ["", "a", "noetl-server", "very-long-hostname-with-pid-12345"];
        for seed in seeds.iter().copied() {
            assert!(derive_machine_id(seed) <= MAX_MACHINE_ID);
        }
    }
}