reinhardt_tasks/registry.rs
1//! Task registry for dynamic task dispatch
2//!
3//! This module provides a registry system to store and retrieve task executors
4//! by name, enabling dynamic task dispatch in distributed task systems.
5
6use crate::{TaskError, TaskExecutor, TaskResult};
7use async_trait::async_trait;
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::RwLock;
12
13/// Serialized task data for storage and transmission
14///
15/// # Examples
16///
17/// ```rust
18/// use reinhardt_tasks::SerializedTask;
19///
20/// let task = SerializedTask::new(
21/// "send_email".to_string(),
22/// r#"{"to":"user@example.com","subject":"Hello"}"#.to_string(),
23/// );
24///
25/// assert_eq!(task.name(), "send_email");
26/// ```
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct SerializedTask {
29 name: String,
30 data: String,
31}
32
33impl SerializedTask {
34 /// Create a new serialized task
35 ///
36 /// # Examples
37 ///
38 /// ```rust
39 /// use reinhardt_tasks::SerializedTask;
40 ///
41 /// let task = SerializedTask::new("process_data".to_string(), "{}".to_string());
42 /// ```
43 pub fn new(name: String, data: String) -> Self {
44 Self { name, data }
45 }
46
47 /// Get the task name
48 ///
49 /// # Examples
50 ///
51 /// ```rust
52 /// use reinhardt_tasks::SerializedTask;
53 ///
54 /// let task = SerializedTask::new("task_name".to_string(), "{}".to_string());
55 /// assert_eq!(task.name(), "task_name");
56 /// ```
57 pub fn name(&self) -> &str {
58 &self.name
59 }
60
61 /// Get the task data
62 ///
63 /// # Examples
64 ///
65 /// ```rust
66 /// use reinhardt_tasks::SerializedTask;
67 ///
68 /// let task = SerializedTask::new("task".to_string(), r#"{"key":"value"}"#.to_string());
69 /// assert_eq!(task.data(), r#"{"key":"value"}"#);
70 /// ```
71 pub fn data(&self) -> &str {
72 &self.data
73 }
74
75 /// Convert to JSON string
76 ///
77 /// # Examples
78 ///
79 /// ```rust
80 /// use reinhardt_tasks::SerializedTask;
81 ///
82 /// let task = SerializedTask::new("test".to_string(), "{}".to_string());
83 /// let json = task.to_json().unwrap();
84 /// assert!(json.contains("\"name\":\"test\""));
85 /// ```
86 pub fn to_json(&self) -> Result<String, serde_json::Error> {
87 serde_json::to_string(self)
88 }
89
90 /// Create from JSON string
91 ///
92 /// # Examples
93 ///
94 /// ```rust
95 /// use reinhardt_tasks::SerializedTask;
96 ///
97 /// let json = r#"{"name":"test","data":"{}"}"#;
98 /// let task = SerializedTask::from_json(json).unwrap();
99 /// assert_eq!(task.name(), "test");
100 /// ```
101 pub fn from_json(json: &str) -> Result<Self, serde_json::Error> {
102 serde_json::from_str(json)
103 }
104}
105
106/// Task factory trait for creating task executors from serialized data
107///
108/// # Examples
109///
110/// ```rust,no_run
111/// use reinhardt_tasks::{TaskFactory, TaskResult, TaskExecutor, Task, TaskError, TaskId};
112/// use async_trait::async_trait;
113///
114/// struct EmailTask { to: String }
115///
116/// impl Task for EmailTask {
117/// fn name(&self) -> &str { "EmailTask" }
118/// fn id(&self) -> TaskId { TaskId::new() }
119/// }
120///
121/// #[async_trait]
122/// impl TaskExecutor for EmailTask {
123/// async fn execute(&self) -> TaskResult<()> { Ok(()) }
124/// }
125///
126/// struct EmailTaskFactory;
127///
128/// #[async_trait]
129/// impl TaskFactory for EmailTaskFactory {
130/// async fn create(&self, data: &str) -> TaskResult<Box<dyn TaskExecutor>> {
131/// // Deserialize data and create task executor
132/// let email_data: serde_json::Value = serde_json::from_str(data)
133/// .map_err(|e| TaskError::SerializationError(e.to_string()))?;
134/// Ok(Box::new(EmailTask { to: email_data["to"].as_str().unwrap().to_string() }))
135/// }
136/// }
137/// ```
138#[async_trait]
139pub trait TaskFactory: Send + Sync {
140 /// Create a task executor from serialized data
141 async fn create(&self, data: &str) -> TaskResult<Box<dyn TaskExecutor>>;
142}
143
144/// Global task registry for dynamic task dispatch
145///
146/// This registry maintains a mapping of task names to their factory functions,
147/// allowing workers to dynamically create and execute tasks based on serialized data.
148///
149/// # Examples
150///
151/// ```rust,no_run
152/// use reinhardt_tasks::{TaskRegistry, TaskFactory, TaskResult, TaskExecutor, Task, TaskError, TaskId};
153/// use async_trait::async_trait;
154/// use std::sync::Arc;
155///
156/// # struct EmailTaskFactory;
157/// # struct EmailTask { to: String }
158/// # impl Task for EmailTask {
159/// # fn name(&self) -> &str { "EmailTask" }
160/// # fn id(&self) -> TaskId { TaskId::new() }
161/// # }
162/// # #[async_trait]
163/// # impl TaskExecutor for EmailTask {
164/// # async fn execute(&self) -> TaskResult<()> { Ok(()) }
165/// # }
166/// # #[async_trait]
167/// # impl TaskFactory for EmailTaskFactory {
168/// # async fn create(&self, data: &str) -> TaskResult<Box<dyn TaskExecutor>> {
169/// # let email_data: serde_json::Value = serde_json::from_str(data)
170/// # .map_err(|e| TaskError::SerializationError(e.to_string()))?;
171/// # Ok(Box::new(EmailTask { to: email_data["to"].as_str().unwrap().to_string() }))
172/// # }
173/// # }
174///
175/// # async fn example() -> TaskResult<()> {
176/// let registry = TaskRegistry::new();
177///
178/// // Register a task factory
179/// registry.register("send_email".to_string(), Arc::new(EmailTaskFactory)).await;
180///
181/// // Create task from serialized data
182/// let task_data = r#"{"to":"user@example.com"}"#;
183/// let executor = registry.create("send_email", task_data).await?;
184/// # Ok(())
185/// # }
186/// ```
187pub struct TaskRegistry {
188 factories: Arc<RwLock<HashMap<String, Arc<dyn TaskFactory>>>>,
189}
190
191impl TaskRegistry {
192 /// Create a new task registry
193 ///
194 /// # Examples
195 ///
196 /// ```rust
197 /// use reinhardt_tasks::TaskRegistry;
198 ///
199 /// let registry = TaskRegistry::new();
200 /// ```
201 pub fn new() -> Self {
202 Self {
203 factories: Arc::new(RwLock::new(HashMap::new())),
204 }
205 }
206
207 /// Register a task factory
208 ///
209 /// # Examples
210 ///
211 /// ```rust,no_run
212 /// use reinhardt_tasks::{TaskRegistry, TaskFactory, Task, TaskId};
213 /// use std::sync::Arc;
214 ///
215 /// # struct MyTaskFactory;
216 /// # struct MyTask;
217 /// # impl Task for MyTask {
218 /// # fn name(&self) -> &str { "MyTask" }
219 /// # fn id(&self) -> TaskId { TaskId::new() }
220 /// # }
221 /// # #[async_trait::async_trait]
222 /// # impl reinhardt_tasks::TaskExecutor for MyTask {
223 /// # async fn execute(&self) -> reinhardt_tasks::TaskResult<()> { Ok(()) }
224 /// # }
225 /// # #[async_trait::async_trait]
226 /// # impl TaskFactory for MyTaskFactory {
227 /// # async fn create(&self, _data: &str) -> reinhardt_tasks::TaskResult<Box<dyn reinhardt_tasks::TaskExecutor>> {
228 /// # Ok(Box::new(MyTask))
229 /// # }
230 /// # }
231 ///
232 /// # async fn example() {
233 /// let registry = TaskRegistry::new();
234 /// let factory = Arc::new(MyTaskFactory);
235 /// registry.register("my_task".to_string(), factory).await;
236 /// # }
237 /// ```
238 pub async fn register(&self, name: String, factory: Arc<dyn TaskFactory>) {
239 let mut factories = self.factories.write().await;
240 factories.insert(name, factory);
241 }
242
243 /// Unregister a task factory
244 ///
245 /// # Examples
246 ///
247 /// ```rust,no_run
248 /// use reinhardt_tasks::TaskRegistry;
249 ///
250 /// # async fn example() {
251 /// let registry = TaskRegistry::new();
252 /// registry.unregister("task_name").await;
253 /// # }
254 /// ```
255 pub async fn unregister(&self, name: &str) {
256 let mut factories = self.factories.write().await;
257 factories.remove(name);
258 }
259
260 /// Check if a task is registered
261 ///
262 /// # Examples
263 ///
264 /// ```rust,no_run
265 /// use reinhardt_tasks::TaskRegistry;
266 ///
267 /// # async fn example() {
268 /// let registry = TaskRegistry::new();
269 /// let exists = registry.has("task_name").await;
270 /// assert!(!exists);
271 /// # }
272 /// ```
273 pub async fn has(&self, name: &str) -> bool {
274 let factories = self.factories.read().await;
275 factories.contains_key(name)
276 }
277
278 /// Create a task executor from serialized data
279 ///
280 /// # Examples
281 ///
282 /// ```rust,no_run
283 /// use reinhardt_tasks::TaskRegistry;
284 ///
285 /// # async fn example() -> reinhardt_tasks::TaskResult<()> {
286 /// let registry = TaskRegistry::new();
287 /// let executor = registry.create("task_name", r#"{"key":"value"}"#).await?;
288 /// # Ok(())
289 /// # }
290 /// ```
291 pub async fn create(&self, name: &str, data: &str) -> TaskResult<Box<dyn TaskExecutor>> {
292 let factories = self.factories.read().await;
293
294 let factory = factories
295 .get(name)
296 .ok_or_else(|| TaskError::ExecutionFailed(format!("Task not registered: {}", name)))?;
297
298 factory.create(data).await
299 }
300
301 /// Get all registered task names
302 ///
303 /// # Examples
304 ///
305 /// ```rust,no_run
306 /// use reinhardt_tasks::TaskRegistry;
307 ///
308 /// # async fn example() {
309 /// let registry = TaskRegistry::new();
310 /// let task_names = registry.list().await;
311 /// println!("Registered tasks: {:?}", task_names);
312 /// # }
313 /// ```
314 pub async fn list(&self) -> Vec<String> {
315 let factories = self.factories.read().await;
316 factories.keys().cloned().collect()
317 }
318
319 /// Clear all registered task factories
320 ///
321 /// # Examples
322 ///
323 /// ```rust
324 /// use reinhardt_tasks::TaskRegistry;
325 ///
326 /// # async fn example() {
327 /// let registry = TaskRegistry::new();
328 /// registry.clear().await;
329 /// # }
330 /// ```
331 pub async fn clear(&self) {
332 let mut factories = self.factories.write().await;
333 factories.clear();
334 }
335}
336
337impl Default for TaskRegistry {
338 fn default() -> Self {
339 Self::new()
340 }
341}
342
343#[cfg(test)]
344mod tests {
345 use super::*;
346 use crate::{Task, TaskId, TaskPriority};
347
348 struct TestTask {
349 id: TaskId,
350 }
351
352 impl Task for TestTask {
353 fn id(&self) -> TaskId {
354 self.id
355 }
356
357 fn name(&self) -> &str {
358 "test_task"
359 }
360
361 fn priority(&self) -> TaskPriority {
362 TaskPriority::default()
363 }
364 }
365
366 #[async_trait]
367 impl TaskExecutor for TestTask {
368 async fn execute(&self) -> TaskResult<()> {
369 Ok(())
370 }
371 }
372
373 struct TestTaskFactory;
374
375 #[async_trait]
376 impl TaskFactory for TestTaskFactory {
377 async fn create(&self, _data: &str) -> TaskResult<Box<dyn TaskExecutor>> {
378 Ok(Box::new(TestTask { id: TaskId::new() }))
379 }
380 }
381
382 #[test]
383 fn test_serialized_task() {
384 let task = SerializedTask::new("test".to_string(), r#"{"key":"value"}"#.to_string());
385 assert_eq!(task.name(), "test");
386 assert_eq!(task.data(), r#"{"key":"value"}"#);
387 }
388
389 #[test]
390 fn test_serialized_task_json() {
391 let task = SerializedTask::new("test".to_string(), "{}".to_string());
392 let json = task.to_json().unwrap();
393 let restored = SerializedTask::from_json(&json).unwrap();
394 assert_eq!(restored.name(), "test");
395 }
396
397 #[tokio::test]
398 async fn test_registry_register_and_has() {
399 let registry = TaskRegistry::new();
400 let factory = Arc::new(TestTaskFactory);
401
402 assert!(!registry.has("test_task").await);
403
404 registry.register("test_task".to_string(), factory).await;
405
406 assert!(registry.has("test_task").await);
407 }
408
409 #[tokio::test]
410 async fn test_registry_unregister() {
411 let registry = TaskRegistry::new();
412 let factory = Arc::new(TestTaskFactory);
413
414 registry.register("test_task".to_string(), factory).await;
415 assert!(registry.has("test_task").await);
416
417 registry.unregister("test_task").await;
418 assert!(!registry.has("test_task").await);
419 }
420
421 #[tokio::test]
422 async fn test_registry_create() {
423 let registry = TaskRegistry::new();
424 let factory = Arc::new(TestTaskFactory);
425
426 registry.register("test_task".to_string(), factory).await;
427
428 let executor = registry.create("test_task", "{}").await;
429 assert!(executor.is_ok());
430 }
431
432 #[tokio::test]
433 async fn test_registry_create_not_found() {
434 let registry = TaskRegistry::new();
435
436 let result = registry.create("nonexistent", "{}").await;
437 assert!(result.is_err());
438 }
439
440 #[tokio::test]
441 async fn test_registry_list() {
442 let registry = TaskRegistry::new();
443 let factory = Arc::new(TestTaskFactory);
444
445 registry
446 .register("task1".to_string(), factory.clone())
447 .await;
448 registry.register("task2".to_string(), factory).await;
449
450 let names = registry.list().await;
451 assert_eq!(names.len(), 2);
452 assert!(names.contains(&"task1".to_string()));
453 assert!(names.contains(&"task2".to_string()));
454 }
455
456 #[tokio::test]
457 async fn test_registry_clear() {
458 let registry = TaskRegistry::new();
459 let factory = Arc::new(TestTaskFactory);
460
461 registry.register("task1".to_string(), factory).await;
462 assert!(registry.has("task1").await);
463
464 registry.clear().await;
465 assert!(!registry.has("task1").await);
466 }
467}