use crate::actix::*;
use crate::{
mail, Action, Config, Motor, MotorId, MotorMessage, PinError, Program, Protocol, Pump,
PumpMessage, Step, ValidateProtocolError,
};
use lazy_static::lazy_static;
use uom::si::f64::*;
use uom::si::time::second;
use uom::si::volume::milliliter;
use uom::si::volume_rate::milliliter_per_second;
use uuid::Uuid;
use std::{fmt, ops::Index, time::Duration};
lazy_static! {
    // Nominal volume from which the pump run time is derived.
    static ref VOLUME: Volume = Volume::new::<milliliter>(500.0);
    // Nominal flow rate from which the pump run time is derived.
    static ref RATE: VolumeRate = VolumeRate::new::<milliliter_per_second>(5.0);
    // Time needed to move `VOLUME` at `RATE`.
    static ref TIME: Time = *VOLUME / *RATE;
    // `TIME` expressed as a `std::time::Duration`; the sub-second part is
    // truncated (not rounded) to whole nanoseconds.
    static ref DURATION: Duration = {
        let total = TIME.get::<second>();
        let whole = total.floor();
        let fraction = total - whole;
        Duration::new(whole as u64, (fraction * 1.0_E9).floor() as u32)
    };
}
/// Result type used in this module; the error type is this module's `Error`.
type Result<T> = std::result::Result<T, Error>;
/// Actor context type in which the `Coordinator` runs.
type CoordContext = Context<Coordinator>;
/// Errors returned by the coordinator.
#[derive(Debug)]
pub enum Error {
/// Wraps a `ValidateProtocolError`.
ProtocolConversion(ValidateProtocolError),
/// NOTE(review): presumably "the coordinator is busy and cannot accept the request";
/// meaning inferred from the name only — confirm.
Busy,
/// Wraps a `PinError`.
Pin(PinError),
}
impl From<ValidateProtocolError> for Error {
fn from(err: ValidateProtocolError) -> Self {
Error::ProtocolConversion(err)
}
}
impl From<PinError> for Error {
fn from(err: PinError) -> Self {
Error::Pin(err)
}
}
/// Renders the error as `Coordinator error: ` followed by its `Debug` representation.
impl fmt::Display for Error {
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        formatter.write_fmt(format_args!("Coordinator error: {:?}", self))
    }
}
// Empty impl: only the trait's default methods are used.
impl std::error::Error for Error {}
/// Requests handled by the `Coordinator` actor.
#[derive(Debug)]
pub enum Message {
/// Resume a coordinator that is in `State::Waiting` (handled by `Coordinator::resume`).
Continue,
/// Abort the run (handled by `Coordinator::hcf`).
Halt,
/// Stop request without a target (handled as `Coordinator::stop(None)`).
Stop,
/// Stop request carrying a target `MotorId` (handled as `Coordinator::stop(id)`).
ExchangeStop(MotorId),
/// Start the given protocol; the optional UUID labels the run, otherwise one is generated.
Start(Protocol, Option<Uuid>),
/// Register a listener that will receive `Status` updates.
Subscribe(Box<dyn Update>),
}
/// Handling a `Message` yields `Ok(())` or a coordinator `Error`.
impl ActixMessage for Message {
type Result = Result<()>;
}
/// Coarse run state of the coordinator.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "use_serde", derive(Deserialize, Serialize))]
#[cfg_attr(feature = "use_serde", serde(rename_all = "lowercase"))]
pub enum State {
/// Paused; entered when a `Hail` action is reached and left via `Message::Continue`.
Waiting,
/// No program is being advanced. This is the default state (with `early: false`).
Stopped {
/// `true` when the stop was caused by a halt (`Coordinator::hcf`).
early: bool,
},
/// A program is being executed.
Running,
}
impl Default for State {
fn default() -> Self {
State::Stopped { early: false }
}
}
/// Addresses of the actors the coordinator talks to once it has started.
#[derive(Debug)]
struct Addresses {
/// Motor actors. Index 0 is addressed by the `*_waste` helpers; `open`/`close`
/// address buffer valve `n` at index `n + 1`.
motors: Vec<Addr<Motor>>,
/// The pump actor.
pump: Addr<Pump>,
/// The actor that fans status updates out to registered listeners.
subscribers: Addr<Subscribers>,
}
/// Indexing an `Addresses` by motor id yields that motor's actor address.
///
/// Panics, like slice indexing, when the id is out of range.
impl Index<MotorId> for Addresses {
    type Output = Addr<Motor>;

