use std::fmt;
use std::path::{
Path,
PathBuf
};
use std::pin::Pin;
use std::process::Stdio;
use std::task::{
Context,
Poll
};
use futures::{
Future,
FutureExt
};
use tokio::task;
use tokio::process::Command;
use super::{
Event,
Filter,
DaemonEventFilter,
DynamicEventFilter,
PeerEventFilter,
dispatch::Interest
};
use crate::util::*;
#[derive(Debug)]
pub struct HandlerScript {
pub name: String,
pub path: PathBuf
}
#[derive(Debug)]
pub enum ParsedHandler {
Daemon {
script: HandlerScript,
filter: DaemonEventFilter
},
Dynamic {
script: HandlerScript,
filter: DynamicEventFilter
},
Peer {
script: HandlerScript,
filter: PeerEventFilter
}
}
pub enum Handler<E: Event> {
Spawner {
script: HandlerScript,
filter: E::Filter,
channel: Sender<E>
},
Plain {
name: String,
filter: Option<E::Filter>,
channel: Sender<E>
}
}
impl<E: Event> fmt::Debug for Handler<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Spawner{script, ..} => write!(f,
"[{}] {} {}",
E::NAME,
script.name,
script.path.display()
),
Self::Plain{name, ..} => write!(f,
"(Plain) [{}] {}",
E::NAME,
name
)
}
}
}
impl<E: Event> fmt::Display for Handler<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Spawner{script, ..} if E::NAME == "dynamic" => {
f.write_str(&script.name)
},
Self::Spawner{script, ..} => write!(f,
"{} [{}]",
script.name,
E::NAME
),
Self::Plain{name, ..} => write!(f,
"(internal handler) {} [{}]",
name,
E::NAME
)
}
}
}
impl<E: Event> Handler<E> {
pub fn send(&self, event: E) -> bool {
match self {
Self::Spawner{channel, filter, ..} if filter.matches(&event) => {
let _ = channel.send(event);
true
},
Self::Plain{channel, filter, ..} if filter.matches(&event) => {
let _ = channel.send(event);
true
},
_ => false
}
}
}
pub struct HandlerTask<E: Event> {
handler: Option<Handler<E>>,
task: Task
}
#[derive(Debug)]
pub enum HandlerResult<E: Event> {
Ok(Handler<E>),
Killed,
TaskPanic
}
impl<E: Event> Unpin for HandlerTask<E> {}
impl<E: Event> HandlerTask<E> {
pub fn new(handler: Handler<E>, task: Task) -> Self {
Self {handler: Some(handler), task}
}
pub fn send(&self, event: E) -> bool {
self.handler
.as_ref()
.map(|h| h.send(event))
.unwrap_or(false)
}
}
impl<E: Event> Future for HandlerTask<E> {
type Output = HandlerResult<E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let new_self = self.get_mut();
new_self.task
.poll_unpin(cx)
.map(|join_res| match join_res {
Ok(()) => HandlerResult::Ok(new_self.handler
.take()
.expect("HandlerTask polled after completion")
),
Err(err) => {
if err.is_cancelled() {HandlerResult::Killed}
else if err.is_panic() {HandlerResult::TaskPanic}
else {
unreachable!("JoinError is either cancelled or panic")
}
}
})
}
}
impl<E: Event> fmt::Debug for HandlerTask<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.handler {
Some(handler) => fmt::Debug::fmt(handler, f),
None => write!(f, "terminated handler: [{}]", E::NAME)
}
}
}
impl<E: Event> fmt::Display for HandlerTask<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.handler {
Some(handler) => fmt::Display::fmt(handler, f),
None => write!(f, "(terminated handler) [{}]", E::NAME)
}
}
}
pub struct Spawner<E: Event> {
event_rx: Receiver<E>,
cmd: Command
}
impl<E: Event> Spawner<E> {
pub fn new(event_rx: Receiver<E>, path: &Path) -> Self {
let mut cmd = Command::new(path);
cmd.env("CNSPRCY_EVENT_TYP", E::NAME)
.stdin(Stdio::null());
Self {event_rx, cmd}
}
pub async fn run(mut self) {
while let Some(evt) = self.event_rx.recv().await {
evt.into_envs(&mut self.cmd);
match self.cmd.spawn() {
Ok(mut child) => match child.wait().await {
Ok(_exit) => continue,
Err(_err) => break
},
Err(_err) => break
}
}
}
}
pub fn launch(parsed: ParsedHandler) -> Interest {
match parsed {
ParsedHandler::Daemon{script, filter} => {
let (channel, rx) = new_channel();
let spawner = Spawner::new(rx, &script.path);
let handler = Handler::Spawner {
script,
filter,
channel
};
let task = HandlerTask {
handler: Some(handler),
task: task::spawn(spawner.run())
};
Interest::Daemon(task)
},
ParsedHandler::Dynamic{script, filter} => {
let (channel, rx) = new_channel();
let spawner = Spawner::new(rx, &script.path);
let handler = Handler::Spawner {
script,
filter,
channel
};
let task = HandlerTask {
handler: Some(handler),
task: task::spawn(spawner.run())
};
Interest::Dynamic(task)
},
ParsedHandler::Peer{script, filter} => {
let (channel, rx) = new_channel();
let spawner = Spawner::new(rx, &script.path);
let handler = Handler::Spawner {
script,
filter,
channel
};
let task = HandlerTask {
handler: Some(handler),
task: task::spawn(spawner.run())
};
Interest::Peer(task)
}
}
}