use axiom::prelude::*;
use log::LevelFilter;
use log::{error, info};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::env;
use std::time::{Duration, Instant};
/// Messages handled by a fork actor (see `Fork::handle`).
///
/// Every variant carries the `Aid` of the philosopher the message is about; the
/// philosophers in this file always pass their own `Aid`.
#[derive(Debug, Serialize, Deserialize)]
pub enum ForkCommand {
/// The given philosopher asks to be handed this fork.
RequestFork(Aid),
/// The given philosopher reports that it has started using this fork.
UsingFork(Aid),
/// The given philosopher reports that it has put this fork down.
ForkPutDown(Aid),
}
/// State of a single fork actor.
#[derive(Eq, PartialEq)]
struct Fork {
// Set to `true` when the owner puts the fork down and to `false` when the owner
// reports using it; a new fork starts out with `false`.
clean: bool,
// `Aid` of the philosopher currently holding the fork, or `None` when nobody holds it.
owned_by: Option<Aid>,
}
impl Fork {
    /// Builds a fork that is not clean and has no owner.
    fn new() -> Fork {
        Fork {
            owned_by: None,
            clean: false,
        }
    }

    /// Handles a `ForkCommand::RequestFork` coming from `requester`.
    ///
    /// * Unowned fork: the requester becomes the owner and is sent
    ///   `Command::ReceiveFork`; the message is reported as done.
    /// * Owned and clean: the message is skipped without contacting the owner.
    /// * Owned and not clean: the owner is sent `Command::GiveUpFork` and the
    ///   message is skipped.
    ///
    /// Any send failure is propagated to the caller.
    fn fork_requested(mut self, context: Context, requester: Aid) -> ActorResult<Self> {
        if let Some(current_owner) = &self.owned_by {
            // NOTE(review): `Status::skip` presumably keeps the request pending until a
            // later `Status::reset` — confirm against the Axiom documentation.
            if !self.clean {
                current_owner.send_new(Command::GiveUpFork(context.aid.clone()))?;
            }
            return Ok(Status::skip(self));
        }

        self.owned_by = Some(requester.clone());
        requester.send_new(Command::ReceiveFork(context.aid.clone()))?;
        Ok(Status::done(self))
    }

    /// Handles a `ForkCommand::ForkPutDown` coming from `sender`.
    ///
    /// When `sender` is the current owner the fork becomes unowned and clean and
    /// `Status::reset` is returned. Otherwise the event is logged as an error and the
    /// fork is left untouched.
    fn fork_put_down(mut self, context: Context, sender: Aid) -> ActorResult<Self> {
        let sender_is_owner = self.owned_by.as_ref() == Some(&sender);
        if sender_is_owner {
            self.owned_by = None;
            self.clean = true;
            return Ok(Status::reset(self));
        }

        match &self.owned_by {
            Some(owner) => {
                error!(
                    "[{}] fork_put_down() from non-owner: {} real owner is: {}",
                    context.aid, sender, owner
                );
            }
            None => {
                error!(
                    "[{}] fork_put_down() from non-owner: {} real owner is: None:",
                    context.aid, sender
                );
            }
        }
        Ok(Status::done(self))
    }

    /// Handles a `ForkCommand::UsingFork` coming from `sender`.
    ///
    /// The current owner marks the fork as no longer clean and `Status::reset` is
    /// returned; a message from anybody else is only logged as an error.
    fn using_fork(mut self, context: Context, sender: Aid) -> ActorResult<Self> {
        if self.owned_by.as_ref() == Some(&sender) {
            self.clean = false;
            Ok(Status::reset(self))
        } else {
            error!("[{}] Got UsingFork from non-owner: {}", context.aid, sender);
            Ok(Status::done(self))
        }
    }

