rex/
builder.rs

1use std::{sync::Arc, time::Duration};
2
3use bigerror::{ConversionError, Report};
4use tokio::{
5    sync::mpsc::{self, UnboundedReceiver, UnboundedSender},
6    task::JoinSet,
7};
8
9use crate::{
10    ingress::{BoxedStateRouter, Ingress, IngressAdapter, PacketRouter},
11    manager::{BoxedStateMachine, EmptyContext},
12    notification::NotificationQueue,
13    timeout::{self, Timeout, TimeoutManager, TimeoutMessage},
14    NotificationManager, NotificationProcessor, Rex, RexMessage, SignalQueue, StateMachine,
15    StateMachineManager,
16};
17
18pub struct RexBuilder<K, In = (), Out = ()>
19where
20    K: Rex,
21{
22    signal_queue: SignalQueue<K>,
23    notification_queue: NotificationQueue<K::Message>,
24    state_machines: Vec<BoxedStateMachine<K>>,
25    notification_processors: Vec<Box<dyn NotificationProcessor<K::Message>>>,
26    timeout_topic: Option<<K::Message as RexMessage>::Topic>,
27    tick_rate: Option<Duration>,
28    outbound_tx: Option<UnboundedSender<Out>>,
29    ingress_channel: Option<(UnboundedSender<In>, UnboundedReceiver<In>)>,
30}
31
32// context used before calling build
33pub struct BuilderContext<K: Rex> {
34    pub signal_queue: SignalQueue<K>,
35    pub notification_queue: NotificationQueue<K::Message>,
36}
37
38impl<K: Rex> RexBuilder<K, (), ()> {
39    #[must_use]
40    pub fn new() -> Self {
41        Self::default()
42    }
43}
44
45impl<K, In, Out> RexBuilder<K, In, Out>
46where
47    K: Rex + Timeout,
48    K::Message: TimeoutMessage<K>,
49    TimeoutManager<K>: NotificationProcessor<K::Message>,
50{
51    pub fn ctx(&self) -> BuilderContext<K> {
52        BuilderContext {
53            signal_queue: self.signal_queue.clone(),
54            notification_queue: self.notification_queue.clone(),
55        }
56    }
57
58    #[must_use]
59    pub fn with_sm<SM: StateMachine<K> + 'static>(mut self, state_machine: SM) -> Self {
60        self.state_machines.push(Box::new(state_machine));
61        self
62    }
63
64    #[must_use]
65    pub fn with_np<NP: NotificationProcessor<K::Message> + 'static>(
66        mut self,
67        processor: NP,
68    ) -> Self {
69        self.push_np(processor);
70        self
71    }
72
73    pub fn push_np<NP: NotificationProcessor<K::Message> + 'static>(&mut self, processor: NP) {
74        self.notification_processors.push(Box::new(processor));
75    }
76
77    #[must_use]
78    pub fn with_ctx_np<NP: NotificationProcessor<K::Message> + 'static>(
79        mut self,
80        op: impl FnOnce(BuilderContext<K>) -> NP,
81    ) -> Self {
82        self.notification_processors.push(Box::new(op(self.ctx())));
83        self
84    }
85
86    pub fn push_ctx_np<NP: NotificationProcessor<K::Message> + 'static>(
87        &mut self,
88        op: impl FnOnce(BuilderContext<K>) -> NP,
89    ) {
90        self.notification_processors.push(Box::new(op(self.ctx())));
91    }
92
93    #[must_use]
94    pub fn with_boxed_np(mut self, processor: Box<dyn NotificationProcessor<K::Message>>) -> Self {
95        self.notification_processors.push(processor);
96        self
97    }
98
99    #[must_use]
100    pub const fn with_timeout_manager(
101        mut self,
102        timeout_topic: <K::Message as RexMessage>::Topic,
103    ) -> Self {
104        self.timeout_topic = Some(timeout_topic);
105        self
106    }
107
108    #[must_use]
109    pub const fn with_tick_rate(mut self, tick_rate: Duration) -> Self {
110        self.tick_rate = Some(tick_rate);
111        self
112    }
113
114    fn build_timeout_manager(&mut self) {
115        if let Some(topic) = self.timeout_topic {
116            let timeout_manager = TimeoutManager::new(self.signal_queue.clone(), topic)
117                .with_tick_rate(self.tick_rate.unwrap_or(timeout::DEFAULT_TICK_RATE));
118            self.notification_processors.push(Box::new(timeout_manager));
119        }
120    }
121
122    fn build_inner(mut self, join_set: &mut JoinSet<()>) -> EmptyContext<K> {
123        self.build_timeout_manager();
124
125        if !self.notification_processors.is_empty() {
126            NotificationManager::new(
127                self.notification_processors,
128                join_set,
129                self.notification_queue.clone(),
130            )
131            .init(join_set);
132        }
133        let sm_manager = StateMachineManager::new(
134            self.state_machines,
135            self.signal_queue,
136            self.notification_queue.clone(),
137        );
138
139        sm_manager.init(join_set);
140        sm_manager.ctx_builder()
141    }
142
143    pub fn build(self) -> EmptyContext<K> {
144        let mut join_set = JoinSet::new();
145        let ctx = self.build_inner(&mut join_set);
146        join_set.detach_all();
147
148        ctx
149    }
150
151    pub fn build_with_handle(self, join_set: &mut JoinSet<()>) -> EmptyContext<K> {
152        self.build_inner(join_set)
153    }
154}
155
156impl<K> RexBuilder<K, K::In, K::Out>
157where
158    K: Rex + Timeout + Ingress,
159    K::Message: TimeoutMessage<K> + TryInto<K::Out, Error = Report<ConversionError>>,
160    K::Input: TryFrom<K::In, Error = Report<ConversionError>>,
161    TimeoutManager<K>: NotificationProcessor<K::Message>,
162{
163    #[must_use]
164    pub fn new_connected(outbound_tx: UnboundedSender<K::Out>) -> (UnboundedSender<K::In>, Self) {
165        let (inbound_tx, inbound_rx) = mpsc::unbounded_channel::<K::In>();
166        (
167            inbound_tx.clone(),
168            Self {
169                outbound_tx: Some(outbound_tx),
170                ingress_channel: Some((inbound_tx, inbound_rx)),
171                ..Default::default()
172            },
173        )
174    }
175
176    #[must_use]
177    pub fn ingress_tx(&self) -> UnboundedSender<K::In> {
178        self.ingress_channel
179            .as_ref()
180            .map(|(tx, _)| tx.clone())
181            .expect("ingress_channel uninitialized")
182    }
183
184    #[must_use]
185    pub fn with_ingress_adapter(
186        mut self,
187        state_routers: Vec<BoxedStateRouter<K, K::In>>,
188        ingress_topic: <K::Message as RexMessage>::Topic,
189    ) -> Self {
190        assert!(!state_routers.is_empty());
191        let (tx, rx) = self
192            .ingress_channel
193            .take()
194            .expect("ingress_channel uninitialized");
195        let outbound_tx = self
196            .outbound_tx
197            .clone()
198            .expect("builder outbound_tx uninitialized");
199
200        let ingress_adapter = IngressAdapter {
201            signal_queue: self.signal_queue.clone(),
202            outbound_tx,
203            router: PacketRouter::new(state_routers),
204            inbound_tx: tx,
205            inbound_rx: Some(rx),
206            topic: ingress_topic,
207        };
208        self.with_np(ingress_adapter)
209    }
210}
211
212impl<K, In, Out> Default for RexBuilder<K, In, Out>
213where
214    K: Rex,
215{
216    fn default() -> Self {
217        Self {
218            notification_queue: NotificationQueue::new(),
219            signal_queue: Arc::default(),
220            state_machines: Vec::default(),
221            notification_processors: Vec::default(),
222            timeout_topic: None,
223            tick_rate: None,
224            outbound_tx: None,
225            ingress_channel: None,
226        }
227    }
228}