atomr_patterns/process_manager/
runner.rs1use std::collections::HashMap;
4use std::marker::PhantomData;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use atomr_core::actor::ActorSystem;
9use tokio::sync::mpsc::UnboundedReceiver;
10
11use crate::topology::Topology;
12use crate::PatternError;
13
14pub enum Transition<S, C> {
16 Stay,
18 Goto { next: S, commands: Vec<C> },
20 Complete { commands: Vec<C> },
23}
24
25pub trait ProcessManager: Send + 'static {
27 type Event: Send + Clone + 'static;
28 type Command: Send + 'static;
29 type State: Clone + Send + Default + 'static;
30 type Error: std::error::Error + Send + 'static;
31
32 fn correlation_id(event: &Self::Event) -> Option<String>;
33
34 fn transition(
35 state: &Self::State,
36 event: Self::Event,
37 ) -> Result<Transition<Self::State, Self::Command>, Self::Error>;
38}
39
40pub struct ProcessManagerPattern<P>(PhantomData<P>);
41
42impl<P: ProcessManager> ProcessManagerPattern<P> {
43 pub fn builder() -> ProcessManagerBuilder<P> {
44 ProcessManagerBuilder { name: None, events: None, dispatcher: None }
45 }
46}
47
48type DispatcherFn<C> = Arc<dyn Fn(C) -> futures::future::BoxFuture<'static, bool> + Send + Sync>;
49
50pub struct ProcessManagerBuilder<P: ProcessManager> {
51 name: Option<String>,
52 events: Option<UnboundedReceiver<P::Event>>,
53 dispatcher: Option<DispatcherFn<P::Command>>,
54}
55
56impl<P: ProcessManager> ProcessManagerBuilder<P> {
57 pub fn name(mut self, n: impl Into<String>) -> Self {
58 self.name = Some(n.into());
59 self
60 }
61 pub fn events(mut self, rx: UnboundedReceiver<P::Event>) -> Self {
62 self.events = Some(rx);
63 self
64 }
65 pub fn dispatcher<F, Fut>(mut self, f: F) -> Self
66 where
67 F: Fn(P::Command) -> Fut + Send + Sync + 'static,
68 Fut: std::future::Future<Output = bool> + Send + 'static,
69 {
70 let f = Arc::new(f);
71 self.dispatcher = Some(Arc::new(move |c| {
72 let f = f.clone();
73 Box::pin(async move { f(c).await })
74 }));
75 self
76 }
77 pub fn build(self) -> Result<ProcessManagerTopology<P>, PatternError<P::Error>> {
78 Ok(ProcessManagerTopology {
79 name: self.name.unwrap_or_else(|| "process-manager".into()),
80 events: self.events.ok_or(PatternError::NotConfigured("events"))?,
81 dispatcher: self.dispatcher.ok_or(PatternError::NotConfigured("dispatcher"))?,
82 })
83 }
84}
85
86pub struct ProcessManagerTopology<P: ProcessManager> {
87 name: String,
88 events: UnboundedReceiver<P::Event>,
89 dispatcher: DispatcherFn<P::Command>,
90}
91
92pub struct ProcessManagerHandles {
93 pub name: String,
94}
95
96#[async_trait]
97impl<P: ProcessManager> Topology for ProcessManagerTopology<P> {
98 type Handles = ProcessManagerHandles;
99
100 async fn materialize(self, _system: &ActorSystem) -> Result<Self::Handles, PatternError<()>> {
101 let ProcessManagerTopology { name, mut events, dispatcher } = self;
102 let task_name = name.clone();
103 tokio::spawn(async move {
104 let mut states: HashMap<String, P::State> = HashMap::new();
105 while let Some(event) = events.recv().await {
106 let Some(corr) = P::correlation_id(&event) else {
107 continue;
108 };
109 let state = states.entry(corr.clone()).or_default();
110 match P::transition(state, event) {
111 Ok(Transition::Stay) => {}
112 Ok(Transition::Goto { next, commands }) => {
113 *state = next;
114 for c in commands {
115 let _ = (dispatcher)(c).await;
116 }
117 }
118 Ok(Transition::Complete { commands }) => {
119 for c in commands {
120 let _ = (dispatcher)(c).await;
121 }
122 states.remove(&corr);
123 }
124 Err(e) => {
125 tracing::warn!(pm = %task_name, error = %e, "transition failed");
126 }
127 }
128 }
129 });
130 Ok(ProcessManagerHandles { name })
131 }
132}