// noetl-server 2.53.0
//
// NoETL Control Plane - Async Rust server for workflow orchestration
// Documentation
//! Application-side snowflake ID generation.
//!
//! Per [`observability.md` Principle 3][rule], `execution_id` /
//! `event_id` / `command_id` are generated **in the application**
//! using this module's [`SnowflakeGenerator`], not by the DB-side
//! `noetl.snowflake_id()` Postgres function.  Reasons:
//!
//! 1. **Spans need the id at span-creation time.**  The previous
//!    path opened tracing spans AFTER the DB round-trip; the id
//!    arriving in `result_id` after the INSERT means the span
//!    for the dispatch can't reference it.  Generating in the app
//!    means the id is available before any I/O.
//! 2. **Retries are idempotent only with a stable id.**  If the DB
//!    assigns the id on first try, a network-blip retry creates a
//!    duplicate row OR a NULL id during the failure window.
//! 3. **Cross-component publish needs the id before publish.**
//!    The orchestrator must put `execution_id` on the NATS message;
//!    if the server's INSERT generates it, the publish has to wait
//!    for the INSERT (doubling the latency).
//! 4. **Tests need deterministic ids.**  App-side generation lets
//!    tests inject a known seed; DB-side `noetl.snowflake_id()`
//!    forces a live DB even for unit tests.
//! 5. **Sharded deployments can't agree on a single DB-side
//!    sequence.**  Phase F of noetl/ai-meta#49 partitions the
//!    `noetl.event` / `noetl.command` tables by `execution_id`.
//!    If each shard's `noetl.snowflake_id()` keeps generating from
//!    its own DB, the natural assignment `shard_for(execution_id)`
//!    always picks that shard (the machine_id portion encodes
//!    the shard).  This defeats the routing.  App-side
//!    generation puts the machine_id under control of the
//!    deployment manifest (`NOETL_SERVER_MACHINE_ID`).
//!
//! [rule]: https://github.com/noetl/ai-meta/blob/main/agents/rules/observability.md
//!
//! # ID layout
//!
//! ```text
//! 63                                               22       12       0
//!  ┌─┬─────────────────────────────────────────────┬────────┬────────┐
//!  │0│      timestamp (41 bits, ms since epoch)     │ mid(10)│ seq(12)│
//!  └─┴─────────────────────────────────────────────┴────────┴────────┘
//! ```
//!
//! - Sign bit always 0 (so the i64 is non-negative; matches the
//!   `bigint` column type without overflow surprises).
//! - **Timestamp**: 41 bits of milliseconds since the NoETL epoch
//!   (`2024-01-01T00:00:00Z` UTC).  41 bits gives ~69 years before
//!   wrap-around, so this stays valid through 2093.
//! - **Machine ID**: 10 bits, so 1024 distinct machines.  For
//!   noetl-server, the env var `NOETL_SERVER_MACHINE_ID` sets it
//!   directly; if unset (local dev), it's derived from
//!   `hostname()` hashed to 10 bits.
//! - **Sequence**: 12 bits, so 4096 ids per machine per ms.
//!   Resets when the timestamp ticks forward.
//!
//! Same shape as Twitter's original snowflake spec + the existing
//! Postgres `noetl.snowflake_id()` function, so ids generated here
//! and ids generated by the DB are mutually orderable.

use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};

/// Start of the NoETL epoch (`2024-01-01T00:00:00Z`), expressed as
/// milliseconds since the Unix epoch.  Every id timestamp is
/// relative to this value, so it must never change once ids exist.
pub const NOETL_EPOCH_MS: u64 = 1_704_067_200 * 1_000;

/// Width, in bits, of the machine-id field.
const MACHINE_ID_BITS: u8 = 10;

/// Width, in bits, of the per-millisecond sequence field.
const SEQUENCE_BITS: u8 = 12;

/// Largest valid machine id (inclusive): the low 10 bits set, i.e. `1023`.
pub const MAX_MACHINE_ID: u16 = !(u16::MAX << MACHINE_ID_BITS);

/// Mask selecting the sequence field; also the largest sequence
/// value (inclusive): the low 12 bits set, i.e. `4095`.
const SEQUENCE_MASK: u16 = !(u16::MAX << SEQUENCE_BITS);

/// Left shift that places the machine id directly above the sequence.
const MACHINE_ID_SHIFT: u8 = SEQUENCE_BITS;

