use aion_core::{ActivityId, Event, RunId, WorkflowId};
use crate::durability::{
correlation::{CorrelationKey, key_for_event},
error::DurabilityError,
};
/// Trims `history` down to the part belonging to the run identified by `run_id`.
///
/// The returned vector starts at the first `WorkflowStarted` event whose
/// `run_id` equals the requested one and keeps every event recorded after it.
///
/// # Errors
///
/// Returns [`DurabilityError::HistoryShape`] when `history` contains no
/// `WorkflowStarted` event for `run_id`.
pub fn current_run_segment(
    history: Vec<Event>,
    run_id: &RunId,
) -> Result<Vec<Event>, DurabilityError> {
    // Predicate: is this the start marker of the requested run?
    let is_run_start = |event: &Event| match event {
        Event::WorkflowStarted {
            run_id: started_run_id,
            ..
        } => started_run_id == run_id,
        _ => false,
    };

    let Some(first) = history.iter().position(is_run_start) else {
        return Err(DurabilityError::HistoryShape {
            reason: format!("history has no WorkflowStarted for run {run_id}"),
        });
    };

    // Keep the tail beginning at the start marker; everything before it is dropped.
    let mut history = history;
    Ok(history.split_off(first))
}
/// Family of recorded event that a replayed command is matched against.
///
/// Families are assigned by `family_for_event`; events that map to no family
/// are never selected as the cursor's next matchable event.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum RecordedEventFamily {
/// Assigned to `ActivityScheduled` events.
Activity,
/// Assigned to `TimerStarted` and `WithTimeoutCompleted` events.
Timer,
/// Assigned to `SignalReceived` and `SignalSent` events.
Signal,
/// Assigned to `ChildWorkflowStarted`, `ChildWorkflowCompleted`, and
/// `ChildWorkflowFailed` events.
Child,
}
/// Summary of the recorded event the cursor found, reported inside mismatch
/// results so callers can describe what history actually contained.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FoundEventDescriptor {
/// Sequence number of the found event, as returned by `Event::seq`.
pub seq: u64,
/// Family of the found event, or `None` when the event maps to no family.
pub family: Option<RecordedEventFamily>,
/// Correlation key produced by `key_for_event` for the found event, if any.
pub key: Option<CorrelationKey>,
/// Variant name of the found event (for example `"ActivityScheduled"`).
pub kind: &'static str,
}
/// Outcome of `HistoryCursor::resolve_next`.
#[derive(Clone, Debug, PartialEq)]
pub enum CursorResolveResult {
/// The command matched; carries clones of the recorded events that were
/// consumed for it, in history order.
Matched(Vec<Event>),
/// No unconsumed matchable event remains, or (for activities) the scheduled
/// activity has no closing event recorded.
Exhausted,
/// The next matchable event belongs to a different family or key.
Mismatch {
/// Key the caller expected to find next.
expected_key: CorrelationKey,
/// Description of the event that was found instead.
found: FoundEventDescriptor,
},
}
/// Outcome of `HistoryCursor::resolve_child_terminal`.
#[derive(Clone, Debug, PartialEq)]
pub enum ChildTerminalResolveResult {
/// The next matchable event was the awaited child's terminal event; carries
/// a clone of that single consumed event.
Matched(Vec<Event>),
/// No unconsumed matchable event remains.
Exhausted,
/// The next matchable event is not a terminal event for the awaited child.
Mismatch {
/// Description of the event that was found instead.
found: FoundEventDescriptor,
},
}
/// Cursor over a recorded event history that hands out recorded events to
/// replayed commands and tracks which events have already been consumed.
#[derive(Clone, Debug)]
pub struct HistoryCursor {
// Recorded events; never reordered or removed after construction.
events: Vec<Event>,
// One flag per entry of `events`; `true` once that event has been consumed.
consumed: Vec<bool>,
// Index into `events` from which the search for the next matchable event starts.
position: usize,
}
impl HistoryCursor {
    /// Creates a cursor over `events`, positioned at index 0 with nothing consumed.
    ///
    /// # Errors
    ///
    /// Returns [`DurabilityError::HistoryShape`] if any event has a lower
    /// sequence number than the event directly before it. Equal adjacent
    /// sequence numbers are accepted.
    pub fn new(events: Vec<Event>) -> Result<Self, DurabilityError> {
        for pair in events.windows(2) {
            let prior = pair[0].seq();
            let next = pair[1].seq();
            if next < prior {
                return Err(DurabilityError::HistoryShape {
                    reason: format!("history sequence order decreased from {prior} to {next}"),
                });
            }
        }
        let consumed = vec![false; events.len()];
        Ok(Self {
            events,
            consumed,
            position: 0,
        })
    }

    /// Sequence number of the event at the cursor position, or `None` when the
    /// position is past the end of the history.
    #[must_use]
    pub fn current_sequence(&self) -> Option<u64> {
        self.events.get(self.position).map(Event::seq)
    }

