use anyhow::{Context, Result, anyhow};
use fancy_log::{LogLevel, log};
use crate::{
engine::interfaces::{ConnectionObject, MiddlewareOutput, ProcessingStep, TerminatorResult},
layers::l7::container::Container,
plugins::core::registry,
};
use crate::engine::{
context::{ApplicationContext, ExecutionContext},
key_scoping,
};
/// Runs a flow starting at `step`, bounded by a wall-clock limit.
///
/// The limit in seconds is read through `get_env` under the name
/// `FLOW_EXECUTION_TIMEOUT_SECS`; a value that does not parse as `u64`
/// falls back to 10.
///
/// # Errors
///
/// Returns whatever error the flow itself produced, or a
/// "Flow execution timeout" error (after logging it) when the flow did not
/// complete within the limit.
pub async fn execute<C: ExecutionContext>(
    step: &ProcessingStep,
    context: &mut C,
    conn: ConnectionObject,
    flow_path: String,
) -> Result<TerminatorResult> {
    let configured_limit =
        crate::common::config::env_loader::get_env("FLOW_EXECUTION_TIMEOUT_SECS", "10".to_owned());
    let timeout_secs = configured_limit.parse::<u64>().unwrap_or(10);
    let limit = std::time::Duration::from_secs(timeout_secs);

    // Bind the outcome first so the flow future is already gone by the time
    // the timeout is reported.
    let outcome = tokio::time::timeout(
        limit,
        execute_recursive(step, context, conn, flow_path),
    )
    .await;

    match outcome {
        Ok(result) => result,
        Err(_) => {
            log(
                LogLevel::Error,
                &format!("✗ Flow execution timed out after {timeout_secs}s"),
            );
            Err(anyhow!("Flow execution timeout"))
        }
    }
}
/// Runs a flow on behalf of an L7 container.
///
/// Wraps `container` in an [`ApplicationContext`], pairs it with a virtual
/// connection labelled `"L7_Managed_Context"`, and delegates to [`execute`],
/// so the same timeout handling applies.
///
/// # Errors
///
/// Propagates any error returned by [`execute`].
pub async fn execute_l7(
    step: &ProcessingStep,
    container: &mut Container,
    flow_path: String,
) -> Result<TerminatorResult> {
    let virtual_conn = ConnectionObject::Virtual("L7_Managed_Context".into());
    let mut app_context = ApplicationContext { container };
    execute(step, &mut app_context, virtual_conn, flow_path).await
}
/// Executes the single plugin named by `step` and continues with the branch
/// it selects.
///
/// Order of operations:
///
/// 1. Validate that `step` holds exactly one plugin entry and resolve its
///    inputs through `context`.
/// 2. Look the plugin up in the registry.
/// 3. For plugins the registry reports as external, consult the failure map:
///    if a failure was recorded fewer than `EXTERNAL_PLUGIN_QUIET_PERIOD_SECS`
///    seconds ago (3 when the value does not parse), the plugin is not run
///    and the `"failure"` branch is taken with `error = circuit_breaker_active`
///    in the store.
/// 4. Otherwise run the plugin as the first matching kind: HTTP middleware,
///    generic middleware, L7 middleware, plain middleware, L7 terminator,
///    plain terminator.
/// 5. A terminator's result is returned directly; a middleware's output is
///    handed to `handle_middleware_output`, which recurses into the next step.
///
/// An external middleware that returns the `"failure"` branch or an error has
/// the current instant recorded in the failure map.
///
/// # Errors
///
/// Fails when the step does not contain exactly one plugin, the plugin is not
/// registered, the plugin is neither middleware nor terminator, the plugin
/// itself fails (the error is wrapped with the plugin name), or any
/// downstream step fails.
async fn execute_recursive<C: ExecutionContext>(
    step: &ProcessingStep,
    context: &mut C,
    conn: ConnectionObject,
    flow_path: String,
) -> Result<TerminatorResult> {
    if step.len() != 1 {
        return Err(anyhow!(
            "Invalid step: expected exactly 1 plugin, found {}",
            step.len()
        ));
    }
    let (plugin_name, instance) = step
        .iter()
        .next()
        .ok_or_else(|| anyhow!("Empty processing step"))?;
    let resolved_inputs = context.resolve_inputs(&instance.input).await;
    let plugin = registry::get_plugin(plugin_name)
        .ok_or_else(|| anyhow!("Plugin '{plugin_name}' not found in registry"))?;
    log(
        LogLevel::Debug,
        &format!("➜ Executing plugin: {plugin_name} (Path: '{flow_path}')"),
    );

    let is_external = registry::get_external_plugin(plugin_name).is_some();

    // Circuit breaker. Only the elapsed `Duration` is kept from the lookup:
    // the value returned by `get` is consumed inside `map`, so nothing borrowed
    // from the failure map is alive while the failure branch is awaited below.
    // That branch may reach the `insert` calls further down in a nested step.
    // NOTE(review): if EXTERNAL_PLUGIN_FAILURES is a guard-returning concurrent
    // map, holding the lookup result across that await could deadlock — confirm.
    if is_external
        && let Some(since_failure) = registry::EXTERNAL_PLUGIN_FAILURES
            .get(plugin_name)
            .map(|last_failure| last_failure.elapsed())
    {
        let quiet_period_secs = crate::common::config::env_loader::get_env(
            "EXTERNAL_PLUGIN_QUIET_PERIOD_SECS",
            "3".to_owned(),
        )
        .parse::<u64>()
        .unwrap_or(3);
        if since_failure.as_secs() < quiet_period_secs {
            log(
                LogLevel::Warn,
                &format!(
                    "➜ Circuit Breaker: Plugin '{plugin_name}' is in quiet period (last failure < {quiet_period_secs}s ago). Skipping IO and returning failure branch."
                ),
            );
            // Synthesize the output the plugin would have produced on failure
            // so the flow follows its configured "failure" branch.
            let output = MiddlewareOutput {
                branch: "failure".into(),
                store: Some(std::collections::HashMap::from([(
                    "error".to_owned(),
                    "circuit_breaker_active".to_owned(),
                )])),
            };
            return handle_middleware_output(output, plugin_name, &flow_path, instance, context, conn)
                .await;
        }
    }

    // Middleware kinds are probed in a fixed order; the first match wins.
    let output_res = if let Some(http_middleware) = plugin.as_http_middleware() {
        http_middleware
            .execute(context.as_any_mut(), resolved_inputs)
            .await
            .with_context(|| format!("Error executing HTTP middleware '{plugin_name}'"))
    } else if let Some(generic_middleware) = plugin.as_generic_middleware() {
        generic_middleware
            .execute(resolved_inputs)
            .await
            .with_context(|| format!("Error executing generic middleware '{plugin_name}'"))
    } else if let Some(l7_middleware) = plugin.as_l7_middleware() {
        l7_middleware
            .execute_l7(context.as_any_mut(), resolved_inputs)
            .await
            .with_context(|| format!("Error executing L7 middleware '{plugin_name}'"))
    } else if let Some(middleware) = plugin.as_middleware() {
        middleware
            .execute(resolved_inputs)
            .await
            .with_context(|| format!("Error executing middleware '{plugin_name}'"))
    } else {
        // Not a middleware: it must be a terminator, which ends this flow.
        // Every path through this block returns.
        let terminator_result = if let Some(l7_terminator) = plugin.as_l7_terminator() {
            l7_terminator
                .execute_l7(context.as_any_mut(), resolved_inputs)
                .await
                .with_context(|| format!("Error executing L7 terminator '{plugin_name}'"))?
        } else if let Some(terminator) = plugin.as_terminator() {
            terminator
                .execute(resolved_inputs, context.kv_mut(), conn)
                .await
                .with_context(|| format!("Error executing terminator '{plugin_name}'"))?
        } else {
            return Err(anyhow!(
                "Plugin '{plugin_name}' is neither Middleware nor Terminator"
            ));
        };
        match &terminator_result {
            TerminatorResult::Finished => {
                log(
                    LogLevel::Debug,
                    &format!("✓ Flow terminated successfully by '{plugin_name}'"),
                );
            }
            TerminatorResult::Upgrade { protocol, .. } => {
                log(
                    LogLevel::Info,
                    &format!("➜ Flow upgrade requested by '{plugin_name}' -> Protocol: {protocol}"),
                );
            }
        }
        return Ok(terminator_result);
    };

    let output = match output_res {
        Ok(out) => {
            // A "failure" branch from an external plugin starts its quiet period.
            if is_external && out.branch == "failure" {
                log(
                    LogLevel::Warn,
                    &format!(
                        "⚠ External plugin '{plugin_name}' returned 'failure' branch. Marking as failed in Circuit Breaker."
                    ),
                );
                registry::EXTERNAL_PLUGIN_FAILURES.insert(plugin_name.clone(), std::time::Instant::now());
            }
            out
        }
        Err(e) => {
            // A hard error from an external plugin also starts the quiet
            // period; the error itself is still propagated to the caller.
            if is_external {
                log(
                    LogLevel::Error,
                    &format!(
                        "✗ Runtime error in external plugin '{plugin_name}': {e}. Activating quiet period."
                    ),
                );
                registry::EXTERNAL_PLUGIN_FAILURES.insert(plugin_name.clone(), std::time::Instant::now());
            }
            return Err(e);
        }
    };
    handle_middleware_output(output, plugin_name, &flow_path, instance, context, conn).await
}
async fn handle_middleware_output<C: ExecutionContext>(
output: MiddlewareOutput,
plugin_name: &str,
flow_path: &str,
instance: &crate::engine::interfaces::PluginInstance,
context: &mut C,
conn: ConnectionObject,
) -> Result<TerminatorResult> {
log(
LogLevel::Debug,
&format!(
"✓ Middleware '{}' returned branch: '{}'",
plugin_name, output.branch
),
);
if let Some(updates) = output.store {
let kv = context.kv_mut();
for (raw_key, value) in updates.into_iter() {
if raw_key.contains('{') || raw_key.contains('}') {
log(
LogLevel::Error,
&format!(
"✗ Security: Plugin '{plugin_name}' attempted to store an invalid key name containing '{{' or '}}'. Ignoring: '{raw_key}'"
),
);
continue;
}
let scoped_key = key_scoping::format_scoped_key(flow_path, plugin_name, &raw_key);
log(
LogLevel::Debug,
&format!("⚙ KV Update: {scoped_key} = {value}"),
);
kv.insert(scoped_key, value);
}
}
if let Some(next_step) = instance.output.get(output.branch.as_ref()) {
let next_path = key_scoping::next_path(flow_path, plugin_name, output.branch.as_ref());
Box::pin(execute_recursive(next_step, context, conn, next_path)).await
} else {
Err(anyhow!(
"Flow stalled at '{}': branch '{}' not configured in output",
plugin_name,
output.branch
))
}
}