use tracing::{debug, error, info, warn};
use super::{PackageState, RegistryReconciler};
use crate::registry::error::RegistryError;
use crate::registry::types::{WorkflowMetadata, WorkflowPackageId};
use crate::task::{global_task_registry, TaskNamespace};
use crate::workflow::global_workflow_registry;
impl RegistryReconciler {
/// Loads one workflow package into the running process and records it in
/// `loaded_packages`.
///
/// Steps, in order:
/// 1. Fetch the package from the registry by name and version.
/// 2. Write its bytes to an archive file in a fresh temporary directory and unpack it.
/// 3. Load the package manifest to learn the package language.
/// 4. `"rust"`: compile the unpacked source, read the compiled library and register
///    tasks, workflows and triggers from it.
///    `"python"`: extract the archive into a staging directory, import the entry
///    module and register triggers.
/// 5. Insert a `PackageState` keyed by `metadata.id`.
///
/// # Errors
/// Returns `RegistryError::PackageNotFound` when the registry has no matching package,
/// and `RegistryError::RegistrationFailed` for temp-dir/file I/O failures, unpack or
/// manifest errors, failed blocking-task joins and unsupported languages. Errors from
/// the compile and registration helpers are propagated with `?`.
///
/// NOTE(review): nothing registered by an earlier step is rolled back in this function
/// when a later step fails — confirm the caller handles partial registration.
pub(super) async fn load_package(
&self,
metadata: WorkflowMetadata,
) -> Result<(), RegistryError> {
debug!(
"Loading package: {} v{}",
metadata.package_name, metadata.version
);
// Fetch the stored package; a missing entry is reported as `PackageNotFound`.
let loaded_workflow = self
.registry
.get_workflow(&metadata.package_name, &metadata.version)
.await?
.ok_or_else(|| RegistryError::PackageNotFound {
package_name: metadata.package_name.clone(),
version: metadata.version.clone(),
})?;
// Scratch directory for the archive and everything unpacked from it. `TempDir`
// removes the directory when `work_dir` is dropped, i.e. when this function returns.
let work_dir = tempfile::TempDir::new().map_err(|e| RegistryError::RegistrationFailed {
message: format!("Failed to create temp dir: {}", e),
})?;
let archive_path = work_dir.path().join(format!(
"{}-{}.cloacina",
metadata.package_name, metadata.version
));
tokio::fs::write(&archive_path, &loaded_workflow.package_data)
.await
.map_err(|e| RegistryError::RegistrationFailed {
message: format!("Failed to write archive to temp file: {}", e),
})?;
let extract_dir = work_dir.path().join("source");
tokio::fs::create_dir_all(&extract_dir).await.map_err(|e| {
RegistryError::RegistrationFailed {
message: format!("Failed to create extract dir: {}", e),
}
})?;
// Owned copies of the paths are moved into the blocking closure.
let archive_path_clone = archive_path.clone();
let extract_dir_clone = extract_dir.clone();
// Unpack on a blocking thread. The `??` below handles two layers: the first `?`
// is the join error mapped by `map_err`, the second is the unpack error itself.
let source_dir = tokio::task::spawn_blocking(move || {
fidius_core::package::unpack_package(&archive_path_clone, &extract_dir_clone).map_err(
|e| RegistryError::RegistrationFailed {
message: format!("Failed to unpack source archive: {}", e),
},
)
})
.await
.map_err(|e| RegistryError::RegistrationFailed {
message: format!("spawn_blocking failed during unpack: {}", e),
})??;
// Load the manifest from the unpacked source directory, again on a blocking thread.
let source_dir_clone = source_dir.clone();
let cloacina_manifest = tokio::task::spawn_blocking(move || {
fidius_core::package::load_manifest::<cloacina_workflow_plugin::CloacinaMetadata>(
&source_dir_clone,
)
.map_err(|e| RegistryError::RegistrationFailed {
message: format!("Failed to load package.toml: {}", e),
})
})
.await
.map_err(|e| RegistryError::RegistrationFailed {
message: format!("spawn_blocking failed during manifest load: {}", e),
})??;
debug!(
"Package manifest loaded: {} v{} language={}",
cloacina_manifest.package.name,
cloacina_manifest.package.version,
cloacina_manifest.metadata.language
);
// Dispatch on the manifest language. Each branch yields the three pieces of state
// that are stored in `PackageState` below.
let (task_namespaces, workflow_name, trigger_names) = if cloacina_manifest.metadata.language
== "rust"
{
debug!(
"Compiling Rust source for package: {}",
metadata.package_name
);
// Rust packages are compiled from source; the resulting library is read fully
// into memory and handed to the registration helpers as bytes.
let lib_path = Self::compile_source_package(&source_dir).await?;
let library_data = tokio::fs::read(&lib_path).await.map_err(|e| {
RegistryError::RegistrationFailed {
message: format!(
"Failed to read compiled library {}: {}",
lib_path.display(),
e
),
}
})?;
// Tasks are registered before workflows: the workflow step looks tasks up in the
// global task registry.
let task_namespaces = self
.register_package_tasks(&metadata, &library_data)
.await?;
let workflow_name = self
.register_package_workflows(&metadata, &library_data)
.await?;
let trigger_names =
self.register_package_triggers(&metadata, &cloacina_manifest.metadata)?;
(task_namespaces, workflow_name, trigger_names)
} else if cloacina_manifest.metadata.language == "python" {
debug!("Loading Python package: {}", metadata.package_name);
// Make sure the Python interpreter is initialised before any Python code runs.
pyo3::prepare_freethreaded_python();
// The Python loader is given the raw archive bytes (not the directory unpacked
// above) and extracts them into its own staging directory under `work_dir`.
// NOTE(review): that staging directory is deleted together with `work_dir` when
// this function returns — confirm the imported modules do not need the files
// on disk afterwards.
let extracted = tokio::task::spawn_blocking({
let archive_data = loaded_workflow.package_data.clone();
let staging = work_dir.path().join("python-staging");
move || {
std::fs::create_dir_all(&staging).map_err(|e| {
RegistryError::RegistrationFailed {
message: format!("Failed to create Python staging dir: {}", e),
}
})?;
crate::registry::loader::python_loader::extract_python_package(
&archive_data,
&staging,
)
.map_err(|e| RegistryError::RegistrationFailed {
message: format!("Failed to extract Python package: {}", e),
})
}
})
.await
.map_err(|e| RegistryError::RegistrationFailed {
message: format!("spawn_blocking failed during Python extraction: {}", e),
})??;
let tenant_id = self.config.default_tenant_id.clone();
// Import the entry module on a blocking thread. The fields are cloned into the
// closure because `extracted` is still read after the import.
let task_namespaces = tokio::task::spawn_blocking({
let workflow_dir = extracted.workflow_dir.clone();
let vendor_dir = extracted.vendor_dir.clone();
let entry_module = extracted.entry_module.clone();
let package_name = extracted.package_name.clone();
let workflow_name = extracted.workflow_name.clone();
let tenant_id = tenant_id.clone();
move || {
crate::python::loader::import_and_register_python_workflow_named(
&workflow_dir,
&vendor_dir,
&entry_module,
&package_name,
&workflow_name,
&tenant_id,
)
.map_err(|e| RegistryError::RegistrationFailed {
message: format!("Python workflow import failed: {}", e),
})
}
})
.await
.map_err(|e| RegistryError::RegistrationFailed {
message: format!("spawn_blocking failed during Python import: {}", e),
})??;
// Python packages always report a workflow name, taken from the extracted package.
let workflow_name = Some(extracted.workflow_name.clone());
let trigger_names =
self.register_package_triggers(&metadata, &cloacina_manifest.metadata)?;
info!(
"Python package loaded: {} v{} — {} tasks, workflow '{}'",
metadata.package_name,
metadata.version,
task_namespaces.len(),
extracted.workflow_name,
);
(task_namespaces, workflow_name, trigger_names)
} else {
return Err(RegistryError::RegistrationFailed {
message: format!(
"Unsupported package language '{}' for package {} — only 'rust' and 'python' are supported",
cloacina_manifest.metadata.language, metadata.package_name
),
});
};
// Record what was registered so `unload_package` can undo it later.
let package_state = PackageState {
metadata: metadata.clone(),
task_namespaces,
workflow_name,
trigger_names,
};
// The write lock is taken only now, after all loading work has finished.
let mut loaded_packages = self.loaded_packages.write().await;
loaded_packages.insert(metadata.id, package_state);
Ok(())
}
pub(super) async fn unload_package(
&self,
package_id: WorkflowPackageId,
) -> Result<(), RegistryError> {
debug!("Unloading package: {}", package_id);
let mut loaded_packages = self.loaded_packages.write().await;
let package_state =
loaded_packages
.remove(&package_id)
.ok_or_else(|| RegistryError::PackageNotFound {
package_name: package_id.to_string(),
version: "unknown".to_string(),
})?;
drop(loaded_packages);
self.unregister_package_tasks(package_id, &package_state.task_namespaces)
.await?;
if let Some(workflow_name) = &package_state.workflow_name {
self.unregister_package_workflow(workflow_name).await?;
}
if !package_state.trigger_names.is_empty() {
self.unregister_package_triggers(&package_state.trigger_names);
}
info!(
"Unloaded package: {} v{}",
package_state.metadata.package_name, package_state.metadata.version
);
Ok(())
}
/// Extracts the package metadata from `package_data` and registers the package's
/// tasks under the configured default tenant.
///
/// Returns the task namespaces reported by the task registrar.
///
/// # Errors
/// Failures from metadata extraction or from the registrar are wrapped in
/// `RegistryError::Loader`.
pub(super) async fn register_package_tasks(
    &self,
    metadata: &WorkflowMetadata,
    package_data: &[u8],
) -> Result<Vec<TaskNamespace>, RegistryError> {
    debug!(
        "Loading tasks for package: {} v{}",
        metadata.package_name, metadata.version
    );

    let extracted = match self.package_loader.extract_metadata(package_data).await {
        Ok(extracted) => extracted,
        Err(e) => return Err(RegistryError::Loader(e)),
    };
    debug!(
        "Package {} contains {} tasks",
        extracted.package_name,
        extracted.tasks.len()
    );

    // The registrar identifies the package by the string form of its id.
    let id_string = metadata.id.to_string();
    let default_tenant = self.config.default_tenant_id.as_str();
    let registration = self
        .task_registrar
        .register_package_tasks(&id_string, package_data, &extracted, Some(default_tenant))
        .await;
    let namespaces = match registration {
        Ok(namespaces) => namespaces,
        Err(e) => return Err(RegistryError::Loader(e)),
    };

    info!(
        "Successfully registered {} tasks for package {} v{}",
        namespaces.len(),
        metadata.package_name,
        metadata.version
    );
    Ok(namespaces)
}
pub(super) async fn register_package_workflows(
&self,
metadata: &WorkflowMetadata,
package_data: &[u8],
) -> Result<Option<String>, RegistryError> {
debug!(
"Loading workflows for package: {} v{}",
metadata.package_name, metadata.version
);
let package_metadata = self
.package_loader
.extract_metadata(package_data)
.await
.map_err(RegistryError::Loader)?;
if !package_metadata.tasks.is_empty() {
debug!(
"Package {} has {} tasks - workflow exists since it compiled with packaged_workflow macro",
metadata.package_name,
package_metadata.tasks.len()
);
let workflow_name = {
if let Some(first_task) = package_metadata.tasks.first() {
let template = &first_task.namespaced_id_template;
debug!("Parsing workflow_name from template: '{}'", template);
let parts: Vec<&str> = template.split("::").collect();
if parts.len() >= 3 {
let workflow_part = parts[2];
if workflow_part == "{workflow}" {
let task_registry = crate::task::global_task_registry();
let mut found_id = None;
let registry = task_registry.read();
for (namespace, _) in registry.iter() {
if namespace.package_name == metadata.package_name
&& namespace.tenant_id == self.config.default_tenant_id
{
debug!(
"Found registered task with workflow_id: '{}'",
namespace.workflow_id
);
found_id = Some(namespace.workflow_id.clone());
break;
}
}
found_id.unwrap_or_else(|| metadata.package_name.clone())
} else {
workflow_part.to_string()
}
} else {
debug!("Template format unexpected, using package name as fallback");
metadata.package_name.clone()
}
} else {
debug!("No tasks in package metadata, using package name as fallback");
metadata.package_name.clone()
}
};
debug!(
"Using workflow_name '{}' for workflow registration",
workflow_name
);
let task_package_name = metadata.package_name.clone();
debug!(
"Using task_package_name '{}' for task lookup",
task_package_name
);
let _workflow = self.create_workflow_from_host_registry(
&task_package_name, &workflow_name,
&self.config.default_tenant_id,
)?;
let workflow_registry = global_workflow_registry();
let mut registry = workflow_registry.write();
let workflow_name_for_closure = workflow_name.clone();
let package_name_for_closure = task_package_name.clone(); let workflow_name_for_closure_static = workflow_name.clone();
let tenant_id_for_closure = self.config.default_tenant_id.clone();
registry.insert(
workflow_name.clone(),
Box::new(move || {
debug!(
"Creating workflow instance for {} using host registry",
workflow_name_for_closure
);
match Self::create_workflow_from_host_registry_static(
&package_name_for_closure,
&workflow_name_for_closure_static,
&tenant_id_for_closure,
) {
Ok(workflow) => workflow,
Err(e) => {
error!("Failed to create workflow from host registry: {}", e);
crate::workflow::Workflow::new(&workflow_name_for_closure)
}
}
}),
);
info!(
"Registered workflow '{}' for package {} v{}",
workflow_name, metadata.package_name, metadata.version
);
Ok(Some(workflow_name))
} else {
debug!(
"Package {} has no workflow data - registering as task-only package",
metadata.package_name
);
Ok(None)
}
}
pub(super) fn create_workflow_from_host_registry(
&self,
package_name: &str,
workflow_name: &str,
tenant_id: &str,
) -> Result<crate::workflow::Workflow, RegistryError> {
let mut workflow = crate::workflow::Workflow::new(workflow_name);
workflow.set_tenant(tenant_id);
workflow.set_package(package_name);
let task_registry = crate::task::global_task_registry();
let registry = task_registry.read();
let mut found_tasks = 0;
for (namespace, task_constructor) in registry.iter() {
if namespace.package_name == package_name
&& namespace.workflow_id == workflow_name
&& namespace.tenant_id == tenant_id
{
let task = task_constructor();
workflow
.add_task(task)
.map_err(|e| RegistryError::RegistrationFailed {
message: format!(
"Failed to add task {} to workflow: {:?}",
namespace.task_id, e
),
})?;
found_tasks += 1;
}
}
debug!(
"Created workflow '{}' with {} tasks from host registry",
workflow_name, found_tasks
);
workflow
.validate()
.map_err(|e| RegistryError::RegistrationFailed {
message: format!("Workflow validation failed: {:?}", e),
})?;
Ok(workflow.finalize())
}
/// Builds a validated workflow from the global task registry without needing a
/// reconciler instance.
///
/// Every registered task whose namespace matches `package_name`, `workflow_name` and
/// `tenant_id` is constructed and added to a new workflow, which is then validated
/// and finalized.
///
/// # Errors
/// Returns `RegistryError::RegistrationFailed` when a task cannot be added or when
/// validation fails.
pub(super) fn create_workflow_from_host_registry_static(
    package_name: &str,
    workflow_name: &str,
    tenant_id: &str,
) -> Result<crate::workflow::Workflow, RegistryError> {
    let mut workflow = crate::workflow::Workflow::new(workflow_name);
    workflow.set_tenant(tenant_id);
    workflow.set_package(package_name);

    let task_registry = crate::task::global_task_registry();
    let registry = task_registry.read();
    let matching = registry.iter().filter(|(namespace, _)| {
        namespace.package_name == package_name
            && namespace.workflow_id == workflow_name
            && namespace.tenant_id == tenant_id
    });

    let mut added = 0;
    for (namespace, build_task) in matching {
        if let Err(e) = workflow.add_task(build_task()) {
            return Err(RegistryError::RegistrationFailed {
                message: format!(
                    "Failed to add task {} to workflow: {:?}",
                    namespace.task_id, e
                ),
            });
        }
        added += 1;
    }
    debug!(
        "Created workflow '{}' with {} tasks from host registry (static)",
        workflow_name, added
    );

    if let Err(e) = workflow.validate() {
        return Err(RegistryError::RegistrationFailed {
            message: format!("Workflow validation failed: {:?}", e),
        });
    }
    Ok(workflow.finalize())
}
/// Unregisters a package's tasks from the task registrar and then removes each of
/// the given namespaces from the global task registry.
///
/// # Errors
/// Returns `RegistryError::RegistrationFailed` when the task registrar reports a
/// failure; in that case the global task registry is left untouched.
pub(super) async fn unregister_package_tasks(
    &self,
    package_id: WorkflowPackageId,
    task_namespaces: &[TaskNamespace],
) -> Result<(), RegistryError> {
    let id_string = package_id.to_string();
    if let Err(e) = self.task_registrar.unregister_package_tasks(&id_string) {
        return Err(RegistryError::RegistrationFailed {
            message: format!("Failed to unregister package tasks: {}", e),
        });
    }

    let task_registry = global_task_registry();
    let mut registry = task_registry.write();
    task_namespaces.iter().for_each(|namespace| {
        registry.remove(namespace);
        debug!("Unregistered task: {}", namespace);
    });
    Ok(())
}
/// Removes `workflow_name` from the global workflow registry.
///
/// Removing a name that is not registered is not an error; this function always
/// returns `Ok(())`.
pub(super) async fn unregister_package_workflow(
    &self,
    workflow_name: &str,
) -> Result<(), RegistryError> {
    {
        let workflow_registry = global_workflow_registry();
        workflow_registry.write().remove(workflow_name);
    }
    debug!("Unregistered workflow: {}", workflow_name);
    Ok(())
}
/// Works out which of the triggers declared in the package manifest are present in
/// the trigger registry and returns their names for tracking.
///
/// This function does not register anything itself: a declared trigger that is
/// already registered is logged and tracked, and one that is missing is logged as a
/// warning and skipped. The current implementation always returns `Ok`.
pub(super) fn register_package_triggers(
    &self,
    metadata: &WorkflowMetadata,
    cloacina_metadata: &cloacina_workflow_plugin::CloacinaMetadata,
) -> Result<Vec<String>, RegistryError> {
    let mut tracked = Vec::new();

    for declared in cloacina_metadata.triggers.iter() {
        if !crate::trigger::registry::is_trigger_registered(&declared.name) {
            warn!(
                "Trigger '{}' declared in package.toml for package {} v{} but not found in \
                 registry — the package must provide a Trigger impl (via #[trigger] macro or \
                 manual registration)",
                declared.name, metadata.package_name, metadata.version
            );
            continue;
        }
        info!(
            "Trigger '{}' (workflow: {}, interval: {}) registered from package {} v{}",
            declared.name,
            declared.workflow,
            declared.poll_interval,
            metadata.package_name,
            metadata.version
        );
        tracked.push(declared.name.clone());
    }

    match tracked.len() {
        0 => {}
        count => info!(
            "Tracking {} triggers for package {} v{}",
            count, metadata.package_name, metadata.version
        ),
    }
    Ok(tracked)
}
/// Deregisters each named trigger from the trigger registry.
///
/// A name that is not present in the registry only produces a warning, so an empty
/// slice or already-removed triggers are handled without error.
pub(super) fn unregister_package_triggers(&self, trigger_names: &[String]) {
    trigger_names.iter().for_each(|name| {
        let removed = crate::trigger::deregister_trigger(name);
        if !removed {
            warn!("Trigger '{}' was not found in registry during unload", name);
        } else {
            debug!("Deregistered trigger: {}", name);
        }
    });
}
}
#[cfg(test)]
mod tests {
    //! Unit tests for trigger and workflow (un)registration on `RegistryReconciler`.
    //!
    //! The tests touch process-wide registries, so each one is marked `#[serial]` and
    //! uses UUID-suffixed names to avoid clashing with other registrations.

