#![allow(clippy::too_many_arguments)]
use async_trait::async_trait;
use log::{debug, info};
use swagger::{ApiError, Has, XSpanIdString};
use crate::server::api_types::{
CreateRoCrateEntityResponse, DeleteRoCrateEntitiesResponse, DeleteRoCrateEntityResponse,
GetRoCrateEntityResponse, ListRoCrateEntitiesResponse, UpdateRoCrateEntityResponse,
};
use crate::models;
use sha2::{Digest, Sha256};
use std::io::Read as IoRead;
use super::{ApiContext, MAX_RECORD_TRANSFER_COUNT, database_error_with_msg};
#[async_trait]
pub trait RoCrateApi<C> {
async fn create_ro_crate_entity(
&self,
body: models::RoCrateEntityModel,
context: &C,
) -> Result<CreateRoCrateEntityResponse, ApiError>;
async fn get_ro_crate_entity(
&self,
id: i64,
context: &C,
) -> Result<GetRoCrateEntityResponse, ApiError>;
async fn list_ro_crate_entities(
&self,
workflow_id: i64,
offset: i64,
limit: i64,
context: &C,
) -> Result<ListRoCrateEntitiesResponse, ApiError>;
async fn update_ro_crate_entity(
&self,
id: i64,
body: models::RoCrateEntityModel,
context: &C,
) -> Result<UpdateRoCrateEntityResponse, ApiError>;
async fn delete_ro_crate_entity(
&self,
id: i64,
body: Option<serde_json::Value>,
context: &C,
) -> Result<DeleteRoCrateEntityResponse, ApiError>;
async fn delete_ro_crate_entities(
&self,
workflow_id: i64,
body: Option<serde_json::Value>,
context: &C,
) -> Result<DeleteRoCrateEntitiesResponse, ApiError>;
}
fn compute_file_sha256(path: &str) -> Option<String> {
let file = std::fs::File::open(path).ok()?;
let mut reader = std::io::BufReader::new(file);
let mut hasher = Sha256::new();
let mut buffer = [0u8; 8192];
loop {
match reader.read(&mut buffer) {
Ok(0) => break,
Ok(n) => hasher.update(&buffer[..n]),
Err(_) => return None,
}
}
Some(format!("{:x}", hasher.finalize()))
}
#[derive(Clone)]
pub struct RoCrateApiImpl {
pub context: ApiContext,
}
impl RoCrateApiImpl {
pub fn new(context: ApiContext) -> Self {
Self { context }
}
pub async fn create_entities_for_input_files(&self, workflow_id: i64) -> Result<i64, ApiError> {
let run_id: i64 = sqlx::query_scalar!(
"SELECT run_id FROM workflow_status WHERE id = $1",
workflow_id,
)
.fetch_optional(self.context.pool.as_ref())
.await
.map_err(|e| database_error_with_msg(e, "Failed to get workflow run_id"))?
.unwrap_or(0);
let input_files = match sqlx::query!(
r#"
SELECT id, workflow_id, name, path, st_mtime
FROM file
WHERE workflow_id = $1 AND st_mtime IS NOT NULL
"#,
workflow_id
)
.fetch_all(self.context.pool.as_ref())
.await
{
Ok(files) => files,
Err(e) => {
return Err(super::database_error_with_msg(
e,
"Failed to list input files for RO-Crate",
));
}
};
let existing_entities: std::collections::HashMap<i64, i64> = match sqlx::query!(
r#"SELECT id, file_id FROM ro_crate_entity WHERE workflow_id = $1 AND file_id IS NOT NULL"#,
workflow_id
)
.fetch_all(self.context.pool.as_ref())
.await
{
Ok(rows) => rows
.into_iter()
.filter_map(|r| r.file_id.map(|fid| (fid, r.id)))
.collect(),
Err(e) => {
return Err(super::database_error_with_msg(
e,
"Failed to check existing RO-Crate entities",
));
}
};
let mut upserted_count = 0i64;
for file in input_files {
let mime_type = mime_guess::from_path(&file.path)
.first()
.map(|m| m.to_string())
.unwrap_or_else(|| "application/octet-stream".to_string());
let basename = std::path::Path::new(&file.path)
.file_name()
.map(|s| s.to_string_lossy().to_string())
.unwrap_or_else(|| file.path.clone());
let mut metadata = serde_json::json!({
"@id": file.path,
"@type": "File",
"name": basename,
"encodingFormat": mime_type,
"torc:run_id": run_id
});
if let Some(st_mtime) = file.st_mtime
&& let Some(datetime) =
chrono::DateTime::<chrono::Utc>::from_timestamp(st_mtime as i64, 0)
{
metadata["dateModified"] = serde_json::json!(datetime.to_rfc3339());
}
let metadata_str = metadata.to_string();
let result = if let Some(&entity_db_id) = existing_entities.get(&file.id) {
sqlx::query!(
r#"
UPDATE ro_crate_entity SET metadata = $1 WHERE id = $2
"#,
metadata_str,
entity_db_id,
)
.execute(self.context.pool.as_ref())
.await
} else {
sqlx::query!(
r#"
INSERT INTO ro_crate_entity (workflow_id, file_id, entity_id, entity_type, metadata)
VALUES ($1, $2, $3, $4, $5)
"#,
workflow_id,
file.id,
file.path,
"File",
metadata_str,
)
.execute(self.context.pool.as_ref())
.await
};
match result {
Ok(_) => {
debug!(
"Upserted RO-Crate entity for input file '{}' (file_id={})",
file.path, file.id
);
upserted_count += 1;
}
Err(e) => {
log::warn!(
"Failed to upsert RO-Crate entity for file '{}': {}",
file.path,
e
);
}
}
}
debug!(
"Upserted {} RO-Crate entities for input files in workflow_id={}",
upserted_count, workflow_id
);
Ok(upserted_count)
}
pub async fn create_server_software_entity(&self, workflow_id: i64) -> Result<(), ApiError> {
let run_id: i64 = sqlx::query_scalar!(
"SELECT run_id FROM workflow_status WHERE id = $1",
workflow_id,
)
.fetch_optional(self.context.pool.as_ref())
.await
.map_err(|e| database_error_with_msg(e, "Failed to get workflow run_id"))?
.unwrap_or(0);
let entity_id = format!("#software-torc-server-run-{}", run_id);
let exists = sqlx::query_scalar!(
"SELECT COUNT(*) FROM ro_crate_entity WHERE workflow_id = $1 AND entity_id = $2",
workflow_id,
entity_id,
)
.fetch_one(self.context.pool.as_ref())
.await
.map_err(|e| database_error_with_msg(e, "Failed to check existing software entity"))?;
if exists > 0 {
debug!(
"SoftwareApplication entity '{}' already exists for workflow_id={}, skipping",
entity_id, workflow_id
);
return Ok(());
}
let version = env!("CARGO_PKG_VERSION");
let exe_path = std::env::current_exe()
.map(|p| p.display().to_string())
.unwrap_or_else(|_| "unknown".to_string());
let sha256 = compute_file_sha256(&exe_path);
let mut metadata = serde_json::json!({
"@id": entity_id,
"@type": "SoftwareApplication",
"name": "torc-server",
"version": version,
"url": exe_path,
"torc:run_id": run_id,
});
if let Some(hash) = sha256 {
metadata["sha256"] = serde_json::json!(hash);
}
if let Ok(meta) = std::fs::metadata(&exe_path) {
metadata["contentSize"] = serde_json::json!(meta.len());
}
let metadata_str = metadata.to_string();
let entity_type = "SoftwareApplication";
match sqlx::query!(
r#"
INSERT INTO ro_crate_entity (workflow_id, file_id, entity_id, entity_type, metadata)
VALUES ($1, NULL, $2, $3, $4)
"#,
workflow_id,
entity_id,
entity_type,
metadata_str,
)
.execute(self.context.pool.as_ref())
.await
{
Ok(_) => {
debug!(
"Created SoftwareApplication entity for torc-server version={} (workflow_id={})",
version, workflow_id
);
}
Err(e) => {
log::warn!(
"Failed to create SoftwareApplication entity for torc-server: {}",
e
);
}
}
Ok(())
}
}
#[async_trait]
impl<C> RoCrateApi<C> for RoCrateApiImpl
where
C: Has<XSpanIdString> + Send + Sync,
{
async fn create_ro_crate_entity(
&self,
mut body: models::RoCrateEntityModel,
context: &C,
) -> Result<CreateRoCrateEntityResponse, ApiError> {
debug!(
"create_ro_crate_entity({:?}) - X-Span-ID: {:?}",
body,
context.get().0.clone()
);
let result = match sqlx::query!(
r#"
INSERT INTO ro_crate_entity (workflow_id, file_id, entity_id, entity_type, metadata)
VALUES ($1, $2, $3, $4, $5)
RETURNING id
"#,
body.workflow_id,
body.file_id,
body.entity_id,
body.entity_type,
body.metadata,
)
.fetch_one(self.context.pool.as_ref())
.await
{
Ok(result) => result,
Err(e) => {
return Err(database_error_with_msg(
e,
"Failed to create RO-Crate entity record",
));
}
};
body.id = Some(result.id);
debug!(
"Created RO-Crate entity with ID: {} for workflow_id={}",
result.id, body.workflow_id
);
Ok(CreateRoCrateEntityResponse::SuccessfulResponse(body))
}
async fn get_ro_crate_entity(
&self,
id: i64,
context: &C,
) -> Result<GetRoCrateEntityResponse, ApiError> {
debug!(
"get_ro_crate_entity({}) - X-Span-ID: {:?}",
id,
context.get().0.clone()
);
let record = match sqlx::query!(
r#"
SELECT id, workflow_id, file_id, entity_id, entity_type, metadata
FROM ro_crate_entity
WHERE id = $1
"#,
id
)
.fetch_optional(self.context.pool.as_ref())
.await
{
Ok(Some(record)) => record,
Ok(None) => {
let error_response = models::ErrorResponse::new(serde_json::json!({
"message": format!("RO-Crate entity not found with ID: {}", id)
}));
return Ok(GetRoCrateEntityResponse::NotFoundErrorResponse(
error_response,
));
}
Err(e) => {
return Err(database_error_with_msg(
e,
"Failed to fetch RO-Crate entity",
));
}
};
let model = models::RoCrateEntityModel {
id: Some(record.id),
workflow_id: record.workflow_id,
file_id: record.file_id,
entity_id: record.entity_id,
entity_type: record.entity_type,
metadata: record.metadata,
};
Ok(GetRoCrateEntityResponse::SuccessfulResponse(model))
}
async fn list_ro_crate_entities(
&self,
workflow_id: i64,
offset: i64,
limit: i64,
context: &C,
) -> Result<ListRoCrateEntitiesResponse, ApiError> {
debug!(
"list_ro_crate_entities({}, {}, {}) - X-Span-ID: {:?}",
workflow_id,
offset,
limit,
context.get().0.clone()
);
let limit = std::cmp::min(limit, MAX_RECORD_TRANSFER_COUNT);
let records = match sqlx::query!(
r#"
SELECT id, workflow_id, file_id, entity_id, entity_type, metadata
FROM ro_crate_entity
WHERE workflow_id = $1
ORDER BY id
LIMIT $2 OFFSET $3
"#,
workflow_id,
limit,
offset
)
.fetch_all(self.context.pool.as_ref())
.await
{
Ok(records) => records,
Err(e) => {
return Err(database_error_with_msg(
e,
"Failed to list RO-Crate entities",
));
}
};
let items: Vec<models::RoCrateEntityModel> = records
.into_iter()
.map(|record| models::RoCrateEntityModel {
id: Some(record.id),
workflow_id: record.workflow_id,
file_id: record.file_id,
entity_id: record.entity_id,
entity_type: record.entity_type,
metadata: record.metadata,
})
.collect();
let count = items.len() as i64;
let total_count = match sqlx::query!(
r#"SELECT COUNT(*) as total FROM ro_crate_entity WHERE workflow_id = $1"#,
workflow_id
)
.fetch_one(self.context.pool.as_ref())
.await
{
Ok(row) => row.total,
Err(e) => {
return Err(database_error_with_msg(
e,
"Failed to count RO-Crate entities",
));
}
};
let has_more = offset + count < total_count;
Ok(ListRoCrateEntitiesResponse::SuccessfulResponse(
models::ListRoCrateEntitiesResponse {
items: Some(items),
offset,
max_limit: MAX_RECORD_TRANSFER_COUNT,
count,
total_count,
has_more,
},
))
}
async fn update_ro_crate_entity(
&self,
id: i64,
mut body: models::RoCrateEntityModel,
context: &C,
) -> Result<UpdateRoCrateEntityResponse, ApiError> {
debug!(
"update_ro_crate_entity({}, {:?}) - X-Span-ID: {:?}",
id,
body,
context.get().0.clone()
);
let result = match sqlx::query!(
r#"
UPDATE ro_crate_entity
SET file_id = $1, entity_id = $2, entity_type = $3, metadata = $4
WHERE id = $5
"#,
body.file_id,
body.entity_id,
body.entity_type,
body.metadata,
id,
)
.execute(self.context.pool.as_ref())
.await
{
Ok(result) => result,
Err(e) => {
return Err(database_error_with_msg(
e,
"Failed to update RO-Crate entity",
));
}
};
if result.rows_affected() == 0 {
let error_response = models::ErrorResponse::new(serde_json::json!({
"message": format!("RO-Crate entity not found with ID: {}", id)
}));
return Ok(UpdateRoCrateEntityResponse::NotFoundErrorResponse(
error_response,
));
}
body.id = Some(id);
debug!("Updated RO-Crate entity with ID: {}", id);
Ok(UpdateRoCrateEntityResponse::SuccessfulResponse(body))
}
async fn delete_ro_crate_entity(
&self,
id: i64,
_body: Option<serde_json::Value>,
context: &C,
) -> Result<DeleteRoCrateEntityResponse, ApiError> {
debug!(
"delete_ro_crate_entity({}) - X-Span-ID: {:?}",
id,
context.get().0.clone()
);
let result = match sqlx::query!("DELETE FROM ro_crate_entity WHERE id = $1", id)
.execute(self.context.pool.as_ref())
.await
{
Ok(result) => result,
Err(e) => {
return Err(database_error_with_msg(
e,
"Failed to delete RO-Crate entity",
));
}
};
if result.rows_affected() == 0 {
let error_response = models::ErrorResponse::new(serde_json::json!({
"message": format!("RO-Crate entity not found with ID: {}", id)
}));
return Ok(DeleteRoCrateEntityResponse::NotFoundErrorResponse(
error_response,
));
}
info!("Deleted RO-Crate entity with ID: {}", id);
Ok(DeleteRoCrateEntityResponse::SuccessfulResponse(
serde_json::json!({"message": "RO-Crate entity deleted successfully"}),
))
}
async fn delete_ro_crate_entities(
&self,
workflow_id: i64,
_body: Option<serde_json::Value>,
context: &C,
) -> Result<DeleteRoCrateEntitiesResponse, ApiError> {
debug!(
"delete_ro_crate_entities(workflow_id={}) - X-Span-ID: {:?}",
workflow_id,
context.get().0.clone()
);
let result = match sqlx::query!(
"DELETE FROM ro_crate_entity WHERE workflow_id = $1",
workflow_id
)
.execute(self.context.pool.as_ref())
.await
{
Ok(result) => result,
Err(e) => {
return Err(database_error_with_msg(
e,
"Failed to delete RO-Crate entities",
));
}
};
let deleted_count = result.rows_affected();
info!(
"Deleted {} RO-Crate entities for workflow_id={}",
deleted_count, workflow_id
);
Ok(DeleteRoCrateEntitiesResponse::SuccessfulResponse(
serde_json::json!({
"message": format!("Deleted {} RO-Crate entities", deleted_count),
"deleted_count": deleted_count
}),
))
}
}