use chrono::{DateTime, Duration, Utc};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, RwLock};
use tracing::{debug, info, warn};
/// Shared, thread-safe callback notified when a [`VirtualClock`] changes its
/// virtual time. It is called with `(old_time, new_time)`.
pub type TimeChangeCallback = Arc<dyn Fn(DateTime<Utc>, DateTime<Utc>) + Send + Sync>;
/// A clock that can report either real time or a controllable virtual time.
///
/// Every field lives behind its own `Arc<RwLock<..>>`, so cloning a
/// `VirtualClock` yields a handle that shares state with the original.
#[derive(Clone)]
pub struct VirtualClock {
/// Virtual base time; `None` while time travel is disabled.
current_time: Arc<RwLock<Option<DateTime<Utc>>>>,
/// Whether virtual time is active. When `false`, `now()` returns real time.
enabled: Arc<RwLock<bool>>,
/// Multiplier applied to real elapsed time when deriving virtual time.
scale_factor: Arc<RwLock<f64>>,
/// Real (wall-clock) instant at which the virtual base time was last set.
baseline_real_time: Arc<RwLock<Option<DateTime<Utc>>>>,
/// Callbacks invoked after the virtual time is advanced or set.
#[cfg_attr(not(test), allow(dead_code))]
time_change_callbacks: Arc<RwLock<Vec<TimeChangeCallback>>>,
}
impl std::fmt::Debug for VirtualClock {
    /// Formats a snapshot of the clock state.
    ///
    /// Callbacks are opaque closures, so only their count is shown. Poisoned
    /// locks are read through rather than causing a panic.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let current_time = *self.current_time.read().unwrap_or_else(|e| e.into_inner());
        let enabled = *self.enabled.read().unwrap_or_else(|e| e.into_inner());
        let scale_factor = *self.scale_factor.read().unwrap_or_else(|e| e.into_inner());
        let baseline_real_time =
            *self.baseline_real_time.read().unwrap_or_else(|e| e.into_inner());
        let callback_count =
            self.time_change_callbacks.read().unwrap_or_else(|e| e.into_inner()).len();

