harn-vm 0.7.48

Async bytecode virtual machine for the Harn programming language
Documentation
use std::cell::RefCell;
use std::sync::{Arc, Mutex, OnceLock};
use std::time::{Duration as StdDuration, Instant};

use async_trait::async_trait;
use time::OffsetDateTime;
use tokio::sync::Notify;

use crate::connectors::cron::scheduler::Clock;

thread_local! {
    static MOCK_CLOCK_STACK: RefCell<Vec<Arc<MockClock>>> = const { RefCell::new(Vec::new()) };
}

fn process_start() -> &'static Instant {
    static PROCESS_START: OnceLock<Instant> = OnceLock::new();
    PROCESS_START.get_or_init(Instant::now)
}

#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct ClockInstant(StdDuration);

impl ClockInstant {
    pub fn duration_since(self, earlier: Self) -> StdDuration {
        self.0.saturating_sub(earlier.0)
    }

    pub fn as_millis(self) -> u128 {
        self.0.as_millis()
    }
}

pub struct ClockOverrideGuard;

impl Drop for ClockOverrideGuard {
    fn drop(&mut self) {
        MOCK_CLOCK_STACK.with(|slot| {
            slot.borrow_mut().pop();
        });
    }
}

#[derive(Debug)]
pub struct MockClock {
    now: Mutex<OffsetDateTime>,
    monotonic: Mutex<StdDuration>,
    notify: Notify,
}

impl MockClock {
    pub fn new(now: OffsetDateTime) -> Arc<Self> {
        Arc::new(Self {
            now: Mutex::new(now),
            monotonic: Mutex::new(StdDuration::ZERO),
            notify: Notify::new(),
        })
    }

    pub fn monotonic_now(&self) -> ClockInstant {
        ClockInstant(
            *self
                .monotonic
                .lock()
                .expect("mock clock monotonic mutex poisoned"),
        )
    }

    pub async fn set(&self, now: OffsetDateTime) {
        let mut wall = self.now.lock().expect("mock clock mutex poisoned");
        let previous = *wall;
        *wall = now;
        drop(wall);

        if now > previous {
            let delta = now - previous;
            if let Ok(delta) = TryInto::<StdDuration>::try_into(delta) {
                let mut monotonic = self
                    .monotonic
                    .lock()
                    .expect("mock clock monotonic mutex poisoned");
                *monotonic += delta;
            }
        }

        self.notify.notify_waiters();
    }

    pub async fn advance(&self, duration: time::Duration) {
        let Ok(delta) = TryInto::<StdDuration>::try_into(duration) else {
            return;
        };
        self.advance_std(delta).await;
    }

    pub async fn advance_std(&self, duration: StdDuration) {
        if duration.is_zero() {
            self.notify.notify_waiters();
            return;
        }
        let delta =
            time::Duration::try_from(duration).expect("std duration should fit in time::Duration");
        let next = *self.now.lock().expect("mock clock mutex poisoned") + delta;
        self.set(next).await;
    }

    pub async fn advance_ticks(&self, ticks: u32, tick: StdDuration) {
        for _ in 0..ticks {
            self.advance_std(tick).await;
        }
    }
}

#[async_trait]
impl Clock for MockClock {
    fn now(&self) -> OffsetDateTime {
        *self.now.lock().expect("mock clock mutex poisoned")
    }

    async fn sleep_until(&self, deadline: OffsetDateTime) {
        loop {
            if *self.now.lock().expect("mock clock mutex poisoned") >= deadline {
                return;
            }
            self.notify.notified().await;
        }
    }
}

pub fn install_override(clock: Arc<MockClock>) -> ClockOverrideGuard {
    MOCK_CLOCK_STACK.with(|slot| {
        slot.borrow_mut().push(clock);
    });
    ClockOverrideGuard
}

pub fn active_mock_clock() -> Option<Arc<MockClock>> {
    MOCK_CLOCK_STACK.with(|slot| slot.borrow().last().cloned())
}

pub fn now_utc() -> OffsetDateTime {
    active_mock_clock()
        .map(|clock| clock.now())
        .unwrap_or_else(OffsetDateTime::now_utc)
}

pub fn now_ms() -> i64 {
    now_utc().unix_timestamp_nanos() as i64 / 1_000_000
}

pub fn instant_now() -> ClockInstant {
    active_mock_clock()
        .map(|clock| clock.monotonic_now())
        .unwrap_or_else(|| ClockInstant(process_start().elapsed()))
}