use std::sync::Arc;
use aion_core::{Event, WorkflowId};
use aion_store::EventStore;
use aion_store::visibility::VisibilityStore;
use tokio::runtime::Handle;
use crate::EngineError;
use crate::durability::DurabilityError;
use crate::engine_seam::{
ChildWorkflowSpawnRequest, ChildWorkflowSpawnResult, EngineHandle, EngineSeamError,
TimerWheelEntry, WorkflowMailboxMessage, WorkflowProcessHandle, WorkflowResidency,
};
use crate::lifecycle::start::{
StartWorkflowContext, StartWorkflowOptions, start_workflow_with_options,
};
use crate::loader::LoadedWorkflows;
use crate::registry::{HandleResidency, Registry, WorkflowHandle};
use crate::runtime::nif_child_tasks::ChildTaskRuntime;
use crate::runtime::{RuntimeHandle, SignalDeliveryConfig};
use crate::signal::SignalResumeHandoff;
use crate::supervision::SupervisionTree;
/// Shared state used to start and manage child workflows on behalf of
/// [`NifChildEngine`] instances.
///
/// Most fields are handed unchanged to `start_workflow_with_options` when a
/// child workflow is started; the rest are exposed through accessors.
pub(crate) struct ChildNifBridge {
    /// Event store; read for the parent's history and passed on to child starts.
    store: Arc<dyn EventStore>,
    /// Visibility store forwarded to child workflow starts.
    visibility_store: Arc<dyn VisibilityStore>,
    /// Runtime handle used to deliver signals and cancel pids.
    runtime: Arc<RuntimeHandle>,
    /// Loaded workflow definitions, borrowed by child workflow starts.
    loaded_workflows: LoadedWorkflows,
    /// Registry of workflow handles, used to resolve workflow residency.
    registry: Arc<Registry>,
    /// Supervision tree forwarded to child workflow starts.
    supervision: Arc<SupervisionTree>,
    /// Signal resume handoff forwarded (as `Some`) to child workflow starts.
    signal_handoff: Arc<SignalResumeHandoff>,
    /// Search attribute schema forwarded to child workflow starts.
    search_attribute_schema: Arc<aion_core::SearchAttributeSchema>,
    /// Tokio handle on which async engine work is driven via `block_on`.
    tokio_handle: Handle,
    /// Child task runtime created by `ChildNifBridge::new`.
    child_tasks: Arc<ChildTaskRuntime>,
    /// Configuration returned by `watch_backoff()`.
    /// NOTE(review): presumably the retry/backoff policy for child terminal
    /// watches — confirm against the consumers of `watch_backoff()`.
    watch_backoff: SignalDeliveryConfig,
}
/// Constructor arguments for [`ChildNifBridge::new`].
///
/// Mirrors the fields of [`ChildNifBridge`] except for the child task
/// runtime, which `new` creates itself.
pub(crate) struct ChildNifBridgeParts {
    /// Event store shared with the bridge.
    pub(crate) store: Arc<dyn EventStore>,
    /// Visibility store shared with the bridge.
    pub(crate) visibility_store: Arc<dyn VisibilityStore>,
    /// Runtime handle shared with the bridge.
    pub(crate) runtime: Arc<RuntimeHandle>,
    /// Loaded workflow definitions owned by the bridge.
    pub(crate) loaded_workflows: LoadedWorkflows,
    /// Workflow handle registry shared with the bridge.
    pub(crate) registry: Arc<Registry>,
    /// Supervision tree shared with the bridge.
    pub(crate) supervision: Arc<SupervisionTree>,
    /// Signal resume handoff shared with the bridge.
    pub(crate) signal_handoff: Arc<SignalResumeHandoff>,
    /// Search attribute schema shared with the bridge.
    pub(crate) search_attribute_schema: Arc<aion_core::SearchAttributeSchema>,
    /// Tokio handle the bridge uses to drive async work.
    pub(crate) tokio_handle: Handle,
    /// Configuration stored as the bridge's `watch_backoff`.
    pub(crate) watch_backoff: SignalDeliveryConfig,
}
impl ChildNifBridge {
    /// Builds a bridge from the supplied parts and a freshly created
    /// [`ChildTaskRuntime`].
    ///
    /// # Errors
    ///
    /// Returns the error produced by `ChildTaskRuntime::new` (converted into
    /// [`EngineError`]) when the child task runtime cannot be created.
    pub(crate) fn new(parts: ChildNifBridgeParts) -> Result<Self, EngineError> {
        let child_tasks = Arc::new(ChildTaskRuntime::new()?);
        Ok(Self {
            store: parts.store,
            visibility_store: parts.visibility_store,
            runtime: parts.runtime,
            loaded_workflows: parts.loaded_workflows,
            registry: parts.registry,
            supervision: parts.supervision,
            signal_handoff: parts.signal_handoff,
            search_attribute_schema: parts.search_attribute_schema,
            tokio_handle: parts.tokio_handle,
            child_tasks,
            watch_backoff: parts.watch_backoff,
        })
    }

