Skip to main content

reinhardt_tasks/
chain.rs

//! Task chaining
//!
//! Allows multiple tasks to be executed in sequence, with each task receiving
//! the result of the previous task.
5
6use crate::{TaskBackend, TaskExecutionError, TaskId, TaskStatus};
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9
10/// Task chain configuration
11///
12/// # Examples
13///
14/// ```rust
15/// use reinhardt_tasks::TaskChain;
16///
17/// let chain = TaskChain::new("email-workflow");
18/// assert_eq!(chain.name(), "email-workflow");
19/// ```
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct TaskChain {
22	/// Chain identifier
23	id: TaskId,
24	/// Chain name
25	name: String,
26	/// Task IDs in execution order
27	task_ids: Vec<TaskId>,
28	/// Current task index
29	current_index: usize,
30	/// Chain status
31	status: ChainStatus,
32}
33
34/// Status of a task chain
35///
36/// # Examples
37///
38/// ```rust
39/// use reinhardt_tasks::ChainStatus;
40///
41/// let status = ChainStatus::Pending;
42/// assert_eq!(status, ChainStatus::Pending);
43/// ```
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
45pub enum ChainStatus {
46	/// Chain is waiting to start
47	Pending,
48	/// Chain is currently executing
49	Running,
50	/// Chain completed successfully
51	Completed,
52	/// Chain failed
53	Failed,
54}
55
56impl TaskChain {
57	/// Create a new task chain
58	///
59	/// # Examples
60	///
61	/// ```rust
62	/// use reinhardt_tasks::TaskChain;
63	///
64	/// let chain = TaskChain::new("payment-processing");
65	/// assert_eq!(chain.name(), "payment-processing");
66	/// ```
67	pub fn new(name: impl Into<String>) -> Self {
68		Self {
69			id: TaskId::new(),
70			name: name.into(),
71			task_ids: Vec::new(),
72			current_index: 0,
73			status: ChainStatus::Pending,
74		}
75	}
76
77	/// Get the chain ID
78	///
79	/// # Examples
80	///
81	/// ```rust
82	/// use reinhardt_tasks::TaskChain;
83	///
84	/// let chain = TaskChain::new("test-chain");
85	/// let id = chain.id();
86	/// ```
87	pub fn id(&self) -> TaskId {
88		self.id
89	}
90
91	/// Get the chain name
92	///
93	/// # Examples
94	///
95	/// ```rust
96	/// use reinhardt_tasks::TaskChain;
97	///
98	/// let chain = TaskChain::new("my-chain");
99	/// assert_eq!(chain.name(), "my-chain");
100	/// ```
101	pub fn name(&self) -> &str {
102		&self.name
103	}
104
105	/// Add a task to the chain
106	///
107	/// # Examples
108	///
109	/// ```rust
110	/// use reinhardt_tasks::{TaskChain, TaskId};
111	///
112	/// let mut chain = TaskChain::new("workflow");
113	/// let task_id = TaskId::new();
114	/// chain.add_task(task_id);
115	/// assert_eq!(chain.task_count(), 1);
116	/// ```
117	pub fn add_task(&mut self, task_id: TaskId) {
118		self.task_ids.push(task_id);
119	}
120
121	/// Get the number of tasks in the chain
122	///
123	/// # Examples
124	///
125	/// ```rust
126	/// use reinhardt_tasks::{TaskChain, TaskId};
127	///
128	/// let mut chain = TaskChain::new("workflow");
129	/// chain.add_task(TaskId::new());
130	/// chain.add_task(TaskId::new());
131	/// assert_eq!(chain.task_count(), 2);
132	/// ```
133	pub fn task_count(&self) -> usize {
134		self.task_ids.len()
135	}
136
137	/// Get the current task ID
138	///
139	/// # Examples
140	///
141	/// ```rust
142	/// use reinhardt_tasks::{TaskChain, TaskId};
143	///
144	/// let mut chain = TaskChain::new("workflow");
145	/// let task_id = TaskId::new();
146	/// chain.add_task(task_id);
147	///
148	/// assert_eq!(chain.current_task(), Some(task_id));
149	/// ```
150	pub fn current_task(&self) -> Option<TaskId> {
151		self.task_ids.get(self.current_index).copied()
152	}
153
154	/// Move to the next task in the chain
155	///
156	/// Returns `true` if there are more tasks, `false` if the chain is complete.
157	///
158	/// # Examples
159	///
160	/// ```rust
161	/// use reinhardt_tasks::{TaskChain, TaskId};
162	///
163	/// let mut chain = TaskChain::new("workflow");
164	/// chain.add_task(TaskId::new());
165	/// chain.add_task(TaskId::new());
166	///
167	/// assert!(chain.advance());
168	/// assert!(!chain.advance()); // No more tasks
169	/// ```
170	pub fn advance(&mut self) -> bool {
171		self.current_index += 1;
172		self.current_index < self.task_ids.len()
173	}
174
175	/// Get the chain status
176	///
177	/// # Examples
178	///
179	/// ```rust
180	/// use reinhardt_tasks::{TaskChain, ChainStatus};
181	///
182	/// let chain = TaskChain::new("workflow");
183	/// assert_eq!(chain.status(), ChainStatus::Pending);
184	/// ```
185	pub fn status(&self) -> ChainStatus {
186		self.status
187	}
188
189	/// Set the chain status
190	///
191	/// # Examples
192	///
193	/// ```rust
194	/// use reinhardt_tasks::{TaskChain, ChainStatus};
195	///
196	/// let mut chain = TaskChain::new("workflow");
197	/// chain.set_status(ChainStatus::Running);
198	/// assert_eq!(chain.status(), ChainStatus::Running);
199	/// ```
200	pub fn set_status(&mut self, status: ChainStatus) {
201		self.status = status;
202	}
203
204	/// Check if the chain is complete
205	///
206	/// # Examples
207	///
208	/// ```rust
209	/// use reinhardt_tasks::{TaskChain, ChainStatus};
210	///
211	/// let mut chain = TaskChain::new("workflow");
212	/// chain.set_status(ChainStatus::Completed);
213	/// assert!(chain.is_complete());
214	/// ```
215	pub fn is_complete(&self) -> bool {
216		matches!(self.status, ChainStatus::Completed | ChainStatus::Failed)
217	}
218
219	/// Execute the chain
220	///
221	/// This method will execute all tasks in the chain sequentially.
222	///
223	/// # Examples
224	///
225	/// ```
226	/// use reinhardt_tasks::{TaskChain, DummyBackend};
227	/// use std::sync::Arc;
228	///
229	/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
230	/// let mut chain = TaskChain::new("workflow");
231	/// let backend = Arc::new(DummyBackend::new());
232	///
233	/// chain.execute(backend).await?;
234	/// assert!(chain.is_complete());
235	/// # Ok(())
236	/// # }
237	/// # tokio::runtime::Runtime::new().unwrap().block_on(example()).unwrap();
238	/// ```
239	pub async fn execute(
240		&mut self,
241		backend: Arc<dyn TaskBackend>,
242	) -> Result<(), TaskExecutionError> {
243		self.set_status(ChainStatus::Running);
244
245		while let Some(task_id) = self.current_task() {
246			// Check task status
247			let status = backend.get_status(task_id).await?;
248
249			match status {
250				TaskStatus::Success => {
251					// Task completed successfully, move to next
252					if !self.advance() {
253						// All tasks completed
254						self.set_status(ChainStatus::Completed);
255						return Ok(());
256					}
257				}
258				TaskStatus::Failure => {
259					// Task failed, mark chain as failed
260					self.set_status(ChainStatus::Failed);
261					return Err(TaskExecutionError::ExecutionFailed(format!(
262						"Task {} in chain {} failed",
263						task_id, self.name
264					)));
265				}
266				TaskStatus::Pending | TaskStatus::Running | TaskStatus::Retry => {
267					// Task still in progress, return and wait for next check
268					return Ok(());
269				}
270			}
271		}
272
273		self.set_status(ChainStatus::Completed);
274		Ok(())
275	}
276}
277
278/// Task chain builder
279///
280/// Provides a fluent interface for building task chains.
281///
282/// # Examples
283///
284/// ```rust
285/// use reinhardt_tasks::{TaskChainBuilder, TaskId};
286///
287/// let chain = TaskChainBuilder::new("payment-flow")
288///     .add_task(TaskId::new())
289///     .add_task(TaskId::new())
290///     .build();
291///
292/// assert_eq!(chain.task_count(), 2);
293/// ```
294pub struct TaskChainBuilder {
295	chain: TaskChain,
296}
297
298impl TaskChainBuilder {
299	/// Create a new task chain builder
300	///
301	/// # Examples
302	///
303	/// ```rust
304	/// use reinhardt_tasks::TaskChainBuilder;
305	///
306	/// let builder = TaskChainBuilder::new("my-workflow");
307	/// ```
308	pub fn new(name: impl Into<String>) -> Self {
309		Self {
310			chain: TaskChain::new(name),
311		}
312	}
313
314	/// Add a task to the chain
315	///
316	/// # Examples
317	///
318	/// ```rust
319	/// use reinhardt_tasks::{TaskChainBuilder, TaskId};
320	///
321	/// let builder = TaskChainBuilder::new("workflow")
322	///     .add_task(TaskId::new());
323	/// ```
324	pub fn add_task(mut self, task_id: TaskId) -> Self {
325		self.chain.add_task(task_id);
326		self
327	}
328
329	/// Add multiple tasks to the chain
330	///
331	/// # Examples
332	///
333	/// ```rust
334	/// use reinhardt_tasks::{TaskChainBuilder, TaskId};
335	///
336	/// let tasks = vec![TaskId::new(), TaskId::new(), TaskId::new()];
337	/// let chain = TaskChainBuilder::new("batch")
338	///     .add_tasks(tasks)
339	///     .build();
340	///
341	/// assert_eq!(chain.task_count(), 3);
342	/// ```
343	pub fn add_tasks(mut self, task_ids: Vec<TaskId>) -> Self {
344		for task_id in task_ids {
345			self.chain.add_task(task_id);
346		}
347		self
348	}
349
350	/// Build the task chain
351	///
352	/// # Examples
353	///
354	/// ```rust
355	/// use reinhardt_tasks::{TaskChainBuilder, TaskId};
356	///
357	/// let chain = TaskChainBuilder::new("workflow")
358	///     .add_task(TaskId::new())
359	///     .build();
360	/// ```
361	pub fn build(self) -> TaskChain {
362		self.chain
363	}
364}
365
#[cfg(test)]
mod tests {
	use super::*;
	use crate::{DummyBackend, Task, TaskPriority};

