use crate::{
deadline::Deadline,
envelope::{EventEnvelope, NewEvent},
error::{EngineError, WorkflowError},
event_store::{EventStore, ExpectedVersion},
ids::{CausationId, ConversationId, CorrelationId, ProcessId, TenantId},
outbox::PendingOutbox,
version::{WorkflowId, WorkflowVersionPolicy},
};
/// What a workflow command handler produced: the new events plus any
/// pending outbox entries returned alongside them.
#[derive(Debug, Clone)]
pub struct WorkflowOutput<E: EventPayload> {
/// Events emitted by the handler.
pub events: Vec<E>,
/// Pending outbox entries returned together with the events.
pub outbox: Vec<PendingOutbox>,
}
impl<E: EventPayload> WorkflowOutput<E> {
    /// Creates an output that carries `events` and an empty outbox.
    #[must_use]
    pub fn events(events: Vec<E>) -> Self {
        Self::with_outbox(events, Vec::new())
    }

    /// Creates an output that carries both `events` and `outbox` entries.
    #[must_use]
    pub fn with_outbox(events: Vec<E>, outbox: Vec<PendingOutbox>) -> Self {
        Self { events, outbox }
    }
}
/// A plain list of events converts into an output with an empty outbox.
impl<E: EventPayload> From<Vec<E>> for WorkflowOutput<E> {
    fn from(events: Vec<E>) -> Self {
        Self {
            events,
            outbox: Vec::new(),
        }
    }
}
/// Dereferences to the slice of events, so slice methods can be called
/// directly on the output. The outbox is not reachable through this impl.
impl<E: EventPayload> std::ops::Deref for WorkflowOutput<E> {
    type Target = [E];

    fn deref(&self) -> &[E] {
        self.events.as_slice()
    }
}
/// Consuming iteration yields the events by value; the outbox is dropped.
impl<E: EventPayload> IntoIterator for WorkflowOutput<E> {
    type Item = E;
    type IntoIter = std::vec::IntoIter<E>;

    fn into_iter(self) -> Self::IntoIter {
        let Self { events, outbox: _ } = self;
        events.into_iter()
    }
}
/// Borrowing iteration yields references to the events.
impl<'a, E: EventPayload> IntoIterator for &'a WorkflowOutput<E> {
    type Item = &'a E;
    type IntoIter = std::slice::Iter<'a, E>;

    fn into_iter(self) -> Self::IntoIter {
        let events: &'a [E] = &self.events;
        events.iter()
    }
}
/// A domain event that can be serialized into a `NewEvent` and deserialized
/// again when a stream is replayed.
///
/// Implementors must be serde-(de)serializable, `Send + Sync`, and `'static`.
pub trait EventPayload:
serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static
{
/// Returns the event type name; it is copied into `NewEvent::event_type`
/// when the event is wrapped for appending.
fn event_type(&self) -> &'static str;
/// Returns the schema version recorded with the event. Defaults to `1`.
fn schema_version(&self) -> u32 {
1
}
}
/// Marker trait for command types accepted by `Workflow::handle`.
pub trait CommandPayload: Send + 'static {}
/// A workflow definition: state is rebuilt by folding stored events through
/// `apply`, and commands are turned into new events by `handle`.
pub trait Workflow: Send + Sync + 'static {
/// State rebuilt from the event stream. Replay starts from
/// `Default::default()` unless a compatible snapshot is used.
type State: Default + Clone + Send + Sync + 'static;
/// Event type this workflow emits and replays.
type Event: EventPayload;
/// Command type this workflow accepts.
type Command: CommandPayload;
/// Folds a single event into `state` and returns the updated state.
fn apply(state: Self::State, event: &Self::Event) -> Self::State;
/// Handles `command` against the current `state`.
///
/// Returns the events and outbox entries to record, or a `WorkflowError`
/// when the command is rejected.
fn handle(
state: &Self::State,
command: Self::Command,
) -> Result<WorkflowOutput<Self::Event>, WorkflowError>;
/// Version of the serialized `State` layout. A snapshot whose recorded
/// version differs from this value is ignored and the stream is replayed
/// from the beginning. Defaults to `1`.
#[must_use]
fn state_schema_version() -> u32 {
1
}
/// Transforms a stored JSON payload before it is deserialized into
/// `Self::Event`. Receives the stored event type and schema version.
/// The default returns the payload unchanged.
fn upcast(
_event_type: &str,
_from_version: u32,
payload: serde_json::Value,
) -> Result<serde_json::Value, EngineError> {
Ok(payload)
}
/// Versioning policy for this workflow. Defaults to
/// `WorkflowVersionPolicy::ForwardCompatible`.
#[must_use]
fn version_policy() -> WorkflowVersionPolicy {
WorkflowVersionPolicy::ForwardCompatible
}
/// Optionally maps a deadline and the current state to a command.
/// The default returns `None`.
// NOTE(review): the call site is not in this file; presumably invoked when
// a deadline fires — confirm against the scheduler.
fn on_deadline(_deadline: &Deadline, _state: &Self::State) -> Option<Self::Command> {
None
}
}
/// Identifiers attached to a command execution. `CommandContext::new_event`
/// copies every field onto each `NewEvent` it builds.
#[derive(Debug, Clone)]
pub struct CommandContext {
/// Correlation identifier copied onto produced events.
pub correlation_id: CorrelationId,
/// Conversation identifier copied onto produced events.
pub conversation_id: ConversationId,
/// Process the command belongs to.
pub process_id: ProcessId,
/// Tenant the command belongs to.
pub tenant_id: TenantId,
/// Workflow the produced events are attributed to.
pub workflow_id: WorkflowId,
/// Identifier of whatever caused this command, when known.
pub causation_id: Option<CausationId>,
}
impl CommandContext {
    /// Creates a context with freshly generated correlation and conversation
    /// ids and no causation id.
    #[must_use]
    pub fn new(tenant_id: TenantId, process_id: ProcessId, workflow_id: WorkflowId) -> Self {
        let correlation_id = CorrelationId::new();
        let conversation_id = ConversationId::new();
        Self {
            correlation_id,
            conversation_id,
            process_id,
            tenant_id,
            workflow_id,
            causation_id: None,
        }
    }

