// rocketmq_rust/schedule/task.rs

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

use std::collections::HashMap;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;

use uuid::Uuid;

/// Outcome reported by a task executor for a single run.
///
/// `PartialEq`/`Eq` are derived so results can be compared directly, e.g. in
/// assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskResult {
    /// The run succeeded; carries an optional string payload.
    Success(Option<String>),
    /// The run failed; carries the failure message.
    Failed(String),
    /// The run was skipped; carries the reason (for example, a disabled task).
    Skipped(String),
}

/// Lifecycle state of a single task execution.
///
/// `Eq` and `Hash` are derived alongside `PartialEq` so a status can be used
/// as a set member or map key (for example, when counting executions by
/// status).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TaskStatus {
    /// Created but not started yet.
    Pending,
    /// Currently executing.
    Running,
    /// Finished without failure.
    Completed,
    /// Finished with a failure.
    Failed,
    /// Cancelled.
    Cancelled,
}

/// Per-execution context handed to a task's executor.
#[derive(Debug, Clone)]
pub struct TaskContext {
    /// Identifier of the task being executed.
    pub task_id: String,
    /// Identifier of this particular execution.
    pub execution_id: String,
    /// Time at which the execution was scheduled to run.
    pub scheduled_time: SystemTime,
    /// Time at which the execution actually started, if it has started.
    pub actual_start_time: Option<SystemTime>,
    /// Free-form string key/value pairs attached to this execution.
    pub metadata: HashMap<String, String>,
}

