use std::sync::Arc;
use async_channel::Sender;
use crate::{
error::TaskError,
task::{
RecordId, TaskId, TaskRunner,
frequency::{FrequencySeconds, FrequencyState},
},
timer::{TimerEvent, wheel::WheelCascadeGuide},
utils,
};
/// A schedulable task: its identifier, the runner to execute, and its timing state.
///
/// Values are produced by `TaskBuilder::spawn_async`.
#[derive(Clone)]
pub struct Task {
/// Identifier of this task.
pub task_id: TaskId,
/// Shared runner for this task; cloning a `Task` clones the `Arc`, not the runner itself.
pub(crate) runner: Arc<dyn TaskRunner<Output = ()> + Send + Sync>,
/// Wheel position of the task; consulted by `is_arrived` and replaced by `set_wheel_position`.
pub(crate) cascade_guide: WheelCascadeGuide,
/// Runtime frequency state, converted from the configured frequency when the task is built.
pub(crate) frequency: FrequencyState,
/// The frequency configuration the task was built with.
pub(crate) frequency_config: FrequencySeconds,
/// Concurrency limit configured for this task.
/// NOTE(review): presumably the maximum number of simultaneous runs; enforcement is not visible here.
pub max_concurrency: usize,
}
impl Task {
    /// Reports whether this task is due at the given wheel time.
    ///
    /// The current second, minute and hour are forwarded unchanged to the
    /// task's [`WheelCascadeGuide`], whose answer is returned as-is.
    pub fn is_arrived(&self, current_sec: u64, current_min: u64, current_hour: u64) -> bool {
        let guide = &self.cascade_guide;
        guide.is_arrived(current_sec, current_min, current_hour)
    }

    /// Asks the task's frequency state for the next alarm timestamp.
    ///
    /// The result of the frequency state is returned untouched.
    /// NOTE(review): `None` presumably means the task has no further alarms,
    /// and the call presumably advances the state (hence `&mut self`) — confirm
    /// against `FrequencyState`.
    pub fn next_alarm_timestamp(&mut self) -> Option<u64> {
        let state = &mut self.frequency;
        state.next_alarm_timestamp()
    }

    /// Replaces the stored wheel position with `wheel_position`.
    pub(crate) fn set_wheel_position(&mut self, wheel_position: WheelCascadeGuide) {
        self.cascade_guide = wheel_position;
    }
}
/// Builder that collects a task's id, frequency and concurrency limit before
/// `spawn_async` turns them into a `Task`.
///
/// NOTE(review): the derived `Default` leaves `max_concurrency` at `0`, whereas
/// `TaskBuilder::new` starts it at `1` — confirm that this difference is intended.
#[derive(Default, Clone, Copy)]
pub struct TaskBuilder {
/// Identifier that the built task will carry.
task_id: TaskId,
/// Frequency configuration selected through the `with_frequency_*` methods.
frequency: FrequencySeconds,
/// Concurrency limit copied onto the built task.
max_concurrency: usize,
}
impl TaskBuilder {
    /// Creates a builder for the task identified by `task_id`.
    ///
    /// The frequency starts at `FrequencySeconds::default()` and the
    /// concurrency limit at `1`.
    pub fn new(task_id: u64) -> Self {
        Self {
            task_id,
            frequency: FrequencySeconds::default(),
            max_concurrency: 1,
        }
    }

    /// Configures the task to fire once, `seconds` seconds from now.
    ///
    /// Overwrites any previously configured frequency.
    pub fn with_frequency_once_by_seconds(&mut self, seconds: u64) -> &mut Self {
        self.frequency = FrequencySeconds::Once(seconds);
        self
    }

    /// Configures the task with `FrequencySeconds::Repeated(seconds)`.
    ///
    /// Overwrites any previously configured frequency.
    /// NOTE(review): presumably `seconds` is the interval between runs — confirm
    /// against `FrequencySeconds`.
    pub fn with_frequency_repeated_by_seconds(&mut self, seconds: u64) -> &mut Self {
        self.frequency = FrequencySeconds::Repeated(seconds);
        self
    }

    /// Sets the concurrency limit that the built task will carry.
    pub fn with_max_concurrency(&mut self, max: usize) -> &mut Self {
        self.max_concurrency = max;
        self
    }