/// Left shift that places the timestamp above the machine id and
/// the sequence.
const TIMESTAMP_SHIFT: u8 = MACHINE_ID_SHIFT + MACHINE_ID_BITS;

/// Mutable state carried inside the [`SnowflakeGenerator`].
///
/// Both fields are only read and written while the generator's
/// mutex is held, so they always change together.
#[derive(Debug)]
struct State {
    /// Last timestamp (NoETL epoch ms) we generated an id for.
    /// Initialised to 0 by [`SnowflakeGenerator::new`].  It can sit
    /// ahead of the wall clock: when the clock steps backwards the
    /// generator keeps using this value instead of the smaller
    /// wall-clock reading.
    last_timestamp: u64,
    /// Sequence counter within the current `last_timestamp` ms.
    /// Resets to 0 when the timestamp ticks forward.
    sequence: u16,
}

/// Application-side snowflake ID generator.
///
/// Construct via [`SnowflakeGenerator::new`] with the machine id for
/// this process — normally the value of `NOETL_SERVER_MACHINE_ID`;
/// [`derive_machine_id`] is the fallback for when that variable is
/// unset.  Call [`SnowflakeGenerator::generate`] to mint a fresh id.
///
/// Thread-safe via an internal `Mutex`.  The critical section is a
/// system-clock read plus a few arithmetic ops, so contention is
/// normally short; the one exception is sequence exhaustion, where
/// `generate` busy-waits for the next millisecond while still
/// holding the lock.  If a hotter approach is needed later, switch
/// to lock-free atomics; the public API stays the same.
#[derive(Debug)]
pub struct SnowflakeGenerator {
    /// Machine id embedded in every id; validated against
    /// [`MAX_MACHINE_ID`] in [`SnowflakeGenerator::new`].
    machine_id: u16,
    /// Last-used timestamp and sequence, guarded by the mutex.
    state: Mutex<State>,
}

impl SnowflakeGenerator {
    /// Construct a generator pinned to `machine_id`.
    ///
    /// # Errors
    ///
    /// Returns [`SnowflakeError::MachineIdOutOfRange`] if
    /// `machine_id > 1023` (10 bits) — the caller should reject the
    /// configured `NOETL_SERVER_MACHINE_ID` at startup rather than
    /// truncate.
    pub fn new(machine_id: u16) -> Result<Self, SnowflakeError> {
        if machine_id > MAX_MACHINE_ID {
            return Err(SnowflakeError::MachineIdOutOfRange { machine_id });
        }
        Ok(Self {
            machine_id,
            state: Mutex::new(State {
                last_timestamp: 0,
                sequence: 0,
            }),
        })
    }

    /// Mint a fresh snowflake id.
    ///
    /// Ids from one generator are unique and strictly increasing.
    /// The call busy-waits (holding the internal lock) only when the
    /// 12-bit sequence is exhausted within the current millisecond
    /// while the wall clock is still on that millisecond.
    ///
    /// If the wall clock is *behind* the last issued timestamp (NTP
    /// step, resume from suspend, ...), the generator keeps issuing
    /// ids on its own logical timestamp; once that millisecond's
    /// sequence space is used up it moves the logical timestamp
    /// forward by one instead of spinning until the wall clock
    /// catches up.
    ///
    /// # Errors
    ///
    /// - [`SnowflakeError::StateLockPoisoned`] if another thread
    ///   panicked while holding the state lock.
    /// - [`SnowflakeError::ClockBeforeEpoch`] if the system clock
    ///   reads earlier than [`NOETL_EPOCH_MS`].
    ///
    /// On error the generator state is left untouched, so a later
    /// successful call cannot repeat an id that was already issued.
    pub fn generate(&self) -> Result<i64, SnowflakeError> {
        let mut state = self
            .state
            .lock()
            .map_err(|_| SnowflakeError::StateLockPoisoned)?;

        let wall_clock = current_noetl_ms()?;

        // Decide the (timestamp, sequence) pair first and commit it
        // to `state` only after every fallible step has succeeded.
        let (timestamp, sequence) = if wall_clock > state.last_timestamp {
            // Timestamp ticked forward — start a fresh sequence.
            (wall_clock, 0)
        } else {
            // Same ms, or the clock went backwards: stay on the
            // last issued timestamp so ids never go backwards.
            let next_sequence = (state.sequence + 1) & SEQUENCE_MASK;
            if next_sequence != 0 {
                (state.last_timestamp, next_sequence)
            } else if wall_clock < state.last_timestamp {
                // Sequence exhausted while the wall clock is behind
                // us.  Waiting for the clock could block every
                // caller for the whole regression, so advance the
                // logical millisecond instead.
                (state.last_timestamp + 1, 0)
            } else {
                // Sequence exhausted in the current wall-clock ms —
                // busy-wait until the next ms.
                (wait_until_next_ms(state.last_timestamp)?, 0)
            }
        };

        state.last_timestamp = timestamp;
        state.sequence = sequence;

        // NOTE: the timestamp is not range-checked here; a value
        // wider than 41 bits would spill into the sign bit.
        let id = ((timestamp as i64) << TIMESTAMP_SHIFT)
            | (i64::from(self.machine_id) << MACHINE_ID_SHIFT)
            | i64::from(sequence);
        Ok(id)
    }

