use aion_package::{ExtractionLimits, Package};
use aion_store::{EventStore, PackageRecord};
use chrono::Utc;
use crate::error::EngineError;
use crate::runtime::RuntimeHandle;
use super::WorkflowCatalog;
pub(crate) async fn persist_deployed_package(
store: &dyn EventStore,
package: &Package,
) -> Result<(), EngineError> {
let workflow_type = package.manifest().entry_module.clone();
let content_hash = package.content_hash().to_string();
let archive = package
.to_archive_bytes()
.map_err(|error| EngineError::Load {
reason: format!(
"failed to serialise deployed package `{workflow_type}` version `{content_hash}` for persistence: {error}"
),
})?;
store
.put_package(PackageRecord {
workflow_type,
content_hash,
archive,
deployed_at: Utc::now(),
})
.await?;
Ok(())
}
pub(crate) async fn reload_persisted_packages(
runtime: &RuntimeHandle,
catalog: &WorkflowCatalog,
store: &dyn EventStore,
) -> Result<(), EngineError> {
for record in store.list_packages().await? {
let package = match Package::load_from_bytes(&record.archive, ExtractionLimits::unbounded())
{
Ok(package) => package,
Err(error) => {
tracing::error!(
workflow_type = %record.workflow_type,
content_hash = %record.content_hash,
error = %error,
"persisted deployed package failed validation on reload (store corruption or manual tampering); skipping it — runs pinned to it will fail recovery loudly"
);
continue;
}
};
let computed_hash = package.content_hash().to_string();
if computed_hash != record.content_hash
|| package.manifest().entry_module != record.workflow_type
{
tracing::error!(
workflow_type = %record.workflow_type,
content_hash = %record.content_hash,
computed_hash = %computed_hash,
computed_type = %package.manifest().entry_module,
"persisted deployed package does not match its store key; skipping it — runs pinned to it will fail recovery loudly"
);
continue;
}
match catalog.load_package(runtime, &package).await {
Ok(outcome) => {
tracing::info!(
workflow_type = outcome.record.workflow_type(),
content_hash = %outcome.record.version(),
"reloaded persisted deployed package"
);
}
Err(error) => {
tracing::error!(
workflow_type = %record.workflow_type,
content_hash = %record.content_hash,
error = %error,
"persisted deployed package failed catalog load on reload; skipping it — runs pinned to it will fail recovery loudly"
);
}
}
}
restore_persisted_routes(catalog, store).await
}
async fn restore_persisted_routes(
catalog: &WorkflowCatalog,
store: &dyn EventStore,
) -> Result<(), EngineError> {
for route in store.list_package_routes().await? {
let version = match route.content_hash.parse::<aion_package::ContentHash>() {
Ok(version) => version,
Err(error) => {
tracing::error!(
workflow_type = %route.workflow_type,
content_hash = %route.content_hash,
error = %error,
"persisted route pointer is not a canonical content hash; skipping it"
);
continue;
}
};
match catalog.route_version(&route.workflow_type, &version).await {
Ok(()) => {}
Err(EngineError::UnknownVersion { .. }) => {
tracing::warn!(
workflow_type = %route.workflow_type,
content_hash = %route.content_hash,
"persisted route pointer names a version that is not loaded (an operator-file version absent this boot, or a skipped corrupt row); keeping the route committed by reloads"
);
}
Err(error) => return Err(error),
}
}
Ok(())
}