use aion_core::{Event, Payload, TimerId, WorkflowError, WorkflowId};
use crate::Pid;
use chrono::{DateTime, Utc};
use tokio::sync::oneshot;
/// Copyable handle that names a workflow process by its numeric pid.
///
/// The wrapped value is private; build a handle with [`WorkflowProcessHandle::new`]
/// and read it back with [`WorkflowProcessHandle::pid`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct WorkflowProcessHandle {
    /// Raw process identifier wrapped by this handle.
    pid: u64,
}

impl WorkflowProcessHandle {
    /// Wraps the raw `pid` in a handle.
    #[must_use]
    pub const fn new(pid: u64) -> Self {
        Self { pid }
    }

    /// Returns the raw pid this handle was created with.
    #[must_use]
    pub const fn pid(self) -> u64 {
        let Self { pid } = self;
        pid
    }
}
/// Residency state reported for a workflow by [`EngineHandle::resolve_workflow`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WorkflowResidency {
/// The workflow is resident; carries the handle of its process.
Resident(WorkflowProcessHandle),
/// The workflow is not resident.
/// NOTE(review): presumably known to the engine but without a live
/// process — confirm against the engine implementation.
NonResident,
/// The workflow is terminal.
/// NOTE(review): presumably it has already finished — confirm.
Terminal,
/// The workflow is not known.
Unknown,
}
/// Sending half of a `tokio` oneshot channel carrying a query's
/// `QueryResult`; used as the `reply_to` field of
/// [`WorkflowMailboxMessage::Query`].
pub type QueryReplySender = oneshot::Sender<crate::query::service::QueryResult>;
/// Message handed to [`EngineHandle::deliver_workflow_message`] for delivery
/// to a workflow process.
///
/// Only `Debug` is derived: the `Query` variant owns a [`QueryReplySender`]
/// (a oneshot sender), so messages are moved rather than cloned.
#[derive(Debug)]
pub enum WorkflowMailboxMessage {
/// A timer fired.
TimerFired {
/// Identifier of the timer that fired.
timer_id: TimerId,
/// UTC timestamp associated with the firing.
/// NOTE(review): presumably the scheduled deadline — confirm.
fire_at: DateTime<Utc>,
},
/// A signal was received for the workflow.
SignalReceived {
/// Name of the signal.
name: String,
/// Payload attached to the signal.
payload: Payload,
},
/// A query addressed to the workflow.
Query {
/// Name of the query.
name: String,
/// Payload attached to the query.
payload: Payload,
/// Oneshot sender on which the query result is expected to be sent back.
reply_to: QueryReplySender,
},
/// A child workflow completed.
ChildWorkflowCompleted {
/// Identifier of the child workflow.
child_workflow_id: WorkflowId,
/// Correlation value supplied with the notification.
/// NOTE(review): presumably ties it to the originating request — confirm.
correlation: u64,
/// Payload produced by the child workflow.
result: Payload,
},
/// A child workflow failed.
ChildWorkflowFailed {
/// Identifier of the child workflow.
child_workflow_id: WorkflowId,
/// Correlation value supplied with the notification.
correlation: u64,
/// Error reported for the child workflow.
error: WorkflowError,
},
/// A child workflow was cancelled.
ChildWorkflowCancelled {
/// Identifier of the child workflow.
child_workflow_id: WorkflowId,
/// Correlation value supplied with the notification.
correlation: u64,
},
}
/// Parameters passed to [`EngineHandle::spawn_child_workflow`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChildWorkflowSpawnRequest {
/// Identifier of the parent workflow.
pub parent_workflow_id: WorkflowId,
/// Identifier of the child workflow to spawn.
pub child_workflow_id: WorkflowId,
/// Name of the workflow type for the child.
pub workflow_type: String,
/// Input payload for the child workflow.
pub input: Payload,
}
/// Successful outcome of [`EngineHandle::spawn_child_workflow`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChildWorkflowSpawnResult {
/// Identifier of the spawned child workflow.
pub child_workflow_id: WorkflowId,
/// Handle of the process running the child workflow.
pub child_process: WorkflowProcessHandle,
}
/// Timer registration passed to [`EngineHandle::arm_timer`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TimerWheelEntry {
/// Workflow process the timer is registered for.
pub process: WorkflowProcessHandle,
/// Identifier of the timer.
pub timer_id: TimerId,
/// UTC timestamp at which the timer is due to fire.
pub fire_at: DateTime<Utc>,
}
/// Error type returned by every [`EngineHandle`] operation.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum EngineSeamError {
/// The workflow is not resident.
#[error("workflow {workflow_id} is not resident")]
NonResident {
/// Identifier of the workflow the error refers to.
workflow_id: WorkflowId,
},
/// The workflow is terminal.
#[error("workflow {workflow_id} is terminal")]
Terminal {
/// Identifier of the workflow the error refers to.
workflow_id: WorkflowId,
},
/// The workflow is unknown.
#[error("workflow {workflow_id} is unknown")]
UnknownWorkflow {
/// Identifier of the workflow the error refers to.
workflow_id: WorkflowId,
},
/// Delivering a message to a workflow mailbox failed.
#[error("mailbox delivery failed: {reason}")]
Delivery {
/// Human-readable description of the failure.
reason: String,
},
/// Spawning a child workflow failed.
#[error("child workflow spawn failed: {reason}")]
ChildSpawn {
/// Human-readable description of the failure.
reason: String,
},
/// A timer wheel operation failed.
#[error("timer wheel operation failed: {reason}")]
TimerWheel {
/// Human-readable description of the failure.
reason: String,
},
/// Terminating a linked child failed.
#[error("linked child termination failed: {reason}")]
ChildTermination {
/// Human-readable description of the failure.
reason: String,
},
/// The workflow recorder failed.
#[error("workflow recorder failed: {reason}")]
Recorder {
/// Human-readable description of the failure.
reason: String,
},
}
/// Engine operations exposed through a single object: residency lookup,
/// mailbox delivery, child workflow spawning, linked-process termination,
/// timers, and event recording.
///
/// Implementors must be `Send + Sync`. Every method is synchronous and
/// reports failure as an [`EngineSeamError`].
pub trait EngineHandle: Send + Sync {
/// Resolves the [`WorkflowResidency`] of `workflow_id`.
///
/// # Errors
///
/// Returns an [`EngineSeamError`] when the implementation cannot resolve it.
fn resolve_workflow(
&self,
workflow_id: &WorkflowId,
) -> Result<WorkflowResidency, EngineSeamError>;
/// Delivers `message` to the workflow process identified by `process`,
/// taking ownership of the message.
///
/// # Errors
///
/// Returns an [`EngineSeamError`] when the delivery fails.
fn deliver_workflow_message(
&self,
process: WorkflowProcessHandle,
message: WorkflowMailboxMessage,
) -> Result<(), EngineSeamError>;
/// Spawns the child workflow described by `request` and returns its
/// identifier together with the handle of its process.
///
/// # Errors
///
/// Returns an [`EngineSeamError`] when the spawn fails.
fn spawn_child_workflow(
&self,
request: ChildWorkflowSpawnRequest,
) -> Result<ChildWorkflowSpawnResult, EngineSeamError>;
/// Terminates the linked child workflow process `child_process` on behalf
/// of `parent_workflow_id`.
///
/// `correlation` is supplied by the caller.
/// NOTE(review): presumably identifies the originating child-workflow
/// request — confirm.
///
/// # Errors
///
/// Returns an [`EngineSeamError`] when the termination fails.
fn terminate_linked_child_workflow(
&self,
parent_workflow_id: &WorkflowId,
child_process: WorkflowProcessHandle,
correlation: u64,
) -> Result<(), EngineSeamError>;
/// Terminates the linked activity process `activity_process` on behalf of
/// `parent_workflow_id`; `correlation` is supplied by the caller.
///
/// # Errors
///
/// Returns an [`EngineSeamError`] when the termination fails.
fn terminate_linked_activity(
&self,
parent_workflow_id: &WorkflowId,
activity_process: Pid,
correlation: u64,
) -> Result<(), EngineSeamError>;
/// Arms the timer described by `entry`.
///
/// # Errors
///
/// Returns an [`EngineSeamError`] when the timer cannot be armed.
fn arm_timer(&self, entry: TimerWheelEntry) -> Result<(), EngineSeamError>;
/// Disarms the timer `timer_id` registered for `process`.
///
/// # Errors
///
/// Returns an [`EngineSeamError`] when the timer cannot be disarmed.
fn disarm_timer(
&self,
process: WorkflowProcessHandle,
timer_id: &TimerId,
) -> Result<(), EngineSeamError>;
/// Records `event` for the workflow `workflow_id`.
///
/// # Errors
///
/// Returns an [`EngineSeamError`] when the event cannot be recorded.
fn record_workflow_event(
&self,
workflow_id: &WorkflowId,
event: Event,
) -> Result<(), EngineSeamError>;
}
#[cfg(test)]
pub(crate) mod test_support {
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::sync::{Mutex, MutexGuard};
use aion_store::{WritableEventStore, WriteToken};
use super::*;
/// One entry in the operation log kept by `FakeEngineHandle`; entries are
/// pushed in call order and returned by `FakeEngineHandle::operations`.
#[derive(Clone, Debug, PartialEq)]
pub enum FakeEngineOperation {
/// A message was delivered (pushed only when no failure was injected).
Delivered {
/// Process the message was delivered to.
process: WorkflowProcessHandle,
/// Snapshot of the delivered message.
message: DeliveredWorkflowMessage,
},
/// A child workflow spawn was requested; logged before the queued
/// response (or the "not queued" error) is returned.
ChildSpawnRequested(ChildWorkflowSpawnRequest),
/// A timer was armed.
TimerArmed(TimerWheelEntry),
/// A timer was disarmed.
TimerDisarmed {
/// Process passed to `disarm_timer`.
process: WorkflowProcessHandle,
/// Timer id passed to `disarm_timer`.
timer_id: TimerId,
},
/// `terminate_linked_child_workflow` was called with these arguments.
LinkedChildWorkflowTerminated {
/// Parent workflow passed to the call.
parent_workflow_id: WorkflowId,
/// Child process passed to the call.
child_process: WorkflowProcessHandle,
/// Correlation value passed to the call.
correlation: u64,
},
/// `terminate_linked_activity` was called with these arguments.
LinkedActivityTerminated {
/// Parent workflow passed to the call.
parent_workflow_id: WorkflowId,
/// Activity process passed to the call.
activity_process: Pid,
/// Correlation value passed to the call.
correlation: u64,
},
/// An event was recorded through `record_workflow_event`.
EventRecorded {
/// Workflow the event was recorded for.
workflow_id: WorkflowId,
/// The recorded event.
event: Event,
},
}
/// Mutable state behind `FakeEngineHandle`'s mutex.
#[derive(Default)]
struct FakeEngineState {
/// Residency registered per workflow via `FakeEngineHandle::set_residency`.
residency: HashMap<WorkflowId, WorkflowResidency>,
/// Snapshots of delivered messages, in delivery order.
delivered: Vec<(WorkflowProcessHandle, DeliveredWorkflowMessage)>,
/// Queued results, consumed front-first by `deliver_workflow_message`.
delivery_responses: VecDeque<Result<(), EngineSeamError>>,
/// Queued results, consumed front-first by `spawn_child_workflow`.
child_spawn_responses: VecDeque<Result<ChildWorkflowSpawnResult, EngineSeamError>>,
/// Timers that were armed and have not been disarmed since.
armed_timers: Vec<TimerWheelEntry>,
/// Every `(process, timer_id)` pair passed to `disarm_timer`.
/// NOTE(review): no accessor in the visible code reads this list.
disarmed_timers: Vec<(WorkflowProcessHandle, TimerId)>,
/// Events captured by `record_workflow_event`, in call order.
recorded_events: Vec<(WorkflowId, Event)>,
/// Log of all captured operations, in call order.
operations: Vec<FakeEngineOperation>,
/// Optional store that recorded events are additionally appended to.
recorder_store: Option<Arc<dyn WritableEventStore>>,
/// Queued results, consumed front-first by `record_workflow_event`.
/// NOTE(review): nothing in the visible code pushes to this queue.
record_responses: VecDeque<Result<(), EngineSeamError>>,
}
/// Clonable, comparable snapshot of a [`WorkflowMailboxMessage`] kept by the
/// fake engine.
///
/// Mirrors every variant of the mailbox message; the `Query` variant omits
/// the `reply_to` sender.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DeliveredWorkflowMessage {
/// Snapshot of `WorkflowMailboxMessage::TimerFired`.
TimerFired {
/// Identifier of the timer that fired.
timer_id: TimerId,
/// Timestamp copied from the original message.
fire_at: DateTime<Utc>,
},
/// Snapshot of `WorkflowMailboxMessage::SignalReceived`.
SignalReceived { name: String, payload: Payload },
/// Snapshot of `WorkflowMailboxMessage::Query` without its reply channel.
Query { name: String, payload: Payload },
/// Snapshot of `WorkflowMailboxMessage::ChildWorkflowCompleted`.
ChildWorkflowCompleted {
/// Identifier of the child workflow.
child_workflow_id: WorkflowId,
/// Correlation value copied from the original message.
correlation: u64,
/// Result payload cloned from the original message.
result: Payload,
},
/// Snapshot of `WorkflowMailboxMessage::ChildWorkflowFailed`.
ChildWorkflowFailed {
/// Identifier of the child workflow.
child_workflow_id: WorkflowId,
/// Correlation value copied from the original message.
correlation: u64,
/// Error cloned from the original message.
error: WorkflowError,
},
/// Snapshot of `WorkflowMailboxMessage::ChildWorkflowCancelled`.
ChildWorkflowCancelled {
/// Identifier of the child workflow.
child_workflow_id: WorkflowId,
/// Correlation value copied from the original message.
correlation: u64,
},
}
impl DeliveredWorkflowMessage {
    /// Builds a snapshot of `message` by cloning (or copying) each field into
    /// the matching variant.
    ///
    /// The `reply_to` sender of a `Query` is left in the original message and
    /// is not part of the snapshot.
    pub(crate) fn from_message(message: &WorkflowMailboxMessage) -> Self {
        match message {
            WorkflowMailboxMessage::TimerFired { timer_id, fire_at } => {
                let timer_id = timer_id.clone();
                let fire_at = *fire_at;
                Self::TimerFired { timer_id, fire_at }
            }
            WorkflowMailboxMessage::SignalReceived { name, payload } => {
                let name = name.clone();
                let payload = payload.clone();
                Self::SignalReceived { name, payload }
            }
            WorkflowMailboxMessage::Query {
                name,
                payload,
                reply_to: _,
            } => {
                let name = name.clone();
                let payload = payload.clone();
                Self::Query { name, payload }
            }
            WorkflowMailboxMessage::ChildWorkflowCompleted {
                child_workflow_id,
                correlation,
                result,
            } => {
                let child_workflow_id = child_workflow_id.clone();
                let correlation = *correlation;
                let result = result.clone();
                Self::ChildWorkflowCompleted {
                    child_workflow_id,
                    correlation,
                    result,
                }
            }
            WorkflowMailboxMessage::ChildWorkflowFailed {
                child_workflow_id,
                correlation,
                error,
            } => {
                let child_workflow_id = child_workflow_id.clone();
                let correlation = *correlation;
                let error = error.clone();
                Self::ChildWorkflowFailed {
                    child_workflow_id,
                    correlation,
                    error,
                }
            }
            WorkflowMailboxMessage::ChildWorkflowCancelled {
                child_workflow_id,
                correlation,
            } => {
                let child_workflow_id = child_workflow_id.clone();
                let correlation = *correlation;
                Self::ChildWorkflowCancelled {
                    child_workflow_id,
                    correlation,
                }
            }
        }
    }
}
/// In-memory `EngineHandle` fake for tests: captures the calls made to it and
/// returns queued responses; all state sits behind one mutex.
#[derive(Default)]
pub struct FakeEngineHandle {
/// Captured calls and queued responses.
state: Mutex<FakeEngineState>,
}
impl FakeEngineHandle {
    /// Creates a fake with empty state and no backing event store.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a fake that additionally appends every recorded event to `store`.
    #[must_use]
    pub fn recording_to(store: Arc<dyn WritableEventStore>) -> Self {
        let initial = FakeEngineState {
            recorder_store: Some(store),
            ..FakeEngineState::default()
        };
        Self {
            state: Mutex::new(initial),
        }
    }

