1use chrono::{DateTime, Duration, Utc};
11use serde::{Deserialize, Serialize};
12use std::collections::{BTreeMap, HashMap};
13use std::sync::{Arc, RwLock};
14use tracing::{debug, info, warn};
15
16#[derive(Debug, Clone)]
18pub struct VirtualClock {
19 current_time: Arc<RwLock<Option<DateTime<Utc>>>>,
21 enabled: Arc<RwLock<bool>>,
23 scale_factor: Arc<RwLock<f64>>,
25 baseline_real_time: Arc<RwLock<Option<DateTime<Utc>>>>,
27}
28
29impl Default for VirtualClock {
30 fn default() -> Self {
31 Self::new()
32 }
33}
34
35impl VirtualClock {
36 pub fn new() -> Self {
38 Self {
39 current_time: Arc::new(RwLock::new(None)),
40 enabled: Arc::new(RwLock::new(false)),
41 scale_factor: Arc::new(RwLock::new(1.0)),
42 baseline_real_time: Arc::new(RwLock::new(None)),
43 }
44 }
45
46 pub fn new_at(time: DateTime<Utc>) -> Self {
48 let clock = Self::new();
49 clock.enable_and_set(time);
50 clock
51 }
52
53 pub fn enable_and_set(&self, time: DateTime<Utc>) {
55 let mut current = self.current_time.write().unwrap();
56 *current = Some(time);
57
58 let mut enabled = self.enabled.write().unwrap();
59 *enabled = true;
60
61 let mut baseline = self.baseline_real_time.write().unwrap();
62 *baseline = Some(Utc::now());
63
64 info!("Time travel enabled at {}", time);
65 }
66
67 pub fn disable(&self) {
69 let mut enabled = self.enabled.write().unwrap();
70 *enabled = false;
71
72 let mut current = self.current_time.write().unwrap();
73 *current = None;
74
75 let mut baseline = self.baseline_real_time.write().unwrap();
76 *baseline = None;
77
78 info!("Time travel disabled, using real time");
79 }
80
81 pub fn is_enabled(&self) -> bool {
83 *self.enabled.read().unwrap()
84 }
85
86 pub fn now(&self) -> DateTime<Utc> {
88 let enabled = *self.enabled.read().unwrap();
89
90 if !enabled {
91 return Utc::now();
92 }
93
94 let current = self.current_time.read().unwrap();
95 let scale = *self.scale_factor.read().unwrap();
96
97 if let Some(virtual_time) = *current {
98 if (scale - 1.0).abs() < f64::EPSILON {
100 return virtual_time;
101 }
102
103 let baseline = self.baseline_real_time.read().unwrap();
105 if let Some(baseline_real) = *baseline {
106 let elapsed_real = Utc::now() - baseline_real;
107 let elapsed_scaled =
108 Duration::milliseconds((elapsed_real.num_milliseconds() as f64 * scale) as i64);
109 return virtual_time + elapsed_scaled;
110 }
111
112 virtual_time
113 } else {
114 Utc::now()
115 }
116 }
117
118 pub fn advance(&self, duration: Duration) {
120 let enabled = *self.enabled.read().unwrap();
121 if !enabled {
122 warn!("Cannot advance time: time travel is not enabled");
123 return;
124 }
125
126 let mut current = self.current_time.write().unwrap();
127 if let Some(time) = *current {
128 let new_time = time + duration;
129 *current = Some(new_time);
130
131 let mut baseline = self.baseline_real_time.write().unwrap();
133 *baseline = Some(Utc::now());
134
135 info!("Time advanced by {} to {}", duration, new_time);
136 }
137 }
138
139 pub fn set_scale(&self, factor: f64) {
141 if factor <= 0.0 {
142 warn!("Invalid scale factor: {}, must be positive", factor);
143 return;
144 }
145
146 let mut scale = self.scale_factor.write().unwrap();
147 *scale = factor;
148
149 let mut baseline = self.baseline_real_time.write().unwrap();
151 *baseline = Some(Utc::now());
152
153 info!("Time scale set to {}x", factor);
154 }
155
156 pub fn get_scale(&self) -> f64 {
158 *self.scale_factor.read().unwrap()
159 }
160
161 pub fn reset(&self) {
163 self.disable();
164 info!("Time travel reset to real time");
165 }
166
167 pub fn set_time(&self, time: DateTime<Utc>) {
169 let enabled = *self.enabled.read().unwrap();
170 if !enabled {
171 self.enable_and_set(time);
172 return;
173 }
174
175 let mut current = self.current_time.write().unwrap();
176 *current = Some(time);
177
178 let mut baseline = self.baseline_real_time.write().unwrap();
180 *baseline = Some(Utc::now());
181
182 info!("Virtual time set to {}", time);
183 }
184
185 pub fn status(&self) -> TimeTravelStatus {
187 TimeTravelStatus {
188 enabled: self.is_enabled(),
189 current_time: if self.is_enabled() {
190 Some(self.now())
191 } else {
192 None
193 },
194 scale_factor: self.get_scale(),
195 real_time: Utc::now(),
196 }
197 }
198}
199
200#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct TimeTravelStatus {
203 pub enabled: bool,
205 pub current_time: Option<DateTime<Utc>>,
207 pub scale_factor: f64,
209 pub real_time: DateTime<Utc>,
211}
212
213#[derive(Debug, Clone, Serialize, Deserialize)]
215pub struct TimeTravelConfig {
216 #[serde(default)]
218 pub enabled: bool,
219 pub initial_time: Option<DateTime<Utc>>,
221 #[serde(default = "default_scale")]
223 pub scale_factor: f64,
224 #[serde(default = "default_true")]
226 pub enable_scheduling: bool,
227}
228
229fn default_scale() -> f64 {
230 1.0
231}
232
233fn default_true() -> bool {
234 true
235}
236
237impl Default for TimeTravelConfig {
238 fn default() -> Self {
239 Self {
240 enabled: false,
241 initial_time: None,
242 scale_factor: 1.0,
243 enable_scheduling: true,
244 }
245 }
246}
247
248#[derive(Debug, Clone)]
250pub struct ResponseScheduler {
251 clock: Arc<VirtualClock>,
253 scheduled: Arc<RwLock<BTreeMap<DateTime<Utc>, Vec<ScheduledResponse>>>>,
255 named_schedules: Arc<RwLock<HashMap<String, String>>>,
257}
258
259#[derive(Debug, Clone, Serialize, Deserialize)]
261pub struct ScheduledResponse {
262 pub id: String,
264 pub trigger_time: DateTime<Utc>,
266 pub body: serde_json::Value,
268 #[serde(default = "default_status")]
270 pub status: u16,
271 #[serde(default)]
273 pub headers: HashMap<String, String>,
274 pub name: Option<String>,
276 #[serde(default)]
278 pub repeat: Option<RepeatConfig>,
279}
280
281fn default_status() -> u16 {
282 200
283}
284
285#[derive(Debug, Clone, Serialize, Deserialize)]
287pub struct RepeatConfig {
288 pub interval: Duration,
290 pub max_count: Option<usize>,
292}
293
294impl ResponseScheduler {
295 pub fn new(clock: Arc<VirtualClock>) -> Self {
297 Self {
298 clock,
299 scheduled: Arc::new(RwLock::new(BTreeMap::new())),
300 named_schedules: Arc::new(RwLock::new(HashMap::new())),
301 }
302 }
303
304 pub fn schedule(&self, response: ScheduledResponse) -> Result<String, String> {
306 let id = if response.id.is_empty() {
307 uuid::Uuid::new_v4().to_string()
308 } else {
309 response.id.clone()
310 };
311
312 let mut scheduled = self.scheduled.write().unwrap();
313 scheduled.entry(response.trigger_time).or_default().push(response.clone());
314
315 if let Some(name) = &response.name {
316 let mut named = self.named_schedules.write().unwrap();
317 named.insert(name.clone(), id.clone());
318 }
319
320 info!("Scheduled response {} for {}", id, response.trigger_time);
321 Ok(id)
322 }
323
324 pub fn get_due_responses(&self) -> Vec<ScheduledResponse> {
326 let now = self.clock.now();
327 let mut scheduled = self.scheduled.write().unwrap();
328 let mut due = Vec::new();
329
330 let times_to_process: Vec<DateTime<Utc>> =
332 scheduled.range(..=now).map(|(time, _)| *time).collect();
333
334 for time in times_to_process {
335 if let Some(responses) = scheduled.remove(&time) {
336 for response in responses {
337 due.push(response.clone());
338
339 if let Some(repeat_config) = &response.repeat {
341 let next_time = time + repeat_config.interval;
342
343 let should_repeat = if let Some(max) = repeat_config.max_count {
345 max > 1
347 } else {
348 true
349 };
350
351 if should_repeat {
352 let mut next_response = response.clone();
353 next_response.trigger_time = next_time;
354 if let Some(ref mut repeat) = next_response.repeat {
355 if let Some(ref mut count) = repeat.max_count {
356 *count -= 1;
357 }
358 }
359
360 scheduled.entry(next_time).or_default().push(next_response);
361 }
362 }
363 }
364 }
365 }
366
367 debug!("Found {} due responses at {}", due.len(), now);
368 due
369 }
370
371 pub fn cancel(&self, id: &str) -> bool {
373 let mut scheduled = self.scheduled.write().unwrap();
374
375 for responses in scheduled.values_mut() {
376 if let Some(pos) = responses.iter().position(|r| r.id == id) {
377 responses.remove(pos);
378 info!("Cancelled scheduled response {}", id);
379 return true;
380 }
381 }
382
383 false
384 }
385
386 pub fn clear_all(&self) {
388 let mut scheduled = self.scheduled.write().unwrap();
389 scheduled.clear();
390
391 let mut named = self.named_schedules.write().unwrap();
392 named.clear();
393
394 info!("Cleared all scheduled responses");
395 }
396
397 pub fn list_scheduled(&self) -> Vec<ScheduledResponse> {
399 let scheduled = self.scheduled.read().unwrap();
400 scheduled.values().flat_map(|v| v.iter().cloned()).collect()
401 }
402
403 pub fn count(&self) -> usize {
405 let scheduled = self.scheduled.read().unwrap();
406 scheduled.values().map(|v| v.len()).sum()
407 }
408}
409
410pub struct TimeTravelManager {
412 clock: Arc<VirtualClock>,
414 scheduler: Arc<ResponseScheduler>,
416}
417
418impl TimeTravelManager {
419 pub fn new(config: TimeTravelConfig) -> Self {
421 let clock = Arc::new(VirtualClock::new());
422
423 if config.enabled {
424 if let Some(initial_time) = config.initial_time {
425 clock.enable_and_set(initial_time);
426 } else {
427 clock.enable_and_set(Utc::now());
428 }
429 clock.set_scale(config.scale_factor);
430 }
431
432 let scheduler = Arc::new(ResponseScheduler::new(clock.clone()));
433
434 Self { clock, scheduler }
435 }
436
437 pub fn clock(&self) -> Arc<VirtualClock> {
439 self.clock.clone()
440 }
441
442 pub fn scheduler(&self) -> Arc<ResponseScheduler> {
444 self.scheduler.clone()
445 }
446
447 pub fn now(&self) -> DateTime<Utc> {
449 self.clock.now()
450 }
451}
452
453#[cfg(test)]
454mod tests {
455 use super::*;
456
457 #[test]
458 fn test_virtual_clock_creation() {
459 let clock = VirtualClock::new();
460 assert!(!clock.is_enabled());
461 }
462
463 #[test]
464 fn test_virtual_clock_enable() {
465 let clock = VirtualClock::new();
466 let test_time = Utc::now();
467 clock.enable_and_set(test_time);
468
469 assert!(clock.is_enabled());
470 let now = clock.now();
471 assert!((now - test_time).num_seconds().abs() < 1);
472 }
473
474 #[test]
475 fn test_virtual_clock_advance() {
476 let clock = VirtualClock::new();
477 let test_time = Utc::now();
478 clock.enable_and_set(test_time);
479
480 clock.advance(Duration::hours(2));
481 let now = clock.now();
482
483 assert!((now - test_time - Duration::hours(2)).num_seconds().abs() < 1);
484 }
485
486 #[test]
487 fn test_virtual_clock_scale() {
488 let clock = VirtualClock::new();
489 clock.set_scale(2.0);
490 assert_eq!(clock.get_scale(), 2.0);
491 }
492
493 #[test]
494 fn test_response_scheduler() {
495 let clock = Arc::new(VirtualClock::new());
496 let test_time = Utc::now();
497 clock.enable_and_set(test_time);
498
499 let scheduler = ResponseScheduler::new(clock.clone());
500
501 let response = ScheduledResponse {
502 id: "test-1".to_string(),
503 trigger_time: test_time + Duration::seconds(10),
504 body: serde_json::json!({"message": "Hello"}),
505 status: 200,
506 headers: HashMap::new(),
507 name: Some("test".to_string()),
508 repeat: None,
509 };
510
511 let id = scheduler.schedule(response).unwrap();
512 assert_eq!(id, "test-1");
513 assert_eq!(scheduler.count(), 1);
514 }
515
516 #[test]
517 fn test_scheduled_response_triggering() {
518 let clock = Arc::new(VirtualClock::new());
519 let test_time = Utc::now();
520 clock.enable_and_set(test_time);
521
522 let scheduler = ResponseScheduler::new(clock.clone());
523
524 let response = ScheduledResponse {
525 id: "test-1".to_string(),
526 trigger_time: test_time + Duration::seconds(10),
527 body: serde_json::json!({"message": "Hello"}),
528 status: 200,
529 headers: HashMap::new(),
530 name: None,
531 repeat: None,
532 };
533
534 scheduler.schedule(response).unwrap();
535
536 let due = scheduler.get_due_responses();
538 assert_eq!(due.len(), 0);
539
540 clock.advance(Duration::seconds(15));
542
543 let due = scheduler.get_due_responses();
545 assert_eq!(due.len(), 1);
546 }
547
548 #[test]
549 fn test_time_travel_config() {
550 let config = TimeTravelConfig::default();
551 assert!(!config.enabled);
552 assert_eq!(config.scale_factor, 1.0);
553 assert!(config.enable_scheduling);
554 }
555
556 #[test]
557 fn test_time_travel_manager() {
558 let config = TimeTravelConfig {
559 enabled: true,
560 initial_time: Some(Utc::now()),
561 scale_factor: 1.0,
562 enable_scheduling: true,
563 };
564
565 let manager = TimeTravelManager::new(config);
566 assert!(manager.clock().is_enabled());
567 }
568}