    /// All recorded events, including those already consumed.
    #[must_use]
    pub fn events(&self) -> &[Event] {
        &self.events
    }

    /// Index into [`Self::events`] of the current cursor position.
    #[must_use]
    pub const fn position_index(&self) -> usize {
        self.position
    }

    /// Correlation key of the next unconsumed matchable event, provided that
    /// event belongs to `family`.
    ///
    /// Returns `None` when no matchable event remains, when the next one
    /// belongs to another family, or when it has no key.
    #[must_use]
    pub fn next_key(&self, family: RecordedEventFamily) -> Option<CorrelationKey> {
        let index = self.next_matchable_index()?;
        let descriptor = self.descriptor_at(index, &self.events[index]);
        if descriptor.family == Some(family) {
            descriptor.key
        } else {
            None
        }
    }

    /// Moves the cursor forward until the next unconsumed matchable event
    /// carries `key`.
    ///
    /// Events that are stepped over are not marked consumed. When no event
    /// with `key` exists, the cursor ends up past the last matchable event.
    pub fn fast_forward_to_key(&mut self, key: &CorrelationKey) {
        while let Some(index) = self.next_matchable_index() {
            let descriptor = self.descriptor_at(index, &self.events[index]);
            if descriptor.key.as_ref() == Some(key) {
                return;
            }
            self.position = index + 1;
        }
    }

    /// Moves the cursor forward until the next unconsumed matchable event is a
    /// terminal event (as judged by `is_child_terminal_for`) of
    /// `child_workflow_id`.
    ///
    /// Events that are stepped over are not marked consumed. When no such
    /// event exists, the cursor ends up past the last matchable event.
    pub fn fast_forward_to_child_terminal(&mut self, child_workflow_id: &WorkflowId) {
        while let Some(index) = self.next_matchable_index() {
            if is_child_terminal_for(&self.events[index], child_workflow_id) {
                return;
            }
            self.position = index + 1;
        }
    }

    /// Resolves an await on a child workflow against the next unconsumed
    /// matchable event.
    ///
    /// Only the very next matchable event is inspected; nothing is skipped.
    /// On a match that single event is consumed and returned.
    #[must_use]
    pub fn resolve_child_terminal(
        &mut self,
        child_workflow_id: &WorkflowId,
    ) -> ChildTerminalResolveResult {
        let Some(found_index) = self.next_matchable_index() else {
            return ChildTerminalResolveResult::Exhausted;
        };
        self.position = found_index;
        match self.events.get(found_index) {
            Some(event) if is_child_terminal_for(event, child_workflow_id) => {
                ChildTerminalResolveResult::Matched(self.take_range(found_index, found_index + 1))
            }
            Some(event) => ChildTerminalResolveResult::Mismatch {
                found: self.descriptor_at(found_index, event),
            },
            None => ChildTerminalResolveResult::Exhausted,
        }
    }

    /// Resolves a replayed command of `family` with `expected_key` against the
    /// next unconsumed matchable event.
    ///
    /// Returns `Mismatch` when that event's family or key differs, `Exhausted`
    /// when no matchable event remains, and otherwise delegates to the
    /// family-specific resolution, which consumes the matched events.
    #[must_use]
    pub fn resolve_next(
        &mut self,
        family: RecordedEventFamily,
        expected_key: CorrelationKey,
    ) -> CursorResolveResult {
        let Some(found_index) = self.next_matchable_index() else {
            return CursorResolveResult::Exhausted;
        };
        self.position = found_index;
        let found = self.descriptor_at(found_index, &self.events[found_index]);
        if found.family != Some(family) || found.key.as_ref() != Some(&expected_key) {
            return CursorResolveResult::Mismatch {
                expected_key,
                found,
            };
        }
        match family {
            RecordedEventFamily::Activity => self.resolve_activity(expected_key),
            RecordedEventFamily::Timer => {
                self.resolve_started_with_immediate_outcome(&expected_key)
            }
            RecordedEventFamily::Child => self.resolve_child(expected_key),
            RecordedEventFamily::Signal => match expected_key {
                CorrelationKey::Signal { .. } => self.consume_one(),
                _ => self.mismatch_at_current(expected_key),
            },
        }
    }

    /// Consumes the `ChildWorkflowStarted` event at the cursor position, or
    /// reports a mismatch when another event sits there.
    fn resolve_child(&mut self, expected_key: CorrelationKey) -> CursorResolveResult {
        match self.events.get(self.position) {
            Some(Event::ChildWorkflowStarted { .. }) => self.consume_one(),
            _ => self.mismatch_at_current(expected_key),
        }
    }

