use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use forge_core::Result;
use forge_core::job::{ForgeJob, JobContext, JobInfo};
use serde_json::Value;
fn normalize_args(args: Value) -> Value {
let unwrapped = match &args {
Value::Object(map) if map.len() == 1 => {
if map.contains_key("args") {
map.get("args").cloned().unwrap_or(Value::Null)
} else if map.contains_key("input") {
map.get("input").cloned().unwrap_or(Value::Null)
} else {
args
}
}
_ => args,
};
match &unwrapped {
Value::Null => Value::Object(serde_json::Map::new()),
_ => unwrapped,
}
}
pub type BoxedJobHandler = Arc<
dyn Fn(&JobContext, Value) -> Pin<Box<dyn Future<Output = Result<Value>> + Send + '_>>
+ Send
+ Sync,
>;
pub type BoxedJobCompensation = Arc<
dyn for<'a> Fn(
&'a JobContext,
Value,
&'a str,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>
+ Send
+ Sync,
>;
pub struct JobEntry {
pub info: JobInfo,
pub handler: BoxedJobHandler,
pub compensation: BoxedJobCompensation,
}
#[derive(Clone, Default)]
pub struct JobRegistry {
jobs: HashMap<String, Arc<JobEntry>>,
}
impl JobRegistry {
pub fn new() -> Self {
Self {
jobs: HashMap::new(),
}
}
pub fn register<J: ForgeJob>(&mut self)
where
J::Args: serde::de::DeserializeOwned + Send + 'static,
J::Output: serde::Serialize + Send + 'static,
{
let info = J::info();
let name = info.name.to_string();
let handler: BoxedJobHandler = Arc::new(move |ctx, args| {
Box::pin(async move {
let parsed_args: J::Args = serde_json::from_value(normalize_args(args))
.map_err(|e| forge_core::ForgeError::Validation(e.to_string()))?;
let result = J::execute(ctx, parsed_args).await?;
serde_json::to_value(result)
.map_err(|e| forge_core::ForgeError::Internal(e.to_string()))
})
});
let compensation: BoxedJobCompensation = Arc::new(move |ctx, args, reason| {
Box::pin(async move {
let parsed_args: J::Args = serde_json::from_value(normalize_args(args))
.map_err(|e| forge_core::ForgeError::Validation(e.to_string()))?;
J::compensate(ctx, parsed_args, reason).await
})
});
self.jobs.insert(
name,
Arc::new(JobEntry {
info,
handler,
compensation,
}),
);
}
pub fn get(&self, name: &str) -> Option<Arc<JobEntry>> {
self.jobs.get(name).cloned()
}
pub fn info(&self, name: &str) -> Option<&JobInfo> {
self.jobs.get(name).map(|e| &e.info)
}
pub fn exists(&self, name: &str) -> bool {
self.jobs.contains_key(name)
}
pub fn job_names(&self) -> impl Iterator<Item = &str> {
self.jobs.keys().map(|s| s.as_str())
}
pub fn jobs(&self) -> impl Iterator<Item = (&str, &Arc<JobEntry>)> {
self.jobs.iter().map(|(k, v)| (k.as_str(), v))
}
pub fn len(&self) -> usize {
self.jobs.len()
}
pub fn is_empty(&self) -> bool {
self.jobs.is_empty()
}
}