use std::process::Command;
use std::sync::Arc;
use std::time::Duration;
use aion::signal::ConcreteSignalRouter;
use aion::{Engine, EngineBuilder, RuntimeHandle, SignalRouter};
use aion_core::{Payload, RunId, WorkflowId};
use aion_package::{
BeamModule, BeamSet, CURRENT_FORMAT_VERSION, ExtractionLimits, Manifest, ManifestVersion,
Package, PackageBuilder,
};
use aion_proto::{
ProtoListVersionsResponse, ProtoLoadPackageResponse, ProtoWireError, WireError, WireErrorCode,
generated,
};
use aion_server::api::http::http_router;
use aion_server::config::{
AuthConfig, DashboardAssetSource, DashboardConfig, DeployConfig, ListenConfig, MetricsConfig,
NamespaceConfig, NamespaceMode, RuntimeConfig, WebSocketConfig, WorkerConfig,
};
use aion_server::{NamespaceResolver, ServerState};
use axum::{body, http::Request, http::StatusCode, response::Response};
use prost::Message as _;
use serde_json::json;
use tower::ServiceExt;
/// Error type returned by every helper and test in this file.
type TestError = Box<dyn std::error::Error>;
/// Erlang module compiled by `compile_reload_beam`; also sent as the `workflow_type`.
const RELOAD_MODULE: &str = "aion_reload_fixture";
/// Namespace granted via `x-aion-namespaces` and configured as the default namespace.
const NAMESPACE: &str = "default";
/// Upload ceiling (1 MiB) configured as `deploy.max_archive_bytes`.
const MAX_ARCHIVE_BYTES: u64 = 1_048_576;
/// Inflate ceiling (2 MiB) configured as `deploy.max_inflated_bytes`.
const MAX_INFLATED_BYTES: u64 = 2_097_152;
/// Compiles the reload fixture module with `erlc` and returns the `.beam` bytes.
///
/// The generated module exports `run/1`, which returns `version` immediately, and
/// `park/1`, which waits in `receive` for any message and then returns `version`.
/// Work happens in a fresh temporary directory that is removed on every path, not
/// only after a successful compile.
///
/// # Errors
///
/// Returns an error if the temporary directory cannot be created, the source cannot
/// be written, `erlc` cannot be spawned or exits unsuccessfully, or the `.beam` cannot
/// be read. After a successful compile, a failure to remove the temporary directory is
/// also reported. After a failed compile, the compile error takes precedence.
fn compile_reload_beam(version: u32) -> Result<Vec<u8>, TestError> {
    let temp_dir = std::env::temp_dir().join(format!("aion-deploy-e2e-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir(&temp_dir)?;
    let compiled = compile_reload_beam_in(&temp_dir, version);
    // Always attempt cleanup, so an early `?` inside the helper cannot leak the directory.
    let cleanup = std::fs::remove_dir_all(&temp_dir);
    let bytes = compiled?;
    cleanup?;
    Ok(bytes)
}

/// Writes the fixture source into `dir`, runs `erlc -o dir`, and reads back the `.beam`.
fn compile_reload_beam_in(dir: &std::path::Path, version: u32) -> Result<Vec<u8>, TestError> {
    let source_path = dir.join(format!("{RELOAD_MODULE}.erl"));
    let beam_path = dir.join(format!("{RELOAD_MODULE}.beam"));
    std::fs::write(
        &source_path,
        format!(
            "-module({RELOAD_MODULE}).\n\
             -export([run/1, park/1]).\n\
             run(_Input) -> {version}.\n\
             park(_Input) -> receive _Any -> {version} end.\n"
        ),
    )?;
    let status = Command::new("erlc")
        .arg("-o")
        .arg(dir)
        .arg(&source_path)
        .status()
        // A bare io::Error ("No such file or directory") does not say what was missing.
        .map_err(|error| format!("failed to spawn erlc (is Erlang/OTP installed?): {error}"))?;
    if !status.success() {
        return Err(format!("erlc failed with status {status}").into());
    }
    Ok(std::fs::read(beam_path)?)
}
/// Packs `beam` as the single module of a deploy archive for the reload fixture.
///
/// The manifest's entry point is `RELOAD_MODULE:entry_function`. Its input schema is a
/// JSON object, its output schema is an integer, and its version label is `"test"`.
fn archive_bytes(beam: &[u8], entry_function: &str) -> Result<Vec<u8>, TestError> {
    let module = BeamModule::new(RELOAD_MODULE, beam.to_vec());
    let beams = BeamSet::new(vec![module])?;
    let manifest = Manifest {
        entry_module: String::from(RELOAD_MODULE),
        entry_function: String::from(entry_function),
        input_schema: json!({ "type": "object" }),
        output_schema: json!({ "type": "integer" }),
        timeout: Duration::from_secs(30),
        activities: Vec::new(),
        version: ManifestVersion::new("test"),
        format_version: CURRENT_FORMAT_VERSION,
    };
    let builder = PackageBuilder::new(manifest, beams);
    let bytes = builder.write_to_bytes()?;
    Ok(bytes)
}
/// Loads `archive` with no extraction limits and returns its content hash as a string.
fn content_hash_of(archive: &[u8]) -> Result<String, TestError> {
    let package = Package::load_from_bytes(archive, ExtractionLimits::unbounded())?;
    let hash = package.content_hash().to_string();
    Ok(hash)
}
/// Builds a test `RuntimeConfig` around the given deploy settings.
///
/// Auth is disabled, both listeners use loopback port 0, the engine uses a single
/// scheduler thread in shared-engine namespace mode, and metrics are enabled.
fn runtime_config(deploy: DeployConfig) -> RuntimeConfig {
    // Port 0 asks the OS for an ephemeral port; both listeners share the same address.
    let ephemeral_loopback = std::net::SocketAddr::from(([127, 0, 0, 1], 0));
    let listen = ListenConfig {
        grpc: ephemeral_loopback,
        http: ephemeral_loopback,
    };
    let auth = AuthConfig {
        enabled: false,
        jwks_url: None,
        jwks_refresh_seconds: 300,
    };
    let websocket = WebSocketConfig {
        outbound_buffer_bound: 32,
        event_broadcast_capacity: Some(64),
    };
    RuntimeConfig {
        listen,
        tls: None,
        auth,
        dashboard: DashboardConfig {
            source: DashboardAssetSource::Embedded,
        },
        namespace: NamespaceConfig {
            mode: NamespaceMode::SharedEngine,
        },
        worker: WorkerConfig {
            heartbeat_window: Duration::from_secs(30),
        },
        websocket,
        workflow_packages: Vec::new(),
        deploy,
        scheduler_threads: 1,
        query_timeout: Some(Duration::from_secs(10)),
        default_namespace: String::from(NAMESPACE),
        drain_timeout: Duration::from_secs(30),
        metrics: MetricsConfig { enabled: true },
    }
}
/// Deploy settings with the surface enabled and both size ceilings set to this file's constants.
fn enabled_deploy() -> DeployConfig {
    let max_archive_bytes = Some(MAX_ARCHIVE_BYTES);
    let max_inflated_bytes = Some(MAX_INFLATED_BYTES);
    DeployConfig {
        enabled: true,
        max_archive_bytes,
        max_inflated_bytes,
    }
}
/// Builds an in-memory engine plus a `ServerState` wired to it.
///
/// The engine registers the server's namespace search attribute as a string, runs one
/// scheduler thread, and routes signals through `ConcreteSignalRouter`. The engine
/// handle is returned alongside the state so tests can await results and shut it down.
async fn engine_state(deploy: DeployConfig) -> Result<(Arc<Engine>, ServerState), TestError> {
    let mut schema = aion_core::SearchAttributeSchema::new();
    schema.register(
        aion_server::NAMESPACE_ATTRIBUTE,
        aion_core::SearchAttributeType::String,
    )?;
    let store: Arc<dyn aion_store::EventStore> = Arc::new(aion_store::InMemoryStore::default());
    let built = EngineBuilder::new()
        .store_arc(store)
        .in_memory_visibility()
        .search_attribute_schema(schema)
        .scheduler_threads(1)
        .signal_router_factory(|runtime: Arc<RuntimeHandle>, handoff| {
            Arc::new(ConcreteSignalRouter::new(runtime, handoff)) as Arc<dyn SignalRouter>
        })
        .build()
        .await?;
    let engine = Arc::new(built);
    let namespaces = NamespaceConfig {
        mode: NamespaceMode::SharedEngine,
    };
    let resolver = NamespaceResolver::from_config(namespaces, Arc::clone(&engine));
    let config = runtime_config(deploy);
    let state = ServerState::from_parts(resolver, config);
    Ok((engine, state))
}
/// Adds the header-based grant used by these tests: subject `ci`, access to
/// `NAMESPACE`, and the deploy capability.
fn deploy_headers(builder: axum::http::request::Builder) -> axum::http::request::Builder {
    const GRANT: [(&str, &str); 3] = [
        ("x-aion-subject", "ci"),
        ("x-aion-namespaces", NAMESPACE),
        ("x-aion-deploy", "true"),
    ];
    GRANT
        .into_iter()
        .fold(builder, |request, (name, value)| request.header(name, value))
}
/// Builds a granted `POST /deploy/packages` request that uploads `archive` as raw bytes.
fn post_archive(archive: Vec<u8>) -> Result<Request<body::Body>, TestError> {
    let builder = Request::builder()
        .uri("/deploy/packages")
        .method("POST")
        .header("content-type", "application/octet-stream");
    let request = deploy_headers(builder).body(body::Body::from(archive))?;
    Ok(request)
}
/// Builds a granted `POST` request to `uri` whose body is `value` serialized as JSON.
fn post_json(uri: &str, value: &serde_json::Value) -> Result<Request<body::Body>, TestError> {
    let payload = serde_json::to_vec(value)?;
    let builder = Request::builder()
        .uri(uri)
        .method("POST")
        .header("content-type", "application/json");
    let request = deploy_headers(builder).body(body::Body::from(payload))?;
    Ok(request)
}
/// Builds a granted `GET /deploy/versions` request with an empty body.
fn get_versions() -> Result<Request<body::Body>, TestError> {
    let builder = deploy_headers(Request::get("/deploy/versions"));
    let request = builder.body(body::Body::empty())?;
    Ok(request)
}
/// Collects the full response body (no size cap) and deserializes it as JSON into `T`.
async fn read_json<T>(response: Response) -> Result<T, TestError>
where
    T: serde::de::DeserializeOwned,
{
    let stream = response.into_body();
    let collected = body::to_bytes(stream, usize::MAX).await?;
    let parsed = serde_json::from_slice::<T>(&collected)?;
    Ok(parsed)
}
async fn start_over_http(router: &axum::Router) -> Result<(WorkflowId, RunId), TestError> {
let request = deploy_headers(
Request::builder()
.uri("/workflows/start")
.method("POST")
.header("content-type", "application/json"),
)
.body(body::Body::from(serde_json::to_vec(&json!({
"namespace": NAMESPACE,
"workflow_type": RELOAD_MODULE,
"input": { "reload": true },
}))?))?;
let response = router.clone().oneshot(request).await?;
assert_eq!(response.status(), StatusCode::OK, "start must succeed");
let body: serde_json::Value = read_json(response).await?;
let workflow_id = body["workflow_id"]["uuid"]
.as_str()
.ok_or("start response missing workflow id")?
.parse::<uuid::Uuid>()?;
let run_id = body["run_id"]["uuid"]
.as_str()
.ok_or("start response missing run id")?
.parse::<uuid::Uuid>()?;
Ok((WorkflowId::new(workflow_id), RunId::new(run_id)))
}
/// Awaits the workflow result for `(id, run)` and decodes it as a JSON integer.
///
/// A failed workflow is turned into an error carrying the failure's debug form, and
/// a non-integer result is reported together with the decoded value.
async fn result_int(engine: &Engine, id: &WorkflowId, run: &RunId) -> Result<i64, TestError> {
    let outcome = engine.result(id, run).await?;
    let payload = match outcome {
        Ok(payload) => payload,
        Err(error) => return Err(format!("workflow failed: {error:?}").into()),
    };
    let value: serde_json::Value = serde_json::from_slice(payload.bytes())?;
    match value.as_i64() {
        Some(number) => Ok(number),
        None => Err(format!("expected integer result, got {value}").into()),
    }
}
/// Wraps `message` in a gRPC request carrying the `ci` subject and the deploy capability.
///
/// Unlike the HTTP helper, no `x-aion-namespaces` entry is attached.
fn granted<T>(message: T) -> Result<tonic::Request<T>, TestError> {
    let mut request = tonic::Request::new(message);
    for (key, value) in [("x-aion-subject", "ci"), ("x-aion-deploy", "true")] {
        request.metadata_mut().insert(key, value.parse()?);
    }
    Ok(request)
}
/// Posts `{workflow_type: RELOAD_MODULE, content_hash}` to a deploy version-action
/// endpoint and asserts that the response status equals `expected`.
///
/// The response is returned unread so callers can decode an error body.
async fn post_version_action(
    router: &axum::Router,
    uri: &str,
    content_hash: &str,
    expected: StatusCode,
) -> Result<Response, TestError> {
    let request = post_json(
        uri,
        &json!({ "workflow_type": RELOAD_MODULE, "content_hash": content_hash }),
    )?;
    let response = router.clone().oneshot(request).await?;
    assert_eq!(
        response.status(),
        expected,
        "unexpected status from {uri} for content hash {content_hash}"
    );
    Ok(response)
}

/// Routes the reload fixture to `content_hash` via `POST /deploy/route`, asserting `expected`.
async fn post_route(
    router: &axum::Router,
    content_hash: &str,
    expected: StatusCode,
) -> Result<Response, TestError> {
    post_version_action(router, "/deploy/route", content_hash, expected).await
}

/// Unloads version `content_hash` via `POST /deploy/unload`, asserting `expected`.
async fn post_unload(
    router: &axum::Router,
    content_hash: &str,
    expected: StatusCode,
) -> Result<Response, TestError> {
    post_version_action(router, "/deploy/unload", content_hash, expected).await
}
/// Content hashes of every listed version flagged `route_active`, in listing order.
fn route_active_hashes(listing: &ProtoListVersionsResponse) -> Vec<&str> {
    let mut active = Vec::new();
    for version in listing.versions.iter() {
        if version.route_active {
            active.push(version.content_hash.as_str());
        }
    }
    active
}
/// End-to-end HTTP deploy lifecycle against a live engine. It covers a first load, an
/// idempotent reload, an upgrade, an explicit re-route, and a redeploy of an
/// already-loaded version.
///
/// Every load checks the HTTP status before decoding, so a refused load fails on the
/// status rather than on a confusing JSON decoding error.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn http_deploy_lifecycle_over_a_running_engine() -> Result<(), TestError> {
    let (engine, state) = engine_state(enabled_deploy()).await?;
    let router = http_router(state)?;
    let beam_v1 = compile_reload_beam(1)?;
    let beam_v2 = compile_reload_beam(2)?;
    let v1 = archive_bytes(&beam_v1, "run")?;
    let v2 = archive_bytes(&beam_v2, "run")?;
    let v1_hash = content_hash_of(&v1)?;
    let v2_hash = content_hash_of(&v2)?;

    // First load of v1 is fresh and takes the route.
    let response = router.clone().oneshot(post_archive(v1.clone())?).await?;
    assert_eq!(response.status(), StatusCode::OK);
    let loaded: ProtoLoadPackageResponse = read_json(response).await?;
    assert_eq!(loaded.workflow_type, RELOAD_MODULE);
    assert_eq!(loaded.content_hash, v1_hash);
    assert!(loaded.freshly_loaded);
    assert!(loaded.route_changed);
    let (id, run) = start_over_http(&router).await?;
    assert_eq!(result_int(&engine, &id, &run).await?, 1);

    // Re-uploading identical bytes is neither a fresh load nor a route change.
    let response = router.clone().oneshot(post_archive(v1)?).await?;
    assert_eq!(response.status(), StatusCode::OK);
    let again: ProtoLoadPackageResponse = read_json(response).await?;
    assert!(!again.freshly_loaded);
    assert!(!again.route_changed);

    // Loading v2 is fresh and moves the route, so new starts return 2.
    let response = router.clone().oneshot(post_archive(v2.clone())?).await?;
    assert_eq!(response.status(), StatusCode::OK);
    let loaded_v2: ProtoLoadPackageResponse = read_json(response).await?;
    assert!(loaded_v2.freshly_loaded);
    assert!(loaded_v2.route_changed);
    let (id, run) = start_over_http(&router).await?;
    assert_eq!(result_int(&engine, &id, &run).await?, 2);

    // Both versions are listed; only v2 is route-active.
    let listing: ProtoListVersionsResponse =
        read_json(router.clone().oneshot(get_versions()?).await?).await?;
    assert_eq!(listing.versions.len(), 2);
    assert_eq!(route_active_hashes(&listing), vec![v2_hash.as_str()]);

    // An explicit route back to v1 is reflected in the listing and in new starts.
    post_route(&router, &v1_hash, StatusCode::OK).await?;
    let listing: ProtoListVersionsResponse =
        read_json(router.clone().oneshot(get_versions()?).await?).await?;
    assert_eq!(route_active_hashes(&listing), vec![v1_hash.as_str()]);
    let (id, run) = start_over_http(&router).await?;
    assert_eq!(result_int(&engine, &id, &run).await?, 1);

    // Redeploying the already-loaded v2 is not a fresh load, but it does move the route.
    let response = router.clone().oneshot(post_archive(v2)?).await?;
    assert_eq!(response.status(), StatusCode::OK);
    let redeployed: ProtoLoadPackageResponse = read_json(response).await?;
    assert!(!redeployed.freshly_loaded);
    assert!(redeployed.route_changed);
    engine.shutdown()?;
    Ok(())
}
/// Covers several HTTP unload outcomes in sequence:
///
/// - refusal of the route-active version;
/// - refusal of a version pinned by a still-running workflow;
/// - successful unload once that run completes;
/// - 404 when routing to the unloaded hash;
/// - `ManifestMismatch` when identical BEAM bytes are re-uploaded under a different
///   entry function.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn http_unload_refusals_success_and_manifest_mismatch() -> Result<(), TestError> {
    let (engine, state) = engine_state(enabled_deploy()).await?;
    let router = http_router(state)?;
    // Load v2 (`run` entry).
    let beam_v2 = compile_reload_beam(2)?;
    let v2 = archive_bytes(&beam_v2, "run")?;
    let v2_hash = content_hash_of(&v2)?;
    let loaded: ProtoLoadPackageResponse =
        read_json(router.clone().oneshot(post_archive(v2)?).await?).await?;
    assert!(loaded.freshly_loaded);
    // Unloading the version the route points at is refused: 409, VersionPinned / RouteActive.
    let response = post_unload(&router, &v2_hash, StatusCode::CONFLICT).await?;
    let error: WireError = read_json(response).await?;
    assert_eq!(error.code, WireErrorCode::VersionPinned);
    assert_eq!(error.error_type.as_deref(), Some("RouteActive"));
    // Load v3 with the `park` entry and start a run on it (its result below is 3).
    // The run blocks in `receive` until it gets a message.
    let beam_v3 = compile_reload_beam(3)?;
    let v3 = archive_bytes(&beam_v3, "park")?;
    let v3_hash = content_hash_of(&v3)?;
    let parked: ProtoLoadPackageResponse =
        read_json(router.clone().oneshot(post_archive(v3)?).await?).await?;
    assert!(parked.freshly_loaded);
    let (parked_id, parked_run) = start_over_http(&router).await?;
    // Move the route off v3. v3 is still refused because the parked run pins it, and
    // the refusal message must name that run.
    post_route(&router, &v2_hash, StatusCode::OK).await?;
    let response = post_unload(&router, &v3_hash, StatusCode::CONFLICT).await?;
    let error: WireError = read_json(response).await?;
    assert_eq!(error.code, WireErrorCode::VersionPinned);
    assert!(
        error.message.contains(&parked_id.to_string()),
        "pin refusal must name the live run: {}",
        error.message
    );
    // `park/1` returns on any message; presumably the signal is delivered as one (the
    // signal name itself is not matched by the fixture).
    engine
        .signal(
            &parked_id,
            &parked_run,
            "release",
            Payload::from_json(&json!({}))?,
        )
        .await?;
    assert_eq!(result_int(&engine, &parked_id, &parked_run).await?, 3);
    // With the run finished, v3 is no longer pinned and can be unloaded.
    post_unload(&router, &v3_hash, StatusCode::OK).await?;
    let listing: ProtoListVersionsResponse =
        read_json(router.clone().oneshot(get_versions()?).await?).await?;
    assert!(
        listing
            .versions
            .iter()
            .all(|version| version.content_hash != v3_hash),
        "unloaded version must leave the listing"
    );
    // Routing to an unloaded hash is 404 NotFound / UnknownVersion.
    let response = post_route(&router, &v3_hash, StatusCode::NOT_FOUND).await?;
    let error: WireError = read_json(response).await?;
    assert_eq!(error.code, WireErrorCode::NotFound);
    assert_eq!(error.error_type.as_deref(), Some("UnknownVersion"));
    // Same BEAM bytes with a different entry function: the content hash is unchanged
    // (asserted here), and re-uploading is rejected as 400 InvalidInput / ManifestMismatch.
    let mismatched = archive_bytes(&beam_v2, "park")?;
    assert_eq!(content_hash_of(&mismatched)?, v2_hash);
    let response = router.clone().oneshot(post_archive(mismatched)?).await?;
    assert_eq!(response.status(), StatusCode::BAD_REQUEST);
    let error: WireError = read_json(response).await?;
    assert_eq!(error.code, WireErrorCode::InvalidInput);
    assert_eq!(error.error_type.as_deref(), Some("ManifestMismatch"));
    // The rejected upload left v2 serving new starts.
    let (id, run) = start_over_http(&router).await?;
    assert_eq!(result_int(&engine, &id, &run).await?, 2);
    engine.shutdown()?;
    Ok(())
}
/// Once draining has begun, deploy mutations answer 503 with an explicit "draining"
/// message, while the read-only version listing still answers 200.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn http_drain_refuses_mutations_but_serves_listing() -> Result<(), TestError> {
    let (engine, state) = engine_state(enabled_deploy()).await?;
    // Enter drain before the router is built; `begin()` is expected to return true here.
    assert!(state.drain_state().begin());
    let router = http_router(state)?;
    // The hash is a syntactically plausible placeholder; the test expects the drain
    // refusal (503) rather than a lookup result.
    let response = router
        .clone()
        .oneshot(post_json(
            "/deploy/route",
            &json!({ "workflow_type": RELOAD_MODULE, "content_hash": "a".repeat(64) }),
        )?)
        .await?;
    assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
    let error: WireError = read_json(response).await?;
    assert!(
        error.message.contains("draining"),
        "drain refusal must be explicit: {}",
        error.message
    );
    // Reads remain available during drain.
    let listing = router.clone().oneshot(get_versions()?).await?;
    assert_eq!(listing.status(), StatusCode::OK);
    engine.shutdown()?;
    Ok(())
}
/// Prometheus metrics record deploy activity: a successful load, an HTTP deploy
/// denial (request without the deploy capability), a refused route, and the
/// loaded-version gauge for the fixture workflow type.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn deploy_metrics_count_operations_and_denials() -> Result<(), TestError> {
    // `runtime_config` already enables metrics; no override is needed.
    let config = runtime_config(enabled_deploy());
    let state = ServerState::build_with_store(aion_store::InMemoryStore::default(), config).await?;
    let router = http_router(state)?;
    let beam = compile_reload_beam(1)?;
    let archive = archive_bytes(&beam, "run")?;
    let response = router.clone().oneshot(post_archive(archive)?).await?;
    assert_eq!(response.status(), StatusCode::OK);
    // Subject only, with no `x-aion-deploy` grant: expected to be denied.
    let denied = router
        .clone()
        .oneshot(
            Request::builder()
                .uri("/deploy/versions")
                .method("GET")
                .header("x-aion-subject", "ci")
                .body(body::Body::empty())?,
        )
        .await?;
    assert_eq!(denied.status(), StatusCode::FORBIDDEN);
    // Routing an unknown workflow type is refused with 404.
    let refused = router
        .clone()
        .oneshot(post_json(
            "/deploy/route",
            &json!({ "workflow_type": "missing", "content_hash": "a".repeat(64) }),
        )?)
        .await?;
    assert_eq!(refused.status(), StatusCode::NOT_FOUND);
    let metrics = router
        .clone()
        .oneshot(
            Request::builder()
                .uri("/metrics")
                .body(body::Body::empty())?,
        )
        .await?;
    assert_eq!(metrics.status(), StatusCode::OK);
    let bytes = body::to_bytes(metrics.into_body(), usize::MAX).await?;
    let text = String::from_utf8(bytes.to_vec())?;
    assert!(
        text.contains(
            "aion_deploy_operations_total{operation=\"deploy.load\",outcome=\"loaded\"} 1"
        ),
        "load counter must record the loaded outcome: {text}"
    );
    assert!(
        text.contains("aion_deploy_denied_total{transport=\"http\"} 1"),
        "denial counter must record the http denial: {text}"
    );
    assert!(
        text.contains(
            "aion_deploy_operations_total{operation=\"deploy.route\",outcome=\"not_found\"} 1"
        ),
        "refusal counter must record the refusal class: {text}"
    );
    assert!(
        text.contains(&format!(
            "aion_loaded_workflow_versions{{workflow_type=\"{RELOAD_MODULE}\"}} 1"
        )),
        "loaded-version gauge must track the listing: {text}"
    );
    Ok(())
}
/// Exercises the gRPC deploy service end to end: load, an idempotent reload of an
/// equivalent archive, and listing. It checks that refusals carry a structured
/// `WireError` in the status details: `FailedPrecondition` + `VersionPinned` for
/// unloading the route-active version, and `PermissionDenied` + `DeployDenied` for a
/// caller without the deploy grant.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn grpc_unload_route_active_carries_version_pinned_detail() -> Result<(), TestError> {
    use aion_proto::generated::deploy_service_client::DeployServiceClient;
    use tokio_stream::wrappers::TcpListenerStream;
    let (engine, state) = engine_state(enabled_deploy()).await?;
    // Serve only the deploy service on an OS-assigned loopback port.
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
    let address = listener.local_addr()?;
    let deploy = aion_server::api::deploy_grpc::deploy_service(state.clone())?;
    let server = tokio::spawn(
        tonic::transport::Server::builder()
            .add_service(deploy)
            .serve_with_incoming(TcpListenerStream::new(listener)),
    );
    let channel = tonic::transport::Endpoint::try_from(format!("http://{address}"))?
        .connect()
        .await?;
    let mut client = DeployServiceClient::new(channel);
    let beam = compile_reload_beam(1)?;
    let archive = archive_bytes(&beam, "run")?;
    let hash = content_hash_of(&archive)?;
    let loaded = client
        .load_package(granted(generated::LoadPackageRequest { archive })?)
        .await?
        .into_inner();
    assert!(loaded.freshly_loaded);
    // Rebuild the archive from the same BEAM: the test expects it to be recognized as
    // already loaded, with no route change.
    let beam_archive = archive_bytes(&beam, "run")?;
    let again = client
        .load_package(granted(generated::LoadPackageRequest {
            archive: beam_archive,
        })?)
        .await?
        .into_inner();
    assert!(!again.freshly_loaded);
    assert!(!again.route_changed);
    let listing = client
        .list_versions(granted(generated::ListVersionsRequest {})?)
        .await?
        .into_inner();
    assert_eq!(listing.versions.len(), 1);
    assert!(listing.versions[0].route_active);
    // Unloading the only (route-active) version must fail.
    let status = client
        .unload_version(granted(generated::UnloadVersionRequest {
            workflow_type: RELOAD_MODULE.to_owned(),
            content_hash: hash,
        })?)
        .await
        .err()
        .ok_or("expected route-active unload refusal")?;
    assert_eq!(status.code(), tonic::Code::FailedPrecondition);
    // The status details carry a protobuf-encoded `ProtoWireError`.
    let detail = WireError::try_from(ProtoWireError::decode(status.details())?)?;
    assert_eq!(detail.code, WireErrorCode::VersionPinned);
    assert_eq!(detail.error_type.as_deref(), Some("RouteActive"));
    // Subject metadata without `x-aion-deploy`: even listing is denied.
    let mut request = tonic::Request::new(generated::ListVersionsRequest {});
    request
        .metadata_mut()
        .insert("x-aion-subject", "ci".parse()?);
    let status = client
        .list_versions(request)
        .await
        .err()
        .ok_or("expected deploy denial")?;
    assert_eq!(status.code(), tonic::Code::PermissionDenied);
    let detail = WireError::try_from(ProtoWireError::decode(status.details())?)?;
    assert_eq!(detail.code, WireErrorCode::DeployDenied);
    engine.shutdown()?;
    server.abort();
    Ok(())
}
/// With the default deploy configuration, the gRPC server in this test mounts only
/// the workflow service, so a `DeployService` call answers `Unimplemented`.
///
/// NOTE(review): presumably `DeployConfig::default()` leaves deploy disabled — confirm.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn disabled_grpc_deploy_surface_is_unimplemented() -> Result<(), TestError> {
    use aion_proto::generated::deploy_service_client::DeployServiceClient;
    use tokio_stream::wrappers::TcpListenerStream;
    let (engine, state) = engine_state(DeployConfig::default()).await?;
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
    let address = listener.local_addr()?;
    // Only the workflow service is added; no deploy service is registered.
    let workflow = aion_server::api::grpc::workflow_service(state);
    let server = tokio::spawn(
        tonic::transport::Server::builder()
            .add_service(workflow)
            .serve_with_incoming(TcpListenerStream::new(listener)),
    );
    let channel = tonic::transport::Endpoint::try_from(format!("http://{address}"))?
        .connect()
        .await?;
    let mut client = DeployServiceClient::new(channel);
    // No grant metadata is attached; the call must fail as Unimplemented.
    let status = client
        .list_versions(tonic::Request::new(generated::ListVersionsRequest {}))
        .await
        .err()
        .ok_or("expected unimplemented service")?;
    assert_eq!(status.code(), tonic::Code::Unimplemented);
    engine.shutdown()?;
    server.abort();
    Ok(())
}
fn bomb_archive() -> Result<Vec<u8>, TestError> {
use std::io::Write as _;
let manifest = Manifest {
entry_module: RELOAD_MODULE.to_owned(),
entry_function: "run".to_owned(),
input_schema: json!({ "type": "object" }),
output_schema: json!({ "type": "integer" }),
timeout: Duration::from_secs(30),
activities: vec![],
version: ManifestVersion::new("irrelevant-never-reached"),
format_version: CURRENT_FORMAT_VERSION,
};
let manifest_bytes = serde_json::to_vec(&manifest)?;
let cursor = std::io::Cursor::new(Vec::new());
let mut archive = zip::ZipWriter::new(cursor);
let options = zip::write::SimpleFileOptions::default()
.compression_method(zip::CompressionMethod::Deflated);
archive.start_file("manifest.json", options)?;
archive.write_all(&manifest_bytes)?;
archive.start_file(format!("beam/{RELOAD_MODULE}.beam"), options)?;
archive.write_all(&vec![0_u8; 8 * 1024 * 1024])?;
Ok(archive.finish()?.into_inner())
}
/// An archive under the upload ceiling that inflates past the inflate ceiling is
/// rejected with 413. The error names `deploy.max_inflated_bytes`, no version is
/// registered, and the refusal is counted as an `invalid_input` load.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn http_inflate_bomb_is_413_naming_max_inflated_bytes() -> Result<(), TestError> {
    let config = runtime_config(enabled_deploy());
    let state = ServerState::build_with_store(aion_store::InMemoryStore::default(), config).await?;
    let router = http_router(state)?;
    let bomb = bomb_archive()?;
    assert!(
        u64::try_from(bomb.len())? < MAX_ARCHIVE_BYTES,
        "bomb must pass the upload ceiling to exercise the inflate ceiling: {} bytes",
        bomb.len()
    );
    let response = router.clone().oneshot(post_archive(bomb)?).await?;
    assert_eq!(response.status(), StatusCode::PAYLOAD_TOO_LARGE);
    let error: WireError = read_json(response).await?;
    assert_eq!(error.code, WireErrorCode::InvalidInput);
    assert!(
        error.message.contains("deploy.max_inflated_bytes"),
        "413 must name the inflate config key: {}",
        error.message
    );
    // Check statuses before decoding so a broken endpoint fails clearly.
    let listing_response = router.clone().oneshot(get_versions()?).await?;
    assert_eq!(listing_response.status(), StatusCode::OK);
    let listing: ProtoListVersionsResponse = read_json(listing_response).await?;
    assert!(
        listing.versions.is_empty(),
        "a refused load must not register a version"
    );
    let metrics = router
        .clone()
        .oneshot(
            Request::builder()
                .uri("/metrics")
                .body(body::Body::empty())?,
        )
        .await?;
    assert_eq!(metrics.status(), StatusCode::OK);
    let bytes = body::to_bytes(metrics.into_body(), usize::MAX).await?;
    let text = String::from_utf8(bytes.to_vec())?;
    assert!(
        text.contains(
            "aion_deploy_operations_total{operation=\"deploy.load\",outcome=\"invalid_input\"} 1"
        ),
        "inflate refusal must be recorded as a refused load: {text}"
    );
    Ok(())
}