mod dynamic_task;
mod extraction;
mod types;
pub use types::{OwnedTaskMetadata, OwnedTaskMetadataCollection};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use tempfile::TempDir;
use dynamic_task::DynamicLibraryTask;
use crate::registry::error::LoaderError;
use crate::registry::loader::package_loader::PackageMetadata;
use crate::task::{register_task_constructor, Task, TaskNamespace};
pub struct TaskRegistrar {
pub(super) temp_dir: TempDir,
registered_tasks: Arc<RwLock<HashMap<String, Vec<TaskNamespace>>>>,
loaded_packages: Arc<RwLock<HashMap<String, ()>>>,
}
impl TaskRegistrar {
pub fn new() -> Result<Self, LoaderError> {
let temp_dir = TempDir::new().map_err(|e| LoaderError::TempDirectory {
error: e.to_string(),
})?;
Ok(Self {
temp_dir,
registered_tasks: Arc::new(RwLock::new(HashMap::new())),
loaded_packages: Arc::new(RwLock::new(HashMap::new())),
})
}
pub async fn register_package_tasks(
&self,
package_id: &str,
package_data: &[u8],
_metadata: &PackageMetadata,
tenant_id: Option<&str>,
) -> Result<Vec<TaskNamespace>, LoaderError> {
let tenant_id = tenant_id.unwrap_or("public");
let task_metadata = self
.extract_task_metadata_from_library(package_data)
.await?;
let mut registered_namespaces = Vec::new();
let workflow_name = &task_metadata.workflow_name;
let package_name = &task_metadata.package_name;
for task in &task_metadata.tasks {
let task_id = &task.local_id;
let dependencies_json = &task.dependencies_json;
let dependency_namespaces: Vec<TaskNamespace> = if dependencies_json.trim() == "[]" {
Vec::new()
} else {
let dep_names: Vec<String> =
serde_json::from_str(dependencies_json).map_err(|e| {
LoaderError::MetadataExtraction {
reason: format!(
"Failed to parse dependencies JSON '{}': {}",
dependencies_json, e
),
}
})?;
dep_names
.into_iter()
.map(|dep_name| {
if dep_name.contains("::") {
let full_name = dep_name.replace("{tenant}", tenant_id);
crate::parse_namespace(&full_name).map_err(|e| {
LoaderError::MetadataExtraction {
reason: format!(
"Invalid dependency namespace '{}': {}",
full_name, e
),
}
})
} else {
Ok(TaskNamespace::new(
tenant_id,
package_name,
workflow_name,
&dep_name,
))
}
})
.collect::<Result<Vec<_>, _>>()?
};
let namespace = TaskNamespace::new(tenant_id, package_name, workflow_name, task_id);
let library_data = package_data.to_vec();
let task_name = task_id.to_string();
let pkg_name = package_name.to_string();
let deps = dependency_namespaces.clone();
let constructor = Box::new(move || {
Arc::new(DynamicLibraryTask::new(
library_data.clone(),
task_name.clone(),
pkg_name.clone(),
deps.clone(),
)) as Arc<dyn Task>
});
register_task_constructor(namespace.clone(), constructor);
registered_namespaces.push(namespace);
}
{
let mut registered = self.registered_tasks.write();
registered.insert(package_id.to_string(), registered_namespaces.clone());
}
tracing::info!(
"Successfully registered {} tasks for package {} using host-managed approach",
registered_namespaces.len(),
package_name
);
Ok(registered_namespaces)
}
pub fn unregister_package_tasks(&self, package_id: &str) -> Result<(), LoaderError> {
let namespaces = {
let mut registered = self.registered_tasks.write();
registered.remove(package_id)
};
if let Some(namespaces) = namespaces {
tracing::warn!(
"Task unregistration requested for package '{}' with {} tasks, but global registry doesn't support removal yet",
package_id,
namespaces.len()
);
}
{
let mut packages = self.loaded_packages.write();
packages.remove(package_id);
}
Ok(())
}
pub fn get_registered_namespaces(&self, package_id: &str) -> Vec<TaskNamespace> {
let registered = self.registered_tasks.read();
registered.get(package_id).cloned().unwrap_or_default()
}
pub fn loaded_package_count(&self) -> usize {
let packages = self.loaded_packages.read();
packages.len()
}
pub fn total_registered_tasks(&self) -> usize {
let registered = self.registered_tasks.read();
registered.values().map(|tasks| tasks.len()).sum()
}
pub fn temp_dir(&self) -> &Path {
self.temp_dir.path()
}
}
impl Default for TaskRegistrar {
fn default() -> Self {
Self::new().expect("Failed to create default TaskRegistrar")
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::registry::loader::package_loader::TaskMetadata as LoaderTaskMetadata;
fn create_mock_package_metadata(package_name: &str, task_count: usize) -> PackageMetadata {
let tasks: Vec<LoaderTaskMetadata> = (0..task_count)
.map(|i| LoaderTaskMetadata {
index: i as u32,
local_id: format!("task_{}", i),
namespaced_id_template: format!("{{tenant_id}}/{{package_name}}/task_{}", i),
dependencies: Vec::new(),
description: format!("Test task {}", i),
source_location: "test.rs:1".to_string(),
})
.collect();
PackageMetadata {
package_name: package_name.to_string(),
version: "1.0.0".to_string(),
description: Some("Test package".to_string()),
author: Some("Test Author".to_string()),
tasks,
graph_data: None,
architecture: "x86_64".to_string(),
symbols: vec!["fidius_get_registry".to_string()],
}
}
fn create_mock_binary_data() -> Vec<u8> {
vec![0x7f, 0x45, 0x4c, 0x46, 0x02, 0x01, 0x01, 0x00] }
#[tokio::test]
async fn test_task_registrar_creation() {
let registrar = TaskRegistrar::new().expect("Failed to create TaskRegistrar");
assert_eq!(registrar.loaded_package_count(), 0);
assert_eq!(registrar.total_registered_tasks(), 0);
assert!(registrar.temp_dir().exists());
}
#[tokio::test]
async fn test_task_registrar_default() {
let registrar = TaskRegistrar::default();
assert_eq!(registrar.loaded_package_count(), 0);
assert!(registrar.temp_dir().exists());
}
#[tokio::test]
async fn test_register_package_tasks_with_invalid_binary() {
let registrar = TaskRegistrar::new().unwrap();
let metadata = create_mock_package_metadata("test_package", 2);
let invalid_data = b"not a valid library".to_vec();
let result = registrar
.register_package_tasks("test_id", &invalid_data, &metadata, Some("test_tenant"))
.await;
assert!(result.is_err());
match result.unwrap_err() {
LoaderError::LibraryLoad { .. } => {
}
other => panic!("Expected LibraryLoad error, got: {:?}", other),
}
}
#[tokio::test]
async fn test_register_package_tasks_with_missing_symbols() {
let registrar = TaskRegistrar::new().unwrap();
let metadata = create_mock_package_metadata("test_package", 1);
let mock_data = create_mock_binary_data();
let result = registrar
.register_package_tasks("test_id", &mock_data, &metadata, Some("test_tenant"))
.await;
assert!(result.is_err());
match result.unwrap_err() {
LoaderError::LibraryLoad { .. } | LoaderError::SymbolNotFound { .. } => {
}
other => panic!(
"Expected LibraryLoad or SymbolNotFound error, got: {:?}",
other
),
}
}
#[tokio::test]
async fn test_register_package_tasks_empty_metadata() {
let registrar = TaskRegistrar::new().unwrap();
let metadata = create_mock_package_metadata("empty_package", 0);
let mock_data = create_mock_binary_data();
let result = registrar
.register_package_tasks("empty_id", &mock_data, &metadata, Some("test_tenant"))
.await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_unregister_nonexistent_package() {
let registrar = TaskRegistrar::new().unwrap();
let result = registrar.unregister_package_tasks("nonexistent_package");
assert!(result.is_ok());
}
#[tokio::test]
async fn test_get_registered_namespaces_empty() {
let registrar = TaskRegistrar::new().unwrap();
let namespaces = registrar.get_registered_namespaces("nonexistent_package");
assert!(namespaces.is_empty());
}
#[tokio::test]
async fn test_registrar_metrics() {
let registrar = TaskRegistrar::new().unwrap();
assert_eq!(registrar.loaded_package_count(), 0);
assert_eq!(registrar.total_registered_tasks(), 0);
let metadata = create_mock_package_metadata("test", 3);
let invalid_data = b"invalid".to_vec();
let _ = registrar
.register_package_tasks("test", &invalid_data, &metadata, None)
.await;
assert_eq!(registrar.loaded_package_count(), 0);
assert_eq!(registrar.total_registered_tasks(), 0);
}
#[tokio::test]
async fn test_concurrent_registrar_operations() {
use std::sync::Arc;
use tokio::task;
let registrar = Arc::new(TaskRegistrar::new().unwrap());
let mut handles = Vec::new();
for i in 0..5 {
let registrar_clone = Arc::clone(®istrar);
let handle = task::spawn(async move {
let metadata = create_mock_package_metadata(&format!("package_{}", i), 2);
let mock_data = create_mock_binary_data();
let _ = registrar_clone
.register_package_tasks(
&format!("id_{}", i),
&mock_data,
&metadata,
Some("tenant"),
)
.await;
let _ = registrar_clone.unregister_package_tasks(&format!("id_{}", i));
i
});
handles.push(handle);
}
for handle in handles {
let task_id = handle.await.expect("Task should complete");
assert!(task_id < 5);
}
assert_eq!(registrar.loaded_package_count(), 0);
}
#[tokio::test]
async fn test_temp_directory_isolation() {
let registrar1 = TaskRegistrar::new().unwrap();
let registrar2 = TaskRegistrar::new().unwrap();
assert_ne!(registrar1.temp_dir(), registrar2.temp_dir());
assert!(registrar1.temp_dir().exists());
assert!(registrar2.temp_dir().exists());
}
#[tokio::test]
async fn test_package_id_tracking() {
let registrar = TaskRegistrar::new().unwrap();
for _ in 0..3 {
let result = registrar.unregister_package_tasks("same_package_id");
assert!(result.is_ok());
}
assert_eq!(registrar.loaded_package_count(), 0);
}
#[tokio::test]
async fn test_tenant_isolation() {
let registrar = TaskRegistrar::new().unwrap();
let metadata = create_mock_package_metadata("shared_package", 1);
let mock_data = create_mock_binary_data();
let result1 = registrar
.register_package_tasks("pkg1", &mock_data, &metadata, Some("tenant_a"))
.await;
let result2 = registrar
.register_package_tasks("pkg2", &mock_data, &metadata, Some("tenant_b"))
.await;
assert!(result1.is_err());
assert!(result2.is_err());
}
#[tokio::test]
async fn test_default_tenant() {
let registrar = TaskRegistrar::new().unwrap();
let metadata = create_mock_package_metadata("test_package", 1);
let mock_data = create_mock_binary_data();
let result = registrar
.register_package_tasks("test", &mock_data, &metadata, None)
.await;
assert!(result.is_err()); }
#[tokio::test]
async fn test_large_package_metadata() {
let registrar = TaskRegistrar::new().unwrap();
let metadata = create_mock_package_metadata("large_package", 100);
let mock_data = create_mock_binary_data();
let result = registrar
.register_package_tasks("large", &mock_data, &metadata, Some("test"))
.await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_error_message_quality() {
let registrar = TaskRegistrar::new().unwrap();
let metadata = create_mock_package_metadata("test", 1);
let invalid_data = b"definitely not a library".to_vec();
let result = registrar
.register_package_tasks("test", &invalid_data, &metadata, Some("test"))
.await;
assert!(result.is_err());
let error = result.unwrap_err();
let error_string = format!("{}", error);
assert!(!error_string.is_empty());
assert!(error_string.contains("Failed to load library") || error_string.contains("Symbol"));
}
#[test]
fn test_registrar_sync_creation() {
let result = TaskRegistrar::new();
assert!(result.is_ok());
let registrar = result.unwrap();
assert!(registrar.temp_dir().exists());
}
}