use super::models::{NewUnifiedWorkflowPackage, UnifiedWorkflowPackage};
use super::DAL;
use crate::database::schema::unified::workflow_packages;
use crate::database::universal_types::{UniversalTimestamp, UniversalUuid};
use crate::models::workflow_packages::WorkflowPackage;
use crate::registry::error::RegistryError;
use crate::registry::loader::package_loader::PackageMetadata;
use diesel::prelude::*;
use uuid::Uuid;
#[derive(Clone)]
pub struct WorkflowPackagesDAL<'a> {
dal: &'a DAL,
}
impl<'a> WorkflowPackagesDAL<'a> {
pub fn new(dal: &'a DAL) -> Self {
Self { dal }
}
pub async fn store_package_metadata(
&self,
registry_id: &str,
package_metadata: &PackageMetadata,
storage_type: crate::models::workflow_packages::StorageType,
) -> Result<Uuid, RegistryError> {
crate::dispatch_backend!(
self.dal.backend(),
self.store_package_metadata_postgres(registry_id, package_metadata, storage_type)
.await,
self.store_package_metadata_sqlite(registry_id, package_metadata, storage_type)
.await
)
}
#[cfg(feature = "postgres")]
async fn store_package_metadata_postgres(
&self,
registry_id: &str,
package_metadata: &PackageMetadata,
storage_type: crate::models::workflow_packages::StorageType,
) -> Result<Uuid, RegistryError> {
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| RegistryError::Database(e.to_string()))?;
let registry_uuid = Uuid::parse_str(registry_id).map_err(RegistryError::InvalidUuid)?;
let metadata =
serde_json::to_string(package_metadata).map_err(RegistryError::Serialization)?;
let id = UniversalUuid::new_v4();
let now = UniversalTimestamp::now();
let new_unified = NewUnifiedWorkflowPackage {
id,
registry_id: UniversalUuid(registry_uuid),
package_name: package_metadata.package_name.clone(),
version: package_metadata.version.clone(),
description: package_metadata.description.clone(),
author: package_metadata.author.clone(),
metadata,
storage_type: storage_type.as_str().to_string(),
created_at: now,
updated_at: now,
};
let package_name_clone = package_metadata.package_name.clone();
let version_clone = package_metadata.version.clone();
conn.interact(move |conn| {
diesel::insert_into(workflow_packages::table)
.values(&new_unified)
.execute(conn)
})
.await
.map_err(|e| RegistryError::Database(e.to_string()))?
.map_err(|e| match e {
diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
_info,
) => RegistryError::PackageExists {
package_name: package_name_clone,
version: version_clone,
},
_ => RegistryError::Database(format!("Database error: {}", e)),
})?;
Ok(id.0)
}
#[cfg(feature = "sqlite")]
async fn store_package_metadata_sqlite(
&self,
registry_id: &str,
package_metadata: &PackageMetadata,
storage_type: crate::models::workflow_packages::StorageType,
) -> Result<Uuid, RegistryError> {
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| RegistryError::Database(e.to_string()))?;
let registry_uuid = Uuid::parse_str(registry_id).map_err(RegistryError::InvalidUuid)?;
let metadata =
serde_json::to_string(package_metadata).map_err(RegistryError::Serialization)?;
let id = UniversalUuid::new_v4();
let now = UniversalTimestamp::now();
let new_unified = NewUnifiedWorkflowPackage {
id,
registry_id: UniversalUuid(registry_uuid),
package_name: package_metadata.package_name.clone(),
version: package_metadata.version.clone(),
description: package_metadata.description.clone(),
author: package_metadata.author.clone(),
metadata,
storage_type: storage_type.as_str().to_string(),
created_at: now,
updated_at: now,
};
let package_name_clone = package_metadata.package_name.clone();
let version_clone = package_metadata.version.clone();
conn.interact(move |conn| {
diesel::insert_into(workflow_packages::table)
.values(&new_unified)
.execute(conn)
})
.await
.map_err(|e| RegistryError::Database(e.to_string()))?
.map_err(|e| match e {
diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
_info,
) => RegistryError::PackageExists {
package_name: package_name_clone,
version: version_clone,
},
_ => RegistryError::Database(format!("Database error: {}", e)),
})?;
Ok(id.0)
}
pub async fn get_package_metadata(
&self,
package_name: &str,
version: &str,
) -> Result<Option<(String, PackageMetadata)>, RegistryError> {
crate::dispatch_backend!(
self.dal.backend(),
self.get_package_metadata_postgres(package_name, version)
.await,
self.get_package_metadata_sqlite(package_name, version)
.await
)
}
#[cfg(feature = "postgres")]
async fn get_package_metadata_postgres(
&self,
package_name: &str,
version: &str,
) -> Result<Option<(String, PackageMetadata)>, RegistryError> {
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| RegistryError::Database(e.to_string()))?;
let package_name = package_name.to_string();
let version = version.to_string();
let result: Option<UnifiedWorkflowPackage> = conn
.interact(move |conn| {
workflow_packages::table
.filter(workflow_packages::package_name.eq(&package_name))
.filter(workflow_packages::version.eq(&version))
.first::<UnifiedWorkflowPackage>(conn)
.optional()
})
.await
.map_err(|e| RegistryError::Database(e.to_string()))?
.map_err(|e| RegistryError::Database(format!("Database error: {}", e)))?;
if let Some(record) = result {
let metadata: PackageMetadata =
serde_json::from_str(&record.metadata).map_err(RegistryError::Serialization)?;
Ok(Some((record.registry_id.0.to_string(), metadata)))
} else {
Ok(None)
}
}
#[cfg(feature = "sqlite")]
async fn get_package_metadata_sqlite(
&self,
package_name: &str,
version: &str,
) -> Result<Option<(String, PackageMetadata)>, RegistryError> {
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| RegistryError::Database(e.to_string()))?;
let package_name = package_name.to_string();
let version = version.to_string();
let result: Option<UnifiedWorkflowPackage> = conn
.interact(move |conn| {
workflow_packages::table
.filter(workflow_packages::package_name.eq(&package_name))
.filter(workflow_packages::version.eq(&version))
.first::<UnifiedWorkflowPackage>(conn)
.optional()
})
.await
.map_err(|e| RegistryError::Database(e.to_string()))?
.map_err(|e| RegistryError::Database(format!("Database error: {}", e)))?;
if let Some(record) = result {
let metadata: PackageMetadata =
serde_json::from_str(&record.metadata).map_err(RegistryError::Serialization)?;
Ok(Some((record.registry_id.0.to_string(), metadata)))
} else {
Ok(None)
}
}
pub async fn get_package_metadata_by_id(
&self,
package_id: Uuid,
) -> Result<Option<(String, PackageMetadata)>, RegistryError> {
crate::dispatch_backend!(
self.dal.backend(),
self.get_package_metadata_by_id_postgres(package_id).await,
self.get_package_metadata_by_id_sqlite(package_id).await
)
}
#[cfg(feature = "postgres")]
async fn get_package_metadata_by_id_postgres(
&self,
package_id: Uuid,
) -> Result<Option<(String, PackageMetadata)>, RegistryError> {
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| RegistryError::Database(e.to_string()))?;
let id = UniversalUuid(package_id);
let result: Option<UnifiedWorkflowPackage> = conn
.interact(move |conn| {
workflow_packages::table
.filter(workflow_packages::id.eq(id))
.first::<UnifiedWorkflowPackage>(conn)
.optional()
})
.await
.map_err(|e| RegistryError::Database(e.to_string()))?
.map_err(|e| RegistryError::Database(format!("Database error: {}", e)))?;
if let Some(record) = result {
let metadata: PackageMetadata =
serde_json::from_str(&record.metadata).map_err(RegistryError::Serialization)?;
Ok(Some((record.registry_id.0.to_string(), metadata)))
} else {
Ok(None)
}
}
#[cfg(feature = "sqlite")]
async fn get_package_metadata_by_id_sqlite(
&self,
package_id: Uuid,
) -> Result<Option<(String, PackageMetadata)>, RegistryError> {
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| RegistryError::Database(e.to_string()))?;
let id = UniversalUuid(package_id);
let result: Option<UnifiedWorkflowPackage> = conn
.interact(move |conn| {
workflow_packages::table
.filter(workflow_packages::id.eq(id))
.first::<UnifiedWorkflowPackage>(conn)
.optional()
})
.await
.map_err(|e| RegistryError::Database(e.to_string()))?
.map_err(|e| RegistryError::Database(format!("Database error: {}", e)))?;
if let Some(record) = result {
let metadata: PackageMetadata =
serde_json::from_str(&record.metadata).map_err(RegistryError::Serialization)?;
Ok(Some((record.registry_id.0.to_string(), metadata)))
} else {
Ok(None)
}
}
pub async fn list_all_packages(&self) -> Result<Vec<WorkflowPackage>, RegistryError> {
crate::dispatch_backend!(
self.dal.backend(),
self.list_all_packages_postgres().await,
self.list_all_packages_sqlite().await
)
}
#[cfg(feature = "postgres")]
async fn list_all_packages_postgres(&self) -> Result<Vec<WorkflowPackage>, RegistryError> {
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| RegistryError::Database(e.to_string()))?;
let results: Vec<UnifiedWorkflowPackage> = conn
.interact(move |conn| workflow_packages::table.load::<UnifiedWorkflowPackage>(conn))
.await
.map_err(|e| RegistryError::Database(e.to_string()))?
.map_err(|e| RegistryError::Database(format!("Database error: {}", e)))?;
Ok(results.into_iter().map(Into::into).collect())
}
#[cfg(feature = "sqlite")]
async fn list_all_packages_sqlite(&self) -> Result<Vec<WorkflowPackage>, RegistryError> {
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| RegistryError::Database(e.to_string()))?;
let results: Vec<UnifiedWorkflowPackage> = conn
.interact(move |conn| workflow_packages::table.load::<UnifiedWorkflowPackage>(conn))
.await
.map_err(|e| RegistryError::Database(e.to_string()))?
.map_err(|e| RegistryError::Database(format!("Database error: {}", e)))?;
Ok(results.into_iter().map(Into::into).collect())
}
pub async fn delete_package_metadata(
&self,
package_name: &str,
version: &str,
) -> Result<(), RegistryError> {
crate::dispatch_backend!(
self.dal.backend(),
self.delete_package_metadata_postgres(package_name, version)
.await,
self.delete_package_metadata_sqlite(package_name, version)
.await
)
}
#[cfg(feature = "postgres")]
async fn delete_package_metadata_postgres(
&self,
package_name: &str,
version: &str,
) -> Result<(), RegistryError> {
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| RegistryError::Database(e.to_string()))?;
let package_name = package_name.to_string();
let version = version.to_string();
conn.interact(move |conn| {
diesel::delete(
workflow_packages::table
.filter(workflow_packages::package_name.eq(&package_name))
.filter(workflow_packages::version.eq(&version)),
)
.execute(conn)
})
.await
.map_err(|e| RegistryError::Database(e.to_string()))?
.map_err(|e| RegistryError::Database(format!("Database error: {}", e)))?;
Ok(())
}
#[cfg(feature = "sqlite")]
async fn delete_package_metadata_sqlite(
&self,
package_name: &str,
version: &str,
) -> Result<(), RegistryError> {
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| RegistryError::Database(e.to_string()))?;
let package_name = package_name.to_string();
let version = version.to_string();
conn.interact(move |conn| {
diesel::delete(
workflow_packages::table
.filter(workflow_packages::package_name.eq(&package_name))
.filter(workflow_packages::version.eq(&version)),
)
.execute(conn)
})
.await
.map_err(|e| RegistryError::Database(e.to_string()))?
.map_err(|e| RegistryError::Database(format!("Database error: {}", e)))?;
Ok(())
}
pub async fn delete_package_metadata_by_id(
&self,
package_id: Uuid,
) -> Result<(), RegistryError> {
crate::dispatch_backend!(
self.dal.backend(),
self.delete_package_metadata_by_id_postgres(package_id)
.await,
self.delete_package_metadata_by_id_sqlite(package_id).await
)
}
#[cfg(feature = "postgres")]
async fn delete_package_metadata_by_id_postgres(
&self,
package_id: Uuid,
) -> Result<(), RegistryError> {
let conn = self
.dal
.database
.get_postgres_connection()
.await
.map_err(|e| RegistryError::Database(e.to_string()))?;
let id = UniversalUuid(package_id);
conn.interact(move |conn| {
diesel::delete(workflow_packages::table.filter(workflow_packages::id.eq(id)))
.execute(conn)
})
.await
.map_err(|e| RegistryError::Database(e.to_string()))?
.map_err(|e| RegistryError::Database(format!("Database error: {}", e)))?;
Ok(())
}
#[cfg(feature = "sqlite")]
async fn delete_package_metadata_by_id_sqlite(
&self,
package_id: Uuid,
) -> Result<(), RegistryError> {
let conn = self
.dal
.database
.get_sqlite_connection()
.await
.map_err(|e| RegistryError::Database(e.to_string()))?;
let id = UniversalUuid(package_id);
conn.interact(move |conn| {
diesel::delete(workflow_packages::table.filter(workflow_packages::id.eq(id)))
.execute(conn)
})
.await
.map_err(|e| RegistryError::Database(e.to_string()))?
.map_err(|e| RegistryError::Database(format!("Database error: {}", e)))?;
Ok(())
}
}