use std::collections::HashMap;
use aion_core::{Event, TimerId};
/// Identity derived from a history event so that related events can be matched up.
///
/// Keys are produced by [`correlation_keys_for_history`] and [`key_for_event`].
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum CorrelationKey {
    /// An activity, identified by the sequence position reported by the
    /// `activity_id` of an `ActivityScheduled` event.
    Activity(u64),
    /// A child workflow, identified by its zero-based ordinal among the
    /// `ChildWorkflowStarted` events of the history.
    Child(u64),
    /// A timer, identified by the `timer_id` of a `TimerStarted` event.
    Timer(TimerId),
    /// A signal, identified by its name plus a per-name occurrence index.
    Signal {
        /// Name carried by the signal event.
        name: String,
        /// Number of earlier signal events in the history with the same name.
        index: usize,
    },
}
/// Computes the correlation key of every event in `events`, in history order.
///
/// The returned vector has one entry per input event; an entry is `None` when
/// the corresponding event has no correlation key. Occurrence counters are
/// carried across the whole pass, so each event is visited exactly once.
#[must_use]
pub fn correlation_keys_for_history(events: &[Event]) -> Vec<Option<CorrelationKey>> {
    let mut running = OccurrenceCounters::default();
    let mut keys = Vec::with_capacity(events.len());
    for event in events {
        keys.push(key_for_event_with_counters(event, &mut running));
    }
    keys
}
/// Derives the correlation key of the single event at `index` within `events`.
///
/// Returns `None` when `index` is out of bounds or the event has no key.
///
/// * Signal events (`SignalReceived` / `SignalSent`) are keyed by their name and
///   the number of earlier signal events carrying the same name.
/// * `ChildWorkflowStarted` events are keyed by the number of earlier
///   `ChildWorkflowStarted` events (`None` if that count does not fit in `u64`).
/// * Any other event is delegated to `key_for_positionless_event`.
///
/// Each call rescans the events before `index`; to key a whole history use
/// [`correlation_keys_for_history`] instead.
#[must_use]
pub fn key_for_event(events: &[Event], index: usize) -> Option<CorrelationKey> {
    let target = events.get(index)?;
    // `get` succeeded, so `index < events.len()` and this prefix slice is in bounds.
    let earlier = &events[..index];
    match target {
        Event::SignalReceived { name, .. } | Event::SignalSent { name, .. } => {
            let occurrence = earlier
                .iter()
                .filter(|candidate| match candidate {
                    Event::SignalReceived { name: seen, .. }
                    | Event::SignalSent { name: seen, .. } => seen == name,
                    _ => false,
                })
                .count();
            Some(CorrelationKey::Signal {
                name: name.clone(),
                index: occurrence,
            })
        }
        Event::ChildWorkflowStarted { .. } => {
            let earlier_starts = earlier
                .iter()
                .filter(|candidate| matches!(candidate, Event::ChildWorkflowStarted { .. }))
                .count();
            match u64::try_from(earlier_starts) {
                Ok(ordinal) => Some(CorrelationKey::Child(ordinal)),
                Err(_) => None,
            }
        }
        other => key_for_positionless_event(other),
    }
}
/// Running tallies carried across a single pass over a history.
///
/// The default value has no signal names recorded and a child count of zero.
#[derive(Default)]
struct OccurrenceCounters {
    /// Signal events seen so far, counted per signal name.
    signal_counts: HashMap<String, usize>,
    /// `ChildWorkflowStarted` events seen so far.
    child_spawns: u64,
}
/// Derives the correlation key for `event` while advancing the running counters.
///
/// * Signal events (`SignalReceived` / `SignalSent`) are keyed by their name and
///   the number of same-named signal events recorded in `counters` so far; that
///   count is then incremented.
/// * `ChildWorkflowStarted` events are keyed by the current child ordinal, which
///   is then incremented.
/// * Any other event is delegated to `key_for_positionless_event` and leaves
///   `counters` untouched.
fn key_for_event_with_counters(
    event: &Event,
    counters: &mut OccurrenceCounters,
) -> Option<CorrelationKey> {
    match event {
        Event::SignalReceived { name, .. } | Event::SignalSent { name, .. } => {
            // A single entry lookup both reads the current occurrence index and
            // bumps it, instead of hashing the name once to read and again to write.
            let seen = counters.signal_counts.entry(name.clone()).or_insert(0);
            let index = *seen;
            *seen += 1;
            Some(CorrelationKey::Signal {
                name: name.clone(),
                index,
            })
        }
        Event::ChildWorkflowStarted { .. } => {
            let ordinal = counters.child_spawns;
            counters.child_spawns += 1;
            Some(CorrelationKey::Child(ordinal))
        }
        _ => key_for_positionless_event(event),
    }
}
/// Derives the key for events whose identity does not depend on their position
/// in the history.
///
/// `ActivityScheduled` maps to the sequence position reported by its
/// `activity_id`, `TimerStarted` maps to a clone of its `timer_id`, and every
/// other event yields `None`.
fn key_for_positionless_event(event: &Event) -> Option<CorrelationKey> {
    let key = match event {
        Event::ActivityScheduled { activity_id, .. } => {
            CorrelationKey::Activity(activity_id.sequence_position())
        }
        Event::TimerStarted { timer_id, .. } => CorrelationKey::Timer(timer_id.clone()),
        _ => return None,
    };
    Some(key)
}
#[cfg(test)]
mod tests {
    use aion_core::{Event, EventEnvelope, Payload, WorkflowId};
    use chrono::Utc;
    use serde_json::json;
    use uuid::Uuid;