    /// Borrows the workflow handle registry.
    pub(crate) fn registry(&self) -> &Registry {
        &self.registry
    }

    /// Returns a new shared reference to the workflow handle registry.
    pub(crate) fn registry_arc(&self) -> Arc<Registry> {
        Arc::clone(&self.registry)
    }

    /// Returns a new shared reference to the event store.
    pub(crate) fn store(&self) -> Arc<dyn EventStore> {
        Arc::clone(&self.store)
    }

    /// Returns a new shared reference to the runtime handle.
    pub(crate) fn runtime(&self) -> Arc<RuntimeHandle> {
        Arc::clone(&self.runtime)
    }

    /// Returns a clone of the Tokio handle used to drive async work.
    pub(crate) fn tokio_handle(&self) -> Handle {
        self.tokio_handle.clone()
    }

    /// Returns a new shared reference to the child task runtime.
    pub(crate) fn child_tasks(&self) -> Arc<ChildTaskRuntime> {
        Arc::clone(&self.child_tasks)
    }

    /// Returns the configured watch backoff settings by value.
    pub(crate) fn watch_backoff(&self) -> SignalDeliveryConfig {
        self.watch_backoff
    }

    /// Asks the child task runtime to abort the watches registered for
    /// `parent_pid`.
    pub(crate) fn abort_child_terminal_watches_for_parent(&self, parent_pid: u64) {
        self.child_tasks.abort_watches_for_parent(parent_pid);
    }

    /// Shuts down the child task runtime.
    pub(crate) fn shutdown_child_tasks(&self) {
        self.child_tasks.shutdown();
    }

    /// Starts the child workflow described by `request` under the child
    /// workflow id carried in the request.
    ///
    /// The parent's history is read first so that the search attributes
    /// derived from it are passed on to the child.
    ///
    /// # Errors
    ///
    /// Fails if the parent history cannot be read or if
    /// `start_workflow_with_options` fails.
    pub(super) async fn start_child_under_recorded_id(
        &self,
        parent_workflow_id: &WorkflowId,
        request: ChildWorkflowSpawnRequest,
    ) -> Result<WorkflowHandle, EngineError> {
        let parent_events = self.store.read_history(parent_workflow_id).await?;
        let inherited_attributes = aion_core::search_attributes_from_events(&parent_events);

        let context = StartWorkflowContext {
            store: Arc::clone(&self.store),
            visibility_store: Arc::clone(&self.visibility_store),
            loaded_workflows: &self.loaded_workflows,
            runtime: Arc::clone(&self.runtime),
            supervision: Arc::clone(&self.supervision),
            registry: Arc::clone(&self.registry),
            signal_handoff: Some(Arc::clone(&self.signal_handoff)),
            search_attribute_schema: Arc::clone(&self.search_attribute_schema),
        };
        let options = StartWorkflowOptions {
            workflow_id: Some(request.child_workflow_id),
            search_attributes: inherited_attributes,
            ..StartWorkflowOptions::default()
        };

        start_workflow_with_options(context, &request.workflow_type, request.input, options).await
    }
}
/// [`EngineHandle`] implementation bound to a single parent workflow.
///
/// Child workflows are spawned through the shared [`ChildNifBridge`], and
/// events are only recorded against `parent`.
pub(crate) struct NifChildEngine {
    /// Shared bridge providing the store, runtime, registry and Tokio handle.
    bridge: Arc<ChildNifBridge>,
    /// Handle of the parent workflow this engine acts for.
    parent: WorkflowHandle,
}
impl NifChildEngine {
#[must_use]
pub(crate) fn new(bridge: Arc<ChildNifBridge>, parent: WorkflowHandle) -> Self {
Self { bridge, parent }
}
}
impl EngineHandle for NifChildEngine {
    /// Looks up `workflow_id` among the registry's listed handles.
    ///
    /// Returns `Resident` with the process handle when the matching handle
    /// reports `HandleResidency::Resident`, `NonResident` for any other
    /// matching handle, and `Unknown` when no handle matches.
    ///
    /// # Errors
    ///
    /// Returns `EngineSeamError::Delivery` when the registry cannot be listed.
    fn resolve_workflow(
        &self,
        workflow_id: &WorkflowId,
    ) -> Result<WorkflowResidency, EngineSeamError> {
        let handles = self
            .bridge
            .registry
            .list()
            .map_err(|error| EngineSeamError::Delivery {
                reason: error.to_string(),
            })?;
        let found = handles
            .into_iter()
            .find(|candidate| candidate.workflow_id() == workflow_id);

        let residency = match found {
            None => WorkflowResidency::Unknown,
            Some(entry) => {
                if entry.residency() == HandleResidency::Resident {
                    WorkflowResidency::Resident(WorkflowProcessHandle::new(entry.pid()))
                } else {
                    WorkflowResidency::NonResident
                }
            }
        };
        Ok(residency)
    }