    /// Configures the task with `FrequencySeconds::CountDown(count_down, seconds)`.
    ///
    /// Overwrites any previously configured frequency.
    /// NOTE(review): presumably the task runs `count_down` times, `seconds`
    /// apart — confirm against `FrequencySeconds`.
    pub fn with_frequency_count_down_by_seconds(
        &mut self,
        count_down: u64,
        seconds: u64,
    ) -> &mut Self {
        self.frequency = FrequencySeconds::CountDown(count_down, seconds);
        self
    }

    /// Configures the task to fire once at the absolute time `timestamp`.
    ///
    /// The timestamp is converted into a delay relative to `utils::timestamp()`
    /// and stored as `FrequencySeconds::Once(delay)`.
    ///
    /// # Errors
    ///
    /// Returns [`TaskError::InvalidFrequency`] when `timestamp` is not strictly
    /// greater than the current timestamp; the builder is left unchanged in
    /// that case.
    pub fn with_frequency_once_by_timestamp_seconds(
        &mut self,
        timestamp: u64,
    ) -> Result<&mut Self, TaskError> {
        let now = utils::timestamp();
        // `checked_sub` rejects timestamps in the past; the filter additionally
        // rejects `timestamp == now`. The error is built lazily so the message
        // is only formatted on the failure path.
        let gap = timestamp
            .checked_sub(now)
            .filter(|&gap| gap > 0)
            .ok_or_else(|| {
                TaskError::InvalidFrequency(format!(
                    "Once timestamp({timestamp}) need greater than current timestamp({now})"
                ))
            })?;
        self.frequency = FrequencySeconds::Once(gap);
        Ok(self)
    }

    /// Consumes the builder and produces a [`Task`] that runs `task_runner`.
    ///
    /// The runner is placed behind an `Arc`, the wheel position starts at
    /// `WheelCascadeGuide::default()`, and the configured frequency is stored
    /// both as-is (`frequency_config`) and converted into the runtime state
    /// (`frequency`).
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`; the `Result` is part of
    /// the signature callers already handle.
    pub fn spawn_async<T: TaskRunner<Output = ()> + Send + Sync>(
        self,
        task_runner: T,
    ) -> Result<Task, TaskError> {
        Ok(Task {
            task_id: self.task_id,
            runner: Arc::new(task_runner),
            cascade_guide: WheelCascadeGuide::default(),
            frequency: self.frequency.into(),
            frequency_config: self.frequency,
            max_concurrency: self.max_concurrency,
        })
    }
}
/// Context for a single task execution: the task it belongs to, the record of
/// that run, and an optional channel for sending [`TimerEvent`]s.
///
/// NOTE(review): nothing in the visible code constructs or reads this type,
/// which is presumably why `dead_code` is silenced — confirm before removing.
// The struct-level allowance also covers every field, so no per-field
// attribute is needed.
#[allow(dead_code)]
pub(crate) struct TaskContext {
    /// Identifier of the task being executed.
    pub task_id: TaskId,
    /// Identifier of the record associated with this execution.
    pub record_id: RecordId,
    /// Sender for timer events, when one is available.
    pub(crate) timer_event_sender: Option<Sender<TimerEvent>>,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Reports whether a fresh builder accepts `timestamp` as a one-shot
    /// trigger time.
    fn accepts_once_timestamp(timestamp: u64) -> bool {
        TaskBuilder::new(1)
            .with_frequency_once_by_timestamp_seconds(timestamp)
            .is_ok()
    }

    /// A timestamp in the future is accepted.
    #[test]
    fn test_with_frequency_once_by_timestamp_seconds_valid() {
        let future = utils::timestamp() + 100;
        assert!(accepts_once_timestamp(future));
    }

    /// A timestamp in the past is rejected.
    #[test]
    fn test_with_frequency_once_by_timestamp_seconds_past() {
        let past = utils::timestamp() - 10;
        assert!(!accepts_once_timestamp(past));
    }

    /// The current timestamp itself is rejected: the delay must be non-zero.
    #[test]
    fn test_with_frequency_once_by_timestamp_seconds_now() {
        let now = utils::timestamp();
        assert!(!accepts_once_timestamp(now));
    }
}