use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicU64, Ordering};
use thread_local::ThreadLocal;
/// Number of timestamps a thread reserves per epoch when no pool size is given.
const DEFAULT_POOL_SIZE: u64 = 10000;
/// Width of the epoch field, which occupies the high bits of a hierarchical timestamp.
// Kept primarily to document the bit layout (and checked below); the `allow`
// keeps the build warning-free even if no runtime code reads it.
#[allow(dead_code)]
const EPOCH_BITS: u32 = 32;
/// Width of the per-epoch sequence field, which occupies the low bits of a timestamp.
const SEQUENCE_BITS: u32 = 32;

// Compile-time guard: the epoch and sequence fields must exactly tile a `u64`.
const _: () = assert!(EPOCH_BITS + SEQUENCE_BITS == u64::BITS);
/// A thread's private slice of the timestamp space: one epoch plus a cursor.
///
/// Sequence numbers `sequence..limit` of `epoch` are still available; once
/// `sequence` reaches `limit` the pool is exhausted.
struct TimestampPool {
    /// Epoch reserved from the global counter for this pool.
    epoch: u64,
    /// Next sequence number to hand out within `epoch`.
    sequence: u64,
    /// Exclusive upper bound on `sequence`.
    limit: u64,
}

impl TimestampPool {
    /// Builds a pool positioned at sequence 0 of `epoch` that can hand out
    /// `pool_size` timestamps before it is exhausted.
    fn new(epoch: u64, pool_size: u64) -> Self {
        let limit = pool_size;
        let sequence = 0;
        TimestampPool {
            epoch,
            sequence,
            limit,
        }
    }
}
/// Hands out unique `u64` timestamps with minimal cross-thread contention.
///
/// Each timestamp is `(epoch << SEQUENCE_BITS) | sequence`. Every thread owns a
/// pool holding one epoch taken from a shared atomic counter and hands out
/// sequence numbers from it without synchronization; the shared counter is only
/// touched again when the pool is exhausted.
///
/// Timestamps are unique across threads and strictly increasing within one
/// thread. Across threads they are *not* ordered by real time: a thread still
/// draining an old epoch issues smaller values than a thread that recently
/// reserved a newer one.
pub struct HierarchicalTimestampOracle {
    /// Next epoch to hand out.
    global_epoch: AtomicU64,
    /// Per-thread pools; each cell is only accessed by the thread that owns it.
    thread_pools: ThreadLocal<UnsafeCell<TimestampPool>>,
    /// Timestamps per epoch, always within `1..=1 << SEQUENCE_BITS`.
    pool_size: u64,
}

impl HierarchicalTimestampOracle {
    /// Mask selecting the sequence field of a timestamp.
    const SEQUENCE_MASK: u64 = (1 << SEQUENCE_BITS) - 1;
    /// Largest pool whose sequence numbers all fit in the sequence field.
    const MAX_POOL_SIZE: u64 = 1 << SEQUENCE_BITS;
    /// Largest epoch that survives `<< SEQUENCE_BITS` without losing bits.
    const MAX_EPOCH: u64 = u64::MAX >> SEQUENCE_BITS;

    /// Creates an oracle reserving `DEFAULT_POOL_SIZE` timestamps per epoch.
    pub fn new() -> Self {
        Self::with_pool_size(DEFAULT_POOL_SIZE)
    }

    /// Creates an oracle reserving `pool_size` timestamps per epoch.
    ///
    /// `pool_size` is clamped to `1..=2^SEQUENCE_BITS`. A larger pool would let
    /// sequence numbers spill into the epoch bits and collide with timestamps of
    /// other epochs; a zero-sized pool is treated as a pool of one.
    pub fn with_pool_size(pool_size: u64) -> Self {
        Self {
            global_epoch: AtomicU64::new(0),
            thread_pools: ThreadLocal::new(),
            pool_size: pool_size.clamp(1, Self::MAX_POOL_SIZE),
        }
    }

    /// Allocates the next timestamp for the calling thread.
    ///
    /// # Panics
    ///
    /// Panics when the epoch space is exhausted, rather than silently reissuing
    /// timestamps that were already handed out.
    #[inline]
    pub fn allocate(&self) -> u64 {
        let pool_cell = self.thread_pools.get_or(|| {
            UnsafeCell::new(TimestampPool::new(self.next_epoch(), self.pool_size))
        });
        // SAFETY: `ThreadLocal::get_or` returns the cell owned by the calling
        // thread, so no other thread can reach it concurrently. Within this
        // thread the `&mut` below is the only reference into the cell: it is
        // created here, lent to `reserve_new_pool`, and dropped before
        // `allocate` returns, and nothing on that path re-enters `allocate`.
        let pool = unsafe { &mut *pool_cell.get() };
        if pool.sequence < pool.limit {
            let ts = Self::compose(pool.epoch, pool.sequence);
            pool.sequence += 1;
            return ts;
        }
        self.reserve_new_pool(pool)
    }