    /// Delivers a mailbox message to `process`.
    ///
    /// Only `SignalReceived` is supported; it is forwarded to the runtime as
    /// a signal-received notification for the process pid.
    ///
    /// # Errors
    ///
    /// Returns `EngineSeamError::Delivery` when the runtime delivery fails or
    /// when `message` is any other variant.
    fn deliver_workflow_message(
        &self,
        process: WorkflowProcessHandle,
        message: WorkflowMailboxMessage,
    ) -> Result<(), EngineSeamError> {
        if let WorkflowMailboxMessage::SignalReceived { .. } = message {
            let delivered = self.bridge.runtime.deliver_signal_received(process.pid());
            return delivered.map_err(|error| EngineSeamError::Delivery {
                reason: error.to_string(),
            });
        }
        Err(EngineSeamError::Delivery {
            reason: format!("unsupported child NIF message: {message:?}"),
        })
    }

    /// Starts the requested child workflow under this engine's parent and
    /// blocks the calling thread until the start completes.
    ///
    /// NOTE(review): this drives the future with `Handle::block_on`, which
    /// Tokio documents as panicking when called from within an asynchronous
    /// execution context — confirm callers are never on a runtime worker.
    ///
    /// # Errors
    ///
    /// Returns `EngineSeamError::ChildSpawn` when the child cannot be started.
    fn spawn_child_workflow(
        &self,
        request: ChildWorkflowSpawnRequest,
    ) -> Result<ChildWorkflowSpawnResult, EngineSeamError> {
        let parent_workflow_id = self.parent.workflow_id().clone();
        let start = self
            .bridge
            .start_child_under_recorded_id(&parent_workflow_id, request);

        match self.bridge.tokio_handle.block_on(start) {
            Ok(child) => Ok(ChildWorkflowSpawnResult {
                child_workflow_id: child.workflow_id().clone(),
                child_process: WorkflowProcessHandle::new(child.pid()),
            }),
            Err(error) => Err(EngineSeamError::ChildSpawn {
                reason: error.to_string(),
            }),
        }
    }

    /// Cancels the pid of a linked child workflow process.
    ///
    /// The parent workflow id and correlation value are not used.
    ///
    /// # Errors
    ///
    /// Returns `EngineSeamError::ChildTermination` when cancellation fails.
    fn terminate_linked_child_workflow(
        &self,
        _parent_workflow_id: &WorkflowId,
        child_process: WorkflowProcessHandle,
        _correlation: u64,
    ) -> Result<(), EngineSeamError> {
        let pid = child_process.pid();
        self.bridge
            .runtime
            .cancel_pid(pid)
            .map_err(|failure| EngineSeamError::ChildTermination {
                reason: failure.to_string(),
            })
    }