        let mut out = f.debug_struct("VirtualClock");
        out.field("current_time", &current_time);
        out.field("enabled", &enabled);
        out.field("scale_factor", &scale_factor);
        out.field("baseline_real_time", &baseline_real_time);
        out.field("callback_count", &callback_count);
        out.finish()
    }
}
impl Default for VirtualClock {
fn default() -> Self {
Self::new()
}
}
impl VirtualClock {
pub fn new() -> Self {
Self {
current_time: Arc::new(RwLock::new(None)),
enabled: Arc::new(RwLock::new(false)),
scale_factor: Arc::new(RwLock::new(1.0)),
baseline_real_time: Arc::new(RwLock::new(None)),
time_change_callbacks: Arc::new(RwLock::new(Vec::new())),
}
}
pub fn on_time_change<F>(&self, callback: F)
where
F: Fn(DateTime<Utc>, DateTime<Utc>) + Send + Sync + 'static,
{
let mut callbacks = self
.time_change_callbacks
.write()
.unwrap_or_else(|poisoned| poisoned.into_inner());
callbacks.push(Arc::new(callback));
}
fn invoke_time_change_callbacks(&self, old_time: DateTime<Utc>, new_time: DateTime<Utc>) {
let callbacks = self
.time_change_callbacks
.read()
.unwrap_or_else(|poisoned| poisoned.into_inner());
for callback in callbacks.iter() {
callback(old_time, new_time);
}
}
pub fn new_at(time: DateTime<Utc>) -> Self {
let clock = Self::new();
clock.enable_and_set(time);
clock
}
pub fn enable_and_set(&self, time: DateTime<Utc>) {
let mut current =
self.current_time.write().unwrap_or_else(|poisoned| poisoned.into_inner());
*current = Some(time);
let mut enabled = self.enabled.write().unwrap_or_else(|poisoned| poisoned.into_inner());
*enabled = true;
let mut baseline =
self.baseline_real_time.write().unwrap_or_else(|poisoned| poisoned.into_inner());
*baseline = Some(Utc::now());
info!("Time travel enabled at {}", time);
}
pub fn disable(&self) {
let mut enabled = self.enabled.write().unwrap_or_else(|poisoned| poisoned.into_inner());
*enabled = false;
let mut current =
self.current_time.write().unwrap_or_else(|poisoned| poisoned.into_inner());
*current = None;
let mut baseline =
self.baseline_real_time.write().unwrap_or_else(|poisoned| poisoned.into_inner());
*baseline = None;
info!("Time travel disabled, using real time");
}
pub fn is_enabled(&self) -> bool {
*self.enabled.read().unwrap_or_else(|poisoned| poisoned.into_inner())
}
pub fn now(&self) -> DateTime<Utc> {
let enabled = *self.enabled.read().unwrap_or_else(|poisoned| poisoned.into_inner());
if !enabled {
return Utc::now();
}
let current = self.current_time.read().unwrap_or_else(|poisoned| poisoned.into_inner());
let scale = *self.scale_factor.read().unwrap_or_else(|poisoned| poisoned.into_inner());
if let Some(virtual_time) = *current {
if (scale - 1.0).abs() < f64::EPSILON {
return virtual_time;
}
let baseline =
self.baseline_real_time.read().unwrap_or_else(|poisoned| poisoned.into_inner());
if let Some(baseline_real) = *baseline {
let elapsed_real = Utc::now() - baseline_real;
let elapsed_scaled =
Duration::milliseconds((elapsed_real.num_milliseconds() as f64 * scale) as i64);
return virtual_time + elapsed_scaled;
}
virtual_time
} else {
Utc::now()
}
}
pub fn advance(&self, duration: Duration) {
let enabled = *self.enabled.read().unwrap_or_else(|poisoned| poisoned.into_inner());
if !enabled {
warn!("Cannot advance time: time travel is not enabled");
return;
}
let mut current =
self.current_time.write().unwrap_or_else(|poisoned| poisoned.into_inner());
if let Some(time) = *current {
let old_time = time;
let new_time = time + duration;
*current = Some(new_time);
let mut baseline =
self.baseline_real_time.write().unwrap_or_else(|poisoned| poisoned.into_inner());
*baseline = Some(Utc::now());
drop(current);
drop(baseline);
self.invoke_time_change_callbacks(old_time, new_time);
info!("Time advanced by {} to {}", duration, new_time);
}
}
pub fn set_scale(&self, factor: f64) {
if factor <= 0.0 {
warn!("Invalid scale factor: {}, must be positive", factor);
return;
}
let mut scale = self.scale_factor.write().unwrap_or_else(|poisoned| poisoned.into_inner());
*scale = factor;
let mut baseline =
self.baseline_real_time.write().unwrap_or_else(|poisoned| poisoned.into_inner());
*baseline = Some(Utc::now());
info!("Time scale set to {}x", factor);
}
pub fn get_scale(&self) -> f64 {
*self.scale_factor.read().unwrap_or_else(|poisoned| poisoned.into_inner())
}
pub fn reset(&self) {
self.disable();
info!("Time travel reset to real time");
}
pub fn set_time(&self, time: DateTime<Utc>) {
let enabled = *self.enabled.read().unwrap_or_else(|poisoned| poisoned.into_inner());
if !enabled {
self.enable_and_set(time);
return;
}
let mut current =
self.current_time.write().unwrap_or_else(|poisoned| poisoned.into_inner());
let old_time = *current;
*current = Some(time);
let mut baseline =
self.baseline_real_time.write().unwrap_or_else(|poisoned| poisoned.into_inner());
*baseline = Some(Utc::now());
if let Some(old) = old_time {
drop(current);
drop(baseline);
self.invoke_time_change_callbacks(old, time);
}
info!("Virtual time set to {}", time);
}
pub fn status(&self) -> TimeTravelStatus {
TimeTravelStatus {
enabled: self.is_enabled(),
current_time: if self.is_enabled() {
Some(self.now())
} else {
None
},
scale_factor: self.get_scale(),
real_time: Utc::now(),
}
}
}
/// Serializable snapshot of a [`VirtualClock`], as produced by `VirtualClock::status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeTravelStatus {
/// Whether time travel was enabled when the snapshot was taken.
pub enabled: bool,
/// The clock's `now()` value when enabled; `None` when disabled.
pub current_time: Option<DateTime<Utc>>,
/// The clock's scale factor.
pub scale_factor: f64,
/// Real (wall-clock) time at which the snapshot was taken.
pub real_time: DateTime<Utc>,
}
/// Configuration consumed by `TimeTravelManager::new`.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeTravelConfig {
/// Enable time travel at construction. Defaults to `false` when omitted.
#[serde(default)]
pub enabled: bool,
/// Virtual time to start at when enabled; the manager falls back to the
/// real current time when this is `None`.
#[cfg_attr(feature = "schema", schemars(with = "Option<String>"))]
pub initial_time: Option<DateTime<Utc>>,
/// Time scale factor applied when enabled. Defaults to `1.0` when omitted.
#[serde(default = "default_scale")]
pub scale_factor: f64,
/// Defaults to `true` when omitted.
/// NOTE(review): not read anywhere in this part of the file; presumably
/// toggles response scheduling elsewhere. Confirm against callers.
#[serde(default = "default_true")]
pub enable_scheduling: bool,
}
/// Serde default for `TimeTravelConfig::scale_factor`: a scale of `1.0`.
fn default_scale() -> f64 {
    1.0_f64
}
/// Serde default for `TimeTravelConfig::enable_scheduling`: always `true`.
fn default_true() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
impl Default for TimeTravelConfig {
fn default() -> Self {
Self {
enabled: false,
initial_time: None,
scale_factor: 1.0,
enable_scheduling: true,
}
}
}
/// Holds responses scheduled to become due at specific times, as judged by a
/// [`VirtualClock`]. Clones share the same underlying schedule.
#[derive(Debug, Clone)]
pub struct ResponseScheduler {
/// Clock used to decide which responses are due.
clock: Arc<VirtualClock>,
/// Pending responses, grouped and ordered by trigger time.
scheduled: Arc<RwLock<BTreeMap<DateTime<Utc>, Vec<ScheduledResponse>>>>,
/// Maps a response's optional name to its id.
named_schedules: Arc<RwLock<HashMap<String, String>>>,
}
/// A response queued to be returned once the clock reaches `trigger_time`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduledResponse {
/// Identifier; when empty, `ResponseScheduler::schedule` generates a UUID.
pub id: String,
/// Time at or after which the response is considered due.
pub trigger_time: DateTime<Utc>,
/// JSON body of the response.
pub body: serde_json::Value,
/// Status code; defaults to `200` when omitted during deserialization.
#[serde(default = "default_status")]
pub status: u16,
/// Response headers; defaults to empty when omitted.
#[serde(default)]
pub headers: HashMap<String, String>,
/// Optional name, recorded by the scheduler alongside the id.
pub name: Option<String>,
/// Optional repetition settings; `None` means the response fires once.
#[serde(default)]
pub repeat: Option<RepeatConfig>,
}
/// Serde default for `ScheduledResponse::status`: `200`.
fn default_status() -> u16 {
    200_u16
}
/// Repetition settings for a [`ScheduledResponse`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepeatConfig {
/// Added to the trigger time to compute the next occurrence.
pub interval: Duration,
/// Remaining number of firings, counting the upcoming one; the response is
/// re-queued only while this is greater than 1. `None` repeats without limit.
pub max_count: Option<usize>,
}
impl ResponseScheduler {
    /// Creates an empty scheduler that judges due times using `clock`.
    pub fn new(clock: Arc<VirtualClock>) -> Self {
        Self {
            clock,
            scheduled: Arc::new(RwLock::new(BTreeMap::new())),
            named_schedules: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Queues `response` for its `trigger_time` and returns its id.
    ///
    /// When `response.id` is empty a UUID is generated. The generated id is
    /// stored on the queued response as well, so the returned id can later be
    /// passed to [`cancel`](Self::cancel). Currently always returns `Ok`.
    pub fn schedule(&self, mut response: ScheduledResponse) -> Result<String, String> {
        if response.id.is_empty() {
            response.id = uuid::Uuid::new_v4().to_string();
        }
        let id = response.id.clone();
        let trigger_time = response.trigger_time;
        let name = response.name.clone();

        self.scheduled
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .entry(trigger_time)
            .or_default()
            .push(response);

        if let Some(name) = name {
            self.named_schedules
                .write()
                .unwrap_or_else(|poisoned| poisoned.into_inner())
                .insert(name, id.clone());
        }
        info!("Scheduled response {} for {}", id, trigger_time);
        Ok(id)
    }

    /// Removes and returns every response whose trigger time is at or before
    /// the clock's current time.
    ///
    /// Repeating responses are re-queued for their next occurrence. A
    /// re-queued occurrence is not returned by the same call, even if it is
    /// already due.
    pub fn get_due_responses(&self) -> Vec<ScheduledResponse> {
        let now = self.clock.now();
        let mut scheduled = self.scheduled.write().unwrap_or_else(|poisoned| poisoned.into_inner());

        // Collect the keys first: the map is mutated while processing them.
        let due_times: Vec<DateTime<Utc>> =
            scheduled.range(..=now).map(|(time, _)| *time).collect();

        let mut due = Vec::new();
        for time in due_times {
            let responses = match scheduled.remove(&time) {
                Some(responses) => responses,
                None => continue,
            };
            for response in responses {
                if let Some(next) = Self::next_occurrence(&response, time) {
                    scheduled.entry(next.trigger_time).or_default().push(next);
                }
                due.push(response);
            }
        }
        debug!("Found {} due responses at {}", due.len(), now);
        due
    }

    /// Builds the follow-up occurrence of a repeating response that fired at
    /// `fired_at`, or `None` when it should not repeat (no repeat config,
    /// repeat budget exhausted, or the next trigger time is out of range).
    fn next_occurrence(
        response: &ScheduledResponse,
        fired_at: DateTime<Utc>,
    ) -> Option<ScheduledResponse> {
        let repeat = response.repeat.as_ref()?;
        if matches!(repeat.max_count, Some(remaining) if remaining <= 1) {
            return None;
        }
        // Checked addition: a huge interval must not panic the poller.
        let next_time = fired_at.checked_add_signed(repeat.interval)?;

        let mut next = response.clone();
        next.trigger_time = next_time;
        if let Some(remaining) = next.repeat.as_mut().and_then(|r| r.max_count.as_mut()) {
            *remaining -= 1;
        }
        Some(next)
    }

    /// Cancels the first pending response with the given `id`.
    ///
    /// Returns `true` if a response was removed. Any name registered for that
    /// id is forgotten, and a time slot left empty is dropped from the schedule.
    pub fn cancel(&self, id: &str) -> bool {
        let removed = {
            let mut scheduled =
                self.scheduled.write().unwrap_or_else(|poisoned| poisoned.into_inner());
            let hit = scheduled.iter_mut().find_map(|(time, bucket)| {
                bucket.iter().position(|r| r.id == id).map(|pos| {
                    bucket.remove(pos);
                    (*time, bucket.is_empty())
                })
            });
            match hit {
                Some((time, bucket_is_empty)) => {
                    if bucket_is_empty {
                        scheduled.remove(&time);
                    }
                    true
                }
                None => false,
            }
        };
        if removed {
            self.named_schedules
                .write()
                .unwrap_or_else(|poisoned| poisoned.into_inner())
                .retain(|_, scheduled_id| scheduled_id.as_str() != id);
            info!("Cancelled scheduled response {}", id);
        }
        removed
    }

    /// Removes every pending response and every registered name.
    pub fn clear_all(&self) {
        let mut scheduled = self.scheduled.write().unwrap_or_else(|poisoned| poisoned.into_inner());
        scheduled.clear();
        let mut named =
            self.named_schedules.write().unwrap_or_else(|poisoned| poisoned.into_inner());
        named.clear();
        info!("Cleared all scheduled responses");
    }

    /// Returns a copy of all pending responses, ordered by trigger time.
    pub fn list_scheduled(&self) -> Vec<ScheduledResponse> {
        let scheduled = self.scheduled.read().unwrap_or_else(|poisoned| poisoned.into_inner());
        scheduled.values().flatten().cloned().collect()
    }

    /// Returns the number of pending responses.
    pub fn count(&self) -> usize {
        let scheduled = self.scheduled.read().unwrap_or_else(|poisoned| poisoned.into_inner());
        scheduled.values().map(Vec::len).sum()
    }
}
/// A named, serializable snapshot of a time-travel setup that can be captured
/// from and re-applied to a `TimeTravelManager`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeScenario {
/// Scenario name.
pub name: String,
/// Whether time travel is enabled in this scenario.
pub enabled: bool,
/// Virtual time of the scenario; `None` when captured from a disabled clock.
pub current_time: Option<DateTime<Utc>>,
/// Time scale factor of the scenario.
pub scale_factor: f64,
/// Responses pending in the scheduler; defaults to empty when omitted.
#[serde(default)]
pub scheduled_responses: Vec<ScheduledResponse>,
/// Real time at which the scenario was captured.
pub created_at: DateTime<Utc>,
/// Optional free-form description; defaults to `None` when omitted.
#[serde(default)]
pub description: Option<String>,
}
impl TimeScenario {
pub fn from_manager(manager: &TimeTravelManager, name: String) -> Self {
let status = manager.clock().status();
let scheduled = manager.scheduler().list_scheduled();
Self {
name,
enabled: status.enabled,
current_time: status.current_time,
scale_factor: status.scale_factor,
scheduled_responses: scheduled,
created_at: Utc::now(),
description: None,
}
}
pub fn apply_to_manager(&self, manager: &TimeTravelManager) {
if self.enabled {
if let Some(time) = self.current_time {
manager.enable_and_set(time);
} else {
manager.enable_and_set(Utc::now());
}
manager.set_scale(self.scale_factor);
} else {
manager.disable();
}
manager.scheduler().clear_all();
for response in &self.scheduled_responses {
let _ = manager.scheduler().schedule(response.clone());
}
}
}
/// Process-wide slot holding the currently registered virtual clock, if any.
/// Lazily initialised to `None` on first access.
static GLOBAL_CLOCK_REGISTRY: Lazy<Arc<RwLock<Option<Arc<VirtualClock>>>>> =
Lazy::new(|| Arc::new(RwLock::new(None)));
/// Makes `clock` the process-wide clock, replacing any clock registered before.
pub fn register_global_clock(clock: Arc<VirtualClock>) {
    *GLOBAL_CLOCK_REGISTRY.write().unwrap_or_else(|e| e.into_inner()) = Some(clock);
    info!("Virtual clock registered globally");
}
/// Clears the process-wide clock registration, whichever clock holds it.
pub fn unregister_global_clock() {
    *GLOBAL_CLOCK_REGISTRY.write().unwrap_or_else(|e| e.into_inner()) = None;
    info!("Virtual clock unregistered globally");
}
/// Returns a handle to the process-wide clock, or `None` if none is registered.
pub fn get_global_clock() -> Option<Arc<VirtualClock>> {
    let slot = GLOBAL_CLOCK_REGISTRY.read().unwrap_or_else(|e| e.into_inner());
    slot.as_ref().map(Arc::clone)
}
/// Returns the current time from the process-wide clock, or real time when no
/// clock is registered.
pub fn now() -> DateTime<Utc> {
    match get_global_clock() {
        Some(clock) => clock.now(),
        None => Utc::now(),
    }
}
/// Returns `true` only when a process-wide clock is registered and enabled.
pub fn is_time_travel_enabled() -> bool {
    match get_global_clock() {
        Some(clock) => clock.is_enabled(),
        None => false,
    }
}
/// Owns a virtual clock together with the response and cron schedulers that
/// are driven by it, and manages the clock's global registration.
pub struct TimeTravelManager {
/// The clock shared with both schedulers.
clock: Arc<VirtualClock>,
/// Scheduler for time-triggered responses.
scheduler: Arc<ResponseScheduler>,
/// Cron scheduler, constructed with a handle to `scheduler`.
cron_scheduler: Arc<CronScheduler>,
}
impl TimeTravelManager {
pub fn new(config: TimeTravelConfig) -> Self {
let clock = Arc::new(VirtualClock::new());
if config.enabled {
if let Some(initial_time) = config.initial_time {
clock.enable_and_set(initial_time);
} else {
clock.enable_and_set(Utc::now());
}
clock.set_scale(config.scale_factor);
register_global_clock(clock.clone());
}
let scheduler = Arc::new(ResponseScheduler::new(clock.clone()));
let cron_scheduler =
Arc::new(CronScheduler::new(clock.clone()).with_response_scheduler(scheduler.clone()));
Self {
clock,
scheduler,
cron_scheduler,
}
}
pub fn clock(&self) -> Arc<VirtualClock> {
self.clock.clone()
}
pub fn scheduler(&self) -> Arc<ResponseScheduler> {
self.scheduler.clone()
}
pub fn cron_scheduler(&self) -> Arc<CronScheduler> {
self.cron_scheduler.clone()
}
pub fn now(&self) -> DateTime<Utc> {
self.clock.now()
}
pub fn save_scenario(&self, name: String) -> TimeScenario {
TimeScenario::from_manager(self, name)
}
pub fn load_scenario(&self, scenario: &TimeScenario) {
scenario.apply_to_manager(self);
}
pub fn enable_and_set(&self, time: DateTime<Utc>) {
self.clock.enable_and_set(time);
register_global_clock(self.clock.clone());
}
pub fn disable(&self) {
self.clock.disable();
unregister_global_clock();
}
pub fn advance(&self, duration: Duration) {
self.clock.advance(duration);
}
pub fn set_time(&self, time: DateTime<Utc>) {
self.clock.set_time(time);
if self.clock.is_enabled() {
register_global_clock(self.clock.clone());
}
}
pub fn set_scale(&self, factor: f64) {
self.clock.set_scale(factor);
}
}
impl Drop for TimeTravelManager {
fn drop(&mut self) {
unregister_global_clock();
}
}
// Unit tests for the virtual clock, response scheduler, and manager.
// Time comparisons use tolerances because real time passes between calls.
#[cfg(test)]
mod tests {
use super::*;
// A freshly created clock starts disabled.
#[test]
fn test_virtual_clock_creation() {
let clock = VirtualClock::new();
assert!(!clock.is_enabled());
}
// Enabling the clock makes `now()` report the time it was set to.
#[test]
fn test_virtual_clock_enable() {
let clock = VirtualClock::new();
let test_time = Utc::now();
clock.enable_and_set(test_time);
assert!(clock.is_enabled());
let now = clock.now();
assert!((now - test_time).num_seconds().abs() < 1);
}
// Advancing by two hours moves `now()` two hours past the start time.
#[test]
fn test_virtual_clock_advance() {
let clock = VirtualClock::new();
let test_time = Utc::now();
clock.enable_and_set(test_time);
clock.advance(Duration::hours(2));
let now = clock.now();
assert!((now - test_time - Duration::hours(2)).num_seconds().abs() < 1);
}
// The scale factor can be set and read back, even on a disabled clock.
#[test]
fn test_virtual_clock_scale() {
let clock = VirtualClock::new();
clock.set_scale(2.0);
assert_eq!(clock.get_scale(), 2.0);
}
// Scheduling returns the supplied id and increases the pending count.
#[test]
fn test_response_scheduler() {
let clock = Arc::new(VirtualClock::new());
let test_time = Utc::now();
clock.enable_and_set(test_time);
let scheduler = ResponseScheduler::new(clock.clone());
let response = ScheduledResponse {
id: "test-1".to_string(),
trigger_time: test_time + Duration::seconds(10),
body: serde_json::json!({"message": "Hello"}),
status: 200,
headers: HashMap::new(),
name: Some("test".to_string()),
repeat: None,
};
let id = scheduler.schedule(response).unwrap();
assert_eq!(id, "test-1");
assert_eq!(scheduler.count(), 1);
}
// A response is not due before its trigger time and becomes due once the
// clock is advanced past it.
#[test]
fn test_scheduled_response_triggering() {
let clock = Arc::new(VirtualClock::new());
let test_time = Utc::now();
clock.enable_and_set(test_time);
let scheduler = ResponseScheduler::new(clock.clone());
let response = ScheduledResponse {
id: "test-1".to_string(),
trigger_time: test_time + Duration::seconds(10),
body: serde_json::json!({"message": "Hello"}),
status: 200,
headers: HashMap::new(),
name: None,
repeat: None,
};
scheduler.schedule(response).unwrap();
let due = scheduler.get_due_responses();
assert_eq!(due.len(), 0);
clock.advance(Duration::seconds(15));
let due = scheduler.get_due_responses();
assert_eq!(due.len(), 1);
}
// The default configuration is disabled, unscaled, with scheduling on.
#[test]
fn test_time_travel_config() {
let config = TimeTravelConfig::default();
assert!(!config.enabled);
assert_eq!(config.scale_factor, 1.0);
assert!(config.enable_scheduling);
}
// A manager built from an enabled config has an enabled clock.
#[test]
fn test_time_travel_manager() {
let config = TimeTravelConfig {
enabled: true,
initial_time: Some(Utc::now()),
scale_factor: 1.0,
enable_scheduling: true,
};
let manager = TimeTravelManager::new(config);
assert!(manager.clock().is_enabled());
}
// Advancing by 30 days yields roughly one month of elapsed virtual time.
#[test]
fn test_one_month_later_scenario() {
let clock = Arc::new(VirtualClock::new());
let initial_time = Utc::now();
clock.enable_and_set(initial_time);
clock.advance(Duration::days(30));
let final_time = clock.now();
let elapsed = final_time - initial_time;
assert!(elapsed.num_days() >= 29 && elapsed.num_days() <= 31);
}
// A scenario saved from one manager restores enabled state, scale, and time
// when loaded into a second, initially disabled manager.
#[test]
fn test_scenario_save_and_load() {
let config = TimeTravelConfig {
enabled: true,
initial_time: Some(Utc::now()),
scale_factor: 2.0,
enable_scheduling: true,
};
let manager = TimeTravelManager::new(config);
manager.clock().advance(Duration::hours(24));
let scenario = manager.save_scenario("test-scenario".to_string());
assert_eq!(scenario.name, "test-scenario");
assert!(scenario.enabled);
assert_eq!(scenario.scale_factor, 2.0);
assert!(scenario.current_time.is_some());
let new_config = TimeTravelConfig::default();
let new_manager = TimeTravelManager::new(new_config);
new_manager.load_scenario(&scenario);
assert!(new_manager.clock().is_enabled());
assert_eq!(new_manager.clock().get_scale(), 2.0);
if let Some(saved_time) = scenario.current_time {
let loaded_time = new_manager.clock().now();
assert!((loaded_time - saved_time).num_seconds().abs() < 1);
}
}
// Month-sized (30 days) and year-sized (365 days) advances, with the clock
// reset to the initial time in between.
#[test]
fn test_duration_parsing_month_year() {
let clock = Arc::new(VirtualClock::new());
let initial_time = Utc::now();
clock.enable_and_set(initial_time);
clock.advance(Duration::days(30));
let after_month = clock.now();
let month_elapsed = after_month - initial_time;
assert!(month_elapsed.num_days() >= 29 && month_elapsed.num_days() <= 31);
clock.set_time(initial_time);
clock.advance(Duration::days(365));
let after_year = clock.now();
let year_elapsed = after_year - initial_time;
assert!(year_elapsed.num_days() >= 364 && year_elapsed.num_days() <= 366);
}
}
pub mod cron;
pub use cron::{CronJob, CronJobAction, CronScheduler};