    /// Registers the residency that `resolve_workflow` reports for
    /// `workflow_id`, replacing any earlier registration.
    pub fn set_residency(
        &self,
        workflow_id: WorkflowId,
        residency: WorkflowResidency,
    ) -> Result<(), EngineSeamError> {
        let mut guard = self.state()?;
        guard.residency.insert(workflow_id, residency);
        Ok(())
    }

    /// Queues the result of a future `deliver_workflow_message` call.
    pub fn push_delivery_response(
        &self,
        response: Result<(), EngineSeamError>,
    ) -> Result<(), EngineSeamError> {
        let mut guard = self.state()?;
        guard.delivery_responses.push_back(response);
        Ok(())
    }

    /// Returns a copy of the operation log, in call order.
    pub fn operations(&self) -> Result<Vec<FakeEngineOperation>, EngineSeamError> {
        self.state().map(|guard| guard.operations.clone())
    }

    /// Returns a copy of the delivered message snapshots, in delivery order.
    pub fn delivered_messages(
        &self,
    ) -> Result<Vec<(WorkflowProcessHandle, DeliveredWorkflowMessage)>, EngineSeamError> {
        self.state().map(|guard| guard.delivered.clone())
    }

    /// Returns a copy of the timers that are currently armed.
    pub fn armed_timers(&self) -> Result<Vec<TimerWheelEntry>, EngineSeamError> {
        self.state().map(|guard| guard.armed_timers.clone())
    }

