use std::borrow::Cow;
use std::io;
use futures_channel::mpsc::unbounded;
use futures_channel::oneshot::{channel, Receiver};
use futures_util::future::{lazy, FutureExt};
use futures_core::Future;
use std::fmt;
use std::sync::Arc;
use crate::fiber::handle::Handle;
use crate::fiber::{block_pool, Spawner};
use crate::krse::thread::ParkThread;
use crate::fiber::arbiter::{Arbiter, SystemArbiter};
use crate::fiber::runtime::{Runtime, Callback, Kind, RuntimeInner};
use crate::fiber::system::System;
use crate::fiber::local::LocalSet;
use crate::fiber::BasicScheduler;
use crate::fiber::{io as io_in, timer};
/// Configuration collected before a fiber system is constructed.
///
/// Values are set through the chained setter methods and consumed by the
/// `build*` / `run` methods.
pub struct Builder {
    /// Name given to the system; starts as `"fiber"` and is replaced by
    /// [`Builder::name`].
    // NOTE(review): the construction paths visible in this file never read
    // this field — confirm whether it is consumed elsewhere.
    name: Cow<'static, str>,

    /// Flag forwarded verbatim to `System::construct` when the system is built.
    stop_on_panic: bool,
}
impl Builder {
    /// Creates a builder named `"fiber"` with `stop_on_panic` turned off.
    pub(crate) fn new() -> Self {
        Self {
            name: Cow::Borrowed("fiber"),
            stop_on_panic: false,
        }
    }

    /// Replaces the system name with `name`, converted into an owned `String`.
    pub fn name<T: Into<String>>(self, name: T) -> Self {
        Self {
            name: Cow::Owned(name.into()),
            ..self
        }
    }

    /// Sets the `stop_on_panic` flag that is later handed to
    /// `System::construct`.
    pub fn stop_on_panic(self, stop_on_panic: bool) -> Self {
        Self {
            stop_on_panic,
            ..self
        }
    }

    /// Builds a [`SystemRunner`] without running any start-up closure.
    ///
    /// # Panics
    ///
    /// Panics if `Runtime::new()` fails (see `create_runtime`).
    pub fn build(self) -> SystemRunner {
        self.create_runtime(|| ())
    }

    /// Builds an [`AsyncSystemRunner`] whose system arbiter is spawned onto
    /// the supplied `local` task set.
    pub(crate) fn build_async(self, local: &LocalSet) -> AsyncSystemRunner {
        self.create_async_runtime(local)
    }

    /// Builds the system, executes `f` on the freshly created runtime, and
    /// then runs the system until it stops.
    ///
    /// # Errors
    ///
    /// Returns whatever [`SystemRunner::run`] returns.
    ///
    /// # Panics
    ///
    /// Panics if `Runtime::new()` fails (see `create_runtime`).
    pub fn run<F>(self, f: F) -> io::Result<()>
    where
        F: FnOnce() + 'static,
    {
        let runner = self.create_runtime(f);
        runner.run()
    }

    /// Wires up the stop channel, the command channel, the `System`, and the
    /// `SystemArbiter`, spawning the arbiter on `local` instead of on a
    /// dedicated runtime.
    fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner {
        let (stop_sender, stop_receiver) = channel();
        let (command_tx, command_rx) = unbounded();

        let system = System::construct(command_tx, Arbiter::new_system(), self.stop_on_panic);
        let arbiter = SystemArbiter::new(stop_sender, command_rx);

        // The value returned by `spawn_local` is deliberately discarded.
        let _ = local.spawn_local(arbiter);

        AsyncSystemRunner {
            stop: stop_receiver,
            system,
        }
    }

