mod database;
pub mod filesystem;
mod package;
use async_trait::async_trait;
use std::collections::HashMap;
use uuid::Uuid;
use crate::database::Database;
use crate::registry::error::RegistryError;
use crate::registry::loader::{PackageLoader, PackageValidator, TaskRegistrar};
use crate::registry::traits::{RegistryStorage, WorkflowRegistry};
use crate::registry::types::{LoadedWorkflow, WorkflowMetadata, WorkflowPackageId};
use crate::task::TaskNamespace;
pub struct WorkflowRegistryImpl<S: RegistryStorage> {
pub(super) storage: S,
pub(super) database: Database,
#[allow(dead_code)]
loader: PackageLoader,
registrar: TaskRegistrar,
#[allow(dead_code)]
validator: PackageValidator,
pub(super) loaded_packages: HashMap<Uuid, Vec<TaskNamespace>>,
}
impl<S: RegistryStorage> WorkflowRegistryImpl<S> {
pub fn new(storage: S, database: Database) -> Result<Self, RegistryError> {
let loader = PackageLoader::new().map_err(RegistryError::Loader)?;
let registrar = TaskRegistrar::new().map_err(RegistryError::Loader)?;
let validator = PackageValidator::new().map_err(RegistryError::Loader)?;
Ok(Self {
storage,
database,
loader,
registrar,
validator,
loaded_packages: HashMap::new(),
})
}
pub fn with_strict_validation(storage: S, database: Database) -> Result<Self, RegistryError> {
let loader = PackageLoader::new().map_err(RegistryError::Loader)?;
let registrar = TaskRegistrar::new().map_err(RegistryError::Loader)?;
let validator = PackageValidator::strict().map_err(RegistryError::Loader)?;
Ok(Self {
storage,
database,
loader,
registrar,
validator,
loaded_packages: HashMap::new(),
})
}
pub fn loaded_package_count(&self) -> usize {
self.loaded_packages.len()
}
pub fn total_registered_tasks(&self) -> usize {
self.loaded_packages.values().map(|tasks| tasks.len()).sum()
}
pub async fn register_workflow_package(
&mut self,
package_data: Vec<u8>,
) -> Result<Uuid, RegistryError> {
WorkflowRegistry::register_workflow(self, package_data).await
}
pub async fn get_workflow_package_by_id(
&self,
package_id: Uuid,
) -> Result<Option<(WorkflowMetadata, Vec<u8>)>, RegistryError> {
let (registry_id, metadata) = match self.get_package_metadata_by_id(package_id).await? {
Some(data) => data,
None => return Ok(None),
};
let package_data = match self.storage.retrieve_binary(®istry_id).await? {
Some(data) => data,
None => {
return Err(RegistryError::Internal(
"Package metadata exists but binary data is missing".to_string(),
));
}
};
Ok(Some((metadata, package_data)))
}
pub async fn get_workflow_package_by_name(
&self,
package_name: &str,
version: &str,
) -> Result<Option<(WorkflowMetadata, Vec<u8>)>, RegistryError> {
match self.get_workflow(package_name, version).await? {
Some(loaded) => Ok(Some((loaded.metadata, loaded.package_data))),
None => Ok(None),
}
}
pub async fn exists_by_id(&self, package_id: Uuid) -> Result<bool, RegistryError> {
Ok(self.get_package_metadata_by_id(package_id).await?.is_some())
}
pub async fn exists_by_name(
&self,
package_name: &str,
version: &str,
) -> Result<bool, RegistryError> {
Ok(self
.get_package_metadata(package_name, version)
.await?
.is_some())
}
pub async fn list_packages(&self) -> Result<Vec<WorkflowMetadata>, RegistryError> {
self.list_all_packages().await
}
pub async fn unregister_workflow_package_by_id(
&mut self,
package_id: Uuid,
) -> Result<(), RegistryError> {
let (registry_id, _metadata) = match self.get_package_metadata_by_id(package_id).await? {
Some(data) => data,
None => return Ok(()), };
if let Some(_namespaces) = self.loaded_packages.remove(&package_id) {
self.registrar
.unregister_package_tasks(&package_id.to_string())
.map_err(RegistryError::Loader)?;
}
self.delete_package_metadata_by_id(package_id).await?;
self.storage.delete_binary(®istry_id).await?;
Ok(())
}
pub async fn unregister_workflow_package_by_name(
&mut self,
package_name: &str,
version: &str,
) -> Result<(), RegistryError> {
if self
.get_package_metadata(package_name, version)
.await?
.is_none()
{
return Ok(()); }
self.unregister_workflow(package_name, version).await
}
}
#[async_trait]
impl<S: RegistryStorage + Send + Sync> WorkflowRegistry for WorkflowRegistryImpl<S> {
async fn register_workflow(
&mut self,
package_data: Vec<u8>,
) -> Result<WorkflowPackageId, RegistryError> {
if !Self::is_cloacina_package(&package_data) {
return Err(RegistryError::ValidationError {
reason: "Package data is not a valid .cloacina bzip2 source archive. \
Raw library registration is not supported."
.to_string(),
});
}
let work_dir = tempfile::TempDir::new()
.map_err(|e| RegistryError::Internal(format!("Failed to create temp dir: {}", e)))?;
let archive_path = work_dir.path().join("pkg.cloacina");
std::fs::write(&archive_path, &package_data)
.map_err(|e| RegistryError::Internal(format!("Failed to write archive: {}", e)))?;
let extract_dir = work_dir.path().join("source");
std::fs::create_dir_all(&extract_dir)
.map_err(|e| RegistryError::Internal(format!("Failed to create extract dir: {}", e)))?;
let source_dir = fidius_core::package::unpack_package(&archive_path, &extract_dir)
.map_err(|e| RegistryError::ValidationError {
reason: format!("Failed to unpack source archive: {}", e),
})?;
let manifest = fidius_core::package::load_manifest::<
cloacina_workflow_plugin::CloacinaMetadata,
>(&source_dir)
.map_err(|e| RegistryError::ValidationError {
reason: format!("Failed to load package.toml: {}", e),
})?;
let pkg_name = manifest.package.name.clone();
let pkg_version = manifest.package.version.clone();
if self
.get_package_metadata(&pkg_name, &pkg_version)
.await?
.is_some()
{
return Err(RegistryError::PackageExists {
package_name: pkg_name,
version: pkg_version,
});
}
let package_metadata = crate::registry::loader::package_loader::PackageMetadata {
package_name: pkg_name,
version: pkg_version,
description: manifest.metadata.description.clone(),
author: manifest.metadata.author.clone(),
tasks: vec![],
graph_data: None,
architecture: std::env::consts::ARCH.to_string(),
symbols: vec![],
};
let registry_id = self.storage.store_binary(package_data).await?;
let package_id = self
.store_package_metadata(®istry_id, &package_metadata)
.await?;
Ok(package_id)
}
async fn get_workflow(
&self,
package_name: &str,
version: &str,
) -> Result<Option<LoadedWorkflow>, RegistryError> {
let (registry_id, package_metadata) =
match self.get_package_metadata(package_name, version).await? {
Some(data) => data,
None => return Ok(None),
};
let package_data = match self.storage.retrieve_binary(®istry_id).await? {
Some(data) => data,
None => {
return Err(RegistryError::Internal(
"Package metadata exists but binary data is missing".to_string(),
));
}
};
let workflow_metadata = WorkflowMetadata {
id: Uuid::new_v4(), registry_id: Uuid::parse_str(®istry_id).map_err(RegistryError::InvalidUuid)?,
package_name: package_metadata.package_name.clone(),
version: package_metadata.version.clone(),
description: package_metadata.description.clone(),
author: package_metadata.author.clone(),
tasks: package_metadata
.tasks
.iter()
.map(|t| t.local_id.clone())
.collect(),
schedules: Vec::new(),
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
};
Ok(Some(LoadedWorkflow {
metadata: workflow_metadata,
package_data,
}))
}
async fn list_workflows(&self) -> Result<Vec<WorkflowMetadata>, RegistryError> {
self.list_all_packages().await
}
async fn unregister_workflow(
&mut self,
package_name: &str,
version: &str,
) -> Result<(), RegistryError> {
let (registry_id, _) = self
.get_package_metadata(package_name, version)
.await?
.ok_or_else(|| RegistryError::PackageNotFound {
package_name: package_name.to_string(),
version: version.to_string(),
})?;
let package_uuid = Uuid::parse_str(®istry_id).map_err(RegistryError::InvalidUuid)?;
if let Some(_namespaces) = self.loaded_packages.remove(&package_uuid) {
self.registrar
.unregister_package_tasks(&package_uuid.to_string())
.map_err(RegistryError::Loader)?;
}
self.delete_package_metadata(package_name, version).await?;
self.storage.delete_binary(®istry_id).await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::registry::storage::FilesystemRegistryStorage;
use tempfile::TempDir;
#[tokio::test]
async fn test_registry_creation() {
let temp_dir = TempDir::new().unwrap();
let _storage = FilesystemRegistryStorage::new(temp_dir.path()).unwrap();
assert!(temp_dir.path().exists());
}
#[test]
fn test_registry_metrics() {
let temp_dir = TempDir::new().unwrap();
let _storage = FilesystemRegistryStorage::new(temp_dir.path()).unwrap();
assert!(temp_dir.path().exists());
}
}