    /// Message processor of the fork actor.
    ///
    /// Dispatches `ForkCommand` messages to the matching method above; any other
    /// message content is ignored and reported as done.
    pub async fn handle(self, context: Context, message: Message) -> ActorResult<Self> {
        let command = match message.content_as::<ForkCommand>() {
            Some(command) => command,
            None => return Ok(Status::done(self)),
        };

        match &*command {
            ForkCommand::ForkPutDown(owner) => self.fork_put_down(context, owner.clone()),
            ForkCommand::UsingFork(owner) => self.using_fork(context, owner.clone()),
            ForkCommand::RequestFork(requester) => {
                self.fork_requested(context, requester.clone())
            }
        }
    }
}
/// The three states a philosopher moves between.
#[derive(Debug)]
enum PhilosopherState {
/// Initial state, and the state entered after eating or after giving up a fork.
Thinking,
/// Entered from `Thinking` when the philosopher does not yet hold both forks.
Hungry,
/// Entered once both forks are held.
Eating,
}
/// Messages handled by a philosopher actor (see `Philosopher::handle`).
#[derive(Debug, Serialize, Deserialize)]
pub enum Command {
/// Timer message asking the philosopher to stop eating. The `u16` is the
/// `state_change_count` at the time the timer was armed; the message is ignored
/// when the count has moved on since.
StopEating(u16),
/// Timer message asking the philosopher to become hungry; the `u16` is checked
/// against `state_change_count` in the same way as for `StopEating`.
BecomeHungry(u16),
/// Sent by a fork (carrying the fork's `Aid`) to ask its owner to release it.
GiveUpFork(Aid),
/// Sent by a fork (carrying the fork's `Aid`) to tell the philosopher it now owns it.
ReceiveFork(Aid),
/// Asks the philosopher to send a `MetricsReply` to the given `Aid`.
SendMetrics(Aid),
}
/// Reply a philosopher sends in response to `Command::SendMetrics`.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct MetricsReply {
// `Aid` of the philosopher that produced the metrics.
aid: Aid,
// Copy of that philosopher's metrics at the time of the request.
metrics: Metrics,
}
/// Counters and timers a philosopher keeps about its own behaviour.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
struct Metrics {
// Number of state transitions so far; also embedded in timer messages so that
// stale timers can be recognised and ignored.
state_change_count: u16,
// Number of times the philosopher was told to give up a fork while hungry.
failed_to_eat: u16,
// Total time recorded as spent thinking.
time_thinking: Duration,
// Total time recorded as spent hungry.
time_hungry: Duration,
// Total time recorded as spent eating.
time_eating: Duration,
}
/// State of a single philosopher actor.
struct Philosopher {
// Delay used for the self-addressed `StopEating` and `BecomeHungry` timer messages.
time_slice: Duration,
// Current state of the philosopher.
state: PhilosopherState,
// `Aid` of the fork actor on the left.
left_fork_aid: Aid,
// Whether the left fork is currently held.
has_left_fork: bool,
// Whether a request for the left fork has been sent and not yet answered.
left_fork_requested: bool,
// `Aid` of the fork actor on the right.
right_fork_aid: Aid,
// Whether the right fork is currently held.
has_right_fork: bool,
// Whether a request for the right fork has been sent and not yet answered.
right_fork_requested: bool,
// Moment of the most recent state transition; used to compute time per state.
last_state_change: Instant,
// Statistics reported on `Command::SendMetrics`.
metrics: Metrics,
}
impl Philosopher {
pub fn new(time_slice: Duration, left_fork_aid: Aid, right_fork_aid: Aid) -> Philosopher {
Philosopher {
time_slice,
state: PhilosopherState::Thinking,
left_fork_aid,
has_left_fork: false,
left_fork_requested: false,
right_fork_aid,
has_right_fork: false,
right_fork_requested: false,
last_state_change: Instant::now(),
metrics: Metrics {
state_change_count: 0,
failed_to_eat: 0,
time_thinking: Duration::from_micros(0),
time_hungry: Duration::from_micros(0),
time_eating: Duration::from_micros(0),
},
}
}
fn begin_eating(&mut self, context: Context) -> Result<(), StdError> {
self.metrics.time_hungry += Instant::elapsed(&self.last_state_change);
self.last_state_change = Instant::now();
self.state = PhilosopherState::Eating;
self.metrics.state_change_count += 1;
self.left_fork_aid
.send_new(ForkCommand::UsingFork(context.aid.clone()))?;
self.right_fork_aid
.send_new(ForkCommand::UsingFork(context.aid.clone()))?;
let msg = Message::new(Command::StopEating(self.metrics.state_change_count));
context.aid.send_after(msg, self.time_slice)?;
Ok(())
}
fn fork_received(mut self, context: Context, fork_aid: Aid) -> ActorResult<Self> {
if self.left_fork_aid == fork_aid {
self.has_left_fork = true;
self.left_fork_requested = false;
} else if self.right_fork_aid == fork_aid {
self.has_right_fork = true;
self.right_fork_requested = false;
} else {
panic!("[{}] Unknown Fork Received: {}", context.aid, fork_aid);
}
if self.has_left_fork && self.has_right_fork {
self.begin_eating(context)?;
}
Ok(Status::done(self))
}
fn request_missing_forks(&mut self, context: Context) -> Result<(), StdError> {
if !self.has_left_fork && !self.left_fork_requested {
self.left_fork_requested = true;
self.left_fork_aid
.send_new(ForkCommand::RequestFork(context.aid.clone()))?;
}
if !self.has_right_fork && !self.right_fork_requested {
self.right_fork_requested = true;
self.right_fork_aid
.send_new(ForkCommand::RequestFork(context.aid.clone()))?;
}
Ok(())
}
fn become_hungry(mut self, context: Context, state_num: u16) -> ActorResult<Self> {
if self.metrics.state_change_count == state_num {
if self.has_left_fork && self.has_right_fork {
self.begin_eating(context)?;
} else {
match &self.state {
PhilosopherState::Thinking => {
self.metrics.time_thinking += Instant::elapsed(&self.last_state_change);
self.last_state_change = Instant::now();
self.state = PhilosopherState::Hungry;
self.metrics.state_change_count += 1;
self.request_missing_forks(context)?;
}
PhilosopherState::Hungry => {
error!("[{}] Got BecomeHungry while eating!", context.aid);
}
PhilosopherState::Eating => {
error!("[{}] Got BecomeHungry while eating!", context.aid);
}
};
}
}
Ok(Status::done(self))
}
fn begin_thinking(&mut self, context: Context) -> Result<(), StdError> {
self.state = PhilosopherState::Thinking;
self.metrics.state_change_count += 1;
self.metrics.time_eating += Instant::elapsed(&self.last_state_change);
self.last_state_change = Instant::now();
let msg = Message::new(Command::BecomeHungry(self.metrics.state_change_count));
context.aid.send_after(msg, self.time_slice)?;
Ok(())
}
fn stop_eating(mut self, context: Context, state_num: u16) -> ActorResult<Self> {
if self.metrics.state_change_count == state_num {
if let PhilosopherState::Eating = &self.state {
self.begin_thinking(context)?;
}
}
Ok(Status::done(self))
}
fn give_up_fork(mut self, context: Context, fork_aid: Aid) -> ActorResult<Self> {
if self.left_fork_aid == fork_aid {
if self.has_left_fork {
self.has_left_fork = false;
fork_aid.send_new(ForkCommand::ForkPutDown(context.aid.clone()))?;
}
} else if self.right_fork_aid == fork_aid {
if self.has_right_fork {
self.has_right_fork = false;
fork_aid.send_new(ForkCommand::ForkPutDown(context.aid.clone()))?;
}
} else {
error!(
"[{}] Unknown fork asked for: {}:\n left ==> {}\n right ==> {}",
context.aid, fork_aid, self.left_fork_aid, self.right_fork_aid
);
}
match &self.state {
PhilosopherState::Hungry => {
self.metrics.failed_to_eat += 1;
self.begin_thinking(context)?;
}
PhilosopherState::Eating => {
self.begin_thinking(context)?;
}
_ => (),
}
Ok(Status::done(self))
}
fn send_metrics(self, context: Context, reply_to: Aid) -> ActorResult<Self> {
reply_to.send_new(MetricsReply {
aid: context.aid.clone(),
metrics: self.metrics,
})?;
Ok(Status::done(self))
}
pub async fn handle(self, context: Context, message: Message) -> ActorResult<Self> {
if let Some(msg) = message.content_as::<Command>() {
match &*msg {
Command::StopEating(state_num) => self.stop_eating(context, *state_num),
Command::BecomeHungry(state_num) => self.become_hungry(context, *state_num),
Command::ReceiveFork(fork_aid) => self.fork_received(context, fork_aid.clone()),
Command::GiveUpFork(fork_aid) => self.give_up_fork(context, fork_aid.clone()),
Command::SendMetrics(reply_to) => self.send_metrics(context, reply_to.clone()),
}
} else if let Some(msg) = message.content_as::<SystemMsg>() {
match &*msg {
SystemMsg::Start => {
context.aid.send_new(Command::BecomeHungry(0))?;
Ok(Status::done(self))
}
_ => Ok(Status::done(self)),
}
} else {
Ok(Status::done(self))
}
}
}
/// Message the manager actor schedules to itself at start-up; when it arrives the
/// manager asks every philosopher for its metrics.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct EndSimulation {}
/// Runs the dining-philosophers simulation.
///
/// Spawns one fork actor and one philosopher actor per entry in the name table.
/// Philosopher `i` gets fork `i` as its left fork and fork `i - 1` (wrapping around to
/// the last fork for `i == 0`) as its right fork. A "Manager" actor ends the
/// simulation after `run_time`: it asks every philosopher for its metrics, logs them
/// once all replies are in, and triggers the shutdown of the actor system.
///
/// Passing `-v` on the command line switches logging from `Info` to `Debug`.
///
/// # Panics
///
/// Panics when the logger cannot be initialised or when an actor cannot be spawned.
pub fn main() {
    let verbose = env::args().any(|arg| arg == "-v");
    let level = if verbose {
        LevelFilter::Debug
    } else {
        LevelFilter::Info
    };
    env_logger::builder()
        .filter_level(level)
        .is_test(true)
        .try_init()
        .expect("failed to initialize logger");

    // The number of seats is derived from the name table so that the two can never
    // get out of sync.
    let names = [
        "Confucius",
        "Laozi",
        "Descartes",
        "Ben Franklin",
        "Thomas Jefferson",
    ];
    let count = names.len();
    let time_slice = Duration::from_millis(10);
    let run_time = Duration::from_millis(5000);
    let mut forks: Vec<Aid> = Vec::with_capacity(count);
    let mut results: HashMap<Aid, Option<Metrics>> = HashMap::with_capacity(count);
    let config = ActorSystemConfig::default().thread_pool_size(4);
    let system = ActorSystem::create(config);

    for i in 0..count {
        let name = format!("Fork-{}", i);
        let fork = system
            .spawn()
            .name(&name)
            .with(Fork::new(), Fork::handle)
            .unwrap();
        forks.push(fork);
    }

    for (left, name) in names.iter().enumerate() {
        let right = if left == 0 { count - 1 } else { left - 1 };
        let state = Philosopher::new(time_slice, forks[left].clone(), forks[right].clone());
        let philosopher = system
            .spawn()
            .name(*name)
            .with(state, Philosopher::handle)
            .unwrap();
        // `None` marks a philosopher whose metrics have not been reported yet.
        results.insert(philosopher, None);
    }

    let _shutdown = system
        .spawn()
        .name("Manager")
        .with(
            results,
            move |mut state: HashMap<Aid, Option<Metrics>>, context: Context, message: Message| {
                async move {
                    if let Some(msg) = message.content_as::<MetricsReply>() {
                        state.insert(msg.aid.clone(), Some(msg.metrics));
                        // Shut down only once every philosopher has reported.
                        if state.values().all(|metrics| metrics.is_some()) {
                            info!("Final Metrics:");
                            for (aid, metrics) in state.iter() {
                                info!("{}: {:?}", aid, metrics);
                            }
                            context.system.trigger_shutdown();
                        }
                    } else if message.content_as::<EndSimulation>().is_some() {
                        let request = Message::new(Command::SendMetrics(context.aid.clone()));
                        for aid in state.keys() {
                            aid.send(request.clone())?;
                        }
                    } else if let Some(msg) = message.content_as::<SystemMsg>() {
                        if let SystemMsg::Start = &*msg {
                            // Arm the timer that ends the simulation.
                            let msg = Message::new(EndSimulation {});
                            context.aid.send_after(msg, run_time)?;
                        }
                    }
                    Ok(Status::done(state))
                }
            },
        )
        .expect("failed to create shutdown actor");

    system.await_shutdown(None);
}