citadeldb-sync 0.12.0

Replication and sync layer for the Citadel encrypted database
Documentation
use std::sync::atomic::{AtomicI64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

/// Hybrid Logical Clock timestamp.
///
/// Serialized layout (12 bytes, big-endian, see [`HlcTimestamp::to_bytes`]):
/// - `wall_time`: `i64` - nanoseconds since Unix epoch
/// - `logical`: `i32` - counter for events that share the same `wall_time`
///
/// Comparison: `wall_time` first, then `logical` (total order).
/// Big-endian byte serialization preserves comparison order for non-negative
/// values only: negative fields are two's-complement encoded, so their bytes
/// sort after those of non-negative fields.
///
/// Range:
/// - `wall_time`: covers ~292 years from epoch (until year 2262)
/// - `logical`: up to 2,147,483,647 events per nanosecond
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct HlcTimestamp {
    /// Nanoseconds since the Unix epoch.
    wall_time: i64,
    /// Tie-breaker counter for timestamps that share the same `wall_time`.
    logical: i32,
}

/// Size in bytes of a serialized [`HlcTimestamp`]: 8 bytes of wall time
/// followed by 4 bytes of logical counter.
pub const HLC_TIMESTAMP_SIZE: usize = 12;

impl HlcTimestamp {
    /// Timestamp with both the wall time and the logical counter set to zero.
    pub const ZERO: Self = Self {
        logical: 0,
        wall_time: 0,
    };

    /// Builds a timestamp from a wall time (nanoseconds since the Unix epoch)
    /// and a logical counter. No validation is performed on either value.
    #[inline]
    pub fn new(wall_time_ns: i64, logical: i32) -> Self {
        let wall_time = wall_time_ns;
        Self { wall_time, logical }
    }

    /// Wall-time component, in nanoseconds since the Unix epoch.
    #[inline]
    pub fn wall_time(&self) -> i64 {
        let Self { wall_time, .. } = *self;
        wall_time
    }

    /// Logical counter component.
    #[inline]
    pub fn logical(&self) -> i32 {
        let Self { logical, .. } = *self;
        logical
    }

    /// Encodes the timestamp as 8 big-endian bytes of wall time followed by
    /// 4 big-endian bytes of logical counter.
    ///
    /// Bytewise comparison of the result matches timestamp ordering for
    /// non-negative values.
    #[inline]
    pub fn to_bytes(&self) -> [u8; HLC_TIMESTAMP_SIZE] {
        let mut encoded = [0u8; HLC_TIMESTAMP_SIZE];
        {
            // First 8 bytes hold the wall time, the remainder the counter.
            let (wall_part, logical_part) = encoded.split_at_mut(8);
            wall_part.copy_from_slice(&self.wall_time.to_be_bytes());
            logical_part.copy_from_slice(&self.logical.to_be_bytes());
        }
        encoded
    }

    /// Decodes a timestamp previously produced by [`HlcTimestamp::to_bytes`].
    #[inline]
    pub fn from_bytes(b: &[u8; HLC_TIMESTAMP_SIZE]) -> Self {
        let mut wall_raw = [0u8; 8];
        let mut logical_raw = [0u8; 4];
        wall_raw.copy_from_slice(&b[..8]);
        logical_raw.copy_from_slice(&b[8..]);

        Self {
            wall_time: i64::from_be_bytes(wall_raw),
            logical: i32::from_be_bytes(logical_raw),
        }
    }

    /// Returns `true` when both components are zero.
    #[inline]
    pub fn is_zero(&self) -> bool {
        (self.wall_time, self.logical) == (0, 0)
    }
}

/// Partial ordering that always agrees with the total order from `Ord`.
impl PartialOrd for HlcTimestamp {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // Delegate to the total order so the two impls can never diverge.
        Some(Ord::cmp(self, other))
    }
}

/// Total order: wall time is compared first, the logical counter breaks ties.
impl Ord for HlcTimestamp {
    #[inline]
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Tuples compare lexicographically, which is exactly
        // "wall time first, then logical".
        let lhs = (self.wall_time, self.logical);
        let rhs = (other.wall_time, other.logical);
        lhs.cmp(&rhs)
    }
}

/// Diagnostic rendering: `HLC(<wall_time>ns:<logical>)`.
impl std::fmt::Debug for HlcTimestamp {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (wall, counter) = (self.wall_time, self.logical);
        f.write_fmt(format_args!("HLC({}ns:{})", wall, counter))
    }
}

/// Compact rendering: `<wall_time>:<logical>`.
impl std::fmt::Display for HlcTimestamp {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (wall, counter) = (self.wall_time, self.logical);
        f.write_fmt(format_args!("{}:{}", wall, counter))
    }
}