	/// Minimal `Task` implementation used to enqueue work on the backend.
	struct TestTask {
		id: TaskId,
	}

	impl Task for TestTask {
		fn id(&self) -> TaskId {
			self.id
		}

		fn name(&self) -> &str {
			"test"
		}

		fn priority(&self) -> TaskPriority {
			TaskPriority::default()
		}
	}

	/// A new chain keeps its name, holds no tasks and is pending.
	#[test]
	fn test_chain_creation() {
		let fresh = TaskChain::new("test-chain");

		assert_eq!(fresh.name(), "test-chain");
		assert_eq!(fresh.task_count(), 0);
		assert_eq!(fresh.status(), ChainStatus::Pending);
	}

	/// The first task added becomes the current task.
	#[test]
	fn test_chain_add_task() {
		let first_id = TaskId::new();
		let mut chain = TaskChain::new("test");

		chain.add_task(first_id);

		assert_eq!(chain.task_count(), 1);
		assert_eq!(chain.current_task(), Some(first_id));
	}

	/// With two tasks, only the first `advance` reports remaining work.
	#[test]
	fn test_chain_advance() {
		let mut chain = TaskChain::new("test");
		for _ in 0..2 {
			chain.add_task(TaskId::new());
		}

		let has_more_after_first = chain.advance();
		let has_more_after_second = chain.advance();

		assert!(has_more_after_first);
		assert!(!has_more_after_second);
	}

