use pyo3::prelude::*;
use pyo3::types::PyModule;
use std::path::Path;
use std::time::Duration;
use super::task::{pop_workflow_context, push_workflow_context};
use super::workflow_context::PyWorkflowContext;
use crate::task::TaskNamespace;
/// Maximum number of seconds to wait for a Python workflow import to finish before the
/// loader reports a timeout.
const IMPORT_TIMEOUT_SECS: u64 = 60;

/// Top-level names that a workflow or vendor directory may not provide.
///
/// A same-named file or directory in either directory is rejected as shadowing a Python
/// stdlib module. Entries are checked by exact string match; the order below is the
/// original declaration order.
const STDLIB_DENY_LIST: &[&str] = &[
    "os", "sys", "subprocess", "shutil",
    "socket", "http", "urllib",
    "ctypes", "importlib",
    "pathlib", "io",
    "json", "pickle", "marshal",
    "code", "codeop", "compile", "compileall", "builtins",
    "signal", "multiprocessing", "threading",
    "tempfile", "glob", "fnmatch",
];
#[derive(Debug, thiserror::Error)]
pub enum PythonLoaderError {
#[error("Python import failed: {0}")]
ImportError(String),
#[error("Workflow validation failed: {0}")]
ValidationError(String),
#[error("Task registration failed: {0}")]
RegistrationError(String),
#[error("Python runtime error: {0}")]
RuntimeError(String),
}
impl From<PyErr> for PythonLoaderError {
fn from(err: PyErr) -> Self {
PythonLoaderError::RuntimeError(err.to_string())
}
}
pub fn ensure_cloaca_module(py: Python) -> PyResult<()> {
let sys_modules = py.import("sys")?.getattr("modules")?;
if sys_modules.contains("cloaca")? {
return Ok(());
}
let module = PyModule::new(py, "cloaca")?;
module.add_function(wrap_pyfunction!(super::task::task, &module)?)?;
module.add_class::<super::task::PyTaskHandle>()?;
module.add_class::<super::task::TaskDecorator>()?;
module.add_class::<super::context::PyContext>()?;
module.add_class::<super::workflow::PyWorkflowBuilder>()?;
module.add_class::<super::workflow::PyWorkflow>()?;
module.add_function(wrap_pyfunction!(
super::workflow::register_workflow_constructor,
&module
)?)?;
module.add_function(wrap_pyfunction!(super::trigger::trigger, &module)?)?;
module.add_class::<super::trigger::PyTriggerResult>()?;
module.add_class::<super::trigger::TriggerDecorator>()?;
module.add_class::<super::workflow_context::PyWorkflowContext>()?;
module.add_class::<super::namespace::PyTaskNamespace>()?;
sys_modules.set_item("cloaca", &module)?;
Ok(())
}
/// Rejects a package whose top-level entries would shadow a denied Python stdlib module.
///
/// Scans the immediate children of `workflow_dir` and `vendor_dir`. Paths that are missing
/// or are not directories are skipped. For each entry, the module name Python would import
/// it as is derived and compared with `STDLIB_DENY_LIST`:
///
/// - `name.py`, `name.pyc`, `name.pyw` → `name`
/// - extension modules such as `name.so`, `name.pyd`, `name.abi3.so` or
///   `name.cpython-311-x86_64-linux-gnu.so` → `name`
/// - anything else (for example a package directory `name/`) → the entry name unchanged
///
/// # Errors
///
/// - [`PythonLoaderError::ImportError`] if an entry maps to a denied module name.
/// - [`PythonLoaderError::RuntimeError`] if a directory or one of its entries cannot be
///   read. This is a security check, so it fails closed instead of skipping what it
///   could not inspect.
pub fn validate_no_stdlib_shadowing(
    workflow_dir: &Path,
    vendor_dir: &Path,
) -> Result<(), PythonLoaderError> {
    for dir in [workflow_dir, vendor_dir] {
        if !dir.is_dir() {
            continue;
        }
        let scan_error = |err: std::io::Error| {
            PythonLoaderError::RuntimeError(format!(
                "Failed to scan '{}' for stdlib shadowing: {}",
                dir.display(),
                err
            ))
        };
        let entries = std::fs::read_dir(dir).map_err(&scan_error)?;
        for entry in entries {
            let entry = entry.map_err(&scan_error)?;
            let name = entry.file_name();
            let name_str = name.to_string_lossy();
            let module_name = importable_module_name(&name_str);
            if STDLIB_DENY_LIST.contains(&module_name) {
                return Err(PythonLoaderError::ImportError(format!(
                    "Package contains '{}' which shadows Python stdlib module '{}' — rejected for security",
                    name_str, module_name
                )));
            }
        }
    }
    Ok(())
}

