use aion_core::{Event, Payload, RunId, WorkflowId};
use async_trait::async_trait;
use futures::stream::{self, BoxStream};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use crate::{Engine, EngineError, SignalRouterError, WorkflowHandle};
use super::api::workflow_not_found;
/// Criteria used to select events from a subscription.
///
/// Every criterion is optional; the derived `Default` leaves all of them
/// `None`.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
pub struct EventFilter {
/// When set, only events whose workflow id equals this value match.
pub workflow_id: Option<WorkflowId>,
/// Optional run to restrict the subscription to.
///
/// NOTE(review): `EventFilter::matches` in this file does not read this
/// field — presumably publishers apply it themselves; confirm.
pub run: Option<RunId>,
/// When set, only events belonging to this family match.
pub family: Option<EventFamily>,
}
impl EventFilter {
    /// Reports whether `event` satisfies this filter.
    ///
    /// A criterion left as `None` accepts every event. Only `workflow_id`
    /// and `family` are evaluated here; the `run` field is not consulted.
    #[must_use]
    pub fn matches(&self, event: &Event) -> bool {
        let workflow_matches = match self.workflow_id.as_ref() {
            Some(wanted) => event.workflow_id() == wanted,
            None => true,
        };
        // The family is only classified once the workflow id has matched.
        workflow_matches
            && match self.family {
                Some(wanted) => wanted == event_family(event),
                None => true,
            }
    }
}
/// Coarse grouping of events, used as a subscription filter criterion.
#[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq, Eq)]
pub enum EventFamily {
/// Workflow lifecycle events (started, completed, failed, cancelled,
/// timed out, continued-as-new) and search-attribute updates.
Workflow,
/// Activity scheduled, started, completed, failed and cancelled events.
Activity,
/// Timer started, fired and cancelled events, plus with-timeout completion.
Timer,
/// Signal received and signal sent events.
Signal,
/// Child workflow started, completed, failed and cancelled events.
ChildWorkflow,
/// Schedule created, updated, paused, resumed, deleted and triggered events.
Schedule,
}
/// Seam through which the engine delivers a named signal to a workflow.
///
/// `Engine::signal` resolves the target handle and then hands the signal to
/// the configured implementation of this trait.
#[async_trait]
pub trait SignalRouter: Send + Sync {
/// Routes the signal `name` carrying `payload` to the workflow behind
/// `target`.
///
/// # Errors
///
/// Returns an [`EngineError`] chosen by the implementation on failure.
async fn route(
&self,
target: &WorkflowHandle,
name: String,
payload: Payload,
) -> Result<(), EngineError>;
}
/// Seam through which the engine answers a named query against a workflow.
///
/// `Engine::query` resolves the target handle and then delegates to the
/// configured implementation of this trait.
#[async_trait]
pub trait QueryService: Send + Sync {
/// Runs the query `name` against the workflow behind `target` and returns
/// the reply payload.
///
/// # Errors
///
/// Returns an [`EngineError`] chosen by the implementation on failure.
async fn query(&self, target: &WorkflowHandle, name: String) -> Result<Payload, EngineError>;
}
/// Error item of an event subscription stream, reporting that the
/// subscription lagged behind the live stream and events were skipped.
#[derive(thiserror::Error, Clone, Copy, Debug, PartialEq, Eq)]
#[error("event subscription lagged behind the live stream and skipped {skipped} events")]
pub struct EventStreamLagged {
/// Number of events that were skipped.
pub skipped: u64,
}
/// Seam through which the engine exposes event subscriptions.
///
/// `Engine::subscribe` forwards directly to the configured implementation.
pub trait EventPublisher: Send + Sync {
/// Opens a subscription for `filter`.
///
/// The returned stream yields `Ok(event)` items, or
/// `Err(EventStreamLagged)` when the subscription fell behind.
fn subscribe(
&self,
filter: EventFilter,
) -> BoxStream<'static, Result<Event, EventStreamLagged>>;
}
/// Bundle of the three pluggable seams the engine delegates to: signal
/// routing, query answering and event publishing.
///
/// Cloning clones the `Arc`s, so clones share the same implementations.
#[derive(Clone)]
pub struct DelegatedSeams {
// Receives signals forwarded by `Engine::signal`.
signal_router: Arc<dyn SignalRouter>,
// Answers queries forwarded by `Engine::query`.
query_service: Arc<dyn QueryService>,
// Serves subscriptions forwarded by `Engine::subscribe`.
event_publisher: Arc<dyn EventPublisher>,
}
impl DelegatedSeams {
    /// Bundles the three seam implementations.
    #[must_use]
    pub const fn new(
        signal_router: Arc<dyn SignalRouter>,
        query_service: Arc<dyn QueryService>,
        event_publisher: Arc<dyn EventPublisher>,
    ) -> Self {
        Self {
            signal_router,
            query_service,
            event_publisher,
        }
    }

