Skip to main content

forge_runtime/jobs/registry.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use forge_core::Result;
7use forge_core::job::{ForgeJob, JobContext, JobInfo};
8use serde_json::Value;
9
10/// Normalize args for deserialization.
11/// - Converts `null` to `{}` so both unit `()` and empty structs deserialize correctly.
12/// - Unwraps `{"args": ...}` or `{"input": ...}` wrapper if present (callers may use either format).
13fn normalize_args(args: Value) -> Value {
14    let unwrapped = match &args {
15        Value::Object(map) if map.len() == 1 => {
16            if map.contains_key("args") {
17                map.get("args").cloned().unwrap_or(Value::Null)
18            } else if map.contains_key("input") {
19                map.get("input").cloned().unwrap_or(Value::Null)
20            } else {
21                args
22            }
23        }
24        _ => args,
25    };
26
27    match &unwrapped {
28        Value::Null => Value::Object(serde_json::Map::new()),
29        _ => unwrapped,
30    }
31}
32
33/// Type alias for boxed job handler function.
34pub type BoxedJobHandler = Arc<
35    dyn Fn(&JobContext, Value) -> Pin<Box<dyn Future<Output = Result<Value>> + Send + '_>>
36        + Send
37        + Sync,
38>;
39
40pub type BoxedJobCompensation = Arc<
41    dyn for<'a> Fn(
42            &'a JobContext,
43            Value,
44            &'a str,
45        ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>
46        + Send
47        + Sync,
48>;
49
50/// Entry in the job registry.
51pub struct JobEntry {
52    /// Job metadata.
53    pub info: JobInfo,
54    /// Job handler function.
55    pub handler: BoxedJobHandler,
56    /// Job compensation handler.
57    pub compensation: BoxedJobCompensation,
58}
59
60/// Registry of all FORGE jobs.
61#[derive(Clone, Default)]
62pub struct JobRegistry {
63    jobs: HashMap<String, Arc<JobEntry>>,
64}
65
66impl JobRegistry {
67    /// Create a new empty registry.
68    pub fn new() -> Self {
69        Self {
70            jobs: HashMap::new(),
71        }
72    }
73
74    /// Register a job type.
75    pub fn register<J: ForgeJob>(&mut self)
76    where
77        J::Args: serde::de::DeserializeOwned + Send + 'static,
78        J::Output: serde::Serialize + Send + 'static,
79    {
80        let info = J::info();
81        let name = info.name.to_string();
82
83        let handler: BoxedJobHandler = Arc::new(move |ctx, args| {
84            Box::pin(async move {
85                let parsed_args: J::Args = serde_json::from_value(normalize_args(args))
86                    .map_err(|e| forge_core::ForgeError::Validation(e.to_string()))?;
87                let result = J::execute(ctx, parsed_args).await?;
88                serde_json::to_value(result)
89                    .map_err(|e| forge_core::ForgeError::Internal(e.to_string()))
90            })
91        });
92
93        let compensation: BoxedJobCompensation = Arc::new(move |ctx, args, reason| {
94            Box::pin(async move {
95                let parsed_args: J::Args = serde_json::from_value(normalize_args(args))
96                    .map_err(|e| forge_core::ForgeError::Validation(e.to_string()))?;
97                J::compensate(ctx, parsed_args, reason).await
98            })
99        });
100
101        self.jobs.insert(
102            name,
103            Arc::new(JobEntry {
104                info,
105                handler,
106                compensation,
107            }),
108        );
109    }
110
111    /// Get a job entry by name.
112    pub fn get(&self, name: &str) -> Option<Arc<JobEntry>> {
113        self.jobs.get(name).cloned()
114    }
115
116    /// Get job info by name.
117    pub fn info(&self, name: &str) -> Option<&JobInfo> {
118        self.jobs.get(name).map(|e| &e.info)
119    }
120
121    /// Check if a job exists.
122    pub fn exists(&self, name: &str) -> bool {
123        self.jobs.contains_key(name)
124    }
125
126    /// Get all job names.
127    pub fn job_names(&self) -> impl Iterator<Item = &str> {
128        self.jobs.keys().map(|s| s.as_str())
129    }
130
131    /// Get all jobs.
132    pub fn jobs(&self) -> impl Iterator<Item = (&str, &Arc<JobEntry>)> {
133        self.jobs.iter().map(|(k, v)| (k.as_str(), v))
134    }
135
136    /// Get the number of registered jobs.
137    pub fn len(&self) -> usize {
138        self.jobs.len()
139    }
140
141    /// Check if registry is empty.
142    pub fn is_empty(&self) -> bool {
143        self.jobs.is_empty()
144    }
145}
146
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created registry reports zero jobs and resolves no names.
    #[test]
    fn test_empty_registry() {
        let empty = JobRegistry::new();
        assert_eq!(empty.len(), 0);
        assert!(empty.is_empty());
        assert!(empty.get("nonexistent").is_none());
    }
}
158}