use super::{CheckTargetError, Status, Target};
use futures::future::{join, join_all, BoxFuture, FutureExt};
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use tokio::runtime::{self};
use tokio::select;
use tokio::sync::watch::{self, Receiver, Sender};
use tokio::task::{self};
use tokio::time::{self};
/// Alias of [`Status`] used in handler signatures to name the status a target had before its latest check.
pub type OldStatus = Status;
/// Boxed [`Target`] trait object that is `Send` and lives at least as long as `'a`.
pub type BoxedTarget<'a> = Box<dyn Target + Send + 'a>;
/// Boxed callback invoked after each check with the checked target, its new status, its previous
/// status and the error of the check, if there was one.
pub type BoxedHandler<'a> = Box<dyn FnMut(&dyn Target, Status, OldStatus, Option<CheckTargetError>) + Send + 'a>;
/// A target bundled with the handler to call after each check, the interval between checks and
/// the status recorded by the most recent check.
pub struct AsyncTarget<'a> {
/// Target whose availability is checked.
target: BoxedTarget<'a>,
/// Callback invoked with the result of every check.
check_handler: BoxedHandler<'a>,
/// Duration slept concurrently with each check before the next check starts.
check_interval: Duration,
/// Status recorded by the last check; `Status::Unknown` until the first check has run.
status: Status,
}
impl<'a> AsyncTarget<'a> {
    /// Builds an [`AsyncTarget`] from an already boxed target and handler.
    ///
    /// The recorded status starts out as [`Status::Unknown`], so the first handler invocation
    /// receives `Status::Unknown` as the old status.
    pub fn new(target: BoxedTarget<'a>, check_handler: BoxedHandler<'a>, check_interval: Duration) -> Self {
        let status = Status::Unknown;
        Self {
            target,
            check_handler,
            check_interval,
            status,
        }
    }
}
impl<'a, T, U> From<(T, U, Duration)> for AsyncTarget<'a>
where
T: Target + Send + 'a,
U: FnMut(&dyn Target, Status, OldStatus, Option<CheckTargetError>) + Send + 'a,
{
fn from(pieces: (T, U, Duration)) -> AsyncTarget<'a> {
let (target, check_handler, check_interval) = pieces;
AsyncTarget::new(Box::from(target), Box::from(check_handler), check_interval)
}
}
/// Runs the periodic checks of a set of [`AsyncTarget`]s on a dedicated worker thread.
pub struct AsyncTargetExecutor {
/// Join handle of the worker thread together with the sender used to signal teardown;
/// `None` while no worker is running.
worker: Option<(JoinHandle<()>, Sender<()>)>,
}
impl AsyncTargetExecutor {
    /// Creates an executor without a running worker.
    pub fn new() -> Self {
        Self { worker: None }
    }

    /// Starts checking the given targets periodically on a new worker thread.
    ///
    /// Each target gets its own check loop on a multi-threaded tokio runtime that is driven
    /// by the worker thread. If a worker is already running, the call does nothing and the
    /// passed targets are dropped.
    ///
    /// # Panics
    ///
    /// Panics if the tokio runtime cannot be built.
    pub fn start(&mut self, targets: Vec<AsyncTarget<'static>>) {
        if self.worker.is_some() {
            return;
        }

        let (teardown_send, teardown_recv) = watch::channel(());
        let runtime = runtime::Builder::new_multi_thread()
            .enable_time()
            .build()
            .expect("failed to build the tokio runtime for the target checks");

        // Every check loop owns a clone of the receiver so that a single send stops them all.
        let tasks: Vec<BoxFuture<()>> = targets
            .into_iter()
            .map(|target| check_target_periodically(target, teardown_recv.clone()).boxed())
            .collect();

        let handle = spawn(move || {
            runtime.block_on(join_all(tasks));
            // Do not wait for blocking checks that are still in flight.
            runtime.shutdown_background();
        });
        self.worker = Some((handle, teardown_send));
    }