    /// Moves an exhausted `pool` to a freshly reserved epoch and returns that
    /// epoch's first timestamp.
    #[cold]
    fn reserve_new_pool(&self, pool: &mut TimestampPool) -> u64 {
        pool.epoch = self.next_epoch();
        pool.limit = self.pool_size;
        // Sequence 0 is returned right here, so the pool resumes at 1.
        pool.sequence = 1;
        Self::compose(pool.epoch, 0)
    }

    /// Reserves a never-before-used epoch from the global counter.
    ///
    /// # Panics
    ///
    /// Panics once the epoch no longer fits above the sequence field.
    fn next_epoch(&self) -> u64 {
        let epoch = self.global_epoch.fetch_add(1, Ordering::SeqCst);
        assert!(
            epoch <= Self::MAX_EPOCH,
            "HierarchicalTimestampOracle: epoch space exhausted"
        );
        epoch
    }

    /// Packs an epoch and a sequence number into one timestamp.
    #[inline]
    fn compose(epoch: u64, sequence: u64) -> u64 {
        (epoch << SEQUENCE_BITS) | sequence
    }

    /// Returns the global epoch counter: the number of epochs reserved so far,
    /// which is also the epoch the next reservation will receive. This is a
    /// relaxed snapshot and may be stale under concurrency.
    #[inline]
    pub fn current_epoch(&self) -> u64 {
        self.global_epoch.load(Ordering::Relaxed)
    }

    /// Extracts the epoch (high bits) of a timestamp.
    #[inline]
    pub fn extract_epoch(ts: u64) -> u64 {
        ts >> SEQUENCE_BITS
    }

    /// Extracts the per-epoch sequence number (low bits) of a timestamp.
    #[inline]
    pub fn extract_sequence(ts: u64) -> u64 {
        ts & Self::SEQUENCE_MASK
    }

    /// Returns `true` when `ts1` precedes `ts2` in the oracle's total order.
    ///
    /// This is a plain numeric comparison. Because threads drain their pools
    /// independently, it is only guaranteed to match real-time order for
    /// timestamps issued by the same thread.
    #[inline]
    pub fn happens_before(ts1: u64, ts2: u64) -> bool {
        ts1 < ts2
    }
}

impl Default for HierarchicalTimestampOracle {
    fn default() -> Self {
        Self::new()
    }
}
/// Pairs hierarchical logical timestamps with a monotonic physical timestamp in
/// microseconds since the Unix epoch.
pub struct HybridHierarchicalOracle {
    /// Source of logical timestamps.
    hierarchical: HierarchicalTimestampOracle,
    /// Microseconds added to the wall clock. Nothing visible here writes it after
    /// construction, so it currently stays 0.
    wall_clock_offset: AtomicU64,
    /// Largest physical time issued or observed so far (high-water mark).
    last_physical: AtomicU64,
}

impl HybridHierarchicalOracle {
    /// Creates an oracle with a default logical oracle and no clock offset.
    pub fn new() -> Self {
        Self {
            hierarchical: HierarchicalTimestampOracle::new(),
            wall_clock_offset: AtomicU64::new(0),
            last_physical: AtomicU64::new(0),
        }
    }

    /// Allocates a logical timestamp only, without reading the clock.
    #[inline]
    pub fn allocate(&self) -> u64 {
        self.hierarchical.allocate()
    }

    /// Allocates a `(logical, physical)` pair.
    ///
    /// The physical component is strictly greater than every physical value
    /// previously returned or passed to `update_from_external`, saturating at
    /// `u64::MAX`.
    pub fn allocate_with_physical(&self) -> (u64, u64) {
        let logical = self.hierarchical.allocate();
        let physical = self.current_physical_time();
        (logical, physical)
    }

    /// Raises the physical high-water mark to `external_physical` if that is
    /// larger; smaller or equal values are ignored.
    pub fn update_from_external(&self, external_physical: u64) {
        self.last_physical
            .fetch_max(external_physical, Ordering::AcqRel);
    }

