use std::collections::HashMap;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use uuid::Uuid;
/// Outcome reported by a task's executor.
///
/// Implements `PartialEq`/`Eq` so results can be compared directly (for example in
/// assertions) instead of being destructured by hand.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskResult {
    /// The task ran to completion; the payload is an optional output string.
    Success(Option<String>),
    /// The task failed; the payload is the error message.
    Failed(String),
    /// The task was not run; the payload is the reason it was skipped.
    Skipped(String),
}
/// Lifecycle state of a single task execution.
///
/// The enum carries no data, so it is `Copy` and can be used as a hash-map/set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TaskStatus {
    /// Created but not started yet.
    Pending,
    /// Started and not finished yet.
    Running,
    /// Finished without a failure.
    Completed,
    /// Finished with a failure.
    Failed,
    /// Cancelled before finishing.
    Cancelled,
}
/// Per-execution information handed to a task's executor.
#[derive(Debug, Clone)]
pub struct TaskContext {
    /// Identifier of the task this execution belongs to.
    pub task_id: String,
    /// Identifier of this particular execution.
    pub execution_id: String,
    /// Point in time at which the execution was scheduled to run.
    pub scheduled_time: SystemTime,
    /// Point in time at which the execution actually started, if recorded.
    pub actual_start_time: Option<SystemTime>,
    /// Free-form string key/value pairs attached to this execution.
    pub metadata: HashMap<String, String>,
}
impl TaskContext {
    /// Creates a context for `task_id` scheduled at `scheduled_time`.
    ///
    /// The `execution_id` is the string form of `Uuid::new_v4()`; the start time is
    /// unset and the metadata map starts empty.
    pub fn new(task_id: String, scheduled_time: SystemTime) -> Self {
        let execution_id = Uuid::new_v4().to_string();
        Self {
            task_id,
            execution_id,
            scheduled_time,
            actual_start_time: None,
            metadata: HashMap::new(),
        }
    }

    /// Inserts one metadata entry (replacing any existing value for `key`) and
    /// returns the context so calls can be chained.
    pub fn with_metadata(mut self, key: String, value: String) -> Self {
        let _replaced = self.metadata.insert(key, value);
        self
    }

    /// Records the current system time as the actual start time.
    pub fn mark_started(&mut self) {
        let now = SystemTime::now();
        self.actual_start_time = Some(now);
    }

    /// Returns how long after `scheduled_time` the execution actually started.
    ///
    /// Yields `None` when no start time has been recorded, or when the recorded
    /// start time lies before `scheduled_time`.
    pub fn execution_delay(&self) -> Option<Duration> {
        let started = self.actual_start_time?;
        started.duration_since(self.scheduled_time).ok()
    }
}
/// Type-erased asynchronous task body.
///
/// A callable that takes a `TaskContext` by value and returns a boxed, pinned,
/// `Send` future resolving to a `TaskResult`. The callable itself must be
/// `Send + Sync`.
pub type TaskFn =
    dyn Fn(TaskContext) -> Pin<Box<dyn Future<Output = TaskResult> + Send>> + Send + Sync;
