Skip to main content

reinhardt_tasks/
registry.rs

1//! Task registry for dynamic task dispatch
2//!
3//! This module provides a registry system to store and retrieve task executors
4//! by name, enabling dynamic task dispatch in distributed task systems.
5
6use crate::{TaskError, TaskExecutor, TaskResult};
7use async_trait::async_trait;
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::RwLock;
12
13/// Serialized task data for storage and transmission
14///
15/// # Examples
16///
17/// ```rust
18/// use reinhardt_tasks::SerializedTask;
19///
20/// let task = SerializedTask::new(
21///     "send_email".to_string(),
22///     r#"{"to":"user@example.com","subject":"Hello"}"#.to_string(),
23/// );
24///
25/// assert_eq!(task.name(), "send_email");
26/// ```
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct SerializedTask {
29	name: String,
30	data: String,
31}
32
33impl SerializedTask {
34	/// Create a new serialized task
35	///
36	/// # Examples
37	///
38	/// ```rust
39	/// use reinhardt_tasks::SerializedTask;
40	///
41	/// let task = SerializedTask::new("process_data".to_string(), "{}".to_string());
42	/// ```
43	pub fn new(name: String, data: String) -> Self {
44		Self { name, data }
45	}
46
47	/// Get the task name
48	///
49	/// # Examples
50	///
51	/// ```rust
52	/// use reinhardt_tasks::SerializedTask;
53	///
54	/// let task = SerializedTask::new("task_name".to_string(), "{}".to_string());
55	/// assert_eq!(task.name(), "task_name");
56	/// ```
57	pub fn name(&self) -> &str {
58		&self.name
59	}
60
61	/// Get the task data
62	///
63	/// # Examples
64	///
65	/// ```rust
66	/// use reinhardt_tasks::SerializedTask;
67	///
68	/// let task = SerializedTask::new("task".to_string(), r#"{"key":"value"}"#.to_string());
69	/// assert_eq!(task.data(), r#"{"key":"value"}"#);
70	/// ```
71	pub fn data(&self) -> &str {
72		&self.data
73	}
74
75	/// Convert to JSON string
76	///
77	/// # Examples
78	///
79	/// ```rust
80	/// use reinhardt_tasks::SerializedTask;
81	///
82	/// let task = SerializedTask::new("test".to_string(), "{}".to_string());
83	/// let json = task.to_json().unwrap();
84	/// assert!(json.contains("\"name\":\"test\""));
85	/// ```
86	pub fn to_json(&self) -> Result<String, serde_json::Error> {
87		serde_json::to_string(self)
88	}
89
90	/// Create from JSON string
91	///
92	/// # Examples
93	///
94	/// ```rust
95	/// use reinhardt_tasks::SerializedTask;
96	///
97	/// let json = r#"{"name":"test","data":"{}"}"#;
98	/// let task = SerializedTask::from_json(json).unwrap();
99	/// assert_eq!(task.name(), "test");
100	/// ```
101	pub fn from_json(json: &str) -> Result<Self, serde_json::Error> {
102		serde_json::from_str(json)
103	}
104}
105
106/// Task factory trait for creating task executors from serialized data
107///
108/// # Examples
109///
110/// ```rust,no_run
111/// use reinhardt_tasks::{TaskFactory, TaskResult, TaskExecutor, Task, TaskError, TaskId};
112/// use async_trait::async_trait;
113///
114/// struct EmailTask { to: String }
115///
116/// impl Task for EmailTask {
117///     fn name(&self) -> &str { "EmailTask" }
118///     fn id(&self) -> TaskId { TaskId::new() }
119/// }
120///
121/// #[async_trait]
122/// impl TaskExecutor for EmailTask {
123///     async fn execute(&self) -> TaskResult<()> { Ok(()) }
124/// }
125///
126/// struct EmailTaskFactory;
127///
128/// #[async_trait]
129/// impl TaskFactory for EmailTaskFactory {
130///     async fn create(&self, data: &str) -> TaskResult<Box<dyn TaskExecutor>> {
131///         // Deserialize data and create task executor
132///         let email_data: serde_json::Value = serde_json::from_str(data)
133///             .map_err(|e| TaskError::SerializationError(e.to_string()))?;
134///         Ok(Box::new(EmailTask { to: email_data["to"].as_str().unwrap().to_string() }))
135///     }
136/// }
137/// ```
138#[async_trait]
139pub trait TaskFactory: Send + Sync {
140	/// Create a task executor from serialized data
141	async fn create(&self, data: &str) -> TaskResult<Box<dyn TaskExecutor>>;
142}
143
144/// Global task registry for dynamic task dispatch
145///
146/// This registry maintains a mapping of task names to their factory functions,
147/// allowing workers to dynamically create and execute tasks based on serialized data.
148///
149/// # Examples
150///
151/// ```rust,no_run
152/// use reinhardt_tasks::{TaskRegistry, TaskFactory, TaskResult, TaskExecutor, Task, TaskError, TaskId};
153/// use async_trait::async_trait;
154/// use std::sync::Arc;
155///
156/// # struct EmailTaskFactory;
157/// # struct EmailTask { to: String }
158/// # impl Task for EmailTask {
159/// #     fn name(&self) -> &str { "EmailTask" }
160/// #     fn id(&self) -> TaskId { TaskId::new() }
161/// # }
162/// # #[async_trait]
163/// # impl TaskExecutor for EmailTask {
164/// #     async fn execute(&self) -> TaskResult<()> { Ok(()) }
165/// # }
166/// # #[async_trait]
167/// # impl TaskFactory for EmailTaskFactory {
168/// #     async fn create(&self, data: &str) -> TaskResult<Box<dyn TaskExecutor>> {
169/// #         let email_data: serde_json::Value = serde_json::from_str(data)
170/// #             .map_err(|e| TaskError::SerializationError(e.to_string()))?;
171/// #         Ok(Box::new(EmailTask { to: email_data["to"].as_str().unwrap().to_string() }))
172/// #     }
173/// # }
174///
175/// # async fn example() -> TaskResult<()> {
176/// let registry = TaskRegistry::new();
177///
178/// // Register a task factory
179/// registry.register("send_email".to_string(), Arc::new(EmailTaskFactory)).await;
180///
181/// // Create task from serialized data
182/// let task_data = r#"{"to":"user@example.com"}"#;
183/// let executor = registry.create("send_email", task_data).await?;
184/// # Ok(())
185/// # }
186/// ```
187pub struct TaskRegistry {
188	factories: Arc<RwLock<HashMap<String, Arc<dyn TaskFactory>>>>,
189}
190
191impl TaskRegistry {
192	/// Create a new task registry
193	///
194	/// # Examples
195	///
196	/// ```rust
197	/// use reinhardt_tasks::TaskRegistry;
198	///
199	/// let registry = TaskRegistry::new();
200	/// ```
201	pub fn new() -> Self {
202		Self {
203			factories: Arc::new(RwLock::new(HashMap::new())),
204		}
205	}
206
207	/// Register a task factory
208	///
209	/// # Examples
210	///
211	/// ```rust,no_run
212	/// use reinhardt_tasks::{TaskRegistry, TaskFactory, Task, TaskId};
213	/// use std::sync::Arc;
214	///
215	/// # struct MyTaskFactory;
216	/// # struct MyTask;
217	/// # impl Task for MyTask {
218	/// #     fn name(&self) -> &str { "MyTask" }
219	/// #     fn id(&self) -> TaskId { TaskId::new() }
220	/// # }
221	/// # #[async_trait::async_trait]
222	/// # impl reinhardt_tasks::TaskExecutor for MyTask {
223	/// #     async fn execute(&self) -> reinhardt_tasks::TaskResult<()> { Ok(()) }
224	/// # }
225	/// # #[async_trait::async_trait]
226	/// # impl TaskFactory for MyTaskFactory {
227	/// #     async fn create(&self, _data: &str) -> reinhardt_tasks::TaskResult<Box<dyn reinhardt_tasks::TaskExecutor>> {
228	/// #         Ok(Box::new(MyTask))
229	/// #     }
230	/// # }
231	///
232	/// # async fn example() {
233	/// let registry = TaskRegistry::new();
234	/// let factory = Arc::new(MyTaskFactory);
235	/// registry.register("my_task".to_string(), factory).await;
236	/// # }
237	/// ```
238	pub async fn register(&self, name: String, factory: Arc<dyn TaskFactory>) {
239		let mut factories = self.factories.write().await;
240		factories.insert(name, factory);
241	}
242
243	/// Unregister a task factory
244	///
245	/// # Examples
246	///
247	/// ```rust,no_run
248	/// use reinhardt_tasks::TaskRegistry;
249	///
250	/// # async fn example() {
251	/// let registry = TaskRegistry::new();
252	/// registry.unregister("task_name").await;
253	/// # }
254	/// ```
255	pub async fn unregister(&self, name: &str) {
256		let mut factories = self.factories.write().await;
257		factories.remove(name);
258	}
259
260	/// Check if a task is registered
261	///
262	/// # Examples
263	///
264	/// ```rust,no_run
265	/// use reinhardt_tasks::TaskRegistry;
266	///
267	/// # async fn example() {
268	/// let registry = TaskRegistry::new();
269	/// let exists = registry.has("task_name").await;
270	/// assert!(!exists);
271	/// # }
272	/// ```
273	pub async fn has(&self, name: &str) -> bool {
274		let factories = self.factories.read().await;
275		factories.contains_key(name)
276	}
277
278	/// Create a task executor from serialized data
279	///
280	/// # Examples
281	///
282	/// ```rust,no_run
283	/// use reinhardt_tasks::TaskRegistry;
284	///
285	/// # async fn example() -> reinhardt_tasks::TaskResult<()> {
286	/// let registry = TaskRegistry::new();
287	/// let executor = registry.create("task_name", r#"{"key":"value"}"#).await?;
288	/// # Ok(())
289	/// # }
290	/// ```
291	pub async fn create(&self, name: &str, data: &str) -> TaskResult<Box<dyn TaskExecutor>> {
292		let factories = self.factories.read().await;
293
294		let factory = factories
295			.get(name)
296			.ok_or_else(|| TaskError::ExecutionFailed(format!("Task not registered: {}", name)))?;
297
298		factory.create(data).await
299	}
300
301	/// Get all registered task names
302	///
303	/// # Examples
304	///
305	/// ```rust,no_run
306	/// use reinhardt_tasks::TaskRegistry;
307	///
308	/// # async fn example() {
309	/// let registry = TaskRegistry::new();
310	/// let task_names = registry.list().await;
311	/// println!("Registered tasks: {:?}", task_names);
312	/// # }
313	/// ```
314	pub async fn list(&self) -> Vec<String> {
315		let factories = self.factories.read().await;
316		factories.keys().cloned().collect()
317	}
318
319	/// Clear all registered task factories
320	///
321	/// # Examples
322	///
323	/// ```rust
324	/// use reinhardt_tasks::TaskRegistry;
325	///
326	/// # async fn example() {
327	/// let registry = TaskRegistry::new();
328	/// registry.clear().await;
329	/// # }
330	/// ```
331	pub async fn clear(&self) {
332		let mut factories = self.factories.write().await;
333		factories.clear();
334	}
335}
336
337impl Default for TaskRegistry {
338	fn default() -> Self {
339		Self::new()
340	}
341}
342
343#[cfg(test)]
344mod tests {
345	use super::*;
346	use crate::{Task, TaskId, TaskPriority};
347
348	struct TestTask {
349		id: TaskId,
350	}
351
352	impl Task for TestTask {
353		fn id(&self) -> TaskId {
354			self.id
355		}
356
357		fn name(&self) -> &str {
358			"test_task"
359		}
360
361		fn priority(&self) -> TaskPriority {
362			TaskPriority::default()
363		}
364	}
365
366	#[async_trait]
367	impl TaskExecutor for TestTask {
368		async fn execute(&self) -> TaskResult<()> {
369			Ok(())
370		}
371	}
372
373	struct TestTaskFactory;
374
375	#[async_trait]
376	impl TaskFactory for TestTaskFactory {
377		async fn create(&self, _data: &str) -> TaskResult<Box<dyn TaskExecutor>> {
378			Ok(Box::new(TestTask { id: TaskId::new() }))
379		}
380	}
381
382	#[test]
383	fn test_serialized_task() {
384		let task = SerializedTask::new("test".to_string(), r#"{"key":"value"}"#.to_string());
385		assert_eq!(task.name(), "test");
386		assert_eq!(task.data(), r#"{"key":"value"}"#);
387	}
388
389	#[test]
390	fn test_serialized_task_json() {
391		let task = SerializedTask::new("test".to_string(), "{}".to_string());
392		let json = task.to_json().unwrap();
393		let restored = SerializedTask::from_json(&json).unwrap();
394		assert_eq!(restored.name(), "test");
395	}
396
397	#[tokio::test]
398	async fn test_registry_register_and_has() {
399		let registry = TaskRegistry::new();
400		let factory = Arc::new(TestTaskFactory);
401
402		assert!(!registry.has("test_task").await);
403
404		registry.register("test_task".to_string(), factory).await;
405
406		assert!(registry.has("test_task").await);
407	}
408
409	#[tokio::test]
410	async fn test_registry_unregister() {
411		let registry = TaskRegistry::new();
412		let factory = Arc::new(TestTaskFactory);
413
414		registry.register("test_task".to_string(), factory).await;
415		assert!(registry.has("test_task").await);
416
417		registry.unregister("test_task").await;
418		assert!(!registry.has("test_task").await);
419	}
420
421	#[tokio::test]
422	async fn test_registry_create() {
423		let registry = TaskRegistry::new();
424		let factory = Arc::new(TestTaskFactory);
425
426		registry.register("test_task".to_string(), factory).await;
427
428		let executor = registry.create("test_task", "{}").await;
429		assert!(executor.is_ok());
430	}
431
432	#[tokio::test]
433	async fn test_registry_create_not_found() {
434		let registry = TaskRegistry::new();
435
436		let result = registry.create("nonexistent", "{}").await;
437		assert!(result.is_err());
438	}
439
440	#[tokio::test]
441	async fn test_registry_list() {
442		let registry = TaskRegistry::new();
443		let factory = Arc::new(TestTaskFactory);
444
445		registry
446			.register("task1".to_string(), factory.clone())
447			.await;
448		registry.register("task2".to_string(), factory).await;
449
450		let names = registry.list().await;
451		assert_eq!(names.len(), 2);
452		assert!(names.contains(&"task1".to_string()));
453		assert!(names.contains(&"task2".to_string()));
454	}
455
456	#[tokio::test]
457	async fn test_registry_clear() {
458		let registry = TaskRegistry::new();
459		let factory = Arc::new(TestTaskFactory);
460
461		registry.register("task1".to_string(), factory).await;
462		assert!(registry.has("task1").await);
463
464		registry.clear().await;
465		assert!(!registry.has("task1").await);
466	}
467}