    /// Queues the result of a future `spawn_child_workflow` call.
    pub fn push_child_spawn_response(
        &self,
        response: Result<ChildWorkflowSpawnResult, EngineSeamError>,
    ) -> Result<(), EngineSeamError> {
        let mut guard = self.state()?;
        guard.child_spawn_responses.push_back(response);
        Ok(())
    }

    /// Returns a copy of the captured events, in call order.
    pub fn recorded_events(&self) -> Result<Vec<(WorkflowId, Event)>, EngineSeamError> {
        self.state().map(|guard| guard.recorded_events.clone())
    }

    /// Locks the shared state; a poisoned lock is reported as
    /// `EngineSeamError::Recorder`.
    fn state(&self) -> Result<MutexGuard<'_, FakeEngineState>, EngineSeamError> {
        self.state.lock().map_err(|_poisoned| {
            let reason = "fake engine state lock was poisoned".to_owned();
            EngineSeamError::Recorder { reason }
        })
    }
}
impl EngineHandle for FakeEngineHandle {
    /// Returns the residency registered through `set_residency`, or
    /// `WorkflowResidency::Unknown` when nothing was registered for
    /// `workflow_id`.
    fn resolve_workflow(
        &self,
        workflow_id: &WorkflowId,
    ) -> Result<WorkflowResidency, EngineSeamError> {
        Ok(self
            .state()?
            .residency
            .get(workflow_id)
            .copied()
            .unwrap_or(WorkflowResidency::Unknown))
    }