    /// Returns the context with its causation id set to `id`.
    #[must_use]
    pub fn with_causation(self, id: CausationId) -> Self {
        Self {
            causation_id: Some(id),
            ..self
        }
    }

    /// Returns the context with its correlation id replaced by `id`.
    #[must_use]
    pub fn with_correlation(self, id: CorrelationId) -> Self {
        Self {
            correlation_id: id,
            ..self
        }
    }

    /// Returns the context with its conversation id replaced by `id`.
    #[must_use]
    pub fn with_conversation(self, id: ConversationId) -> Self {
        Self {
            conversation_id: id,
            ..self
        }
    }

    /// Derives a context from a stored envelope: correlation, conversation,
    /// process and tenant ids are inherited, and the envelope's event id
    /// becomes the causation id.
    #[must_use]
    pub fn from_envelope(env: &EventEnvelope, workflow_id: WorkflowId) -> Self {
        let caused_by: CausationId = env.event_id.into();
        Self {
            correlation_id: env.correlation_id,
            conversation_id: env.conversation_id,
            process_id: env.process_id,
            tenant_id: env.tenant_id,
            workflow_id,
            causation_id: Some(caused_by),
        }
    }

    /// Builds a context for a deadline using the deadline's tenant and
    /// process ids. Correlation and conversation ids are newly generated and
    /// no causation id is set (see [`CommandContext::new`]).
    #[must_use]
    pub fn from_deadline(deadline: &crate::deadline::Deadline, workflow_id: WorkflowId) -> Self {
        let tenant = deadline.tenant_id();
        let process = deadline.process_id();
        Self::new(tenant, process, workflow_id)
    }

