Skip to main content

cosmic_space/wave/exchange/
asynch.rs

1use crate::loc::{ToPoint, ToSurface};
2use crate::log::{PointLogger, RootLogger, Trackable, Tracker};
3use crate::particle::traversal::Traversal;
4use crate::settings::Timeouts;
5use crate::wave::core::cmd::CmdMethod;
6use crate::wave::core::http2::StatusCode;
7use crate::wave::core::CoreBounce;
8use crate::wave::exchange::{
9    BroadTxRouter, DirectedHandlerShellDef, InCtxDef, ProtoTransmitterBuilderDef,
10    ProtoTransmitterDef, RootInCtxDef, SetStrategy,
11};
12use crate::wave::{
13    BounceBacks, BounceProto, DirectedKind, DirectedProto, DirectedWave, Echo,
14    FromReflectedAggregate, Handling, Pong, RecipientSelector, ReflectedAggregate, ReflectedProto,
15    ReflectedWave, Scope, UltraWave, Wave, WaveId,
16};
17use crate::{Agent, ReflectedCore, SpaceErr, Substance, Surface, ToSubstance};
18use alloc::borrow::Cow;
19use dashmap::{DashMap, DashSet};
20use std::collections::HashSet;
21use std::sync::Arc;
22use std::time::Duration;
23use tokio::sync::{mpsc, oneshot};
24use crate::point::Point;
25
26#[async_trait]
27impl Router for TxRouter {
28    async fn route(&self, wave: UltraWave) {
29        self.tx.send(wave).await;
30    }
31}
32
33#[async_trait]
34impl Router for BroadTxRouter {
35    async fn route(&self, wave: UltraWave) {
36        self.tx.send(wave);
37    }
38}
39
40#[async_trait]
41pub trait Router: Send + Sync {
42    async fn route(&self, wave: UltraWave);
43}
44
45#[async_trait]
46pub trait TraversalRouter: Send + Sync {
47    async fn traverse(&self, traversal: Traversal<UltraWave>) -> Result<(), SpaceErr>;
48}
49
50#[derive(Clone)]
51pub struct AsyncRouter {
52    pub router: Arc<dyn Router>,
53}
54
55impl AsyncRouter {
56    pub fn new(router: Arc<dyn Router>) -> Self {
57        Self { router }
58    }
59}
60
61#[async_trait]
62impl Router for AsyncRouter {
63    async fn route(&self, wave: UltraWave) {
64        self.router.route(wave).await
65    }
66}
67
68pub type ProtoTransmitter = ProtoTransmitterDef<AsyncRouter, Exchanger>;
69
70impl ProtoTransmitter {
71    pub fn new(router: Arc<dyn Router>, exchanger: Exchanger) -> ProtoTransmitter {
72        let router = AsyncRouter::new(router);
73        Self {
74            from: SetStrategy::None,
75            to: SetStrategy::None,
76            agent: SetStrategy::Fill(Agent::Anonymous),
77            scope: SetStrategy::Fill(Scope::None),
78            handling: SetStrategy::Fill(Handling::default()),
79            method: SetStrategy::None,
80            via: SetStrategy::None,
81            router,
82            exchanger,
83        }
84    }
85
86    pub async fn direct<D, W>(&self, wave: D) -> Result<W, SpaceErr>
87    where
88        W: FromReflectedAggregate,
89        D: Into<DirectedProto>,
90    {
91        let mut wave: DirectedProto = wave.into();
92
93        self.prep_direct(&mut wave);
94
95        let directed = wave.build()?;
96
97        match directed.bounce_backs() {
98            BounceBacks::None => {
99                self.router.route(directed.to_ultra()).await;
100                FromReflectedAggregate::from_reflected_aggregate(ReflectedAggregate::None)
101            }
102            _ => {
103                let reflected_rx = self.exchanger.exchange(&directed).await;
104                self.router.route(directed.to_ultra()).await;
105                let reflected_agg = reflected_rx.await?;
106                FromReflectedAggregate::from_reflected_aggregate(reflected_agg)
107            }
108        }
109    }
110
111    pub async fn ping<D>(&self, ping: D) -> Result<Wave<Pong>, SpaceErr>
112    where
113        D: Into<DirectedProto>,
114    {
115        let mut ping: DirectedProto = ping.into();
116        if let Some(DirectedKind::Ping) = ping.kind {
117            self.direct(ping).await
118        } else {
119            Err(SpaceErr::server_error("expected DirectedKind::Ping"))
120        }
121    }
122
123    pub async fn ripple<D>(&self, ripple: D) -> Result<Vec<Wave<Echo>>, SpaceErr>
124    where
125        D: Into<DirectedProto>,
126    {
127        let mut ripple: DirectedProto = ripple.into();
128        if let Some(DirectedKind::Ripple) = ripple.kind {
129            self.direct(ripple).await
130        } else {
131            Err(SpaceErr::server_error("expected DirectedKind::Ping"))
132        }
133    }
134
135    pub async fn signal<D>(&self, signal: D) -> Result<(), SpaceErr>
136    where
137        D: Into<DirectedProto>,
138    {
139        let mut signal: DirectedProto = signal.into();
140        if let Some(DirectedKind::Signal) = signal.kind {
141            self.direct(signal).await
142        } else {
143            Err(SpaceErr::server_error("expected DirectedKind::Ping"))
144        }
145    }
146
147    pub async fn bounce_from(&self, to: &Surface, from: &Surface) -> bool {
148        let mut directed = DirectedProto::ping();
149        directed.from(from.clone());
150        directed.to(to.clone());
151        directed.method(CmdMethod::Bounce);
152        match self.direct(directed).await {
153            Ok(pong) => {
154                let pong: Wave<Pong> = pong;
155                pong.is_ok()
156            }
157            Err(_) => false,
158        }
159    }
160
161    pub async fn bounce(&self, to: &Surface) -> bool {
162        let mut direct = DirectedProto::ping();
163        direct.to(to.clone());
164        direct.method(CmdMethod::Bounce);
165        match self.direct(direct).await {
166            Ok(pong) => {
167                let pong: Wave<Pong> = pong;
168                pong.is_ok()
169            }
170            Err(_) => false,
171        }
172    }
173
174    pub async fn route(&self, wave: UltraWave) {
175        self.router.route(wave).await
176    }
177
178    pub async fn reflect<W>(&self, wave: W) -> Result<(), SpaceErr>
179    where
180        W: Into<ReflectedProto>,
181    {
182        let mut wave: ReflectedProto = wave.into();
183
184        self.prep_reflect(&mut wave);
185
186        let wave = wave.build()?;
187        let wave = wave.to_ultra();
188        self.router.route(wave).await;
189
190        Ok(())
191    }
192}
193
194pub type ProtoTransmitterBuilder = ProtoTransmitterBuilderDef<AsyncRouter, Exchanger>;
195
196impl ProtoTransmitterBuilder {
197    pub fn new(router: Arc<dyn Router>, exchanger: Exchanger) -> ProtoTransmitterBuilder {
198        let router = AsyncRouter::new(router);
199        Self {
200            from: SetStrategy::None,
201            to: SetStrategy::None,
202            via: SetStrategy::None,
203            agent: SetStrategy::Fill(Agent::Anonymous),
204            scope: SetStrategy::Fill(Scope::None),
205            handling: SetStrategy::Fill(Handling::default()),
206            method: SetStrategy::None,
207            router,
208            exchanger,
209        }
210    }
211}
212
213pub type TraversalTransmitter = ProtoTransmitterDef<Arc<dyn TraversalRouter>, Exchanger>;
214
215impl TraversalTransmitter {
216    pub fn new(router: Arc<dyn TraversalRouter>, exchanger: Exchanger) -> Self {
217        Self {
218            agent: SetStrategy::None,
219            scope: SetStrategy::None,
220            handling: SetStrategy::None,
221            method: SetStrategy::None,
222            from: SetStrategy::None,
223            to: SetStrategy::None,
224            via: SetStrategy::None,
225            router,
226            exchanger,
227        }
228    }
229
230    pub async fn direct<W>(&self, traversal: Traversal<DirectedWave>) -> Result<W, SpaceErr>
231    where
232        W: FromReflectedAggregate,
233    {
234        match traversal.bounce_backs() {
235            BounceBacks::None => {
236                self.router.traverse(traversal.wrap()).await;
237                FromReflectedAggregate::from_reflected_aggregate(ReflectedAggregate::None)
238            }
239            _ => {
240                let reflected_rx = self.exchanger.exchange(&traversal.payload).await;
241                self.router.traverse(traversal.wrap()).await;
242                let reflected_agg = reflected_rx.await?;
243                FromReflectedAggregate::from_reflected_aggregate(reflected_agg)
244            }
245        }
246    }
247}
248
249pub type RootInCtx = RootInCtxDef<ProtoTransmitter>;
250
251pub type InCtx<'a, I> = InCtxDef<'a, I, ProtoTransmitter>;
252
253impl<'a, I> InCtx<'a, I> {
254    pub fn push_from(self, from: Surface) -> InCtx<'a, I> {
255        let mut transmitter = self.transmitter.clone();
256        transmitter.to_mut().from = SetStrategy::Override(from);
257        InCtx {
258            root: self.root,
259            input: self.input,
260            logger: self.logger.clone(),
261            transmitter,
262        }
263    }
264}
265
266#[async_trait]
267pub trait DirectedHandlerSelector {
268    fn select<'a>(&self, select: &'a RecipientSelector<'a>) -> Result<&dyn DirectedHandler, ()>;
269}
270
271#[async_trait]
272pub trait DirectedHandler: Send + Sync {
273    async fn handle(&self, ctx: RootInCtx) -> CoreBounce;
274
275    async fn bounce(&self, ctx: RootInCtx) -> CoreBounce {
276        CoreBounce::Reflected(ReflectedCore::ok())
277    }
278}
279
280#[derive(Clone)]
281pub struct TxRouter {
282    pub tx: mpsc::Sender<UltraWave>,
283}
284
285impl TxRouter {
286    pub fn new(tx: mpsc::Sender<UltraWave>) -> Self {
287        Self { tx }
288    }
289}
290
291#[derive(Clone)]
292pub struct Exchanger {
293    pub surface: Surface,
294    pub multis: Arc<DashMap<WaveId, mpsc::Sender<ReflectedWave>>>,
295    pub singles: Arc<DashMap<WaveId, oneshot::Sender<ReflectedAggregate>>>,
296    pub timeouts: Timeouts,
297    pub logger: PointLogger,
298    #[cfg(test)]
299    pub claimed: Arc<DashSet<String>>,
300}
301
302impl Exchanger {
303    pub fn new(surface: Surface, timeouts: Timeouts, logger: PointLogger) -> Self {
304        let logger = logger.point(surface.point.clone());
305        Self {
306            surface,
307            singles: Arc::new(DashMap::new()),
308            multis: Arc::new(DashMap::new()),
309            timeouts,
310            logger,
311            #[cfg(test)]
312            claimed: Arc::new(DashSet::new()),
313        }
314    }
315
316    pub fn with_surface(&self, surface: Surface) -> Self {
317        let logger = self.logger.point(surface.point.clone());
318        Self {
319            surface,
320            singles: self.singles.clone(),
321            multis: self.multis.clone(),
322            timeouts: self.timeouts.clone(),
323            logger,
324            #[cfg(test)]
325            claimed: self.claimed.clone(),
326        }
327    }
328
329    pub async fn reflected(&self, reflect: ReflectedWave) -> Result<(), SpaceErr> {
330        self.logger
331            .track(&reflect, || Tracker::new("exchange", "Reflected"));
332
333        if let Some(multi) = self.multis.get(reflect.reflection_of()) {
334            multi.value().send(reflect).await;
335        } else if let Some((_, tx)) = self.singles.remove(reflect.reflection_of()) {
336            #[cfg(test)]
337            self.claimed.insert(reflect.reflection_of().to_string());
338            tx.send(ReflectedAggregate::Single(reflect));
339        } else {
340            let reflect = reflect.to_ultra();
341            let kind = match &reflect {
342                UltraWave::Ping(_) => "Ping",
343                UltraWave::Pong(_) => "Pong",
344                UltraWave::Ripple(_) => "Ripple",
345                UltraWave::Echo(_) => "Echo",
346                UltraWave::Signal(_) => "Signal",
347            };
348            let reflect = reflect.to_reflected()?;
349
350            #[cfg(test)]
351            if self
352                .claimed
353                .contains(reflect.reflection_of().to_string().as_str())
354            {
355                return Err(SpaceErr::server_error(format!(
356                    "Reflection already claimed for {} from: {} to: {} KIND: {} STATUS: {}",
357                    reflect.reflection_of().to_short_string(),
358                    reflect.from().to_string(),
359                    reflect.to().to_string(),
360                    kind,
361                    reflect.core().status.to_string()
362                )));
363            }
364            return Err(SpaceErr::server_error(format!(
365                "Not expecting reflected message for {} from: {} to: {} KIND: {} STATUS: {}",
366                reflect.reflection_of().to_short_string(),
367                reflect.from().to_string(),
368                reflect.to().to_string(),
369                kind,
370                reflect.core().status.to_string()
371            )));
372        }
373        Ok(())
374    }
375
376    pub async fn exchange(&self, directed: &DirectedWave) -> oneshot::Receiver<ReflectedAggregate> {
377        let (tx, rx) = oneshot::channel();
378
379        let mut reflected = match directed.reflected_proto() {
380            BounceProto::Absorbed => {
381                return rx;
382            }
383            BounceProto::Reflected(reflected) => reflected,
384        };
385
386        reflected.from(self.surface.clone());
387
388        let reflection = directed.reflection().unwrap();
389
390        let timeout = self.timeouts.from(directed.handling().wait.clone());
391        self.singles.insert(directed.id().clone(), tx);
392        match directed.bounce_backs() {
393            BounceBacks::None => {
394                panic!("we already dealt with this")
395            }
396            BounceBacks::Single => {
397                let singles = self.singles.clone();
398                tokio::spawn(async move {
399                    tokio::time::sleep(Duration::from_secs(timeout)).await;
400                    let id = reflected.reflection_of.as_ref().unwrap();
401                    if let Some((_, tx)) = singles.remove(id) {
402                        reflected.status = Some(StatusCode::from_u16(408).unwrap());
403                        reflected.body = Some(Substance::Empty);
404                        reflected.intended = Some(reflection.intended);
405                        let reflected = reflected.build().unwrap();
406                        tx.send(ReflectedAggregate::Single(reflected));
407                    }
408                });
409            }
410            BounceBacks::Count(count) => {
411                let (tx, mut rx) = mpsc::channel(count);
412                self.multis.insert(directed.id().clone(), tx);
413                let singles = self.singles.clone();
414                let id = directed.id().clone();
415                tokio::spawn(async move {
416                    let mut agg = vec![];
417                    loop {
418                        if let Some(reflected) = rx.recv().await {
419                            agg.push(reflected);
420                            if count == agg.len() {
421                                if let Some((_, tx)) = singles.remove(&id) {
422                                    tx.send(ReflectedAggregate::Multi(agg));
423                                    break;
424                                }
425                            }
426                        } else {
427                            // this would occur in a timeout scenario
428                            if let Some((_, tx)) = singles.remove(&id) {
429                                reflected.status = Some(StatusCode::from_u16(408).unwrap());
430                                reflected.body = Some(Substance::Empty);
431                                reflected.intended = Some(reflection.intended);
432                                let reflected = reflected.build().unwrap();
433                                tx.send(ReflectedAggregate::Multi(vec![reflected]));
434                                break;
435                            }
436                        }
437                    }
438                });
439
440                let id = directed.id().clone();
441                let multis = self.multis.clone();
442                tokio::spawn(async move {
443                    tokio::time::sleep(Duration::from_secs(timeout)).await;
444                    // all we have to do is remove it, the multi loop will take care of the rest
445                    multis.remove(&id);
446                });
447            }
448            BounceBacks::Timer(wait) => {
449                let (tx, mut rx) = mpsc::channel(32);
450                self.multis.insert(directed.id().clone(), tx);
451                let singles = self.singles.clone();
452                let id = directed.id().clone();
453                tokio::spawn(async move {
454                    let mut agg = vec![];
455                    loop {
456                        if let Some(reflected) = rx.recv().await {
457                            agg.push(reflected);
458                        } else {
459                            // this would occur in a timeout scenario
460                            if let Some((_, tx)) = singles.remove(&id) {
461                                tx.send(ReflectedAggregate::Multi(agg));
462                                break;
463                            }
464                        }
465                    }
466                });
467
468                let id = directed.id().clone();
469                let multis = self.multis.clone();
470                let timeout = self.timeouts.from(wait);
471                tokio::spawn(async move {
472                    tokio::time::sleep(Duration::from_secs(timeout)).await;
473                    // all we have to do is remove it, the multi loop will take care of the rest
474                    multis.remove(&id);
475                });
476            }
477        }
478
479        rx
480    }
481}
482
483impl Default for Exchanger {
484    fn default() -> Self {
485        Self::new(
486            Point::root().to_surface(),
487            Default::default(),
488            RootLogger::default().point(Point::root()),
489        )
490    }
491}
492
493pub type DirectedHandlerShell<D> = DirectedHandlerShellDef<D, ProtoTransmitterBuilder>;
494
495impl<D> DirectedHandlerShell<D>
496where
497    D: DirectedHandler,
498{
499    pub async fn handle(&self, wave: DirectedWave) {
500        let logger = self
501            .logger
502            .point(self.surface.clone().to_point())
503            .spanner(&wave);
504        let mut transmitter = self.builder.clone().build();
505        let reflection = wave.reflection();
506        let ctx = RootInCtx::new(wave, self.surface.clone(), logger, transmitter);
507        match self.handler.handle(ctx).await {
508            CoreBounce::Absorbed => {}
509            CoreBounce::Reflected(reflected) => {
510                let wave = reflection.unwrap().make(reflected, self.surface.clone());
511                let wave = wave.to_ultra();
512                let transmitter = self.builder.clone().build();
513                transmitter.route(wave).await;
514            }
515        }
516    }
517}
518
519impl RootInCtx {
520    pub fn push<'a, I>(&self) -> Result<InCtx<I>, SpaceErr>
521    where
522        Substance: ToSubstance<I>,
523    {
524        let input = match self.wave.to_substance_ref() {
525            Ok(input) => input,
526            Err(err) => return Err(err.into()),
527        };
528        Ok(InCtx {
529            root: self,
530            input,
531            logger: self.logger.clone(),
532            transmitter: Cow::Borrowed(&self.transmitter),
533        })
534    }
535}