1use crate::codec::{Codec, CodecName};
10use crate::erasure::payload::{
11 PayloadErased, PayloadErasedWrapper, PayloadRawErased, PayloadRawErasedWrapper,
12};
13use crate::reactant::{ErrorReactant, Reactant, ReactantError, ReactantRaw};
14use std::any::{Any, TypeId};
15use std::future::Future;
16use std::marker::PhantomData;
17use std::pin::Pin;
18use std::sync::Arc;
19
20pub trait ReactantErased: Send + Sync + 'static {
22 fn react_erased(
23 &self,
24 payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
25 ) -> Pin<Box<dyn Future<Output = Result<(), ReactantError>> + Send + 'static>>;
26 fn payload_type_id(&self) -> TypeId;
27 fn codec_type_id(&self) -> TypeId;
28 fn clone_to_box(&self) -> Box<dyn ReactantErased + Send + Sync + 'static>;
29 fn clone_to_arc(&self) -> Arc<dyn ReactantErased + Send + Sync + 'static>;
30 fn clone_to_any(&self) -> Arc<dyn Any + Send + Sync + 'static>;
31 fn as_any(&self) -> &dyn Any;
32}
33
34pub trait ReactantRawErased: Send + Sync + 'static {
36 fn react_erased(
37 &self,
38 payload: Arc<dyn PayloadRawErased + Send + Sync + 'static>,
39 ) -> Pin<Box<dyn Future<Output = Result<(), ReactantError>> + Send + 'static>>;
40 fn payload_type_id(&self) -> TypeId;
41 fn codec_type_id(&self) -> TypeId;
42 fn clone_to_box(&self) -> Box<dyn ReactantRawErased + Send + Sync + 'static>;
43 fn clone_to_arc(&self) -> Arc<dyn ReactantRawErased + Send + Sync + 'static>;
44 fn clone_to_any(&self) -> Arc<dyn Any + Send + Sync + 'static>;
45 fn as_any(&self) -> &dyn Any;
46}
47
48pub trait ErrorReactantErased: Send + Sync + 'static {
50 fn react_error_erased(
51 &self,
52 error: Arc<ReactantError>,
53 payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
54 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
55 fn payload_type_id(&self) -> TypeId;
56 fn codec_type_id(&self) -> TypeId;
57 fn clone_to_box(&self) -> Box<dyn ErrorReactantErased + Send + Sync + 'static>;
58 fn clone_to_arc(&self) -> Arc<dyn ErrorReactantErased + Send + Sync + 'static>;
59 fn clone_to_any(&self) -> Arc<dyn Any + Send + Sync + 'static>;
60 fn as_any(&self) -> &dyn Any;
61}
62
63pub struct ErrorReactantErasedWrapper<T, C, R> {
67 reactant: Box<R>,
68 _phantom: PhantomData<(T, C)>,
69}
70
71impl<T, C, R> ErrorReactantErasedWrapper<T, C, R>
72where
73 T: Send + Sync + 'static,
74 C: Codec<T> + CodecName + Send + Sync + 'static,
75 R: ErrorReactant<T, C> + Send + Sync + Clone + 'static,
76{
77 pub fn new(reactant: Box<R>) -> Self {
78 Self {
79 reactant,
80 _phantom: PhantomData,
81 }
82 }
83
84 pub fn from_typed_reactant(
85 reactant: Box<R>,
86 ) -> Arc<dyn ErrorReactantErased + Send + Sync + 'static> {
87 Arc::new(Self::new(reactant))
88 }
89}
90
91impl<T, C, R> ErrorReactantErased for ErrorReactantErasedWrapper<T, C, R>
92where
93 T: Send + Sync + 'static,
94 C: Codec<T> + CodecName + Send + Sync + 'static,
95 R: ErrorReactant<T, C> + Send + Sync + Clone + 'static,
96{
97 fn react_error_erased(
98 &self,
99 error: Arc<ReactantError>,
100 payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
101 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
102 if payload.payload_type_id() == TypeId::of::<T>()
104 && payload.codec_type_id() == TypeId::of::<C>()
105 {
106 if let Some(wrapper) = payload
107 .as_any()
108 .downcast_ref::<PayloadErasedWrapper<T, C>>()
109 {
110 let typed_payload = wrapper.get_typed_payload();
111 self.reactant.react_error(error, typed_payload)
112 } else {
113 Box::pin(async move {})
114 }
115 } else {
116 Box::pin(async move {})
117 }
118 }
119
120 fn payload_type_id(&self) -> TypeId {
121 TypeId::of::<T>()
122 }
123
124 fn codec_type_id(&self) -> TypeId {
125 TypeId::of::<C>()
126 }
127
128 fn clone_to_box(&self) -> Box<dyn ErrorReactantErased + Send + Sync + 'static> {
129 Box::new(ErrorReactantErasedWrapper::new(Box::new(
130 (*self.reactant).clone(),
131 )))
132 }
133
134 fn clone_to_arc(&self) -> Arc<dyn ErrorReactantErased + Send + Sync + 'static> {
135 Arc::new(ErrorReactantErasedWrapper::new(Box::new(
136 (*self.reactant).clone(),
137 )))
138 }
139
140 fn clone_to_any(&self) -> Arc<dyn Any + Send + Sync + 'static> {
141 let cloned_reactant = (*self.reactant).clone();
142 let typed_arc: Arc<dyn ErrorReactant<T, C> + Send + Sync + 'static> =
143 Arc::new(cloned_reactant);
144 Arc::new(typed_arc)
145 }
146
147 fn as_any(&self) -> &dyn Any {
148 self
149 }
150}
151
152pub struct ReactantErasedWrapper<T, C, R> {
154 reactant: Box<R>,
155 _phantom: PhantomData<(T, C)>,
156}
157
158impl<T, C, R> ReactantErasedWrapper<T, C, R>
159where
160 T: Send + Sync + 'static,
161 C: Codec<T> + CodecName + Send + Sync + 'static,
162 R: Reactant<T, C> + Send + Sync + Clone + 'static,
163{
164 pub fn new(reactant: Box<R>) -> Self {
165 Self {
166 reactant,
167 _phantom: PhantomData,
168 }
169 }
170
171 pub fn from_typed_reactant(
173 reactant: Box<R>,
174 ) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
175 Arc::new(Self::new(reactant))
176 }
177
178 pub fn get_typed_reactant(&self) -> &R {
180 &self.reactant
181 }
182}
183
184impl<T, C, R> ReactantErased for ReactantErasedWrapper<T, C, R>
185where
186 T: Send + Sync + 'static,
187 C: Codec<T> + CodecName + Send + Sync + 'static,
188 R: Reactant<T, C> + Send + Sync + Clone + 'static,
189{
190 fn react_erased(
191 &self,
192 payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
193 ) -> Pin<Box<dyn Future<Output = Result<(), ReactantError>> + Send + 'static>> {
194 if payload.payload_type_id() == TypeId::of::<T>()
196 && payload.codec_type_id() == TypeId::of::<C>()
197 {
198 if let Some(wrapper) = payload
200 .as_any()
201 .downcast_ref::<PayloadErasedWrapper<T, C>>()
202 {
203 let typed_payload = wrapper.get_typed_payload();
205 self.reactant.react(typed_payload)
207 } else {
208 Box::pin(async move { Ok(()) })
209 }
210 } else {
211 Box::pin(async move {
212 Ok(())
215 })
216 }
217 }
218
219 fn payload_type_id(&self) -> TypeId {
220 TypeId::of::<T>()
221 }
222
223 fn codec_type_id(&self) -> TypeId {
224 TypeId::of::<C>()
225 }
226
227 fn clone_to_box(&self) -> Box<dyn ReactantErased + Send + Sync + 'static> {
228 Box::new(ReactantErasedWrapper::new(Box::new(
229 (*self.reactant).clone(),
230 )))
231 }
232
233 fn clone_to_arc(&self) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
234 Arc::new(ReactantErasedWrapper::new(Box::new(
235 (*self.reactant).clone(),
236 )))
237 }
238
239 fn clone_to_any(&self) -> Arc<dyn Any + Send + Sync + 'static> {
240 let cloned_reactant = (*self.reactant).clone();
242 let typed_arc: Arc<dyn Reactant<T, C> + Send + Sync + 'static> = Arc::new(cloned_reactant);
244 Arc::new(typed_arc)
246 }
247
248 fn as_any(&self) -> &dyn Any {
249 self
250 }
251}
252
253pub struct ReactantRawErasedWrapper<T, C, R> {
255 reactant: Box<R>,
256 _phantom: PhantomData<(T, C)>,
257}
258
259impl<T, C, R> ReactantRawErasedWrapper<T, C, R>
260where
261 T: Send + Sync + 'static,
262 C: Codec<T> + CodecName + Send + Sync + 'static,
263 R: ReactantRaw<T, C> + Send + Sync + Clone + 'static,
264{
265 pub fn new(reactant: Box<R>) -> Self {
266 Self {
267 reactant,
268 _phantom: PhantomData,
269 }
270 }
271
272 pub fn from_typed_reactant(
274 reactant: Box<R>,
275 ) -> Arc<dyn ReactantRawErased + Send + Sync + 'static> {
276 Arc::new(Self::new(reactant))
277 }
278
279 pub fn get_typed_reactant(&self) -> &R {
281 &self.reactant
282 }
283}
284
285impl<T, C, R> ReactantRawErased for ReactantRawErasedWrapper<T, C, R>
286where
287 T: Send + Sync + 'static,
288 C: Codec<T> + CodecName + Send + Sync + 'static,
289 R: ReactantRaw<T, C> + Send + Sync + Clone + 'static,
290{
291 fn react_erased(
292 &self,
293 payload: Arc<dyn PayloadRawErased + Send + Sync + 'static>,
294 ) -> Pin<Box<dyn Future<Output = Result<(), ReactantError>> + Send + 'static>> {
295 if payload.payload_type_id() == TypeId::of::<T>()
297 && payload.codec_type_id() == TypeId::of::<C>()
298 {
299 if let Some(wrapper) = payload
301 .as_any()
302 .downcast_ref::<PayloadRawErasedWrapper<T, C>>()
303 {
304 let typed_payload = wrapper.to_typed_payload();
306 self.reactant.react(typed_payload)
308 } else {
309 Box::pin(async move { Ok(()) })
310 }
311 } else {
312 Box::pin(async move {
313 Ok(())
316 })
317 }
318 }
319
320 fn payload_type_id(&self) -> TypeId {
321 TypeId::of::<T>()
322 }
323
324 fn codec_type_id(&self) -> TypeId {
325 TypeId::of::<C>()
326 }
327
328 fn clone_to_box(&self) -> Box<dyn ReactantRawErased + Send + Sync + 'static> {
329 Box::new(ReactantRawErasedWrapper::new(Box::new(
330 (*self.reactant).clone(),
331 )))
332 }
333
334 fn clone_to_arc(&self) -> Arc<dyn ReactantRawErased + Send + Sync + 'static> {
335 Arc::new(ReactantRawErasedWrapper::new(Box::new(
336 (*self.reactant).clone(),
337 )))
338 }
339
340 fn clone_to_any(&self) -> Arc<dyn Any + Send + Sync + 'static> {
341 let cloned_reactant = (*self.reactant).clone();
343 let typed_arc: Arc<dyn ReactantRaw<T, C> + Send + Sync + 'static> =
345 Arc::new(cloned_reactant);
346 Arc::new(typed_arc)
348 }
349
350 fn as_any(&self) -> &dyn Any {
351 self
352 }
353}
354
355pub fn erase_reactant<T, C, R>(reactant: Box<R>) -> Arc<dyn ReactantErased + Send + Sync + 'static>
357where
358 T: Send + Sync + 'static,
359 C: Codec<T> + CodecName + Send + Sync + 'static,
360 R: Reactant<T, C> + Send + Sync + Clone + 'static,
361{
362 ReactantErasedWrapper::from_typed_reactant(reactant)
363}
364
365pub fn erase_reactant_raw<T, C, R>(
367 reactant: Box<R>,
368) -> Arc<dyn ReactantRawErased + Send + Sync + 'static>
369where
370 T: Send + Sync + 'static,
371 C: Codec<T> + CodecName + Send + Sync + 'static,
372 R: ReactantRaw<T, C> + Send + Sync + Clone + 'static,
373{
374 ReactantRawErasedWrapper::from_typed_reactant(reactant)
375}
376
377pub fn unerase_reactant<T, C, R>(wrapper: &ReactantErasedWrapper<T, C, R>) -> &R
379where
380 T: Send + Sync + 'static,
381 C: Codec<T> + CodecName + Send + Sync + 'static,
382 R: Reactant<T, C> + Send + Sync + Clone + 'static,
383{
384 wrapper.get_typed_reactant()
385}
386
387pub fn unerase_reactant_raw<T, C, R>(wrapper: &ReactantRawErasedWrapper<T, C, R>) -> &R
389where
390 T: Send + Sync + 'static,
391 C: Codec<T> + CodecName + Send + Sync + 'static,
392 R: ReactantRaw<T, C> + Send + Sync + Clone + 'static,
393{
394 wrapper.get_typed_reactant()
395}
396
397pub fn erase_error_reactant<T, C, R>(
398 reactant: Box<R>,
399) -> Arc<dyn ErrorReactantErased + Send + Sync + 'static>
400where
401 T: Send + Sync + 'static,
402 C: Codec<T> + CodecName + Send + Sync + 'static,
403 R: ErrorReactant<T, C> + Send + Sync + Clone + 'static,
404{
405 ErrorReactantErasedWrapper::from_typed_reactant(reactant)
406}
407
408#[cfg(test)]
409mod tests {
410 use super::*;
411 use crate::test_utils::{DebugCodec, DebugStruct, TokioMpscReactant, TokioMpscReactantRaw};
412 use tokio::sync::mpsc;
413
414 #[test]
415 fn test_reactant_erased_cloning() {
416 let (sender, _receiver) = mpsc::channel(10);
418 let reactant = TokioMpscReactant { sender };
419
420 let erased = erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(reactant));
422
423 let _cloned_box = erased.clone_to_box();
425
426 let _cloned_arc = erased.clone_to_arc();
428
429 }
431
432 #[test]
433 fn test_reactant_raw_erased_cloning() {
434 let (sender, _receiver) = mpsc::channel(10);
436 let raw_reactant = TokioMpscReactantRaw { sender };
437
438 let erased_raw = erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(raw_reactant));
440
441 let _cloned_raw_box = erased_raw.clone_to_box();
443
444 let _cloned_raw_arc = erased_raw.clone_to_arc();
446
447 }
449}