use std::time::Duration;
use nonempty::NonEmpty;
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use time::OffsetDateTime;
use crate::Timer;
/// Describes a workflow through associated types, a type-name constant and
/// associated functions (none of them take `self`).
///
/// [`Workflow::decide`] maps the current state plus an input to a
/// [`Decision`]; [`Workflow::evolve`] turns a state and one event into the
/// next state.
///
/// NOTE(review): how and when a runtime invokes these functions is not
/// visible in this file.
pub trait Workflow {
    /// Workflow state. The `Default` bound presumably supplies the state of a
    /// workflow that has not seen any event yet — confirm against the caller.
    type State: Default;
    /// Input handed to [`Workflow::decide`]; it is also the payload type of
    /// the timers carried by the returned [`Decision`]. Every input can report
    /// a [`WorkflowId`] through [`HasWorkflowId`].
    type Input: HasWorkflowId + Serialize + DeserializeOwned;
    /// Event type carried by an accepted [`Decision`] and consumed by
    /// [`Workflow::evolve`].
    type Event: Serialize + DeserializeOwned + Clone + Send;
    /// Effect type carried by an accepted [`Decision`].
    type Effect: Serialize + Send;
    /// Payload of a rejected [`Decision`].
    type Rejection: Serialize + Send + std::fmt::Debug;
    /// Name of this kind of workflow.
    ///
    /// NOTE(review): presumably the value used as `workflow_type` in a
    /// [`WorkflowRef`] — confirm.
    const TYPE: &'static str;
    /// Produces the next state from the current `state` and a single `event`.
    fn evolve(state: Self::State, event: Self::Event) -> Self::State;
    /// Decides how to respond to `input`, given the current `state` and the
    /// timestamp `now`.
    ///
    /// Returns either an accepted [`Decision`] (events plus optional effects,
    /// timers and timer cancellations) or a rejection.
    fn decide(
        now: OffsetDateTime,
        state: &Self::State,
        input: &Self::Input,
    ) -> Decision<Self::Event, Self::Effect, Self::Input, Self::Rejection>;
    /// Reports whether `state` is terminal. The default implementation always
    /// returns `false`.
    fn is_terminal(_state: &Self::State) -> bool {
        false
    }
    /// Optional key derived from an input. The default implementation returns
    /// `None`.
    ///
    /// NOTE(review): how the key is used (e.g. to enforce uniqueness) is not
    /// visible in this file.
    fn unique_key(_input: &Self::Input) -> Option<String> {
        None
    }
}
/// Uninhabited type: it has no variants, so no value of it can be constructed.
///
/// NOTE(review): presumably meant as a placeholder for an associated type a
/// workflow never produces (for example effects or rejections) — confirm
/// against usage.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Never {}
/// Implemented by values that can report the [`WorkflowId`] they belong to.
///
/// Required of every [`Workflow::Input`].
pub trait HasWorkflowId {
    /// Returns the workflow id associated with this value, as an owned
    /// [`WorkflowId`].
    fn workflow_id(&self) -> WorkflowId;
}
/// String-backed workflow identifier (a newtype around `String`).
///
/// Compared and hashed by its string content.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct WorkflowId(String);
impl WorkflowId {
pub fn new(id: impl Into<String>) -> Self {
Self(id.into())
}
pub fn into_inner(self) -> String {
self.0
}
pub fn as_str(&self) -> &str {
&self.0
}
}
impl std::fmt::Display for WorkflowId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(&self.0, f)
}
}
impl From<String> for WorkflowId {
fn from(s: String) -> Self {
Self(s)
}
}
impl From<&str> for WorkflowId {
fn from(s: &str) -> Self {
Self(s.to_owned())
}
}
/// Pairs a workflow type name with a [`WorkflowId`].
///
/// Two references are equal only when both the type name and the id match.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct WorkflowRef {
    /// Name of the workflow type.
    workflow_type: String,
    /// Id of the workflow within that type.
    workflow_id: WorkflowId,
}
impl WorkflowRef {
pub fn new(workflow_type: impl Into<String>, workflow_id: impl Into<WorkflowId>) -> Self {
Self {
workflow_type: workflow_type.into(),
workflow_id: workflow_id.into(),
}
}
pub fn workflow_type(&self) -> &str {
&self.workflow_type
}
pub fn workflow_id(&self) -> &WorkflowId {
&self.workflow_id
}
pub fn into_workflow_id(self) -> WorkflowId {
self.workflow_id
}
}
/// Renders the reference as `<workflow_type>:<workflow_id>`.
impl std::fmt::Display for WorkflowRef {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            workflow_type,
            workflow_id,
        } = self;
        write!(f, "{}:{}", workflow_type, workflow_id)
    }
}
/// Builds a reference from a `(type name, id)` pair where the type name is a
/// static string and the id is anything convertible into a `String`.
impl<S: Into<String>> From<(&'static str, S)> for WorkflowRef {
    fn from((workflow_type, workflow_id): (&'static str, S)) -> Self {
        // Spell out the intermediate `String` so the conversion chain is explicit.
        let id: String = workflow_id.into();
        Self::new(workflow_type, id)
    }
}
impl From<(String, WorkflowId)> for WorkflowRef {
fn from((workflow_type, workflow_id): (String, WorkflowId)) -> Self {
Self {
workflow_type,
workflow_id,
}
}
}
/// Outcome of a decision: either an acceptance carrying events (plus optional
/// effects, timers and timer cancellations) or a rejection carrying a reason.
///
/// Type parameters: `E` is the event type, `F` the effect type, `I` the input
/// payload carried by timers, and `R` the rejection payload.
#[derive(Debug, Clone)]
pub enum Decision<E, F, I, R> {
    /// The input was accepted.
    Accept {
        /// Events produced by the decision; `NonEmpty`, so there is always at
        /// least one.
        events: NonEmpty<E>,
        /// Effects attached to the decision (may be empty).
        effects: Vec<F>,
        /// Timers attached to the decision (may be empty).
        timers: Vec<Timer<I>>,
        /// Keys of timers to cancel (may be empty).
        cancel_timers: Vec<String>,
    },
    /// The input was rejected with the given reason.
    Reject(R),
}
impl<E, F, I, R> Decision<E, F, I, R> {
    /// Accepts with exactly one event and no effects, timers or cancellations.
    #[must_use]
    pub fn accept(event: E) -> Self {
        Self::accept_events(NonEmpty::new(event))
    }

    /// Accepts with the given non-empty list of events and no effects, timers
    /// or cancellations.
    #[must_use]
    pub fn accept_events(events: NonEmpty<E>) -> Self {
        Self::Accept {
            events,
            effects: Vec::new(),
            timers: Vec::new(),
            cancel_timers: Vec::new(),
        }
    }

    /// Accepts with the events yielded by `events`.
    ///
    /// Returns `None` when the iterator yields no event, because an accepted
    /// decision must carry at least one.
    #[must_use]
    pub fn try_accept(events: impl IntoIterator<Item = E>) -> Option<Self> {
        NonEmpty::collect(events).map(Self::accept_events)
    }

    /// Rejects with the given reason.
    #[must_use]
    pub fn reject(reason: R) -> Self {
        Self::Reject(reason)
    }

    /// Appends one effect. Does nothing when the decision is a rejection.
    #[must_use = "builder methods consume the decision and return the updated one"]
    pub fn with_effect(mut self, effect: F) -> Self {
        if let Self::Accept { effects, .. } = &mut self {
            effects.push(effect);
        }
        self
    }

    /// Appends every effect from `effects_in`, in order. Does nothing when
    /// the decision is a rejection (the iterator is then not consumed).
    #[must_use = "builder methods consume the decision and return the updated one"]
    pub fn with_effects(mut self, effects_in: impl IntoIterator<Item = F>) -> Self {
        if let Self::Accept { effects, .. } = &mut self {
            effects.extend(effects_in);
        }
        self
    }

    /// Appends one timer. Does nothing when the decision is a rejection.
    #[must_use = "builder methods consume the decision and return the updated one"]
    pub fn with_timer(mut self, timer: Timer<I>) -> Self {
        if let Self::Accept { timers, .. } = &mut self {
            timers.push(timer);
        }
        self
    }

    /// Appends a timer built with `Timer::after(delay, input)`. Does nothing
    /// when the decision is a rejection.
    #[must_use = "builder methods consume the decision and return the updated one"]
    pub fn with_timer_after(self, delay: Duration, input: I) -> Self {
        self.with_timer(Timer::after(delay, input))
    }

    /// Appends every timer from `timers_in`, in order. Does nothing when the
    /// decision is a rejection (the iterator is then not consumed).
    #[must_use = "builder methods consume the decision and return the updated one"]
    pub fn with_timers(mut self, timers_in: impl IntoIterator<Item = Timer<I>>) -> Self {
        if let Self::Accept { timers, .. } = &mut self {
            timers.extend(timers_in);
        }
        self
    }

    /// Records `key` in the list of timers to cancel. Does nothing when the
    /// decision is a rejection.
    #[must_use = "builder methods consume the decision and return the updated one"]
    pub fn cancel_timer(mut self, key: impl Into<String>) -> Self {
        if let Self::Accept { cancel_timers, .. } = &mut self {
            cancel_timers.push(key.into());
        }
        self
    }

    /// Records every key from `keys` in the list of timers to cancel. Does
    /// nothing when the decision is a rejection.
    #[must_use = "builder methods consume the decision and return the updated one"]
    pub fn cancel_timers(mut self, keys: impl IntoIterator<Item = String>) -> Self {
        if let Self::Accept { cancel_timers, .. } = &mut self {
            cancel_timers.extend(keys);
        }
        self
    }

    /// Returns `true` for [`Decision::Accept`].
    #[must_use]
    pub fn is_accept(&self) -> bool {
        matches!(self, Self::Accept { .. })
    }

    /// Returns `true` for [`Decision::Reject`].
    #[must_use]
    pub fn is_reject(&self) -> bool {
        matches!(self, Self::Reject(_))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a `Decision` whose rejection payload is a static string.
    type D<E, F, I> = Decision<E, F, I, &'static str>;

    /// Splits an accepted decision into its parts; panics on a rejection.
    fn unwrap_accept<E, F, I, R>(
        d: Decision<E, F, I, R>,
    ) -> (NonEmpty<E>, Vec<F>, Vec<Timer<I>>, Vec<String>) {
        let Decision::Accept {
            events,
            effects,
            timers,
            cancel_timers,
        } = d
        else {
            panic!("expected Accept, got Reject");
        };
        (events, effects, timers, cancel_timers)
    }

    // ---- Decision ----

    #[test]
    fn decision_single_event() {
        let accepted = D::<&str, i32, ()>::accept("created")
            .with_effect(1)
            .with_effect(2);
        let (events, effects, timers, _) = unwrap_accept(accepted);

        assert_eq!(events.len(), 1);
        assert_eq!(events.first(), &"created");
        assert_eq!(effects, [1, 2]);
        assert!(timers.is_empty());
    }

    #[test]
    fn decision_accept_events() {
        let input = NonEmpty::collect(["a", "b", "c"]).unwrap();
        let (events, effects, timers, _) = unwrap_accept(D::<&str, (), ()>::accept_events(input));

        let seen: Vec<_> = events.iter().copied().collect();
        assert_eq!(seen, ["a", "b", "c"]);
        assert!(effects.is_empty());
        assert!(timers.is_empty());
    }

    #[test]
    fn decision_try_accept_some() {
        let decision: Option<D<&str, (), ()>> = Decision::try_accept(["a", "b"]);
        assert!(matches!(decision, Some(Decision::Accept { .. })));
    }

    #[test]
    fn decision_try_accept_none() {
        let decision: Option<D<&str, (), ()>> = Decision::try_accept(std::iter::empty());
        assert!(decision.is_none());
    }

    #[test]
    fn decision_with_effects_batch() {
        let accepted = D::<&str, i32, ()>::accept("created").with_effects([1, 2, 3]);
        let (_, effects, _, _) = unwrap_accept(accepted);
        assert_eq!(effects, [1, 2, 3]);
    }

    #[test]
    fn decision_with_timer() {
        let accepted =
            D::<&str, (), &str>::accept("created").with_timer_after(Duration::from_secs(60), "timeout");
        let (_, _, timers, _) = unwrap_accept(accepted);

        assert_eq!(timers.len(), 1);
        assert_eq!(timers[0].input, "timeout");
    }

    #[test]
    fn decision_with_timer_after_carries_delay() {
        let delay = Duration::from_secs(3600);
        let accepted = D::<&str, (), &str>::accept("created").with_timer_after(delay, "reminder");
        let (_, _, timers, _) = unwrap_accept(accepted);

        assert_eq!(timers.len(), 1);
        assert_eq!(timers[0].input, "reminder");
        assert_eq!(timers[0].delay, delay);
    }

    #[test]
    fn decision_with_timer_direct() {
        let keyed = Timer::after(Duration::from_secs(60), "timeout").with_key("my-timer");
        let accepted = D::<&str, (), &str>::accept("created").with_timer(keyed);
        let (_, _, timers, _) = unwrap_accept(accepted);

        assert_eq!(timers.len(), 1);
        assert_eq!(timers[0].key.as_deref(), Some("my-timer"));
    }

    #[test]
    fn decision_with_timers_batch() {
        let batch = vec![
            Timer::after(Duration::from_secs(60), "t1"),
            Timer::after(Duration::from_secs(120), "t2"),
        ];
        let accepted = D::<&str, (), &str>::accept("created").with_timers(batch);
        let (_, _, timers, _) = unwrap_accept(accepted);
        assert_eq!(timers.len(), 2);
    }

    #[test]
    fn decision_with_multiple_timers() {
        let accepted = D::<&str, (), &str>::accept("created")
            .with_timer_after(Duration::from_secs(60), "timeout1")
            .with_timer_after(Duration::from_secs(120), "timeout2");
        let (_, _, timers, _) = unwrap_accept(accepted);
        assert_eq!(timers.len(), 2);
    }

    #[test]
    fn decision_cancel_timer() {
        let accepted = D::<&str, (), ()>::accept("completed").cancel_timer("payment-timeout");
        let (_, _, _, cancel_timers) = unwrap_accept(accepted);
        assert_eq!(cancel_timers, ["payment-timeout"]);
    }

    #[test]
    fn decision_cancel_timers_batch() {
        let keys = ["timer-1", "timer-2"].map(String::from);
        let accepted = D::<&str, (), ()>::accept("completed").cancel_timers(keys);
        let (_, _, _, cancel_timers) = unwrap_accept(accepted);
        assert_eq!(cancel_timers.len(), 2);
    }

    #[test]
    fn decision_with_effect_and_timer() {
        let accepted = D::<&str, &str, &str>::accept("created")
            .with_effect("send_email")
            .with_timer_after(Duration::from_secs(60), "timeout");
        let (_, effects, timers, _) = unwrap_accept(accepted);

        assert_eq!(effects.len(), 1);
        assert_eq!(timers.len(), 1);
    }

    #[test]
    fn decision_reject_carries_payload() {
        let decision = D::<&str, (), ()>::reject("nope");
        assert!(decision.is_reject());

        let Decision::Reject(reason) = decision else {
            panic!("expected Reject");
        };
        assert_eq!(reason, "nope");
    }

    #[test]
    fn builder_methods_are_noop_on_reject() {
        let decision = D::<&str, i32, ()>::reject("nope")
            .with_effect(1)
            .with_timer(Timer::after(Duration::from_secs(1), ()))
            .cancel_timer("k");
        assert!(decision.is_reject());
    }

    // ---- WorkflowId ----

    #[test]
    fn workflow_id_new() {
        let id = WorkflowId::new("order-123");
        assert_eq!(id.as_str(), "order-123");
        assert_eq!(id.to_string(), "order-123");
    }

    #[test]
    fn workflow_id_into_inner() {
        let raw = WorkflowId::new("order-123").into_inner();
        assert_eq!(raw, "order-123");
    }

    #[test]
    fn workflow_id_from_string() {
        let id = WorkflowId::from(String::from("order-456"));
        assert_eq!(id.as_str(), "order-456");
    }

    #[test]
    fn workflow_id_from_str() {
        let id = WorkflowId::from("order-789");
        assert_eq!(id.as_str(), "order-789");
    }

    #[test]
    fn workflow_id_equality() {
        let first = WorkflowId::new("same");
        let twin = WorkflowId::new("same");
        let other = WorkflowId::new("different");

        assert_eq!(first, twin);
        assert_ne!(first, other);
    }

    // ---- WorkflowRef ----

    #[test]
    fn workflow_ref_new() {
        let wf = WorkflowRef::new("order", "ord-123");
        assert_eq!(wf.workflow_type(), "order");
        assert_eq!(wf.workflow_id().as_str(), "ord-123");
    }

    #[test]
    fn workflow_ref_display() {
        let wf = WorkflowRef::new("order", "ord-123");
        assert_eq!(wf.to_string(), "order:ord-123");
    }

    #[test]
    fn workflow_ref_into_workflow_id() {
        let id = WorkflowRef::new("order", "ord-123").into_workflow_id();
        assert_eq!(id.as_str(), "ord-123");
    }

    #[test]
    fn workflow_ref_from_tuple_static_str() {
        let wf = WorkflowRef::from(("order", "ord-123"));
        assert_eq!(wf.workflow_type(), "order");
        assert_eq!(wf.workflow_id().as_str(), "ord-123");
    }

    #[test]
    fn workflow_ref_from_tuple_string_id() {
        let wf = WorkflowRef::from((String::from("order"), WorkflowId::new("ord-123")));
        assert_eq!(wf.workflow_type(), "order");
        assert_eq!(wf.workflow_id().as_str(), "ord-123");
    }

    #[test]
    fn workflow_ref_equality() {
        let base = WorkflowRef::new("order", "ord-123");
        let twin = WorkflowRef::new("order", "ord-123");
        let other_id = WorkflowRef::new("order", "ord-456");
        let other_type = WorkflowRef::new("inventory", "ord-123");

        assert_eq!(base, twin);
        assert_ne!(base, other_id);
        assert_ne!(base, other_type);
    }
}