    use super::*;
    use crate::registry::reconciler::ReconcilerConfig;
    use crate::registry::workflow_registry::filesystem::FilesystemWorkflowRegistry;
    use serial_test::serial;
    use std::sync::Arc;
    use uuid::Uuid;

    /// Builds a reconciler backed by a filesystem registry with no search paths and the
    /// default configuration.
    fn make_test_reconciler() -> RegistryReconciler {
        let registry = Arc::new(FilesystemWorkflowRegistry::new(vec![]));
        let config = ReconcilerConfig::default();
        // The sender half is dropped when this helper returns; these tests never use
        // the receiver.
        let (_tx, rx) = tokio::sync::watch::channel(false);
        RegistryReconciler::new(registry, config, rx).expect("Failed to create test reconciler")
    }

    /// Builds package metadata for a fictitious `test-pkg` v1.0.0 with fresh ids.
    fn make_test_metadata() -> WorkflowMetadata {
        WorkflowMetadata {
            id: Uuid::new_v4(),
            registry_id: Uuid::new_v4(),
            package_name: "test-pkg".to_string(),
            version: "1.0.0".to_string(),
            description: Some("Test package".to_string()),
            author: None,
            tasks: vec![],
            schedules: vec![],
            created_at: chrono::Utc::now(),
            updated_at: chrono::Utc::now(),
        }
    }