    /// Return the machine id this generator is pinned to (for
    /// diagnostics, span fields, and the startup log line).
    pub fn machine_id(&self) -> u16 {
        self.machine_id
    }
}

/// Map an arbitrary string seed onto a machine id in `0..=1023`.
///
/// This is the fallback for when `NOETL_SERVER_MACHINE_ID` is unset
/// (local dev).  The seed bytes are run through 64-bit FNV-1a —
/// hand-rolled so the result does not depend on
/// `std::collections::hash_map::DefaultHasher`, whose output may
/// change between Rust releases — and the low 10 bits of the digest
/// are kept.  The same seed always yields the same id; different
/// seeds may collide.
pub fn derive_machine_id(seed: &str) -> u16 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    // FNV-1a step: xor the byte in first, then multiply by the prime.
    let digest = seed.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    });

    (digest & u64::from(MAX_MACHINE_ID)) as u16
}

/// Errors the snowflake generator can return.
#[derive(Debug, thiserror::Error)]
pub enum SnowflakeError {
    /// The machine id passed to [`SnowflakeGenerator::new`] does not
    /// fit in the 10-bit machine-id field (it is above
    /// [`MAX_MACHINE_ID`]).  Carries the rejected value.
    #[error("machine_id {machine_id} exceeds 10-bit max {MAX_MACHINE_ID}")]
    MachineIdOutOfRange { machine_id: u16 },

    /// The system clock reads earlier than [`NOETL_EPOCH_MS`] (or
    /// earlier than the Unix epoch), so no non-negative timestamp
    /// can be derived.
    #[error("system clock is before NoETL epoch (2024-01-01); fix NTP")]
    ClockBeforeEpoch,

    /// The mutex guarding the generator state was poisoned, i.e. a
    /// thread panicked while holding it.
    #[error("snowflake generator state mutex was poisoned")]
    StateLockPoisoned,
}

/// Read the system clock and return milliseconds elapsed since
/// [`NOETL_EPOCH_MS`].
///
/// Fails with [`SnowflakeError::ClockBeforeEpoch`] when the clock
/// reads earlier than the Unix epoch or earlier than the NoETL
/// epoch.
fn current_noetl_ms() -> Result<u64, SnowflakeError> {
    let since_unix_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_err(|_| SnowflakeError::ClockBeforeEpoch)?;
    let unix_ms = since_unix_epoch.as_millis() as u64;

    // `checked_sub` yields `None` exactly when the clock is before
    // the NoETL epoch.
    unix_ms
        .checked_sub(NOETL_EPOCH_MS)
        .ok_or(SnowflakeError::ClockBeforeEpoch)
}