/// Returns the module name Python would import a top-level directory entry as.
fn importable_module_name(file_name: &str) -> &str {
    if let Some(stem) = file_name
        .strip_suffix(".so")
        .or_else(|| file_name.strip_suffix(".pyd"))
    {
        // Extension modules may carry an ABI tag (`.abi3`, `.cpython-311-...`) between the
        // module name and the final suffix; the module name is everything before the first dot.
        return stem.split_once('.').map_or(stem, |(module, _)| module);
    }
    [".py", ".pyc", ".pyw"]
        .iter()
        .find_map(|&suffix| file_name.strip_suffix(suffix))
        .unwrap_or(file_name)
}
/// Imports and registers a Python workflow package whose workflow is named after the package.
///
/// This is a convenience wrapper around [`import_and_register_python_workflow_named`] that
/// passes `package_name` as the workflow name. Arguments and errors are otherwise
/// identical.
pub fn import_and_register_python_workflow(
    workflow_dir: &Path,
    vendor_dir: &Path,
    entry_module: &str,
    package_name: &str,
    tenant_id: &str,
) -> Result<Vec<TaskNamespace>, PythonLoaderError> {
    // The workflow name defaults to the package name.
    let workflow_name = package_name;
    import_and_register_python_workflow_named(
        workflow_dir,
        vendor_dir,
        entry_module,
        package_name,
        workflow_name,
        tenant_id,
    )
}
pub fn import_and_register_python_workflow_named(
workflow_dir: &Path,
vendor_dir: &Path,
entry_module: &str,
package_name: &str,
workflow_name: &str,
tenant_id: &str,
) -> Result<Vec<TaskNamespace>, PythonLoaderError> {
validate_no_stdlib_shadowing(workflow_dir, vendor_dir)?;
let workflow_dir = workflow_dir.to_path_buf();
let vendor_dir = vendor_dir.to_path_buf();
let entry_module = entry_module.to_string();
let package_name = package_name.to_string();
let workflow_name = workflow_name.to_string();
let tenant_id = tenant_id.to_string();
let timeout = Duration::from_secs(IMPORT_TIMEOUT_SECS);
let handle = std::thread::spawn(move || -> Result<Vec<TaskNamespace>, PythonLoaderError> {
Python::with_gil(|py| {
ensure_cloaca_module(py)?;
let sys = py.import("sys")?;
let path = sys.getattr("path")?;
path.call_method1(
"append",
(workflow_dir
.to_str()
.ok_or(PythonLoaderError::RuntimeError(
"Invalid workflow_dir path".to_string(),
))?,),
)?;
if vendor_dir.exists() {
path.call_method1(
"append",
(vendor_dir.to_str().ok_or(PythonLoaderError::RuntimeError(
"Invalid vendor_dir path".to_string(),
))?,),
)?;
}
let context = PyWorkflowContext::new(&tenant_id, &package_name, &workflow_name);
push_workflow_context(context.clone());
let import_result = py.import(entry_module.as_str());
if let Err(e) = import_result {
pop_workflow_context();
return Err(PythonLoaderError::ImportError(format!(
"Failed to import '{}': {}",
entry_module, e
)));
}
pop_workflow_context();
let python_triggers = crate::python::trigger::drain_python_triggers();
for trigger_def in python_triggers {
let trigger_name = trigger_def.name.clone();
let wrapper = std::sync::Arc::new(
crate::python::trigger::PythonTriggerWrapper::new(&trigger_def),
);
let wrapper_for_closure = wrapper.clone();
crate::trigger::register_trigger_constructor(trigger_name.clone(), move || {
wrapper_for_closure.clone()
});
tracing::info!("Registered Python trigger: {}", trigger_name);
}
let (t, p, w) = context.as_components();
let registry = crate::task::global_task_registry();
let guard = registry.read();
let mut namespaces = Vec::new();
let mut workflow = crate::Workflow::new(w);
workflow.set_tenant(t);
workflow.set_package(p);
for (namespace, constructor) in guard.iter() {
if namespace.tenant_id == t
&& namespace.package_name == p
&& namespace.workflow_id == w
{
namespaces.push(namespace.clone());
let task_instance = constructor();
workflow.add_task(task_instance).map_err(|e| {
PythonLoaderError::RegistrationError(format!("Failed to add task: {}", e))
})?;
}
}
drop(guard);
if namespaces.is_empty() {
return Err(PythonLoaderError::RegistrationError(format!(
"No tasks registered after importing '{}'. Ensure the module uses @cloaca.task decorators.",
entry_module
)));
}
workflow.validate().map_err(|e| {
PythonLoaderError::ValidationError(format!("Workflow validation failed: {}", e))
})?;
let final_workflow = workflow.finalize();
let workflow_name = final_workflow.name().to_string();
crate::workflow::register_workflow_constructor(workflow_name, move || {
final_workflow.clone()
});
tracing::info!(
"Python workflow imported: {} tasks registered for {}::{}::{}",
namespaces.len(),
t,
p,
w
);
Ok(namespaces)
})
});
let start = std::time::Instant::now();
loop {
if handle.is_finished() {
let result = handle.join().map_err(|_| {
PythonLoaderError::RuntimeError("Python import thread panicked".to_string())
})??;
return Ok(result);
}
if start.elapsed() > timeout {
return Err(PythonLoaderError::RuntimeError(format!(
"Python workflow import timed out after {}s",
timeout.as_secs()
)));
}
std::thread::sleep(Duration::from_millis(100));
}
}