    /// Wires up the stop channel, the command channel, the `System`, and the
    /// `SystemArbiter`, then creates a new `Runtime`, spawns the arbiter on
    /// it, and drives `f` through `block_on` before returning the runner.
    ///
    /// # Panics
    ///
    /// Panics if `Runtime::new()` does not succeed.
    fn create_runtime<F>(self, f: F) -> SystemRunner
    where
        F: FnOnce() + 'static,
    {
        let (stop_sender, stop_receiver) = channel();
        let (command_tx, command_rx) = unbounded();

        let system = System::construct(command_tx, Arbiter::new_system(), self.stop_on_panic);
        let arbiter = SystemArbiter::new(stop_sender, command_rx);

        let mut runtime = Runtime::new().unwrap();
        runtime.spawn(arbiter);
        // `f` is wrapped in `lazy` so that it executes inside `block_on`.
        runtime.block_on(lazy(move |_| f()));

        SystemRunner {
            rt: runtime,
            stop: stop_receiver,
            system,
        }
    }
}
/// Runner for a system whose arbiter was spawned onto an existing `LocalSet`
/// rather than onto a runtime owned by the runner.
#[derive(Debug)]
pub(crate) struct AsyncSystemRunner {
    /// Receiving half of the one-shot stop channel; yields the `i32` exit code.
    stop: Receiver<i32>,

    /// The `System` produced by `System::construct` for this runner.
    system: System,
}
impl AsyncSystemRunner {
    /// Returns a future that runs the system without blocking the caller.
    ///
    /// The future is built with `lazy`, so `Arbiter::run_system(None)` is
    /// invoked from inside the returned future rather than by this call. It
    /// then awaits the stop channel, calls `Arbiter::stop_system()`, and
    /// resolves to:
    ///
    /// * `Ok(())` when the received exit code is `0`;
    /// * an `io::ErrorKind::Other` error carrying
    ///   `"Non-zero exit code: {code}"` for any other code;
    /// * an `io::ErrorKind::Other` error wrapping the channel error when the
    ///   stop channel fails to deliver a code.
    pub(crate) fn run_nonblocking(self) -> impl Future<Output = Result<(), io::Error>> + Send {
        let stop_rx = self.stop;

        lazy(|_| {
            Arbiter::run_system(None);
            async {
                let outcome = match stop_rx.await {
                    Err(recv_err) => Err(io::Error::new(io::ErrorKind::Other, recv_err)),
                    Ok(0) => Ok(()),
                    Ok(code) => Err(io::Error::new(
                        io::ErrorKind::Other,
                        format!("Non-zero exit code: {}", code),
                    )),
                };
                Arbiter::stop_system();
                outcome
            }
        })
        .flatten()
    }
}
/// Runner that owns the `Runtime` on which the system arbiter was spawned.
///
/// Obtained from `Builder::build`; nothing is driven until [`SystemRunner::run`]
/// or [`SystemRunner::block_on`] is called.
#[must_use = "SystemRunner must be run"]
#[derive(Debug)]
pub struct SystemRunner {
    /// Runtime used to drive the arbiter and any future given to `block_on`.
    rt: Runtime,

    /// Receiving half of the one-shot stop channel; yields the `i32` exit code.
    stop: Receiver<i32>,

    /// The `System` produced by `System::construct` for this runner.
    system: System,
}
impl SystemRunner {
    /// Runs the system on the owned runtime, blocking the current thread
    /// until an exit code arrives on the stop channel.
    ///
    /// `Arbiter::run_system` is called before blocking and
    /// `Arbiter::stop_system` after the stop channel resolves.
    ///
    /// # Errors
    ///
    /// Returns an `io::ErrorKind::Other` error when the exit code is non-zero
    /// (message `"Non-zero exit code: {code}"`) or when the stop channel
    /// fails to deliver a code.
    pub fn run(self) -> io::Result<()> {
        let SystemRunner {
            rt: mut runtime,
            stop: stop_rx,
            ..
        } = self;

        Arbiter::run_system(Some(&runtime));

        let outcome = match runtime.block_on(stop_rx) {
            Err(recv_err) => Err(io::Error::new(io::ErrorKind::Other, recv_err)),
            Ok(0) => Ok(()),
            Ok(code) => Err(io::Error::new(
                io::ErrorKind::Other,
                format!("Non-zero exit code: {}", code),
            )),
        };

        Arbiter::stop_system();
        outcome
    }