    fn index(&self, motor: MotorId) -> &Self::Output {
        Index::index(&self.motors, motor)
    }
}
/// Devices built by `Coordinator::try_new`, held until the coordinator actor starts
/// and turns each of them into a running actor.
#[derive(Debug)]
struct Devices {
/// Valve motors, in configuration order.
motors: Vec<Motor>,
/// The pump.
pump: Pump,
}
/// Bookkeeping for the program the coordinator is executing.
#[derive(Debug, Default)]
pub(crate) struct CoordState {
/// The loaded program, if any; reset to `None` by `Coordinator::clear`.
pub(crate) program: Option<Program>,
/// Actions not yet started; `Coordinator::advance` takes them from the front.
pub(crate) remaining: Vec<Action>,
/// The action most recently taken from `remaining`, if any.
pub(crate) current: Option<Action>,
/// Compared against the target of a stop request in `Coordinator::stop`.
/// NOTE(review): presumably the buffer currently in the chamber; within this file
/// it is only ever reset to `None` — confirm where it is set.
pub(crate) buffer: Option<MotorId>,
/// Coarse run state.
pub(crate) status: State,
/// Actions already taken from `remaining`.
pub(crate) completed: Vec<Action>,
/// Label of the current run, assigned in `Coordinator::start`.
pub(crate) uuid: Option<Uuid>,
}
/// Actor that executes a program by driving the pump and valve motors.
#[derive(Debug)]
pub struct Coordinator {
/// Devices awaiting actor start-up; taken (left as `None`) in `Actor::started`.
devices: Option<Devices>,
/// Addresses of the started device actors; set in `started`, cleared in `stopped`.
addresses: Option<Addresses>,
/// Program bookkeeping and run state.
pub(crate) state: CoordState,
/// Recipients handed to `mail::notify` / `mail::mail`.
admins: Vec<String>,
}
impl Coordinator {
/// Builds a coordinator from `config`.
///
/// The pump is created first, then one motor per entry of `config.motors`
/// (in order). No actor is started here; the devices are kept until the
/// coordinator itself is started.
///
/// # Errors
///
/// Returns the first error reported by `Pump::try_new` or `Motor::try_new`,
/// converted into this module's `Error`.
pub fn try_new(config: Config) -> Result<Self> {
    let mut pump = Pump::try_new(config.pump.pins)?;
    pump.invert = config.pump.invert;

    let mut motors = Vec::new();
    for spec in config.motors {
        // The two-element `range` is turned into an inclusive range.
        let sweep = spec.range[0]..=spec.range[1];
        motors.push(Motor::try_new(spec.period, sweep, spec.pin)?);
    }

    Ok(Self {
        devices: Some(Devices { motors, pump }),
        addresses: None,
        state: CoordState::default(),
        admins: config.admins,
    })
}
/// Returns the program currently loaded, if any.
pub fn program(&self) -> Option<&Program> {
self.state.program.as_ref()
}
/// Returns a copy of the current run state.
pub fn status(&self) -> State {
self.state.status
}
/// Shuts the first motor (the one the `*_waste` helpers address) and closes
/// every other motor, then tells all motors to stop five seconds later.
///
/// Does nothing to the motors when the device actors are not running. An
/// empty motor list is tolerated: `split_first` is used instead of indexing
/// so that a configuration without motors cannot panic here.
fn close_all(&self, context: &mut CoordContext) {
    if let Some(addresses) = self.addresses.as_ref() {
        if let Some((waste, buffers)) = addresses.motors.split_first() {
            waste.do_send(MotorMessage::Shut);
            for valve in buffers {
                valve.do_send(MotorMessage::Close);
            }
        }
    }
    // The stop is scheduled unconditionally; the addresses are looked up
    // again when the timer fires because they may have changed by then.
    context.run_later(Duration::new(5, 0), move |coord, _| {
        if let Some(addresses) = coord.addresses.as_ref() {
            for motor in &addresses.motors {
                motor.do_send(MotorMessage::Stop);
            }
        }
    });
}
/// Sends `Close` to the motor at raw `index` and schedules a `Stop` for the
/// same motor five seconds later. No-op when the device actors are not running.
///
/// Panics if `index` is out of range for the motor list.
fn _close(&self, index: usize, context: &mut CoordContext) {
    let addresses = match self.addresses {
        Some(ref addresses) => addresses,
        None => return,
    };
    addresses[index].do_send(MotorMessage::Close);

    let settle = Duration::new(5, 0);
    context.run_later(settle, move |coord, _| {
        // Look the addresses up again: they may be gone when the timer fires.
        if let Some(addresses) = coord.addresses.as_ref() {
            addresses[index].do_send(MotorMessage::Stop);
        }
    });
}
/// Closes buffer valve `valve`. Buffer valves are offset by one in the motor
/// list because index 0 is the motor used by the `*_waste` helpers.
fn close(&self, valve: usize, context: &mut CoordContext) {
let index = valve + 1;
self._close(index, context);
}
/// Sends `Open` to the motor at raw `index` and schedules a `Stop` for the
/// same motor five seconds later. No-op when the device actors are not running.
///
/// Panics if `index` is out of range for the motor list.
fn _open(&self, index: usize, context: &mut CoordContext) {
    let addresses = match self.addresses {
        Some(ref addresses) => addresses,
        None => return,
    };
    addresses[index].do_send(MotorMessage::Open);

    let settle = Duration::new(5, 0);
    context.run_later(settle, move |coord, _| {
        // Look the addresses up again: they may be gone when the timer fires.
        if let Some(addresses) = coord.addresses.as_ref() {
            addresses[index].do_send(MotorMessage::Stop);
        }
    });
}
/// Opens buffer valve `valve`. Buffer valves are offset by one in the motor
/// list because index 0 is the motor used by the `*_waste` helpers.
fn open(&self, valve: usize, context: &mut CoordContext) {
let index = valve + 1;
self._open(index, context);
}
/// Sends `Shut` to motor 0 (the waste motor) and schedules a `Stop` for it
/// five seconds later. No-op when the device actors are not running.
fn shut_waste(&self, context: &mut CoordContext) {
    let addresses = match self.addresses {
        Some(ref addresses) => addresses,
        None => return,
    };
    addresses[0].do_send(MotorMessage::Shut);

    let settle = Duration::new(5, 0);
    context.run_later(settle, move |coord, _| {
        // Look the addresses up again: they may be gone when the timer fires.
        if let Some(addresses) = coord.addresses.as_ref() {
            addresses[0].do_send(MotorMessage::Stop);
        }
    });
}
/// Opens the waste motor (raw motor index 0).
fn open_waste(&self, context: &mut CoordContext) {
self._open(0, context);
}
/// Closes the waste motor (raw motor index 0).
fn close_waste(&self, context: &mut CoordContext) {
self._close(0, context);
}
fn perfuse(&self) {
if let Some(ref addresses) = self.addresses {
addresses.pump.do_send(PumpMessage::Perfuse);
}
}
fn drain(&self) {
if let Some(ref addresses) = self.addresses {
addresses.pump.do_send(PumpMessage::Drain);
}
}
fn stop_pump(&self) {
if let Some(ref addresses) = self.addresses {
addresses.pump.do_send(PumpMessage::Stop);
}
}
/// Advances the program and, if that fails, aborts the run.
///
/// On an advance error `hcf` is called once and then retried up to five more
/// times, sleeping 200 ms before each retry, until it succeeds. A final
/// failure is only logged.
fn try_advance(&mut self, context: &mut CoordContext) {
    if let Err(err) = self.advance(context) {
        log::error!("Aborting due to program advance error: {:?}", err);

        let mut outcome = self.hcf();
        for _ in 0..5 {
            if outcome.is_ok() {
                break;
            }
            // NOTE(review): this blocks the calling thread between retries.
            std::thread::sleep(Duration::from_millis(200));
            outcome = self.hcf();
        }

        if outcome.is_err() {
            log::error!("Could not fully stop program; please take caution!");
        }
    }
}
/// Starts the next queued action, if any, and returns the action that is
/// current afterwards.
///
/// Behaviour per action:
/// * `Perfuse(buffer)` — shut waste, open the buffer valve and run the pump
///   for `DURATION`; then close the buffer valve and open waste, and ten
///   seconds later stop the pump, close waste and advance again.
/// * `Sleep(duration)` — advance again after `duration`.
/// * `Hail` — switch to `State::Waiting` and publish `StatusMessage::Paused`.
/// * `Drain` — close waste and drain for `DURATION` plus five seconds; then
///   stop the pump, shut waste and advance again.
/// * `Finish` — stop the pump, close all valves and notify the admins.
/// * `Notify(msg)` — mail the admins and advance again immediately.
///
/// When nothing remains, the state becomes `Stopped { early: false }` and
/// the current action is cleared.
///
/// A halted coordinator (`Stopped { early: true }`) is left untouched.
fn advance(&mut self, context: &mut CoordContext) -> Result<Option<Action>> {
    // A halt must stick. Timers scheduled by an earlier step still call
    // `try_advance` when they fire and would otherwise flip the state back
    // to `Running` and resume the halted program.
    if let State::Stopped { early: true } = self.state.status {
        return Ok(self.state.current.clone());
    }

    if self.state.remaining.is_empty() {
        self.state.status = State::Stopped { early: false };
        self.state.current = None;
        return Ok(None);
    }

    self.state.status = State::Running;
    let action = self.state.remaining.remove(0);

    // Record the action *before* dispatching it. `Notify` advances
    // recursively; doing the bookkeeping afterwards would list the following
    // action ahead of the notification in `completed` and overwrite
    // `current` with the already-finished notification.
    self.state.completed.push(action.clone());
    self.state.current = Some(action.clone());

    match action {
        Action::Perfuse(buffer) => {
            self.shut_waste(context);
            self.open(buffer, context);
            self.perfuse();
            context.run_later(*DURATION, move |coord, context| {
                coord.close(buffer, context);
                coord.open_waste(context);
                context.run_later(Duration::new(10, 0), move |coord, context| {
                    coord.stop_pump();
                    coord.close_waste(context);
                    coord.try_advance(context);
                });
            });
        }
        Action::Sleep(duration) => {
            context.run_later(duration, Self::try_advance);
        }
        Action::Hail => {
            self.state.status = State::Waiting;
            self.publish(StatusMessage::Paused, context);
        }
        Action::Drain => {
            self.close_waste(context);
            self.drain();
            context.run_later(*DURATION + Duration::new(5, 0), |coord, context| {
                coord.stop_pump();
                coord.shut_waste(context);
                coord.try_advance(context);
            });
        }
        Action::Finish => {
            self.stop_pump();
            self.close_all(context);
            // Mail delivery is best effort; a failure must not abort the run.
            let _ = mail::notify(&self.admins, mail::Status::Finished);
            // Nothing schedules another advance after `Finish`, so when it is
            // the last action the run is over: mark it stopped, otherwise the
            // coordinator would report `Running` forever and never accept a
            // new program.
            if self.state.remaining.is_empty() {
                self.state.status = State::Stopped { early: false };
            }
        }
        Action::Notify(msg) => {
            log::trace!("Notifying user (subject: {}).", msg.subject);
            // Mail delivery is best effort; a failure must not abort the run.
            let _ = mail::mail(&self.admins, msg.subject, msg.message);
            self.try_advance(context);
        }
    }

    Ok(self.state.current.clone())
}
/// Queues a stop: cuts `remaining` at the first action for which
/// `Action::is_disjoint` holds (dropping that action and everything after
/// it) and forgets the loaded program.
///
/// `remaining` is left as is when no such action exists. Always returns `Ok(())`.
fn clear(&mut self) -> Result<()> {
    let cut = self.state.remaining.iter().position(Action::is_disjoint);
    if let Some(index) = cut {
        self.state.remaining.truncate(index);
    }
    self.state.program = None;
    Ok(())
}
/// Queues a stop of the running program.
///
/// * `None` — no particular buffer is requested, so the remaining actions
///   are simply cut short via `clear`. (Previously this case did nothing at
///   all, although the message handler still announced `StopQueued`.)
/// * `Some(target)` equal to the tracked buffer — same as above.
/// * `Some(target)` different from the tracked buffer — the program is
///   replaced by a single perfusion step with `target`.
/// * `Some(target)` while no buffer is tracked — nothing happens.
///   NOTE(review): confirm whether a perfusion should be queued here too.
///
/// # Errors
///
/// Propagates a failure to convert the replacement protocol into a program.
fn stop<I>(&mut self, buffer: I) -> Result<()>
where
    I: Into<Option<MotorId>>,
{
    let target = match buffer.into() {
        Some(target) => target,
        None => return self.clear(),
    };

    match self.state.buffer {
        Some(current) if current == target => self.clear(),
        Some(_) => {
            let program = Protocol::with_step(Step::Perfuse(target, None)).as_program()?;
            self.state.program = Some(program.clone());
            self.state.remaining = program.into();
            Ok(())
        }
        None => Ok(()),
    }
}
/// Continues a paused program.
///
/// Only acts while the coordinator is `Waiting`; in any other state the
/// request is logged and ignored.
///
/// # Errors
///
/// Propagates an error from `advance`.
fn resume(&mut self, context: &mut CoordContext) -> Result<()> {
    match self.status() {
        State::Waiting => {
            self.state.status = State::Running;
            self.advance(context).map(|_| ())
        }
        State::Running | State::Stopped { .. } => {
            log::warn!("Coordinator told to resume while not paused; ignoring.");
            Ok(())
        }
    }
}
/// Halts the run: stops the pump, marks the state `Stopped { early: true }`,
/// drops the most recent entry of `completed` and notifies the admins that
/// the run was aborted.
///
/// Valves are not touched and `remaining` is left as is. Always returns `Ok(())`.
fn hcf(&mut self) -> Result<()> {
    self.stop_pump();
    {
        let state = &mut self.state;
        state.status = State::Stopped { early: true };
        state.completed.pop();
    }
    // Mail delivery is best effort; its outcome is deliberately ignored.
    let _ = mail::notify(&self.admins, mail::Status::Aborted);
    Ok(())
}
/// Returns `true` when the coordinator is in either `Stopped` state
/// (regardless of `early`), `false` while running or waiting.
pub fn is_stopped(&self) -> bool {
    if let State::Stopped { .. } = self.state.status {
        true
    } else {
        false
    }
}
fn start(
&mut self,
protocol: &Protocol,
label: Option<Uuid>,
context: &mut CoordContext,
) -> Result<()> {
let program = protocol.as_program()?;
if self.is_stopped() {
self.stop_pump();
self.close_all(context);
context.run_later(Duration::new(10, 0), move |coord, context| {
let id = label.unwrap_or_else(Uuid::new_v4);
coord.state.program = Some(program.clone());
coord.state.remaining = program.into();
coord.state.current = None;
coord.state.buffer = None;
coord.state.status = State::Running;
coord.state.completed.clear();
coord.state.uuid = Some(id);
coord.advance(context).unwrap();
});
}
Ok(())
}
/// Registers `sub` with the subscriber actor so it receives future status
/// updates. The listener is dropped when the device actors are not running.
pub fn subscribe(&self, sub: Box<dyn Update>) {
    let registry = match self.addresses.as_ref() {
        Some(addresses) => &addresses.subscribers,
        None => return,
    };
    registry.do_send(SubscribersMessage::Add(sub));
}
/// Forwards `message`, together with this coordinator's address, to the
/// subscriber actor. No-op when the device actors are not running.
fn publish(&self, message: StatusMessage, context: &mut <Self as Actor>::Context) {
    let registry = match &self.addresses {
        Some(addresses) => &addresses.subscribers,
        None => return,
    };
    let status = Status {
        address: context.address(),
        message,
    };
    registry.do_send(SubscribersMessage::Forward(status));
}
}
impl Actor for Coordinator {
type Context = CoordContext;
fn started(&mut self, ctx: &mut Self::Context) {
let subscribers = Subscribers {
subs: vec![],
coord: ctx.address(),
}
.start();
if let Some(devices) = self.devices.take() {
let motors = devices
.motors
.into_iter()
.map(Actor::start)
.collect::<Vec<_>>();
let pump = devices.pump.start();
let addresses = Addresses {
pump,
motors,
subscribers,
};
self.addresses = Some(addresses);
}
}
fn stopped(&mut self, _ctx: &mut Self::Context) {
self.addresses = None;
}
}
/// Dispatches control messages. Every request except `Subscribe` is followed,
/// on success, by a status update to the subscribers; a failing request
/// returns its error and publishes nothing.
impl Handle<Message> for Coordinator {
    type Result = Result<()>;

