use crate::prelude::*;
use std::convert::TryFrom;
use future_lite::block_on;
use once_cell::sync::Lazy;
pub(crate) static GLOBAL_STATUS_REPORTER: Lazy<(AsyncSender<PublicEvent>, AsyncReceiver<PublicEvent>)> = Lazy::new(|| {
smol::channel::unbounded()
});
#[derive(Debug, Clone)]
pub struct StatusReporter {
inner: AsyncReceiver<PublicEvent>,
}
impl StatusReporter {
pub fn next_public_event(&self) -> Result<PublicEvent, channel::TryRecvError> {
let event = self.inner.try_recv()?;
Ok(event)
}
pub fn next_public_event_with_wait(&self) -> Result<PublicEvent, channel::RecvError> {
block_on(self.inner.recv())
}
pub async fn next_public_event_with_async_wait(&self) -> Result<PublicEvent, channel::RecvError> {
self.inner.recv().await
}
pub(crate) fn new(inner: AsyncReceiver<PublicEvent>) -> Self {
Self { inner }
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublicFinishTaskBody {
pub(crate) task_id: u64,
pub(crate) record_id: i64,
pub(crate) finish_time: u64,
pub(crate) finish_output: Option<PublicFinishOutput>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PublicFinishOutput {
ProcessOutput(std::process::Output),
ExceptionOutput(String),
}
impl PublicFinishTaskBody{
#[inline(always)]
pub fn get_task_id(&self) -> u64{
self.task_id
}
#[inline(always)]
pub fn get_record_id(&self) -> i64{
self.record_id
}
#[inline(always)]
pub fn get_finish_time(&self) -> u64{
self.finish_time
}
#[inline(always)]
pub fn get_finish_output(&mut self) -> Option<PublicFinishOutput>{
self.finish_output.take()
}
}
impl From<FinishTaskBody> for PublicFinishTaskBody{
fn from(value: FinishTaskBody) -> Self{
PublicFinishTaskBody{
task_id:value.task_id,
record_id:value.record_id,
finish_time:value.finish_time,
finish_output:value.finish_output.map(|o|o.into()),
}
}
}
impl From<FinishOutput> for PublicFinishOutput{
fn from(value:FinishOutput) -> Self{
match value{
FinishOutput::ProcessOutput(o) => PublicFinishOutput::ProcessOutput(o),
FinishOutput::ExceptionOutput(o) => PublicFinishOutput::ExceptionOutput(o)
}
}
}
#[derive(Debug, Clone)]
pub enum PublicEvent {
RemoveTask(u64),
RunningTask(u64, i64),
FinishTask(PublicFinishTaskBody),
TimeoutTask(u64, i64),
}
impl TryFrom<&TimerEvent> for PublicEvent {
type Error = anyhow::Error;
fn try_from(timer_event: &TimerEvent) -> Result<Self, Self::Error> {
match timer_event {
TimerEvent::RemoveTask(task_id) => Ok(PublicEvent::RemoveTask(*task_id)),
TimerEvent::AppendTaskHandle(_, delay_task_handler_box) => {
Ok(PublicEvent::RunningTask(delay_task_handler_box.get_task_id(), delay_task_handler_box.get_record_id()))
}
TimerEvent::FinishTask(finish_task_body) => {
Ok(PublicEvent::FinishTask(finish_task_body.clone().into()))
}
TimerEvent::TimeoutTask(task_id, record_id) => {
Ok(PublicEvent::TimeoutTask(*task_id, *record_id))
}
_ => Err(anyhow!("PublicEvent only accepts timer_event some variant( RemoveTask, CancelTask ,FinishTask )!")),
}
}
}
impl TryFrom<TimerEvent> for PublicEvent {
type Error = anyhow::Error;
fn try_from(timer_event: TimerEvent) -> Result<Self, Self::Error> {
match timer_event {
TimerEvent::RemoveTask(task_id) => Ok(PublicEvent::RemoveTask(task_id)),
TimerEvent::AppendTaskHandle(_, delay_task_handler_box) => {
Ok(PublicEvent::RunningTask(delay_task_handler_box.get_task_id(), delay_task_handler_box.get_record_id()))
}
TimerEvent::FinishTask(finish_task_body) => {
Ok(PublicEvent::FinishTask(finish_task_body.into()))
}
TimerEvent::TimeoutTask(task_id, record_id) => {
Ok(PublicEvent::TimeoutTask(task_id, record_id))
}
_ => Err(anyhow!("PublicEvent only accepts timer_event some variant( RemoveTask, CancelTask ,FinishTask )!")),
}
}
}
impl PublicEvent {
pub fn get_task_id(&self) -> u64 {
match self {
PublicEvent::RemoveTask(ref task_id) => *task_id,
PublicEvent::RunningTask(ref task_id, _) => *task_id,
PublicEvent::FinishTask(PublicFinishTaskBody{task_id,..}) => *task_id,
PublicEvent::TimeoutTask(ref task_id, _) => *task_id,
}
}
pub fn get_record_id(&self) -> Option<i64> {
match self {
PublicEvent::RemoveTask(_) => None,
PublicEvent::RunningTask(_,ref record_id) => Some(*record_id),
PublicEvent::FinishTask(PublicFinishTaskBody{record_id,..}) => Some(*record_id),
PublicEvent::TimeoutTask(_,ref record_id) => Some(*record_id),
}
}
}