forge_runtime/jobs/
registry.rs1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use forge_core::Result;
7use forge_core::job::{ForgeJob, JobContext, JobInfo};
8use serde_json::Value;
9
10fn normalize_args(args: Value) -> Value {
14 let unwrapped = match &args {
15 Value::Object(map) if map.len() == 1 => {
16 if map.contains_key("args") {
17 map.get("args").cloned().unwrap_or(Value::Null)
18 } else if map.contains_key("input") {
19 map.get("input").cloned().unwrap_or(Value::Null)
20 } else {
21 args
22 }
23 }
24 _ => args,
25 };
26
27 match &unwrapped {
28 Value::Null => Value::Object(serde_json::Map::new()),
29 _ => unwrapped,
30 }
31}
32
33pub type BoxedJobHandler = Arc<
35 dyn Fn(&JobContext, Value) -> Pin<Box<dyn Future<Output = Result<Value>> + Send + '_>>
36 + Send
37 + Sync,
38>;
39
40pub type BoxedJobCompensation = Arc<
41 dyn for<'a> Fn(
42 &'a JobContext,
43 Value,
44 &'a str,
45 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>
46 + Send
47 + Sync,
48>;
49
50pub struct JobEntry {
52 pub info: JobInfo,
54 pub handler: BoxedJobHandler,
56 pub compensation: BoxedJobCompensation,
58}
59
60#[derive(Clone, Default)]
62pub struct JobRegistry {
63 jobs: HashMap<String, Arc<JobEntry>>,
64}
65
66impl JobRegistry {
67 pub fn new() -> Self {
69 Self {
70 jobs: HashMap::new(),
71 }
72 }
73
74 pub fn register<J: ForgeJob>(&mut self)
76 where
77 J::Args: serde::de::DeserializeOwned + Send + 'static,
78 J::Output: serde::Serialize + Send + 'static,
79 {
80 let info = J::info();
81 let name = info.name.to_string();
82
83 let handler: BoxedJobHandler = Arc::new(move |ctx, args| {
84 Box::pin(async move {
85 let parsed_args: J::Args = serde_json::from_value(normalize_args(args))
86 .map_err(|e| forge_core::ForgeError::Validation(e.to_string()))?;
87 let result = J::execute(ctx, parsed_args).await?;
88 serde_json::to_value(result)
89 .map_err(|e| forge_core::ForgeError::Internal(e.to_string()))
90 })
91 });
92
93 let compensation: BoxedJobCompensation = Arc::new(move |ctx, args, reason| {
94 Box::pin(async move {
95 let parsed_args: J::Args = serde_json::from_value(normalize_args(args))
96 .map_err(|e| forge_core::ForgeError::Validation(e.to_string()))?;
97 J::compensate(ctx, parsed_args, reason).await
98 })
99 });
100
101 self.jobs.insert(
102 name,
103 Arc::new(JobEntry {
104 info,
105 handler,
106 compensation,
107 }),
108 );
109 }
110
111 pub fn get(&self, name: &str) -> Option<Arc<JobEntry>> {
113 self.jobs.get(name).cloned()
114 }
115
116 pub fn info(&self, name: &str) -> Option<&JobInfo> {
118 self.jobs.get(name).map(|e| &e.info)
119 }
120
121 pub fn exists(&self, name: &str) -> bool {
123 self.jobs.contains_key(name)
124 }
125
126 pub fn job_names(&self) -> impl Iterator<Item = &str> {
128 self.jobs.keys().map(|s| s.as_str())
129 }
130
131 pub fn jobs(&self) -> impl Iterator<Item = (&str, &Arc<JobEntry>)> {
133 self.jobs.iter().map(|(k, v)| (k.as_str(), v))
134 }
135
136 pub fn len(&self) -> usize {
138 self.jobs.len()
139 }
140
141 pub fn is_empty(&self) -> bool {
143 self.jobs.is_empty()
144 }
145}
146
#[cfg(test)]
mod tests {
    use super::JobRegistry;

    /// A freshly constructed registry holds no jobs and resolves no names.
    #[test]
    fn test_empty_registry() {
        let registry = JobRegistry::new();

        assert!(registry.is_empty());
        assert_eq!(registry.len(), 0);
        assert!(registry.get("nonexistent").is_none());
    }
}
158}