    /// Borrows the configured signal router.
    #[must_use]
    pub fn signal_router(&self) -> &dyn SignalRouter {
        &*self.signal_router
    }

    /// Borrows the configured query service.
    #[must_use]
    pub fn query_service(&self) -> &dyn QueryService {
        &*self.query_service
    }

    /// Borrows the configured event publisher.
    #[must_use]
    pub fn event_publisher(&self) -> &dyn EventPublisher {
        &*self.event_publisher
    }

    /// Returns a new shared reference to the signal router.
    pub(crate) fn signal_router_arc(&self) -> Arc<dyn SignalRouter> {
        let shared = &self.signal_router;
        Arc::clone(shared)
    }

    /// Returns a new shared reference to the query service.
    pub(crate) fn query_service_arc(&self) -> Arc<dyn QueryService> {
        let shared = &self.query_service;
        Arc::clone(shared)
    }

    /// Returns a new shared reference to the event publisher.
    pub(crate) fn event_publisher_arc(&self) -> Arc<dyn EventPublisher> {
        let shared = &self.event_publisher;
        Arc::clone(shared)
    }
}
impl Default for DelegatedSeams {
fn default() -> Self {
Self::new(
Arc::new(DeferredSignalRouter),
Arc::new(DeferredQueryService),
Arc::new(DeferredEventPublisher),
)
}
}
/// Placeholder signal router that rejects every signal.
#[derive(Debug, Default)]
pub struct DeferredSignalRouter;

#[async_trait]
impl SignalRouter for DeferredSignalRouter {
    /// Always fails with `EngineError::Runtime`; the arguments are discarded.
    async fn route(
        &self,
        target: &WorkflowHandle,
        name: String,
        payload: Payload,
    ) -> Result<(), EngineError> {
        // Consume the arguments so they do not count as unused.
        let _ = (target, name, payload);
        let reason = String::from("signal routing seam is not configured");
        Err(EngineError::Runtime { reason })
    }
}
/// Placeholder query service that rejects every query.
#[derive(Debug, Default)]
pub struct DeferredQueryService;

#[async_trait]
impl QueryService for DeferredQueryService {
    /// Always fails with `EngineError::Runtime`; the arguments are discarded.
    async fn query(&self, target: &WorkflowHandle, name: String) -> Result<Payload, EngineError> {
        // Consume the arguments so they do not count as unused.
        let _ = (target, name);
        let reason = String::from("query service seam is not configured");
        Err(EngineError::Runtime { reason })
    }
}
/// Placeholder event publisher whose subscriptions never yield anything.
#[derive(Debug, Default)]
pub struct DeferredEventPublisher;