    use super::{CorrelationKey, correlation_keys_for_history, key_for_event};

    /// Result type shared by the fallible fixtures and tests in this module.
    type TestResult<T = ()> = Result<T, Box<dyn std::error::Error>>;

    /// Builds an envelope with the given sequence number, the current time, and
    /// the nil workflow id.
    fn envelope(seq: u64) -> EventEnvelope {
        let recorded_at = Utc::now();
        let workflow_id = WorkflowId::new(Uuid::nil());
        EventEnvelope {
            seq,
            recorded_at,
            workflow_id,
        }
    }

    /// Builds a payload from JSON `null`.
    fn payload() -> TestResult<Payload> {
        let null_payload = Payload::from_json(&json!(null))?;
        Ok(null_payload)
    }

    /// Builds a `SignalReceived` event with the given sequence number and name.
    fn signal_received(seq: u64, name: &str) -> TestResult<Event> {
        Ok(Event::SignalReceived {
            envelope: envelope(seq),
            name: name.to_owned(),
            payload: payload()?,
        })
    }

    /// Builds a `ChildWorkflowStarted` event whose child id is derived from `child`.
    fn child_started(seq: u64, child: u128) -> TestResult<Event> {
        let started = Event::ChildWorkflowStarted {
            envelope: envelope(seq),
            child_workflow_id: WorkflowId::new(Uuid::from_u128(child)),
            workflow_type: "child".to_owned(),
            input: payload()?,
        };
        Ok(started)
    }

    /// Expected key for the `index`-th occurrence of the signal called `name`.
    fn signal_key(name: &str, index: usize) -> Option<CorrelationKey> {
        Some(CorrelationKey::Signal {
            name: name.to_owned(),
            index,
        })
    }

    #[test]
    fn derives_signal_occurrence_indices_by_name() -> TestResult {
        let history = vec![
            signal_received(1, "ready")?,
            signal_received(2, "other")?,
            signal_received(3, "ready")?,
        ];

        let derived = correlation_keys_for_history(&history);

        let expected = vec![
            signal_key("ready", 0),
            signal_key("other", 0),
            signal_key("ready", 1),
        ];
        assert_eq!(derived, expected);
        Ok(())
    }

    #[test]
    fn derives_positional_child_ordinals_independent_of_sequence_numbers() -> TestResult {
        // Sequence numbers are deliberately sparse: ordinals must follow position only.
        let history = vec![child_started(41, 1)?, child_started(97, 2)?];

        let derived = correlation_keys_for_history(&history);

        let expected = vec![
            Some(CorrelationKey::Child(0)),
            Some(CorrelationKey::Child(1)),
        ];
        assert_eq!(derived, expected);
        assert_eq!(key_for_event(&history, 0), Some(CorrelationKey::Child(0)));
        assert_eq!(key_for_event(&history, 1), Some(CorrelationKey::Child(1)));
        Ok(())
    }

    #[test]
    fn interleaved_async_arrivals_do_not_shift_child_ordinals() -> TestResult {
        let first_child_completed = Event::ChildWorkflowCompleted {
            envelope: envelope(3),
            child_workflow_id: WorkflowId::new(Uuid::from_u128(1)),
            result: payload()?,
        };
        let history = vec![
            child_started(1, 1)?,
            signal_received(2, "mid")?,
            first_child_completed,
            child_started(4, 2)?,
        ];

        let derived = correlation_keys_for_history(&history);

        let expected = vec![
            Some(CorrelationKey::Child(0)),
            signal_key("mid", 0),
            None,
            Some(CorrelationKey::Child(1)),
        ];
        assert_eq!(derived, expected);
        assert_eq!(key_for_event(&history, 3), Some(CorrelationKey::Child(1)));
        Ok(())
    }
}