    fn handle(&mut self, message: Message, context: &mut Self::Context) -> Self::Result {
        let update = match message {
            Message::Subscribe(sub) => {
                self.subscribe(sub);
                return Ok(());
            }
            Message::Continue => {
                self.resume(context)?;
                StatusMessage::Continued
            }
            Message::Stop => {
                self.stop(None)?;
                StatusMessage::StopQueued { early: false }
            }
            Message::Halt => {
                self.hcf()?;
                StatusMessage::Halted
            }
            Message::ExchangeStop(id) => {
                self.stop(id)?;
                StatusMessage::StopQueued { early: false }
            }
            Message::Start(proto, label) => {
                self.start(&proto, label, context)?;
                StatusMessage::Started(proto)
            }
        };
        self.publish(update, context);
        Ok(())
    }
}
/// Messages understood by the `Subscribers` actor.
#[derive(Debug)]
enum SubscribersMessage {
/// Register a new listener.
Add(Box<dyn Update>),
/// Deliver a status update to every registered listener.
Forward(Status),
}
/// Handling a `SubscribersMessage` produces no result value.
impl ActixMessage for SubscribersMessage {
type Result = ();
}
/// Actor that keeps the registered listeners and forwards status updates to them.
#[derive(Debug)]
pub struct Subscribers {
/// Address of the coordinator, used to send replies back (see `Respond`).
coord: Addr<Coordinator>,
/// Registered listeners, in registration order.
subs: Vec<Box<dyn Update>>,
}
// `Subscribers` runs as its own actor and overrides no lifecycle hooks.
impl Actor for Subscribers {
type Context = Context<Self>;
}
/// `Add` appends a listener; `Forward` calls every listener, in registration
/// order, with the update and a reference to this actor.
impl Handle<SubscribersMessage> for Subscribers {
    type Result = ();

