#![allow(unexpected_cfgs, clippy::too_many_arguments)]
pub mod inventory_entries;
pub mod types;
pub use inventory_entries::{
ComputationGraphEntry, ReactorEntry, TaskEntry, TriggerEntry, TriggerlessGraph,
TriggerlessGraphEntry, TriggerlessGraphFn, TriggerlessGraphRegistration,
WorkflowDescriptorEntry,
};
pub use types::{
AccumulatorDeclarationEntry, CloacinaMetadata, GraphExecutionRequest, GraphExecutionResult,
GraphPackageMetadata, PackageTasksMetadata, ReactorPackageMetadata, TaskExecutionRequest,
TaskExecutionResult, TaskMetadataEntry, TriggerInvokeRequest, TriggerInvokeResult,
TriggerPackageMetadata, TriggerlessGraphInvokeRequest, TriggerlessGraphInvokeResult,
TriggerlessGraphMetadataEntry,
};
pub use fidius;
pub use fidius_core;
pub use fidius_core::descriptor;
pub use fidius_core::inventory;
pub use fidius_core::registry;
pub use fidius_core::status;
pub use fidius_core::wire;
pub use fidius::plugin_impl;
pub use fidius_core::error::PluginError;
pub use fidius_core::package::{PackageHeader, PackageManifest};
pub use fidius::fidius_plugin_registry;
#[macro_export]
macro_rules! package {
() => {
#[cfg(feature = "packaged")]
pub mod _ffi {
use $crate::CloacinaPlugin as _;
use $crate::__fidius_CloacinaPlugin;
mod __cloacina_package_marker {
pub struct Once;
}
pub struct CloacinaPackagePlugin;
#[$crate::plugin_impl(CloacinaPlugin, crate = "cloacina_workflow_plugin")]
impl $crate::CloacinaPlugin for CloacinaPackagePlugin {
fn get_task_metadata(
&self,
) -> ::core::result::Result<$crate::PackageTasksMetadata, $crate::PluginError>
{
let mut tasks: ::std::vec::Vec<$crate::TaskMetadataEntry> =
::std::vec::Vec::new();
let mut workflow_name: ::std::string::String = ::std::string::String::new();
for (idx, entry) in $crate::inventory::iter::<$crate::TaskEntry>
.into_iter()
.enumerate()
{
let ns = (entry.namespace)();
if workflow_name.is_empty() {
workflow_name = ns.workflow_id.clone();
}
let task = (entry.constructor)();
let dependencies: ::std::vec::Vec<::std::string::String> =
cloacina_workflow::Task::dependencies(&*task)
.iter()
.map(|n| n.task_id.clone())
.collect();
tasks.push($crate::TaskMetadataEntry {
index: idx as u32,
id: cloacina_workflow::Task::id(&*task).to_string(),
namespaced_id_template: format!(
"{}::{}::{}::{}",
ns.tenant_id, ns.package_name, ns.workflow_id, ns.task_id,
),
dependencies,
description: format!("Task: {}", cloacina_workflow::Task::id(&*task)),
source_location: format!("{}/lib.rs", env!("CARGO_PKG_NAME")),
});
}
let descriptor = $crate::inventory::iter::<$crate::WorkflowDescriptorEntry>
.into_iter()
.next();
let (description, author, fingerprint, graph_data_json, triggers) =
match descriptor {
Some(d) => (
if d.description.is_empty() {
None
} else {
Some(d.description.to_string())
},
if d.author.is_empty() {
None
} else {
Some(d.author.to_string())
},
if d.fingerprint.is_empty() {
None
} else {
Some(d.fingerprint.to_string())
},
if d.graph_data_json.is_empty() {
None
} else {
Some(d.graph_data_json.to_string())
},
(d.triggers)(),
),
None => (None, None, None, None, ::std::vec::Vec::new()),
};
Ok($crate::PackageTasksMetadata {
workflow_name,
package_name: env!("CARGO_PKG_NAME").to_string(),
package_description: description,
package_author: author,
workflow_fingerprint: fingerprint,
graph_data_json,
tasks,
triggers,
})
}
fn execute_task(
&self,
request: $crate::TaskExecutionRequest,
) -> ::core::result::Result<$crate::TaskExecutionResult, $crate::PluginError>
{
use $crate::CloacinaPlugin as _;
static CDYLIB_RUNTIME: ::std::sync::OnceLock<
cloacina_workflow::__private::tokio::runtime::Runtime,
> = ::std::sync::OnceLock::new();
let rt = CDYLIB_RUNTIME.get_or_init(|| {
cloacina_workflow::__private::tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(2)
.thread_name("package-shell-cdylib-worker")
.build()
.expect("Failed to create cdylib tokio runtime")
});
let task_arc_opt = $crate::inventory::iter::<$crate::TaskEntry>
.into_iter()
.map(|entry| (entry.constructor)())
.find(|t| cloacina_workflow::Task::id(&**t) == request.task_name);
let task = match task_arc_opt {
Some(t) => t,
None => {
return Ok($crate::TaskExecutionResult {
success: false,
context_json: None,
error: Some(format!("Unknown task: {}", request.task_name)),
});
}
};
let context: cloacina_workflow::Context<::serde_json::Value> =
match cloacina_workflow::Context::from_json(request.context_json) {
Ok(c) => c,
Err(e) => {
return Err($crate::PluginError {
code: "CONTEXT_ERROR".to_string(),
message: format!("Failed to parse context: {}", e),
details: None,
});
}
};
let result = rt.block_on(async move {
cloacina_workflow::Task::execute(&*task, context).await
});
match result {
Ok(updated) => {
let ctx_json = updated.to_json().map_err(|e| $crate::PluginError {
code: "SERIALIZATION_ERROR".to_string(),
message: format!("Failed to serialize context: {}", e),
details: None,
})?;
Ok($crate::TaskExecutionResult {
success: true,
context_json: Some(ctx_json),
error: None,
})
}
Err(e) => Ok($crate::TaskExecutionResult {
success: false,
context_json: None,
error: Some(format!("Task '{}' failed: {:?}", request.task_name, e)),
}),
}
}
fn get_graph_metadata(
&self,
) -> ::core::result::Result<$crate::GraphPackageMetadata, $crate::PluginError>
{
let entries: ::std::vec::Vec<&$crate::ComputationGraphEntry> =
$crate::inventory::iter::<$crate::ComputationGraphEntry>
.into_iter()
.collect();
if entries.is_empty() {
return Err($crate::PluginError {
code: "NOT_SUPPORTED".to_string(),
message: "Package declares no computation graph".to_string(),
details: None,
});
}
if entries.len() > 1 {
return Err($crate::PluginError {
code: "MULTIPLE_GRAPHS".to_string(),
message: format!(
"Package declares {} computation graphs; the unified shell \
supports at most one CG per cdylib",
entries.len()
),
details: None,
});
}
let reg = (entries[0].constructor)();
let accumulators: ::std::vec::Vec<$crate::AccumulatorDeclarationEntry> = reg
.accumulator_names
.iter()
.map(|name| $crate::AccumulatorDeclarationEntry {
name: name.clone(),
accumulator_type: "passthrough".to_string(),
config: ::std::collections::HashMap::new(),
})
.collect();
Ok($crate::GraphPackageMetadata {
graph_name: entries[0].name.to_string(),
package_name: env!("CARGO_PKG_NAME").to_string(),
reaction_mode: reg.reaction_mode.clone(),
input_strategy: "latest".to_string(),
accumulators,
trigger_reactor: reg.trigger_reactor.clone(),
})
}
fn execute_graph(
&self,
request: $crate::GraphExecutionRequest,
) -> ::core::result::Result<$crate::GraphExecutionResult, $crate::PluginError>
{
static CDYLIB_RUNTIME: ::std::sync::OnceLock<
cloacina_workflow::__private::tokio::runtime::Runtime,
> = ::std::sync::OnceLock::new();
let rt = CDYLIB_RUNTIME.get_or_init(|| {
cloacina_workflow::__private::tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(2)
.thread_name("package-shell-cg-worker")
.build()
.expect("Failed to create cdylib tokio runtime for computation graph")
});
let entry_opt = $crate::inventory::iter::<$crate::ComputationGraphEntry>
.into_iter()
.next();
let entry = match entry_opt {
Some(e) => e,
None => {
return Err($crate::PluginError {
code: "NOT_SUPPORTED".to_string(),
message: "Package declares no computation graph".to_string(),
details: None,
});
}
};
let reg = (entry.constructor)();
let mut cache = cloacina_computation_graph::InputCache::new();
for (source_name, json_str) in &request.cache {
let value: ::serde_json::Value =
::serde_json::from_str(json_str).map_err(|e| $crate::PluginError {
code: "DESERIALIZATION_ERROR".to_string(),
message: format!(
"Failed to parse cache entry '{}': {}",
source_name, e
),
details: None,
})?;
let bytes = cloacina_computation_graph::serialize(&value).map_err(|e| {
$crate::PluginError {
code: "SERIALIZATION_ERROR".to_string(),
message: format!(
"Failed to serialize cache entry '{}': {}",
source_name, e
),
details: None,
}
})?;
cache.update(
cloacina_computation_graph::SourceName::new(source_name),
bytes,
);
}
let result = rt.block_on(async { (reg.graph_fn)(cache).await });
match result {
cloacina_computation_graph::GraphResult::Completed { outputs } => {
let terminal_json: ::std::vec::Vec<::std::string::String> = outputs
.iter()
.filter_map(|o| {
o.downcast_ref::<::serde_json::Value>()
.map(|v| ::serde_json::to_string(v).unwrap_or_default())
})
.collect();
Ok($crate::GraphExecutionResult {
success: true,
terminal_outputs_json: if terminal_json.is_empty() {
None
} else {
Some(terminal_json)
},
error: None,
})
}
cloacina_computation_graph::GraphResult::Error(e) => {
Ok($crate::GraphExecutionResult {
success: false,
terminal_outputs_json: None,
error: Some(format!("{}", e)),
})
}
}
}
fn get_reactor_metadata(
&self,
) -> ::core::result::Result<
::std::vec::Vec<$crate::ReactorPackageMetadata>,
$crate::PluginError,
> {
let mut out: ::std::vec::Vec<$crate::ReactorPackageMetadata> =
::std::vec::Vec::new();
for entry in $crate::inventory::iter::<$crate::ReactorEntry> {
let reg = (entry.constructor)();
let accumulators: ::std::vec::Vec<$crate::AccumulatorDeclarationEntry> =
reg.accumulator_names
.iter()
.map(|name| $crate::AccumulatorDeclarationEntry {
name: name.clone(),
accumulator_type: "passthrough".to_string(),
config: ::std::collections::HashMap::new(),
})
.collect();
out.push($crate::ReactorPackageMetadata {
name: reg.name,
package_name: env!("CARGO_PKG_NAME").to_string(),
reaction_mode: reg.reaction_mode.as_str().to_string(),
accumulators,
});
}
Ok(out)
}
fn get_trigger_metadata(
&self,
) -> ::core::result::Result<
::std::vec::Vec<$crate::TriggerPackageMetadata>,
$crate::PluginError,
> {
let mut out: ::std::vec::Vec<$crate::TriggerPackageMetadata> =
::std::vec::Vec::new();
for entry in $crate::inventory::iter::<$crate::TriggerEntry> {
let trigger = (entry.constructor)();
let poll_interval = format!(
"{}ms",
cloacina_workflow::Trigger::poll_interval(&*trigger).as_millis()
);
out.push($crate::TriggerPackageMetadata {
name: entry.name.to_string(),
package_name: env!("CARGO_PKG_NAME").to_string(),
poll_interval,
cron_expression: cloacina_workflow::Trigger::cron_expression(&*trigger),
allow_concurrent: cloacina_workflow::Trigger::allow_concurrent(
&*trigger,
),
});
}
Ok(out)
}
fn invoke_trigger_poll(
&self,
request: $crate::TriggerInvokeRequest,
) -> ::core::result::Result<$crate::TriggerInvokeResult, $crate::PluginError>
{
static CDYLIB_TRIGGER_RUNTIME: ::std::sync::OnceLock<
cloacina_workflow::__private::tokio::runtime::Runtime,
> = ::std::sync::OnceLock::new();
let rt = CDYLIB_TRIGGER_RUNTIME.get_or_init(|| {
cloacina_workflow::__private::tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(2)
.thread_name("package-shell-trigger-worker")
.build()
.expect("Failed to create cdylib trigger tokio runtime")
});
let trigger_arc_opt = $crate::inventory::iter::<$crate::TriggerEntry>
.into_iter()
.find(|entry| entry.name == request.trigger_name)
.map(|entry| (entry.constructor)());
let trigger = match trigger_arc_opt {
Some(t) => t,
None => {
return Ok($crate::TriggerInvokeResult {
fire: false,
context_json: None,
error: Some(format!("Unknown trigger: {}", request.trigger_name)),
});
}
};
let poll_result = rt
.block_on(async move { cloacina_workflow::Trigger::poll(&*trigger).await });
match poll_result {
Ok(cloacina_workflow::TriggerResult::Skip) => {
Ok($crate::TriggerInvokeResult {
fire: false,
context_json: None,
error: None,
})
}
Ok(cloacina_workflow::TriggerResult::Fire(None)) => {
Ok($crate::TriggerInvokeResult {
fire: true,
context_json: None,
error: None,
})
}
Ok(cloacina_workflow::TriggerResult::Fire(Some(ctx))) => {
match ctx.to_json() {
Ok(ctx_json) => Ok($crate::TriggerInvokeResult {
fire: true,
context_json: Some(ctx_json),
error: None,
}),
Err(e) => Err($crate::PluginError {
code: "SERIALIZATION_ERROR".to_string(),
message: format!("Failed to serialize trigger context: {}", e),
details: None,
}),
}
}
Err(e) => Ok($crate::TriggerInvokeResult {
fire: false,
context_json: None,
error: Some(format!(
"Trigger '{}' poll failed: {:?}",
request.trigger_name, e
)),
}),
}
}
fn get_triggerless_graph_metadata(
&self,
) -> ::core::result::Result<
::std::vec::Vec<$crate::TriggerlessGraphMetadataEntry>,
$crate::PluginError,
> {
let mut out: ::std::vec::Vec<$crate::TriggerlessGraphMetadataEntry> =
::std::vec::Vec::new();
for entry in $crate::inventory::iter::<$crate::TriggerlessGraphEntry> {
let reg = (entry.constructor)();
out.push($crate::TriggerlessGraphMetadataEntry {
name: entry.name.to_string(),
package_name: env!("CARGO_PKG_NAME").to_string(),
terminal_node_names: reg.terminal_node_names.clone(),
});
}
Ok(out)
}
fn invoke_triggerless_graph(
&self,
request: $crate::TriggerlessGraphInvokeRequest,
) -> ::core::result::Result<
$crate::TriggerlessGraphInvokeResult,
$crate::PluginError,
> {
static CDYLIB_TLCG_RUNTIME: ::std::sync::OnceLock<
cloacina_workflow::__private::tokio::runtime::Runtime,
> = ::std::sync::OnceLock::new();
let rt = CDYLIB_TLCG_RUNTIME.get_or_init(|| {
cloacina_workflow::__private::tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(2)
.thread_name("package-shell-tlcg-worker")
.build()
.expect("Failed to create cdylib trigger-less CG tokio runtime")
});
let entry_opt = $crate::inventory::iter::<$crate::TriggerlessGraphEntry>
.into_iter()
.find(|e| e.name == request.graph_name);
let entry = match entry_opt {
Some(e) => e,
None => {
return Ok($crate::TriggerlessGraphInvokeResult {
success: false,
terminal_outputs_json: None,
error: Some(format!(
"Unknown trigger-less graph: {}",
request.graph_name
)),
});
}
};
let context: cloacina_workflow::Context<::serde_json::Value> =
match cloacina_workflow::Context::from_json(request.context_json) {
Ok(c) => c,
Err(e) => {
return Err($crate::PluginError {
code: "CONTEXT_ERROR".to_string(),
message: format!(
"Failed to parse context for graph '{}': {}",
request.graph_name, e
),
details: None,
});
}
};
let reg = (entry.constructor)();
let graph_fn = reg.graph_fn.clone();
let terminal_names = reg.terminal_node_names.clone();
let result = rt.block_on(async move { graph_fn(context).await });
match result {
::cloacina_computation_graph::GraphResult::Completed { outputs } => {
let mut json_outputs: ::std::vec::Vec<::serde_json::Value> =
::std::vec::Vec::with_capacity(outputs.len());
for boxed in outputs.iter() {
if let Some(value) = boxed.downcast_ref::<::serde_json::Value>() {
json_outputs.push(value.clone());
} else {
json_outputs.push(::serde_json::Value::Null);
}
}
while json_outputs.len() < terminal_names.len() {
json_outputs.push(::serde_json::Value::Null);
}
let outputs_json = match ::serde_json::to_string(&json_outputs) {
Ok(s) => s,
Err(e) => {
return Err($crate::PluginError {
code: "SERIALIZATION_ERROR".to_string(),
message: format!(
"Failed to serialize terminal outputs: {}",
e
),
details: None,
});
}
};
Ok($crate::TriggerlessGraphInvokeResult {
success: true,
terminal_outputs_json: Some(outputs_json),
error: None,
})
}
::cloacina_computation_graph::GraphResult::Error(err) => {
Ok($crate::TriggerlessGraphInvokeResult {
success: false,
terminal_outputs_json: None,
error: Some(format!(
"Graph '{}' failed: {}",
request.graph_name, err
)),
})
}
}
}
}
$crate::fidius_plugin_registry!();
}
};
}
pub const METHOD_GET_TASK_METADATA: usize = 0;
pub const METHOD_EXECUTE_TASK: usize = 1;
pub const METHOD_GET_GRAPH_METADATA: usize = 2;
pub const METHOD_EXECUTE_GRAPH: usize = 3;
pub const METHOD_GET_REACTOR_METADATA: usize = 4;
pub const METHOD_GET_TRIGGER_METADATA: usize = 5;
pub const METHOD_INVOKE_TRIGGER_POLL: usize = 6;
pub const METHOD_GET_TRIGGERLESS_GRAPH_METADATA: usize = 7;
pub const METHOD_INVOKE_TRIGGERLESS_GRAPH: usize = 8;
#[fidius::plugin_interface(version = 2, buffer = PluginAllocated)]
pub trait CloacinaPlugin: Send + Sync {
fn get_task_metadata(&self) -> Result<PackageTasksMetadata, PluginError>;
fn execute_task(
&self,
request: TaskExecutionRequest,
) -> Result<TaskExecutionResult, PluginError>;
fn get_graph_metadata(&self) -> Result<GraphPackageMetadata, PluginError>;
fn execute_graph(
&self,
request: GraphExecutionRequest,
) -> Result<GraphExecutionResult, PluginError>;
#[optional(since = 2)]
fn get_reactor_metadata(&self) -> Result<Vec<ReactorPackageMetadata>, PluginError>;
#[optional(since = 2)]
fn get_trigger_metadata(&self) -> Result<Vec<TriggerPackageMetadata>, PluginError>;
#[optional(since = 2)]
fn invoke_trigger_poll(
&self,
request: TriggerInvokeRequest,
) -> Result<TriggerInvokeResult, PluginError>;
#[optional(since = 2)]
fn get_triggerless_graph_metadata(
&self,
) -> Result<Vec<TriggerlessGraphMetadataEntry>, PluginError>;
#[optional(since = 2)]
fn invoke_triggerless_graph(
&self,
request: TriggerlessGraphInvokeRequest,
) -> Result<TriggerlessGraphInvokeResult, PluginError>;
}