    /// Captures a snapshot of `message` unless a queued delivery response
    /// injects a failure.
    ///
    /// One queued response is consumed per call. When it is an error, the
    /// error is returned and nothing is captured.
    fn deliver_workflow_message(
        &self,
        process: WorkflowProcessHandle,
        message: WorkflowMailboxMessage,
    ) -> Result<(), EngineSeamError> {
        let mut state = self.state()?;
        if let Some(response) = state.delivery_responses.pop_front() {
            response?;
        }
        let delivered = DeliveredWorkflowMessage::from_message(&message);
        state.delivered.push((process, delivered.clone()));
        state.operations.push(FakeEngineOperation::Delivered {
            process,
            message: delivered,
        });
        Ok(())
    }

    /// Logs the spawn request, then returns the next queued spawn response.
    ///
    /// When no response was queued, a `ChildSpawn` error is returned; the
    /// request is logged either way.
    fn spawn_child_workflow(
        &self,
        request: ChildWorkflowSpawnRequest,
    ) -> Result<ChildWorkflowSpawnResult, EngineSeamError> {
        let mut state = self.state()?;
        // `request` is owned and not used again, so it moves into the log
        // instead of being cloned.
        state
            .operations
            .push(FakeEngineOperation::ChildSpawnRequested(request));
        state.child_spawn_responses.pop_front().unwrap_or_else(|| {
            Err(EngineSeamError::ChildSpawn {
                reason: "fake child spawn response was not queued".to_owned(),
            })
        })
    }