    fn handle(&mut self, message: SubscribersMessage, _context: &mut Self::Context) {
        match message {
            SubscribersMessage::Add(listener) => self.subs.push(listener),
            SubscribersMessage::Forward(status) => {
                // Listeners only need shared access to the registry.
                let this: &Subscribers = self;
                for sub in &this.subs {
                    sub.handle(&status, this);
                }
            }
        }
    }
}
/// Something through which a `Message` can be sent to the coordinator.
pub trait Respond {
/// Sends `msg`; no result is reported back to the caller.
fn respond(&self, msg: Message);
}
/// Replies are sent to the coordinator with `do_send`, i.e. without waiting for a result.
impl Respond for Subscribers {
fn respond(&self, msg: Message) {
self.coord.do_send(msg);
}
}
/// A listener for coordinator status updates.
pub trait Update: std::fmt::Debug + Send {
/// Called for each forwarded update; `coord` can be used to reply via `Respond`.
fn handle(&self, msg: &Status, coord: &Subscribers);
}
/// A status update as delivered to listeners.
#[derive(Debug)]
pub struct Status {
/// Address of the coordinator that published the update.
pub address: Addr<Coordinator>,
/// What happened.
pub message: StatusMessage,
}
/// The kinds of status update the coordinator publishes.
#[derive(Debug)]
pub enum StatusMessage {
/// Published after a `Message::Continue` was handled.
Continued,
/// Published after a `Message::Start` was handled; carries the protocol.
Started(Protocol),
/// Published when a `Hail` action pauses the program.
Paused,
/// Published after a `Message::Stop` or `Message::ExchangeStop` was handled.
StopQueued {
/// Within this file always published as `false`.
early: bool,
},
/// Published after a `Message::Halt` was handled.
Halted,
}
/// A `Status` used as an actor message produces no result value.
impl ActixMessage for Status {
type Result = ();
}
#[allow(clippy::print_stdout)]
pub mod tui {
use super::{Message, Respond, Status, StatusMessage, Subscribers, Update};
#[allow(missing_copy_implementations)]
#[derive(Debug, Default)]
pub struct Tui {}
impl Update for Tui {
fn handle(&self, status: &Status, coord: &Subscribers) {
match &status.message {
StatusMessage::Paused => {
log::trace!("Prompting user to unpause.");
use std::io::{stdin, stdout, BufRead, BufReader, Write};
let stdin = stdin();
let mut stdin = BufReader::new(stdin.lock());
print!("Coordinator paused. Press enter to continue when desired.");
let _ = stdout().lock().flush();
let mut s = String::new();
loop {
if stdin.read_line(&mut s).is_ok() {
break;
}
}
coord.respond(Message::Continue);
}
StatusMessage::Continued => log::debug!("Coordinator continuing."),
StatusMessage::Started(proto) => {
log::debug!("Coordinator starting protocol: {:?}", proto)
}
StatusMessage::StopQueued { early } => {
log::debug!("Coordinator stop queued (early: {})", early)
}
StatusMessage::Halted => log::warn!("Coordinator halted!"),
}
}
}
}