use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, watch};
use tokio::task::JoinHandle;
use cloacina_workflow_plugin::{GraphExecutionRequest, GraphPackageMetadata};
use super::accumulator::{
accumulator_runtime, accumulator_runtime_with_source, AccumulatorContext,
AccumulatorRuntimeConfig, BoundarySender,
};
use super::reactor::{CompiledGraphFn, InputStrategy, ReactionCriteria};
use super::scheduler::{
AccumulatorDeclaration, AccumulatorFactory, AccumulatorSpawnConfig,
ComputationGraphDeclaration, ReactorDeclaration,
};
use super::types::{GraphError, GraphResult, InputCache, SourceName};
struct LoadedGraphPlugin {
handle: std::sync::Mutex<fidius_host::PluginHandle>,
_temp_dir: tempfile::TempDir,
}
unsafe impl Send for LoadedGraphPlugin {}
unsafe impl Sync for LoadedGraphPlugin {}
impl LoadedGraphPlugin {
fn load(library_data: &[u8]) -> Result<Self, String> {
let temp_dir =
tempfile::TempDir::new().map_err(|e| format!("Failed to create temp dir: {}", e))?;
let library_extension = if cfg!(target_os = "macos") {
"dylib"
} else if cfg!(target_os = "windows") {
"dll"
} else {
"so"
};
let temp_path = temp_dir
.path()
.join(format!("graph_plugin.{}", library_extension));
std::fs::write(&temp_path, library_data)
.map_err(|e| format!("Failed to write library: {}", e))?;
let loaded = fidius_host::loader::load_library(&temp_path)
.map_err(|e| format!("Failed to load library: {}", e))?;
let plugin = loaded
.plugins
.into_iter()
.next()
.ok_or_else(|| "No plugins in library".to_string())?;
let handle = fidius_host::PluginHandle::from_loaded(plugin);
Ok(Self {
handle: std::sync::Mutex::new(handle),
_temp_dir: temp_dir,
})
}
fn execute_graph(
&self,
request: GraphExecutionRequest,
) -> Result<cloacina_workflow_plugin::GraphExecutionResult, String> {
let handle = self
.handle
.lock()
.map_err(|e| format!("Plugin mutex poisoned: {}", e))?;
handle
.call_method(METHOD_EXECUTE_GRAPH, &(request,))
.map_err(|e| format!("execute_graph FFI call failed: {}", e))
}
}
pub use cloacina_workflow_plugin::{
METHOD_EXECUTE_GRAPH, METHOD_EXECUTE_TASK, METHOD_GET_GRAPH_METADATA,
METHOD_GET_REACTOR_METADATA, METHOD_GET_TASK_METADATA, METHOD_GET_TRIGGERLESS_GRAPH_METADATA,
METHOD_GET_TRIGGER_METADATA, METHOD_INVOKE_TRIGGERLESS_GRAPH, METHOD_INVOKE_TRIGGER_POLL,
};
pub fn call_get_reactor_metadata(
handle: &fidius_host::PluginHandle,
) -> Result<Vec<cloacina_workflow_plugin::ReactorPackageMetadata>, String> {
match handle.call_method::<(), Vec<cloacina_workflow_plugin::ReactorPackageMetadata>>(
METHOD_GET_REACTOR_METADATA,
&(),
) {
Ok(metadata) => Ok(metadata),
Err(fidius_host::CallError::NotImplemented { .. }) => Ok(Vec::new()),
Err(e) => Err(format!("get_reactor_metadata FFI call failed: {}", e)),
}
}
pub fn call_get_trigger_metadata(
handle: &fidius_host::PluginHandle,
) -> Result<Vec<cloacina_workflow_plugin::TriggerPackageMetadata>, String> {
match handle.call_method::<(), Vec<cloacina_workflow_plugin::TriggerPackageMetadata>>(
METHOD_GET_TRIGGER_METADATA,
&(),
) {
Ok(metadata) => Ok(metadata),
Err(fidius_host::CallError::NotImplemented { .. }) => Ok(Vec::new()),
Err(e) => Err(format!("get_trigger_metadata FFI call failed: {}", e)),
}
}
pub fn build_declaration_from_ffi(
graph_meta: &GraphPackageMetadata,
library_data: Vec<u8>,
) -> ComputationGraphDeclaration {
let criteria = match graph_meta.reaction_mode.as_str() {
"when_all" => ReactionCriteria::WhenAll,
_ => ReactionCriteria::WhenAny,
};
let strategy = match graph_meta.input_strategy.as_str() {
"sequential" => InputStrategy::Sequential,
_ => InputStrategy::Latest,
};
let graph_fn: CompiledGraphFn = match LoadedGraphPlugin::load(&library_data) {
Ok(plugin) => {
let plugin = Arc::new(plugin);
Arc::new(move |cache: InputCache| {
let plugin = plugin.clone();
Box::pin(async move { execute_graph_via_ffi(&plugin, &cache).await })
})
}
Err(e) => {
let error_msg = format!("Graph plugin library failed to load: {}", e);
tracing::warn!("{}", error_msg);
Arc::new(move |_cache: InputCache| {
let msg = error_msg.clone();
Box::pin(async move { GraphResult::error(GraphError::Execution(msg)) })
})
}
};
let accumulators = graph_meta
.accumulators
.iter()
.map(|acc_entry| {
let factory: Arc<dyn AccumulatorFactory> = match acc_entry.accumulator_type.as_str() {
"stream" => Arc::new(StreamBackendAccumulatorFactory::new(
acc_entry.config.clone(),
)),
_ => Arc::new(PassthroughAccumulatorFactory),
};
AccumulatorDeclaration {
name: acc_entry.name.clone(),
factory,
}
})
.collect();
ComputationGraphDeclaration {
name: graph_meta.graph_name.clone(),
accumulators,
reactor: ReactorDeclaration {
criteria,
strategy,
graph_fn,
},
tenant_id: None, reactor_name: graph_meta.trigger_reactor.clone(),
}
}
async fn execute_graph_via_ffi(plugin: &Arc<LoadedGraphPlugin>, cache: &InputCache) -> GraphResult {
let cache_snapshot = cache.snapshot();
let mut ffi_cache: HashMap<String, String> = HashMap::new();
for source_name in cache_snapshot.sources() {
if let Some(raw_bytes) = cache_snapshot.get_raw(source_name.as_str()) {
match bincode::deserialize::<Vec<u8>>(raw_bytes) {
Ok(original_bytes) => {
let json_str = String::from_utf8(original_bytes).unwrap_or_else(|e| {
tracing::warn!(
source = source_name.as_str(),
"cache entry is not valid UTF-8, hex-encoding: {}",
e
);
raw_bytes.iter().map(|b| format!("{:02x}", b)).collect()
});
ffi_cache.insert(source_name.as_str().to_string(), json_str);
}
Err(e) => {
return GraphResult::error(GraphError::Serialization(format!(
"Failed to deserialize cache entry '{}' for FFI: {}",
source_name.as_str(),
e
)));
}
}
}
}
let request = GraphExecutionRequest { cache: ffi_cache };
let plugin = plugin.clone();
let result = tokio::task::spawn_blocking(move || plugin.execute_graph(request)).await;
match result {
Ok(Ok(ffi_result)) => {
if ffi_result.success {
let outputs: Vec<Box<dyn std::any::Any + Send>> =
if let Some(json_outputs) = ffi_result.terminal_outputs_json {
json_outputs
.into_iter()
.filter_map(|json_str| {
serde_json::from_str::<serde_json::Value>(&json_str)
.ok()
.map(|v| Box::new(v) as Box<dyn std::any::Any + Send>)
})
.collect()
} else {
vec![]
};
GraphResult::completed(outputs)
} else {
let error_msg = ffi_result
.error
.unwrap_or_else(|| "unknown FFI execution error".to_string());
GraphResult::error(GraphError::NodeExecution(error_msg))
}
}
Ok(Err(e)) => GraphResult::error(GraphError::NodeExecution(format!(
"FFI execute_graph call failed: {}",
e
))),
Err(join_err) => GraphResult::error(GraphError::NodeExecution(format!(
"FFI execute_graph panicked: {}",
join_err
))),
}
}
pub struct PassthroughAccumulatorFactory;
struct GenericPassthroughAccumulator;
#[async_trait::async_trait]
impl super::Accumulator for GenericPassthroughAccumulator {
type Output = Vec<u8>;
fn process(&mut self, event: Vec<u8>) -> Option<Vec<u8>> {
Some(event)
}
}
impl AccumulatorFactory for PassthroughAccumulatorFactory {
fn spawn(
&self,
name: String,
boundary_tx: mpsc::Sender<(SourceName, Vec<u8>)>,
shutdown_rx: watch::Receiver<bool>,
config: AccumulatorSpawnConfig,
) -> (mpsc::Sender<Vec<u8>>, JoinHandle<()>) {
let (socket_tx, socket_rx) = mpsc::channel(64);
let checkpoint = config.dal.map(|dal| {
super::accumulator::CheckpointHandle::new(dal, config.graph_name.clone(), name.clone())
});
let sender = BoundarySender::new(boundary_tx, SourceName::new(&name));
let ctx = AccumulatorContext {
output: sender,
name: name.clone(),
shutdown: shutdown_rx,
checkpoint,
health: config.health_tx,
};
let handle = tokio::spawn(accumulator_runtime(
GenericPassthroughAccumulator,
ctx,
socket_rx,
AccumulatorRuntimeConfig::default(),
));
(socket_tx, handle)
}
}
pub struct StreamBackendAccumulatorFactory {
config: std::collections::HashMap<String, String>,
}
impl StreamBackendAccumulatorFactory {
pub fn new(config: std::collections::HashMap<String, String>) -> Self {
Self { config }
}
}
#[cfg(feature = "kafka")]
struct KafkaEventSource {
broker_var: String,
topic: String,
group: String,
extra: std::collections::HashMap<String, String>,
name: String,
}
#[cfg(feature = "kafka")]
#[async_trait::async_trait]
impl super::accumulator::EventSource for KafkaEventSource {
async fn run(
self,
events: mpsc::Sender<Vec<u8>>,
mut shutdown: watch::Receiver<bool>,
) -> Result<(), super::accumulator::AccumulatorError> {
let broker_url = crate::var(&self.broker_var).map_err(|e| {
super::accumulator::AccumulatorError::Init(format!(
"cannot resolve broker var '{}': {}",
self.broker_var, e
))
})?;
let stream_config = super::stream_backend::StreamConfig {
broker_url,
topic: self.topic,
group: self.group,
extra: self.extra,
};
use super::stream_backend::StreamBackend as _;
let mut backend = super::stream_backend::kafka::KafkaStreamBackend::connect(&stream_config)
.await
.map_err(|e| {
super::accumulator::AccumulatorError::Init(format!("Kafka connect failed: {}", e))
})?;
tracing::info!(accumulator = %self.name, "Kafka event source started");
loop {
tokio::select! {
result = backend.recv() => {
match result {
Ok(msg) => {
tracing::debug!(
accumulator = %self.name,
offset = msg.offset,
bytes = msg.payload.len(),
"Kafka message received"
);
if events.send(msg.payload).await.is_err() {
break;
}
}
Err(e) => {
tracing::warn!(accumulator = %self.name, "Kafka recv error: {}", e);
}
}
}
_ = shutdown.changed() => {
tracing::debug!(accumulator = %self.name, "Kafka event source shutting down");
break;
}
}
}
Ok(())
}
}
impl AccumulatorFactory for StreamBackendAccumulatorFactory {
fn spawn(
&self,
name: String,
boundary_tx: mpsc::Sender<(SourceName, Vec<u8>)>,
shutdown_rx: watch::Receiver<bool>,
config: AccumulatorSpawnConfig,
) -> (mpsc::Sender<Vec<u8>>, JoinHandle<()>) {
let (socket_tx, socket_rx) = mpsc::channel(1024);
let checkpoint = config.dal.map(|dal| {
super::accumulator::CheckpointHandle::new(dal, config.graph_name.clone(), name.clone())
});
let sender = BoundarySender::new(boundary_tx, SourceName::new(&name));
let ctx = AccumulatorContext {
output: sender,
name: name.clone(),
shutdown: shutdown_rx,
checkpoint,
health: config.health_tx,
};
let topic = self.config.get("topic").cloned().unwrap_or_default();
let group = self
.config
.get("group")
.cloned()
.unwrap_or_else(|| format!("{}_group", name));
let broker_var = self
.config
.get("broker")
.cloned()
.expect("stream accumulator config must include 'broker' key");
let extra_config: std::collections::HashMap<String, String> = self
.config
.iter()
.filter(|(k, _)| !["topic", "group", "backend", "broker"].contains(&k.as_str()))
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
#[cfg(feature = "kafka")]
let handle = {
let source = KafkaEventSource {
broker_var,
topic,
group,
extra: extra_config,
name: name.clone(),
};
tokio::spawn(accumulator_runtime_with_source(
GenericPassthroughAccumulator,
ctx,
socket_rx,
AccumulatorRuntimeConfig::default(),
source,
))
};
#[cfg(not(feature = "kafka"))]
let handle = {
let _ = (topic, group, extra_config, broker_var);
tracing::error!(accumulator = %name, "stream accumulator requires 'kafka' feature");
tokio::spawn(accumulator_runtime(
GenericPassthroughAccumulator,
ctx,
socket_rx,
AccumulatorRuntimeConfig::default(),
))
};
(socket_tx, handle)
}
}
pub async fn dispatch_runtime_reactors_into_scheduler(
runtime: &crate::Runtime,
scheduler: &super::scheduler::ComputationGraphScheduler,
accumulator_overrides: &[cloacina_workflow_plugin::types::AccumulatorConfig],
tenant_id: Option<String>,
) -> Result<Vec<String>, String> {
let mut dispatched = Vec::new();
for name in runtime.reactor_names() {
let registration = match runtime.get_reactor(&name) {
Some(r) => r,
None => continue,
};
let accumulators: Vec<AccumulatorDeclaration> = registration
.accumulator_names
.iter()
.map(|acc_name| {
let factory: Arc<dyn AccumulatorFactory> = match accumulator_overrides
.iter()
.find(|cfg| &cfg.name == acc_name)
{
Some(override_cfg) => match override_cfg.accumulator_type.as_str() {
"stream" => Arc::new(StreamBackendAccumulatorFactory::new(
override_cfg.config.clone(),
)),
_ => Arc::new(PassthroughAccumulatorFactory),
},
None => Arc::new(PassthroughAccumulatorFactory),
};
AccumulatorDeclaration {
name: acc_name.clone(),
factory,
}
})
.collect();
let criteria = registration.reaction_mode.into();
let strategy = InputStrategy::Latest;
scheduler
.load_reactor(
name.clone(),
accumulators,
criteria,
strategy,
tenant_id.clone(),
vec![],
)
.await?;
tracing::info!(reactor = %name, "package-declared reactor loaded into scheduler");
dispatched.push(name);
}
Ok(dispatched)
}
pub async fn dispatch_package_reactors_into_scheduler(
reactor_metadata: &[cloacina_workflow_plugin::ReactorPackageMetadata],
scheduler: &super::scheduler::ComputationGraphScheduler,
accumulator_overrides: &[cloacina_workflow_plugin::types::AccumulatorConfig],
tenant_id: Option<String>,
) -> Result<Vec<String>, String> {
use cloacina_computation_graph::ReactionMode;
let mut dispatched = Vec::new();
for meta in reactor_metadata {
let accumulators: Vec<AccumulatorDeclaration> = meta
.accumulators
.iter()
.map(|acc| {
let factory: Arc<dyn AccumulatorFactory> = match accumulator_overrides
.iter()
.find(|cfg| cfg.name == acc.name)
{
Some(override_cfg) => match override_cfg.accumulator_type.as_str() {
"stream" => Arc::new(StreamBackendAccumulatorFactory::new(
override_cfg.config.clone(),
)),
_ => Arc::new(PassthroughAccumulatorFactory),
},
None => match acc.accumulator_type.as_str() {
"stream" => {
Arc::new(StreamBackendAccumulatorFactory::new(acc.config.clone()))
}
_ => Arc::new(PassthroughAccumulatorFactory),
},
};
AccumulatorDeclaration {
name: acc.name.clone(),
factory,
}
})
.collect();
let criteria = match meta.reaction_mode.as_str() {
"when_all" => ReactionMode::WhenAll.into(),
_ => ReactionMode::WhenAny.into(),
};
let strategy = InputStrategy::Latest;
scheduler
.load_reactor(
meta.name.clone(),
accumulators,
criteria,
strategy,
tenant_id.clone(),
vec![],
)
.await?;
tracing::info!(
reactor = %meta.name,
package = %meta.package_name,
"package-declared reactor loaded into scheduler (via get_reactor_metadata)"
);
dispatched.push(meta.name.clone());
}
Ok(dispatched)
}
#[cfg(test)]
mod tests {
use super::*;
use cloacina_workflow_plugin::AccumulatorDeclarationEntry;
#[test]
fn test_build_declaration_from_ffi_metadata() {
let meta = GraphPackageMetadata {
graph_name: "test_graph".to_string(),
package_name: "test-pkg".to_string(),
reaction_mode: "when_any".to_string(),
input_strategy: "latest".to_string(),
accumulators: vec![
AccumulatorDeclarationEntry {
name: "alpha".to_string(),
accumulator_type: "passthrough".to_string(),
config: HashMap::new(),
},
AccumulatorDeclarationEntry {
name: "beta".to_string(),
accumulator_type: "stream".to_string(),
config: [("topic".to_string(), "test.topic".to_string())]
.into_iter()
.collect(),
},
],
trigger_reactor: None,
};
let decl = build_declaration_from_ffi(&meta, vec![0u8; 100]);
assert_eq!(decl.name, "test_graph");
assert_eq!(decl.accumulators.len(), 2);
assert_eq!(decl.accumulators[0].name, "alpha");
assert_eq!(decl.accumulators[1].name, "beta");
}
#[test]
fn test_reaction_mode_parsing() {
let meta_any = GraphPackageMetadata {
graph_name: "g".to_string(),
package_name: "p".to_string(),
reaction_mode: "when_any".to_string(),
input_strategy: "latest".to_string(),
accumulators: vec![],
trigger_reactor: None,
};
let decl_any = build_declaration_from_ffi(&meta_any, vec![]);
assert!(matches!(
decl_any.reactor.criteria,
ReactionCriteria::WhenAny
));
let meta_all = GraphPackageMetadata {
graph_name: "g".to_string(),
package_name: "p".to_string(),
reaction_mode: "when_all".to_string(),
input_strategy: "sequential".to_string(),
accumulators: vec![],
trigger_reactor: None,
};
let decl_all = build_declaration_from_ffi(&meta_all, vec![]);
assert!(matches!(
decl_all.reactor.criteria,
ReactionCriteria::WhenAll
));
assert!(matches!(
decl_all.reactor.strategy,
InputStrategy::Sequential
));
}
}