/// A named unit of asynchronous work together with its scheduling settings.
///
/// Cloning is cheap with respect to the executor: it is held in an `Arc`, so clones
/// share the same callable.
#[derive(Clone)]
pub struct Task {
    /// Identifier of the task.
    pub id: String,
    /// Human-readable name.
    pub name: String,
    /// Optional free-text description.
    pub description: Option<String>,
    /// Optional group label.
    pub group: Option<String>,
    /// Priority value; how it is interpreted is up to the code consuming the task.
    pub priority: i32,
    /// Configured retry limit; stored here, not enforced by `Task::execute`.
    pub max_retry: u32,
    /// Configured timeout; stored here, not enforced by `Task::execute`.
    pub timeout: Option<Duration>,
    /// When `false`, `Task::execute` returns `Skipped` without calling the executor.
    pub enabled: bool,
    /// Configured initial delay; stored here, not applied by `Task::execute`.
    pub initial_delay: Option<Duration>,
    /// Configured execution delay; stored here, not applied by `Task::execute`.
    pub execution_delay: Option<Duration>,
    /// Shared, type-erased callable that performs the work.
    executor: Arc<TaskFn>,
    /// Free-form string key/value pairs attached to the task.
    metadata: HashMap<String, String>,
}
impl Task {
pub fn new<F, Fut>(id: impl Into<String>, name: impl Into<String>, executor: F) -> Self
where
F: Fn(TaskContext) -> Fut + Send + Sync + 'static,
Fut: Future<Output = TaskResult> + Send + 'static,
{
let executor = Arc::new(move |ctx| {
let fut = executor(ctx);
Box::pin(fut) as Pin<Box<dyn Future<Output = TaskResult> + Send>>
});
Self {
id: id.into(),
name: name.into(),
description: None,
group: None,
priority: 0,
max_retry: 0,
timeout: None,
enabled: true,
initial_delay: None,
execution_delay: None,
executor,
metadata: HashMap::new(),
}
}
pub fn with_initial_delay(mut self, delay: Duration) -> Self {
self.initial_delay = Some(delay);
self
}
pub fn with_execution_delay(mut self, delay: Duration) -> Self {
self.execution_delay = Some(delay);
self
}
pub fn with_description(mut self, description: impl Into<String>) -> Self {
self.description = Some(description.into());
self
}
pub fn with_group(mut self, group: impl Into<String>) -> Self {
self.group = Some(group.into());
self
}
pub fn with_priority(mut self, priority: i32) -> Self {
self.priority = priority;
self
}
pub fn with_max_retry(mut self, max_retry: u32) -> Self {
self.max_retry = max_retry;
self
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.metadata.insert(key.into(), value.into());
self
}
pub fn enabled(mut self, enabled: bool) -> Self {
self.enabled = enabled;
self
}
pub async fn execute(&self, context: TaskContext) -> TaskResult {
if !self.enabled {
return TaskResult::Skipped("Task is disabled".to_string());
}
(self.executor)(context).await
}
pub fn get_metadata(&self, key: &str) -> Option<&String> {
self.metadata.get(key)
}
}
impl fmt::Debug for Task {
    /// Formats every data field of the task, in declaration order.
    ///
    /// The executor is a type-erased callable with no `Debug` representation, so it
    /// is left out; the output ends with `..` to signal that the listing is not
    /// exhaustive.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Task")
            .field("id", &self.id)
            .field("name", &self.name)
            .field("description", &self.description)
            .field("group", &self.group)
            .field("priority", &self.priority)
            .field("max_retry", &self.max_retry)
            .field("timeout", &self.timeout)
            .field("enabled", &self.enabled)
            .field("initial_delay", &self.initial_delay)
            .field("execution_delay", &self.execution_delay)
            .field("metadata", &self.metadata)
            .finish_non_exhaustive()
    }
}
/// Record of one execution attempt of a task.
#[derive(Debug, Clone)]
pub struct TaskExecution {
    /// Identifier of this execution.
    pub execution_id: String,
    /// Identifier of the task being executed.
    pub task_id: String,
    /// Point in time at which the execution was scheduled to run.
    pub scheduled_time: SystemTime,
    /// Point in time at which the execution started, if it has.
    pub start_time: Option<SystemTime>,
    /// Point in time at which the execution ended, if it has.
    pub end_time: Option<SystemTime>,
    /// Current lifecycle state.
    pub status: TaskStatus,
    /// Result reported for the execution, once available.
    pub result: Option<TaskResult>,
    /// Retry counter for this execution.
    pub retry_count: u32,
    /// Error message associated with a failure, if any.
    pub error_message: Option<String>,
}
impl TaskExecution {
    /// Creates a pending execution record for `task_id` scheduled at `scheduled_time`.
    ///
    /// The `execution_id` is the string form of `Uuid::new_v4()`; timestamps, result
    /// and error message are unset and `retry_count` starts at `0`.
    pub fn new(task_id: String, scheduled_time: SystemTime) -> Self {
        Self {
            execution_id: Uuid::new_v4().to_string(),
            task_id,
            scheduled_time,
            start_time: None,
            end_time: None,
            status: TaskStatus::Pending,
            result: None,
            retry_count: 0,
            error_message: None,
        }
    }

    /// Records the current system time as the start time and marks the execution
    /// as running.
    pub fn start(&mut self) {
        self.start_time = Some(SystemTime::now());
        self.status = TaskStatus::Running;
    }

    /// Records the current system time as the end time and stores `result`.
    ///
    /// `Success` and `Skipped` results set the status to `Completed`; a `Failed`
    /// result sets it to `Failed` and copies its message into `error_message`.
    ///
    /// `error_message` is always rewritten to match `result`, so a message left by
    /// an earlier failed attempt does not linger on an execution that later
    /// completes without failure.
    pub fn complete(&mut self, result: TaskResult) {
        self.end_time = Some(SystemTime::now());
        let (status, error_message) = match &result {
            TaskResult::Success(_) | TaskResult::Skipped(_) => (TaskStatus::Completed, None),
            TaskResult::Failed(msg) => (TaskStatus::Failed, Some(msg.clone())),
        };
        self.status = status;
        self.error_message = error_message;
        self.result = Some(result);
    }

    /// Records the current system time as the end time and marks the execution
    /// as cancelled.
    pub fn cancel(&mut self) {
        self.end_time = Some(SystemTime::now());
        self.status = TaskStatus::Cancelled;
    }

    /// Returns the elapsed time between start and end.
    ///
    /// Yields `None` when either timestamp is missing, or when the end time lies
    /// before the start time.
    pub fn duration(&self) -> Option<Duration> {
        let (start, end) = (self.start_time?, self.end_time?);
        end.duration_since(start).ok()
    }
}