1use std::{sync::Arc, time::Duration};
2
3use bigerror::{ConversionError, Report};
4use tokio::{
5 sync::mpsc::{self, UnboundedReceiver, UnboundedSender},
6 task::JoinSet,
7};
8
9use crate::{
10 ingress::{BoxedStateRouter, Ingress, IngressAdapter, PacketRouter},
11 manager::{BoxedStateMachine, EmptyContext},
12 notification::NotificationQueue,
13 timeout::{self, Timeout, TimeoutManager, TimeoutMessage},
14 NotificationManager, NotificationProcessor, Rex, RexMessage, SignalQueue, StateMachine,
15 StateMachineManager,
16};
17
18pub struct RexBuilder<K, In = (), Out = ()>
19where
20 K: Rex,
21{
22 signal_queue: SignalQueue<K>,
23 notification_queue: NotificationQueue<K::Message>,
24 state_machines: Vec<BoxedStateMachine<K>>,
25 notification_processors: Vec<Box<dyn NotificationProcessor<K::Message>>>,
26 timeout_topic: Option<<K::Message as RexMessage>::Topic>,
27 tick_rate: Option<Duration>,
28 outbound_tx: Option<UnboundedSender<Out>>,
29 ingress_channel: Option<(UnboundedSender<In>, UnboundedReceiver<In>)>,
30}
31
32pub struct BuilderContext<K: Rex> {
34 pub signal_queue: SignalQueue<K>,
35 pub notification_queue: NotificationQueue<K::Message>,
36}
37
38impl<K: Rex> RexBuilder<K, (), ()> {
39 #[must_use]
40 pub fn new() -> Self {
41 Self::default()
42 }
43}
44
45impl<K, In, Out> RexBuilder<K, In, Out>
46where
47 K: Rex + Timeout,
48 K::Message: TimeoutMessage<K>,
49 TimeoutManager<K>: NotificationProcessor<K::Message>,
50{
51 pub fn ctx(&self) -> BuilderContext<K> {
52 BuilderContext {
53 signal_queue: self.signal_queue.clone(),
54 notification_queue: self.notification_queue.clone(),
55 }
56 }
57
58 #[must_use]
59 pub fn with_sm<SM: StateMachine<K> + 'static>(mut self, state_machine: SM) -> Self {
60 self.state_machines.push(Box::new(state_machine));
61 self
62 }
63
64 #[must_use]
65 pub fn with_np<NP: NotificationProcessor<K::Message> + 'static>(
66 mut self,
67 processor: NP,
68 ) -> Self {
69 self.push_np(processor);
70 self
71 }
72
73 pub fn push_np<NP: NotificationProcessor<K::Message> + 'static>(&mut self, processor: NP) {
74 self.notification_processors.push(Box::new(processor));
75 }
76
77 #[must_use]
78 pub fn with_ctx_np<NP: NotificationProcessor<K::Message> + 'static>(
79 mut self,
80 op: impl FnOnce(BuilderContext<K>) -> NP,
81 ) -> Self {
82 self.notification_processors.push(Box::new(op(self.ctx())));
83 self
84 }
85
86 pub fn push_ctx_np<NP: NotificationProcessor<K::Message> + 'static>(
87 &mut self,
88 op: impl FnOnce(BuilderContext<K>) -> NP,
89 ) {
90 self.notification_processors.push(Box::new(op(self.ctx())));
91 }
92
93 #[must_use]
94 pub fn with_boxed_np(mut self, processor: Box<dyn NotificationProcessor<K::Message>>) -> Self {
95 self.notification_processors.push(processor);
96 self
97 }
98
99 #[must_use]
100 pub const fn with_timeout_manager(
101 mut self,
102 timeout_topic: <K::Message as RexMessage>::Topic,
103 ) -> Self {
104 self.timeout_topic = Some(timeout_topic);
105 self
106 }
107
108 #[must_use]
109 pub const fn with_tick_rate(mut self, tick_rate: Duration) -> Self {
110 self.tick_rate = Some(tick_rate);
111 self
112 }
113
114 fn build_timeout_manager(&mut self) {
115 if let Some(topic) = self.timeout_topic {
116 let timeout_manager = TimeoutManager::new(self.signal_queue.clone(), topic)
117 .with_tick_rate(self.tick_rate.unwrap_or(timeout::DEFAULT_TICK_RATE));
118 self.notification_processors.push(Box::new(timeout_manager));
119 }
120 }
121
122 fn build_inner(mut self, join_set: &mut JoinSet<()>) -> EmptyContext<K> {
123 self.build_timeout_manager();
124
125 if !self.notification_processors.is_empty() {
126 NotificationManager::new(
127 self.notification_processors,
128 join_set,
129 self.notification_queue.clone(),
130 )
131 .init(join_set);
132 }
133 let sm_manager = StateMachineManager::new(
134 self.state_machines,
135 self.signal_queue,
136 self.notification_queue.clone(),
137 );
138
139 sm_manager.init(join_set);
140 sm_manager.ctx_builder()
141 }
142
143 pub fn build(self) -> EmptyContext<K> {
144 let mut join_set = JoinSet::new();
145 let ctx = self.build_inner(&mut join_set);
146 join_set.detach_all();
147
148 ctx
149 }
150
151 pub fn build_with_handle(self, join_set: &mut JoinSet<()>) -> EmptyContext<K> {
152 self.build_inner(join_set)
153 }
154}
155
156impl<K> RexBuilder<K, K::In, K::Out>
157where
158 K: Rex + Timeout + Ingress,
159 K::Message: TimeoutMessage<K> + TryInto<K::Out, Error = Report<ConversionError>>,
160 K::Input: TryFrom<K::In, Error = Report<ConversionError>>,
161 TimeoutManager<K>: NotificationProcessor<K::Message>,
162{
163 #[must_use]
164 pub fn new_connected(outbound_tx: UnboundedSender<K::Out>) -> (UnboundedSender<K::In>, Self) {
165 let (inbound_tx, inbound_rx) = mpsc::unbounded_channel::<K::In>();
166 (
167 inbound_tx.clone(),
168 Self {
169 outbound_tx: Some(outbound_tx),
170 ingress_channel: Some((inbound_tx, inbound_rx)),
171 ..Default::default()
172 },
173 )
174 }
175
176 #[must_use]
177 pub fn ingress_tx(&self) -> UnboundedSender<K::In> {
178 self.ingress_channel
179 .as_ref()
180 .map(|(tx, _)| tx.clone())
181 .expect("ingress_channel uninitialized")
182 }
183
184 #[must_use]
185 pub fn with_ingress_adapter(
186 mut self,
187 state_routers: Vec<BoxedStateRouter<K, K::In>>,
188 ingress_topic: <K::Message as RexMessage>::Topic,
189 ) -> Self {
190 assert!(!state_routers.is_empty());
191 let (tx, rx) = self
192 .ingress_channel
193 .take()
194 .expect("ingress_channel uninitialized");
195 let outbound_tx = self
196 .outbound_tx
197 .clone()
198 .expect("builder outbound_tx uninitialized");
199
200 let ingress_adapter = IngressAdapter {
201 signal_queue: self.signal_queue.clone(),
202 outbound_tx,
203 router: PacketRouter::new(state_routers),
204 inbound_tx: tx,
205 inbound_rx: Some(rx),
206 topic: ingress_topic,
207 };
208 self.with_np(ingress_adapter)
209 }
210}
211
212impl<K, In, Out> Default for RexBuilder<K, In, Out>
213where
214 K: Rex,
215{
216 fn default() -> Self {
217 Self {
218 notification_queue: NotificationQueue::new(),
219 signal_queue: Arc::default(),
220 state_machines: Vec::default(),
221 notification_processors: Vec::default(),
222 timeout_topic: None,
223 tick_rate: None,
224 outbound_tx: None,
225 ingress_channel: None,
226 }
227 }
228}