1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
//! `StatusReporter` is to expose the necessary operational information
//! to the outside world.
use crate::prelude::*;
use std::convert::TryFrom;
use future_lite::block_on;


/// # Required features
///
/// This function requires the `status-report` feature of the `delay_timer`
/// crate to be enabled.
#[derive(Debug, Clone)]
pub struct StatusReporter {
    inner: AsyncReceiver<PublicEvent>,
}

impl StatusReporter {

    /// Non-blocking get `PublicEvent` via `StatusReporter`.
    pub fn next_public_event(&self) -> Result<PublicEvent, channel::TryRecvError> {
        let event = self.inner.try_recv()?;
        Ok(event)
    }

    /// Blocking get `PublicEvent` via `StatusReporter`.
    pub fn next_public_event_with_wait(&self) -> Result<PublicEvent, channel::RecvError> {
        block_on(self.inner.recv())
    }

    /// Async get `PublicEvent` via `StatusReporter`.
    pub async fn next_public_event_with_async_wait(&self) -> Result<PublicEvent, channel::RecvError> {
        Ok(self.inner.recv().await?)
    }

    pub(crate) fn new(inner: AsyncReceiver<PublicEvent>) -> Self {
        Self { inner }
    }
}

// Define types independently to avoid coupling internal types.
/// The information generated when completing a task.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublicFinishTaskBody {
    pub(crate) task_id: u64,
    pub(crate) record_id: i64,
    pub(crate) finish_time: u64,
    pub(crate) finish_output: Option<PublicFinishOutput>,
}

// Define types independently to avoid coupling internal types.
/// The output generated when the task is completed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PublicFinishOutput {
    /// The output generated when the process task is completed.
    ProcessOutput(std::process::Output),
    /// Exception output for a task that did not run successfully.
    ExceptionOutput(String),
}

impl PublicFinishTaskBody{

    /// Get the TaskInstance task-id.
    #[inline(always)]
    pub fn get_task_id(&self) -> u64{
        self.task_id
    }


    /// Get the TaskInstance record-id.
    #[inline(always)]
    pub fn get_record_id(&self) -> i64{
        self.record_id
    }


    /// Get the TaskInstance finish-time.
    #[inline(always)]
    pub fn get_finish_time(&self) -> u64{
        self.finish_time
    }

    /// Get the output on internal completion.
    #[inline(always)]
    pub fn get_finish_output(&mut self) -> Option<PublicFinishOutput>{
        self.finish_output.take()
    }
}

impl From<FinishTaskBody> for PublicFinishTaskBody{
    fn from(value: FinishTaskBody) -> Self{
        PublicFinishTaskBody{
            task_id:value.task_id,
            record_id:value.record_id,
            finish_time:value.finish_time,
            finish_output:value.finish_output.map(|o|o.into()),
        }
    }
}

impl From<FinishOutput> for PublicFinishOutput{
    fn from(value:FinishOutput) -> Self{
        match value{
            FinishOutput::ProcessOutput(o) => PublicFinishOutput::ProcessOutput(o),
            FinishOutput::ExceptionOutput(o) => PublicFinishOutput::ExceptionOutput(o) 
        }
    }
}

/// `PublicEvent`, describes the open events that occur in the delay-timer of the task.
#[derive(Debug, Clone)]
pub enum PublicEvent {
    /// Describes which task is removed.
    RemoveTask(u64),
    /// Describes which task produced a new running instance, record the id.
    RunningTask(u64, i64),
    /// Describe which task instance completed.
    FinishTask(PublicFinishTaskBody),
    /// Describe which task instance timeout .
    TimeoutTask(u64, i64),
}

impl TryFrom<&TimerEvent> for PublicEvent {
    type Error = anyhow::Error;

    fn try_from(timer_event: &TimerEvent) -> Result<Self, Self::Error> {
        match timer_event {
            TimerEvent::RemoveTask(task_id) => Ok(PublicEvent::RemoveTask(*task_id)),
            TimerEvent::AppendTaskHandle(_, delay_task_handler_box) => {
                Ok(PublicEvent::RunningTask(delay_task_handler_box.get_task_id(), delay_task_handler_box.get_record_id()))
            }
            TimerEvent::FinishTask(finish_task_body) => {
                // TODO: Be wary, clone can involve a lot of memory and consume performance.
                Ok(PublicEvent::FinishTask(finish_task_body.clone().into()))
            }

            TimerEvent::TimeoutTask(task_id, record_id) => {
                Ok(PublicEvent::TimeoutTask(*task_id, *record_id))
            }

            _ => Err(anyhow!("PublicEvent only accepts timer_event some variant( RemoveTask, CancelTask ,FinishTask )!")),
        }
    }
}

impl PublicEvent {
    /// Get the task_id corresponding to the event.
   pub fn get_task_id(&self) -> u64 {
        match self {
            PublicEvent::RemoveTask(ref task_id) => *task_id,
            PublicEvent::RunningTask(ref task_id, _) => *task_id,
            PublicEvent::FinishTask(PublicFinishTaskBody{task_id,..}) => *task_id,
            PublicEvent::TimeoutTask(ref task_id, _) => *task_id,
        }
    }

    /// Get the record_id corresponding to the event.
   pub fn get_record_id(&self) -> Option<i64> {
        match self {
            PublicEvent::RemoveTask(_) => None,
            PublicEvent::RunningTask(_,ref record_id) => Some(*record_id),
            PublicEvent::FinishTask(PublicFinishTaskBody{record_id,..}) => Some(*record_id),
            PublicEvent::TimeoutTask(_,ref record_id) => Some(*record_id),
      
        }
    }
}