use std::sync::Arc;
use aion_core::{ActivityError, ActivityErrorKind, ActivityId, Event, Payload};
use chrono::Utc;
use serde::Deserialize;
use crate::activity::bridge::ActivityDispatcher;
use crate::registry::Registry;
use crate::runtime::RuntimeHandle;
use crate::runtime::nif_activity_dispatch::{
ActivityCall, FIRST_DELIVERY_ATTEMPT, spawn_completion_task,
};
use crate::runtime::nif_context::NifContext;
use crate::runtime::nif_state::{CollectKind, EngineNifState, PendingAwait};
use crate::runtime::nif_timeout::SCOPE_EXPIRED_MESSAGE;
/// Deserializable description of one activity in a collect batch.
#[derive(Deserialize)]
pub(super) struct ActivitySpec {
/// Activity type name; compared against the recorded `ActivityScheduled` type for the same
/// ordinal and forwarded to the dispatched `ActivityCall`.
name: String,
/// Activity input as JSON text; parsed into a `Payload` before the activity is recorded.
input: String,
/// Configuration text forwarded unchanged to the dispatched `ActivityCall`.
config: String,
}
/// Shared collaborators a collect step needs besides the per-engine NIF state.
pub(super) struct CollectDeps {
/// Registry handed to `NifContext::new` for the calling process.
pub(super) registry: Arc<Registry>,
/// Runtime handle: source of delivered activity results/errors and of the signal-delivery
/// configuration passed to `NifContext::new`.
pub(super) runtime: Arc<RuntimeHandle>,
/// Tokio handle used for the context and for spawning completion tasks.
pub(super) tokio_handle: tokio::runtime::Handle,
/// Activity dispatcher; when `None`, dispatching a not-yet-scheduled activity is an error.
pub(super) dispatcher: Option<Arc<dyn ActivityDispatcher>>,
}
/// What is currently known about a single activity ordinal of a collect batch.
#[derive(Clone, Debug, PartialEq, Eq)]
enum OrdinalState {
/// The activity completed; carries the result payload as UTF-8 text.
Completed(String),
/// The activity failed; carries the failure message.
Failed(String),
/// A cancellation event is recorded for the activity.
Cancelled,
/// No terminal event is recorded and the runtime holds neither a result nor an error.
Pending,
}
/// Winner of a race: the winning ordinal plus its payload text (`Ok`) or failure message (`Err`).
type RaceSettlement = (u64, Result<String, String>);
/// Outcome of one `collect_step` invocation.
#[derive(Debug, PartialEq, Eq)]
pub(super) enum CollectStep {
/// A pending query sentinel was taken for the process; no collect work was performed.
QuerySentinel(String),
/// Every activity of an `All` batch completed; payload texts are in ordinal order.
AllCompleted(Vec<String>),
/// A `Race` batch has a winner: its payload text (`Ok`) or failure message (`Err`).
RaceWon(Result<String, String>),
/// An `All` batch contains a failure; carries the message of the lowest failed ordinal.
FailFast(String),
/// The collect was aborted as scope-expired; carries `SCOPE_EXPIRED_MESSAGE`.
ScopeExpired(String),
/// Nothing could be settled yet.
Suspend,
}
/// Runs one step of an `All`/`Race` collect for process `pid`.
///
/// The step proceeds in this order:
/// 1. a pending query sentinel, if any, is returned immediately;
/// 2. an empty `specs` list resolves at once (`All` → empty result, `Race` → error);
/// 3. the batch is pinned to a base ordinal (or the existing pin is validated);
/// 4. activities without a recorded schedule event are recorded and dispatched;
/// 5. the batch is settled according to `kind`.
///
/// `label` prefixes payload-parsing errors. Every failure is reported as a `String` reason.
pub(super) fn collect_step(
    state: &EngineNifState,
    deps: &CollectDeps,
    pid: u64,
    kind: CollectKind,
    specs: &[ActivitySpec],
    label: &str,
) -> Result<CollectStep, String> {
    let pending_sentinel = super::nif_query_pump::take_pending_query_sentinel(state, pid);
    if let Some(sentinel) = pending_sentinel {
        return Ok(CollectStep::QuerySentinel(sentinel));
    }

    let batch_len = specs.len();
    if batch_len == 0 {
        // Nothing to schedule: `All` is trivially complete, `Race` has no possible winner.
        return match kind {
            CollectKind::Race => Err("expected at least one activity".to_owned()),
            CollectKind::All => Ok(CollectStep::AllCompleted(Vec::new())),
        };
    }
    let Ok(count) = u64::try_from(batch_len) else {
        return Err("activity list length overflows u64".to_owned());
    };

    let context = NifContext::new(
        pid,
        deps.registry.as_ref(),
        deps.tokio_handle.clone(),
        deps.runtime.signal_delivery(),
    )
    .map_err(|err| err.error_reason())?;

    let base_ordinal = pin_or_allocate(state, &context, pid, kind, count)?;
    dispatch_unscheduled(deps, &context, specs, base_ordinal, label)?;

    match kind {
        CollectKind::Race => settle_race(state, deps, &context, pid, base_ordinal, count),
        CollectKind::All => settle_all(state, deps, &context, pid, base_ordinal, count),
    }
}
/// Returns the base ordinal of the collect await pinned for `pid`, pinning a new one if the
/// process has no pending await.
///
/// With no entry in `state.pending_awaits`, `count` ordinals are allocated from `context` and a
/// `PendingAwait::Collect` is inserted. With an existing collect pin, its `count` and `kind`
/// must match the call. Any other pinned await (sleep, signal, child) is an error.
fn pin_or_allocate(
    state: &EngineNifState,
    context: &NifContext,
    pid: u64,
    kind: CollectKind,
    count: u64,
) -> Result<u64, String> {
    // Work on an owned copy of the current entry, if there is one.
    let existing = state.pending_awaits.get(&pid).map(|entry| entry.clone());

    let Some(pinned) = existing else {
        // First entry for this await: allocate the ordinal range and pin it.
        let base_ordinal = context.allocate_activity_ordinals(count);
        state.pending_awaits.insert(
            pid,
            PendingAwait::Collect {
                base_ordinal,
                count,
                kind,
            },
        );
        return Ok(base_ordinal);
    };

    match pinned {
        PendingAwait::Collect {
            base_ordinal,
            count: pinned_count,
            kind: pinned_kind,
        } => {
            if pinned_count == count && pinned_kind == kind {
                Ok(base_ordinal)
            } else {
                Err(format!(
                    "process is pinned to a different collect await \
                     (pinned {pinned_kind:?} of {pinned_count}, called {kind:?} of {count})"
                ))
            }
        }
        PendingAwait::Sleep { .. } => Err("process is pinned to a pending sleep await".to_owned()),
        PendingAwait::Signal { .. } => {
            Err("process is pinned to a pending signal await".to_owned())
        }
        PendingAwait::Child { .. } => Err("process is pinned to a pending child await".to_owned()),
    }
}
/// Records and dispatches every activity of the batch that has no `ActivityScheduled` event yet.
///
/// For each spec the ordinal is `base_ordinal + offset`. If history already holds a schedule
/// event for that ordinal, the recorded activity type must equal `spec.name`; a mismatch is
/// reported as a determinism violation. Specs without a schedule event are "fresh".
///
/// Fresh specs are handled in three strictly separated phases:
/// 1. every input is parsed into a `Payload` (errors are prefixed with `label`);
/// 2. a scheduled/started record is written for every fresh ordinal;
/// 3. one completion task is spawned per fresh ordinal.
///
/// Parsing happens before any record is written so that a malformed input cannot leave earlier
/// ordinals recorded as scheduled without a completion task ever being spawned for them.
///
/// # Errors
/// Returns a `String` reason on ordinal overflow, determinism violation, a missing dispatcher
/// (only when something is fresh), an invalid input payload, or a recording failure.
fn dispatch_unscheduled(
    deps: &CollectDeps,
    context: &NifContext,
    specs: &[ActivitySpec],
    base_ordinal: u64,
    label: &str,
) -> Result<(), String> {
    let mut fresh: Vec<(u64, &ActivitySpec)> = Vec::new();
    for (offset, spec) in specs.iter().enumerate() {
        let ordinal = base_ordinal + offset_to_u64(offset)?;
        match scheduled_activity_type(context.history(), ordinal) {
            Some(recorded) => {
                if recorded != spec.name {
                    return Err(format!(
                        "determinism violation: ordinal {ordinal} recorded activity type \
                         {recorded:?} but workflow code supplied {:?}",
                        spec.name
                    ));
                }
            }
            None => fresh.push((ordinal, spec)),
        }
    }
    if fresh.is_empty() {
        return Ok(());
    }
    let Some(dispatcher) = deps.dispatcher.as_ref() else {
        return Err(
            "no activity dispatcher configured — set one via EngineBuilder::activity_dispatcher"
                .to_owned(),
        );
    };

    // Phase 1: validate all inputs up front; nothing has been recorded if this fails.
    let inputs = fresh
        .iter()
        .map(|(_, spec)| payload_from_json_text(&spec.input, label))
        .collect::<Result<Vec<_>, String>>()?;

    // Phase 2: record every fresh activity before any completion task exists.
    for ((ordinal, spec), input) in fresh.iter().zip(inputs) {
        context
            .record_activity_scheduled_started(
                Utc::now(),
                ActivityId::from_sequence_position(*ordinal),
                spec.name.clone(),
                input,
            )
            .map_err(|error| error.error_reason())?;
    }

    // Phase 3: hand each fresh activity to the dispatcher via its completion task.
    for (ordinal, spec) in fresh {
        spawn_completion_task(
            &deps.tokio_handle,
            Arc::clone(&deps.runtime),
            Arc::clone(dispatcher),
            context.pid(),
            super::nif_activity::correlation_id(ordinal),
            ActivityCall {
                name: spec.name.clone(),
                input: spec.input.clone(),
                config: spec.config.clone(),
                attempt: FIRST_DELIVERY_ATTEMPT,
            },
        );
    }
    Ok(())
}
fn settle_all(
state: &EngineNifState,
deps: &CollectDeps,
context: &NifContext,
pid: u64,
base_ordinal: u64,
count: u64,
) -> Result<CollectStep, String> {
let mut states = Vec::with_capacity(usize::try_from(count).unwrap_or(0));
for ordinal in base_ordinal..base_ordinal + count {
let recorded = match recorded_terminal(context.history(), ordinal)? {
Some(recorded) => recorded,
None => take_and_record(deps, context, pid, ordinal)?,
};
states.push(recorded);
}
let lowest_failure = states.iter().find_map(|recorded| match recorded {
OrdinalState::Failed(message) => Some(message.clone()),
_ => None,
});
if let Some(message) = lowest_failure {
cancel_pending(deps, context, pid, base_ordinal, &states)?;
state.pending_awaits.remove(&pid);
return Ok(CollectStep::FailFast(message));
}
if states
.iter()
.all(|recorded| matches!(recorded, OrdinalState::Completed(_)))
{
let results = states
.into_iter()
.filter_map(|recorded| match recorded {
OrdinalState::Completed(payload) => Some(payload),
_ => None,
})
.collect();
state.pending_awaits.remove(&pid);
return Ok(CollectStep::AllCompleted(results));
}
if super::nif_timeout::expired_scope_deadline(state, pid, context.history()).is_some() {
cancel_pending(deps, context, pid, base_ordinal, &states)?;
state.pending_awaits.remove(&pid);
return Ok(CollectStep::ScopeExpired(SCOPE_EXPIRED_MESSAGE.to_owned()));
}
if !states
.iter()
.any(|recorded| matches!(recorded, OrdinalState::Pending))
{
state.pending_awaits.remove(&pid);
return Ok(CollectStep::ScopeExpired(SCOPE_EXPIRED_MESSAGE.to_owned()));
}
Ok(CollectStep::Suspend)
}
/// Settles a `Race` collect over the ordinals `base_ordinal..base_ordinal + count`.
///
/// Resolution order:
/// 1. a completed/failed event already recorded for the batch is the winner;
/// 2. otherwise the runtime is polled in ascending ordinal order and the first delivered
///    result or error wins (and is recorded by `take_and_record`);
/// 3. with a winner, every other ordinal has its runtime entries dropped and, if it has no
///    recorded terminal event, a cancellation recorded; the step returns `RaceWon`;
/// 4. without a winner, an expired scope deadline cancels the whole batch (`ScopeExpired`);
/// 5. without a winner or a reported expiry, a batch whose ordinals are all recorded as
///    cancelled also yields `ScopeExpired`; anything else yields `Suspend`.
///
/// Every resolution except `Suspend` removes the process's pending-await pin.
fn settle_race(
state: &EngineNifState,
deps: &CollectDeps,
context: &NifContext,
pid: u64,
base_ordinal: u64,
count: u64,
) -> Result<CollectStep, String> {
// The same history view is used for every lookup in this call.
let history = context.history();
let mut winner = recorded_race_winner(history, base_ordinal, count)?;
if winner.is_none() {
// No recorded winner: look for a delivered result/error, lowest ordinal first.
for ordinal in base_ordinal..base_ordinal + count {
// Ordinals that already have a recorded terminal event are not polled.
if recorded_terminal(history, ordinal)?.is_some() {
continue;
}
match take_and_record(deps, context, pid, ordinal)? {
OrdinalState::Completed(payload) => {
winner = Some((ordinal, Ok(payload)));
break;
}
OrdinalState::Failed(message) => {
winner = Some((ordinal, Err(message)));
break;
}
OrdinalState::Cancelled | OrdinalState::Pending => {}
}
}
}
if let Some((winner_ordinal, outcome)) = winner {
// Losers: discard anything the runtime delivered and cancel those without a terminal.
for ordinal in base_ordinal..base_ordinal + count {
if ordinal == winner_ordinal {
continue;
}
drop_runtime_entries(deps, pid, ordinal);
if recorded_terminal(history, ordinal)?.is_none() {
record_cancelled(context, ordinal)?;
}
}
state.pending_awaits.remove(&pid);
return Ok(CollectStep::RaceWon(outcome));
}
if super::nif_timeout::expired_scope_deadline(state, pid, history).is_some() {
// Scope expired with no winner: cancel every ordinal that has no recorded terminal.
for ordinal in base_ordinal..base_ordinal + count {
drop_runtime_entries(deps, pid, ordinal);
if recorded_terminal(history, ordinal)?.is_none() {
record_cancelled(context, ordinal)?;
}
}
state.pending_awaits.remove(&pid);
return Ok(CollectStep::ScopeExpired(SCOPE_EXPIRED_MESSAGE.to_owned()));
}
// NOTE(review): presumably the replay of an earlier expiry — every ordinal is already
// recorded as cancelled although no expired deadline is reported for this call.
let mut all_cancelled = true;
for ordinal in base_ordinal..base_ordinal + count {
if recorded_terminal(history, ordinal)? != Some(OrdinalState::Cancelled) {
all_cancelled = false;
break;
}
}
if all_cancelled {
state.pending_awaits.remove(&pid);
return Ok(CollectStep::ScopeExpired(SCOPE_EXPIRED_MESSAGE.to_owned()));
}
Ok(CollectStep::Suspend)
}
/// Cancels every ordinal whose entry in `states` is `Pending`.
///
/// `states[i]` describes ordinal `base_ordinal + i`. For each pending one a cancellation is
/// recorded first, then any result/error the runtime holds for it is discarded. Stops at the
/// first error.
fn cancel_pending(
    deps: &CollectDeps,
    context: &NifContext,
    pid: u64,
    base_ordinal: u64,
    states: &[OrdinalState],
) -> Result<(), String> {
    let pending_offsets = states
        .iter()
        .enumerate()
        .filter(|(_, state)| matches!(state, OrdinalState::Pending))
        .map(|(offset, _)| offset);

    for offset in pending_offsets {
        let ordinal = base_ordinal + offset_to_u64(offset)?;
        record_cancelled(context, ordinal)?;
        drop_runtime_entries(deps, pid, ordinal);
    }
    Ok(())
}
/// Takes whatever the runtime has delivered for `ordinal` and records it in history.
///
/// A delivered result is checked before a delivered error. A result is recorded as completed and
/// returned as `Completed` with its payload text; an error is recorded as a terminal failure
/// (attempt `1`) and returned as `Failed` with its message. With neither present the ordinal is
/// `Pending` and nothing is recorded.
fn take_and_record(
    deps: &CollectDeps,
    context: &NifContext,
    pid: u64,
    ordinal: u64,
) -> Result<OrdinalState, String> {
    let activity_id = ActivityId::from_sequence_position(ordinal);
    let runtime = &deps.runtime;

    if let Some(payload) = runtime.take_activity_result(pid, ordinal) {
        context
            .record_activity_completed(Utc::now(), activity_id, payload.clone())
            .map_err(|err| err.error_reason())?;
        let text = payload_text(&payload)?;
        return Ok(OrdinalState::Completed(text));
    }

    let Some(failure) = runtime.take_activity_error(pid, ordinal) else {
        return Ok(OrdinalState::Pending);
    };
    context
        .record_activity_failed(Utc::now(), activity_id, terminal_error(&failure.message), 1)
        .map_err(|err| err.error_reason())?;
    Ok(OrdinalState::Failed(failure.message))
}
/// Records a cancellation event, timestamped now, for the activity at `ordinal`.
fn record_cancelled(context: &NifContext, ordinal: u64) -> Result<(), String> {
    let recorded_at = Utc::now();
    let cancelled_id = ActivityId::from_sequence_position(ordinal);
    context
        .record_activity_cancelled(recorded_at, cancelled_id)
        .map_err(|err| err.error_reason())
}
/// Discards any result and any error the runtime holds for `ordinal` of process `pid`.
fn drop_runtime_entries(deps: &CollectDeps, pid: u64, ordinal: u64) {
    let runtime = &deps.runtime;
    // Both values are taken purely to remove them; they are dropped immediately.
    let _ = runtime.take_activity_result(pid, ordinal);
    let _ = runtime.take_activity_error(pid, ordinal);
}
fn recorded_terminal(history: &[Event], ordinal: u64) -> Result<Option<OrdinalState>, String> {
let target = ActivityId::from_sequence_position(ordinal);
for event in history {
match event {
Event::ActivityCompleted {
activity_id,
result,
..
} if *activity_id == target => {
return Ok(Some(OrdinalState::Completed(payload_text(result)?)));
}
Event::ActivityFailed {
activity_id, error, ..
} if *activity_id == target => {
return Ok(Some(OrdinalState::Failed(error.message.clone())));
}
Event::ActivityCancelled { activity_id, .. } if *activity_id == target => {
return Ok(Some(OrdinalState::Cancelled));
}
_ => {}
}
}
Ok(None)
}
/// Finds the first completed or failed event in history whose activity belongs to the batch
/// `base_ordinal..base_ordinal + count`.
///
/// History order decides, not ordinal order. Cancellation events are ignored. Fails only when
/// the winning payload is not valid UTF-8.
fn recorded_race_winner(
    history: &[Event],
    base_ordinal: u64,
    count: u64,
) -> Result<Option<RaceSettlement>, String> {
    let belongs_to_batch =
        |position: u64| base_ordinal <= position && position < base_ordinal + count;

    for event in history {
        let settlement: RaceSettlement = match event {
            Event::ActivityCompleted {
                activity_id,
                result,
                ..
            } if belongs_to_batch(activity_id.sequence_position()) => {
                (activity_id.sequence_position(), Ok(payload_text(result)?))
            }
            Event::ActivityFailed {
                activity_id, error, ..
            } if belongs_to_batch(activity_id.sequence_position()) => {
                (activity_id.sequence_position(), Err(error.message.clone()))
            }
            _ => continue,
        };
        return Ok(Some(settlement));
    }
    Ok(None)
}
/// Returns the activity type of the first `ActivityScheduled` event recorded for `ordinal`,
/// or `None` when the ordinal was never scheduled.
fn scheduled_activity_type(history: &[Event], ordinal: u64) -> Option<String> {
    let wanted = ActivityId::from_sequence_position(ordinal);
    for event in history {
        if let Event::ActivityScheduled {
            activity_id,
            activity_type,
            ..
        } = event
        {
            if *activity_id == wanted {
                return Some(activity_type.clone());
            }
        }
    }
    None
}
/// Parses `text` as JSON and converts it into a `Payload`.
///
/// Both failure modes are reported as a `String` prefixed with `label`.
fn payload_from_json_text(text: &str, label: &str) -> Result<Payload, String> {
    let value = match serde_json::from_str(text) {
        Ok(parsed) => parsed,
        Err(error) => return Err(format!("{label}: invalid JSON payload: {error}")),
    };
    match Payload::from_json(&value) {
        Ok(payload) => Ok(payload),
        Err(error) => Err(format!("{label}: {error}")),
    }
}
/// Returns the payload's bytes as an owned UTF-8 string, or an error when they are not UTF-8.
fn payload_text(payload: &Payload) -> Result<String, String> {
    let raw = payload.bytes().to_vec();
    match String::from_utf8(raw) {
        Ok(text) => Ok(text),
        Err(_) => Err("recorded activity payload is not valid UTF-8".to_owned()),
    }
}
/// Builds a terminal `ActivityError` carrying `message` and no details.
fn terminal_error(message: &str) -> ActivityError {
    let message = String::from(message);
    ActivityError {
        kind: ActivityErrorKind::Terminal,
        message,
        details: None,
    }
}
/// Converts a slice offset to `u64`, reporting an error if it does not fit.
fn offset_to_u64(offset: usize) -> Result<u64, String> {
    match u64::try_from(offset) {
        Ok(value) => Ok(value),
        Err(_) => Err("activity offset overflows u64".to_owned()),
    }
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use aion_core::{
ActivityError, ActivityErrorKind, ActivityId, ContentType, Event, EventEnvelope, Payload,
RunId, WorkflowId, WorkflowStatus,
};
use aion_package::ContentHash;
use aion_store::{EventStore, InMemoryStore, WriteToken};
use serde_json::json;
use super::{ActivitySpec, CollectDeps, CollectStep, collect_step};
use crate::durability::Recorder;
use crate::registry::{
CompletionNotifier, HandleResidency, Registry, WorkflowHandle, WorkflowHandleParts,
};
use crate::runtime::nif_state::{CollectKind, EngineNifState, PendingAwait};
use crate::runtime::nif_timeout::TimeoutScope;
use crate::runtime::{RuntimeConfig, RuntimeHandle};
/// Return type shared by the tests so fixtures can be used with `?`.
type TestResult = Result<(), Box<dyn std::error::Error>>;
/// Test fixture bundling everything `collect_step` needs for a single workflow process.
struct CollectHarness {
/// Engine NIF state passed to `collect_step`; also inspected for the pending-await pin.
state: Arc<EngineNifState>,
/// Dependencies passed to `collect_step` (built without a dispatcher).
deps: CollectDeps,
/// Event store backing the workflow's history.
store: Arc<dyn EventStore>,
/// Workflow whose history the harness reads.
workflow_id: WorkflowId,
/// Handle registered for the workflow; used to inspect ordinal allocation and the run id.
handle: WorkflowHandle,
/// Pid of the test process spawned on the runtime.
pid: u64,
}
/// Seeds `store` with a fresh workflow whose history is a `WorkflowStarted` event followed by
/// `events`, re-enveloped with sequence numbers starting at 1.
///
/// Returns the generated workflow and run ids.
async fn seed_history(
    store: &Arc<dyn EventStore>,
    events: &[Event],
) -> Result<(WorkflowId, RunId), Box<dyn std::error::Error>> {
    let workflow_id = WorkflowId::new_v4();
    let run_id = RunId::new_v4();
    let opening = started_event(&workflow_id, &run_id)?;

    let mut sequenced = Vec::with_capacity(events.len() + 1);
    let all_events = std::iter::once(opening).chain(events.iter().cloned());
    for (index, event) in all_events.enumerate() {
        // Sequence numbers are 1-based.
        let seq = u64::try_from(index)? + 1;
        sequenced.push(reenvelope(event, &workflow_id, seq));
    }

    store
        .append(WriteToken::recorder(), &workflow_id, &sequenced, 0)
        .await?;
    Ok((workflow_id, run_id))
}
impl CollectHarness {
/// Builds a harness over a new in-memory store seeded with `events`.
async fn over_events(events: &[Event]) -> Result<Self, Box<dyn std::error::Error>> {
let store: Arc<dyn EventStore> = Arc::new(InMemoryStore::default());
let (workflow_id, run_id) = seed_history(&store, events).await?;
Self::over_store(store, workflow_id, run_id).await
}
/// Builds a harness over an existing store and workflow.
///
/// Creates a new registry, runtime, test process and NIF state, with a recorder resumed at the
/// current length of the stored history.
async fn over_store(
store: Arc<dyn EventStore>,
workflow_id: WorkflowId,
run_id: RunId,
) -> Result<Self, Box<dyn std::error::Error>> {
// The recorder resumes at the number of events already stored.
let head = u64::try_from(store.read_history(&workflow_id).await?.len())?;
let registry = Arc::new(Registry::default());
let runtime = Arc::new(RuntimeHandle::new(RuntimeConfig::new(Some(1)))?);
let pid = runtime.spawn_test_process()?;
let recorder = Recorder::resume_at(workflow_id.clone(), Arc::clone(&store), head);
let handle = WorkflowHandle::new(WorkflowHandleParts {
workflow_id: workflow_id.clone(),
run_id: run_id.clone(),
pid,
workflow_type: "collect-parent".to_owned(),
loaded_version: ContentHash::from_bytes([5; 32]),
cached_status: WorkflowStatus::Running,
residency: HandleResidency::Resident,
recorder,
completion: CompletionNotifier::new(),
});
registry.insert((workflow_id.clone(), run_id), handle.clone())?;
// No dispatcher: these tests only run over histories where every activity is already
// scheduled, or over empty spec lists.
let deps = CollectDeps {
registry,
runtime: Arc::clone(&runtime),
tokio_handle: tokio::runtime::Handle::current(),
dispatcher: None,
};
Ok(Self {
state: Arc::new(EngineNifState::default()),
deps,
store,
workflow_id,
handle,
pid,
})
}
/// Runs `collect_step` for the harness process inside `block_in_place`, with label "collect".
fn step(&self, kind: CollectKind, specs: &[ActivitySpec]) -> Result<CollectStep, String> {
tokio::task::block_in_place(|| {
collect_step(&self.state, &self.deps, self.pid, kind, specs, "collect")
})
}
/// Returns a copy of the pending await currently pinned for the harness process, if any.
fn pinned(&self) -> Option<PendingAwait> {
self.state.pending_awaits.get(&self.pid).map(|e| e.clone())
}
/// Returns the ordinals of all `ActivityCancelled` events in the stored history, in order.
async fn cancelled_ordinals(&self) -> Result<Vec<u64>, Box<dyn std::error::Error>> {
Ok(self
.store
.read_history(&self.workflow_id)
.await?
.iter()
.filter_map(|event| match event {
Event::ActivityCancelled { activity_id, .. } => {
Some(activity_id.sequence_position())
}
_ => None,
})
.collect())
}
/// Shuts down the harness runtime, consuming the harness.
fn shutdown(self) -> TestResult {
self.deps.runtime.shutdown()?;
Ok(())
}
}
/// Rebuilds `event` with a new envelope carrying `seq`, the current time and `workflow_id`.
///
/// Only the variants listed below are re-enveloped; any other variant is returned unchanged,
/// keeping its original envelope.
fn reenvelope(event: Event, workflow_id: &WorkflowId, seq: u64) -> Event {
let envelope = EventEnvelope {
seq,
recorded_at: chrono::Utc::now(),
workflow_id: workflow_id.clone(),
};
match event {
Event::WorkflowStarted {
workflow_type,
input,
run_id,
parent_run_id,
package_version,
..
} => Event::WorkflowStarted {
envelope,
workflow_type,
input,
run_id,
parent_run_id,
package_version,
},
Event::ActivityScheduled {
activity_id,
activity_type,
input,
..
} => Event::ActivityScheduled {
envelope,
activity_id,
activity_type,
input,
},
Event::ActivityStarted { activity_id, .. } => Event::ActivityStarted {
envelope,
activity_id,
},
Event::ActivityCompleted {
activity_id,
result,
..
} => Event::ActivityCompleted {
envelope,
activity_id,
result,
},
Event::ActivityFailed {
activity_id,
error,
attempt,
..
} => Event::ActivityFailed {
envelope,
activity_id,
error,
attempt,
},
Event::ActivityCancelled { activity_id, .. } => Event::ActivityCancelled {
envelope,
activity_id,
},
Event::TimerFired { timer_id, .. } => Event::TimerFired { envelope, timer_id },
// Variants not used by these tests pass through untouched.
other => other,
}
}
/// Builds the `WorkflowStarted` event that opens every seeded history.
fn started_event(
    workflow_id: &WorkflowId,
    run_id: &RunId,
) -> Result<Event, Box<dyn std::error::Error>> {
    let envelope = EventEnvelope {
        seq: 1,
        recorded_at: chrono::Utc::now(),
        workflow_id: workflow_id.clone(),
    };
    let workflow_type = "collect-parent".to_owned();
    let input = Payload::from_json(&json!({ "fixture": "input" }))?;
    let started = Event::WorkflowStarted {
        envelope,
        workflow_type,
        input,
        run_id: run_id.clone(),
        parent_run_id: None,
        package_version: aion_core::PackageVersion::new("a".repeat(64)),
    };
    Ok(started)
}
/// Envelope with sequence 0 and a random workflow id, meant to be replaced by `reenvelope`.
fn placeholder_envelope() -> EventEnvelope {
    let recorded_at = chrono::Utc::now();
    let workflow_id = WorkflowId::new_v4();
    EventEnvelope {
        seq: 0,
        recorded_at,
        workflow_id,
    }
}
/// Returns the `ActivityScheduled` + `ActivityStarted` pair for activity `name` at `ordinal`.
fn scheduled_started(ordinal: u64, name: &str) -> Vec<Event> {
    let scheduled = Event::ActivityScheduled {
        envelope: placeholder_envelope(),
        activity_id: ActivityId::from_sequence_position(ordinal),
        activity_type: name.to_owned(),
        input: Payload::new(ContentType::Json, br#""in""#.to_vec()),
    };
    let started = Event::ActivityStarted {
        envelope: placeholder_envelope(),
        activity_id: ActivityId::from_sequence_position(ordinal),
    };
    vec![scheduled, started]
}
/// Builds an `ActivityCompleted` event for `ordinal` whose JSON payload bytes are `result`.
fn completed(ordinal: u64, result: &str) -> Event {
    let envelope = placeholder_envelope();
    let activity_id = ActivityId::from_sequence_position(ordinal);
    let payload = Payload::new(ContentType::Json, result.as_bytes().to_vec());
    Event::ActivityCompleted {
        envelope,
        activity_id,
        result: payload,
    }
}
fn failed(ordinal: u64, message: &str) -> Event {
Event::ActivityFailed {
envelope: placeholder_envelope(),
activity_id: ActivityId::from_sequence_position(ordinal),
error: ActivityError {
kind: ActivityErrorKind::Terminal,
message: message.to_owned(),
details: None,
},
attempt: 1,
}
}
/// Builds an `ActivitySpec` named `name` with input `"in"` (JSON text) and config `{}`.
fn spec(name: &str) -> ActivitySpec {
    let name = String::from(name);
    let input = String::from(r#""in""#);
    let config = String::from("{}");
    ActivitySpec {
        name,
        input,
        config,
    }
}
/// Builds one `ActivitySpec` per name, in order.
fn specs(names: &[&str]) -> Vec<ActivitySpec> {
    let mut built = Vec::with_capacity(names.len());
    for name in names {
        built.push(spec(name));
    }
    built
}
/// Builds a `TimerFired` event for the anonymous timer identified by `ordinal`.
fn scope_deadline_fired(ordinal: u64) -> Event {
    let envelope = placeholder_envelope();
    let timer_id = aion_core::TimerId::anonymous(ordinal);
    Event::TimerFired { envelope, timer_id }
}
/// Installs the timer NIF bridge on the harness state, wired to the harness registry and store,
/// the current Tokio handle and the default signal-delivery configuration.
fn install_fresh_read_bridge(harness: &CollectHarness) {
    let registry = Arc::clone(&harness.deps.registry);
    let store = Arc::clone(&harness.store);
    let tokio_handle = tokio::runtime::Handle::current();
    let signal_delivery = crate::runtime::SignalDeliveryConfig::default();
    crate::runtime::nif_timer_bridge::install_timer_nif_bridge(
        &harness.state,
        registry,
        store,
        tokio_handle,
        signal_delivery,
    );
}
/// Builds the scheduled/started events for a batch of activities at ordinals `0..names.len()`,
/// leaving every activity without a terminal event.
fn pending_batch(names: &[&str]) -> Vec<Event> {
    let mut events = Vec::new();
    for (index, name) in names.iter().enumerate() {
        let ordinal = u64::try_from(index).unwrap_or(u64::MAX);
        events.extend(scheduled_started(ordinal, name));
    }
    events
}
/// Re-entering a still-pending `All` collect keeps the pinned base ordinal and does not
/// allocate any further ordinals.
#[tokio::test(flavor = "multi_thread")]
async fn pinned_base_is_reused_across_reentries_and_counter_advances_once() -> TestResult {
let harness = CollectHarness::over_events(&pending_batch(&["alpha", "beta"])).await?;
let two = specs(&["alpha", "beta"]);
// First entry: both activities are pending, so the step suspends and pins the batch.
assert_eq!(
harness.step(CollectKind::All, &two),
Ok(CollectStep::Suspend)
);
assert!(matches!(
harness.pinned(),
Some(PendingAwait::Collect {
base_ordinal: 0,
count: 2,
kind: CollectKind::All,
})
));
assert_eq!(harness.handle.activity_ordinals_allocated(), 2);
// Second entry: same pin, and the allocation counter is unchanged.
assert_eq!(
harness.step(CollectKind::All, &two),
Ok(CollectStep::Suspend)
);
assert!(matches!(
harness.pinned(),
Some(PendingAwait::Collect {
base_ordinal: 0,
..
})
));
assert_eq!(harness.handle.activity_ordinals_allocated(), 2);
harness.shutdown()
}
/// With failures recorded for ordinals 1 and 2, an `All` collect reports the failure of
/// ordinal 1, cancels the unresolved ordinal 0 and clears the pin.
#[tokio::test(flavor = "multi_thread")]
async fn fail_fast_returns_lowest_ordinal_failure_and_cancels_unresolved() -> TestResult {
let mut events = pending_batch(&["a", "b", "c"]);
events.push(failed(1, "boom-b"));
events.push(failed(2, "boom-c"));
let harness = CollectHarness::over_events(&events).await?;
let step = harness.step(CollectKind::All, &specs(&["a", "b", "c"]));
assert_eq!(step, Ok(CollectStep::FailFast("boom-b".to_owned())));
assert_eq!(harness.cancelled_ordinals().await?, vec![0]);
assert_eq!(harness.pinned(), None);
harness.shutdown()
}
/// On fail-fast, only ordinals without a terminal event (1 and 3) are cancelled; the completed
/// ordinal 0 and the failed ordinal 2 are left alone.
#[tokio::test(flavor = "multi_thread")]
async fn cancellation_set_covers_exactly_the_unresolved_ordinals() -> TestResult {
let mut events = pending_batch(&["a", "b", "c", "d"]);
events.push(completed(0, r#""done-a""#));
events.push(failed(2, "boom-c"));
let harness = CollectHarness::over_events(&events).await?;
let step = harness.step(CollectKind::All, &specs(&["a", "b", "c", "d"]));
assert_eq!(step, Ok(CollectStep::FailFast("boom-c".to_owned())));
assert_eq!(harness.cancelled_ordinals().await?, vec![1, 3]);
assert_eq!(harness.pinned(), None);
harness.shutdown()
}
/// A race is won by the terminal event recorded in history (ordinal 1 here), whether it is a
/// completion or a failure; the other ordinal is cancelled.
#[tokio::test(flavor = "multi_thread")]
async fn race_winner_is_first_recorded_terminal_not_lowest_ordinal() -> TestResult {
// Case 1: ordinal 1 completed.
let mut events = pending_batch(&["a", "b"]);
events.push(completed(1, r#""win-b""#));
let harness = CollectHarness::over_events(&events).await?;
let step = harness.step(CollectKind::Race, &specs(&["a", "b"]));
assert_eq!(step, Ok(CollectStep::RaceWon(Ok(r#""win-b""#.to_owned()))));
assert_eq!(harness.cancelled_ordinals().await?, vec![0]);
assert_eq!(harness.pinned(), None);
// Case 2: ordinal 1 failed — a failure wins the race as well.
let mut events = pending_batch(&["a", "b"]);
events.push(failed(1, "boom-b"));
let failing = CollectHarness::over_events(&events).await?;
assert_eq!(
failing.step(CollectKind::Race, &specs(&["a", "b"])),
Ok(CollectStep::RaceWon(Err("boom-b".to_owned())))
);
assert_eq!(failing.cancelled_ordinals().await?, vec![0]);
harness.shutdown()?;
failing.shutdown()
}
/// When completions for both ordinals are already delivered to the runtime, the lowest ordinal
/// wins, the loser is cancelled, and no delivered completion remains in the runtime.
#[tokio::test(flavor = "multi_thread")]
async fn race_batch_tie_breaks_to_lowest_ordinal_and_drains_loser_entries() -> TestResult {
let harness = CollectHarness::over_events(&pending_batch(&["a", "b"])).await?;
// Deliver both completions before the step runs.
harness.deps.runtime.deliver_activity_completion_message(
harness.pid,
"activity:0",
r#""r0""#.to_owned(),
)?;
harness.deps.runtime.deliver_activity_completion_message(
harness.pid,
"activity:1",
r#""r1""#.to_owned(),
)?;
let step = harness.step(CollectKind::Race, &specs(&["a", "b"]));
assert_eq!(step, Ok(CollectStep::RaceWon(Ok(r#""r0""#.to_owned()))));
assert_eq!(harness.cancelled_ordinals().await?, vec![1]);
assert_eq!(harness.deps.runtime.retained_activity_completions(), 0);
// Only the winner may have a completed/failed event in history.
let history = harness.store.read_history(&harness.workflow_id).await?;
let winner_terminals = history
.iter()
.filter(|event| {
matches!(
event,
Event::ActivityCompleted { .. } | Event::ActivityFailed { .. }
)
})
.count();
assert_eq!(
winner_terminals, 1,
"exactly one non-cancelled terminal may exist: {history:#?}"
);
harness.shutdown()
}
/// An empty spec list resolves immediately — `All` with an empty result, `Race` with an error —
/// without pinning an await or allocating ordinals.
#[tokio::test(flavor = "multi_thread")]
async fn empty_list_resolves_immediately_without_pinning() -> TestResult {
let harness = CollectHarness::over_events(&[]).await?;
assert_eq!(
harness.step(CollectKind::All, &[]),
Ok(CollectStep::AllCompleted(Vec::new()))
);
assert_eq!(harness.pinned(), None);
assert_eq!(harness.handle.activity_ordinals_allocated(), 0);
let race = harness.step(CollectKind::Race, &[]);
assert_eq!(race, Err("expected at least one activity".to_owned()));
assert_eq!(harness.pinned(), None);
harness.shutdown()
}
/// An `All` collect inside an expired timeout scope cancels the unresolved ordinal and reports
/// `ScopeExpired`; a second harness over the same store reports the same outcome without
/// appending anything to history.
#[tokio::test(flavor = "multi_thread")]
async fn expired_scope_cancels_unresolved_and_replay_derives_the_same_abort() -> TestResult {
let mut events = pending_batch(&["a", "b"]);
events.push(completed(0, r#""done-a""#));
let harness = CollectHarness::over_events(&events).await?;
// NOTE(review): the `true` flag presumably marks the replayed scope as expired — confirm
// against `TimeoutScope::replayed_for_test`.
harness
.state
.timeout_scopes
.insert(9, TimeoutScope::replayed_for_test(harness.pid, true));
harness
.state
.timeout_scope_stacks
.insert(harness.pid, vec![9]);
let step = harness.step(CollectKind::All, &specs(&["a", "b"]));
assert_eq!(
step,
Ok(CollectStep::ScopeExpired(
"timeout:deadline expired".to_owned()
))
);
assert_eq!(harness.cancelled_ordinals().await?, vec![1]);
assert_eq!(harness.pinned(), None);
// Replay: a new harness (no timeout scope installed) over the same stored history.
let store = Arc::clone(&harness.store);
let workflow_id = harness.workflow_id.clone();
let run_id = harness.handle.run_id().clone();
let history_len = store.read_history(&workflow_id).await?.len();
harness.shutdown()?;
let replay = CollectHarness::over_store(store, workflow_id, run_id).await?;
assert_eq!(
replay.step(CollectKind::All, &specs(&["a", "b"])),
Ok(CollectStep::ScopeExpired(
"timeout:deadline expired".to_owned()
))
);
assert_eq!(
replay.store.read_history(&replay.workflow_id).await?.len(),
history_len,
"replay must append nothing"
);
assert_eq!(replay.pinned(), None);
replay.shutdown()
}
/// `All` collect over a store that serves stale reads: the first step suspends without
/// cancelling anything, the second step reports `ScopeExpired` and cancels ordinal 1, and a
/// replay over the same store reaches the same outcome without appending events.
#[tokio::test(flavor = "multi_thread")]
async fn all_stale_snapshot_expiry_suspends_then_converges_with_replay() -> TestResult {
let scope_timer = aion_core::TimerId::anonymous(7);
// NOTE(review): the meaning of the `StaleReadStore::new` / `set_stale_target` arguments is
// defined in `nif_test_stores` — presumably they control which reads omit the deadline event.
let backing = Arc::new(crate::runtime::nif_test_stores::StaleReadStore::new(6));
let store: Arc<dyn EventStore> = Arc::clone(&backing) as Arc<dyn EventStore>;
let mut events = pending_batch(&["a", "b"]);
events.push(completed(0, r#""done-a""#));
events.push(scope_deadline_fired(7));
let (workflow_id, run_id) = seed_history(&store, &events).await?;
let harness =
CollectHarness::over_store(Arc::clone(&store), workflow_id.clone(), run_id.clone())
.await?;
install_fresh_read_bridge(&harness);
backing.set_stale_target(&workflow_id, 1);
// Live timeout scope whose deadline timer matches the seeded `TimerFired` event.
harness
.state
.timeout_scopes
.insert(21, TimeoutScope::live_for_test(harness.pid, scope_timer));
harness
.state
.timeout_scope_stacks
.insert(harness.pid, vec![21]);
let two = specs(&["a", "b"]);
assert_eq!(
harness.step(CollectKind::All, &two),
Ok(CollectStep::Suspend),
"a snapshot lacking the deadline must park, not branch"
);
assert_eq!(harness.cancelled_ordinals().await?, Vec::<u64>::new());
assert_eq!(
harness.step(CollectKind::All, &two),
Ok(CollectStep::ScopeExpired(
"timeout:deadline expired".to_owned()
))
);
assert_eq!(harness.cancelled_ordinals().await?, vec![1]);
assert_eq!(harness.pinned(), None);
let history_len = store.read_history(&workflow_id).await?.len();
harness.shutdown()?;
// Replay over the same store with a replayed scope for the same deadline timer.
let replay = CollectHarness::over_store(store, workflow_id, run_id).await?;
replay.state.timeout_scopes.insert(
1,
TimeoutScope::replayed_expired_with_deadline_for_test(
replay.pid,
aion_core::TimerId::anonymous(7),
),
);
replay
.state
.timeout_scope_stacks
.insert(replay.pid, vec![1]);
assert_eq!(
replay.step(CollectKind::All, &two),
Ok(CollectStep::ScopeExpired(
"timeout:deadline expired".to_owned()
)),
"replay must take the same branch as the converged live run"
);
assert_eq!(
replay.store.read_history(&replay.workflow_id).await?.len(),
history_len,
"replay must append nothing"
);
replay.shutdown()
}
/// `Race` collect over a store that serves stale reads: the first step suspends without
/// cancelling anything; after a completion for ordinal 0 is delivered the race is won by
/// ordinal 0 and ordinal 1 is cancelled; a replay settles the same winner without appending.
#[tokio::test(flavor = "multi_thread")]
async fn race_stale_snapshot_expiry_suspends_then_settles_the_recorded_winner() -> TestResult {
let scope_timer = aion_core::TimerId::anonymous(7);
// NOTE(review): the meaning of the `StaleReadStore::new` / `set_stale_target` arguments is
// defined in `nif_test_stores` — presumably they control which reads omit the deadline event.
let backing = Arc::new(crate::runtime::nif_test_stores::StaleReadStore::new(5));
let store: Arc<dyn EventStore> = Arc::clone(&backing) as Arc<dyn EventStore>;
let mut events = pending_batch(&["a", "b"]);
events.push(scope_deadline_fired(7));
let (workflow_id, run_id) = seed_history(&store, &events).await?;
let harness =
CollectHarness::over_store(Arc::clone(&store), workflow_id.clone(), run_id.clone())
.await?;
install_fresh_read_bridge(&harness);
backing.set_stale_target(&workflow_id, 1);
harness
.state
.timeout_scopes
.insert(23, TimeoutScope::live_for_test(harness.pid, scope_timer));
harness
.state
.timeout_scope_stacks
.insert(harness.pid, vec![23]);
let two = specs(&["a", "b"]);
assert_eq!(
harness.step(CollectKind::Race, &two),
Ok(CollectStep::Suspend),
"a snapshot lacking the deadline must park, not branch"
);
assert_eq!(harness.cancelled_ordinals().await?, Vec::<u64>::new());
// A completion for ordinal 0 arrives before the next step.
harness.deps.runtime.deliver_activity_completion_message(
harness.pid,
"activity:0",
r#""r0""#.to_owned(),
)?;
assert_eq!(
harness.step(CollectKind::Race, &two),
Ok(CollectStep::RaceWon(Ok(r#""r0""#.to_owned())))
);
assert_eq!(harness.cancelled_ordinals().await?, vec![1]);
assert_eq!(harness.pinned(), None);
let history_len = store.read_history(&workflow_id).await?.len();
harness.shutdown()?;
// Replay over the same store with a replayed scope for the same deadline timer.
let replay = CollectHarness::over_store(store, workflow_id, run_id).await?;
replay.state.timeout_scopes.insert(
1,
TimeoutScope::replayed_expired_with_deadline_for_test(
replay.pid,
aion_core::TimerId::anonymous(7),
),
);
replay
.state
.timeout_scope_stacks
.insert(replay.pid, vec![1]);
assert_eq!(
replay.step(CollectKind::Race, &two),
Ok(CollectStep::RaceWon(Ok(r#""r0""#.to_owned()))),
"replay must settle the recorded winner, not re-derive the race"
);
assert_eq!(
replay.store.read_history(&replay.workflow_id).await?.len(),
history_len,
"replay must append nothing"
);
replay.shutdown()
}
}