    /// Cancels the pid of a linked activity process.
    ///
    /// The parent workflow id and correlation value are not used.
    ///
    /// # Errors
    ///
    /// Returns `EngineSeamError::ChildTermination` when cancellation fails.
    fn terminate_linked_activity(
        &self,
        _parent_workflow_id: &WorkflowId,
        activity_process: crate::Pid,
        _correlation: u64,
    ) -> Result<(), EngineSeamError> {
        self.bridge
            .runtime
            .cancel_pid(activity_process)
            .map_err(|failure| EngineSeamError::ChildTermination {
                reason: failure.to_string(),
            })
    }

    /// Always fails: this engine does not arm timers.
    ///
    /// # Errors
    ///
    /// Always returns `EngineSeamError::TimerWheel`.
    fn arm_timer(&self, entry: TimerWheelEntry) -> Result<(), EngineSeamError> {
        // The entry is intentionally unused.
        let _ = entry;
        let reason = "child NIF engine cannot arm timers".to_owned();
        Err(EngineSeamError::TimerWheel { reason })
    }

    /// Always fails: this engine does not disarm timers.
    ///
    /// # Errors
    ///
    /// Always returns `EngineSeamError::TimerWheel`.
    fn disarm_timer(
        &self,
        process: WorkflowProcessHandle,
        timer_id: &aion_core::TimerId,
    ) -> Result<(), EngineSeamError> {
        // Both arguments are intentionally unused.
        let _ = (process, timer_id);
        let reason = "child NIF engine cannot disarm timers".to_owned();
        Err(EngineSeamError::TimerWheel { reason })
    }

    /// Records `event` against the parent workflow's recorder.
    ///
    /// # Errors
    ///
    /// Returns `EngineSeamError::Recorder` when `workflow_id` is not the
    /// parent's id or when recording the event fails.
    fn record_workflow_event(
        &self,
        workflow_id: &WorkflowId,
        event: Event,
    ) -> Result<(), EngineSeamError> {
        if workflow_id == self.parent.workflow_id() {
            return record_child_event(&self.bridge.tokio_handle, &self.parent, event);
        }
        Err(EngineSeamError::Recorder {
            reason: format!("cannot record child event for unrelated workflow {workflow_id}"),
        })
    }
}
/// Records a child-workflow lifecycle event through the parent's recorder,
/// blocking the calling thread on `tokio_handle` until it is done.
///
/// The recorder lock is taken before the event is inspected. Only
/// `ChildWorkflowStarted`, `ChildWorkflowCompleted` and `ChildWorkflowFailed`
/// are recorded, each with the `recorded_at` value from its envelope.
///
/// NOTE(review): `Handle::block_on` is documented by Tokio as panicking when
/// called from within an asynchronous execution context — confirm callers
/// are never on a runtime worker.
///
/// # Errors
///
/// Returns `EngineSeamError::Recorder` when the recorder reports a failure or
/// when `event` is any other variant.
fn record_child_event(
    tokio_handle: &Handle,
    parent: &WorkflowHandle,
    event: Event,
) -> Result<(), EngineSeamError> {
    let shared_recorder = parent.recorder();
    let recording = async {
        let mut guard = shared_recorder.lock().await;
        match event {
            Event::ChildWorkflowStarted {
                envelope,
                child_workflow_id,
                workflow_type,
                input,
            } => {
                guard
                    .record_child_workflow_started(
                        envelope.recorded_at,
                        child_workflow_id,
                        workflow_type,
                        input,
                    )
                    .await
            }
            Event::ChildWorkflowCompleted {
                envelope,
                child_workflow_id,
                result,
            } => {
                guard
                    .record_child_workflow_completed(
                        envelope.recorded_at,
                        child_workflow_id,
                        result,
                    )
                    .await
            }
            Event::ChildWorkflowFailed {
                envelope,
                child_workflow_id,
                error,
            } => {
                guard
                    .record_child_workflow_failed(envelope.recorded_at, child_workflow_id, error)
                    .await
            }
            other => Err(DurabilityError::HistoryShape {
                reason: format!("child NIF cannot record non-child event: {other:?}"),
            }),
        }
    };

    tokio_handle
        .block_on(recording)
        .map_err(|failure| EngineSeamError::Recorder {
            reason: failure.to_string(),
        })
}