    /// Drives `fut` to completion on the owned runtime and returns its output.
    ///
    /// `Arbiter::run_system` is called before the future is driven and
    /// `Arbiter::stop_system` once it has produced a value.
    pub fn block_on<F, O>(&mut self, fut: F) -> O
    where
        F: Future<Output = O> + 'static,
    {
        Arbiter::run_system(Some(&self.rt));
        let output = self.rt.block_on(fut);
        Arbiter::stop_system();
        output
    }
}
/// Configuration for constructing a `RuntimeInner`.
pub struct BuilderInner {
    /// Passed to `io_in::create_driver` when the runtime is built.
    enable_io: bool,

    /// Passed to `timer::create_driver` when the runtime is built.
    enable_timer: bool,

    /// Configured number of core threads; setters reject zero.
    core_threads: usize,

    /// Thread limit handed to `block_pool::create_blocking_pool`; setters
    /// reject zero.
    max_threads: usize,

    /// Name configured for spawned threads.
    // NOTE(review): presumably read by the blocking pool — confirm in `block_pool`.
    pub thread_name: String,

    /// Optional stack size configured for spawned threads.
    pub thread_stack_size: Option<usize>,

    /// Callback registered through `on_thread_start`, if any.
    pub after_start: Option<Callback>,

    /// Callback registered through `on_thread_stop`, if any.
    pub before_stop: Option<Callback>,
}
impl BuilderInner {
pub fn new() -> BuilderInner {
BuilderInner {
enable_io: false,
enable_timer: false,
core_threads: usize::max(1, num_cpus::get_physical()),
max_threads: 512,
thread_name: "kayrx-zone-worker".into(),
thread_stack_size: None,
after_start: None,
before_stop: None,
}
}
pub fn enable_all(&mut self) -> &mut Self {
self.enable_io();
self.enable_timer();
self
}
pub fn enable_io(&mut self) -> &mut Self {
self.enable_io = true;
self
}
pub fn enable_timer(&mut self) -> &mut Self {
self.enable_timer = true;
self
}
pub fn core_threads(&mut self, val: usize) -> &mut Self {
assert_ne!(val, 0, "Core threads cannot be zero");
self.core_threads = val;
self
}
pub fn max_threads(&mut self, val: usize) -> &mut Self {
assert_ne!(val, 0, "Thread limit cannot be zero");
self.max_threads = val;
self
}
pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
self.thread_name = val.into();
self
}
pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
self.thread_stack_size = Some(val);
self
}
pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
{
self.after_start = Some(Arc::new(f));
self
}
pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
{
self.before_stop = Some(Arc::new(f));
self
}
pub fn build(&mut self) -> io::Result<RuntimeInner> {
self.build_basic_runtime()
}
fn build_basic_runtime(&mut self) -> io::Result<RuntimeInner> {
let clock = timer::create_clock();
let (io_driver, io_handle) = io_in::create_driver(self.enable_io)?;
let (driver, timer_handle) = timer::create_driver(self.enable_timer, io_driver, clock.clone());
let scheduler = BasicScheduler::new(driver);
let spawner = Spawner::Basic(scheduler.spawner());
let blocking_pool = block_pool::create_blocking_pool(self, &spawner, &io_handle, &timer_handle, &clock, self.max_threads);
let blocking_spawner = blocking_pool.spawner().clone();
Ok(RuntimeInner {
kind: Kind::Basic(scheduler),
handle: Handle {
spawner,
io_handle,
timer_handle,
clock,
blocking_spawner,
},
blocking_pool,
})
}
}
impl Default for BuilderInner {
fn default() -> Self {
Self::new()
}
}
impl fmt::Debug for BuilderInner {
    /// Writes the thread-related configuration as a struct named `Builder`.
    ///
    /// The callbacks cannot be printed, so each is rendered as
    /// `Some("...")` when set and `None` otherwise.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Builder")
            .field("core_threads", &self.core_threads)
            .field("max_threads", &self.max_threads)
            .field("thread_name", &self.thread_name)
            .field("thread_stack_size", &self.thread_stack_size)
            .field("after_start", &self.after_start.as_ref().map(|_| "..."))
            // Report the stop callback from its own field; this previously
            // mirrored `after_start` and misreported whether it was set.
            .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
            .finish()
    }
}