    /// Consumes the full recorded range of one activity, starting at the
    /// `ActivityScheduled` event at the cursor position.
    ///
    /// Later `ActivityStarted` and `ActivityFailed` events for the same
    /// activity are collected until a closing event is found: an
    /// `ActivityCompleted` or `ActivityCancelled`, or an `ActivityFailed` with
    /// no later event recorded for that activity. Events belonging to other
    /// commands inside the range are left untouched and stay matchable.
    /// Returns `Exhausted` when history holds no closing event.
    fn resolve_activity(&mut self, expected_key: CorrelationKey) -> CursorResolveResult {
        let Some(Event::ActivityScheduled { activity_id, .. }) = self.events.get(self.position)
        else {
            return self.mismatch_at_current(expected_key);
        };
        let activity_id = activity_id.clone();
        let mut matched = vec![self.position];
        let mut index = self.position + 1;
        while let Some(event) = self.events.get(index) {
            match event {
                Event::ActivityStarted {
                    activity_id: event_activity_id,
                    ..
                } if event_activity_id == &activity_id => {
                    matched.push(index);
                }
                Event::ActivityFailed {
                    activity_id: event_activity_id,
                    ..
                } if event_activity_id == &activity_id => {
                    matched.push(index);
                    // A failure only closes the range when nothing else was
                    // recorded for this activity afterwards (no retry, no
                    // later completion or cancellation).
                    if !self.has_later_activity_attempt_or_outcome(index + 1, &activity_id) {
                        return self.consume_indices(matched);
                    }
                }
                Event::ActivityCompleted {
                    activity_id: event_activity_id,
                    ..
                }
                | Event::ActivityCancelled {
                    activity_id: event_activity_id,
                    ..
                } if event_activity_id == &activity_id => {
                    matched.push(index);
                    return self.consume_indices(matched);
                }
                _ => {}
            }
            index += 1;
        }
        CursorResolveResult::Exhausted
    }

    /// Consumes the start event at the cursor position and, when the event
    /// directly after it is an outcome for `expected_key`, that outcome too.
    fn resolve_started_with_immediate_outcome(
        &mut self,
        expected_key: &CorrelationKey,
    ) -> CursorResolveResult {
        let start = self.position;
        let next = self.position + 1;
        if self
            .events
            .get(next)
            .is_some_and(|event| self.is_outcome_for_start_key(event, expected_key))
        {
            CursorResolveResult::Matched(self.take_range(start, next + 1))
        } else {
            self.consume_one()
        }
    }

    /// Consumes exactly the event at the cursor position.
    fn consume_one(&mut self) -> CursorResolveResult {
        CursorResolveResult::Matched(self.take_range(self.position, self.position + 1))
    }

    /// Marks `events[start..end]` consumed, advances the cursor past any
    /// consumed prefix, and returns clones of the consumed events.
    ///
    /// Callers must pass a range that lies inside the history; the slice
    /// indexing panics otherwise.
    fn take_range(&mut self, start: usize, end: usize) -> Vec<Event> {
        let consumed = self.events[start..end].to_vec();
        for slot in &mut self.consumed[start..end] {
            *slot = true;
        }
        self.advance_past_consumed();
        consumed
    }

    /// Marks each listed index consumed (out-of-range indices are ignored),
    /// advances the cursor past any consumed prefix, and returns clones of the
    /// consumed events in the order given.
    fn consume_indices(&mut self, indices: Vec<usize>) -> CursorResolveResult {
        let mut events = Vec::with_capacity(indices.len());
        for index in indices {
            if let (Some(event), Some(slot)) =
                (self.events.get(index), self.consumed.get_mut(index))
            {
                *slot = true;
                events.push(event.clone());
            }
        }
        self.advance_past_consumed();
        CursorResolveResult::Matched(events)
    }

    /// Moves the cursor forward while it points at a consumed event.
    fn advance_past_consumed(&mut self) {
        while self.consumed.get(self.position).copied().unwrap_or(false) {
            self.position += 1;
        }
    }

    /// Index of the first event at or after the cursor position that is both
    /// unconsumed and assigned a family by `family_for_event`.
    fn next_matchable_index(&self) -> Option<usize> {
        let events = self.events.get(self.position..)?;
        let consumed = self.consumed.get(self.position..)?;
        events
            .iter()
            .zip(consumed)
            .position(|(event, consumed)| !consumed && family_for_event(event).is_some())
            .map(|offset| self.position + offset)
    }

    /// Builds a `Mismatch` describing the event at the cursor position, or
    /// `Exhausted` when the position is past the end of the history.
    fn mismatch_at_current(&self, expected_key: CorrelationKey) -> CursorResolveResult {
        match self.events.get(self.position) {
            Some(event) => CursorResolveResult::Mismatch {
                expected_key,
                found: self.descriptor_at(self.position, event),
            },
            None => CursorResolveResult::Exhausted,
        }
    }

    /// Describes `event`, which must be the event stored at `index`.
    fn descriptor_at(&self, index: usize, event: &Event) -> FoundEventDescriptor {
        FoundEventDescriptor {
            seq: event.seq(),
            family: family_for_event(event),
            key: key_for_event(&self.events, index),
            kind: event_kind(event),
        }
    }

