use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use rand::rngs::SmallRng;
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;
use tokio::task::LocalSet;
use tokio::time::{sleep, Duration, Instant};
use super::Result;
type Software<'a> = Box<dyn Fn() -> Pin<Box<dyn Future<Output = Result>>> + 'a>;
pub enum Kind<'a> {
Client,
Host { software: Software<'a> },
NoSoftware,
}
#[derive(Clone, Default)]
pub struct Config {
pub enable_io: bool,
#[cfg_attr(not(tokio_unstable), expect(dead_code))]
pub rng: Option<SmallRng>,
}
pub struct Rt<'a> {
pub kind: Kind<'a>,
tokio: Runtime,
local: LocalSet,
pub nodename: Arc<str>,
handle: Option<JoinHandle<Result>>,
config: Config,
}
impl<'a> Rt<'a> {
pub(crate) fn client<F>(nodename: Arc<str>, client: F, mut config: Config) -> Self
where
F: Future<Output = Result> + 'static,
{
let (tokio, local) = init(&mut config);
let handle = with(&tokio, &local, || tokio::task::spawn_local(client));
Self {
kind: Kind::Client,
tokio,
local,
nodename,
handle: Some(handle),
config,
}
}
pub(crate) fn host<F, Fut>(nodename: Arc<str>, software: F, mut config: Config) -> Self
where
F: Fn() -> Fut + 'a,
Fut: Future<Output = Result> + 'static,
{
let (tokio, local) = init(&mut config);
let software: Software = Box::new(move || Box::pin(software()));
let handle = with(&tokio, &local, || tokio::task::spawn_local(software()));
Self {
kind: Kind::Host { software },
tokio,
local,
nodename,
handle: Some(handle),
config,
}
}
pub(crate) fn no_software() -> Self {
let mut config = Config::default();
let (tokio, local) = init(&mut config);
Self {
kind: Kind::NoSoftware,
tokio,
local,
nodename: String::new().into(),
handle: None,
config,
}
}
pub(crate) fn is_client(&self) -> bool {
matches!(self.kind, Kind::Client)
}
fn is_host(&self) -> bool {
matches!(self.kind, Kind::Host { .. })
}
pub(crate) fn is_software_running(&self) -> bool {
self.handle.is_some()
}
pub(crate) fn now(&self) -> Instant {
let _guard = self.tokio.enter();
Instant::now()
}
pub(crate) fn tick(&mut self, duration: Duration) -> Result<bool> {
self.tokio.block_on(async {
self.local
.run_until(async {
sleep(duration).await;
})
.await
});
match &self.handle {
Some(handle) if handle.is_finished() => {
if let Some(h) = self.handle.take() {
match self.tokio.block_on(h) {
Err(je) if je.is_cancelled() => {}
res => res??,
}
};
Ok(true)
}
Some(_) => Ok(false),
None => Ok(true),
}
}
pub(crate) fn crash(&mut self) {
if !self.is_host() {
panic!("can only crash host's software");
}
if self.handle.take().is_some() {
self.cancel_tasks();
};
}
pub(crate) fn bounce(&mut self) {
if !self.is_host() {
panic!("can only bounce host's software");
}
self.cancel_tasks();
if let Kind::Host { software } = &self.kind {
let handle = with(&self.tokio, &self.local, || {
tokio::task::spawn_local(software())
});
self.handle.replace(handle);
};
}
fn cancel_tasks(&mut self) {
let (tokio, local) = init(&mut self.config);
_ = mem::replace(&mut self.tokio, tokio);
drop(mem::replace(&mut self.local, local));
}
}
fn init(config: &mut Config) -> (Runtime, LocalSet) {
let mut tokio_builder = tokio::runtime::Builder::new_current_thread();
#[cfg(tokio_unstable)]
tokio_builder.unhandled_panic(tokio::runtime::UnhandledPanic::ShutdownRuntime);
if config.enable_io {
tokio_builder.enable_io();
}
#[cfg(tokio_unstable)]
if let Some(rng) = &mut config.rng {
let bytes: [u8; 32] = rand::Rng::random(rng);
let seed = tokio::runtime::RngSeed::from_bytes(&bytes);
tokio_builder.rng_seed(seed);
}
let tokio = tokio_builder
.enable_time()
.start_paused(true)
.build()
.unwrap();
tokio.block_on(async {
tokio::time::sleep(Duration::from_millis(1)).await;
});
(tokio, new_local())
}
fn new_local() -> LocalSet {
#[cfg(not(tokio_unstable))]
let local = LocalSet::new();
#[cfg(tokio_unstable)]
let mut local = LocalSet::new();
#[cfg(tokio_unstable)]
local.unhandled_panic(tokio::runtime::UnhandledPanic::ShutdownRuntime);
local
}
fn with<R>(tokio: &Runtime, local: &LocalSet, f: impl FnOnce() -> R) -> R {
tokio.block_on(async { local.run_until(async { f() }).await })
}