    /// Serializes `event` to JSON and wraps it in a `NewEvent` stamped with
    /// this context's identifiers.
    ///
    /// # Errors
    ///
    /// Returns `EngineError::Serialization` when the event cannot be
    /// converted to a JSON value.
    pub fn new_event<E: EventPayload>(&self, event: &E) -> Result<NewEvent, EngineError> {
        // Serialize first so a failure returns before anything else is built.
        let payload = match serde_json::to_value(event) {
            Ok(value) => value,
            Err(err) => return Err(EngineError::Serialization(err.to_string())),
        };
        let workflow_id = self.workflow_id.clone();
        let event_type = event.event_type().into();
        let schema_version = event.schema_version();
        Ok(NewEvent {
            correlation_id: self.correlation_id,
            causation_id: self.causation_id,
            conversation_id: self.conversation_id,
            process_id: self.process_id,
            tenant_id: self.tenant_id,
            workflow_id,
            event_type,
            schema_version,
            payload,
        })
    }
}
impl EventEnvelope {
    /// Serializes `event` and wraps it in a `NewEvent` caused by this
    /// envelope: correlation, conversation, process and tenant ids are
    /// inherited, and this envelope's event id becomes the causation id.
    ///
    /// # Errors
    ///
    /// Returns `EngineError::Serialization` when the event cannot be
    /// converted to a JSON value.
    pub fn new_caused_event<E: EventPayload>(
        &self,
        workflow_id: WorkflowId,
        event: &E,
    ) -> Result<NewEvent, EngineError> {
        // Serialize first so a failure returns before anything else is built.
        let payload = match serde_json::to_value(event) {
            Ok(value) => value,
            Err(err) => return Err(EngineError::Serialization(err.to_string())),
        };
        let caused_by = Some(self.event_id.into());
        let event_type = event.event_type().into();
        let schema_version = event.schema_version();
        Ok(NewEvent {
            correlation_id: self.correlation_id,
            causation_id: caused_by,
            conversation_id: self.conversation_id,
            process_id: self.process_id,
            tenant_id: self.tenant_id,
            workflow_id,
            event_type,
            schema_version,
            payload,
        })
    }
}
/// Executes `command` against the workflow stored in `stream_id` and returns
/// the appended envelopes.
///
/// Thin wrapper over `execute_command_and_collect`; the outbox entries that
/// function returns are discarded here.
///
/// # Errors
///
/// Propagates any `EngineError` from `execute_command_and_collect`.
pub(crate) async fn execute_command<W, S>(
    store: &S,
    stream_id: &crate::ids::StreamId,
    command: W::Command,
    ctx: &CommandContext,
) -> Result<Vec<EventEnvelope>, EngineError>
where
    W: Workflow,
    S: EventStore,
{
    let (envelopes, _outbox) =
        execute_command_and_collect::<W, S>(store, stream_id, command, ctx).await?;
    Ok(envelopes)
}
pub(crate) async fn execute_command_and_collect<W, S>(
store: &S,
stream_id: &crate::ids::StreamId,
command: W::Command,
ctx: &CommandContext,
) -> Result<(Vec<EventEnvelope>, Vec<PendingOutbox>), EngineError>
where
W: Workflow,
S: EventStore,
{
let (state, current_sequence) = store
.fold_stream(
stream_id,
0,
(W::State::default(), 0u64),
|(acc, _), env| {
let seq = env.sequence_number;
let payload = W::upcast(&env.event_type, env.schema_version, env.payload)?;
let event: W::Event = serde_json::from_value(payload)
.map_err(|e| EngineError::Deserialization(e.to_string()))?;
Ok((W::apply(acc, &event), seq))
},
)
.await?;
let output = W::handle(&state, command)?;
if output.events.is_empty() {
return Ok((Vec::new(), output.outbox));
}
let new_events: Result<Vec<NewEvent>, EngineError> = output
.events
.iter()
.map(|event| ctx.new_event(event))
.collect();
let new_events = new_events?;
let result = store
.append(
stream_id,
ExpectedVersion::Exact(current_sequence),
&new_events,
)
.await?;
Ok((result.events, output.outbox))
}
pub(crate) async fn execute_command_atomic<W, S>(
store: &S,
stream_id: &crate::ids::StreamId,
command: W::Command,
ctx: &CommandContext,
) -> Result<Vec<EventEnvelope>, EngineError>
where
W: Workflow,
S: crate::event_store::AtomicAppend,
{
let (state, current_sequence) = store
.fold_stream(
stream_id,
0,
(W::State::default(), 0u64),
|(acc, _), env| {
let seq = env.sequence_number;
let payload = W::upcast(&env.event_type, env.schema_version, env.payload)?;
let event: W::Event = serde_json::from_value(payload)
.map_err(|e| EngineError::Deserialization(e.to_string()))?;
Ok((W::apply(acc, &event), seq))
},
)
.await?;
let output = W::handle(&state, command)?;
if output.events.is_empty() {
return Ok(Vec::new());
}
let new_events: Result<Vec<NewEvent>, EngineError> = output
.events
.iter()
.map(|event| ctx.new_event(event))
.collect();
let new_events = new_events?;
let result = store
.append_with_outbox(
stream_id,
ExpectedVersion::Exact(current_sequence),
&new_events,
&output.outbox,
)
.await?;
Ok(result.events)
}
/// Rebuilds the workflow state for `stream_id`, starting from a stored
/// snapshot when one with a matching state schema version exists.
///
/// Returns the state together with the sequence number of the last event
/// folded into it. If no later events are replayed, that is the snapshot's
/// sequence number, or `0` when no usable snapshot exists.
///
/// A snapshot whose `state_schema_version` differs from
/// `W::state_schema_version()` is ignored and the stream is replayed from
/// sequence `0` starting at `W::State::default()`.
///
/// # Errors
///
/// Returns an `EngineError` when loading the snapshot, deserializing the
/// snapshot state, replaying the stream, upcasting, or deserializing an
/// event fails.
async fn reconstruct_with_snapshot<W, S, Snap>(
    store: &S,
    snap_store: &Snap,
    stream_id: &crate::ids::StreamId,
) -> Result<(W::State, u64), EngineError>
where
    W: Workflow,
    W::State: serde::de::DeserializeOwned,
    S: EventStore,
    Snap: crate::snapshot::SnapshotStore,
{
    let maybe_snap = snap_store.load(stream_id).await?;
    let (initial_state, from_sequence) = match &maybe_snap {
        Some(snap) if snap.state_schema_version == W::state_schema_version() => {
            // `&serde_json::Value` is itself a `Deserializer`, so the state is
            // decoded straight from the borrowed snapshot instead of
            // deep-cloning the whole JSON tree first.
            let state = <W::State as serde::Deserialize>::deserialize(&snap.state)
                .map_err(|e| EngineError::Deserialization(e.to_string()))?;
            (state, snap.sequence_number)
        }
        // `snap` is only read by the tracing macro below, so it is unused
        // when the `tracing` feature is disabled.
        #[allow(unused_variables)]
        Some(snap) => {
            #[cfg(feature = "tracing")]
            tracing::warn!(
                expected = W::state_schema_version(),
                actual = snap.state_schema_version,
                stream_id = %stream_id,
                "snapshot schema version mismatch; falling back to full replay"
            );
            (W::State::default(), 0)
        }
        None => (W::State::default(), 0),
    };
    // Accumulator: (folded state, sequence number of the last applied event).
    store
        .fold_stream(
            stream_id,
            from_sequence,
            (initial_state, from_sequence),
            |(acc, _), env| {
                let seq = env.sequence_number;
                let payload = W::upcast(&env.event_type, env.schema_version, env.payload)?;
                let event: W::Event = serde_json::from_value(payload)
                    .map_err(|e| EngineError::Deserialization(e.to_string()))?;
                Ok((W::apply(acc, &event), seq))
            },
        )
        .await
}
/// Rebuilds state via `reconstruct_with_snapshot`, handles `command`, and
/// appends the resulting events with a plain `append`.
///
/// The append uses `ExpectedVersion::Exact` with the sequence number
/// returned by the reconstruction. The handler's outbox entries are not used
/// on this path. When the handler emits no events nothing is appended and an
/// empty list is returned.
///
/// # Errors
///
/// Returns an `EngineError` when state reconstruction, command handling,
/// event serialization, or the append fails.
pub(crate) async fn execute_command_with_snapshot<W, S, Snap>(
    store: &S,
    snap_store: &Snap,
    stream_id: &crate::ids::StreamId,
    command: W::Command,
    ctx: &CommandContext,
) -> Result<Vec<EventEnvelope>, EngineError>
where
    W: Workflow,
    W::State: serde::de::DeserializeOwned,
    S: EventStore,
    Snap: crate::snapshot::SnapshotStore,
{
    let (state, current_sequence) =
        reconstruct_with_snapshot::<W, S, Snap>(store, snap_store, stream_id).await?;

    let decision = W::handle(&state, command)?;
    if decision.events.is_empty() {
        return Ok(Vec::new());
    }

    let new_events = decision
        .events
        .iter()
        .map(|produced| ctx.new_event(produced))
        .collect::<Result<Vec<NewEvent>, EngineError>>()?;

    let expected = ExpectedVersion::Exact(current_sequence);
    let appended = store.append(stream_id, expected, &new_events).await?;
    Ok(appended.events)
}
/// Rebuilds state via `reconstruct_with_snapshot`, handles `command`, and
/// appends the resulting events together with the handler's outbox entries
/// through `append_with_outbox`.
///
/// The append uses `ExpectedVersion::Exact` with the sequence number
/// returned by the reconstruction.
///
/// # Errors
///
/// Returns an `EngineError` when state reconstruction, command handling,
/// event serialization, or the append fails.
// NOTE(review): when the handler emits no events, any outbox entries it
// returned are discarded without being persisted — confirm this is intended.
pub(crate) async fn execute_command_atomic_with_snapshot<W, S, Snap>(
    store: &S,
    snap_store: &Snap,
    stream_id: &crate::ids::StreamId,
    command: W::Command,
    ctx: &CommandContext,
) -> Result<Vec<EventEnvelope>, EngineError>
where
    W: Workflow,
    W::State: serde::de::DeserializeOwned,
    S: crate::event_store::AtomicAppend,
    Snap: crate::snapshot::SnapshotStore,
{
    let (state, current_sequence) =
        reconstruct_with_snapshot::<W, S, Snap>(store, snap_store, stream_id).await?;

    let decision = W::handle(&state, command)?;
    if decision.events.is_empty() {
        return Ok(Vec::new());
    }

    let new_events = decision
        .events
        .iter()
        .map(|produced| ctx.new_event(produced))
        .collect::<Result<Vec<NewEvent>, EngineError>>()?;

    let expected = ExpectedVersion::Exact(current_sequence);
    let appended = store
        .append_with_outbox(stream_id, expected, &new_events, &decision.outbox)
        .await?;
    Ok(appended.events)
}