1use chrono::{DateTime, Duration, Utc};
11use once_cell::sync::Lazy;
12use serde::{Deserialize, Serialize};
13use std::collections::{BTreeMap, HashMap};
14use std::sync::{Arc, RwLock};
15use tracing::{debug, info, warn};
16
17pub type TimeChangeCallback = Arc<dyn Fn(DateTime<Utc>, DateTime<Utc>) + Send + Sync>;
19
20#[derive(Clone)]
22pub struct VirtualClock {
23 current_time: Arc<RwLock<Option<DateTime<Utc>>>>,
25 enabled: Arc<RwLock<bool>>,
27 scale_factor: Arc<RwLock<f64>>,
29 baseline_real_time: Arc<RwLock<Option<DateTime<Utc>>>>,
31 #[cfg_attr(not(test), allow(dead_code))]
33 time_change_callbacks: Arc<RwLock<Vec<TimeChangeCallback>>>,
34}
35
36impl std::fmt::Debug for VirtualClock {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 f.debug_struct("VirtualClock")
39 .field("current_time", &self.current_time.read().unwrap())
40 .field("enabled", &self.enabled.read().unwrap())
41 .field("scale_factor", &self.scale_factor.read().unwrap())
42 .field("baseline_real_time", &self.baseline_real_time.read().unwrap())
43 .field("callback_count", &self.time_change_callbacks.read().unwrap().len())
44 .finish()
45 }
46}
47
48impl Default for VirtualClock {
49 fn default() -> Self {
50 Self::new()
51 }
52}
53
54impl VirtualClock {
55 pub fn new() -> Self {
57 Self {
58 current_time: Arc::new(RwLock::new(None)),
59 enabled: Arc::new(RwLock::new(false)),
60 scale_factor: Arc::new(RwLock::new(1.0)),
61 baseline_real_time: Arc::new(RwLock::new(None)),
62 time_change_callbacks: Arc::new(RwLock::new(Vec::new())),
63 }
64 }
65
66 pub fn on_time_change<F>(&self, callback: F)
70 where
71 F: Fn(DateTime<Utc>, DateTime<Utc>) + Send + Sync + 'static,
72 {
73 let mut callbacks = self.time_change_callbacks.write().unwrap();
74 callbacks.push(Arc::new(callback));
75 }
76
77 fn invoke_time_change_callbacks(&self, old_time: DateTime<Utc>, new_time: DateTime<Utc>) {
79 let callbacks = self.time_change_callbacks.read().unwrap();
80 for callback in callbacks.iter() {
81 callback(old_time, new_time);
82 }
83 }
84
85 pub fn new_at(time: DateTime<Utc>) -> Self {
87 let clock = Self::new();
88 clock.enable_and_set(time);
89 clock
90 }
91
92 pub fn enable_and_set(&self, time: DateTime<Utc>) {
94 let mut current = self.current_time.write().unwrap();
95 *current = Some(time);
96
97 let mut enabled = self.enabled.write().unwrap();
98 *enabled = true;
99
100 let mut baseline = self.baseline_real_time.write().unwrap();
101 *baseline = Some(Utc::now());
102
103 info!("Time travel enabled at {}", time);
104 }
105
106 pub fn disable(&self) {
108 let mut enabled = self.enabled.write().unwrap();
109 *enabled = false;
110
111 let mut current = self.current_time.write().unwrap();
112 *current = None;
113
114 let mut baseline = self.baseline_real_time.write().unwrap();
115 *baseline = None;
116
117 info!("Time travel disabled, using real time");
118 }
119
120 pub fn is_enabled(&self) -> bool {
122 *self.enabled.read().unwrap()
123 }
124
125 pub fn now(&self) -> DateTime<Utc> {
127 let enabled = *self.enabled.read().unwrap();
128
129 if !enabled {
130 return Utc::now();
131 }
132
133 let current = self.current_time.read().unwrap();
134 let scale = *self.scale_factor.read().unwrap();
135
136 if let Some(virtual_time) = *current {
137 if (scale - 1.0).abs() < f64::EPSILON {
139 return virtual_time;
140 }
141
142 let baseline = self.baseline_real_time.read().unwrap();
144 if let Some(baseline_real) = *baseline {
145 let elapsed_real = Utc::now() - baseline_real;
146 let elapsed_scaled =
147 Duration::milliseconds((elapsed_real.num_milliseconds() as f64 * scale) as i64);
148 return virtual_time + elapsed_scaled;
149 }
150
151 virtual_time
152 } else {
153 Utc::now()
154 }
155 }
156
157 pub fn advance(&self, duration: Duration) {
159 let enabled = *self.enabled.read().unwrap();
160 if !enabled {
161 warn!("Cannot advance time: time travel is not enabled");
162 return;
163 }
164
165 let mut current = self.current_time.write().unwrap();
166 if let Some(time) = *current {
167 let old_time = time;
168 let new_time = time + duration;
169 *current = Some(new_time);
170
171 let mut baseline = self.baseline_real_time.write().unwrap();
173 *baseline = Some(Utc::now());
174
175 drop(current);
177 drop(baseline);
178 self.invoke_time_change_callbacks(old_time, new_time);
179
180 info!("Time advanced by {} to {}", duration, new_time);
181 }
182 }
183
184 pub fn set_scale(&self, factor: f64) {
186 if factor <= 0.0 {
187 warn!("Invalid scale factor: {}, must be positive", factor);
188 return;
189 }
190
191 let mut scale = self.scale_factor.write().unwrap();
192 *scale = factor;
193
194 let mut baseline = self.baseline_real_time.write().unwrap();
196 *baseline = Some(Utc::now());
197
198 info!("Time scale set to {}x", factor);
199 }
200
201 pub fn get_scale(&self) -> f64 {
203 *self.scale_factor.read().unwrap()
204 }
205
206 pub fn reset(&self) {
208 self.disable();
209 info!("Time travel reset to real time");
210 }
211
212 pub fn set_time(&self, time: DateTime<Utc>) {
214 let enabled = *self.enabled.read().unwrap();
215 if !enabled {
216 self.enable_and_set(time);
217 return;
218 }
219
220 let mut current = self.current_time.write().unwrap();
221 let old_time = *current;
222 *current = Some(time);
223
224 let mut baseline = self.baseline_real_time.write().unwrap();
226 *baseline = Some(Utc::now());
227
228 if let Some(old) = old_time {
230 drop(current);
231 drop(baseline);
232 self.invoke_time_change_callbacks(old, time);
233 }
234
235 info!("Virtual time set to {}", time);
236 }
237
238 pub fn status(&self) -> TimeTravelStatus {
240 TimeTravelStatus {
241 enabled: self.is_enabled(),
242 current_time: if self.is_enabled() {
243 Some(self.now())
244 } else {
245 None
246 },
247 scale_factor: self.get_scale(),
248 real_time: Utc::now(),
249 }
250 }
251}
252
253#[derive(Debug, Clone, Serialize, Deserialize)]
255pub struct TimeTravelStatus {
256 pub enabled: bool,
258 pub current_time: Option<DateTime<Utc>>,
260 pub scale_factor: f64,
262 pub real_time: DateTime<Utc>,
264}
265
266#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
268#[derive(Debug, Clone, Serialize, Deserialize)]
269pub struct TimeTravelConfig {
270 #[serde(default)]
272 pub enabled: bool,
273 #[cfg_attr(feature = "schema", schemars(with = "Option<String>"))]
275 pub initial_time: Option<DateTime<Utc>>,
276 #[serde(default = "default_scale")]
278 pub scale_factor: f64,
279 #[serde(default = "default_true")]
281 pub enable_scheduling: bool,
282}
283
284fn default_scale() -> f64 {
285 1.0
286}
287
288fn default_true() -> bool {
289 true
290}
291
292impl Default for TimeTravelConfig {
293 fn default() -> Self {
294 Self {
295 enabled: false,
296 initial_time: None,
297 scale_factor: 1.0,
298 enable_scheduling: true,
299 }
300 }
301}
302
303#[derive(Debug, Clone)]
305pub struct ResponseScheduler {
306 clock: Arc<VirtualClock>,
308 scheduled: Arc<RwLock<BTreeMap<DateTime<Utc>, Vec<ScheduledResponse>>>>,
310 named_schedules: Arc<RwLock<HashMap<String, String>>>,
312}
313
314#[derive(Debug, Clone, Serialize, Deserialize)]
316pub struct ScheduledResponse {
317 pub id: String,
319 pub trigger_time: DateTime<Utc>,
321 pub body: serde_json::Value,
323 #[serde(default = "default_status")]
325 pub status: u16,
326 #[serde(default)]
328 pub headers: HashMap<String, String>,
329 pub name: Option<String>,
331 #[serde(default)]
333 pub repeat: Option<RepeatConfig>,
334}
335
336fn default_status() -> u16 {
337 200
338}
339
340#[derive(Debug, Clone, Serialize, Deserialize)]
342pub struct RepeatConfig {
343 pub interval: Duration,
345 pub max_count: Option<usize>,
347}
348
349impl ResponseScheduler {
350 pub fn new(clock: Arc<VirtualClock>) -> Self {
352 Self {
353 clock,
354 scheduled: Arc::new(RwLock::new(BTreeMap::new())),
355 named_schedules: Arc::new(RwLock::new(HashMap::new())),
356 }
357 }
358
359 pub fn schedule(&self, response: ScheduledResponse) -> Result<String, String> {
361 let id = if response.id.is_empty() {
362 uuid::Uuid::new_v4().to_string()
363 } else {
364 response.id.clone()
365 };
366
367 let mut scheduled = self.scheduled.write().unwrap();
368 scheduled.entry(response.trigger_time).or_default().push(response.clone());
369
370 if let Some(name) = &response.name {
371 let mut named = self.named_schedules.write().unwrap();
372 named.insert(name.clone(), id.clone());
373 }
374
375 info!("Scheduled response {} for {}", id, response.trigger_time);
376 Ok(id)
377 }
378
379 pub fn get_due_responses(&self) -> Vec<ScheduledResponse> {
381 let now = self.clock.now();
382 let mut scheduled = self.scheduled.write().unwrap();
383 let mut due = Vec::new();
384
385 let times_to_process: Vec<DateTime<Utc>> =
387 scheduled.range(..=now).map(|(time, _)| *time).collect();
388
389 for time in times_to_process {
390 if let Some(responses) = scheduled.remove(&time) {
391 for response in responses {
392 due.push(response.clone());
393
394 if let Some(repeat_config) = &response.repeat {
396 let next_time = time + repeat_config.interval;
397
398 let should_repeat = if let Some(max) = repeat_config.max_count {
400 max > 1
402 } else {
403 true
404 };
405
406 if should_repeat {
407 let mut next_response = response.clone();
408 next_response.trigger_time = next_time;
409 if let Some(ref mut repeat) = next_response.repeat {
410 if let Some(ref mut count) = repeat.max_count {
411 *count -= 1;
412 }
413 }
414
415 scheduled.entry(next_time).or_default().push(next_response);
416 }
417 }
418 }
419 }
420 }
421
422 debug!("Found {} due responses at {}", due.len(), now);
423 due
424 }
425
426 pub fn cancel(&self, id: &str) -> bool {
428 let mut scheduled = self.scheduled.write().unwrap();
429
430 for responses in scheduled.values_mut() {
431 if let Some(pos) = responses.iter().position(|r| r.id == id) {
432 responses.remove(pos);
433 info!("Cancelled scheduled response {}", id);
434 return true;
435 }
436 }
437
438 false
439 }
440
441 pub fn clear_all(&self) {
443 let mut scheduled = self.scheduled.write().unwrap();
444 scheduled.clear();
445
446 let mut named = self.named_schedules.write().unwrap();
447 named.clear();
448
449 info!("Cleared all scheduled responses");
450 }
451
452 pub fn list_scheduled(&self) -> Vec<ScheduledResponse> {
454 let scheduled = self.scheduled.read().unwrap();
455 scheduled.values().flat_map(|v| v.iter().cloned()).collect()
456 }
457
458 pub fn count(&self) -> usize {
460 let scheduled = self.scheduled.read().unwrap();
461 scheduled.values().map(|v| v.len()).sum()
462 }
463}
464
465#[derive(Debug, Clone, Serialize, Deserialize)]
467pub struct TimeScenario {
468 pub name: String,
470 pub enabled: bool,
472 pub current_time: Option<DateTime<Utc>>,
474 pub scale_factor: f64,
476 #[serde(default)]
478 pub scheduled_responses: Vec<ScheduledResponse>,
479 pub created_at: DateTime<Utc>,
481 #[serde(default)]
483 pub description: Option<String>,
484}
485
486impl TimeScenario {
487 pub fn from_manager(manager: &TimeTravelManager, name: String) -> Self {
489 let status = manager.clock().status();
490 let scheduled = manager.scheduler().list_scheduled();
491
492 Self {
493 name,
494 enabled: status.enabled,
495 current_time: status.current_time,
496 scale_factor: status.scale_factor,
497 scheduled_responses: scheduled,
498 created_at: Utc::now(),
499 description: None,
500 }
501 }
502
503 pub fn apply_to_manager(&self, manager: &TimeTravelManager) {
505 if self.enabled {
506 if let Some(time) = self.current_time {
507 manager.enable_and_set(time);
508 } else {
509 manager.enable_and_set(Utc::now());
510 }
511 manager.set_scale(self.scale_factor);
512 } else {
513 manager.disable();
514 }
515
516 manager.scheduler().clear_all();
518 for response in &self.scheduled_responses {
519 let _ = manager.scheduler().schedule(response.clone());
520 }
521 }
522}
523
524static GLOBAL_CLOCK_REGISTRY: Lazy<Arc<RwLock<Option<Arc<VirtualClock>>>>> =
531 Lazy::new(|| Arc::new(RwLock::new(None)));
532
533pub fn register_global_clock(clock: Arc<VirtualClock>) {
538 let mut registry = GLOBAL_CLOCK_REGISTRY.write().unwrap();
539 *registry = Some(clock);
540 info!("Virtual clock registered globally");
541}
542
543pub fn unregister_global_clock() {
547 let mut registry = GLOBAL_CLOCK_REGISTRY.write().unwrap();
548 *registry = None;
549 info!("Virtual clock unregistered globally");
550}
551
552pub fn get_global_clock() -> Option<Arc<VirtualClock>> {
556 let registry = GLOBAL_CLOCK_REGISTRY.read().unwrap();
557 registry.clone()
558}
559
560pub fn now() -> DateTime<Utc> {
567 if let Some(clock) = get_global_clock() {
568 clock.now()
569 } else {
570 Utc::now()
571 }
572}
573
574pub fn is_time_travel_enabled() -> bool {
578 if let Some(clock) = get_global_clock() {
579 clock.is_enabled()
580 } else {
581 false
582 }
583}
584
585pub struct TimeTravelManager {
587 clock: Arc<VirtualClock>,
589 scheduler: Arc<ResponseScheduler>,
591 cron_scheduler: Arc<cron::CronScheduler>,
593}
594
595impl TimeTravelManager {
596 pub fn new(config: TimeTravelConfig) -> Self {
601 let clock = Arc::new(VirtualClock::new());
602
603 if config.enabled {
604 if let Some(initial_time) = config.initial_time {
605 clock.enable_and_set(initial_time);
606 } else {
607 clock.enable_and_set(Utc::now());
608 }
609 clock.set_scale(config.scale_factor);
610 register_global_clock(clock.clone());
612 }
613
614 let scheduler = Arc::new(ResponseScheduler::new(clock.clone()));
615 let cron_scheduler = Arc::new(
616 cron::CronScheduler::new(clock.clone())
617 .with_response_scheduler(scheduler.clone())
618 );
619
620 Self {
621 clock,
622 scheduler,
623 cron_scheduler,
624 }
625 }
626
627 pub fn clock(&self) -> Arc<VirtualClock> {
629 self.clock.clone()
630 }
631
632 pub fn scheduler(&self) -> Arc<ResponseScheduler> {
634 self.scheduler.clone()
635 }
636
637 pub fn cron_scheduler(&self) -> Arc<cron::CronScheduler> {
639 self.cron_scheduler.clone()
640 }
641
642 pub fn now(&self) -> DateTime<Utc> {
644 self.clock.now()
645 }
646
647 pub fn save_scenario(&self, name: String) -> TimeScenario {
649 TimeScenario::from_manager(self, name)
650 }
651
652 pub fn load_scenario(&self, scenario: &TimeScenario) {
654 scenario.apply_to_manager(self);
655 }
656
657 pub fn enable_and_set(&self, time: DateTime<Utc>) {
661 self.clock.enable_and_set(time);
662 register_global_clock(self.clock.clone());
663 }
664
665 pub fn disable(&self) {
669 self.clock.disable();
670 unregister_global_clock();
671 }
672
673 pub fn advance(&self, duration: Duration) {
677 self.clock.advance(duration);
678 }
679
680 pub fn set_time(&self, time: DateTime<Utc>) {
684 self.clock.set_time(time);
685 if self.clock.is_enabled() {
686 register_global_clock(self.clock.clone());
687 }
688 }
689
690 pub fn set_scale(&self, factor: f64) {
694 self.clock.set_scale(factor);
695 }
696}
697
698impl Drop for TimeTravelManager {
699 fn drop(&mut self) {
700 unregister_global_clock();
702 }
703}
704
705#[cfg(test)]
706mod tests {
707 use super::*;
708
709 #[test]
710 fn test_virtual_clock_creation() {
711 let clock = VirtualClock::new();
712 assert!(!clock.is_enabled());
713 }
714
715 #[test]
716 fn test_virtual_clock_enable() {
717 let clock = VirtualClock::new();
718 let test_time = Utc::now();
719 clock.enable_and_set(test_time);
720
721 assert!(clock.is_enabled());
722 let now = clock.now();
723 assert!((now - test_time).num_seconds().abs() < 1);
724 }
725
726 #[test]
727 fn test_virtual_clock_advance() {
728 let clock = VirtualClock::new();
729 let test_time = Utc::now();
730 clock.enable_and_set(test_time);
731
732 clock.advance(Duration::hours(2));
733 let now = clock.now();
734
735 assert!((now - test_time - Duration::hours(2)).num_seconds().abs() < 1);
736 }
737
738 #[test]
739 fn test_virtual_clock_scale() {
740 let clock = VirtualClock::new();
741 clock.set_scale(2.0);
742 assert_eq!(clock.get_scale(), 2.0);
743 }
744
745 #[test]
746 fn test_response_scheduler() {
747 let clock = Arc::new(VirtualClock::new());
748 let test_time = Utc::now();
749 clock.enable_and_set(test_time);
750
751 let scheduler = ResponseScheduler::new(clock.clone());
752
753 let response = ScheduledResponse {
754 id: "test-1".to_string(),
755 trigger_time: test_time + Duration::seconds(10),
756 body: serde_json::json!({"message": "Hello"}),
757 status: 200,
758 headers: HashMap::new(),
759 name: Some("test".to_string()),
760 repeat: None,
761 };
762
763 let id = scheduler.schedule(response).unwrap();
764 assert_eq!(id, "test-1");
765 assert_eq!(scheduler.count(), 1);
766 }
767
768 #[test]
769 fn test_scheduled_response_triggering() {
770 let clock = Arc::new(VirtualClock::new());
771 let test_time = Utc::now();
772 clock.enable_and_set(test_time);
773
774 let scheduler = ResponseScheduler::new(clock.clone());
775
776 let response = ScheduledResponse {
777 id: "test-1".to_string(),
778 trigger_time: test_time + Duration::seconds(10),
779 body: serde_json::json!({"message": "Hello"}),
780 status: 200,
781 headers: HashMap::new(),
782 name: None,
783 repeat: None,
784 };
785
786 scheduler.schedule(response).unwrap();
787
788 let due = scheduler.get_due_responses();
790 assert_eq!(due.len(), 0);
791
792 clock.advance(Duration::seconds(15));
794
795 let due = scheduler.get_due_responses();
797 assert_eq!(due.len(), 1);
798 }
799
800 #[test]
801 fn test_time_travel_config() {
802 let config = TimeTravelConfig::default();
803 assert!(!config.enabled);
804 assert_eq!(config.scale_factor, 1.0);
805 assert!(config.enable_scheduling);
806 }
807
808 #[test]
809 fn test_time_travel_manager() {
810 let config = TimeTravelConfig {
811 enabled: true,
812 initial_time: Some(Utc::now()),
813 scale_factor: 1.0,
814 enable_scheduling: true,
815 };
816
817 let manager = TimeTravelManager::new(config);
818 assert!(manager.clock().is_enabled());
819 }
820
821 #[test]
822 fn test_one_month_later_scenario() {
823 let clock = Arc::new(VirtualClock::new());
824 let initial_time = Utc::now();
825 clock.enable_and_set(initial_time);
826
827 clock.advance(Duration::days(30));
829
830 let final_time = clock.now();
831 let elapsed = final_time - initial_time;
832
833 assert!(elapsed.num_days() >= 29 && elapsed.num_days() <= 31);
835 }
836
837 #[test]
838 fn test_scenario_save_and_load() {
839 let config = TimeTravelConfig {
840 enabled: true,
841 initial_time: Some(Utc::now()),
842 scale_factor: 2.0,
843 enable_scheduling: true,
844 };
845
846 let manager = TimeTravelManager::new(config);
847
848 manager.clock().advance(Duration::hours(24));
850
851 let scenario = manager.save_scenario("test-scenario".to_string());
853 assert_eq!(scenario.name, "test-scenario");
854 assert!(scenario.enabled);
855 assert_eq!(scenario.scale_factor, 2.0);
856 assert!(scenario.current_time.is_some());
857
858 let new_config = TimeTravelConfig::default();
860 let new_manager = TimeTravelManager::new(new_config);
861
862 new_manager.load_scenario(&scenario);
864
865 assert!(new_manager.clock().is_enabled());
867 assert_eq!(new_manager.clock().get_scale(), 2.0);
868 if let Some(saved_time) = scenario.current_time {
869 let loaded_time = new_manager.clock().now();
870 assert!((loaded_time - saved_time).num_seconds().abs() < 1);
872 }
873 }
874
875 #[test]
876 fn test_duration_parsing_month_year() {
877 let clock = Arc::new(VirtualClock::new());
879 let initial_time = Utc::now();
880 clock.enable_and_set(initial_time);
881
882 clock.advance(Duration::days(30));
884 let after_month = clock.now();
885 let month_elapsed = after_month - initial_time;
886 assert!(month_elapsed.num_days() >= 29 && month_elapsed.num_days() <= 31);
887
888 clock.set_time(initial_time);
890 clock.advance(Duration::days(365));
891 let after_year = clock.now();
892 let year_elapsed = after_year - initial_time;
893 assert!(year_elapsed.num_days() >= 364 && year_elapsed.num_days() <= 366);
894 }
895}
896
897pub mod cron;
899
900pub use cron::{CronJob, CronJobAction, CronScheduler};