Skip to main content

rocketmq_rust/schedule/
task.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt;
17use std::future::Future;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::time::Duration;
21use std::time::SystemTime;
22
23use uuid::Uuid;
24
25/// Task execution result
26#[derive(Debug, Clone)]
27pub enum TaskResult {
28    Success(Option<String>),
29    Failed(String),
30    Skipped(String),
31}
32
33/// Task execution status
34#[derive(Debug, Clone, PartialEq)]
35pub enum TaskStatus {
36    Pending,
37    Running,
38    Completed,
39    Failed,
40    Cancelled,
41}
42
43/// Task execution context
44#[derive(Debug, Clone)]
45pub struct TaskContext {
46    pub task_id: String,
47    pub execution_id: String,
48    pub scheduled_time: SystemTime,
49    pub actual_start_time: Option<SystemTime>,
50    pub metadata: HashMap<String, String>,
51}
52
53impl TaskContext {
54    pub fn new(task_id: String, scheduled_time: SystemTime) -> Self {
55        Self {
56            task_id,
57            execution_id: Uuid::new_v4().to_string(),
58            scheduled_time,
59            actual_start_time: None,
60            metadata: HashMap::new(),
61        }
62    }
63
64    pub fn with_metadata(mut self, key: String, value: String) -> Self {
65        self.metadata.insert(key, value);
66        self
67    }
68
69    pub fn mark_started(&mut self) {
70        self.actual_start_time = Some(SystemTime::now());
71    }
72
73    pub fn execution_delay(&self) -> Option<Duration> {
74        self.actual_start_time
75            .and_then(|start| start.duration_since(self.scheduled_time).ok())
76    }
77}
78
79/// Task execution function type
80pub type TaskFn = dyn Fn(TaskContext) -> Pin<Box<dyn Future<Output = TaskResult> + Send>> + Send + Sync;
81
82/// Schedulable task
83#[derive(Clone)]
84pub struct Task {
85    pub id: String,
86    pub name: String,
87    pub description: Option<String>,
88    pub group: Option<String>,
89    pub priority: i32,
90    pub max_retry: u32,
91    pub timeout: Option<Duration>,
92    pub enabled: bool,
93    pub initial_delay: Option<Duration>,
94    pub execution_delay: Option<Duration>,
95    executor: Arc<TaskFn>,
96    metadata: HashMap<String, String>,
97}
98
99impl Task {
100    pub fn new<F, Fut>(id: impl Into<String>, name: impl Into<String>, executor: F) -> Self
101    where
102        F: Fn(TaskContext) -> Fut + Send + Sync + 'static,
103        Fut: Future<Output = TaskResult> + Send + 'static,
104    {
105        let executor = Arc::new(move |ctx| {
106            let fut = executor(ctx);
107            Box::pin(fut) as Pin<Box<dyn Future<Output = TaskResult> + Send>>
108        });
109
110        Self {
111            id: id.into(),
112            name: name.into(),
113            description: None,
114            group: None,
115            priority: 0,
116            max_retry: 0,
117            timeout: None,
118            enabled: true,
119            initial_delay: None,
120            execution_delay: None,
121            executor,
122            metadata: HashMap::new(),
123        }
124    }
125
126    /// Set initial delay before first execution
127    pub fn with_initial_delay(mut self, delay: Duration) -> Self {
128        self.initial_delay = Some(delay);
129        self
130    }
131
132    /// Set delay before each execution
133    pub fn with_execution_delay(mut self, delay: Duration) -> Self {
134        self.execution_delay = Some(delay);
135        self
136    }
137
138    pub fn with_description(mut self, description: impl Into<String>) -> Self {
139        self.description = Some(description.into());
140        self
141    }
142
143    pub fn with_group(mut self, group: impl Into<String>) -> Self {
144        self.group = Some(group.into());
145        self
146    }
147
148    pub fn with_priority(mut self, priority: i32) -> Self {
149        self.priority = priority;
150        self
151    }
152
153    pub fn with_max_retry(mut self, max_retry: u32) -> Self {
154        self.max_retry = max_retry;
155        self
156    }
157
158    pub fn with_timeout(mut self, timeout: Duration) -> Self {
159        self.timeout = Some(timeout);
160        self
161    }
162
163    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
164        self.metadata.insert(key.into(), value.into());
165        self
166    }
167
168    pub fn enabled(mut self, enabled: bool) -> Self {
169        self.enabled = enabled;
170        self
171    }
172
173    pub async fn execute(&self, context: TaskContext) -> TaskResult {
174        if !self.enabled {
175            return TaskResult::Skipped("Task is disabled".to_string());
176        }
177
178        (self.executor)(context).await
179    }
180
181    pub fn get_metadata(&self, key: &str) -> Option<&String> {
182        self.metadata.get(key)
183    }
184}
185
186impl fmt::Debug for Task {
187    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188        f.debug_struct("Task")
189            .field("id", &self.id)
190            .field("name", &self.name)
191            .field("description", &self.description)
192            .field("group", &self.group)
193            .field("priority", &self.priority)
194            .field("max_retry", &self.max_retry)
195            .field("timeout", &self.timeout)
196            .field("enabled", &self.enabled)
197            .field("metadata", &self.metadata)
198            .finish()
199    }
200}
201
202/// Task execution record
203#[derive(Debug, Clone)]
204pub struct TaskExecution {
205    pub execution_id: String,
206    pub task_id: String,
207    pub scheduled_time: SystemTime,
208    pub start_time: Option<SystemTime>,
209    pub end_time: Option<SystemTime>,
210    pub status: TaskStatus,
211    pub result: Option<TaskResult>,
212    pub retry_count: u32,
213    pub error_message: Option<String>,
214}
215
216impl TaskExecution {
217    pub fn new(task_id: String, scheduled_time: SystemTime) -> Self {
218        Self {
219            execution_id: Uuid::new_v4().to_string(),
220            task_id,
221            scheduled_time,
222            start_time: None,
223            end_time: None,
224            status: TaskStatus::Pending,
225            result: None,
226            retry_count: 0,
227            error_message: None,
228        }
229    }
230
231    pub fn start(&mut self) {
232        self.start_time = Some(SystemTime::now());
233        self.status = TaskStatus::Running;
234    }
235
236    pub fn complete(&mut self, result: TaskResult) {
237        self.end_time = Some(SystemTime::now());
238        self.status = match &result {
239            TaskResult::Success(_) => TaskStatus::Completed,
240            TaskResult::Failed(msg) => {
241                self.error_message = Some(msg.clone());
242                TaskStatus::Failed
243            }
244            TaskResult::Skipped(_) => TaskStatus::Completed,
245        };
246        self.result = Some(result);
247    }
248
249    pub fn cancel(&mut self) {
250        self.end_time = Some(SystemTime::now());
251        self.status = TaskStatus::Cancelled;
252    }
253
254    pub fn duration(&self) -> Option<Duration> {
255        match (self.start_time, self.end_time) {
256            (Some(start), Some(end)) => end.duration_since(start).ok(),
257            _ => None,
258        }
259    }
260}