    /// Logs the termination request; fails only if the state lock is poisoned.
    fn terminate_linked_child_workflow(
        &self,
        parent_workflow_id: &WorkflowId,
        child_process: WorkflowProcessHandle,
        correlation: u64,
    ) -> Result<(), EngineSeamError> {
        let mut state = self.state()?;
        state
            .operations
            .push(FakeEngineOperation::LinkedChildWorkflowTerminated {
                parent_workflow_id: parent_workflow_id.clone(),
                child_process,
                correlation,
            });
        Ok(())
    }

    /// Logs the termination request; fails only if the state lock is poisoned.
    fn terminate_linked_activity(
        &self,
        parent_workflow_id: &WorkflowId,
        activity_process: Pid,
        correlation: u64,
    ) -> Result<(), EngineSeamError> {
        let mut state = self.state()?;
        state
            .operations
            .push(FakeEngineOperation::LinkedActivityTerminated {
                parent_workflow_id: parent_workflow_id.clone(),
                activity_process,
                correlation,
            });
        Ok(())
    }

    /// Adds `entry` to the armed timers and logs the operation.
    fn arm_timer(&self, entry: TimerWheelEntry) -> Result<(), EngineSeamError> {
        let mut state = self.state()?;
        state.armed_timers.push(entry.clone());
        state
            .operations
            .push(FakeEngineOperation::TimerArmed(entry));
        Ok(())
    }

