use cloacina_computation_graph::{GraphError, GraphResult};
use cloacina_workflow::Context;
use cloacina_workflow_plugin::{
TriggerlessGraphInvokeRequest, TriggerlessGraphInvokeResult, METHOD_INVOKE_TRIGGERLESS_GRAPH,
};
use std::sync::Arc;
pub fn build_ffi_triggerless_graph_fn(
handle: Arc<fidius_host::PluginHandle>,
graph_name: String,
terminal_count: usize,
) -> cloacina_workflow_plugin::TriggerlessGraphFn {
Arc::new(move |ctx: Context<serde_json::Value>| {
let handle = handle.clone();
let graph_name = graph_name.clone();
let terminal_count = terminal_count;
Box::pin(async move {
let context_json = match ctx.to_json() {
Ok(s) => s,
Err(e) => {
return GraphResult::error(GraphError::Serialization(format!(
"context serialize failed: {}",
e
)));
}
};
let request = TriggerlessGraphInvokeRequest {
graph_name: graph_name.clone(),
context_json,
};
let call_result: Result<TriggerlessGraphInvokeResult, fidius_host::CallError> =
tokio::task::spawn_blocking(move || {
handle.call_method(METHOD_INVOKE_TRIGGERLESS_GRAPH, &request)
})
.await
.unwrap_or_else(|e| {
Err(fidius_host::CallError::Serialization(format!(
"spawn_blocking join failed: {}",
e
)))
});
let r = match call_result {
Ok(r) => r,
Err(e) => {
return GraphResult::error(GraphError::Execution(format!(
"FFI invoke_triggerless_graph for '{}' failed: {:?}",
graph_name, e
)));
}
};
if let Some(err) = r.error {
return GraphResult::error(GraphError::Execution(err));
}
if !r.success {
return GraphResult::error(GraphError::Execution(format!(
"Graph '{}' returned !success without an error message",
graph_name
)));
}
let parsed: Vec<serde_json::Value> = match r.terminal_outputs_json {
Some(json) => match serde_json::from_str::<Vec<serde_json::Value>>(&json) {
Ok(v) => v,
Err(e) => {
return GraphResult::error(GraphError::Serialization(format!(
"Failed to parse terminal outputs: {}",
e
)));
}
},
None => Vec::new(),
};
let mut outputs: Vec<Box<dyn std::any::Any + Send>> =
Vec::with_capacity(terminal_count.max(parsed.len()));
for v in parsed.into_iter() {
outputs.push(Box::new(v) as Box<dyn std::any::Any + Send>);
}
while outputs.len() < terminal_count {
outputs.push(Box::new(serde_json::Value::Null) as Box<dyn std::any::Any + Send>);
}
GraphResult::completed(outputs)
})
})
}