use aion::EngineError;
use aion_package::{ContentHash, ExtractionLimits, Package, PackageError};
use aion_proto::{
ProtoListVersionsResponse, ProtoLoadPackageResponse, ProtoRouteVersionRequest,
ProtoRouteVersionResponse, ProtoUnloadVersionRequest, ProtoUnloadVersionResponse,
ProtoWorkflowVersion, WireError,
};
use crate::config::{DEPLOY_MAX_ARCHIVE_BYTES_REQUIRED, DEPLOY_MAX_INFLATED_BYTES_REQUIRED};
use crate::{CallerIdentity, ServerState};
#[derive(Debug)]
pub enum DeployApiError {
Unavailable(WireError),
ArchiveTooLarge(WireError),
Wire(WireError),
}
impl DeployApiError {
#[must_use]
pub const fn wire(&self) -> &WireError {
match self {
Self::Unavailable(wire) | Self::ArchiveTooLarge(wire) | Self::Wire(wire) => wire,
}
}
}
pub async fn load_package(
state: &ServerState,
caller: &CallerIdentity,
transport: &'static str,
archive: Vec<u8>,
) -> Result<ProtoLoadPackageResponse, DeployApiError> {
authorize_mutation(state, caller, transport, "deploy.load")?;
enforce_archive_ceiling(state, archive.len())?;
let inflate_ceiling = inflate_ceiling(state)?;
let package = match Package::load_from_bytes(
&archive,
ExtractionLimits::bounded(inflate_ceiling),
) {
Ok(package) => package,
Err(PackageError::InflatedSizeExceeded { limit }) => {
let wire = WireError::invalid_input(format!(
"archive contents inflate past the deploy.max_inflated_bytes limit of {limit} bytes; raise deploy.max_inflated_bytes (or AION_DEPLOY_MAX_INFLATED_BYTES) if this package size is intended"
))
.with_error_type("Package");
return Err(refused(
state,
caller,
transport,
"deploy.load",
None,
DeployApiError::ArchiveTooLarge(wire),
));
}
Err(error) => {
let wire = WireError::invalid_input(format!("archive rejected: {error}"))
.with_error_type("Package");
return Err(refused(
state,
caller,
transport,
"deploy.load",
None,
DeployApiError::Wire(wire),
));
}
};
let engine = engine_handle(state)?;
match engine.load_package(package).await {
Ok(outcome) => {
let workflow_type = outcome.record.workflow_type().to_owned();
let content_hash = outcome.record.version().to_string();
let audit_outcome = if outcome.freshly_loaded {
"loaded"
} else {
"idempotent"
};
tracing::info!(
operation = "deploy.load",
subject = caller.subject(),
grant_source = caller.grant_source().label(),
transport,
workflow_type = %workflow_type,
content_hash = %content_hash,
outcome = audit_outcome,
freshly_loaded = outcome.freshly_loaded,
route_changed = outcome.route_changed,
"deploy mutation applied"
);
record_mutation_metrics(state, "deploy.load", audit_outcome, &workflow_type);
Ok(ProtoLoadPackageResponse {
workflow_type,
content_hash,
deployed_entry_module: outcome.record.deployed_entry_module().to_owned(),
entry_function: outcome.record.entry_function().to_owned(),
freshly_loaded: outcome.freshly_loaded,
route_changed: outcome.route_changed,
})
}
Err(error) => Err(map_engine_refusal(
state,
caller,
transport,
"deploy.load",
None,
error,
)),
}
}
pub fn list_versions(
state: &ServerState,
caller: &CallerIdentity,
transport: &'static str,
) -> Result<ProtoListVersionsResponse, DeployApiError> {
let guard = state.deploy_guard();
if let Err(error) = guard.authorize(caller) {
return Err(denied(state, caller, transport, "deploy.list", &error));
}
let engine = engine_handle(state)?;
let versions = engine
.list_workflow_versions()
.map_err(|error| DeployApiError::Wire(crate::ServerError::from(error).to_wire_error()))?;
Ok(ProtoListVersionsResponse {
versions: versions
.into_iter()
.map(|info| ProtoWorkflowVersion {
workflow_type: info.workflow_type,
content_hash: info.content_hash.to_string(),
deployed_entry_module: info.deployed_entry_module,
entry_function: info.entry_function,
manifest_version: info.manifest_version.as_str().to_owned(),
loaded_at: info.loaded_at.to_rfc3339(),
route_active: info.route_active,
})
.collect(),
})
}
pub async fn route_version(
state: &ServerState,
caller: &CallerIdentity,
transport: &'static str,
request: ProtoRouteVersionRequest,
) -> Result<ProtoRouteVersionResponse, DeployApiError> {
authorize_mutation(state, caller, transport, "deploy.route")?;
let (workflow_type, version) = decode_version_target(
state,
caller,
transport,
"deploy.route",
&request.workflow_type,
&request.content_hash,
)?;
let engine = engine_handle(state)?;
match engine
.route_workflow_version(&workflow_type, &version)
.await
{
Ok(()) => {
tracing::info!(
operation = "deploy.route",
subject = caller.subject(),
grant_source = caller.grant_source().label(),
transport,
workflow_type = %workflow_type,
content_hash = %version,
outcome = "rerouted",
"deploy mutation applied"
);
record_mutation_metrics(state, "deploy.route", "rerouted", &workflow_type);
Ok(ProtoRouteVersionResponse {})
}
Err(error) => Err(map_engine_refusal(
state,
caller,
transport,
"deploy.route",
Some((&workflow_type, &version)),
error,
)),
}
}
pub async fn unload_version(
state: &ServerState,
caller: &CallerIdentity,
transport: &'static str,
request: ProtoUnloadVersionRequest,
) -> Result<ProtoUnloadVersionResponse, DeployApiError> {
authorize_mutation(state, caller, transport, "deploy.unload")?;
let (workflow_type, version) = decode_version_target(
state,
caller,
transport,
"deploy.unload",
&request.workflow_type,
&request.content_hash,
)?;
let engine = engine_handle(state)?;
match engine
.unload_workflow_version(&workflow_type, &version)
.await
{
Ok(()) => {
tracing::info!(
operation = "deploy.unload",
subject = caller.subject(),
grant_source = caller.grant_source().label(),
transport,
workflow_type = %workflow_type,
content_hash = %version,
outcome = "unloaded",
"deploy mutation applied"
);
record_mutation_metrics(state, "deploy.unload", "unloaded", &workflow_type);
Ok(ProtoUnloadVersionResponse {})
}
Err(error) => Err(map_engine_refusal(
state,
caller,
transport,
"deploy.unload",
Some((&workflow_type, &version)),
error,
)),
}
}
fn authorize_mutation(
state: &ServerState,
caller: &CallerIdentity,
transport: &'static str,
operation: &'static str,
) -> Result<(), DeployApiError> {
let guard = state.deploy_guard();
if let Err(error) = guard.authorize(caller) {
return Err(denied(state, caller, transport, operation, &error));
}
if state.drain_state().is_draining() {
return Err(DeployApiError::Unavailable(WireError::backend(
"server is draining and not accepting deploy mutations",
)));
}
Ok(())
}
fn denied(
state: &ServerState,
caller: &CallerIdentity,
transport: &'static str,
operation: &'static str,
error: &crate::ServerError,
) -> DeployApiError {
let wire = error.to_wire_error();
tracing::warn!(
operation,
subject = caller.subject(),
grant_source = caller.grant_source().label(),
transport,
reason = %wire.message,
"deploy operation denied"
);
if let Some(metrics) = state.metrics() {
metrics.deploy_denied(transport);
}
DeployApiError::Wire(wire)
}
fn enforce_archive_ceiling(state: &ServerState, archive_len: usize) -> Result<(), DeployApiError> {
let Some(limit) = state.runtime_config().deploy.max_archive_bytes else {
return Err(DeployApiError::Wire(WireError::backend(
DEPLOY_MAX_ARCHIVE_BYTES_REQUIRED,
)));
};
if archive_len as u64 > limit {
return Err(DeployApiError::ArchiveTooLarge(WireError::invalid_input(
format!(
"archive is {archive_len} bytes, exceeding the deploy.max_archive_bytes limit of {limit} bytes; raise deploy.max_archive_bytes (or AION_DEPLOY_MAX_ARCHIVE_BYTES) if this package size is intended"
),
)));
}
Ok(())
}
fn inflate_ceiling(state: &ServerState) -> Result<u64, DeployApiError> {
state
.runtime_config()
.deploy
.max_inflated_bytes
.ok_or_else(|| {
DeployApiError::Wire(WireError::backend(DEPLOY_MAX_INFLATED_BYTES_REQUIRED))
})
}
fn decode_version_target(
state: &ServerState,
caller: &CallerIdentity,
transport: &'static str,
operation: &'static str,
workflow_type: &str,
content_hash: &str,
) -> Result<(String, ContentHash), DeployApiError> {
if workflow_type.is_empty() {
let wire = WireError::invalid_input("workflow_type must not be empty");
return Err(refused(
state,
caller,
transport,
operation,
None,
DeployApiError::Wire(wire),
));
}
match content_hash.parse::<ContentHash>() {
Ok(version) => Ok((workflow_type.to_owned(), version)),
Err(error) => {
let wire = WireError::invalid_input(format!(
"content_hash `{content_hash}` is not a canonical content hash: {error}"
));
Err(refused(
state,
caller,
transport,
operation,
None,
DeployApiError::Wire(wire),
))
}
}
}
fn map_engine_refusal(
state: &ServerState,
caller: &CallerIdentity,
transport: &'static str,
operation: &'static str,
target: Option<(&str, &ContentHash)>,
error: EngineError,
) -> DeployApiError {
let mapped = match error {
EngineError::ShuttingDown => DeployApiError::Unavailable(
WireError::backend(error.to_string()).with_error_type("ShuttingDown"),
),
EngineError::Load { .. } => DeployApiError::Wire(
WireError::invalid_input(error.to_string()).with_error_type("Load"),
),
EngineError::Package(_) => DeployApiError::Wire(
WireError::invalid_input(error.to_string()).with_error_type("Package"),
),
other => DeployApiError::Wire(crate::ServerError::from(other).to_wire_error()),
};
let wire = mapped.wire();
let outcome = refusal_outcome(&mapped);
tracing::info!(
operation,
subject = caller.subject(),
grant_source = caller.grant_source().label(),
transport,
workflow_type = target.map(|(workflow_type, _)| workflow_type),
content_hash = target.map(|(_, version)| version.to_string()).as_deref(),
outcome,
reason = %wire.message,
"deploy mutation refused"
);
if let Some(metrics) = state.metrics() {
metrics.deploy_operation(operation, outcome);
}
mapped
}
fn refused(
state: &ServerState,
caller: &CallerIdentity,
transport: &'static str,
operation: &'static str,
target: Option<(&str, &ContentHash)>,
mapped: DeployApiError,
) -> DeployApiError {
let outcome = refusal_outcome(&mapped);
tracing::info!(
operation,
subject = caller.subject(),
grant_source = caller.grant_source().label(),
transport,
workflow_type = target.map(|(workflow_type, _)| workflow_type),
content_hash = target.map(|(_, version)| version.to_string()).as_deref(),
outcome,
reason = %mapped.wire().message,
"deploy mutation refused"
);
if let Some(metrics) = state.metrics() {
metrics.deploy_operation(operation, outcome);
}
mapped
}
fn refusal_outcome(error: &DeployApiError) -> &'static str {
match error {
DeployApiError::Unavailable(_) => "unavailable",
DeployApiError::ArchiveTooLarge(_) | DeployApiError::Wire(_) => error.wire().code.as_str(),
}
}
fn engine_handle(state: &ServerState) -> Result<std::sync::Arc<aion::Engine>, DeployApiError> {
state
.deploy_guard()
.engine()
.map(std::sync::Arc::clone)
.map_err(|error| DeployApiError::Wire(error.to_wire_error()))
}
fn record_mutation_metrics(
state: &ServerState,
operation: &'static str,
outcome: &'static str,
workflow_type: &str,
) {
let Some(metrics) = state.metrics() else {
return;
};
metrics.deploy_operation(operation, outcome);
let Ok(engine) = state.deploy_guard().engine().map(std::sync::Arc::clone) else {
return;
};
match engine.list_workflow_versions() {
Ok(versions) => {
let count = versions
.iter()
.filter(|info| info.workflow_type == workflow_type)
.count();
let count = i64::try_from(count).unwrap_or(i64::MAX);
metrics.set_loaded_workflow_versions(workflow_type, count);
}
Err(error) => {
tracing::warn!(
operation,
workflow_type,
%error,
"post-operation version listing failed; loaded-version gauge not updated"
);
}
}
}