    /// Signals all check loops to stop and waits for the worker thread to finish.
    ///
    /// Does nothing if no worker is running.
    ///
    /// # Panics
    ///
    /// Re-raises a panic of the worker thread, unless the calling thread is already
    /// unwinding (a second panic, e.g. from within `drop`, would abort the process).
    pub fn stop(&mut self) {
        if let Some((handle, teardown_send)) = self.worker.take() {
            // Sending fails only when no receiver is left, i.e. there is no check loop to
            // stop (started without targets, or the worker already terminated). That is
            // not an error for the caller, so the result is ignored on purpose.
            let _ = teardown_send.send(());

            if let Err(payload) = handle.join() {
                if !std::thread::panicking() {
                    std::panic::resume_unwind(payload);
                }
            }
        }
    }
}
impl Default for AsyncTargetExecutor {
fn default() -> Self {
AsyncTargetExecutor::new()
}
}
impl Drop for AsyncTargetExecutor {
    /// Stops the executor when it goes out of scope by delegating to `stop`.
    fn drop(&mut self) {
        self.stop();
    }
}
/// Checks `target` over and over again until a teardown notification arrives.
///
/// Each round races one `check_target` call against a change of the teardown channel. When
/// the check wins, the target it hands back is used for the next round; when the channel
/// wins, the loop ends and the pending check future is dropped.
async fn check_target_periodically(mut target: AsyncTarget<'static>, mut teardown_recv: Receiver<()>) {
    loop {
        target = select! {
            checked = check_target(target) => checked,
            _ = teardown_recv.changed() => break,
        };
    }
}
/// Runs a single availability check of `target` and hands the target back afterwards.
///
/// The check and the handler run on tokio's blocking thread pool. A failed check is reported
/// to the handler as `Status::Unknown` together with the error. The returned future resolves
/// only once both the check and a sleep of `check_interval` have finished.
///
/// # Panics
///
/// Panics if the blocking task could not be joined, e.g. because the check or the handler
/// panicked.
async fn check_target(mut target: AsyncTarget<'static>) -> AsyncTarget<'static> {
    // The sleep is created first so that its deadline counts from before the check starts.
    let pause = time::sleep(target.check_interval);
    let check = task::spawn_blocking(move || {
        let (status, error) = match target.target.check_availability() {
            Err(error) => (Status::Unknown, Some(error)),
            Ok(status) => (status, None),
        };
        // Record the new status and keep the previous one for the handler.
        let old_status = std::mem::replace(&mut target.status, status.clone());
        target.check_handler.as_mut()(target.target.as_ref(), status, old_status, error);
        target
    });
    let (checked, _) = join(check, pause).await;
    checked.unwrap()
}
#[cfg(test)]
mod tests {
    use std::sync::mpsc;
    use mockall::Sequence;
    use super::*;
    use crate::target::MockTarget;

    /// Verifies that the executor checks a target repeatedly and passes the new status, the
    /// previous status and the check error to the handler.
    ///
    /// The mock reports `Available`, then `NotAvailable`, then an error. The handler tells the
    /// three calls apart by the old status it receives and signals the test thread after the
    /// third call so that the executor can be stopped.
    #[test]
    fn async_target_call_behavior() {
        let mut mock = MockTarget::new();
        let mut call_sequence = Sequence::new();
        mock.expect_check_availability()
            .times(1)
            .returning(|| Ok(Status::Available))
            .in_sequence(&mut call_sequence);
        mock.expect_check_availability()
            .times(1)
            .returning(|| Ok(Status::NotAvailable))
            .in_sequence(&mut call_sequence);
        mock.expect_check_availability()
            .times(1)
            .returning(|| Err(CheckTargetError::from("Error")))
            .in_sequence(&mut call_sequence);

        let (send, recv) = mpsc::channel();
        let handler = move |_: &dyn Target, new: Status, old: OldStatus, error: Option<CheckTargetError>| {
            match old {
                // First check: initial status -> Available.
                Status::Unknown => {
                    assert_eq!(new, Status::Available);
                    assert!(error.is_none());
                }
                // Second check: Available -> NotAvailable.
                Status::Available => {
                    assert_eq!(new, Status::NotAvailable);
                    assert!(error.is_none());
                }
                // Third check: the failed check is reported as Unknown plus its error.
                Status::NotAvailable => {
                    assert_eq!(new, Status::Unknown);
                    let error = error.expect("a failed check must pass its error to the handler");
                    assert_eq!(error.to_string(), "Error");
                    send.send(()).unwrap();
                }
            }
        };

        let mut exec = AsyncTargetExecutor::new();
        exec.start(vec![AsyncTarget::from((mock, handler, Duration::from_millis(100)))]);
        // Block until the handler has seen all three expected calls.
        recv.recv().unwrap();
        exec.stop();
    }
}