Skip to main content

atomr_patterns/process_manager/
runner.rs

1//! Process Manager implementation.
2
3use std::collections::HashMap;
4use std::marker::PhantomData;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use atomr_core::actor::ActorSystem;
9use tokio::sync::mpsc::UnboundedReceiver;
10
11use crate::topology::Topology;
12use crate::PatternError;
13
/// What a process manager does in response to an event.
///
/// `S` is the state type and `C` the command type. The common traits are
/// derived (conditionally on `S` and `C`) so transitions can be logged,
/// cloned and compared directly in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Transition<S, C> {
    /// Don't change state; dispatch no commands.
    Stay,
    /// Move to `next` state; dispatch `commands` in order.
    Goto {
        /// State stored for the correlation id after this transition.
        next: S,
        /// Commands to dispatch, in order.
        commands: Vec<C>,
    },
    /// Terminal state — clear correlation, dispatch any final
    /// commands.
    Complete {
        /// Final commands to dispatch, in order.
        commands: Vec<C>,
    },
}
24
25/// Typed state-machine process manager.
26pub trait ProcessManager: Send + 'static {
27    type Event: Send + Clone + 'static;
28    type Command: Send + 'static;
29    type State: Clone + Send + Default + 'static;
30    type Error: std::error::Error + Send + 'static;
31
32    fn correlation_id(event: &Self::Event) -> Option<String>;
33
34    fn transition(
35        state: &Self::State,
36        event: Self::Event,
37    ) -> Result<Transition<Self::State, Self::Command>, Self::Error>;
38}
39
40pub struct ProcessManagerPattern<P>(PhantomData<P>);
41
42impl<P: ProcessManager> ProcessManagerPattern<P> {
43    pub fn builder() -> ProcessManagerBuilder<P> {
44        ProcessManagerBuilder { name: None, events: None, dispatcher: None }
45    }
46}
47
48type DispatcherFn<C> = Arc<dyn Fn(C) -> futures::future::BoxFuture<'static, bool> + Send + Sync>;
49
50pub struct ProcessManagerBuilder<P: ProcessManager> {
51    name: Option<String>,
52    events: Option<UnboundedReceiver<P::Event>>,
53    dispatcher: Option<DispatcherFn<P::Command>>,
54}
55
56impl<P: ProcessManager> ProcessManagerBuilder<P> {
57    pub fn name(mut self, n: impl Into<String>) -> Self {
58        self.name = Some(n.into());
59        self
60    }
61    pub fn events(mut self, rx: UnboundedReceiver<P::Event>) -> Self {
62        self.events = Some(rx);
63        self
64    }
65    pub fn dispatcher<F, Fut>(mut self, f: F) -> Self
66    where
67        F: Fn(P::Command) -> Fut + Send + Sync + 'static,
68        Fut: std::future::Future<Output = bool> + Send + 'static,
69    {
70        let f = Arc::new(f);
71        self.dispatcher = Some(Arc::new(move |c| {
72            let f = f.clone();
73            Box::pin(async move { f(c).await })
74        }));
75        self
76    }
77    pub fn build(self) -> Result<ProcessManagerTopology<P>, PatternError<P::Error>> {
78        Ok(ProcessManagerTopology {
79            name: self.name.unwrap_or_else(|| "process-manager".into()),
80            events: self.events.ok_or(PatternError::NotConfigured("events"))?,
81            dispatcher: self.dispatcher.ok_or(PatternError::NotConfigured("dispatcher"))?,
82        })
83    }
84}
85
86pub struct ProcessManagerTopology<P: ProcessManager> {
87    name: String,
88    events: UnboundedReceiver<P::Event>,
89    dispatcher: DispatcherFn<P::Command>,
90}
91
/// Handles returned after a process manager topology is materialized.
///
/// Derives the common traits so handles can be logged, cloned and
/// compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProcessManagerHandles {
    /// Name the process manager was materialized with.
    pub name: String,
}
95
96#[async_trait]
97impl<P: ProcessManager> Topology for ProcessManagerTopology<P> {
98    type Handles = ProcessManagerHandles;
99
100    async fn materialize(self, _system: &ActorSystem) -> Result<Self::Handles, PatternError<()>> {
101        let ProcessManagerTopology { name, mut events, dispatcher } = self;
102        let task_name = name.clone();
103        tokio::spawn(async move {
104            let mut states: HashMap<String, P::State> = HashMap::new();
105            while let Some(event) = events.recv().await {
106                let Some(corr) = P::correlation_id(&event) else {
107                    continue;
108                };
109                let state = states.entry(corr.clone()).or_default();
110                match P::transition(state, event) {
111                    Ok(Transition::Stay) => {}
112                    Ok(Transition::Goto { next, commands }) => {
113                        *state = next;
114                        for c in commands {
115                            let _ = (dispatcher)(c).await;
116                        }
117                    }
118                    Ok(Transition::Complete { commands }) => {
119                        for c in commands {
120                            let _ = (dispatcher)(c).await;
121                        }
122                        states.remove(&corr);
123                    }
124                    Err(e) => {
125                        tracing::warn!(pm = %task_name, error = %e, "transition failed");
126                    }
127                }
128            }
129        });
130        Ok(ProcessManagerHandles { name })
131    }
132}