    /// Returns a new physical timestamp and records it as the high-water mark.
    ///
    /// The wall clock (plus offset) is used when it is ahead of the mark;
    /// otherwise the mark is bumped by one, so a clock that steps backwards never
    /// causes a regression.
    fn current_physical_time(&self) -> u64 {
        use std::time::{SystemTime, UNIX_EPOCH};

        // A clock set before 1970 reads as 0 instead of panicking; the
        // high-water-mark logic below still keeps results increasing.
        // `as u64` only truncates after roughly 584,000 years of microseconds.
        let wall_clock = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |since_epoch| since_epoch.as_micros() as u64);
        let offset = self.wall_clock_offset.load(Ordering::Relaxed);
        let physical = wall_clock.saturating_add(offset);

        let mut last = self.last_physical.load(Ordering::Acquire);
        loop {
            // Saturate so a high-water mark of u64::MAX cannot overflow/wrap.
            let next = physical.max(last.saturating_add(1));
            match self.last_physical.compare_exchange_weak(
                last,
                next,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return next,
                Err(actual) => last = actual,
            }
        }
    }
}

impl Default for HybridHierarchicalOracle {
    fn default() -> Self {
        Self::new()
    }
}
/// Allocates unique `u64` timestamps, increasing within each thread, by
/// reserving contiguous batches from a shared counter.
///
/// Values start at 1. A thread reserves `batch_size` consecutive values with a
/// single atomic operation and then hands them out without synchronization.
pub struct BatchedTimestampAllocator {
    /// First value not yet reserved by any thread.
    next_global: AtomicU64,
    /// Values reserved per refill; always at least 1.
    batch_size: u64,
    /// Per-thread batches; each cell is only accessed by the thread that owns it.
    thread_batches: ThreadLocal<UnsafeCell<LocalBatch>>,
}

/// A thread's current reservation: values in `current..end` are still unissued.
#[derive(Default)]
struct LocalBatch {
    /// Next value to hand out.
    current: u64,
    /// Exclusive end of the reserved range.
    end: u64,
}

impl BatchedTimestampAllocator {
    /// Creates an allocator that reserves 1000 values per refill.
    pub fn new() -> Self {
        Self::with_batch_size(1000)
    }

    /// Creates an allocator that reserves `batch_size` values per refill.
    ///
    /// A `batch_size` of 0 is treated as 1. A zero-sized reservation would never
    /// advance the shared counter, so every call would reissue the same value.
    pub fn with_batch_size(batch_size: u64) -> Self {
        Self {
            next_global: AtomicU64::new(1),
            batch_size: batch_size.max(1),
            thread_batches: ThreadLocal::new(),
        }
    }

    /// Allocates the next timestamp for the calling thread.
    #[inline]
    pub fn allocate(&self) -> u64 {
        let batch_cell = self
            .thread_batches
            .get_or(|| UnsafeCell::new(LocalBatch::default()));
        // SAFETY: `ThreadLocal::get_or` returns the cell owned by the calling
        // thread, so no other thread can reach it concurrently. The `&mut` below
        // is the only reference into the cell: it is lent to `reserve_batch` and
        // dropped before `allocate` returns, with no re-entry on that path.
        let batch = unsafe { &mut *batch_cell.get() };
        if batch.current < batch.end {
            let ts = batch.current;
            batch.current += 1;
            return ts;
        }
        self.reserve_batch(batch)
    }

    /// Reserves a fresh batch from the shared counter and returns its first value.
    #[cold]
    fn reserve_batch(&self, batch: &mut LocalBatch) -> u64 {
        let start = self
            .next_global
            .fetch_add(self.batch_size, Ordering::SeqCst);
        // `start` is returned now; the remainder of the range stays in the batch.
        batch.current = start + 1;
        batch.end = start + self.batch_size;
        start
    }

    /// Returns the first value not yet reserved by any thread.
    ///
    /// Every value returned by `allocate` so far is below this watermark, but
    /// values below it may still be unissued inside some thread's batch.
    pub fn watermark(&self) -> u64 {
        self.next_global.load(Ordering::Acquire)
    }
}

impl Default for BatchedTimestampAllocator {
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;
    use std::sync::Arc;
    use std::thread;

