use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::JoinHandle;
use std::time::Duration;
use wasmtime::Engine;
pub struct EpochTicker {
handle: Option<JoinHandle<()>>,
shutdown: Arc<AtomicBool>,
}
impl EpochTicker {
pub fn start(engine: Engine, interval: Duration) -> Self {
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_flag = Arc::clone(&shutdown);
let handle = std::thread::spawn(move || {
while !shutdown_flag.load(Ordering::Acquire) {
std::thread::sleep(interval);
if !shutdown_flag.load(Ordering::Acquire) {
engine.increment_epoch();
}
}
});
Self {
handle: Some(handle),
shutdown,
}
}
pub fn is_running(&self) -> bool {
!self.shutdown.load(Ordering::Acquire)
}
}
impl Drop for EpochTicker {
fn drop(&mut self) {
self.shutdown.store(true, Ordering::Release);
if let Some(handle) = self.handle.take() {
let _ = handle.join();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use wasmtime::Config;
fn create_test_engine() -> Engine {
let mut config = Config::new();
config.epoch_interruption(true);
Engine::new(&config).expect("failed to create test engine")
}
#[tokio::test]
async fn test_epoch_ticker_starts_and_stops() {
let engine = create_test_engine();
let ticker = EpochTicker::start(engine, Duration::from_millis(10));
assert!(ticker.is_running());
drop(ticker);
}
#[tokio::test]
async fn test_epoch_ticker_increments_epoch() {
let engine = create_test_engine();
let ticker = EpochTicker::start(engine.clone(), Duration::from_millis(5));
tokio::time::sleep(Duration::from_millis(20)).await;
assert!(
ticker.is_running(),
"ticker should still be running after 20ms"
);
drop(ticker);
}
#[tokio::test]
async fn test_epoch_ticker_stops_cleanly() {
let engine = create_test_engine();
let ticker = EpochTicker::start(engine, Duration::from_millis(10));
assert!(ticker.is_running());
drop(ticker);
let engine2 = create_test_engine();
let ticker2 = EpochTicker::start(engine2, Duration::from_millis(10));
assert!(ticker2.is_running());
drop(ticker2);
}
#[tokio::test]
async fn test_multiple_tickers_on_different_engines() {
let engine1 = create_test_engine();
let engine2 = create_test_engine();
let ticker1 = EpochTicker::start(engine1, Duration::from_millis(10));
let ticker2 = EpochTicker::start(engine2, Duration::from_millis(10));
assert!(ticker1.is_running());
assert!(ticker2.is_running());
drop(ticker1);
assert!(ticker2.is_running());
drop(ticker2);
}
#[tokio::test(flavor = "current_thread")]
async fn test_epoch_ticker_does_not_block_tokio_runtime() {
use std::sync::atomic::AtomicUsize;
let counter = Arc::new(AtomicUsize::new(0));
let counter_task = Arc::clone(&counter);
let progress_task = tokio::spawn(async move {
let start = tokio::time::Instant::now();
while start.elapsed() < Duration::from_millis(100) {
counter_task.fetch_add(1, Ordering::Relaxed);
tokio::time::sleep(Duration::from_millis(5)).await;
}
});
let engine = create_test_engine();
let ticker = EpochTicker::start(engine, Duration::from_millis(1));
tokio::time::sleep(Duration::from_millis(100)).await;
progress_task.await.expect("progress task should finish");
drop(ticker);
assert!(counter.load(Ordering::Relaxed) >= 10);
}
}