    /// Reports whether any event at index `start` or later records another
    /// attempt (`ActivityStarted`, `ActivityFailed`) or a closing outcome
    /// (`ActivityCompleted`, `ActivityCancelled`) for `activity_id`.
    ///
    /// `ActivityCancelled` is included so that a failure followed by a
    /// cancellation is not mistaken for the activity's final outcome;
    /// `resolve_activity` treats cancellation as a closing event, and this
    /// check has to agree with it.
    fn has_later_activity_attempt_or_outcome(
        &self,
        start: usize,
        activity_id: &ActivityId,
    ) -> bool {
        self.events.iter().skip(start).any(|event| {
            matches!(
                event,
                Event::ActivityStarted {
                    activity_id: event_activity_id,
                    ..
                } | Event::ActivityFailed {
                    activity_id: event_activity_id,
                    ..
                } | Event::ActivityCompleted {
                    activity_id: event_activity_id,
                    ..
                } | Event::ActivityCancelled {
                    activity_id: event_activity_id,
                    ..
                } if event_activity_id == activity_id
            )
        })
    }

    /// Reports whether `event` is the recorded outcome of the start event that
    /// `expected_key` refers to.
    ///
    /// For timer keys the outcome's `timer_id` must equal the key's timer id.
    /// For child keys the outcome's `child_workflow_id` must equal that of the
    /// `ChildWorkflowStarted` event at the cursor position.
    fn is_outcome_for_start_key(&self, event: &Event, expected_key: &CorrelationKey) -> bool {
        match (event, expected_key) {
            (
                Event::TimerFired { timer_id, .. }
                | Event::TimerCancelled { timer_id, .. }
                | Event::WithTimeoutCompleted { timer_id, .. },
                CorrelationKey::Timer(expected_timer_id),
            ) => timer_id == expected_timer_id,
            (
                Event::ChildWorkflowCompleted {
                    child_workflow_id, ..
                }
                | Event::ChildWorkflowFailed {
                    child_workflow_id, ..
                }
                | Event::ChildWorkflowCancelled {
                    child_workflow_id, ..
                },
                CorrelationKey::Child(_),
            ) => self.events.get(self.position).is_some_and(|start| {
                matches!(
                    start,
                    Event::ChildWorkflowStarted {
                        child_workflow_id: start_child_workflow_id,
                        ..
                    } if start_child_workflow_id == child_workflow_id
                )
            }),
            _ => false,
        }
    }
}
/// Reports whether `event` is a `ChildWorkflowCompleted` or
/// `ChildWorkflowFailed` event recorded for `child_workflow_id`.
///
/// NOTE(review): `ChildWorkflowCancelled` is not treated as terminal here —
/// confirm that awaiting a cancelled child is meant to stay unresolved.
fn is_child_terminal_for(event: &Event, child_workflow_id: &WorkflowId) -> bool {
    match event {
        Event::ChildWorkflowCompleted {
            child_workflow_id: recorded_child,
            ..
        }
        | Event::ChildWorkflowFailed {
            child_workflow_id: recorded_child,
            ..
        } => recorded_child == child_workflow_id,
        _ => false,
    }
}
fn family_for_event(event: &Event) -> Option<RecordedEventFamily> {
match event {
Event::ActivityScheduled { .. } => Some(RecordedEventFamily::Activity),
Event::TimerStarted { .. } | Event::WithTimeoutCompleted { .. } => {
Some(RecordedEventFamily::Timer)
}
Event::SignalReceived { .. } | Event::SignalSent { .. } => {
Some(RecordedEventFamily::Signal)
}
Event::ChildWorkflowStarted { .. }
| Event::ChildWorkflowCompleted { .. }
| Event::ChildWorkflowFailed { .. } => Some(RecordedEventFamily::Child),
_ => None,
}
}
fn event_kind(event: &Event) -> &'static str {
match event {
Event::WorkflowStarted { .. } => "WorkflowStarted",
Event::WorkflowCompleted { .. } => "WorkflowCompleted",
Event::WorkflowFailed { .. } => "WorkflowFailed",
Event::WorkflowCancelled { .. } => "WorkflowCancelled",
Event::WorkflowTimedOut { .. } => "WorkflowTimedOut",
Event::WorkflowContinuedAsNew { .. } => "WorkflowContinuedAsNew",
Event::SearchAttributesUpdated { .. } => "SearchAttributesUpdated",
Event::ActivityScheduled { .. } => "ActivityScheduled",
Event::ActivityStarted { .. } => "ActivityStarted",
Event::ActivityCompleted { .. } => "ActivityCompleted",
Event::ActivityFailed { .. } => "ActivityFailed",
Event::ActivityCancelled { .. } => "ActivityCancelled",
Event::TimerStarted { .. } => "TimerStarted",
Event::TimerFired { .. } => "TimerFired",
Event::TimerCancelled { .. } => "TimerCancelled",
Event::WithTimeoutCompleted { .. } => "WithTimeoutCompleted",
Event::SignalReceived { .. } => "SignalReceived",
Event::SignalSent { .. } => "SignalSent",
Event::ChildWorkflowStarted { .. } => "ChildWorkflowStarted",
Event::ChildWorkflowCompleted { .. } => "ChildWorkflowCompleted",
Event::ChildWorkflowFailed { .. } => "ChildWorkflowFailed",
Event::ChildWorkflowCancelled { .. } => "ChildWorkflowCancelled",
Event::ScheduleCreated { .. } => "ScheduleCreated",
Event::ScheduleUpdated { .. } => "ScheduleUpdated",
Event::SchedulePaused { .. } => "SchedulePaused",
Event::ScheduleResumed { .. } => "ScheduleResumed",
Event::ScheduleDeleted { .. } => "ScheduleDeleted",
Event::ScheduleTriggered { .. } => "ScheduleTriggered",
}
}
// Unit tests for `HistoryCursor`: construction checks, strict resolution,
// fast-forwarding, and resolution of activity ranges that are interleaved
// with events of other commands.
#[cfg(test)]
mod tests {
use aion_core::{
ActivityError, ActivityErrorKind, ActivityId, Event, EventEnvelope, Payload, TimerId,
WorkflowId,
};
use chrono::{DateTime, TimeZone, Utc};
use serde_json::json;
use uuid::Uuid;
use super::{
ChildTerminalResolveResult, CursorResolveResult, HistoryCursor, RecordedEventFamily,
};
use crate::durability::correlation::CorrelationKey;
// Fixed timestamp (Unix epoch) shared by every fixture event.
fn timestamp() -> Result<DateTime<Utc>, Box<dyn std::error::Error>> {
Utc.timestamp_opt(0, 0)
.single()
.ok_or_else(|| "invalid timestamp".into())
}
// Envelope with the given sequence number and the nil workflow id.
fn envelope(seq: u64) -> Result<EventEnvelope, Box<dyn std::error::Error>> {
Ok(EventEnvelope {
seq,
recorded_at: timestamp()?,
workflow_id: WorkflowId::new(Uuid::nil()),
})
}
// JSON `null` payload used wherever an event needs input or result data.
fn payload() -> Result<Payload, Box<dyn std::error::Error>> {
Ok(Payload::from_json(&json!(null))?)
}
// `WorkflowStarted` fixture; a lifecycle event that belongs to no family.
fn workflow_started(seq: u64) -> Result<Event, Box<dyn std::error::Error>> {
Ok(Event::WorkflowStarted {
envelope: envelope(seq)?,
workflow_type: "workflow".to_owned(),
input: payload()?,
run_id: aion_core::RunId::new(uuid::Uuid::from_u128(1)),
parent_run_id: None,
})
}
// `ActivityScheduled` fixture for the activity at sequence position `ordinal`.
fn scheduled(seq: u64, ordinal: u64) -> Result<Event, Box<dyn std::error::Error>> {
Ok(Event::ActivityScheduled {
envelope: envelope(seq)?,
activity_id: ActivityId::from_sequence_position(ordinal),
activity_type: "activity".to_owned(),
input: payload()?,
})
}
// `ActivityStarted` fixture for the activity at sequence position `ordinal`.
fn started(seq: u64, ordinal: u64) -> Result<Event, Box<dyn std::error::Error>> {
Ok(Event::ActivityStarted {
envelope: envelope(seq)?,
activity_id: ActivityId::from_sequence_position(ordinal),
})
}
// `ActivityCompleted` fixture for the activity at sequence position `ordinal`.
fn completed(seq: u64, ordinal: u64) -> Result<Event, Box<dyn std::error::Error>> {
Ok(Event::ActivityCompleted {
envelope: envelope(seq)?,
activity_id: ActivityId::from_sequence_position(ordinal),
result: payload()?,
})
}
// `ActivityFailed` fixture with the given attempt number and error kind.
fn failed(
seq: u64,
ordinal: u64,
attempt: u32,
kind: ActivityErrorKind,
) -> Result<Event, Box<dyn std::error::Error>> {
Ok(Event::ActivityFailed {
envelope: envelope(seq)?,
activity_id: ActivityId::from_sequence_position(ordinal),
error: ActivityError {
kind,
message: "activity failed".to_owned(),
details: None,
},
attempt,
})
}
// A history with increasing sequence numbers is accepted and the cursor
// starts at the first event.
#[test]
fn new_accepts_in_order_history_and_exposes_starting_sequence()
-> Result<(), Box<dyn std::error::Error>> {
let cursor = HistoryCursor::new(vec![scheduled(7, 0)?, completed(8, 0)?])?;
assert_eq!(cursor.current_sequence(), Some(7));
assert_eq!(cursor.position_index(), 0);
Ok(())
}
// A history whose sequence numbers go down (9 then 8) is rejected.
#[test]
fn new_rejects_decreasing_sequence_order() -> Result<(), Box<dyn std::error::Error>> {
let error = HistoryCursor::new(vec![scheduled(9, 0)?, completed(8, 0)?])
.map(|_| "unexpected success")
.err();
assert!(error.is_some());
Ok(())
}
// Scheduled + completed resolve as one two-event match; a further resolve
// call then reports exhaustion.
#[test]
fn resolves_activity_match_then_reports_exhaustion() -> Result<(), Box<dyn std::error::Error>> {
let mut cursor = HistoryCursor::new(vec![scheduled(1, 0)?, completed(2, 0)?])?;
let result =
cursor.resolve_next(RecordedEventFamily::Activity, CorrelationKey::Activity(0));
match result {
CursorResolveResult::Matched(events) => {
assert_eq!(events.len(), 2);
assert_eq!(cursor.current_sequence(), None);
}
CursorResolveResult::Exhausted | CursorResolveResult::Mismatch { .. } => {
return Err("activity should match recorded history".into());
}
}
assert_eq!(
cursor.resolve_next(RecordedEventFamily::Activity, CorrelationKey::Activity(1),),
CursorResolveResult::Exhausted
);
Ok(())
}
// A leading `WorkflowStarted` has no family, so resolution skips it and
// matches the activity that follows.
#[test]
fn skips_non_matchable_lifecycle_events_before_resolving()
-> Result<(), Box<dyn std::error::Error>> {
let mut cursor = HistoryCursor::new(vec![
workflow_started(1)?,
scheduled(2, 0)?,
completed(3, 0)?,
])?;
let result =
cursor.resolve_next(RecordedEventFamily::Activity, CorrelationKey::Activity(0));
match result {
CursorResolveResult::Matched(events) => {
assert_eq!(events.len(), 2);
assert!(matches!(
events.first(),
Some(Event::ActivityScheduled { .. })
));
assert_eq!(cursor.current_sequence(), None);
}
CursorResolveResult::Exhausted | CursorResolveResult::Mismatch { .. } => {
return Err("lifecycle events should not block command replay".into());
}
}
Ok(())
}
// Asking for an activity while a timer start is next yields a mismatch that
// describes the timer event.
#[test]
fn reports_mismatch_for_different_next_family() -> Result<(), Box<dyn std::error::Error>> {
let timer_id = TimerId::anonymous(1);
let mut cursor = HistoryCursor::new(vec![Event::TimerStarted {
envelope: envelope(1)?,
timer_id: timer_id.clone(),
fire_at: timestamp()?,
}])?;
let result =
cursor.resolve_next(RecordedEventFamily::Activity, CorrelationKey::Activity(0));
match result {
CursorResolveResult::Mismatch {
expected_key,
found,
} => {
assert_eq!(expected_key, CorrelationKey::Activity(0));
assert_eq!(found.family, Some(RecordedEventFamily::Timer));
assert_eq!(found.key, Some(CorrelationKey::Timer(timer_id)));
}
CursorResolveResult::Matched(_) | CursorResolveResult::Exhausted => {
return Err("different next family should be a mismatch".into());
}
}
Ok(())
}
// Retried failures are collected along the way and the range ends at the
// eventual completion (five events in total).
#[test]
fn walks_retry_failures_to_eventual_activity_success() -> Result<(), Box<dyn std::error::Error>>
{
let mut cursor = HistoryCursor::new(vec![
scheduled(1, 0)?,
failed(2, 0, 1, ActivityErrorKind::Retryable)?,
started(3, 0)?,
failed(4, 0, 2, ActivityErrorKind::Retryable)?,
completed(5, 0)?,
])?;
let result =
cursor.resolve_next(RecordedEventFamily::Activity, CorrelationKey::Activity(0));
match result {
CursorResolveResult::Matched(events) => {
assert_eq!(events.len(), 5);
assert!(matches!(
events.last(),
Some(Event::ActivityCompleted { .. })
));
assert_eq!(cursor.current_sequence(), None);
}
CursorResolveResult::Exhausted | CursorResolveResult::Mismatch { .. } => {
return Err("retry history should resolve to eventual completion".into());
}
}
Ok(())
}
// Workflow id for a child, derived from a small integer.
fn child_id(value: u128) -> WorkflowId {
WorkflowId::new(Uuid::from_u128(value))
}
// `ChildWorkflowStarted` fixture for child `child`.
fn child_started(seq: u64, child: u128) -> Result<Event, Box<dyn std::error::Error>> {
Ok(Event::ChildWorkflowStarted {
envelope: envelope(seq)?,
child_workflow_id: child_id(child),
workflow_type: "child".to_owned(),
input: payload()?,
})
}
// `ChildWorkflowCompleted` fixture for child `child`.
fn child_completed(seq: u64, child: u128) -> Result<Event, Box<dyn std::error::Error>> {
Ok(Event::ChildWorkflowCompleted {
envelope: envelope(seq)?,
child_workflow_id: child_id(child),
result: payload()?,
})
}
// `SignalReceived` fixture with the given signal name.
fn signal_received(seq: u64, name: &str) -> Result<Event, Box<dyn std::error::Error>> {
Ok(Event::SignalReceived {
envelope: envelope(seq)?,
name: name.to_owned(),
payload: payload()?,
})
}
// Fast-forwarding steps over unrelated matchable events until it reaches the
// awaited child's completion, which then resolves as a single event.
#[test]
fn fast_forward_to_child_terminal_skips_consumed_commands()
-> Result<(), Box<dyn std::error::Error>> {
let mut cursor = HistoryCursor::new(vec![
scheduled(1, 0)?,
completed(2, 0)?,
child_started(3, 1)?,
signal_received(4, "mid")?,
child_started(5, 2)?,
child_completed(6, 1)?,
])?;
cursor.fast_forward_to_child_terminal(&child_id(1));
let result = cursor.resolve_child_terminal(&child_id(1));
match result {
ChildTerminalResolveResult::Matched(events) => {
assert_eq!(events.len(), 1);
assert!(matches!(
events.first(),
Some(Event::ChildWorkflowCompleted { child_workflow_id, .. })
if *child_workflow_id == child_id(1)
));
}
ChildTerminalResolveResult::Exhausted | ChildTerminalResolveResult::Mismatch { .. } => {
return Err("await must reach the awaited child's recorded terminal".into());
}
}
Ok(())
}
// With no terminal event recorded for the child, fast-forwarding runs off the
// end and resolution reports exhaustion.
#[test]
fn fast_forward_to_child_terminal_exhausts_when_no_terminal_recorded()
-> Result<(), Box<dyn std::error::Error>> {
let mut cursor = HistoryCursor::new(vec![
scheduled(1, 0)?,
completed(2, 0)?,
child_started(3, 1)?,
])?;
cursor.fast_forward_to_child_terminal(&child_id(1));
assert_eq!(
cursor.resolve_child_terminal(&child_id(1)),
ChildTerminalResolveResult::Exhausted
);
Ok(())
}
// Without a prior fast-forward, an unconsumed activity ahead of the child's
// terminal event is reported as a mismatch rather than skipped.
#[test]
fn resolve_child_terminal_reports_mismatch_without_skipping_in_strict_replay()
-> Result<(), Box<dyn std::error::Error>> {
let mut cursor = HistoryCursor::new(vec![scheduled(1, 0)?, child_completed(2, 1)?])?;
let result = cursor.resolve_child_terminal(&child_id(1));
match result {
ChildTerminalResolveResult::Mismatch { found } => {
assert_eq!(found.seq, 1);
assert_eq!(found.family, Some(RecordedEventFamily::Activity));
}
ChildTerminalResolveResult::Matched(_) | ChildTerminalResolveResult::Exhausted => {
return Err("strict replay must not skip an unconsumed recorded command".into());
}
}
Ok(())
}
// `ChildWorkflowFailed` fixture for child `child`.
fn child_failed(seq: u64, child: u128) -> Result<Event, Box<dyn std::error::Error>> {
Ok(Event::ChildWorkflowFailed {
envelope: envelope(seq)?,
child_workflow_id: child_id(child),
error: aion_core::WorkflowError {
message: "child failed".to_owned(),
details: None,
},
})
}
// A signal recorded between an activity's scheduled and completed events is
// left out of the activity match and can still be resolved afterwards.
#[test]
fn resolve_activity_skips_interleaved_signal_and_leaves_it_matchable()
-> Result<(), Box<dyn std::error::Error>> {
let mut cursor = HistoryCursor::new(vec![
scheduled(1, 0)?,
signal_received(2, "mid")?,
completed(3, 0)?,
])?;
let result =
cursor.resolve_next(RecordedEventFamily::Activity, CorrelationKey::Activity(0));
match result {
CursorResolveResult::Matched(events) => {
assert_eq!(events.len(), 2);
assert!(matches!(
events.first(),
Some(Event::ActivityScheduled { .. })
));
assert!(matches!(
events.last(),
Some(Event::ActivityCompleted { activity_id, .. })
if activity_id.sequence_position() == 0
));
}
CursorResolveResult::Exhausted | CursorResolveResult::Mismatch { .. } => {
return Err(
"an async signal arrival inside the activity range must not fail replay".into(),
);
}
}
let signal = cursor.resolve_next(
RecordedEventFamily::Signal,
CorrelationKey::Signal {
name: "mid".to_owned(),
index: 0,
},
);
match signal {
CursorResolveResult::Matched(events) => {
assert_eq!(events.len(), 1);
assert!(matches!(events.first(), Some(Event::SignalReceived { .. })));
}
CursorResolveResult::Exhausted | CursorResolveResult::Mismatch { .. } => {
return Err("the skipped signal must stay matchable for its own command".into());
}
}
Ok(())
}
// Two activities whose events interleave each resolve to their own
// scheduled/completed pair, in command order.
#[test]
fn resolve_activity_resolves_interleaved_parallel_activity_ranges()
-> Result<(), Box<dyn std::error::Error>> {
let mut cursor = HistoryCursor::new(vec![
scheduled(1, 0)?,
scheduled(2, 1)?,
completed(3, 1)?,
completed(4, 0)?,
])?;
let first = cursor.resolve_next(RecordedEventFamily::Activity, CorrelationKey::Activity(0));
match first {
CursorResolveResult::Matched(events) => {
assert_eq!(events.len(), 2);
assert!(matches!(
events.last(),
Some(Event::ActivityCompleted { activity_id, .. })
if activity_id.sequence_position() == 0
));
}
CursorResolveResult::Exhausted | CursorResolveResult::Mismatch { .. } => {
return Err("a parallel activity's events inside the range must be skipped".into());
}
}
let second =
cursor.resolve_next(RecordedEventFamily::Activity, CorrelationKey::Activity(1));
match second {
CursorResolveResult::Matched(events) => {
assert_eq!(events.len(), 2);
assert!(matches!(
events.last(),
Some(Event::ActivityCompleted { activity_id, .. })
if activity_id.sequence_position() == 1
));
assert_eq!(cursor.current_sequence(), None);
}
CursorResolveResult::Exhausted | CursorResolveResult::Mismatch { .. } => {
return Err("the interleaved activity must remain resolvable afterwards".into());
}
}
Ok(())
}
// Child terminal events recorded inside an activity's range do not end up in
// the activity match.
#[test]
fn resolve_activity_skips_interleaved_child_terminal() -> Result<(), Box<dyn std::error::Error>>
{
let mut cursor = HistoryCursor::new(vec![
child_started(1, 7)?,
scheduled(2, 0)?,
child_completed(3, 7)?,
child_failed(4, 9)?,
completed(5, 0)?,
])?;
cursor.fast_forward_to_key(&CorrelationKey::Activity(0));
let result =
cursor.resolve_next(RecordedEventFamily::Activity, CorrelationKey::Activity(0));
match result {
CursorResolveResult::Matched(events) => {
assert_eq!(events.len(), 2);
assert!(matches!(
events.last(),
Some(Event::ActivityCompleted { activity_id, .. })
if activity_id.sequence_position() == 0
));
}
CursorResolveResult::Exhausted | CursorResolveResult::Mismatch { .. } => {
return Err("child terminals inside the activity range must be skipped".into());
}
}
Ok(())
}
// The scheduled event at the cursor carries key 1, so asking for key 0 is a
// mismatch.
#[test]
fn resolve_activity_still_mismatches_on_wrong_anchor_key()
-> Result<(), Box<dyn std::error::Error>> {
let mut cursor = HistoryCursor::new(vec![scheduled(1, 1)?, completed(2, 1)?])?;
let result =
cursor.resolve_next(RecordedEventFamily::Activity, CorrelationKey::Activity(0));
match result {
CursorResolveResult::Mismatch {
expected_key,
found,
} => {
assert_eq!(expected_key, CorrelationKey::Activity(0));
assert_eq!(found.key, Some(CorrelationKey::Activity(1)));
}
CursorResolveResult::Matched(_) | CursorResolveResult::Exhausted => {
return Err("a wrong key at the Scheduled anchor must stay a mismatch".into());
}
}
Ok(())
}
// Smoke test: 5,000 scheduled/completed pairs are fast-forwarded to and
// resolved one after another, leaving the cursor at the end.
#[test]
fn fast_forward_and_resolution_smoke_over_large_history()
-> Result<(), Box<dyn std::error::Error>> {
let count: u64 = 5_000;
let mut events = Vec::with_capacity(usize::try_from(count * 2)?);
for ordinal in 0..count {
events.push(scheduled(ordinal * 2 + 1, ordinal)?);
events.push(completed(ordinal * 2 + 2, ordinal)?);
}
let mut cursor = HistoryCursor::new(events)?;
for ordinal in 0..count {
let key = CorrelationKey::Activity(ordinal);
cursor.fast_forward_to_key(&key);
let result = cursor.resolve_next(RecordedEventFamily::Activity, key);
assert!(
matches!(result, CursorResolveResult::Matched(ref events) if events.len() == 2),
"ordinal {ordinal} failed to resolve in the large-history smoke"
);
}
assert_eq!(cursor.current_sequence(), None);
Ok(())
}
// When the last recorded event for an activity is a failure, that failure
// closes the range and is returned as the outcome.
#[test]
fn returns_terminal_activity_failure_as_recorded_outcome()
-> Result<(), Box<dyn std::error::Error>> {
let mut cursor = HistoryCursor::new(vec![
scheduled(1, 0)?,
failed(2, 0, 1, ActivityErrorKind::Retryable)?,
failed(3, 0, 2, ActivityErrorKind::Terminal)?,
])?;
let result =
cursor.resolve_next(RecordedEventFamily::Activity, CorrelationKey::Activity(0));
match result {
CursorResolveResult::Matched(events) => {
assert_eq!(events.len(), 3);
assert!(matches!(
events.last(),
Some(Event::ActivityFailed { error, .. }) if error.kind == ActivityErrorKind::Terminal
));
assert_eq!(cursor.current_sequence(), None);
}
CursorResolveResult::Exhausted | CursorResolveResult::Mismatch { .. } => {
return Err("terminal failure should be the recorded outcome".into());
}
}
Ok(())
}
}