/// Errors returned by [`HlcClock`] operations.
#[derive(Debug, thiserror::Error)]
pub enum ClockError {
    /// A wall time is ahead of the local physical clock by more than the
    /// configured maximum drift.
    ///
    /// Returned by [`HlcClock::update`] for a remote timestamp, and by
    /// [`HlcClock::now`] when the stored wall time itself is too far ahead;
    /// in the latter case `remote_ns` carries the stored wall time.
    #[error(
        "clock drift exceeded: remote_wall_time={remote_ns}ns, \
         physical_now={physical_ns}ns, max_drift={max_drift_ns}ns"
    )]
    ClockDriftExceeded {
        /// Wall time that was rejected, in nanoseconds since the Unix epoch.
        remote_ns: i64,
        /// Physical clock reading used for the check, in nanoseconds.
        physical_ns: i64,
        /// Drift limit in effect when the check ran, in nanoseconds.
        max_drift_ns: i64,
    },

    /// The logical counter is already at `i32::MAX` and cannot be incremented.
    #[error("HLC counter overflow (>2^31-1 events in same nanosecond)")]
    CounterOverflow,
}

/// Source of physical time for the HLC.
///
/// Abstracted as a trait to allow deterministic testing with [`ManualClock`].
/// Implementors must be [`Send`].
pub trait PhysicalClock: Send {
    /// Current time in nanoseconds since Unix epoch.
    ///
    /// Takes `&self`, so an implementation whose reading can be changed by the
    /// caller needs interior mutability (as [`ManualClock`] does with an atomic).
    fn now_ns(&self) -> i64;
}

/// Physical clock backed by the system clock.
///
/// Reads the time through [`SystemTime::now`], so readings follow the wall
/// clock and may jump if the system time is adjusted.
///
/// On Linux: provides true nanosecond precision via `clock_gettime(CLOCK_REALTIME)`.
/// On macOS: microsecond precision for wall clock.
/// On Windows: ~100ns precision (system timer resolution).
pub struct SystemClock;

impl PhysicalClock for SystemClock {
    fn now_ns(&self) -> i64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock before UNIX epoch")
            .as_nanos() as i64
    }
}

/// Manually controlled [`PhysicalClock`] whose reading only changes when the
/// caller sets or advances it.
///
/// The reading is stored in an atomic so it can be modified through `&self`.
pub struct ManualClock {
    /// Current reading, in nanoseconds.
    time_ns: AtomicI64,
}

impl ManualClock {
    /// Creates a clock whose reading is `initial_ns` until it is changed.
    pub fn new(initial_ns: i64) -> Self {
        let time_ns = AtomicI64::new(initial_ns);
        Self { time_ns }
    }

    /// Replaces the current reading with `time_ns`.
    pub fn set(&self, time_ns: i64) {
        let reading = &self.time_ns;
        reading.store(time_ns, Ordering::SeqCst);
    }

    /// Adds `delta_ns` to the current reading; a negative delta moves the
    /// clock backwards. The addition wraps on overflow.
    pub fn advance(&self, delta_ns: i64) {
        // `fetch_add` returns the previous reading, which is not needed here.
        let _previous = self.time_ns.fetch_add(delta_ns, Ordering::SeqCst);
    }
}

impl PhysicalClock for ManualClock {
    fn now_ns(&self) -> i64 {
        self.time_ns.load(Ordering::SeqCst)
    }
}

/// Number of nanoseconds in one second.
const SECOND_NS: i64 = 1_000_000_000;
/// Drift limit (5 seconds, in nanoseconds) applied to newly constructed clocks
/// until overridden with `HlcClock::set_max_drift_ns`.
const DEFAULT_MAX_DRIFT_NS: i64 = 5 * SECOND_NS;

/// Hybrid Logical Clock state machine.
///
/// Two core operations:
/// - [`now()`](HlcClock::now) - generate a timestamp for a local event
/// - [`update()`](HlcClock::update) - merge a remote timestamp into local state
///
/// `now()` always returns a strictly increasing timestamp.
/// `update()` advances the internal clock state without generating a new timestamp.
/// Timestamps are only generated by local events; received timestamps only advance
/// the clock.
pub struct HlcClock<C: PhysicalClock = SystemClock> {
    /// Most recent timestamp issued by `now()` or adopted via `update()` /
    /// `set_last()`.
    last: HlcTimestamp,
    /// Maximum number of nanoseconds a wall time may be ahead of the physical
    /// clock before it is rejected.
    max_drift_ns: i64,
    /// Source of physical time.
    clock: C,
}