56impl TaskContext {
57    pub fn new(task_id: String, scheduled_time: SystemTime) -> Self {
58        Self {
59            task_id: task_id.clone(),
60            execution_id: Uuid::new_v4().to_string(),
61            scheduled_time,
62            actual_start_time: None,
63            metadata: HashMap::new(),
64        }
65    }
66
67    pub fn with_metadata(mut self, key: String, value: String) -> Self {
68        self.metadata.insert(key, value);
69        self
70    }
71
72    pub fn mark_started(&mut self) {
73        self.actual_start_time = Some(SystemTime::now());
74    }
75
76    pub fn execution_delay(&self) -> Option<Duration> {
77        self.actual_start_time
78            .and_then(|start| start.duration_since(self.scheduled_time).ok())
79    }
80}
81
82/// Task execution function type
83pub type TaskFn =
84    dyn Fn(TaskContext) -> Pin<Box<dyn Future<Output = TaskResult> + Send>> + Send + Sync;
85
86/// Schedulable task
87#[derive(Clone)]
88pub struct Task {
89    pub id: String,
90    pub name: String,
91    pub description: Option<String>,
92    pub group: Option<String>,
93    pub priority: i32,
94    pub max_retry: u32,
95    pub timeout: Option<Duration>,
96    pub enabled: bool,
97    pub initial_delay: Option<Duration>,
98    pub execution_delay: Option<Duration>,
99    executor: Arc<TaskFn>,
100    metadata: HashMap<String, String>,
101}
102
103impl Task {
104    pub fn new<F, Fut>(id: impl Into<String>, name: impl Into<String>, executor: F) -> Self
105    where
106        F: Fn(TaskContext) -> Fut + Send + Sync + 'static,
107        Fut: Future<Output = TaskResult> + Send + 'static,
108    {
109        let executor = Arc::new(move |ctx| {
110            let fut = executor(ctx);
111            Box::pin(fut) as Pin<Box<dyn Future<Output = TaskResult> + Send>>
112        });
113
114        Self {
115            id: id.into(),
116            name: name.into(),
117            description: None,
118            group: None,
119            priority: 0,
120            max_retry: 0,
121            timeout: None,
122            enabled: true,
123            initial_delay: None,
124            execution_delay: None,
125            executor,
126            metadata: HashMap::new(),
127        }
128    }
129
130    /// Set initial delay before first execution
131    pub fn with_initial_delay(mut self, delay: Duration) -> Self {
132        self.initial_delay = Some(delay);
133        self
134    }
135
136    /// Set delay before each execution
137    pub fn with_execution_delay(mut self, delay: Duration) -> Self {
138        self.execution_delay = Some(delay);
139        self
140    }
141
142    pub fn with_description(mut self, description: impl Into<String>) -> Self {
143        self.description = Some(description.into());
144        self
145    }
146
147    pub fn with_group(mut self, group: impl Into<String>) -> Self {
148        self.group = Some(group.into());
149        self
150    }
151
152    pub fn with_priority(mut self, priority: i32) -> Self {
153        self.priority = priority;
154        self
155    }
156
157    pub fn with_max_retry(mut self, max_retry: u32) -> Self {
158        self.max_retry = max_retry;
159        self
160    }
161
162    pub fn with_timeout(mut self, timeout: Duration) -> Self {
163        self.timeout = Some(timeout);
164        self
165    }
166
167    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
168        self.metadata.insert(key.into(), value.into());
169        self
170    }
171
172    pub fn enabled(mut self, enabled: bool) -> Self {
173        self.enabled = enabled;
174        self
175    }
176
177    pub async fn execute(&self, context: TaskContext) -> TaskResult {
178        if !self.enabled {
179            return TaskResult::Skipped("Task is disabled".to_string());
180        }
181
182        (self.executor)(context).await
183    }
184
185    pub fn get_metadata(&self, key: &str) -> Option<&String> {
186        self.metadata.get(key)
187    }
188}
189
190impl fmt::Debug for Task {
191    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
192        f.debug_struct("Task")
193            .field("id", &self.id)
194            .field("name", &self.name)
195            .field("description", &self.description)
196            .field("group", &self.group)
197            .field("priority", &self.priority)
198            .field("max_retry", &self.max_retry)
199            .field("timeout", &self.timeout)
200            .field("enabled", &self.enabled)
201            .field("metadata", &self.metadata)
202            .finish()
203    }
204}
205
206/// Task execution record
207#[derive(Debug, Clone)]
208pub struct TaskExecution {
209    pub execution_id: String,
210    pub task_id: String,
211    pub scheduled_time: SystemTime,
212    pub start_time: Option<SystemTime>,
213    pub end_time: Option<SystemTime>,
214    pub status: TaskStatus,
215    pub result: Option<TaskResult>,
216    pub retry_count: u32,
217    pub error_message: Option<String>,
218}
219
220impl TaskExecution {
221    pub fn new(task_id: String, scheduled_time: SystemTime) -> Self {
222        Self {
223            execution_id: Uuid::new_v4().to_string(),
224            task_id,
225            scheduled_time,
226            start_time: None,
227            end_time: None,
228            status: TaskStatus::Pending,
229            result: None,
230            retry_count: 0,
231            error_message: None,
232        }
233    }
234
235    pub fn start(&mut self) {
236        self.start_time = Some(SystemTime::now());
237        self.status = TaskStatus::Running;
238    }
239
240    pub fn complete(&mut self, result: TaskResult) {
241        self.end_time = Some(SystemTime::now());
242        self.status = match &result {
243            TaskResult::Success(_) => TaskStatus::Completed,
244            TaskResult::Failed(msg) => {
245                self.error_message = Some(msg.clone());
246                TaskStatus::Failed
247            }
248            TaskResult::Skipped(_) => TaskStatus::Completed,
249        };
250        self.result = Some(result);
251    }
252
253    pub fn cancel(&mut self) {
254        self.end_time = Some(SystemTime::now());
255        self.status = TaskStatus::Cancelled;
256    }
257
258    pub fn duration(&self) -> Option<Duration> {
259        match (self.start_time, self.end_time) {
260            (Some(start), Some(end)) => end.duration_since(start).ok(),
261            _ => None,
262        }
263    }
264}