mod database;
mod package;
use async_trait::async_trait;
use std::collections::HashMap;
use uuid::Uuid;
use crate::database::Database;
use crate::registry::error::RegistryError;
use crate::registry::loader::{PackageLoader, PackageValidator, TaskRegistrar};
use crate::registry::traits::{RegistryStorage, WorkflowRegistry};
use crate::registry::types::{LoadedWorkflow, WorkflowMetadata, WorkflowPackageId};
use crate::task::TaskNamespace;
/// Workflow registry built on a pluggable binary storage backend `S`.
///
/// Package bytes are kept in `storage`; the registry also keeps an in-memory
/// record of which packages currently have tasks registered through it.
pub struct WorkflowRegistryImpl<S: RegistryStorage> {
    /// Backend used to store, retrieve, and delete raw package binaries.
    pub(super) storage: S,
    /// Database handle. Not used directly in this file; presumably consumed by
    /// the metadata helpers in the `database` submodule (confirm there).
    pub(super) database: Database,
    /// Extracts package metadata from `.cloacina` archives.
    loader: PackageLoader,
    /// Registers and unregisters the tasks contained in a package.
    registrar: TaskRegistrar,
    /// Validates package binaries before they are accepted.
    validator: PackageValidator,
    /// Package id -> task namespaces returned when that package's tasks were registered.
    pub(super) loaded_packages: HashMap<Uuid, Vec<TaskNamespace>>,
}
impl<S: RegistryStorage> WorkflowRegistryImpl<S> {
/// Creates a registry over `storage` and `database` using the default
/// package validator.
///
/// # Errors
///
/// Returns [`RegistryError::Loader`] if the package loader, task registrar,
/// or validator cannot be constructed.
pub fn new(storage: S, database: Database) -> Result<Self, RegistryError> {
    // Struct-literal fields are evaluated in written order, so the components
    // are still built loader -> registrar -> validator.
    Ok(Self {
        storage,
        database,
        loader: PackageLoader::new().map_err(RegistryError::Loader)?,
        registrar: TaskRegistrar::new().map_err(RegistryError::Loader)?,
        validator: PackageValidator::new().map_err(RegistryError::Loader)?,
        loaded_packages: HashMap::new(),
    })
}
/// Creates a registry like [`Self::new`], but with the validator built via
/// `PackageValidator::strict()`.
///
/// # Errors
///
/// Returns [`RegistryError::Loader`] if the package loader, task registrar,
/// or strict validator cannot be constructed.
pub fn with_strict_validation(storage: S, database: Database) -> Result<Self, RegistryError> {
    // Struct-literal fields are evaluated in written order, so the components
    // are still built loader -> registrar -> validator.
    Ok(Self {
        storage,
        database,
        loader: PackageLoader::new().map_err(RegistryError::Loader)?,
        registrar: TaskRegistrar::new().map_err(RegistryError::Loader)?,
        validator: PackageValidator::strict().map_err(RegistryError::Loader)?,
        loaded_packages: HashMap::new(),
    })
}
/// Returns how many packages are currently tracked in `loaded_packages`.
pub fn loaded_package_count(&self) -> usize {
    let tracked = &self.loaded_packages;
    tracked.len()
}
/// Returns the total number of task namespaces recorded across all tracked packages.
pub fn total_registered_tasks(&self) -> usize {
    self.loaded_packages.values().map(Vec::len).sum()
}
pub async fn register_workflow_package(
&mut self,
package_data: Vec<u8>,
) -> Result<Uuid, RegistryError> {
WorkflowRegistry::register_workflow(self, package_data).await
}
/// Looks up a package by id and returns its metadata together with the stored bytes.
///
/// Returns `Ok(None)` when no metadata exists for `package_id`.
///
/// # Errors
///
/// * Propagates errors from the metadata lookup and from the storage backend.
/// * Returns [`RegistryError::Internal`] when metadata exists but the storage
///   backend has no binary for the recorded registry id.
pub async fn get_workflow_package_by_id(
    &self,
    package_id: Uuid,
) -> Result<Option<(WorkflowMetadata, Vec<u8>)>, RegistryError> {
    let (registry_id, metadata) = match self.get_package_metadata_by_id(package_id).await? {
        Some(found) => found,
        None => return Ok(None),
    };

    // Metadata without a binary means the two stores have diverged; surface
    // that as an internal error rather than pretending the package is absent.
    let package_data = self
        .storage
        .retrieve_binary(&registry_id)
        .await?
        .ok_or_else(|| {
            RegistryError::Internal(
                "Package metadata exists but binary data is missing".to_string(),
            )
        })?;

    Ok(Some((metadata, package_data)))
}
/// Looks up a package by name and version and returns its metadata and bytes.
///
/// Delegates to `get_workflow` and unpacks the loaded workflow into a tuple;
/// `Ok(None)` is returned when `get_workflow` finds nothing.
pub async fn get_workflow_package_by_name(
    &self,
    package_name: &str,
    version: &str,
) -> Result<Option<(WorkflowMetadata, Vec<u8>)>, RegistryError> {
    let loaded = self.get_workflow(package_name, version).await?;
    Ok(loaded.map(|workflow| (workflow.metadata, workflow.package_data)))
}
/// Reports whether metadata exists for the package with the given id.
pub async fn exists_by_id(&self, package_id: Uuid) -> Result<bool, RegistryError> {
    let lookup = self.get_package_metadata_by_id(package_id).await?;
    Ok(lookup.is_some())
}
/// Reports whether metadata exists for the given package name and version.
pub async fn exists_by_name(
    &self,
    package_name: &str,
    version: &str,
) -> Result<bool, RegistryError> {
    let lookup = self.get_package_metadata(package_name, version).await?;
    Ok(lookup.is_some())
}
/// Returns the metadata of every package, as reported by `list_all_packages`.
pub async fn list_packages(&self) -> Result<Vec<WorkflowMetadata>, RegistryError> {
    let packages = self.list_all_packages().await?;
    Ok(packages)
}
/// Removes the package with the given id: its registered tasks (if this
/// registry loaded them), its metadata, and its stored binary.
///
/// Unregistering an id with no metadata is a no-op and returns `Ok(())`.
///
/// # Errors
///
/// * [`RegistryError::Loader`] if task unregistration fails.
/// * Propagates errors from the metadata lookup/deletion and from storage.
pub async fn unregister_workflow_package_by_id(
    &mut self,
    package_id: Uuid,
) -> Result<(), RegistryError> {
    let (registry_id, _metadata) = match self.get_package_metadata_by_id(package_id).await? {
        Some(found) => found,
        // Unknown id: nothing to remove.
        None => return Ok(()),
    };

    // Only packages tracked by this instance have tasks to unregister.
    if self.loaded_packages.remove(&package_id).is_some() {
        self.registrar
            .unregister_package_tasks(&package_id.to_string())
            .map_err(RegistryError::Loader)?;
    }

    self.delete_package_metadata_by_id(package_id).await?;
    self.storage.delete_binary(&registry_id).await?;
    Ok(())
}
pub async fn unregister_workflow_package_by_name(
&mut self,
package_name: &str,
version: &str,
) -> Result<(), RegistryError> {
if self
.get_package_metadata(package_name, version)
.await?
.is_none()
{
return Ok(()); }
self.unregister_workflow(package_name, version).await
}
}
#[async_trait]
impl<S: RegistryStorage + Send + Sync> WorkflowRegistry for WorkflowRegistryImpl<S> {
/// Validates, persists, and loads a workflow package.
///
/// Steps, in order:
/// 1. Obtain the shared-object bytes (extracted from the archive for
///    `.cloacina` packages, otherwise the input itself) and validate them.
/// 2. Reject raw `.so` input, which is not supported yet.
/// 3. Extract package metadata and refuse duplicates of name + version.
/// 4. Store the binary, store the metadata, register the package's tasks,
///    and record the returned namespaces in `loaded_packages`.
///
/// If step 4 fails part-way, whatever was already persisted is removed again
/// (best effort) so that a failed registration does not leave an orphaned
/// binary or a metadata row that would make a retry fail with `PackageExists`.
///
/// # Errors
///
/// * [`RegistryError::ValidationError`] if validation fails or the input is a raw `.so`.
/// * [`RegistryError::PackageExists`] if the name + version is already registered.
/// * [`RegistryError::Loader`] for validator, metadata-extraction, or task-registration failures.
/// * Propagates errors from storage and the metadata helpers.
async fn register_workflow(
    &mut self,
    package_data: Vec<u8>,
) -> Result<WorkflowPackageId, RegistryError> {
    let is_cloacina = Self::is_cloacina_package(&package_data);
    let so_data = if is_cloacina {
        Self::extract_so_from_cloacina(&package_data).await?
    } else {
        package_data.clone()
    };

    let validation_result = self
        .validator
        .validate_package(&so_data, None)
        .await
        .map_err(RegistryError::Loader)?;
    if !validation_result.is_valid {
        return Err(RegistryError::ValidationError {
            reason: validation_result.errors.join("; "),
        });
    }

    if !is_cloacina {
        return Err(RegistryError::ValidationError {
            reason:
                "Raw .so file registration not yet supported. Please use .cloacina packages."
                    .to_string(),
        });
    }
    let package_metadata = self
        .loader
        .extract_metadata(&package_data)
        .await
        .map_err(RegistryError::Loader)?;

    if self
        .get_package_metadata(&package_metadata.package_name, &package_metadata.version)
        .await?
        .is_some()
    {
        return Err(RegistryError::PackageExists {
            package_name: package_metadata.package_name,
            version: package_metadata.version,
        });
    }

    let registry_id = self.storage.store_binary(package_data).await?;

    let stored = self
        .store_package_metadata(&registry_id, &package_metadata)
        .await;
    if stored.is_err() {
        // Without metadata the binary is unreachable; remove it again.
        // Cleanup is best effort: the original failure is what gets reported.
        let _ = self.storage.delete_binary(&registry_id).await;
    }
    let package_id = stored?;

    let registration = self
        .registrar
        .register_package_tasks(
            &package_id.to_string(),
            &so_data,
            &package_metadata,
            // NOTE(review): meaning of "public" is not visible here
            // (presumably a tenant/namespace) - confirm against TaskRegistrar.
            Some("public"),
        )
        .await;
    let registered_namespaces = match registration {
        Ok(namespaces) => namespaces,
        Err(err) => {
            // Undo persistence so the caller can retry without hitting
            // `PackageExists`. Cleanup is best effort.
            let _ = self.delete_package_metadata_by_id(package_id).await;
            let _ = self.storage.delete_binary(&registry_id).await;
            return Err(RegistryError::Loader(err));
        }
    };

    self.loaded_packages
        .insert(package_id, registered_namespaces);
    Ok(package_id)
}
/// Loads a package by name and version, returning its metadata and stored bytes.
///
/// Returns `Ok(None)` when no metadata exists for the name + version.
///
/// # Errors
///
/// * [`RegistryError::Internal`] when metadata exists but the binary is missing from storage.
/// * [`RegistryError::InvalidUuid`] when the recorded registry id is not a valid UUID.
/// * Propagates errors from the metadata lookup and from storage.
async fn get_workflow(
    &self,
    package_name: &str,
    version: &str,
) -> Result<Option<LoadedWorkflow>, RegistryError> {
    let (registry_id, package_metadata) =
        match self.get_package_metadata(package_name, version).await? {
            Some(found) => found,
            None => return Ok(None),
        };

    let package_data = self
        .storage
        .retrieve_binary(&registry_id)
        .await?
        .ok_or_else(|| {
            RegistryError::Internal(
                "Package metadata exists but binary data is missing".to_string(),
            )
        })?;

    let registry_uuid = Uuid::parse_str(&registry_id).map_err(RegistryError::InvalidUuid)?;

    // Take one timestamp so `created_at` and `updated_at` are identical
    // instead of differing by the time between two clock reads.
    let now = chrono::Utc::now();

    let workflow_metadata = WorkflowMetadata {
        // NOTE(review): this is a freshly generated id, not the persisted
        // package id, so it differs on every call. Callers must not use it
        // for lookups; the lookup used here does not return the stored id.
        id: Uuid::new_v4(),
        registry_id: registry_uuid,
        package_name: package_metadata.package_name.clone(),
        version: package_metadata.version.clone(),
        description: package_metadata.description.clone(),
        author: package_metadata.author.clone(),
        tasks: package_metadata
            .tasks
            .iter()
            .map(|task| task.local_id.clone())
            .collect(),
        schedules: Vec::new(),
        // NOTE(review): these are "now", not the stored creation/update times.
        created_at: now,
        updated_at: now,
    };

    Ok(Some(LoadedWorkflow {
        metadata: workflow_metadata,
        package_data,
    }))
}
/// Returns the metadata of every registered workflow package, as reported
/// by `list_all_packages`.
async fn list_workflows(&self) -> Result<Vec<WorkflowMetadata>, RegistryError> {
    let workflows = self.list_all_packages().await?;
    Ok(workflows)
}
/// Removes the package identified by name and version: its registered tasks
/// (if this registry loaded them), its metadata, and its stored binary.
///
/// `loaded_packages` and the task registrar are keyed by the package id
/// assigned when metadata was stored, not by the storage registry id. The
/// package id is therefore resolved from the package listing before tasks
/// are unregistered; the registry id is still tried afterwards as a fallback
/// key so any entry stored under it is cleaned up as well.
///
/// # Errors
///
/// * [`RegistryError::PackageNotFound`] if no metadata exists for the name + version.
/// * [`RegistryError::InvalidUuid`] if the recorded registry id is not a valid UUID.
/// * [`RegistryError::Loader`] if task unregistration fails.
/// * Propagates errors from the metadata helpers and from storage.
async fn unregister_workflow(
    &mut self,
    package_name: &str,
    version: &str,
) -> Result<(), RegistryError> {
    let (registry_id, _) = self
        .get_package_metadata(package_name, version)
        .await?
        .ok_or_else(|| RegistryError::PackageNotFound {
            package_name: package_name.to_string(),
            version: version.to_string(),
        })?;
    let registry_uuid = Uuid::parse_str(&registry_id).map_err(RegistryError::InvalidUuid)?;

    // NOTE(review): assumes `list_all_packages` reports the persisted package
    // id in `WorkflowMetadata::id` - confirm in the database helpers.
    let package_id = self
        .list_all_packages()
        .await?
        .into_iter()
        .find(|meta| meta.registry_id == registry_uuid)
        .map(|meta| meta.id);

    // Resolved package id first, then the registry id as a fallback key.
    // If both are equal the second pass finds nothing and is a no-op.
    for key in package_id.into_iter().chain(std::iter::once(registry_uuid)) {
        if self.loaded_packages.remove(&key).is_some() {
            self.registrar
                .unregister_package_tasks(&key.to_string())
                .map_err(RegistryError::Loader)?;
        }
    }

    self.delete_package_metadata(package_name, version).await?;
    self.storage.delete_binary(&registry_id).await?;
    Ok(())
}
}
#[cfg(test)]
mod tests {
    use crate::registry::storage::FilesystemRegistryStorage;
    use tempfile::TempDir;

    /// Creates filesystem storage inside a fresh temporary directory and
    /// checks that the directory exists afterwards.
    ///
    /// Both tests below only exercise storage construction; neither builds a
    /// full registry.
    fn assert_storage_initializes() {
        let scratch = TempDir::new().unwrap();
        let _storage = FilesystemRegistryStorage::new(scratch.path()).unwrap();
        assert!(scratch.path().exists());
    }

    #[tokio::test]
    async fn test_registry_creation() {
        assert_storage_initializes();
    }

    #[test]
    fn test_registry_metrics() {
        assert_storage_initializes();
    }
}