    /// Removes every armed timer matching both `process` and `timer_id`, then
    /// logs the disarm even when nothing matched.
    fn disarm_timer(
        &self,
        process: WorkflowProcessHandle,
        timer_id: &TimerId,
    ) -> Result<(), EngineSeamError> {
        let mut state = self.state()?;
        state
            .armed_timers
            .retain(|entry| !(entry.process == process && &entry.timer_id == timer_id));
        state.disarmed_timers.push((process, timer_id.clone()));
        state.operations.push(FakeEngineOperation::TimerDisarmed {
            process,
            timer_id: timer_id.clone(),
        });
        Ok(())
    }

    /// Captures `event` and, when a store was configured, appends it there.
    ///
    /// A queued record response is consumed first; an error response aborts
    /// before anything is captured. The event is captured in the fake's own
    /// logs before the store append, so an append failure is returned as a
    /// `Recorder` error while the event stays captured.
    fn record_workflow_event(
        &self,
        workflow_id: &WorkflowId,
        event: Event,
    ) -> Result<(), EngineSeamError> {
        let mut state = self.state()?;
        if let Some(response) = state.record_responses.pop_front() {
            response?;
        }
        state
            .recorded_events
            .push((workflow_id.clone(), event.clone()));
        let recorder_store = state.recorder_store.clone();
        state.operations.push(FakeEngineOperation::EventRecorded {
            workflow_id: workflow_id.clone(),
            event: event.clone(),
        });
        // Release the state lock before blocking on the store append.
        drop(state);
        if let Some(store) = recorder_store {
            // The append is issued with the sequence number just before the
            // event's own (saturating at zero) as the expected sequence.
            let expected_seq = event.seq().saturating_sub(1);
            futures::executor::block_on(store.append(
                WriteToken::recorder(),
                workflow_id,
                &[event],
                expected_seq,
            ))
            .map_err(|error| EngineSeamError::Recorder {
                reason: error.to_string(),
            })?;
        }
        Ok(())
    }
}
}
#[cfg(test)]
mod tests {
use aion_core::{ContentType, Payload, WorkflowId};
use super::test_support::{DeliveredWorkflowMessage, FakeEngineHandle};
use super::{
EngineHandle, EngineSeamError, WorkflowMailboxMessage, WorkflowProcessHandle,
WorkflowResidency,
};
/// A message delivered to a resident workflow is captured by the fake with
/// its process handle and contents.
#[test]
fn fake_captures_delivered_message_for_resident_workflow()
-> Result<(), Box<dyn std::error::Error>> {
    let process = WorkflowProcessHandle::new(42);
    let workflow_id = WorkflowId::new_v4();
    let engine = FakeEngineHandle::new();

    // The registered residency must be what the fake resolves.
    engine.set_residency(workflow_id.clone(), WorkflowResidency::Resident(process))?;
    assert_eq!(
        engine.resolve_workflow(&workflow_id)?,
        WorkflowResidency::Resident(process)
    );

    let payload = Payload::new(ContentType::Json, b"null".to_vec());
    engine.deliver_workflow_message(
        process,
        WorkflowMailboxMessage::SignalReceived {
            name: "wake".to_owned(),
            payload: payload.clone(),
        },
    )?;

    let expected = DeliveredWorkflowMessage::SignalReceived {
        name: "wake".to_owned(),
        payload,
    };
    assert_eq!(engine.delivered_messages()?, vec![(process, expected)]);
    Ok(())
}
/// A queued delivery error is returned to the caller and leaves the fake
/// without any captured message or operation.
#[test]
fn fake_can_inject_delivery_failure() -> Result<(), Box<dyn std::error::Error>> {
    let engine = FakeEngineHandle::new();
    engine.push_delivery_response(Err(EngineSeamError::Delivery {
        reason: "mailbox unavailable".to_owned(),
    }))?;

    let signal = WorkflowMailboxMessage::SignalReceived {
        name: "wake".to_owned(),
        payload: Payload::new(ContentType::Json, b"null".to_vec()),
    };
    let outcome = engine.deliver_workflow_message(WorkflowProcessHandle::new(43), signal);
    let Err(error) = outcome else {
        return Err(std::io::Error::other("delivery failure was not returned").into());
    };

    assert!(matches!(error, EngineSeamError::Delivery { .. }));
    assert!(engine.delivered_messages()?.is_empty());
    assert!(engine.operations()?.is_empty());
    Ok(())
}
}