Skip to main content

cosmic_space/wave/
exchange.rs

1pub mod asynch;
2pub mod synch;
3
4use alloc::borrow::Cow;
5use std::marker::PhantomData;
6use std::ops::Deref;
7use std::sync::Arc;
8use std::time::Duration;
9
10use asynch::{
11    DirectedHandler, Exchanger, InCtx, ProtoTransmitter, ProtoTransmitterBuilder, RootInCtx, Router,
12};
13use dashmap::DashMap;
14use tokio::sync::{broadcast, mpsc, oneshot};
15
16use crate::config::bind::RouteSelector;
17use crate::loc::{Topic, ToPoint, ToSurface};
18use crate::log::{PointLogger, RootLogger, SpanLogger};
19use crate::settings::Timeouts;
20use crate::wave::core::cmd::CmdMethod;
21use crate::wave::core::http2::StatusCode;
22use crate::wave::core::{CoreBounce, Method};
23use crate::wave::exchange::asynch::AsyncRouter;
24use crate::wave::{
25    Bounce, BounceBacks, BounceProto, DirectedProto, DirectedWave, Echo, FromReflectedAggregate,
26    Handling, Pong, Recipients, RecipientSelector, ReflectedAggregate, ReflectedProto,
27    ReflectedWave, Scope, Session, ToRecipients, UltraWave, Wave, WaveId,
28};
29use crate::{Agent, ReflectedCore, SpaceErr, Substance, Surface, ToSubstance, wave};
30use crate::point::Point;
31
32#[derive(Clone)]
33pub struct DirectedHandlerShellDef<D, T> {
34    logger: PointLogger,
35    handler: D,
36    surface: Surface,
37    builder: T,
38}
39
40impl<D, T> DirectedHandlerShellDef<D, T>
41where
42    D: Sized,
43{
44    pub fn new(handler: D, builder: T, surface: Surface, logger: RootLogger) -> Self {
45        let logger = logger.point(surface.point.clone());
46        Self {
47            handler,
48            builder,
49            surface,
50            logger,
51        }
52    }
53}
54
55pub struct InternalPipeline<H> {
56    pub selector: RouteSelector,
57    pub handler: H,
58}
59
60impl<H> InternalPipeline<H> {
61    pub fn new(selector: RouteSelector, mut handler: H) -> Self {
62        Self { selector, handler }
63    }
64}
65
66pub struct RootInCtxDef<T> {
67    pub to: Surface,
68    pub wave: DirectedWave,
69    pub session: Option<Session>,
70    pub logger: SpanLogger,
71    pub transmitter: T,
72}
73
74impl<T> RootInCtxDef<T> {
75    pub fn new(wave: DirectedWave, to: Surface, logger: SpanLogger, transmitter: T) -> Self {
76        Self {
77            wave,
78            to,
79            logger,
80            session: None,
81            transmitter,
82        }
83    }
84
85    pub fn status(self, status: u16, from: Surface) -> Bounce<ReflectedWave> {
86        match self.wave {
87            DirectedWave::Ping(ping) => Bounce::Reflected(ReflectedWave::Pong(Wave::new(
88                Pong::new(
89                    ReflectedCore::status(status),
90                    ping.from.clone(),
91                    self.to.clone().to_recipients(),
92                    ping.id.clone(),
93                ),
94                from,
95            ))),
96            DirectedWave::Ripple(ripple) => Bounce::Reflected(ReflectedWave::Echo(Wave::new(
97                Echo::new(
98                    ReflectedCore::status(status),
99                    ripple.from.clone(),
100                    ripple.to.clone(),
101                    ripple.id.clone(),
102                ),
103                from,
104            ))),
105            DirectedWave::Signal(_) => Bounce::Absorbed,
106        }
107    }
108
109    pub fn err(self, status: u16, from: Surface, msg: String) -> Bounce<ReflectedWave> {
110        match self.wave {
111            DirectedWave::Ping(ping) => Bounce::Reflected(ReflectedWave::Pong(Wave::new(
112                Pong::new(
113                    ReflectedCore::fail(status, msg),
114                    ping.from.clone(),
115                    self.to.clone().to_recipients(),
116                    ping.id.clone(),
117                ),
118                from,
119            ))),
120            DirectedWave::Ripple(ripple) => Bounce::Reflected(ReflectedWave::Echo(Wave::new(
121                Echo::new(
122                    ReflectedCore::fail(status, msg),
123                    ripple.from.clone(),
124                    ripple.to.clone(),
125                    ripple.id.clone(),
126                ),
127                from,
128            ))),
129            DirectedWave::Signal(_) => Bounce::Absorbed,
130        }
131    }
132
133    pub fn not_found(self) -> Bounce<ReflectedWave> {
134        let to = self.to.clone();
135        let msg = format!(
136            "<{}>{}",
137            self.wave.core().method.to_string(),
138            self.wave.core().uri.path().to_string()
139        );
140        self.err(404, to, msg)
141    }
142
143    pub fn timeout(self) -> Bounce<ReflectedWave> {
144        let to = self.to.clone();
145        self.status(408, to)
146    }
147
148    pub fn bad_request(self) -> Bounce<ReflectedWave> {
149        let to = self.to.clone();
150        let msg = format!(
151            "<{}>{} -[ {} ]->",
152            self.wave.core().method.to_string(),
153            self.wave.core().uri.path().to_string(),
154            self.wave.core().body.kind().to_string()
155        );
156        self.err(400, to, msg)
157    }
158
159    pub fn server_error(self) -> Bounce<ReflectedWave> {
160        let to = self.to.clone();
161        self.status(500, to)
162    }
163
164    pub fn forbidden(self) -> Bounce<ReflectedWave> {
165        let to = self.to.clone();
166        let msg = format!(
167            "<{}>{} -[ {} ]->",
168            self.wave.core().method.to_string(),
169            self.wave.core().uri.path().to_string(),
170            self.wave.core().body.kind().to_string()
171        );
172        self.err(401, to, msg)
173    }
174
175    pub fn unavailable(self) -> Bounce<ReflectedWave> {
176        let to = self.to.clone();
177        self.status(503, to)
178    }
179
180    pub fn unauthorized(self) -> Bounce<ReflectedWave> {
181        let to = self.to.clone();
182        self.status(403, to)
183    }
184}
185
186pub struct InCtxDef<'a, I, T>
187where
188    T: Clone,
189{
190    root: &'a RootInCtxDef<T>,
191    pub transmitter: Cow<'a, T>,
192    pub input: &'a I,
193    pub logger: SpanLogger,
194}
195
196impl<'a, I, T> Deref for InCtxDef<'a, I, T>
197where
198    T: Clone,
199{
200    type Target = I;
201
202    fn deref(&self) -> &Self::Target {
203        self.input
204    }
205}
206
207impl<'a, I, T> InCtxDef<'a, I, T>
208where
209    T: Clone,
210{
211    pub fn new(
212        root: &'a RootInCtxDef<T>,
213        input: &'a I,
214        tx: Cow<'a, T>,
215        logger: SpanLogger,
216    ) -> Self {
217        Self {
218            root,
219            input,
220            logger,
221            transmitter: tx,
222        }
223    }
224
225    pub fn from(&self) -> &Surface {
226        self.root.wave.from()
227    }
228
229    pub fn to(&self) -> &Surface {
230        &self.root.to
231    }
232
233    pub fn push(self) -> InCtxDef<'a, I, T> {
234        InCtxDef {
235            root: self.root,
236            input: self.input,
237            logger: self.logger.span(),
238            transmitter: self.transmitter.clone(),
239        }
240    }
241
242    pub fn push_input_ref<I2>(self, input: &'a I2) -> InCtxDef<'a, I2, T> {
243        InCtxDef {
244            root: self.root,
245            input,
246            logger: self.logger.clone(),
247            transmitter: self.transmitter.clone(),
248        }
249    }
250
251    pub fn wave(&self) -> &DirectedWave {
252        &self.root.wave
253    }
254
255    /*
256    pub async fn ping(&self, req: DirectedProto) -> Result<Wave<Pong>, UniErr> {
257        self.transmitter.direct(req).await
258    }
259
260     */
261
262    pub fn ok_body(self, substance: Substance) -> ReflectedCore {
263        self.root.wave.core().ok_body(substance)
264    }
265
266    pub fn not_found(self) -> ReflectedCore {
267        self.root.wave.core().not_found()
268    }
269
270    pub fn forbidden(self) -> ReflectedCore {
271        self.root.wave.core().forbidden()
272    }
273
274    pub fn bad_request(self) -> ReflectedCore {
275        self.root.wave.core().bad_request()
276    }
277
278    pub fn err(self, err: SpaceErr) -> ReflectedCore {
279        self.root.wave.core().err(err)
280    }
281}
282
283#[derive(Clone)]
284pub struct BroadTxRouter {
285    pub tx: broadcast::Sender<UltraWave>,
286}
287
288impl BroadTxRouter {
289    pub fn new(tx: broadcast::Sender<UltraWave>) -> Self {
290        Self { tx }
291    }
292}
293
294#[derive(Clone)]
295pub struct ProtoTransmitterBuilderDef<R, E> {
296    pub agent: SetStrategy<Agent>,
297    pub scope: SetStrategy<Scope>,
298    pub handling: SetStrategy<Handling>,
299    pub method: SetStrategy<Method>,
300    pub via: SetStrategy<Surface>,
301    pub from: SetStrategy<Surface>,
302    pub to: SetStrategy<Recipients>,
303    pub router: R,
304    pub exchanger: E,
305}
306
307impl<R, E> ProtoTransmitterBuilderDef<R, E> {
308    pub fn build(self) -> ProtoTransmitterDef<R, E> {
309        ProtoTransmitterDef {
310            agent: self.agent,
311            scope: self.scope,
312            handling: self.handling,
313            method: self.method,
314            from: self.from,
315            to: self.to,
316            via: self.via,
317            router: self.router,
318            exchanger: self.exchanger,
319        }
320    }
321}
322
323#[derive(Clone)]
324pub struct ProtoTransmitterDef<R, E> {
325    agent: SetStrategy<Agent>,
326    scope: SetStrategy<Scope>,
327    handling: SetStrategy<Handling>,
328    method: SetStrategy<Method>,
329    from: SetStrategy<Surface>,
330    to: SetStrategy<Recipients>,
331    via: SetStrategy<Surface>,
332    router: R,
333    exchanger: E,
334}
335
336impl<R, E> ProtoTransmitterDef<R, E> {
337    pub fn from_topic(&mut self, topic: Topic) -> Result<(), SpaceErr> {
338        self.from = match self.from.clone() {
339            SetStrategy::None => {
340                return Err(SpaceErr::server_error(
341                    "cannot set Topic without first setting Surface",
342                ));
343            }
344            SetStrategy::Fill(from) => SetStrategy::Fill(from.with_topic(topic)),
345            SetStrategy::Override(from) => SetStrategy::Override(from.with_topic(topic)),
346        };
347        Ok(())
348    }
349
350    fn prep_direct(&self, wave: &mut DirectedProto) {
351        match &self.from {
352            SetStrategy::None => {}
353            SetStrategy::Fill(from) => wave.fill_from(from.clone()),
354            SetStrategy::Override(from) => wave.from(from.clone()),
355        }
356
357        match &self.to {
358            SetStrategy::None => {}
359            SetStrategy::Fill(to) => wave.fill_to(to.clone()),
360            SetStrategy::Override(to) => wave.to(to),
361        }
362
363        match &self.via {
364            SetStrategy::None => {}
365            SetStrategy::Fill(via) => wave.fill_via(via.clone()),
366            SetStrategy::Override(via) => wave.via(via),
367        }
368
369        match &self.agent {
370            SetStrategy::None => {}
371            SetStrategy::Fill(agent) => wave.fill_agent(agent),
372            SetStrategy::Override(agent) => wave.agent(agent.clone()),
373        }
374
375        match &self.scope {
376            SetStrategy::None => {}
377            SetStrategy::Fill(scope) => wave.fill_scope(scope),
378            SetStrategy::Override(scope) => wave.scope(scope.clone()),
379        }
380
381        match &self.handling {
382            SetStrategy::None => {}
383            SetStrategy::Fill(handling) => wave.fill_handling(handling),
384            SetStrategy::Override(handling) => wave.handling(handling.clone()),
385        }
386
387        match &self.method {
388            SetStrategy::None => {}
389            SetStrategy::Fill(method) => wave.fill_method(method),
390            SetStrategy::Override(handling) => wave.method(handling.clone()),
391        }
392    }
393
394    fn prep_reflect(&self, wave: &mut ReflectedProto) {
395        match &self.from {
396            SetStrategy::None => {}
397            SetStrategy::Fill(from) => wave.fill_from(from),
398            SetStrategy::Override(from) => wave.from(from.clone()),
399        }
400
401        match &self.agent {
402            SetStrategy::None => {}
403            SetStrategy::Fill(agent) => wave.fill_agent(agent),
404            SetStrategy::Override(agent) => wave.agent(agent.clone()),
405        }
406
407        match &self.scope {
408            SetStrategy::None => {}
409            SetStrategy::Fill(scope) => wave.fill_scope(scope),
410            SetStrategy::Override(scope) => wave.scope(scope.clone()),
411        }
412
413        match &self.handling {
414            SetStrategy::None => {}
415            SetStrategy::Fill(handling) => wave.fill_handling(handling),
416            SetStrategy::Override(handling) => wave.handling(handling.clone()),
417        }
418    }
419}
420
421#[derive(Clone, strum_macros::Display)]
422pub enum SetStrategy<T> {
423    /// The ProtoTransmitter will NOT set a value
424    None,
425    /// The ProtoTransmitter will set the DirectedProto value unless
426    /// the value was already explicitly set
427    Fill(T),
428    /// The ProtoTransmitter will over write the DirectedProto value
429    /// even if it has already been explicitly set
430    Override(T),
431}
432
433impl<T> SetStrategy<T> {
434    pub fn unwrap(self) -> Result<T, SpaceErr> {
435        match self {
436            SetStrategy::None => Err("cannot unwrap a SetStrategy::None".into()),
437            SetStrategy::Fill(t) => Ok(t),
438            SetStrategy::Override(t) => Ok(t),
439        }
440    }
441}
442
443impl SetStrategy<Surface> {
444    pub fn with_topic(self, topic: Topic) -> Result<Self, SpaceErr> {
445        match self {
446            SetStrategy::None => Err("cannot set topic if Strategy is None".into()),
447            SetStrategy::Fill(surface) => Ok(SetStrategy::Fill(surface.with_topic(topic))),
448            SetStrategy::Override(surface) => Ok(SetStrategy::Override(surface.with_topic(topic))),
449        }
450    }
451}