Skip to main content

noetl_server/
snowflake.rs

1//! Application-side snowflake ID generation.
2//!
3//! Per [`observability.md` Principle 3][rule], `execution_id` /
4//! `event_id` / `command_id` are generated **in the application**
5//! using this module's [`SnowflakeGenerator`], not by the DB-side
6//! `noetl.snowflake_id()` Postgres function.  Reasons:
7//!
8//! 1. **Spans need the id at span-creation time.**  The previous
9//!    path opened tracing spans AFTER the DB round-trip; the id
10//!    arriving in `result_id` after the INSERT means the span
11//!    for the dispatch can't reference it.  Generating in the app
12//!    means the id is available before any I/O.
13//! 2. **Retries are idempotent only with a stable id.**  If the DB
14//!    assigns the id on first try, a network-blip retry creates a
15//!    duplicate row OR a NULL id during the failure window.
16//! 3. **Cross-component publish needs the id before publish.**
17//!    The orchestrator must put `execution_id` on the NATS message;
18//!    if the server's INSERT generates it, the publish has to wait
19//!    for the INSERT (doubling the latency).
20//! 4. **Tests need deterministic ids.**  App-side generation lets
21//!    tests inject a known seed; DB-side `noetl.snowflake_id()`
22//!    forces a live DB even for unit tests.
//! 5. **Sharded deployments can't agree on a single DB-side
//!    sequence.**  Phase F of noetl/ai-meta#49 partitions the
//!    `noetl.event` / `noetl.command` tables by `execution_id`.
//!    If every shard keeps minting ids with its own database's
//!    `noetl.snowflake_id()`, the machine_id portion of each id
//!    encodes the shard that minted it, so the natural assignment
//!    `shard_for(execution_id)` always picks that same shard —
//!    which defeats the routing.  App-side generation puts the
//!    machine_id under control of the deployment manifest
//!    (`NOETL_SERVER_MACHINE_ID`).
32//!
33//! [rule]: https://github.com/noetl/ai-meta/blob/main/agents/rules/observability.md
34//!
35//! # ID layout
36//!
37//! ```text
38//! 63                                               22       12       0
39//!  ┌─┬─────────────────────────────────────────────┬────────┬────────┐
40//!  │0│      timestamp (41 bits, ms since epoch)     │ mid(10)│ seq(12)│
41//!  └─┴─────────────────────────────────────────────┴────────┴────────┘
42//! ```
43//!
44//! - Sign bit always 0 (so the i64 is non-negative; matches the
45//!   `bigint` column type without overflow surprises).
46//! - **Timestamp**: 41 bits of milliseconds since the NoETL epoch
47//!   (`2024-01-01T00:00:00Z` UTC).  41 bits gives ~69 years before
48//!   wrap-around, so this stays valid through 2093.
49//! - **Machine ID**: 10 bits, so 1024 distinct machines.  For
50//!   noetl-server, the env var `NOETL_SERVER_MACHINE_ID` sets it
51//!   directly; if unset (local dev), it's derived from
52//!   `hostname()` hashed to 10 bits.
53//! - **Sequence**: 12 bits, so 4096 ids per machine per ms.
54//!   Resets when the timestamp ticks forward.
55//!
56//! Same shape as Twitter's original snowflake spec + the existing
57//! Postgres `noetl.snowflake_id()` function, so ids generated here
58//! and ids generated by the DB are mutually orderable.
59
60use std::sync::Mutex;
61use std::time::{SystemTime, UNIX_EPOCH};
62
/// Offset of the NoETL epoch (`2024-01-01T00:00:00Z` UTC) from the
/// Unix epoch, in milliseconds.  Chosen once; it must never change.
pub const NOETL_EPOCH_MS: u64 = 1_704_067_200_000;

/// Width, in bits, of the machine-id field.
const MACHINE_ID_BITS: u8 = 10;

/// Width, in bits, of the per-millisecond sequence field.
const SEQUENCE_BITS: u8 = 12;

/// Largest machine id that fits its field (inclusive): `2^10 - 1 = 1023`.
pub const MAX_MACHINE_ID: u16 = (1u16 << MACHINE_ID_BITS) - 1;

/// All-ones mask over the sequence field, which is also the largest
/// sequence value (inclusive): `2^12 - 1 = 4095`.
const SEQUENCE_MASK: u16 = (1u16 << SEQUENCE_BITS) - 1;

/// Left shift that places the machine id directly above the sequence.
const MACHINE_ID_SHIFT: u8 = SEQUENCE_BITS;