	/// Chaining `add_task` on the builder collects every task.
	#[test]
	fn test_chain_builder() {
		let (first, second) = (TaskId::new(), TaskId::new());

		let built = TaskChainBuilder::new("builder-test")
			.add_task(first)
			.add_task(second)
			.build();

		assert_eq!(built.task_count(), 2);
		assert_eq!(built.name(), "builder-test");
	}

	/// `add_tasks` accepts a whole vector of task IDs at once.
	#[test]
	fn test_chain_builder_multiple() {
		let batch = vec![TaskId::new(), TaskId::new(), TaskId::new()];

		let built = TaskChainBuilder::new("batch").add_tasks(batch).build();

		assert_eq!(built.task_count(), 3);
	}

	/// Status changes are visible through `status` and `is_complete`.
	#[test]
	fn test_chain_status() {
		let mut chain = TaskChain::new("test");
		assert_eq!(chain.status(), ChainStatus::Pending);

		chain.set_status(ChainStatus::Running);
		assert_eq!(chain.status(), ChainStatus::Running);

		chain.set_status(ChainStatus::Completed);
		assert!(chain.is_complete());
	}

	/// Executing against the dummy backend completes the whole chain.
	#[tokio::test]
	async fn test_chain_execution() {
		let backend = Arc::new(DummyBackend::new());
		let mut chain = TaskChain::new("test-execution");

		let first_task = Box::new(TestTask { id: TaskId::new() });
		let second_task = Box::new(TestTask { id: TaskId::new() });

		let first_id = backend.enqueue(first_task).await.unwrap();
		let second_id = backend.enqueue(second_task).await.unwrap();

		chain.add_task(first_id);
		chain.add_task(second_id);

		// DummyBackend always returns Success, so chain should complete
		chain.execute(backend).await.unwrap();
		assert_eq!(chain.status(), ChainStatus::Completed);
	}
}