Skip to main content

reddb_server/log/
id.rs

1//! Timestamp-based monotonic IDs for log entries.
2//!
3//! Layout (64 bits):
4//! ┌──────────────────────────────────┬──────────┐
5//! │ timestamp_us (52 bits)           │ seq (12) │
6//! │ ~142 years of range from epoch   │ 4096/µs  │
7//! └──────────────────────────────────┴──────────┘
8//!
9//! Properties:
10//! - Monotonically increasing (natural time ordering)
11//! - No collisions up to 4,095 entries per microsecond (~4B/sec theoretical)
12//! - Sortable: ORDER BY id = ORDER BY time
13//! - Extractable: timestamp_us = id >> 12, timestamp_ms = id >> 12 / 1000
14
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::{SystemTime, UNIX_EPOCH};
17
18const SEQ_BITS: u64 = 12;
19const SEQ_MASK: u64 = (1 << SEQ_BITS) - 1; // 0xFFF
20
21/// Generator for timestamp-based log IDs.
22/// Uses a single AtomicU64 packing (timestamp << 12 | seq) so the entire
23/// state transitions atomically via fetch_add or compare_exchange.
24pub struct LogIdGenerator {
25    last_id: AtomicU64,
26}
27
28impl LogIdGenerator {
29    pub fn new() -> Self {
30        Self {
31            last_id: AtomicU64::new(0),
32        }
33    }
34
35    /// Generate the next log ID. Monotonically increasing, thread-safe.
36    /// Single atomic operation — no TOCTOU race possible.
37    pub fn next(&self) -> LogId {
38        let now = now_micros();
39        let candidate = now << SEQ_BITS;
40
41        loop {
42            let prev = self.last_id.load(Ordering::SeqCst);
43            let next = if candidate > prev {
44                candidate
45            } else {
46                prev + 1
47            };
48
49            match self
50                .last_id
51                .compare_exchange(prev, next, Ordering::SeqCst, Ordering::SeqCst)
52            {
53                Ok(_) => return LogId(next),
54                Err(_) => continue,
55            }
56        }
57    }
58
59    /// Restore generator state from the highest existing ID (for reload).
60    pub fn restore(&self, max_id: u64) {
61        loop {
62            let current = self.last_id.load(Ordering::SeqCst);
63            if max_id <= current {
64                break;
65            }
66            match self
67                .last_id
68                .compare_exchange(current, max_id, Ordering::SeqCst, Ordering::SeqCst)
69            {
70                Ok(_) => break,
71                Err(_) => continue,
72            }
73        }
74    }
75}
76
77/// A timestamp-based log entry ID.
78#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
79pub struct LogId(pub u64);
80
81impl LogId {
82    pub fn raw(self) -> u64 {
83        self.0
84    }
85
86    /// Extract the timestamp component in microseconds.
87    pub fn timestamp_us(self) -> u64 {
88        self.0 >> SEQ_BITS
89    }
90
91    /// Extract the timestamp in milliseconds.
92    pub fn timestamp_ms(self) -> u64 {
93        self.timestamp_us() / 1_000
94    }
95
96    /// Extract the sequence within the same microsecond.
97    pub fn sequence(self) -> u16 {
98        (self.0 & SEQ_MASK) as u16
99    }
100
101    /// Create a LogId from a timestamp in milliseconds (for range queries).
102    pub fn from_ms(ms: u64) -> Self {
103        Self((ms * 1_000) << SEQ_BITS)
104    }
105
106    /// Create a LogId from a timestamp in microseconds.
107    pub fn from_us(us: u64) -> Self {
108        Self(us << SEQ_BITS)
109    }
110}
111
112impl std::fmt::Display for LogId {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        write!(f, "{}", self.0)
115    }
116}
117
118fn now_micros() -> u64 {
119    SystemTime::now()
120        .duration_since(UNIX_EPOCH)
121        .unwrap_or_default()
122        .as_micros() as u64
123}
124
125#[cfg(test)]
126mod tests {
127    use super::*;
128
129    #[test]
130    fn test_monotonic() {
131        let gen = LogIdGenerator::new();
132        let a = gen.next();
133        let b = gen.next();
134        let c = gen.next();
135        assert!(b.raw() > a.raw(), "b > a");
136        assert!(c.raw() > b.raw(), "c > b");
137    }
138
139    #[test]
140    fn test_timestamp_extraction() {
141        let gen = LogIdGenerator::new();
142        let id = gen.next();
143        let ts_us = id.timestamp_us();
144        let now = now_micros();
145        assert!((now - ts_us) < 1_000_000, "within 1 second");
146    }
147
148    #[test]
149    fn test_sequence_within_same_ns() {
150        let gen = LogIdGenerator::new();
151        let a = gen.next();
152        let b = gen.next();
153        // Both should have same or adjacent timestamp, different seq
154        assert_ne!(a.raw(), b.raw());
155    }
156
157    #[test]
158    fn test_from_ms() {
159        let id = LogId::from_ms(1712880000000);
160        assert_eq!(id.timestamp_ms(), 1712880000000);
161        assert_eq!(id.sequence(), 0);
162    }
163
164    #[test]
165    fn test_restore() {
166        let gen = LogIdGenerator::new();
167        let first = gen.next();
168        gen.restore(first.raw() + 1000);
169        let after = gen.next();
170        assert!(after.raw() > first.raw() + 1000);
171    }
172
173    #[test]
174    fn test_high_throughput_no_collision() {
175        let gen = LogIdGenerator::new();
176        let mut ids = Vec::with_capacity(10000);
177        for _ in 0..10000 {
178            ids.push(gen.next().raw());
179        }
180        // All unique
181        let mut deduped = ids.clone();
182        deduped.sort();
183        deduped.dedup();
184        assert_eq!(ids.len(), deduped.len(), "no collisions in 10K IDs");
185        // Monotonically increasing
186        for i in 1..ids.len() {
187            assert!(ids[i] > ids[i - 1], "monotonic at index {}", i);
188        }
189    }
190
191    #[test]
192    fn test_concurrent_no_collision() {
193        let gen = std::sync::Arc::new(LogIdGenerator::new());
194        let mut all_ids = Vec::new();
195
196        std::thread::scope(|s| {
197            let mut handles = Vec::new();
198            for _ in 0..10 {
199                let g = std::sync::Arc::clone(&gen);
200                handles.push(s.spawn(move || {
201                    let mut ids = Vec::with_capacity(1000);
202                    for _ in 0..1000 {
203                        ids.push(g.next().raw());
204                    }
205                    ids
206                }));
207            }
208            for h in handles {
209                all_ids.extend(h.join().unwrap());
210            }
211        });
212
213        assert_eq!(all_ids.len(), 10_000);
214        let mut deduped = all_ids.clone();
215        deduped.sort();
216        deduped.dedup();
217        assert_eq!(
218            all_ids.len(),
219            deduped.len(),
220            "no collisions across 10 threads × 1000 IDs"
221        );
222    }
223}