/// Left shift that places the timestamp above both lower fields.
const TIMESTAMP_SHIFT: u8 = MACHINE_ID_BITS + SEQUENCE_BITS;
84
/// Bookkeeping that [`SnowflakeGenerator`] keeps between calls.
#[derive(Debug)]
struct State {
    /// Timestamp (milliseconds since the NoETL epoch) stamped into
    /// the most recently issued id.
    last_timestamp: u64,
    /// Position within the `last_timestamp` millisecond; goes back
    /// to 0 whenever the timestamp moves forward.
    sequence: u16,
}
94
95/// Application-side snowflake ID generator.
96///
97/// Construct via [`SnowflakeGenerator::new`] with the machine id
98/// read from `NOETL_SERVER_MACHINE_ID` (see [`derive_machine_id`]).
99/// Call [`SnowflakeGenerator::generate`] to mint a fresh id.
100///
101/// Thread-safe via an internal `Mutex`; the contention window is
102/// tiny (one atomic compare + a few arithmetic ops) so this is
103/// fine for the server's expected concurrency.  If a hotter
104/// approach is needed later, switch to lock-free atomics; the
105/// public API stays the same.
106#[derive(Debug)]
107pub struct SnowflakeGenerator {
108    machine_id: u16,
109    state: Mutex<State>,
110}
111
112impl SnowflakeGenerator {
113    /// Construct a generator pinned to `machine_id`.
114    ///
115    /// Returns an error if `machine_id > 1023` (10 bits) — the
116    /// caller should reject the configured `NOETL_SERVER_MACHINE_ID`
117    /// at startup rather than truncate.
118    pub fn new(machine_id: u16) -> Result<Self, SnowflakeError> {
119        if machine_id > MAX_MACHINE_ID {
120            return Err(SnowflakeError::MachineIdOutOfRange { machine_id });
121        }
122        Ok(Self {
123            machine_id,
124            state: Mutex::new(State {
125                last_timestamp: 0,
126                sequence: 0,
127            }),
128        })
129    }
130
131    /// Mint a fresh snowflake id.
132    ///
133    /// Blocks briefly only if the local sequence counter exhausts
134    /// its 12-bit space within the same millisecond (4096 ids in
135    /// one ms — the busy-wait would last sub-millisecond).  Never
136    /// fails in practice; returns an error only if the system
137    /// clock falls before [`NOETL_EPOCH_MS`].
138    pub fn generate(&self) -> Result<i64, SnowflakeError> {
139        let mut state = self
140            .state
141            .lock()
142            .map_err(|_| SnowflakeError::StateLockPoisoned)?;
143
144        let mut now = current_noetl_ms()?;
145
146        // Clock went backwards on us (NTP step, machine resumed
147        // from suspend, etc.).  Reuse the previous ms with a fresh
148        // sequence rather than emit a backwards-ordered id.  This
149        // also covers the within-same-ms case below.
150        if now < state.last_timestamp {
151            now = state.last_timestamp;
152        }
153
154        if now == state.last_timestamp {
155            // Within the same ms — bump the sequence.
156            state.sequence = (state.sequence + 1) & SEQUENCE_MASK;
157            if state.sequence == 0 {
158                // Sequence overflowed — busy-wait until next ms.
159                now = wait_until_next_ms(state.last_timestamp)?;
160                state.last_timestamp = now;
161            }
162        } else {
163            // Timestamp ticked forward — reset sequence.
164            state.last_timestamp = now;
165            state.sequence = 0;
166        }
167
168        let id = ((now as i64) << TIMESTAMP_SHIFT)
169            | ((self.machine_id as i64) << MACHINE_ID_SHIFT)
170            | (state.sequence as i64);
171        Ok(id)
172    }
173
174    /// Return the machine id this generator is pinned to (for
175    /// diagnostics, span fields, and the startup log line).
176    pub fn machine_id(&self) -> u16 {
177        self.machine_id
178    }
179}
180
181/// Derive a 10-bit machine id from a string seed.
182///
183/// Used when `NOETL_SERVER_MACHINE_ID` is unset (local dev).
184/// Hashes the input with a stable hash (FNV-1a, hand-rolled to
185/// avoid `std::collections::hash_map::DefaultHasher`'s release-
186/// to-release instability) and masks to 10 bits.
187pub fn derive_machine_id(seed: &str) -> u16 {
188    let mut hash: u64 = 0xcbf29ce484222325;
189    for b in seed.bytes() {
190        hash ^= b as u64;
191        hash = hash.wrapping_mul(0x100000001b3);
192    }
193    (hash & MAX_MACHINE_ID as u64) as u16
194}
195
196/// Errors the snowflake generator can return.
197#[derive(Debug, thiserror::Error)]
198pub enum SnowflakeError {
199    #[error("machine_id {machine_id} exceeds 10-bit max {MAX_MACHINE_ID}")]
200    MachineIdOutOfRange { machine_id: u16 },
201
202    #[error("system clock is before NoETL epoch (2024-01-01); fix NTP")]
203    ClockBeforeEpoch,
204
205    #[error("snowflake generator state mutex was poisoned")]
206    StateLockPoisoned,
207}
208
209fn current_noetl_ms() -> Result<u64, SnowflakeError> {
210    let unix_ms = SystemTime::now()
211        .duration_since(UNIX_EPOCH)
212        .map_err(|_| SnowflakeError::ClockBeforeEpoch)?
213        .as_millis() as u64;
214    if unix_ms < NOETL_EPOCH_MS {
215        return Err(SnowflakeError::ClockBeforeEpoch);
216    }
217    Ok(unix_ms - NOETL_EPOCH_MS)
218}
219
220fn wait_until_next_ms(last: u64) -> Result<u64, SnowflakeError> {
221    loop {
222        let now = current_noetl_ms()?;
223        if now > last {
224            return Ok(now);
225        }
226        // Busy-wait: we're already within the same ms, so this
227        // loops at most a few hundred iterations before the
228        // clock ticks.  std::thread::yield_now() doesn't help on
229        // a clock-bound loop.
230        std::hint::spin_loop();
231    }
232}
233
#[cfg(test)]
mod tests {
    // Local bindings are named `generator`, not `gen`: `gen` is a
    // reserved keyword from the Rust 2024 edition onwards.
    use super::*;
    use std::collections::HashSet;
    use std::sync::Arc;

