use crate::actors::ActorStream;
use crate::executor::thread_pool::MaximThreadPool;
use crate::prelude::*;
use dashmap::DashMap;
use futures::task::ArcWake;
use futures::Stream;
use log::{debug, info, trace, warn};
use std::collections::{BTreeMap, VecDeque};
use std::pin::Pin;
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};
mod thread_pool;
#[derive(Clone)]
pub(crate) struct MaximExecutor {
shutdown_triggered: Arc<(Mutex<bool>, Condvar)>,
thread_pool: Arc<MaximThreadPool>,
sleeping: Arc<DashMap<Aid, Task>>,
reactors: Arc<DashMap<u16, MaximReactor>>,
actors_per_reactor: Arc<DashMap<u16, u32>>,
}
impl MaximExecutor {
pub(crate) fn new(shutdown_triggered: Arc<(Mutex<bool>, Condvar)>) -> Self {
Self {
shutdown_triggered,
thread_pool: Default::default(),
sleeping: Default::default(),
reactors: Default::default(),
actors_per_reactor: Default::default(),
}
}
pub(crate) fn init(&self, system: &ActorSystem) {
for i in 0..system.data.config.thread_pool_size {
let reactor = MaximReactor::new(self.clone(), system, i);
self.reactors.insert(i, reactor.clone());
self.actors_per_reactor.insert(i, 0);
let sys = system.clone();
info!("Spawning Reactors");
self.thread_pool
.spawn(format!("Reactor-{}", reactor.name), move || {
sys.init_current();
futures::executor::enter().expect("Executor nested in other executor");
loop {
if !reactor.thread() {
break;
}
}
});
}
}
pub(crate) fn register_actor(&self, actor: ActorStream) {
let id = actor.context.aid.clone();
let actor = Mutex::new(Box::pin(actor));
self.sleeping.insert(id.clone(), Task { id, actor });
}
pub(crate) fn wake(&self, id: Aid) {
trace!("Waking Actor `{}`", id.name_or_uuid());
let task = match self.sleeping.remove(&id) {
Some((_, task)) => task,
None => {
debug!(
"Actor `{}` not in Executor - already woken or stopped",
id.name_or_uuid()
);
return;
}
};
let destination = self.get_reactor_with_least_actors();
*self.actors_per_reactor.get_mut(&destination).unwrap() += 1;
self.reactors.get(&destination).unwrap().insert(task);
}
fn get_reactor_with_least_actors(&self) -> u16 {
let mut iter_state = (0u16, u32::max_value());
for i in self.actors_per_reactor.iter() {
if i.value() < &iter_state.1 {
iter_state = (*i.key(), *i.value());
}
}
iter_state.0
}
fn return_task(&self, task: Task, reactor: &MaximReactor) {
trace!(
"Actor {} returned from Reactor {}",
task.id.name_or_uuid(),
reactor.name
);
self.sleeping.insert(task.id.clone(), task);
*self.actors_per_reactor.get_mut(&reactor.id).unwrap() -= 1;
}
pub(crate) fn await_shutdown(&self, timeout: impl Into<Option<Duration>>) -> ShutdownResult {
let start = Instant::now();
info!("Notifying Reactor threads, so they can end gracefully");
for r in self.reactors.iter() {
match r.thread_condvar.read() {
Ok(g) => g.1.notify_one(),
Err(_) => return ShutdownResult::Panicked,
}
}
let timeout = timeout.into().map(|t| t - (Instant::now() - start));
info!("Awaiting the threadpool's shutdown");
self.thread_pool.await_shutdown(timeout)
}
}
#[derive(Debug, Eq, PartialEq)]
pub enum ShutdownResult {
Ok,
TimedOut,
Panicked,
}
#[derive(Clone)]
pub(crate) struct MaximReactor {
id: u16,
name: String,
executor: MaximExecutor,
run_queue: Arc<RwLock<VecDeque<Wakeup>>>,
wait_queue: Arc<RwLock<BTreeMap<Aid, Task>>>,
thread_condvar: Arc<RwLock<(Mutex<()>, Condvar)>>,
thread_wait_time: Duration,
time_slice: Duration,
warn_threshold: Duration,
}
enum LoopResult<T> {
Ok(T),
Continue,
}
impl MaximReactor {
fn new(executor: MaximExecutor, system: &ActorSystem, id: u16) -> MaximReactor {
let name = format!("{:08x?}-{}", system.data.uuid.as_fields().0, id);
debug!("Creating Reactor {}", name);
MaximReactor {
id,
name,
executor,
run_queue: Arc::new(RwLock::new(Default::default())),
wait_queue: Arc::new(RwLock::new(BTreeMap::new())),
thread_condvar: Arc::new(RwLock::new((Mutex::new(()), Condvar::new()))),
thread_wait_time: system.config().thread_wait_time,
time_slice: system.config().time_slice,
warn_threshold: system.config().warn_threshold,
}
}
fn insert(&self, task: Task) {
let token = Token {
id: task.id.clone(),
reactor: self.clone(),
};
let waker = futures::task::waker(Arc::new(token));
let wakeup = Wakeup {
id: task.id.clone(),
waker,
};
self.wait(task);
self.wake(wakeup);
}
pub(crate) fn thread(&self) -> bool {
{
if *self
.executor
.shutdown_triggered
.0
.lock()
.expect("Poisoned shutdown_triggered condvar")
{
debug!("Reactor-{} acknowledging shutdown", self.name);
return false;
}
}
let (w, mut task) = match self.get_work() {
LoopResult::Ok(v) => v,
LoopResult::Continue => return true,
};
let aid = w.id.clone();
let end = Instant::now() + self.time_slice;
loop {
let start = Instant::now();
match task.poll(&w.waker) {
Poll::Ready(result) => {
if result.is_none() {
self.executor.return_task(task, self);
break;
}
let is_stopping = {
task.actor
.lock()
.expect("Poisoned Actor")
.handle_result(result.unwrap())
};
if is_stopping {
break;
}
if Instant::now() >= end {
self.wait(task);
self.wake(w);
break;
}
}
Poll::Pending => {
trace!("Reactor-{} waiting on pending Actor", self.name);
self.wait(task);
break;
}
}
if Instant::now().duration_since(start) >= self.warn_threshold {
warn!(
"Actor {} took longer than configured warning threshold",
aid.name_or_uuid()
);
}
}
true
}
#[inline]
fn get_work(&self) -> LoopResult<(Wakeup, Task)> {
if let Some(w) = self.get_woken() {
if let Some(task) = self.remove_waiting(&w.id) {
trace!(
"Reactor-{} received Wakeup for Actor `{}`",
self.name,
task.id.name_or_uuid()
);
LoopResult::Ok((w, task))
} else {
trace!("Reactor-{} dropping spurious WakeUp", self.name);
LoopResult::Continue
}
} else {
let (mutex, condvar) = &*self
.thread_condvar
.read()
.expect("Poisoned Reactor condvar");
trace!("Reactor-{} waiting on condvar", self.name);
let g = mutex.lock().expect("Poisoned Reactor condvar");
let _ = condvar
.wait_timeout(g, self.thread_wait_time)
.expect("Poisoned Reactor condvar");
trace!("Reactor-{} resuming", self.name);
LoopResult::Continue
}
}
fn wake(&self, wakeup: Wakeup) {
self.run_queue
.write()
.expect("Poisoned run_queue")
.push_back(wakeup);
self.thread_condvar
.read()
.expect("Poisoned Reactor condvar")
.1
.notify_one();
}
fn get_woken(&self) -> Option<Wakeup> {
self.run_queue
.write()
.expect("Poisoned run_queue")
.pop_front()
}
fn wait(&self, task: Task) {
self.wait_queue
.write()
.expect("Poisoned wait_queue")
.insert(task.id.clone(), task);
}
fn remove_waiting(&self, id: &Aid) -> Option<Task> {
self.wait_queue
.write()
.expect("Poisoned wait_queue")
.remove(id)
}
}
struct Task {
id: Aid,
actor: Mutex<Pin<Box<ActorStream>>>,
}
impl Task {
fn poll(&mut self, waker: &Waker) -> Poll<Option<Result<Status, StdError>>> {
let mut ctx = Context::from_waker(waker);
self.actor
.lock()
.expect("Poisoned ActorStream")
.as_mut()
.poll_next(&mut ctx)
}
}
struct Token {
id: Aid,
reactor: MaximReactor,
}
impl ArcWake for Token {
fn wake_by_ref(arc_self: &Arc<Self>) {
let id = arc_self.id.clone();
let wakeup = Wakeup {
id,
waker: futures::task::waker(arc_self.clone()),
};
(arc_self.reactor).wake(wakeup);
}
}
struct Wakeup {
id: Aid,
waker: Waker,
}
#[cfg(test)]
mod tests {
use crate::executor::ShutdownResult;
use crate::prelude::*;
use crate::tests::*;
use log::*;
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
use std::thread;
use std::time::Duration;
struct PendingNTimes {
pending_count: u8,
sleep_for: u64,
}
impl PendingNTimes {
fn new(n: u8, sleep_for: u64) -> Self {
Self {
pending_count: n,
sleep_for,
}
}
}
impl Future for PendingNTimes {
type Output = ActorResult<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
match &mut self.pending_count {
0 => Poll::Ready(Ok(Status::done(()))),
count => {
*count -= 1;
debug!("Pending, {} times left", count);
let waker = cx.waker().clone();
let sleep_for = self.sleep_for;
thread::spawn(move || {
sleep(sleep_for);
waker.wake();
});
Poll::Pending
}
}
}
}
#[test]
fn test_nested_futures_wakeup() {
init_test_log();
let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
let _aid = system
.spawn()
.with((), |_: (), c: Context, _: Message| async move {
let r = PendingNTimes::new(1, 50).await;
c.system.trigger_shutdown();
r
})
.unwrap();
assert_ne!(
system.await_shutdown(Duration::from_millis(100)),
ShutdownResult::TimedOut,
"Failed to trigger shutdown, actor was never woken"
);
}
#[test]
fn test_thread_wakes_after_no_work() {
init_test_log();
let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(1));
let aid = system.spawn().with((), simple_handler).unwrap();
sleep(125);
let _ = aid.send_new(11);
await_received(&aid, 2, 1000).unwrap();
system.trigger_and_await_shutdown(None);
}
#[test]
fn test_actor_awake_phases() {
init_test_log();
let system = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(1));
let aid = system
.spawn()
.with((), |_: (), _: Context, msg: Message| async move {
if let Some(_) = msg.content_as::<SystemMsg>() {
return Ok(Status::done(()));
}
PendingNTimes::new(1, 25).await
})
.unwrap();
await_received(&aid, 1, 5).expect("Actor took too long to process Start");
let _ = aid.send_new(()).unwrap();
sleep(5);
{
let pending = system
.executor()
.reactors
.iter()
.nth(0)
.unwrap()
.wait_queue
.read()
.unwrap()
.len();
assert_eq!(pending, 1, "Actor should be pending");
}
await_received(&aid, 2, 30).expect("Actor failed to process message");
sleep(20);
{
let pending = system
.executor()
.reactors
.iter()
.nth(0)
.unwrap()
.wait_queue
.read()
.unwrap()
.len();
assert_eq!(
pending, 0,
"Actor should be returned to the Executor by now"
);
}
{
let running = system
.executor()
.reactors
.iter()
.nth(0)
.unwrap()
.run_queue
.read()
.unwrap()
.len();
assert_eq!(running, 0, "Actor should not be running again");
}
assert_eq!(
system.executor().sleeping.len(),
2,
"Actor was not returned to Executor"
);
}
}