impl Default for HlcClock<SystemClock> {
    fn default() -> Self {
        Self::new()
    }
}

impl HlcClock<SystemClock> {
    /// Creates a clock driven by the system clock, starting from the zero
    /// timestamp with the default drift limit.
    pub fn new() -> Self {
        // `with_clock` builds the same initial state for any clock source.
        Self::with_clock(SystemClock)
    }
}

impl<C: PhysicalClock> HlcClock<C> {
    /// Creates a clock driven by `clock`, starting from the zero timestamp
    /// with the default drift limit.
    pub fn with_clock(clock: C) -> Self {
        let last = HlcTimestamp::ZERO;
        let max_drift_ns = DEFAULT_MAX_DRIFT_NS;
        Self {
            last,
            max_drift_ns,
            clock,
        }
    }

    /// Sets how far (in nanoseconds) a wall time may run ahead of the physical
    /// clock before it is rejected.
    pub fn set_max_drift_ns(&mut self, max_drift_ns: i64) {
        self.max_drift_ns = max_drift_ns;
    }

    /// Call on startup to restore monotonicity after a restart.
    ///
    /// Overwrites the stored timestamp unconditionally.
    pub fn set_last(&mut self, ts: HlcTimestamp) {
        self.last = ts;
    }

    /// Generate a monotonically increasing timestamp for a local event.
    ///
    /// When the physical clock is strictly ahead of the stored wall time, the
    /// result is `(physical, 0)`. Otherwise the stored wall time is kept and
    /// the logical counter is incremented.
    ///
    /// # Errors
    ///
    /// - [`ClockError::CounterOverflow`] if the logical counter cannot be
    ///   incremented.
    /// - [`ClockError::ClockDriftExceeded`] if the stored wall time is ahead
    ///   of the physical clock by more than the drift limit.
    ///
    /// The stored timestamp is left untouched when an error is returned.
    pub fn now(&mut self) -> Result<HlcTimestamp, ClockError> {
        let physical = self.clock.now_ns();
        let previous = self.last;

        let next = if physical > previous.wall_time {
            // Physical time moved forward: adopt it and restart the counter.
            HlcTimestamp::new(physical, 0)
        } else {
            // Physical time is not past the stored wall time: stay on the
            // stored wall time and bump the counter. Overflow is reported
            // before drift.
            let bumped = previous
                .logical
                .checked_add(1)
                .ok_or(ClockError::CounterOverflow)?;
            self.check_drift(previous.wall_time, physical)?;
            HlcTimestamp::new(previous.wall_time, bumped)
        };

        self.last = next;
        Ok(next)
    }

    /// Merge a remote timestamp into local state (does not generate a new timestamp).
    ///
    /// The stored timestamp becomes the greater of itself and `remote`.
    ///
    /// # Errors
    ///
    /// [`ClockError::ClockDriftExceeded`] if `remote`'s wall time is ahead of
    /// the physical clock by more than the drift limit; the stored timestamp
    /// is not modified in that case.
    pub fn update(&mut self, remote: HlcTimestamp) -> Result<(), ClockError> {
        let physical = self.clock.now_ns();

        // Reject remote timestamps that are too far in the future.
        self.check_drift(remote.wall_time, physical)?;

        // Timestamps order by wall time, then logical counter, so "remote is
        // greater" covers both a newer wall time and an equal wall time with
        // a higher counter. Older or equal remotes leave the state as is.
        if remote > self.last {
            self.last = remote;
        }

        Ok(())
    }

    /// Most recent timestamp held by the clock.
    pub fn last_timestamp(&self) -> HlcTimestamp {
        self.last
    }

    /// Borrows the underlying physical clock.
    pub fn physical_clock(&self) -> &C {
        &self.clock
    }

    /// Fails with [`ClockError::ClockDriftExceeded`] when `wall_time_ns` is
    /// ahead of `physical_ns` by more than the drift limit.
    fn check_drift(&self, wall_time_ns: i64, physical_ns: i64) -> Result<(), ClockError> {
        // Saturating so extreme inputs cannot overflow the subtraction.
        let ahead_by = wall_time_ns.saturating_sub(physical_ns);
        if ahead_by <= self.max_drift_ns {
            return Ok(());
        }

        Err(ClockError::ClockDriftExceeded {
            remote_ns: wall_time_ns,
            physical_ns,
            max_drift_ns: self.max_drift_ns,
        })
    }
}

// Unit tests are compiled only for test builds and live in `hlc_tests.rs`.
#[cfg(test)]
#[path = "hlc_tests.rs"]
mod tests;