    #[test]
    fn rejects_machine_id_above_10_bits() {
        let err = SnowflakeGenerator::new(1024).unwrap_err();
        match err {
            SnowflakeError::MachineIdOutOfRange { machine_id } => {
                assert_eq!(machine_id, 1024);
            }
            other => panic!("expected MachineIdOutOfRange, got {other:?}"),
        }
    }

    #[test]
    fn accepts_machine_id_at_max() {
        SnowflakeGenerator::new(MAX_MACHINE_ID).expect("max machine id is valid");
    }

    #[test]
    fn generated_id_is_non_negative_i64() {
        // Sign bit must always be 0 — caller treats the id as
        // `bigint` (signed) but the value is non-negative.
        let generator = SnowflakeGenerator::new(1).unwrap();
        for _ in 0..100 {
            let id = generator.generate().unwrap();
            assert!(id >= 0, "id {id} negative — sign bit leaked");
        }
    }

    #[test]
    fn ids_are_monotonic_within_a_single_thread() {
        let generator = SnowflakeGenerator::new(7).unwrap();
        let mut prev = generator.generate().unwrap();
        for _ in 0..10_000 {
            let id = generator.generate().unwrap();
            assert!(
                id > prev,
                "ids not monotonic: prev={prev} current={id}"
            );
            prev = id;
        }
    }

    #[test]
    fn machine_id_is_preserved_in_generated_ids() {
        let generator = SnowflakeGenerator::new(42).unwrap();
        let id = generator.generate().unwrap();
        // Extract the machine_id back out: shift right past the
        // sequence field, then mask with MAX_MACHINE_ID.
        let extracted = ((id >> MACHINE_ID_SHIFT) as u16) & MAX_MACHINE_ID;
        assert_eq!(extracted, 42);
    }

    #[test]
    fn sequence_rolls_over_after_4096_within_one_ms() {
        // Generate well over 4096 ids back to back and check that
        // every one is unique.  Whether the sequence actually
        // overflows inside a single millisecond depends on how fast
        // the host runs; the uniqueness check holds either way.
        let generator = SnowflakeGenerator::new(3).unwrap();
        let mut seen = HashSet::with_capacity(10_000);
        for _ in 0..10_000 {
            let id = generator.generate().unwrap();
            assert!(seen.insert(id), "id {id} repeated — sequence overflow not handled");
        }
    }

    #[test]
    fn concurrent_generators_produce_unique_ids() {
        // Stress test: 8 threads each minting 1000 ids on a shared
        // generator.  All 8000 ids must be unique.
        let generator = Arc::new(SnowflakeGenerator::new(5).unwrap());
        // Collect the handles first so all threads are running
        // before any of them is joined.
        let handles: Vec<_> = (0..8)
            .map(|_| {
                let shared = Arc::clone(&generator);
                std::thread::spawn(move || {
                    (0..1000)
                        .map(|_| shared.generate().unwrap())
                        .collect::<Vec<_>>()
                })
            })
            .collect();

        let mut all = HashSet::new();
        for handle in handles {
            for id in handle.join().unwrap() {
                assert!(all.insert(id), "duplicate id {id} from concurrent generators");
            }
        }
        assert_eq!(all.len(), 8000);
    }

    #[test]
    fn derive_machine_id_is_stable_for_same_input() {
        let a = derive_machine_id("noetl-server-pod-0");
        let b = derive_machine_id("noetl-server-pod-0");
        assert_eq!(a, b);
    }

    #[test]
    fn derive_machine_id_differs_for_different_inputs() {
        let a = derive_machine_id("pod-0");
        let b = derive_machine_id("pod-1");
        // Not strictly guaranteed (10-bit space → 1024 buckets),
        // but FNV-1a on these inputs gives different values.
        assert_ne!(a, b);
    }

    #[test]
    fn derive_machine_id_stays_within_10_bits() {
        for seed in ["", "a", "noetl-server", "very-long-hostname-with-pid-12345"] {
            assert!(derive_machine_id(seed) <= MAX_MACHINE_ID);
        }
    }
}