    /// Builds Python-flavoured manifest metadata that declares the given triggers.
    fn make_cloacina_metadata_with_triggers(
        triggers: Vec<cloacina_workflow_plugin::TriggerDefinition>,
    ) -> cloacina_workflow_plugin::CloacinaMetadata {
        cloacina_workflow_plugin::CloacinaMetadata {
            workflow_name: "test-workflow".to_string(),
            language: "python".to_string(),
            description: Some("Test".to_string()),
            author: None,
            requires_python: None,
            entry_module: Some("test.tasks".to_string()),
            triggers,
        }
    }

    /// Builds a non-cron, non-concurrent trigger declaration.
    fn trigger_definition(
        name: &str,
        workflow: &str,
        poll_interval: &str,
    ) -> cloacina_workflow_plugin::TriggerDefinition {
        cloacina_workflow_plugin::TriggerDefinition {
            name: name.to_string(),
            workflow: workflow.to_string(),
            poll_interval: poll_interval.to_string(),
            cron_expression: None,
            allow_concurrent: false,
        }
    }

    /// Registers a `DummyTrigger` constructor under `trigger_name` in the trigger
    /// registry.
    fn register_dummy_trigger(trigger_name: &str) {
        crate::trigger::registry::register_trigger_constructor(trigger_name.to_string(), || {
            Arc::new(DummyTrigger {
                name: "dummy".to_string(),
            })
        });
    }