/// Spin until the clock reports a NoETL-epoch millisecond strictly
/// greater than `last`, and return that reading.
///
/// Any error from [`current_noetl_ms`] is propagated immediately.
/// The loop is a pure busy-wait; the original author's note is that
/// `std::thread::yield_now()` doesn't help on a clock-bound loop,
/// so only a spin hint is issued between polls.  The wait lasts for
/// however long the clock takes to move past `last`.
fn wait_until_next_ms(last: u64) -> Result<u64, SnowflakeError> {
    let mut observed = current_noetl_ms()?;
    while observed <= last {
        std::hint::spin_loop();
        observed = current_noetl_ms()?;
    }
    Ok(observed)
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;
    use std::sync::Arc;

    /// 1024 needs an 11th bit, so construction must fail and report
    /// the offending value.
    #[test]
    fn rejects_machine_id_above_10_bits() {
        match SnowflakeGenerator::new(1024).unwrap_err() {
            SnowflakeError::MachineIdOutOfRange { machine_id } => {
                assert_eq!(machine_id, 1024);
            }
            other => panic!("expected MachineIdOutOfRange, got {other:?}"),
        }
    }

    /// The inclusive upper bound itself is accepted.
    #[test]
    fn accepts_machine_id_at_max() {
        SnowflakeGenerator::new(MAX_MACHINE_ID).expect("max machine id is valid");
    }

    /// Ids are stored in a signed `bigint`, so the top bit must
    /// stay clear.
    #[test]
    fn generated_id_is_non_negative_i64() {
        let generator = SnowflakeGenerator::new(1).unwrap();
        for _ in 0..100 {
            let id = generator.generate().unwrap();
            assert!(id >= 0, "id {id} negative — sign bit leaked");
        }
    }

    /// Each id minted on one thread is strictly larger than the
    /// one before it.
    #[test]
    fn ids_are_monotonic_within_a_single_thread() {
        let generator = SnowflakeGenerator::new(7).unwrap();
        let mut prev = generator.generate().unwrap();
        for _ in 0..10_000 {
            let id = generator.generate().unwrap();
            assert!(
                id > prev,
                "ids not monotonic: prev={prev} current={id}"
            );
            prev = id;
        }
    }

    /// The machine-id field can be read back out of a minted id.
    #[test]
    fn machine_id_is_preserved_in_generated_ids() {
        let generator = SnowflakeGenerator::new(42).unwrap();
        let id = generator.generate().unwrap();
        // Drop the sequence bits, then keep only the machine-id bits.
        let shifted = (id >> MACHINE_ID_SHIFT) as u16;
        assert_eq!(shifted & MAX_MACHINE_ID, 42);
    }

    /// Minting well over 4096 ids back to back is meant to push the
    /// sequence past its 12-bit space; every id must still be
    /// distinct.
    #[test]
    fn sequence_rolls_over_after_4096_within_one_ms() {
        let generator = SnowflakeGenerator::new(3).unwrap();
        let mut seen = HashSet::with_capacity(10_000);
        for _ in 0..10_000 {
            let id = generator.generate().unwrap();
            assert!(seen.insert(id), "id {id} repeated — sequence overflow not handled");
        }
    }

    /// Stress test: 8 threads x 1000 ids on one shared generator;
    /// all 8000 ids must be distinct.
    #[test]
    fn concurrent_generators_produce_unique_ids() {
        let shared = Arc::new(SnowflakeGenerator::new(5).unwrap());

        let workers: Vec<_> = (0..8)
            .map(|_| {
                let generator = Arc::clone(&shared);
                std::thread::spawn(move || {
                    (0..1000)
                        .map(|_| generator.generate().unwrap())
                        .collect::<Vec<_>>()
                })
            })
            .collect();

        let mut all = HashSet::new();
        for worker in workers {
            for id in worker.join().unwrap() {
                assert!(all.insert(id), "duplicate id {id} from concurrent generators");
            }
        }
        assert_eq!(all.len(), 8000);
    }

    /// Hashing the same seed twice gives the same machine id.
    #[test]
    fn derive_machine_id_is_stable_for_same_input() {
        let first = derive_machine_id("noetl-server-pod-0");
        let second = derive_machine_id("noetl-server-pod-0");
        assert_eq!(first, second);
    }

    /// Not guaranteed in general (only 1024 buckets), but these two
    /// seeds are expected to land in different ones.
    #[test]
    fn derive_machine_id_differs_for_different_inputs() {
        let pod_zero = derive_machine_id("pod-0");
        let pod_one = derive_machine_id("pod-1");
        assert_ne!(pod_zero, pod_one);
    }

    /// The derived id never exceeds the 10-bit maximum, including
    /// for the empty seed.
    #[test]
    fn derive_machine_id_stays_within_10_bits() {
        let seeds = ["", "a", "noetl-server", "very-long-hostname-with-pid-12345"];
        for seed in seeds {
            assert!(derive_machine_id(seed) <= MAX_MACHINE_ID);
        }
    }
}