reinhardt_tasks/chain.rs
1//! Task chaining
2//!
//! Allows multiple tasks to be tracked in sequence: a chain stores task IDs in
//! execution order and moves on to the next task once the backend reports the
//! current one as successful.
5
6use crate::{TaskBackend, TaskExecutionError, TaskId, TaskStatus};
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9
10/// Task chain configuration
11///
12/// # Examples
13///
14/// ```rust
15/// use reinhardt_tasks::TaskChain;
16///
17/// let chain = TaskChain::new("email-workflow");
18/// assert_eq!(chain.name(), "email-workflow");
19/// ```
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct TaskChain {
22 /// Chain identifier
23 id: TaskId,
24 /// Chain name
25 name: String,
26 /// Task IDs in execution order
27 task_ids: Vec<TaskId>,
28 /// Current task index
29 current_index: usize,
30 /// Chain status
31 status: ChainStatus,
32}
33
34/// Status of a task chain
35///
36/// # Examples
37///
38/// ```rust
39/// use reinhardt_tasks::ChainStatus;
40///
41/// let status = ChainStatus::Pending;
42/// assert_eq!(status, ChainStatus::Pending);
43/// ```
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
45pub enum ChainStatus {
46 /// Chain is waiting to start
47 Pending,
48 /// Chain is currently executing
49 Running,
50 /// Chain completed successfully
51 Completed,
52 /// Chain failed
53 Failed,
54}
55
56impl TaskChain {
57 /// Create a new task chain
58 ///
59 /// # Examples
60 ///
61 /// ```rust
62 /// use reinhardt_tasks::TaskChain;
63 ///
64 /// let chain = TaskChain::new("payment-processing");
65 /// assert_eq!(chain.name(), "payment-processing");
66 /// ```
67 pub fn new(name: impl Into<String>) -> Self {
68 Self {
69 id: TaskId::new(),
70 name: name.into(),
71 task_ids: Vec::new(),
72 current_index: 0,
73 status: ChainStatus::Pending,
74 }
75 }
76
77 /// Get the chain ID
78 ///
79 /// # Examples
80 ///
81 /// ```rust
82 /// use reinhardt_tasks::TaskChain;
83 ///
84 /// let chain = TaskChain::new("test-chain");
85 /// let id = chain.id();
86 /// ```
87 pub fn id(&self) -> TaskId {
88 self.id
89 }
90
91 /// Get the chain name
92 ///
93 /// # Examples
94 ///
95 /// ```rust
96 /// use reinhardt_tasks::TaskChain;
97 ///
98 /// let chain = TaskChain::new("my-chain");
99 /// assert_eq!(chain.name(), "my-chain");
100 /// ```
101 pub fn name(&self) -> &str {
102 &self.name
103 }
104
105 /// Add a task to the chain
106 ///
107 /// # Examples
108 ///
109 /// ```rust
110 /// use reinhardt_tasks::{TaskChain, TaskId};
111 ///
112 /// let mut chain = TaskChain::new("workflow");
113 /// let task_id = TaskId::new();
114 /// chain.add_task(task_id);
115 /// assert_eq!(chain.task_count(), 1);
116 /// ```
117 pub fn add_task(&mut self, task_id: TaskId) {
118 self.task_ids.push(task_id);
119 }
120
121 /// Get the number of tasks in the chain
122 ///
123 /// # Examples
124 ///
125 /// ```rust
126 /// use reinhardt_tasks::{TaskChain, TaskId};
127 ///
128 /// let mut chain = TaskChain::new("workflow");
129 /// chain.add_task(TaskId::new());
130 /// chain.add_task(TaskId::new());
131 /// assert_eq!(chain.task_count(), 2);
132 /// ```
133 pub fn task_count(&self) -> usize {
134 self.task_ids.len()
135 }
136
137 /// Get the current task ID
138 ///
139 /// # Examples
140 ///
141 /// ```rust
142 /// use reinhardt_tasks::{TaskChain, TaskId};
143 ///
144 /// let mut chain = TaskChain::new("workflow");
145 /// let task_id = TaskId::new();
146 /// chain.add_task(task_id);
147 ///
148 /// assert_eq!(chain.current_task(), Some(task_id));
149 /// ```
150 pub fn current_task(&self) -> Option<TaskId> {
151 self.task_ids.get(self.current_index).copied()
152 }
153
154 /// Move to the next task in the chain
155 ///
156 /// Returns `true` if there are more tasks, `false` if the chain is complete.
157 ///
158 /// # Examples
159 ///
160 /// ```rust
161 /// use reinhardt_tasks::{TaskChain, TaskId};
162 ///
163 /// let mut chain = TaskChain::new("workflow");
164 /// chain.add_task(TaskId::new());
165 /// chain.add_task(TaskId::new());
166 ///
167 /// assert!(chain.advance());
168 /// assert!(!chain.advance()); // No more tasks
169 /// ```
170 pub fn advance(&mut self) -> bool {
171 self.current_index += 1;
172 self.current_index < self.task_ids.len()
173 }
174
175 /// Get the chain status
176 ///
177 /// # Examples
178 ///
179 /// ```rust
180 /// use reinhardt_tasks::{TaskChain, ChainStatus};
181 ///
182 /// let chain = TaskChain::new("workflow");
183 /// assert_eq!(chain.status(), ChainStatus::Pending);
184 /// ```
185 pub fn status(&self) -> ChainStatus {
186 self.status
187 }
188
189 /// Set the chain status
190 ///
191 /// # Examples
192 ///
193 /// ```rust
194 /// use reinhardt_tasks::{TaskChain, ChainStatus};
195 ///
196 /// let mut chain = TaskChain::new("workflow");
197 /// chain.set_status(ChainStatus::Running);
198 /// assert_eq!(chain.status(), ChainStatus::Running);
199 /// ```
200 pub fn set_status(&mut self, status: ChainStatus) {
201 self.status = status;
202 }
203
204 /// Check if the chain is complete
205 ///
206 /// # Examples
207 ///
208 /// ```rust
209 /// use reinhardt_tasks::{TaskChain, ChainStatus};
210 ///
211 /// let mut chain = TaskChain::new("workflow");
212 /// chain.set_status(ChainStatus::Completed);
213 /// assert!(chain.is_complete());
214 /// ```
215 pub fn is_complete(&self) -> bool {
216 matches!(self.status, ChainStatus::Completed | ChainStatus::Failed)
217 }
218
219 /// Execute the chain
220 ///
221 /// This method will execute all tasks in the chain sequentially.
222 ///
223 /// # Examples
224 ///
225 /// ```
226 /// use reinhardt_tasks::{TaskChain, DummyBackend};
227 /// use std::sync::Arc;
228 ///
229 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
230 /// let mut chain = TaskChain::new("workflow");
231 /// let backend = Arc::new(DummyBackend::new());
232 ///
233 /// chain.execute(backend).await?;
234 /// assert!(chain.is_complete());
235 /// # Ok(())
236 /// # }
237 /// # tokio::runtime::Runtime::new().unwrap().block_on(example()).unwrap();
238 /// ```
239 pub async fn execute(
240 &mut self,
241 backend: Arc<dyn TaskBackend>,
242 ) -> Result<(), TaskExecutionError> {
243 self.set_status(ChainStatus::Running);
244
245 while let Some(task_id) = self.current_task() {
246 // Check task status
247 let status = backend.get_status(task_id).await?;
248
249 match status {
250 TaskStatus::Success => {
251 // Task completed successfully, move to next
252 if !self.advance() {
253 // All tasks completed
254 self.set_status(ChainStatus::Completed);
255 return Ok(());
256 }
257 }
258 TaskStatus::Failure => {
259 // Task failed, mark chain as failed
260 self.set_status(ChainStatus::Failed);
261 return Err(TaskExecutionError::ExecutionFailed(format!(
262 "Task {} in chain {} failed",
263 task_id, self.name
264 )));
265 }
266 TaskStatus::Pending | TaskStatus::Running | TaskStatus::Retry => {
267 // Task still in progress, return and wait for next check
268 return Ok(());
269 }
270 }
271 }
272
273 self.set_status(ChainStatus::Completed);
274 Ok(())
275 }
276}
277
278/// Task chain builder
279///
280/// Provides a fluent interface for building task chains.
281///
282/// # Examples
283///
284/// ```rust
285/// use reinhardt_tasks::{TaskChainBuilder, TaskId};
286///
287/// let chain = TaskChainBuilder::new("payment-flow")
288/// .add_task(TaskId::new())
289/// .add_task(TaskId::new())
290/// .build();
291///
292/// assert_eq!(chain.task_count(), 2);
293/// ```
294pub struct TaskChainBuilder {
295 chain: TaskChain,
296}
297
298impl TaskChainBuilder {
299 /// Create a new task chain builder
300 ///
301 /// # Examples
302 ///
303 /// ```rust
304 /// use reinhardt_tasks::TaskChainBuilder;
305 ///
306 /// let builder = TaskChainBuilder::new("my-workflow");
307 /// ```
308 pub fn new(name: impl Into<String>) -> Self {
309 Self {
310 chain: TaskChain::new(name),
311 }
312 }
313
314 /// Add a task to the chain
315 ///
316 /// # Examples
317 ///
318 /// ```rust
319 /// use reinhardt_tasks::{TaskChainBuilder, TaskId};
320 ///
321 /// let builder = TaskChainBuilder::new("workflow")
322 /// .add_task(TaskId::new());
323 /// ```
324 pub fn add_task(mut self, task_id: TaskId) -> Self {
325 self.chain.add_task(task_id);
326 self
327 }
328
329 /// Add multiple tasks to the chain
330 ///
331 /// # Examples
332 ///
333 /// ```rust
334 /// use reinhardt_tasks::{TaskChainBuilder, TaskId};
335 ///
336 /// let tasks = vec![TaskId::new(), TaskId::new(), TaskId::new()];
337 /// let chain = TaskChainBuilder::new("batch")
338 /// .add_tasks(tasks)
339 /// .build();
340 ///
341 /// assert_eq!(chain.task_count(), 3);
342 /// ```
343 pub fn add_tasks(mut self, task_ids: Vec<TaskId>) -> Self {
344 for task_id in task_ids {
345 self.chain.add_task(task_id);
346 }
347 self
348 }
349
350 /// Build the task chain
351 ///
352 /// # Examples
353 ///
354 /// ```rust
355 /// use reinhardt_tasks::{TaskChainBuilder, TaskId};
356 ///
357 /// let chain = TaskChainBuilder::new("workflow")
358 /// .add_task(TaskId::new())
359 /// .build();
360 /// ```
361 pub fn build(self) -> TaskChain {
362 self.chain
363 }
364}
365
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{DummyBackend, Task, TaskPriority};

    /// Minimal `Task` implementation used to enqueue work on the backend.
    struct TestTask {
        id: TaskId,
    }

    impl Task for TestTask {
        fn id(&self) -> TaskId {
            self.id
        }

        fn name(&self) -> &str {
            "test"
        }

        fn priority(&self) -> TaskPriority {
            TaskPriority::default()
        }
    }

    /// A new chain is named, empty and pending.
    #[test]
    fn test_chain_creation() {
        let fresh = TaskChain::new("test-chain");

        assert_eq!(fresh.name(), "test-chain");
        assert_eq!(fresh.task_count(), 0);
        assert_eq!(fresh.status(), ChainStatus::Pending);
    }

    /// The first task added becomes the current task.
    #[test]
    fn test_chain_add_task() {
        let only_id = TaskId::new();
        let mut chain = TaskChain::new("test");

        chain.add_task(only_id);

        assert_eq!(chain.task_count(), 1);
        assert_eq!(chain.current_task(), Some(only_id));
    }

    /// With two tasks, the first advance has more work and the second does not.
    #[test]
    fn test_chain_advance() {
        let mut chain = TaskChain::new("test");
        for _ in 0..2 {
            chain.add_task(TaskId::new());
        }

        let has_more_after_first = chain.advance();
        assert!(has_more_after_first);

        let has_more_after_second = chain.advance();
        assert!(!has_more_after_second);
    }

    /// The builder carries over the name and every individually added task.
    #[test]
    fn test_chain_builder() {
        let first = TaskId::new();
        let second = TaskId::new();

        let builder = TaskChainBuilder::new("builder-test");
        let chain = builder.add_task(first).add_task(second).build();

        assert_eq!(chain.task_count(), 2);
        assert_eq!(chain.name(), "builder-test");
    }

    /// `add_tasks` adds every ID from the vector.
    #[test]
    fn test_chain_builder_multiple() {
        let ids = vec![TaskId::new(), TaskId::new(), TaskId::new()];

        let chain = TaskChainBuilder::new("batch").add_tasks(ids).build();

        assert_eq!(chain.task_count(), 3);
    }

    /// Status changes are reflected by `status` and `is_complete`.
    #[test]
    fn test_chain_status() {
        let mut chain = TaskChain::new("test");
        assert_eq!(chain.status(), ChainStatus::Pending);

        chain.set_status(ChainStatus::Running);
        assert_eq!(chain.status(), ChainStatus::Running);

        chain.set_status(ChainStatus::Completed);
        assert!(chain.is_complete());
    }

    /// A chain of enqueued tasks completes against the dummy backend.
    #[tokio::test]
    async fn test_chain_execution() {
        let backend = Arc::new(DummyBackend::new());
        let mut chain = TaskChain::new("test-execution");

        for _ in 0..2 {
            let task = Box::new(TestTask { id: TaskId::new() });
            let enqueued_id = backend.enqueue(task).await.unwrap();
            chain.add_task(enqueued_id);
        }

        // DummyBackend always returns Success, so chain should complete
        chain.execute(backend).await.unwrap();
        assert_eq!(chain.status(), ChainStatus::Completed);
    }
}
468}