use aion_core::{ActivityError, Payload};
use crate::{EngineError, Pid, RuntimeHandle, RuntimeInput};
/// Spawns an activity through the runtime on behalf of `parent_pid`.
///
/// `input` is converted with [`RuntimeInput::from_payload`] and handed,
/// together with the target `deployed_module` and `function` names, to
/// `RuntimeHandle::spawn_activity`.
///
/// # Returns
///
/// The [`Pid`] reported by `spawn_activity` (presumably the pid of the newly
/// spawned activity process; confirm against `RuntimeHandle`).
///
/// # Errors
///
/// Returns an [`EngineError`] when the payload conversion fails (converted via
/// `?`) or when `spawn_activity` itself returns an error.
pub fn dispatch_activity(
    runtime: &RuntimeHandle,
    parent_pid: Pid,
    deployed_module: &str,
    function: &str,
    input: &Payload,
) -> Result<Pid, EngineError> {
    // Convert up front: a payload that cannot be converted is reported before
    // the runtime is asked to spawn anything.
    let activity_input = RuntimeInput::from_payload(input)?;
    runtime.spawn_activity(parent_pid, deployed_module, function, activity_input)
}
/// Forwards to `RuntimeHandle::propagate_activity_outcome` for the given
/// parent/activity pair.
///
/// This function adds no logic of its own; what "propagating" entails is
/// defined by the runtime. NOTE(review): the tests in this file read the
/// outcome back through `activity_result` / `activity_error` after calling
/// this, which suggests the outcome is recorded against `parent_pid`; confirm
/// against `RuntimeHandle`.
///
/// # Errors
///
/// Returns whatever [`EngineError`] the runtime call produces.
pub fn propagate_activity_outcome(
    runtime: &RuntimeHandle,
    parent_pid: Pid,
    activity_pid: Pid,
) -> Result<(), EngineError> {
    runtime.propagate_activity_outcome(parent_pid, activity_pid)
}
/// Hands a typed [`ActivityError`] for `activity_pid` to the runtime via
/// `RuntimeHandle::deliver_activity_error`, addressed to `parent_pid`.
///
/// `error` is taken by value and moved into the runtime call; callers that
/// still need it afterwards must clone it first.
///
/// # Errors
///
/// Returns whatever [`EngineError`] the runtime call produces.
pub fn surface_activity_error(
    runtime: &RuntimeHandle,
    parent_pid: Pid,
    activity_pid: Pid,
    error: ActivityError,
) -> Result<(), EngineError> {
    runtime.deliver_activity_error(parent_pid, activity_pid, error)
}
// Unit tests for `dispatch_activity`, `propagate_activity_outcome` and
// `surface_activity_error`, each run against a freshly built `RuntimeHandle`
// that is shut down at the end of the test.
#[cfg(test)]
mod tests {
use aion_core::{ActivityErrorKind, ContentType};
use serde_json::json;
use super::{dispatch_activity, propagate_activity_outcome, surface_activity_error};
use crate::runtime::RuntimeConfig;
use crate::{EngineError, RuntimeHandle};
// Builds the runtime used by every test from `RuntimeConfig::new(Some(1))`.
// NOTE(review): the meaning of `Some(1)` is not visible here (presumably a
// scheduler/worker count) — confirm against `RuntimeConfig`.
fn runtime() -> Result<RuntimeHandle, EngineError> {
RuntimeHandle::new(RuntimeConfig::new(Some(1)))
}
// Activity input shared by the tests: a payload built from JSON `null`.
fn payload() -> Result<aion_core::Payload, aion_core::PayloadError> {
aion_core::Payload::from_json(&json!(null))
}
// Bytes of the fixture workflow module, embedded at compile time by
// `include_bytes!` (the path is resolved relative to this source file).
fn fixture_workflow_beam() -> &'static [u8] {
include_bytes!("../../tests/fixtures/aion_fixture_workflow.beam")
}
// Dispatches an activity backed by a test NIF and checks the link between
// workflow and activity as well as the dirty flag of the NIF registration.
#[test]
fn dispatch_spawns_linked_child_and_uses_dirty_registration()
-> Result<(), Box<dyn std::error::Error>> {
let runtime = runtime()?;
// NOTE(review): the two boolean flags are not explained in this file; the
// first presumably requests a dirty registration (it is `true` here, where
// `is_dirty` is asserted, and `false` in the success test) — confirm.
runtime.install_test_activity_nif("activity_host", "answer", true, true)?;
// Presumably routes `activity_mod:run` to the `activity_host:answer` NIF
// installed above — confirm against the test helper.
runtime.register_native_call_module_for_test(
"activity_mod",
"run",
"activity_host",
"answer",
);
let workflow = runtime.spawn_test_process_with_trap_exit(true)?;
let activity = dispatch_activity(&runtime, workflow, "activity_mod", "run", &payload()?)?;
// The link is only asserted when `is_linked` returns `Ok`; an `Err` is
// ignored, unlike the failing-activity test below which propagates it with `?`.
// NOTE(review): presumably tolerates the activity having already exited by
// the time of the query — confirm this best-effort check is intentional.
if let Ok(linked) = runtime.is_linked(workflow, activity) {
assert!(linked);
}
assert!(runtime.is_dirty("activity_host", "answer"));
runtime.shutdown()?;
Ok(())
}
// Dispatches an activity, propagates its outcome and expects the workflow to
// hold a JSON `42` payload as the activity result.
#[test]
fn successful_activity_result_is_delivered_to_workflow()
-> Result<(), Box<dyn std::error::Error>> {
let runtime = runtime()?;
runtime.install_test_activity_nif("activity_host", "answer", false, true)?;
runtime.register_native_call_module_for_test(
"activity_ok",
"run",
"activity_host",
"answer",
);
let workflow = runtime.spawn_test_process_with_trap_exit(true)?;
let activity = dispatch_activity(&runtime, workflow, "activity_ok", "run", &payload()?)?;
propagate_activity_outcome(&runtime, workflow, activity)?;
let result = runtime.activity_result(workflow, activity);
// NOTE(review): `42` is presumably the value returned by the test NIF —
// confirm against `install_test_activity_nif`.
assert_eq!(result, Some(aion_core::Payload::from_json(&json!(42))?));
runtime.shutdown()?;
Ok(())
}
// Runs the fixture module's `activity` function, surfaces a typed error for
// it, terminates the activity with an error and checks that the workflow both
// received a trapped exit message and exposes the same `ActivityError`.
#[test]
fn failing_activity_surfaces_typed_error_with_trapped_exit()
-> Result<(), Box<dyn std::error::Error>> {
let runtime = runtime()?;
runtime.register_module("aion_fixture_workflow", fixture_workflow_beam())?;
let workflow = runtime.spawn_test_process_with_trap_exit(true)?;
let activity = dispatch_activity(
&runtime,
workflow,
"aion_fixture_workflow",
"activity",
&payload()?,
)?;
assert!(runtime.is_linked(workflow, activity)?);
// Typed error under test: retryable kind, message "boom", JSON details.
let details = aion_core::Payload::new(ContentType::Json, br#"{"code":"boom"}"#.to_vec());
let error = aion_core::ActivityError {
kind: ActivityErrorKind::Retryable,
message: String::from("boom"),
details: Some(details),
};
// Cloned because `surface_activity_error` takes the error by value and the
// original is compared against the stored error at the end of the test.
surface_activity_error(&runtime, workflow, activity, error.clone())?;
// Wait for both processes before terminating the activity with an error.
runtime.wait_for_process_ready(workflow)?;
runtime.wait_for_process_ready(activity)?;
runtime.terminate_test_process_with_error(activity)?;
propagate_activity_outcome(&runtime, workflow, activity)?;
runtime.wait_for_trapped_exit(workflow, activity)?;
assert!(runtime.has_trapped_exit_message(workflow, activity)?);
assert_eq!(runtime.activity_error(workflow, activity), Some(error));
runtime.shutdown()?;
Ok(())
}
}