impl EventPublisher for DeferredEventPublisher {
    /// Ignores `filter` and returns a stream that ends immediately.
    fn subscribe(
        &self,
        filter: EventFilter,
    ) -> BoxStream<'static, Result<Event, EventStreamLagged>> {
        let _ = filter;
        let nothing = stream::empty::<Result<Event, EventStreamLagged>>();
        Box::pin(nothing)
    }
}
impl Engine {
pub async fn signal(
&self,
id: &WorkflowId,
run: &RunId,
name: impl Into<String>,
payload: Payload,
) -> Result<(), EngineError> {
let handle = if let Some(handle) = self.registry().get(id, run)? {
handle
} else {
let history = self.store().read_history(id).await?;
if run_has_terminal_history(&history, run) {
return Err(SignalRouterError::Terminal {
workflow_id: id.clone(),
run_id: run.clone(),
}
.into());
}
self.handle_after_birth_window(id, run, &history)
.await?
.ok_or_else(|| workflow_not_found(id, run))?
};
self.delegated()
.signal_router()
.route(&handle, name.into(), payload)
.await
}
pub async fn query(
&self,
id: &WorkflowId,
run: &RunId,
name: impl Into<String>,
) -> Result<Payload, EngineError> {
let handle = if let Some(handle) = self.registry().get(id, run)? {
handle
} else {
let history = self.store().read_history(id).await?;
if run_has_terminal_history(&history, run) {
return Err(EngineError::Query(crate::query::QueryError::NotRunning(
id.clone(),
)));
}
self.handle_after_birth_window(id, run, &history)
.await?
.ok_or_else(|| workflow_not_found(id, run))?
};
self.delegated()
.query_service()
.query(&handle, name.into())
.await
}
pub(crate) async fn handle_after_birth_window(
&self,
id: &WorkflowId,
run: &RunId,
history: &[Event],
) -> Result<Option<WorkflowHandle>, EngineError> {
let started = history
.iter()
.any(|event| matches!(event, Event::WorkflowStarted { run_id, .. } if run_id == run));
if !started {
return Ok(None);
}
wait_for_registered_handle(self.registry(), id, run, self.runtime().signal_delivery()).await
}
#[must_use]
pub fn subscribe(
&self,
filter: EventFilter,
) -> BoxStream<'static, Result<Event, EventStreamLagged>> {
self.delegated().event_publisher().subscribe(filter)
}
}
/// Reports whether `history` records a terminal event for `run`.
///
/// The scan locates the first `WorkflowStarted` event carrying `run` and then
/// inspects the events that follow it: a terminal workflow event yields
/// `true`, while another `WorkflowStarted` (or the end of the history) yields
/// `false`. Events recorded before the run started are ignored.
pub(crate) fn run_has_terminal_history(history: &[Event], run: &RunId) -> bool {
    let mut events = history.iter();

    // `any` stops right after the matching start event, leaving `events`
    // positioned on the first event that belongs to the requested run.
    let run_started =
        events.any(|event| matches!(event, Event::WorkflowStarted { run_id, .. } if run_id == run));
    if !run_started {
        return false;
    }

    for event in events {
        match event {
            // A later start means the requested run's segment ended without
            // a terminal event being seen.
            Event::WorkflowStarted { .. } => return false,
            Event::WorkflowCompleted { .. }
            | Event::WorkflowFailed { .. }
            | Event::WorkflowCancelled { .. }
            | Event::WorkflowTimedOut { .. }
            | Event::WorkflowContinuedAsNew { .. } => return true,
            // Listed exhaustively (no wildcard) so a new `Event` variant
            // forces a decision here at compile time.
            Event::SearchAttributesUpdated { .. }
            | Event::ActivityScheduled { .. }
            | Event::ActivityStarted { .. }
            | Event::ActivityCompleted { .. }
            | Event::ActivityFailed { .. }
            | Event::ActivityCancelled { .. }
            | Event::TimerStarted { .. }
            | Event::TimerFired { .. }
            | Event::TimerCancelled { .. }
            | Event::WithTimeoutCompleted { .. }
            | Event::SignalReceived { .. }
            | Event::SignalSent { .. }
            | Event::ChildWorkflowStarted { .. }
            | Event::ChildWorkflowCompleted { .. }
            | Event::ChildWorkflowFailed { .. }
            | Event::ChildWorkflowCancelled { .. }
            | Event::ScheduleCreated { .. }
            | Event::ScheduleUpdated { .. }
            | Event::SchedulePaused { .. }
            | Event::ScheduleResumed { .. }
            | Event::ScheduleDeleted { .. }
            | Event::ScheduleTriggered { .. } => {}
        }
    }
    false
}
/// Polls `registry` until the handle for `(id, run)` appears or the wait
/// budget runs out.
///
/// The budget is `policy.ready_timeout` multiplied by
/// `policy.max_enqueue_attempts` (treated as at least one), saturating on
/// overflow. Between lookups the task sleeps with a backoff that starts at
/// `policy.initial_backoff`, doubles each round, and is capped at
/// `policy.max_backoff`.
///
/// Returns `Ok(Some(handle))` as soon as the registry has the handle and
/// `Ok(None)` once the budget is exhausted. The registry is always consulted
/// at least once, even with a zero budget.
///
/// # Errors
///
/// Propagates any error returned by the registry lookup.
pub(crate) async fn wait_for_registered_handle(
    registry: &crate::registry::Registry,
    id: &WorkflowId,
    run: &RunId,
    policy: crate::runtime::SignalDeliveryConfig,
) -> Result<Option<WorkflowHandle>, EngineError> {
    let budget = policy
        .ready_timeout
        .saturating_mul(policy.max_enqueue_attempts.max(1));
    // `Instant + Duration` panics on overflow, which a saturated budget can
    // trigger. `None` means the deadline is unrepresentably far away and is
    // treated as "no deadline".
    let deadline = std::time::Instant::now().checked_add(budget);
    let mut backoff = policy.initial_backoff;
    loop {
        if let Some(handle) = registry.get(id, run)? {
            return Ok(Some(handle));
        }
        let pause = match deadline {
            Some(deadline) => {
                let remaining = deadline.saturating_duration_since(std::time::Instant::now());
                if remaining.is_zero() {
                    return Ok(None);
                }
                // Never sleep past the deadline; the loop still performs one
                // final lookup after this shortened sleep.
                backoff.min(remaining)
            }
            None => backoff,
        };
        tokio::time::sleep(pause).await;
        backoff = backoff.saturating_mul(2).min(policy.max_backoff);
    }
}
/// Classifies `event` into the [`EventFamily`] used by subscription filters.
///
/// The match is exhaustive on purpose: a new `Event` variant must be assigned
/// a family before the crate compiles again.
const fn event_family(event: &Event) -> EventFamily {
    match event {
        // Activity lifecycle.
        Event::ActivityScheduled { .. }
        | Event::ActivityStarted { .. }
        | Event::ActivityCompleted { .. }
        | Event::ActivityFailed { .. }
        | Event::ActivityCancelled { .. } => EventFamily::Activity,
        // Child workflow lifecycle.
        Event::ChildWorkflowStarted { .. }
        | Event::ChildWorkflowCompleted { .. }
        | Event::ChildWorkflowFailed { .. }
        | Event::ChildWorkflowCancelled { .. } => EventFamily::ChildWorkflow,
        // Schedule management and triggering.
        Event::ScheduleCreated { .. }
        | Event::ScheduleUpdated { .. }
        | Event::SchedulePaused { .. }
        | Event::ScheduleResumed { .. }
        | Event::ScheduleDeleted { .. }
        | Event::ScheduleTriggered { .. } => EventFamily::Schedule,
        // Signals in either direction.
        Event::SignalReceived { .. } | Event::SignalSent { .. } => EventFamily::Signal,
        // Timers, including with-timeout completion.
        Event::TimerStarted { .. }
        | Event::TimerFired { .. }
        | Event::TimerCancelled { .. }
        | Event::WithTimeoutCompleted { .. } => EventFamily::Timer,
        // Workflow lifecycle and search-attribute updates.
        Event::WorkflowStarted { .. }
        | Event::WorkflowCompleted { .. }
        | Event::WorkflowFailed { .. }
        | Event::WorkflowCancelled { .. }
        | Event::WorkflowTimedOut { .. }
        | Event::WorkflowContinuedAsNew { .. }
        | Event::SearchAttributesUpdated { .. } => EventFamily::Workflow,
    }
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
use aion_core::{EventEnvelope, WorkflowStatus};
use aion_package::ContentHash;
use aion_store::visibility::VisibilityStore;
use aion_store::{EventStore, InMemoryStore};
use futures::{StreamExt, stream};
use serde_json::json;
use crate::durability::Recorder;
use crate::engine::api::EngineComponents;
use crate::registry::{CompletionNotifier, HandleResidency, WorkflowHandleParts};
use crate::{
Registry, RuntimeConfig, RuntimeHandle, SupervisionTree, WorkflowCatalog, WorkflowHandle,
};
use super::*;
/// Test double that records every routed signal as `(pid, name, payload)`.
#[derive(Debug, Default)]
struct SignalCapture {
    calls: Mutex<Vec<(u64, String, Payload)>>,
}

#[async_trait]
impl SignalRouter for SignalCapture {
    /// Appends the call to `calls`; a poisoned lock is reported as
    /// `EngineError::RegistryPoisoned`.
    async fn route(
        &self,
        target: &WorkflowHandle,
        name: String,
        payload: Payload,
    ) -> Result<(), EngineError> {
        {
            let mut recorded = self
                .calls
                .lock()
                .map_err(|_| EngineError::RegistryPoisoned)?;
            recorded.push((target.pid(), name, payload));
        }
        Ok(())
    }
}
/// Test double that records every query as `(pid, name)` and answers with a
/// fixed `reply`.
#[derive(Debug)]
struct QueryCapture {
    calls: Mutex<Vec<(u64, String)>>,
    reply: Payload,
}

#[async_trait]
impl QueryService for QueryCapture {
    /// Appends the call to `calls` and returns a clone of `reply`; a poisoned
    /// lock is reported as `EngineError::RegistryPoisoned`.
    async fn query(
        &self,
        target: &WorkflowHandle,
        name: String,
    ) -> Result<Payload, EngineError> {
        {
            let mut recorded = self
                .calls
                .lock()
                .map_err(|_| EngineError::RegistryPoisoned)?;
            recorded.push((target.pid(), name));
        }
        Ok(self.reply.clone())
    }
}
/// Test double that replays a fixed list of events to each subscriber.
#[derive(Debug)]
struct FakePublisher {
    events: Vec<Event>,
}

impl EventPublisher for FakePublisher {
    /// Returns a finite stream of clones of the stored events accepted by
    /// `filter`, in their stored order.
    fn subscribe(
        &self,
        filter: EventFilter,
    ) -> BoxStream<'static, Result<Event, EventStreamLagged>> {
        let selected: Vec<Result<Event, EventStreamLagged>> = self
            .events
            .iter()
            .filter_map(|event| filter.matches(event).then(|| Ok(event.clone())))
            .collect();
        stream::iter(selected).boxed()
    }
}
/// Builds a payload from the JSON object `{ "label": <label> }`.
fn payload(label: &str) -> Result<Payload, aion_core::PayloadError> {
    let body = json!({ "label": label });
    Payload::from_json(&body)
}
/// Builds an engine backed by one in-memory store (shared as both event store
/// and visibility store) and wired to the given seam implementations.
fn engine_with_seams(
    signal_router: Arc<dyn SignalRouter>,
    query_service: Arc<dyn QueryService>,
    event_publisher: Arc<dyn EventPublisher>,
) -> Result<Engine, EngineError> {
    let backing = Arc::new(InMemoryStore::default());
    let store: Arc<dyn EventStore> = Arc::clone(&backing) as _;
    let visibility_store: Arc<dyn VisibilityStore> = backing;
    let runtime = Arc::new(RuntimeHandle::new(RuntimeConfig::new(Some(1)))?);
    let delegated = DelegatedSeams::new(signal_router, query_service, event_publisher);

    let components = EngineComponents {
        store,
        visibility_store,
        runtime,
        catalog: Arc::new(WorkflowCatalog::new()),
        registry: Arc::new(Registry::default()),
        supervision: Arc::new(SupervisionTree::new()),
        delegated,
        signal_handoff: Arc::new(crate::signal::SignalResumeHandoff::new()),
        search_attribute_schema: Arc::new(aion_core::SearchAttributeSchema::new()),
        visibility_reconciliation_task: None,
    };
    Ok(Engine::new(components))
}
/// Records a `WorkflowStarted` event for a fresh workflow/run in the engine's
/// store and returns a running, resident handle for it.
///
/// The handle is NOT inserted into the registry; callers decide whether and
/// when to register it.
async fn recorded_active_handle(
engine: &Engine,
) -> Result<WorkflowHandle, Box<dyn std::error::Error>> {
let workflow_id = WorkflowId::new_v4();
let run_id = RunId::new_v4();
let store = engine.store();
let mut recorder = Recorder::new(workflow_id.clone(), Arc::clone(&store));
// Persist the start event first so the history shows the run as started.
recorder
.record_workflow_started(
chrono::Utc::now(),
crate::durability::WorkflowStartRecord {
workflow_type: "checkout".to_owned(),
input: payload("input")?,
run_id: run_id.clone(),
parent_run_id: None,
package_version: aion_core::PackageVersion::new("a".repeat(64)),
},
)
.await?;
// The handle is backed by a test process spawned on the engine's runtime.
Ok(WorkflowHandle::new(WorkflowHandleParts {
workflow_id,
run_id,
pid: engine.runtime().spawn_test_process_with_trap_exit(true)?,
workflow_type: "checkout".to_owned(),
loaded_version: ContentHash::from_bytes([1; 32]),
cached_status: WorkflowStatus::Running,
residency: HandleResidency::Resident,
recorder,
completion: CompletionNotifier::new(),
}))
}
/// Creates a started, running handle via `recorded_active_handle` and
/// registers it in the engine's registry under `(workflow_id, run_id)`.
async fn insert_active_handle(
    engine: &Engine,
) -> Result<WorkflowHandle, Box<dyn std::error::Error>> {
    let handle = recorded_active_handle(engine).await?;
    let key = (handle.workflow_id().clone(), handle.run_id().clone());
    engine.registry().insert(key, handle.clone())?;
    Ok(handle)
}
/// Builds an event envelope for `workflow_id` with sequence number `seq`,
/// stamped with the current UTC time.
fn envelope(seq: u64, workflow_id: &WorkflowId) -> EventEnvelope {
    let recorded_at = chrono::Utc::now();
    let workflow_id = workflow_id.clone();
    EventEnvelope {
        seq,
        recorded_at,
        workflow_id,
    }
}
/// A signal sent after the run is recorded as started, but before its handle
/// is registered, must wait for the handle and then be routed.
#[tokio::test(flavor = "multi_thread")]
async fn signal_inside_the_registration_birth_window_waits_for_the_handle()
-> Result<(), Box<dyn std::error::Error>> {
let signal = Arc::new(SignalCapture::default());
let engine = Arc::new(engine_with_seams(
signal.clone(),
Arc::new(DeferredQueryService),
Arc::new(DeferredEventPublisher),
)?);
// Start event is recorded, but the handle is not registered yet.
let handle = recorded_active_handle(&engine).await?;
let late_engine = Arc::clone(&engine);
let late_handle = handle.clone();
// Register the handle from another task after a short delay, while the
// signal call below is already waiting.
let inserter = tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(15)).await;
late_engine.registry().insert(
(
late_handle.workflow_id().clone(),
late_handle.run_id().clone(),
),
late_handle,
)
});
engine
.signal(
handle.workflow_id(),
handle.run_id(),
"approve",
payload("birth")?,
)
.await?;
// First `?` surfaces a task join error, second the registry insert error.
inserter.await??;
let calls = signal
.calls
.lock()
.map_err(|_| EngineError::RegistryPoisoned)?;
assert_eq!(calls.len(), 1, "the signal must reach the routed handle");
drop(calls);
engine.shutdown()?;
Ok(())
}
/// A signal for a run that is recorded as started but whose handle never
/// registers must fail with `EngineError::WorkflowNotFound`.
#[tokio::test(flavor = "multi_thread")]
async fn signal_for_a_started_run_with_no_handle_fails_typed_after_the_budget()
-> Result<(), Box<dyn std::error::Error>> {
let engine = engine_with_seams(
Arc::new(SignalCapture::default()),
Arc::new(DeferredQueryService),
Arc::new(DeferredEventPublisher),
)?;
// Start event is recorded; the handle is deliberately never registered.
let handle = recorded_active_handle(&engine).await?;
let outcome = engine
.signal(
handle.workflow_id(),
handle.run_id(),
"approve",
payload("never")?,
)
.await;
assert!(matches!(outcome, Err(EngineError::WorkflowNotFound { .. })));
engine.shutdown()?;
Ok(())
}
/// A signal for a registered run reaches the router with the handle's pid,
/// the signal name and the payload; a signal for an unknown workflow/run
/// fails with `EngineError::WorkflowNotFound`.
#[tokio::test]
async fn signal_delegates_to_router_and_unknown_returns_not_found()
-> Result<(), Box<dyn std::error::Error>> {
let signal = Arc::new(SignalCapture::default());
let engine = engine_with_seams(
signal.clone(),
Arc::new(DeferredQueryService),
Arc::new(DeferredEventPublisher),
)?;
let handle = insert_active_handle(&engine).await?;
let sent_payload = payload("signal")?;
engine
.signal(
handle.workflow_id(),
handle.run_id(),
"approve",
sent_payload.clone(),
)
.await?;
// Scope the lock guard so it is released before the next engine call.
{
let calls = signal
.calls
.lock()
.map_err(|_| EngineError::RegistryPoisoned)?;
assert_eq!(
calls.as_slice(),
&[(handle.pid(), "approve".to_owned(), sent_payload)]
);
}
// Fresh ids: nothing is registered and no history exists for them.
let unknown = engine
.signal(
&WorkflowId::new_v4(),
&RunId::new_v4(),
"approve",
payload("unknown")?,
)
.await;
assert!(matches!(unknown, Err(EngineError::WorkflowNotFound { .. })));
engine.shutdown()?;
Ok(())
}
/// A query for a registered run is forwarded to the query service with the
/// handle's pid and the query name, and the service's reply is returned.
#[tokio::test]
async fn query_delegates_to_service_and_returns_payload()
-> Result<(), Box<dyn std::error::Error>> {
let reply = payload("reply")?;
let query = Arc::new(QueryCapture {
calls: Mutex::new(Vec::new()),
reply: reply.clone(),
});
let engine = engine_with_seams(
Arc::new(DeferredSignalRouter),
query.clone(),
Arc::new(DeferredEventPublisher),
)?;
let handle = insert_active_handle(&engine).await?;
let returned = engine
.query(handle.workflow_id(), handle.run_id(), "state")
.await?;
assert_eq!(returned, reply);
let calls = query
.calls
.lock()
.map_err(|_| EngineError::RegistryPoisoned)?;
assert_eq!(calls.as_slice(), &[(handle.pid(), "state".to_owned())]);
// Release the guard before shutting the engine down.
drop(calls);
engine.shutdown()?;
Ok(())
}
/// A query for a run whose history already holds a completion event fails
/// with `QueryError::NotRunning` carrying the workflow id; a query for an
/// unknown workflow/run fails with `EngineError::WorkflowNotFound`.
#[tokio::test]
async fn query_terminal_run_is_not_running_and_unknown_is_not_found()
-> Result<(), Box<dyn std::error::Error>> {
let engine = engine_with_seams(
Arc::new(DeferredSignalRouter),
Arc::new(DeferredQueryService),
Arc::new(DeferredEventPublisher),
)?;
let workflow_id = WorkflowId::new_v4();
let run_id = aion_core::RunId::new_v4();
// Record a full started -> completed history without registering a handle.
let mut recorder = crate::durability::Recorder::new(workflow_id.clone(), engine.store());
recorder
.record_workflow_started(
chrono::Utc::now(),
crate::durability::WorkflowStartRecord {
workflow_type: "checkout".to_owned(),
input: payload("input")?,
run_id: run_id.clone(),
parent_run_id: None,
package_version: aion_core::PackageVersion::new("a".repeat(64)),
},
)
.await?;
recorder
.record_workflow_completed(chrono::Utc::now(), payload("result")?)
.await?;
let terminal = engine.query(&workflow_id, &run_id, "state").await;
assert!(matches!(
terminal,
Err(EngineError::Query(crate::query::QueryError::NotRunning(id))) if id == workflow_id
));
// Fresh ids: nothing is registered and no history exists for them.
let unknown = engine
.query(&WorkflowId::new_v4(), &RunId::new_v4(), "state")
.await;
assert!(matches!(unknown, Err(EngineError::WorkflowNotFound { .. })));
engine.shutdown()?;
Ok(())
}
/// `Engine::subscribe` forwards the filter to the publisher: only the event
/// matching both the workflow id and the `Signal` family is yielded.
#[tokio::test]
async fn subscribe_delegates_to_publisher_stream_with_filter()
-> Result<(), Box<dyn std::error::Error>> {
let workflow_id = WorkflowId::new_v4();
let other_id = WorkflowId::new_v4();
// Matches the filter: right workflow id, signal family.
let matching = Event::SignalReceived {
envelope: envelope(1, &workflow_id),
name: "approved".to_owned(),
payload: payload("signal")?,
};
// Rejected by the filter: different workflow id and workflow family.
let filtered = Event::WorkflowStarted {
envelope: envelope(1, &other_id),
workflow_type: "checkout".to_owned(),
input: payload("input")?,
run_id: aion_core::RunId::new(uuid::Uuid::from_u128(1)),
parent_run_id: None,
package_version: aion_core::PackageVersion::new("a".repeat(64)),
};
let engine = engine_with_seams(
Arc::new(DeferredSignalRouter),
Arc::new(DeferredQueryService),
Arc::new(FakePublisher {
events: vec![matching.clone(), filtered],
}),
)?;
let events = engine
.subscribe(EventFilter {
workflow_id: Some(workflow_id),
run: None,
family: Some(EventFamily::Signal),
})
.collect::<Vec<_>>()
.await;
assert_eq!(events, vec![Ok(matching)]);
engine.shutdown()?;
Ok(())
}
}