    /// A manifest without triggers yields no tracked names.
    #[tokio::test]
    #[serial]
    async fn register_triggers_with_no_triggers_returns_empty() {
        let reconciler = make_test_reconciler();
        let metadata = make_test_metadata();
        let cloacina_meta = make_cloacina_metadata_with_triggers(vec![]);
        let result = reconciler
            .register_package_triggers(&metadata, &cloacina_meta)
            .unwrap();
        assert!(result.is_empty());
    }

    /// A declared trigger that is present in the registry is tracked.
    #[tokio::test]
    #[serial]
    async fn register_triggers_tracks_registered_triggers() {
        let reconciler = make_test_reconciler();
        let metadata = make_test_metadata();
        let trigger_name = format!("test-trigger-{}", Uuid::new_v4());
        register_dummy_trigger(&trigger_name);
        let cloacina_meta = make_cloacina_metadata_with_triggers(vec![trigger_definition(
            &trigger_name,
            "test-workflow",
            "5s",
        )]);
        let result = reconciler
            .register_package_triggers(&metadata, &cloacina_meta)
            .unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0], trigger_name);
        crate::trigger::deregister_trigger(&trigger_name);
    }

    /// A declared trigger that is missing from the registry is skipped.
    #[tokio::test]
    #[serial]
    async fn register_triggers_skips_unregistered_triggers() {
        let reconciler = make_test_reconciler();
        let metadata = make_test_metadata();
        let trigger_name = format!("nonexistent-trigger-{}", Uuid::new_v4());
        let cloacina_meta = make_cloacina_metadata_with_triggers(vec![trigger_definition(
            &trigger_name,
            "test-workflow",
            "10s",
        )]);
        let result = reconciler
            .register_package_triggers(&metadata, &cloacina_meta)
            .unwrap();
        assert!(result.is_empty());
    }

    /// With one registered and one missing trigger, only the registered one is tracked.
    #[tokio::test]
    #[serial]
    async fn register_triggers_mixed_registered_and_missing() {
        let reconciler = make_test_reconciler();
        let metadata = make_test_metadata();
        let registered_name = format!("registered-trigger-{}", Uuid::new_v4());
        let missing_name = format!("missing-trigger-{}", Uuid::new_v4());
        register_dummy_trigger(&registered_name);
        let cloacina_meta = make_cloacina_metadata_with_triggers(vec![
            trigger_definition(&registered_name, "wf1", "5s"),
            trigger_definition(&missing_name, "wf2", "10s"),
        ]);
        let result = reconciler
            .register_package_triggers(&metadata, &cloacina_meta)
            .unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0], registered_name);
        crate::trigger::deregister_trigger(&registered_name);
    }

    /// Unregistering a tracked trigger removes it from the trigger registry.
    #[tokio::test]
    #[serial]
    async fn unregister_triggers_removes_from_global_registry() {
        let reconciler = make_test_reconciler();
        let trigger_name = format!("unregister-test-{}", Uuid::new_v4());
        register_dummy_trigger(&trigger_name);
        assert!(crate::trigger::registry::is_trigger_registered(
            &trigger_name
        ));
        reconciler.unregister_package_triggers(std::slice::from_ref(&trigger_name));
        assert!(!crate::trigger::registry::is_trigger_registered(
            &trigger_name
        ));
    }

    /// Unregistering a name that was never registered must not panic.
    #[tokio::test]
    #[serial]
    async fn unregister_triggers_handles_already_removed() {
        let reconciler = make_test_reconciler();
        let trigger_name = format!("already-gone-{}", Uuid::new_v4());
        reconciler.unregister_package_triggers(&[trigger_name]);
    }

    /// Unregistering an empty list must not panic.
    #[tokio::test]
    #[serial]
    async fn unregister_triggers_empty_list_is_noop() {
        let reconciler = make_test_reconciler();
        reconciler.unregister_package_triggers(&[]);
    }

    /// Unregistering a workflow removes its constructor from the workflow registry.
    #[tokio::test]
    #[serial]
    async fn unregister_workflow_removes_from_global_registry() {
        let reconciler = make_test_reconciler();
        let workflow_name = format!("test-wf-{}", Uuid::new_v4());
        // Each registry access is scoped so its guard is released before the next step.
        {
            let registry = global_workflow_registry();
            let mut reg = registry.write();
            let wf_name = workflow_name.clone();
            reg.insert(
                workflow_name.clone(),
                Box::new(move || crate::workflow::Workflow::new(&wf_name)),
            );
        }
        {
            let registry = global_workflow_registry();
            let reg = registry.read();
            assert!(reg.contains_key(&workflow_name));
        }
        reconciler
            .unregister_package_workflow(&workflow_name)
            .await
            .unwrap();
        {
            let registry = global_workflow_registry();
            let reg = registry.read();
            assert!(!reg.contains_key(&workflow_name));
        }
    }

    /// Unregistering an unknown workflow name succeeds.
    #[tokio::test]
    #[serial]
    async fn unregister_workflow_nonexistent_is_ok() {
        let reconciler = make_test_reconciler();
        let result = reconciler
            .unregister_package_workflow("does-not-exist")
            .await;
        assert!(result.is_ok());
    }

    /// Minimal trigger used to populate the trigger registry; polling always skips.
    #[derive(Debug, Clone)]
    struct DummyTrigger {
        name: String,
    }

    #[async_trait::async_trait]
    impl crate::trigger::Trigger for DummyTrigger {
        fn name(&self) -> &str {
            &self.name
        }

        fn poll_interval(&self) -> std::time::Duration {
            std::time::Duration::from_secs(60)
        }

        fn allow_concurrent(&self) -> bool {
            false
        }

        async fn poll(
            &self,
        ) -> Result<crate::trigger::TriggerResult, crate::trigger::TriggerError> {
            Ok(crate::trigger::TriggerResult::Skip)
        }
    }
}