use std::task::Waker;
use std::time::{Duration, Instant, SystemTime};
use thread_aware::ThreadAware;
use thread_aware::affinity::Affinity;
use crate::state::ClockState;
use crate::timers::TimerKey;
#[derive(Clone)]
pub struct Clock {
state: ClockState,
affinity: Option<Affinity>,
}
impl std::fmt::Debug for Clock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let kind = match &self.state {
#[cfg(any(feature = "test-util", test))]
ClockState::ClockControl(_) => "controlled",
ClockState::System(_) => "system",
};
f.debug_struct("Clock")
.field("kind", &kind)
.field("timers", &self.state.timers_len())
.field("alive", &self.state.alive())
.field("affinity", &self.affinity)
.finish_non_exhaustive()
}
}
impl ThreadAware for Clock {
fn relocate(&mut self, source: Option<Affinity>, destination: Affinity) {
self.state.relocate(source, destination);
self.affinity = Some(destination);
}
}
impl Clock {
#[cfg(any(feature = "tokio", test))]
#[must_use]
#[cfg_attr(test, mutants::skip)] pub fn new_tokio() -> Self {
Self::new_tokio_core().0
}
pub(crate) fn new(state: ClockState) -> Self {
Self { state, affinity: None }
}
#[cfg(any(feature = "tokio", test))]
fn new_tokio_core() -> (Self, tokio::task::JoinHandle<()>) {
const TIMER_RESOLUTION: Duration = Duration::from_millis(10);
let (clock, mut driver) = crate::runtime::InactiveClock::new_shared().activate();
let join_handle = tokio::spawn(async move {
loop {
tokio::time::sleep(TIMER_RESOLUTION).await;
if driver.advance_timers(Instant::now()).is_err() {
break;
}
}
});
(clock, join_handle)
}
#[cfg(test)]
pub(super) fn new_system_frozen() -> Self {
Self::new(ClockState::new_system())
}
#[cfg(any(feature = "test-util", test))]
#[must_use]
pub fn new_frozen() -> Self {
crate::ClockControl::new().to_clock()
}
#[cfg(any(feature = "test-util", test))]
#[must_use]
pub fn new_frozen_at(time: impl Into<SystemTime>) -> Self {
crate::ClockControl::new_at(time).to_clock()
}
#[must_use]
pub fn system_time(&self) -> SystemTime {
match self.clock_state() {
#[cfg(any(feature = "test-util", test))]
ClockState::ClockControl(control) => control.system_time(),
ClockState::System(_) => SystemTime::now(),
}
}
#[expect(
clippy::match_wild_err_arm,
clippy::panic,
reason = "the panic might only occur when system time is outside of valid range which won't ever happen in real environments"
)]
#[must_use]
pub fn system_time_as<T: TryFrom<SystemTime>>(&self) -> T {
match T::try_from(self.system_time()) {
Ok(time) => time,
Err(_err) => panic!(
"The SystemTime returned by the clock is always in normalized range and must be convertible to the target type.
If the target type overflows, it indicates a problem with the target type not supporting valid system time range or
we are in tests where the time was moved excessively into the future. Practically, in production, this conversion will
always succeed.",
),
}
}
#[must_use]
pub fn instant(&self) -> Instant {
match self.clock_state() {
#[cfg(any(feature = "test-util", test))]
ClockState::ClockControl(control) => control.instant(),
ClockState::System(_) => Instant::now(),
}
}
#[must_use]
pub fn delay(&self, duration: Duration) -> crate::Delay {
crate::Delay::new(self, duration)
}
#[must_use]
pub fn stopwatch(&self) -> crate::Stopwatch {
crate::Stopwatch::new(self)
}
pub(super) fn register_timer(&self, when: Instant, waker: Waker) -> TimerKey {
match self.clock_state() {
#[cfg(any(feature = "test-util", test))]
ClockState::ClockControl(control) => control.register_timer(when, waker),
ClockState::System(timers) => timers.with_timers(|t| t.register(when, waker)),
}
}
pub(super) fn unregister_timer(&self, key: TimerKey) {
match self.clock_state() {
#[cfg(any(feature = "test-util", test))]
ClockState::ClockControl(control) => control.unregister_timer(key),
ClockState::System(timers) => timers.with_timers(|t| t.unregister(key)),
}
}
pub(crate) fn clock_state(&self) -> &ClockState {
&self.state
}
}
impl AsRef<Self> for Clock {
fn as_ref(&self) -> &Self {
self
}
}
#[cfg_attr(coverage_nightly, coverage(off))]
#[cfg(test)]
mod tests {
#![allow(clippy::arithmetic_side_effects, reason = "no need to be strict in tests")]
use std::fmt::Debug;
use std::task::Context;
use std::thread::sleep;
use futures::FutureExt;
use thread_aware::affinity::pinned_affinities;
use super::*;
use crate::ClockControl;
use crate::runtime::InactiveClock;
static_assertions::assert_impl_all!(Clock: Debug, Send, Sync, Clone, AsRef<Clock>);
#[test]
fn assert_types() {
static_assertions::assert_impl_all!(Clock: Send, Sync, AsRef<Clock>);
}
#[cfg_attr(miri, ignore)] #[test]
fn test_now() {
let now = std::time::SystemTime::now();
let clock = Clock::new_system_frozen();
let absolute = clock.system_time();
assert!(absolute >= now);
}
#[test]
fn test_now_with_control() {
let control = ClockControl::new();
let clock = control.to_clock();
let now = clock.system_time();
assert_eq!(now, control.system_time());
() = control.advance(Duration::from_secs(10));
assert_eq!(clock.system_time(), now.checked_add(Duration::from_secs(10)).unwrap());
}
#[test]
fn test_instant_now() {
let clock = Clock::new_system_frozen();
let clock_instant = clock.instant();
let system_instant = Instant::now();
assert!(
(system_instant.duration_since(clock_instant)) < Duration::from_secs(10),
"the `Instant` retrieved from the clock is not the same as the system one"
);
}
#[cfg_attr(miri, ignore)] #[test]
fn test_system_time() {
let now = std::time::SystemTime::now();
let clock = Clock::new_system_frozen();
let system_time = clock.system_time();
assert!(system_time >= now);
}
#[test]
fn test_system_time_with_control() {
let control = ClockControl::new();
let clock = control.to_clock();
let system_time = clock.system_time();
assert_eq!(system_time, control.system_time());
() = control.advance(Duration::from_secs(10));
assert_eq!(clock.system_time(), control.system_time());
}
#[cfg_attr(miri, ignore)] #[tokio::test]
async fn tokio_ensure_timers_advancing() {
let clock = Clock::new_tokio();
clock.delay(Duration::from_millis(15)).await;
}
#[cfg_attr(miri, ignore)] #[tokio::test]
async fn tokio_ensure_timers_advancing_after_relocate() {
let affinities = pinned_affinities(&[2]);
let clock = Clock::new_tokio();
let mut clock = clock;
clock.relocate(Some(affinities[0]), affinities[1]);
clock.delay(Duration::from_millis(15)).await;
}
#[cfg_attr(miri, ignore)] #[tokio::test]
async fn tokio_ensure_future_finished_when_clock_dropped() {
let (clock, handle) = Clock::new_tokio_core();
clock.delay(Duration::from_millis(15)).await;
drop(clock);
handle.await.unwrap();
}
#[test]
fn new_frozen_ok() {
let clock = Clock::new_frozen();
let now = clock.system_time();
let instant = clock.instant();
sleep(Duration::from_micros(1));
assert_eq!(now, clock.system_time());
assert_eq!(instant, clock.instant());
}
#[test]
fn new_frozen_at_ok() {
let specific_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1_000_000);
let clock = Clock::new_frozen_at(specific_time);
let timestamp = clock.system_time();
let system_time = clock.system_time();
sleep(Duration::from_micros(1));
assert_eq!(system_time, specific_time);
assert_eq!(timestamp, clock.system_time());
assert_eq!(system_time, clock.system_time());
}
#[test]
#[should_panic(expected = "The SystemTime returned by the clock is always in normalized range")]
fn system_time_as_panics_on_conversion_failure() {
struct AlwaysFailsConversion;
impl TryFrom<SystemTime> for AlwaysFailsConversion {
type Error = &'static str;
fn try_from(_: SystemTime) -> Result<Self, Self::Error> {
Err("conversion always fails")
}
}
let clock = Clock::new_frozen();
let _: AlwaysFailsConversion = clock.system_time_as();
}
#[test]
fn as_ref_ok() {
let clock = Clock::new_frozen();
let _: &Clock = clock.as_ref();
}
#[test]
fn owners_count() {
let (clock, driver) = InactiveClock::default().activate();
assert!(!clock.state.is_unique());
drop(clock);
assert!(driver.state.is_unique());
}
#[test]
fn owners_count_clock_control() {
let (clock, driver) = InactiveClock::from(ClockControl::default()).activate();
assert!(!driver.state.is_unique());
drop(clock);
assert!(driver.state.is_unique());
}
#[test]
fn thread_aware() {
let affinites = pinned_affinities(&[1, 1]);
let source = Some(affinites[0]);
let pinned_1 = affinites[0];
let pinned_2 = affinites[1];
let root = InactiveClock::default();
let mut inactive_1 = root.clone();
inactive_1.relocate(source, pinned_1);
let mut inactive_2 = root;
inactive_2.relocate(source, pinned_2);
let (clock_1, mut driver_1) = inactive_1.activate();
let (clock_2, mut driver_2) = inactive_2.activate();
let mut fut_1 = Box::pin(clock_1.delay(Duration::from_secs(100)));
_ = fut_1.poll_unpin(&mut Context::from_waker(Waker::noop()));
assert_eq!(clock_1.state.timers_len(), 1);
assert_eq!(clock_2.state.timers_len(), 0);
assert_eq!(driver_1.state.timers_len(), 1);
assert_eq!(driver_2.state.timers_len(), 0);
{
let mut relocated_clock = clock_1.clone();
relocated_clock.relocate(source, pinned_2);
assert_eq!(relocated_clock.state.timers_len(), 0);
}
let mut fut_2 = Box::pin(clock_2.delay(Duration::from_secs(100)));
_ = fut_2.poll_unpin(&mut Context::from_waker(Waker::noop()));
assert_eq!(clock_1.state.timers_len(), 1);
assert_eq!(clock_2.state.timers_len(), 1);
assert_eq!(driver_1.state.timers_len(), 1);
assert_eq!(driver_2.state.timers_len(), 1);
driver_1.advance_timers(Instant::now() + Duration::from_secs(200)).unwrap();
assert_eq!(driver_1.state.timers_len(), 0);
assert_eq!(driver_2.state.timers_len(), 1);
driver_2.advance_timers(Instant::now() + Duration::from_secs(200)).unwrap();
assert_eq!(driver_2.state.timers_len(), 0);
drop(fut_1);
drop(fut_2);
drop(clock_1);
driver_1.advance_timers(Instant::now()).unwrap_err();
driver_2.advance_timers(Instant::now()).unwrap();
drop(clock_2);
driver_2.advance_timers(Instant::now()).unwrap_err();
}
#[test]
fn thread_aware_clock_control() {
let affinites = pinned_affinities(&[1, 1]);
let source = Some(affinites[0]);
let pinned_1 = affinites[0];
let pinned_2 = affinites[1];
let root: InactiveClock = ClockControl::default().into();
let mut inactive_1 = root.clone();
inactive_1.relocate(source, pinned_1);
let mut inactive_2 = root;
inactive_2.relocate(source, pinned_2);
let (clock_1, driver_1) = inactive_1.activate();
let (clock_2, driver_2) = inactive_2.activate();
let mut fut_1 = Box::pin(clock_1.delay(Duration::from_secs(100)));
_ = fut_1.poll_unpin(&mut Context::from_waker(Waker::noop()));
assert_eq!(clock_1.state.timers_len(), 1);
assert_eq!(clock_2.state.timers_len(), 1);
assert_eq!(driver_1.state.timers_len(), 1);
assert_eq!(driver_2.state.timers_len(), 1);
}
#[test]
#[cfg_attr(miri, ignore)]
fn debug_system_clock() {
let clock = Clock::new_system_frozen();
insta::assert_debug_snapshot!(clock);
}
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn debug_alive_system_clock() {
let _affinites = pinned_affinities(&[2]);
let clock = Clock::new_tokio();
clock.delay(Duration::from_millis(1)).await;
insta::assert_debug_snapshot!(clock);
}
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn debug_alive_system_clock_relocated() {
let affinites = pinned_affinities(&[2]);
let mut clock = Clock::new_system_frozen();
clock.relocate(Some(affinites[0]), affinites[1]);
insta::assert_debug_snapshot!(clock);
}
#[test]
#[cfg_attr(miri, ignore)]
fn debug_controlled_clock() {
let control = ClockControl::new();
let clock = control.to_clock();
insta::assert_debug_snapshot!(clock);
}
#[test]
#[cfg_attr(miri, ignore)]
fn debug_clock_with_timers() {
let control = ClockControl::new();
let clock = control.to_clock();
control.register_timer(Instant::now() + Duration::from_secs(100), Waker::noop().clone());
control.register_timer(Instant::now() + Duration::from_secs(200), Waker::noop().clone());
insta::assert_debug_snapshot!(clock);
}
}