use ccxt_core::time::TimestampUtils;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::time::Duration;
#[derive(Debug, Clone)]
pub struct TimeSyncConfig {
pub sync_interval: Duration,
pub auto_sync: bool,
pub max_offset_drift: i64,
}
impl Default for TimeSyncConfig {
fn default() -> Self {
Self {
sync_interval: Duration::from_secs(30),
auto_sync: true,
max_offset_drift: 5000,
}
}
}
impl TimeSyncConfig {
pub fn with_interval(sync_interval: Duration) -> Self {
Self {
sync_interval,
..Default::default()
}
}
pub fn manual_sync_only() -> Self {
Self {
auto_sync: false,
..Default::default()
}
}
}
#[derive(Debug)]
pub struct TimeSyncManager {
time_offset: AtomicI64,
last_sync_time: AtomicI64,
initialized: AtomicBool,
config: TimeSyncConfig,
}
impl TimeSyncManager {
pub fn new() -> Self {
Self::with_config(TimeSyncConfig::default())
}
pub fn with_config(config: TimeSyncConfig) -> Self {
Self {
time_offset: AtomicI64::new(0),
last_sync_time: AtomicI64::new(0),
initialized: AtomicBool::new(false),
config,
}
}
#[inline]
pub fn is_initialized(&self) -> bool {
self.initialized.load(Ordering::Acquire)
}
pub fn needs_resync(&self) -> bool {
if !self.is_initialized() {
return true;
}
if !self.config.auto_sync {
return false;
}
let last_sync = self.last_sync_time.load(Ordering::Acquire);
let now = TimestampUtils::now_ms();
let elapsed = now.saturating_sub(last_sync);
if elapsed >= self.config.sync_interval.as_millis() as i64 {
return true;
}
if elapsed >= self.config.max_offset_drift {
return true;
}
false
}
#[inline]
pub fn get_offset(&self) -> i64 {
self.time_offset.load(Ordering::Acquire)
}
#[inline]
pub fn get_server_timestamp(&self) -> i64 {
let local_time = TimestampUtils::now_ms();
let offset = self.get_offset();
local_time.saturating_add(offset)
}
pub fn update_offset(&self, server_time: i64) {
let local_time = TimestampUtils::now_ms();
let offset = server_time.saturating_sub(local_time);
self.time_offset.store(offset, Ordering::Release);
self.last_sync_time.store(local_time, Ordering::Release);
self.initialized.store(true, Ordering::Release);
}
#[inline]
pub fn last_sync_time(&self) -> i64 {
self.last_sync_time.load(Ordering::Acquire)
}
#[inline]
pub fn config(&self) -> &TimeSyncConfig {
&self.config
}
pub fn reset(&self) {
self.time_offset.store(0, Ordering::Release);
self.last_sync_time.store(0, Ordering::Release);
self.initialized.store(false, Ordering::Release);
}
}
impl Default for TimeSyncManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
#![allow(clippy::disallowed_methods)]
use super::*;
use std::thread;
#[test]
fn test_time_sync_config_default() {
let config = TimeSyncConfig::default();
assert_eq!(config.sync_interval, Duration::from_secs(30));
assert!(config.auto_sync);
assert_eq!(config.max_offset_drift, 5000);
}
#[test]
fn test_time_sync_config_with_interval() {
let config = TimeSyncConfig::with_interval(Duration::from_secs(60));
assert_eq!(config.sync_interval, Duration::from_secs(60));
assert!(config.auto_sync);
}
#[test]
fn test_time_sync_config_manual_sync_only() {
let config = TimeSyncConfig::manual_sync_only();
assert!(!config.auto_sync);
}
#[test]
fn test_time_sync_manager_new() {
let manager = TimeSyncManager::new();
assert!(!manager.is_initialized());
assert_eq!(manager.get_offset(), 0);
assert_eq!(manager.last_sync_time(), 0);
}
#[test]
fn test_time_sync_manager_with_config() {
let config = TimeSyncConfig {
sync_interval: Duration::from_secs(60),
auto_sync: false,
max_offset_drift: 3000,
};
let manager = TimeSyncManager::with_config(config);
assert_eq!(manager.config().sync_interval, Duration::from_secs(60));
assert!(!manager.config().auto_sync);
assert_eq!(manager.config().max_offset_drift, 3000);
}
#[test]
fn test_needs_resync_when_not_initialized() {
let manager = TimeSyncManager::new();
assert!(manager.needs_resync());
}
#[test]
fn test_needs_resync_after_initialization() {
let manager = TimeSyncManager::new();
let server_time = TimestampUtils::now_ms();
manager.update_offset(server_time);
assert!(!manager.needs_resync());
}
#[test]
fn test_needs_resync_with_auto_sync_disabled() {
let config = TimeSyncConfig::manual_sync_only();
let manager = TimeSyncManager::with_config(config);
let server_time = TimestampUtils::now_ms();
manager.update_offset(server_time);
assert!(!manager.needs_resync());
}
#[test]
fn test_update_offset() {
let manager = TimeSyncManager::new();
let local_time = TimestampUtils::now_ms();
let server_time = local_time + 100;
manager.update_offset(server_time);
assert!(manager.is_initialized());
let offset = manager.get_offset();
assert!((90..=110).contains(&offset), "Offset was: {}", offset);
}
#[test]
fn test_get_server_timestamp() {
let manager = TimeSyncManager::new();
let local_time = TimestampUtils::now_ms();
let server_time = local_time + 1000;
manager.update_offset(server_time);
let estimated = manager.get_server_timestamp();
let diff = (estimated - server_time).abs();
assert!(diff < 100, "Difference was: {}", diff);
}
#[test]
fn test_reset() {
let manager = TimeSyncManager::new();
manager.update_offset(TimestampUtils::now_ms());
assert!(manager.is_initialized());
manager.reset();
assert!(!manager.is_initialized());
assert_eq!(manager.get_offset(), 0);
assert_eq!(manager.last_sync_time(), 0);
}
#[test]
fn test_thread_safety_concurrent_reads() {
use std::sync::Arc;
let manager = Arc::new(TimeSyncManager::new());
manager.update_offset(TimestampUtils::now_ms() + 500);
let mut handles = vec![];
for _ in 0..10 {
let manager_clone = Arc::clone(&manager);
let handle = thread::spawn(move || {
for _ in 0..100 {
let _ = manager_clone.get_server_timestamp();
let _ = manager_clone.get_offset();
let _ = manager_clone.is_initialized();
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
}
#[test]
fn test_thread_safety_concurrent_writes() {
use std::sync::Arc;
let manager = Arc::new(TimeSyncManager::new());
let mut handles = vec![];
for i in 0..5 {
let manager_clone = Arc::clone(&manager);
let handle = thread::spawn(move || {
for j in 0..20 {
let server_time = TimestampUtils::now_ms() + (i * 100 + j) as i64;
manager_clone.update_offset(server_time);
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
assert!(manager.is_initialized());
}
#[test]
fn test_thread_safety_concurrent_read_write() {
use std::sync::Arc;
let manager = Arc::new(TimeSyncManager::new());
manager.update_offset(TimestampUtils::now_ms());
let mut handles = vec![];
for _ in 0..5 {
let manager_clone = Arc::clone(&manager);
let handle = thread::spawn(move || {
for _ in 0..100 {
let _ = manager_clone.get_server_timestamp();
let _ = manager_clone.needs_resync();
}
});
handles.push(handle);
}
for i in 0..3 {
let manager_clone = Arc::clone(&manager);
let handle = thread::spawn(move || {
for j in 0..50 {
let server_time = TimestampUtils::now_ms() + (i * 10 + j) as i64;
manager_clone.update_offset(server_time);
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
assert!(manager.is_initialized());
assert!(manager.last_sync_time() > 0);
}
#[test]
fn test_offset_calculation_with_negative_offset() {
let manager = TimeSyncManager::new();
let local_time = TimestampUtils::now_ms();
let server_time = local_time - 500;
manager.update_offset(server_time);
let offset = manager.get_offset();
assert!((-600..=-400).contains(&offset), "Offset was: {}", offset);
}
#[test]
fn test_saturating_arithmetic() {
let manager = TimeSyncManager::new();
manager.time_offset.store(i64::MAX, Ordering::Release);
manager.initialized.store(true, Ordering::Release);
let timestamp = manager.get_server_timestamp();
assert!(timestamp > 0);
}
}