use async_trait::async_trait;
use diesel::prelude::*;
use uuid::Uuid;
use super::models::{NewUnifiedWorkflowRegistryEntry, UnifiedWorkflowRegistryEntry};
use crate::database::schema::unified::workflow_registry;
use crate::database::universal_types::{UniversalBinary, UniversalTimestamp, UniversalUuid};
use crate::database::Database;
use crate::models::workflow_packages::StorageType;
use crate::registry::error::StorageError;
use crate::registry::traits::RegistryStorage;
#[derive(Debug, Clone)]
pub struct UnifiedRegistryStorage {
database: Database,
}
impl UnifiedRegistryStorage {
pub fn new(database: Database) -> Self {
Self { database }
}
pub fn database(&self) -> &Database {
&self.database
}
}
#[async_trait]
impl RegistryStorage for UnifiedRegistryStorage {
async fn store_binary(&mut self, data: Vec<u8>) -> Result<String, StorageError> {
crate::dispatch_backend!(
self.database.backend(),
self.store_binary_postgres(data).await,
self.store_binary_sqlite(data).await
)
}
async fn retrieve_binary(&self, id: &str) -> Result<Option<Vec<u8>>, StorageError> {
crate::dispatch_backend!(
self.database.backend(),
self.retrieve_binary_postgres(id).await,
self.retrieve_binary_sqlite(id).await
)
}
async fn delete_binary(&mut self, id: &str) -> Result<(), StorageError> {
crate::dispatch_backend!(
self.database.backend(),
self.delete_binary_postgres(id).await,
self.delete_binary_sqlite(id).await
)
}
fn storage_type(&self) -> StorageType {
StorageType::Database
}
}
impl UnifiedRegistryStorage {
#[cfg(feature = "postgres")]
async fn store_binary_postgres(&self, data: Vec<u8>) -> Result<String, StorageError> {
let conn = self.database.get_postgres_connection().await.map_err(|e| {
StorageError::Backend(format!("Failed to get database connection: {}", e))
})?;
let id = UniversalUuid::new_v4();
let now = UniversalTimestamp::now();
let new_entry = NewUnifiedWorkflowRegistryEntry {
id,
created_at: now,
data: UniversalBinary::from(data),
};
conn.interact(move |conn| {
diesel::insert_into(workflow_registry::table)
.values(&new_entry)
.execute(conn)
})
.await
.map_err(|e| StorageError::Backend(format!("Database interaction error: {}", e)))?
.map_err(|e| StorageError::Backend(format!("Database error: {}", e)))?;
Ok(id.0.to_string())
}
#[cfg(feature = "sqlite")]
async fn store_binary_sqlite(&self, data: Vec<u8>) -> Result<String, StorageError> {
let conn = self
.database
.get_sqlite_connection()
.await
.map_err(|e| StorageError::Backend(format!("Failed to get connection: {}", e)))?;
let id = UniversalUuid::new_v4();
let now = UniversalTimestamp::now();
let new_entry = NewUnifiedWorkflowRegistryEntry {
id,
created_at: now,
data: UniversalBinary::from(data),
};
conn.interact(move |conn| {
diesel::insert_into(workflow_registry::table)
.values(&new_entry)
.execute(conn)
})
.await
.map_err(|e| StorageError::Backend(e.to_string()))?
.map_err(|e| StorageError::Backend(format!("Database error: {}", e)))?;
Ok(id.0.to_string())
}
#[cfg(feature = "postgres")]
async fn retrieve_binary_postgres(&self, id: &str) -> Result<Option<Vec<u8>>, StorageError> {
let registry_uuid =
Uuid::parse_str(id).map_err(|_| StorageError::InvalidId { id: id.to_string() })?;
let conn = self.database.get_postgres_connection().await.map_err(|e| {
StorageError::Backend(format!("Failed to get database connection: {}", e))
})?;
let registry_id = UniversalUuid(registry_uuid);
let entry: Option<UnifiedWorkflowRegistryEntry> = conn
.interact(move |conn| {
workflow_registry::table
.filter(workflow_registry::id.eq(registry_id))
.first::<UnifiedWorkflowRegistryEntry>(conn)
.optional()
})
.await
.map_err(|e| StorageError::Backend(format!("Database interaction error: {}", e)))?
.map_err(|e| StorageError::Backend(format!("Database error: {}", e)))?;
Ok(entry.map(|e| e.data.into_inner()))
}
#[cfg(feature = "sqlite")]
async fn retrieve_binary_sqlite(&self, id: &str) -> Result<Option<Vec<u8>>, StorageError> {
let uuid =
Uuid::parse_str(id).map_err(|_| StorageError::InvalidId { id: id.to_string() })?;
let conn = self
.database
.get_sqlite_connection()
.await
.map_err(|e| StorageError::Backend(format!("Failed to get connection: {}", e)))?;
let registry_id = UniversalUuid(uuid);
let result: Result<Option<UnifiedWorkflowRegistryEntry>, diesel::result::Error> = conn
.interact(move |conn| {
workflow_registry::table
.filter(workflow_registry::id.eq(registry_id))
.first::<UnifiedWorkflowRegistryEntry>(conn)
.optional()
})
.await
.map_err(|e| StorageError::Backend(e.to_string()))?;
match result {
Ok(Some(entry)) => Ok(Some(entry.data.into_inner())),
Ok(None) => Ok(None),
Err(e) => Err(StorageError::Backend(format!("Database error: {}", e))),
}
}
#[cfg(feature = "postgres")]
async fn delete_binary_postgres(&self, id: &str) -> Result<(), StorageError> {
let registry_uuid =
Uuid::parse_str(id).map_err(|_| StorageError::InvalidId { id: id.to_string() })?;
let conn = self.database.get_postgres_connection().await.map_err(|e| {
StorageError::Backend(format!("Failed to get database connection: {}", e))
})?;
let registry_id = UniversalUuid(registry_uuid);
conn.interact(move |conn| {
diesel::delete(workflow_registry::table.filter(workflow_registry::id.eq(registry_id)))
.execute(conn)
})
.await
.map_err(|e| StorageError::Backend(format!("Database interaction error: {}", e)))?
.map_err(|e| StorageError::Backend(format!("Database error: {}", e)))?;
Ok(())
}
#[cfg(feature = "sqlite")]
async fn delete_binary_sqlite(&self, id: &str) -> Result<(), StorageError> {
let uuid =
Uuid::parse_str(id).map_err(|_| StorageError::InvalidId { id: id.to_string() })?;
let conn = self
.database
.get_sqlite_connection()
.await
.map_err(|e| StorageError::Backend(format!("Failed to get connection: {}", e)))?;
let registry_id = UniversalUuid(uuid);
conn.interact(move |conn| {
diesel::delete(workflow_registry::table.filter(workflow_registry::id.eq(registry_id)))
.execute(conn)
})
.await
.map_err(|e| StorageError::Backend(e.to_string()))?
.map_err(|e| StorageError::Backend(format!("Database error: {}", e)))?;
Ok(())
}
}