    /// Calls `allocate` `ops_per_thread` times on each of `num_threads` threads
    /// sharing `allocator`, returning each thread's results in allocation order.
    fn allocate_concurrently<A, F>(
        allocator: &Arc<A>,
        num_threads: usize,
        ops_per_thread: usize,
        allocate: F,
    ) -> Vec<Vec<u64>>
    where
        A: Send + Sync + 'static,
        F: Fn(&A) -> u64 + Copy + Send + 'static,
    {
        let handles: Vec<_> = (0..num_threads)
            .map(|_| {
                let shared = Arc::clone(allocator);
                thread::spawn(move || {
                    (0..ops_per_thread)
                        .map(|_| allocate(shared.as_ref()))
                        .collect::<Vec<u64>>()
                })
            })
            .collect();
        handles
            .into_iter()
            .map(|handle| handle.join().unwrap())
            .collect()
    }

    /// Asserts that `timestamps` is strictly increasing.
    fn assert_strictly_increasing(timestamps: &[u64]) {
        for pair in timestamps.windows(2) {
            assert!(pair[0] < pair[1], "{} is not below {}", pair[0], pair[1]);
        }
    }

    /// Counts distinct values across all per-thread result vectors.
    fn distinct(per_thread: &[Vec<u64>]) -> usize {
        per_thread.iter().flatten().collect::<HashSet<_>>().len()
    }

    #[test]
    fn test_hierarchical_oracle_basic() {
        let oracle = HierarchicalTimestampOracle::new();
        let ts1 = oracle.allocate();
        let ts2 = oracle.allocate();
        let ts3 = oracle.allocate();
        assert!(ts1 < ts2);
        assert!(ts2 < ts3);
        let epoch = HierarchicalTimestampOracle::extract_epoch(ts1);
        let seq = HierarchicalTimestampOracle::extract_sequence(ts1);
        assert_eq!(epoch, 0);
        assert_eq!(seq, 0);
    }

    #[test]
    fn test_hierarchical_oracle_pool_exhaustion() {
        let oracle = HierarchicalTimestampOracle::with_pool_size(10);
        let timestamps: Vec<u64> = (0..25).map(|_| oracle.allocate()).collect();
        // Refills must never move a thread backwards.
        assert_strictly_increasing(&timestamps);
        let unique: HashSet<_> = timestamps.iter().collect();
        assert_eq!(unique.len(), 25);
        let epochs: HashSet<_> = timestamps
            .iter()
            .map(|&ts| HierarchicalTimestampOracle::extract_epoch(ts))
            .collect();
        assert!(epochs.len() >= 2);
    }

    #[test]
    fn test_hierarchical_oracle_concurrent() {
        let oracle = Arc::new(HierarchicalTimestampOracle::new());
        let num_threads = 8;
        let ops_per_thread = 10000;
        let per_thread = allocate_concurrently(
            &oracle,
            num_threads,
            ops_per_thread,
            HierarchicalTimestampOracle::allocate,
        );
        for timestamps in &per_thread {
            assert_strictly_increasing(timestamps);
        }
        assert_eq!(distinct(&per_thread), num_threads * ops_per_thread);
    }

    #[test]
    fn test_batched_allocator() {
        let allocator = BatchedTimestampAllocator::with_batch_size(100);
        let timestamps: Vec<u64> = (0..250).map(|_| allocator.allocate()).collect();
        let unique: HashSet<_> = timestamps.iter().collect();
        assert_eq!(unique.len(), 250);
        assert_strictly_increasing(&timestamps);
    }

    #[test]
    fn test_batched_allocator_concurrent() {
        let allocator = Arc::new(BatchedTimestampAllocator::with_batch_size(100));
        let num_threads = 4;
        let ops_per_thread = 1000;
        let per_thread = allocate_concurrently(
            &allocator,
            num_threads,
            ops_per_thread,
            BatchedTimestampAllocator::allocate,
        );
        for timestamps in &per_thread {
            assert_strictly_increasing(timestamps);
        }
        assert_eq!(distinct(&per_thread), num_threads * ops_per_thread);
    }

    #[test]
    fn test_hybrid_oracle() {
        let oracle = HybridHierarchicalOracle::new();
        let (logical1, physical1) = oracle.allocate_with_physical();
        let (logical2, physical2) = oracle.allocate_with_physical();
        assert!(logical1 < logical2);
        // Each physical reading is at least the previous one plus one.
        assert!(physical1 < physical2);
    }

    #[test]
    fn test_happens_before() {
        let oracle = HierarchicalTimestampOracle::new();
        let ts1 = oracle.allocate();
        let ts2 = oracle.allocate();
        assert!(HierarchicalTimestampOracle::happens_before(ts1, ts2));
        assert!(!HierarchicalTimestampOracle::happens_before(ts2, ts1));
        assert!(!HierarchicalTimestampOracle::happens_before(ts1, ts1));
    }
}