cosmic_hyperlane/
lib.rs

1#![allow(warnings)]
2
3#[macro_use]
4extern crate async_trait;
5#[macro_use]
6extern crate lazy_static;
7
8use std::cell::{Cell, RefCell};
9use std::collections::{HashMap, HashSet};
10use std::future::Future;
11use std::marker::PhantomData;
12use std::ops::{Deref, DerefMut};
13use std::str::FromStr;
14use std::sync::atomic::{AtomicU16, Ordering};
15use std::sync::Arc;
16use std::time::Duration;
17
18use dashmap::DashMap;
19use futures::future::select_all;
20use futures::FutureExt;
21use tokio::io::AsyncWriteExt;
22use tokio::select;
23use tokio::sync::mpsc::error::{SendError, SendTimeoutError, TrySendError};
24use tokio::sync::mpsc::Receiver;
25use tokio::sync::oneshot::Sender;
26use tokio::sync::{broadcast, mpsc, Mutex, oneshot, RwLock, watch};
27
28use cosmic_space::command::direct::create::{PointFactoryU64, PointSegTemplate};
29use cosmic_space::err::SpaceErr;
30use cosmic_space::frame::PrimitiveFrame;
31use cosmic_space::hyper::{Greet, HyperSubstance, InterchangeKind, Knock};
32use cosmic_space::loc::{Layer, PointFactory, Surface, ToPoint, ToSurface, Version};
33use cosmic_space::log::{PointLogger, RootLogger, Tracker};
34use cosmic_space::particle::Status;
35use cosmic_space::point::Point;
36use cosmic_space::settings::Timeouts;
37use cosmic_space::substance::{FormErrs, Substance, SubstanceKind, Token};
38use cosmic_space::util::uuid;
39use cosmic_space::wave::core::ext::ExtMethod;
40use cosmic_space::wave::core::hyp::HypMethod;
41use cosmic_space::wave::core::Method;
42use cosmic_space::wave::exchange::asynch::{
43    Exchanger, ProtoTransmitter, ProtoTransmitterBuilder, Router, TxRouter,
44};
45use cosmic_space::wave::exchange::SetStrategy;
46use cosmic_space::wave::{
47    Agent, DirectedKind, DirectedProto, Handling, HyperWave, Ping, Pong, Reflectable,
48    ReflectedKind, ReflectedProto, ReflectedWave, UltraWave, Wave, WaveId, WaveKind,
49};
50use cosmic_space::VERSION;
51
52lazy_static! {
53    pub static ref LOCAL_CLIENT: Point = Point::from_str("LOCAL::client").expect("point");
54    pub static ref LOCAL_CLIENT_RUNNER: Point =
55        Point::from_str("LOCAL::client:runner").expect("point");
56    pub static ref HYPERLANE_INDEX: AtomicU16 = AtomicU16::new(0);
57}
58
59pub enum HyperwayKind {
60    Mount,
61    Ephemeral,
62}
63
64pub struct Hyperway {
65    pub remote: Surface,
66    outbound: Hyperlane,
67    inbound: Hyperlane,
68    logger: PointLogger,
69
70    #[cfg(test)]
71    pub diagnostic: HyperwayDiagnostic,
72}
73
74impl Hyperway {
75    pub fn new(remote: Surface, agent: Agent, logger: PointLogger) -> Self {
76        let logger = logger.point(remote.point.clone());
77        let mut inbound = Hyperlane::new(format!("{}<Inbound>", remote.to_string()));
78        inbound
79            .tx
80            .try_send(HyperlaneCall::Transform(Box::new(FromTransform::new(
81                remote.clone(),
82            ))));
83        inbound
84            .tx
85            .try_send(HyperlaneCall::Transform(Box::new(AgentTransform::new(
86                agent,
87            ))));
88        Self {
89            outbound: Hyperlane::new(format!("{}<Outbound>", remote.to_string())),
90            remote,
91            inbound,
92            logger,
93            #[cfg(test)]
94            diagnostic: HyperwayDiagnostic::new(),
95        }
96    }
97
98    pub fn transform_inbound(&self, transform: Box<dyn HyperTransform>) {
99        self.inbound
100            .tx
101            .try_send(HyperlaneCall::Transform(transform));
102    }
103
104    pub fn transform_to(&self, to: Surface) {
105        self.inbound
106            .tx
107            .try_send(HyperlaneCall::Transform(Box::new(ToTransform::new(to))));
108    }
109
110    pub async fn hyperway_endpoint_near(&self, init_wave: Option<UltraWave>) -> HyperwayEndpoint {
111        let drop_tx = None;
112
113        HyperwayEndpoint {
114            tx: self.outbound.tx(),
115            rx: self.inbound.rx(init_wave).await,
116            drop_tx,
117            logger: self.logger.clone(),
118        }
119    }
120
121    pub async fn hyperway_endpoint_near_with_drop_event(
122        &self,
123        drop_tx: oneshot::Sender<()>,
124        init_wave: Option<UltraWave>,
125    ) -> HyperwayEndpoint {
126        let drop_tx = Some(drop_tx);
127
128        HyperwayEndpoint {
129            tx: self.outbound.tx(),
130            rx: self.inbound.rx(init_wave).await,
131            drop_tx,
132            logger: self.logger.clone(),
133        }
134    }
135
136    pub async fn hyperway_endpoint_far(&self, init_wave: Option<UltraWave>) -> HyperwayEndpoint {
137        HyperwayEndpoint {
138            tx: self.inbound.tx(),
139            rx: self.outbound.rx(init_wave).await,
140            drop_tx: None,
141            logger: self.logger.clone(),
142        }
143    }
144
145    pub async fn hyperway_endpoint_far_drop_event(
146        &self,
147        init_wave: Option<UltraWave>,
148        drop_tx: oneshot::Sender<()>,
149    ) -> HyperwayEndpoint {
150        HyperwayEndpoint {
151            tx: self.inbound.tx(),
152            rx: self.outbound.rx(init_wave).await,
153            drop_tx: Some(drop_tx),
154            logger: self.logger.clone(),
155        }
156    }
157}
158
159#[cfg(test)]
160pub struct HyperwayDiagnostic {
161    pub replaced_ext: broadcast::Sender<Result<(), SpaceErr>>,
162}
163
164#[cfg(test)]
165impl HyperwayDiagnostic {
166    pub fn new() -> Self {
167        let (replaced_ext, _) = broadcast::channel(128);
168        Self { replaced_ext }
169    }
170}
171
172pub struct HyperwayEndpoint {
173    drop_tx: Option<oneshot::Sender<()>>,
174    pub tx: mpsc::Sender<UltraWave>,
175    pub rx: mpsc::Receiver<UltraWave>,
176    pub logger: PointLogger,
177}
178
179impl HyperwayEndpoint {
180    pub fn new(
181        tx: mpsc::Sender<UltraWave>,
182        rx: mpsc::Receiver<UltraWave>,
183        logger: PointLogger,
184    ) -> Self {
185        let drop_tx = None;
186        Self {
187            tx,
188            rx,
189            drop_tx,
190            logger,
191        }
192    }
193
194    pub fn new_with_drop(
195        tx: mpsc::Sender<UltraWave>,
196        rx: mpsc::Receiver<UltraWave>,
197        drop_tx: oneshot::Sender<()>,
198        logger: PointLogger,
199    ) -> Self {
200        let drop_tx = Some(drop_tx);
201        Self {
202            tx,
203            rx,
204            drop_tx,
205            logger,
206        }
207    }
208
209    pub fn connect(mut self, mut endpoint: HyperwayEndpoint) {
210        tokio::spawn(async move {
211            let logger = endpoint.logger.clone();
212            let end_tx = endpoint.tx.clone();
213            {
214                let my_tx = self.tx.clone();
215                tokio::spawn(async move {
216                    let logger = endpoint.logger.push_mark("mux:tx").unwrap();
217                    while let Some(wave) = endpoint.rx.recv().await {
218                        logger.track(&wave, || Tracker::new("hyperway-endpoint", "Rx"));
219                        match logger.result(my_tx.send(wave).await) {
220                            Ok(_) => {}
221                            Err(_) => break,
222                        }
223                    }
224                });
225            }
226
227            let logger = logger.push_mark("mux:rx").unwrap();
228            while let Some(wave) = self.rx.recv().await {
229                logger.track(&wave, || Tracker::new("hyperway-endpoint", "Tx"));
230                match logger.result(end_tx.send(wave).await) {
231                    Ok(_) => {}
232                    Err(_) => break,
233                }
234            }
235        });
236    }
237
238    pub fn add_drop_tx(&mut self, drop_tx: oneshot::Sender<()>) {
239        self.drop_tx.replace(drop_tx);
240    }
241
242    pub fn router(&self) -> TxRouter {
243        TxRouter::new(self.tx.clone())
244    }
245}
246
247impl Drop for HyperwayEndpoint {
248    fn drop(&mut self) {
249        match self.drop_tx.take() {
250            None => {}
251            Some(drop_tx) => {
252                drop_tx.send(());
253            }
254        }
255    }
256}
257
258#[derive(Clone)]
259pub struct HyperwayStub {
260    pub agent: Agent,
261    pub remote: Surface,
262}
263
264impl From<Greet> for HyperwayStub {
265    fn from(greet: Greet) -> Self {
266        Self {
267            agent: greet.agent,
268            remote: greet.surface,
269        }
270    }
271}
272
273impl HyperwayStub {
274    pub fn from_port(remote: Surface) -> Self {
275        Self {
276            agent: remote.to_agent(),
277            remote,
278        }
279    }
280
281    pub fn new(remote: Surface, agent: Agent) -> Self {
282        Self { agent, remote }
283    }
284}
285
286pub enum HyperwayInterchangeCall {
287    Wave(UltraWave),
288    Internal(Hyperway),
289    Remove(Surface),
290    Mount {
291        stub: HyperwayStub,
292        init_wave: Option<UltraWave>,
293        rtn: oneshot::Sender<Result<HyperwayEndpoint, SpaceErr>>,
294    },
295}
296
297pub enum HyperlaneCall {
298    Drain,
299    Ext(mpsc::Sender<UltraWave>),
300    ResetExt,
301    Wave(UltraWave),
302    Transform(Box<dyn HyperTransform>),
303}
304
305pub trait HyperTransform: Send + Sync {
306    fn filter(&self, wave: UltraWave) -> UltraWave;
307}
308
309#[derive(Clone)]
310pub struct AgentTransform {
311    agent: Agent,
312}
313
314impl AgentTransform {
315    pub fn new(agent: Agent) -> Self {
316        Self { agent }
317    }
318}
319
320impl HyperTransform for AgentTransform {
321    fn filter(&self, mut wave: UltraWave) -> UltraWave {
322        wave.set_agent(self.agent.clone());
323        wave
324    }
325}
326
327#[derive(Clone)]
328pub struct LayerTransform {
329    layer: Layer,
330}
331
332impl LayerTransform {
333    pub fn new(layer: Layer) -> Self {
334        Self { layer }
335    }
336}
337
338impl HyperTransform for LayerTransform {
339    fn filter(&self, mut wave: UltraWave) -> UltraWave {
340        let to = wave
341            .to()
342            .clone()
343            .to_single()
344            .unwrap()
345            .with_layer(self.layer.clone());
346        wave.set_to(to);
347        wave
348    }
349}
350
351#[derive(Clone)]
352pub struct TransportTransform {
353    transport_to: Surface,
354}
355
356impl TransportTransform {
357    pub fn new(transport_to: Surface) -> Self {
358        Self { transport_to }
359    }
360}
361
362impl HyperTransform for TransportTransform {
363    fn filter(&self, wave: UltraWave) -> UltraWave {
364        let from = wave.from().clone();
365        let transport = wave.wrap_in_transport(from, self.transport_to.clone());
366        let wave = transport.build().unwrap();
367        wave.to_ultra()
368    }
369}
370
371#[derive(Clone)]
372pub struct HopTransform {
373    hop_to: Surface,
374}
375
376impl HopTransform {
377    pub fn new(hop_to: Surface) -> Self {
378        Self { hop_to }
379    }
380}
381
382impl HyperTransform for HopTransform {
383    fn filter(&self, wave: UltraWave) -> UltraWave {
384        let signal = wave.to_signal().unwrap();
385        let from = signal.from.clone();
386        let wave = signal.wrap_in_hop(from, self.hop_to.clone());
387        let wave = wave.build().unwrap();
388        wave.to_ultra()
389    }
390}
391
392pub struct ToTransform {
393    to: Surface,
394}
395
396impl ToTransform {
397    pub fn new(to: Surface) -> Self {
398        Self { to }
399    }
400}
401
402impl HyperTransform for ToTransform {
403    fn filter(&self, mut wave: UltraWave) -> UltraWave {
404        wave.set_to(self.to.clone());
405        wave
406    }
407}
408
409pub struct FromTransform {
410    from: Surface,
411}
412
413impl FromTransform {
414    pub fn new(from: Surface) -> Self {
415        Self { from }
416    }
417}
418
419impl HyperTransform for FromTransform {
420    fn filter(&self, mut wave: UltraWave) -> UltraWave {
421        wave.set_from(self.from.clone());
422        wave
423    }
424}
425#[derive(Clone)]
426pub struct Hyperlane {
427    tx: mpsc::Sender<HyperlaneCall>,
428    #[cfg(test)]
429    eavesdrop_tx: broadcast::Sender<UltraWave>,
430    label: String,
431}
432
433impl Hyperlane {
434    pub fn new<S: ToString>(label: S) -> Self {
435        #[cfg(test)]
436        let (eavesdrop_tx, _) = broadcast::channel(16);
437
438        let label = format!(
439            "{}::{}",
440            label.to_string(),
441            HYPERLANE_INDEX.fetch_add(1, Ordering::Relaxed)
442        );
443
444        let (tx, mut rx) = mpsc::channel(1024);
445        {
446            let label = label.clone();
447            let tx = tx.clone();
448            #[cfg(test)]
449            let eavesdrop_tx = eavesdrop_tx.clone();
450
451            tokio::spawn(async move {
452                let mut ext = None;
453                let mut queue = vec![];
454                let mut transforms = vec![];
455                while let Some(call) = rx.recv().await {
456                    match call {
457                        HyperlaneCall::Ext(ext_tx) => {
458                            ext.replace(ext_tx);
459                        }
460                        HyperlaneCall::Transform(filter) => {
461                            transforms.push(filter);
462                        }
463                        HyperlaneCall::Wave(mut wave) => {
464                            while queue.len() > 1024 {
465                                // start dropping the oldest messages
466                                queue.remove(0);
467                            }
468                            for transform in transforms.iter() {
469                                wave = transform.filter(wave);
470                            }
471                            queue.push(wave);
472                        }
473                        HyperlaneCall::Drain => {
474                            // just drains the queue later if there is a listener
475                        }
476                        HyperlaneCall::ResetExt => {
477                            ext = None;
478                        }
479                    }
480                    if !queue.is_empty() {
481                        if let Some(ext_tx) = ext.as_mut() {
482                            for wave in queue.drain(..) {
483                                #[cfg(test)]
484                                let wave_cp = wave.clone();
485
486                                match ext_tx.send(wave).await {
487                                    Ok(_) => {
488                                        #[cfg(test)]
489                                        eavesdrop_tx.send(wave_cp);
490                                    }
491                                    Err(err) => {
492                                        tx.send(HyperlaneCall::ResetExt).await;
493                                        tx.try_send(HyperlaneCall::Wave(err.0));
494                                    }
495                                }
496                            }
497                        } else {
498                        }
499                    }
500                }
501            });
502        }
503
504        Self {
505            tx,
506            label,
507            #[cfg(test)]
508            eavesdrop_tx,
509        }
510    }
511
512    #[cfg(test)]
513    pub fn eavesdrop(&self) -> broadcast::Receiver<UltraWave> {
514        self.eavesdrop_tx.subscribe()
515    }
516
517    pub async fn send(&self, wave: UltraWave) -> Result<(), SpaceErr> {
518        Ok(self
519            .tx
520            .send_timeout(HyperlaneCall::Wave(wave), Duration::from_secs(5))
521            .await?)
522    }
523
524    pub fn tx(&self) -> mpsc::Sender<UltraWave> {
525        let (tx, mut rx) = mpsc::channel(1024);
526        let call_tx = self.tx.clone();
527        tokio::spawn(async move {
528            while let Some(wave) = rx.recv().await {
529                call_tx.send(HyperlaneCall::Wave(wave)).await;
530            }
531        });
532        tx
533    }
534
535    pub async fn rx(&self, init_wave: Option<UltraWave>) -> mpsc::Receiver<UltraWave> {
536        let (tx, rx) = mpsc::channel(1024);
537        if let Some(init_wave) = init_wave {
538            tx.send(init_wave).await;
539        }
540        self.tx.send(HyperlaneCall::Ext(tx)).await;
541        rx
542    }
543}
544
545pub struct HyperwayInterchange {
546    call_tx: mpsc::Sender<HyperwayInterchangeCall>,
547    logger: PointLogger,
548    singular_to: Option<Surface>,
549}
550
551impl HyperwayInterchange {
552    pub fn new(logger: PointLogger) -> Self {
553        let (call_tx, mut call_rx) = mpsc::channel(1024);
554
555        {
556            let call_tx = call_tx.clone();
557            let logger = logger.clone();
558            tokio::spawn(async move {
559                let mut hyperways = HashMap::new();
560                while let Some(call) = call_rx.recv().await {
561                    match call {
562                        HyperwayInterchangeCall::Internal(hyperway) => {
563                            let mut rx = hyperway.inbound.rx(None).await;
564                            hyperways.insert(hyperway.remote.clone(), hyperway);
565                            let call_tx = call_tx.clone();
566                            let logger = logger.clone();
567                            tokio::spawn(async move {
568                                while let Some(wave) = rx.recv().await {
569                                    call_tx
570                                        .send_timeout(
571                                            HyperwayInterchangeCall::Wave(wave),
572                                            Duration::from_secs(60u64),
573                                        )
574                                        .await;
575                                }
576                            });
577                        }
578                        HyperwayInterchangeCall::Remove(point) => {
579                            hyperways.remove(&point);
580                        }
581                        HyperwayInterchangeCall::Wave(wave) => match wave.to().single_or() {
582                            Ok(to) => match hyperways.get(&to) {
583                                None => {
584                                    logger.warn(
585                                            format!("wave is addressed to hyperway that this interchagne does not have from: {} to: {} ",
586                                                    wave.from().to_string(),
587                                                    wave.to().to_string()
588                                            )
589                                        );
590                                }
591                                Some(hyperway) => {
592                                    hyperway.outbound.send(wave).await;
593                                }
594                            },
595                            Err(_) => {
596                                logger.warn("Hyperway Interchange cannot route Ripples, instead wrap in a Hop or Transport");
597                            }
598                        },
599                        HyperwayInterchangeCall::Mount {
600                            stub,
601                            init_wave,
602                            rtn,
603                        } => match hyperways.get(&stub.remote) {
604                            None => {
605                                logger.error(format!(
606                                    "mount hyperway {} not found",
607                                    stub.remote.to_string()
608                                ));
609                                rtn.send(Err(format!(
610                                    "hyperway {} not found",
611                                    stub.remote.to_string()
612                                )
613                                .into()));
614                            }
615                            Some(hyperway) => {
616                                let endpoint = hyperway.hyperway_endpoint_far(init_wave).await;
617                                rtn.send(Ok(endpoint));
618                            }
619                        },
620                    }
621                }
622            });
623        }
624
625        Self {
626            call_tx,
627            logger,
628            singular_to: None,
629        }
630    }
631
632    pub fn router(&self) -> Box<dyn Router> {
633        Box::new(OutboundRouter::new(
634            self.call_tx.clone(),
635            self.logger.clone(),
636        ))
637    }
638
639    pub fn point(&self) -> &Point {
640        &self.logger.point
641    }
642
643    pub async fn mount(
644        &self,
645        stub: HyperwayStub,
646        init_wave: Option<UltraWave>,
647    ) -> Result<HyperwayEndpoint, SpaceErr> {
648        let call_tx = self.call_tx.clone();
649        let (tx, rx) = oneshot::channel();
650        call_tx
651            .send(HyperwayInterchangeCall::Mount {
652                stub: stub.clone(),
653                init_wave,
654                rtn: tx,
655            })
656            .await;
657        rx.await?
658    }
659
660    pub fn singular_to(&mut self, to: Surface) {
661        self.singular_to.replace(to);
662    }
663
664    pub async fn add(&self, mut hyperway: Hyperway) {
665        if let Some(to) = self.singular_to.as_ref() {
666            hyperway.transform_to(to.clone());
667        }
668
669        self.call_tx
670            .send(HyperwayInterchangeCall::Internal(hyperway))
671            .await;
672    }
673
674    pub fn remove(&self, hyperway: Surface) {
675        let call_tx = self.call_tx.clone();
676        tokio::spawn(async move {
677            call_tx
678                .send(HyperwayInterchangeCall::Remove(hyperway))
679                .await;
680        });
681    }
682
683    pub async fn route(&self, wave: UltraWave) {
684        self.call_tx.send(HyperwayInterchangeCall::Wave(wave)).await;
685    }
686}
687
688#[async_trait]
689pub trait HyperRouter: Send + Sync {
690    async fn route(&self, wave: HyperWave);
691}
692
693pub struct OutboundRouter {
694    pub logger: PointLogger,
695    pub call_tx: mpsc::Sender<HyperwayInterchangeCall>,
696}
697
698impl OutboundRouter {
699    pub fn new(call_tx: mpsc::Sender<HyperwayInterchangeCall>, logger: PointLogger) -> Self {
700        Self { call_tx, logger }
701    }
702}
703
704#[async_trait]
705impl Router for OutboundRouter {
706    async fn route(&self, wave: UltraWave) {
707        self.logger
708            .track(&wave, || Tracker::new(format!("outbound:router"), "Route"));
709
710        self.call_tx.send(HyperwayInterchangeCall::Wave(wave)).await;
711    }
712}
713
714#[async_trait]
715pub trait HyperGreeter: Send + Sync + Clone + Sized {
716    async fn greet(&self, stub: HyperwayStub) -> Result<Greet, SpaceErr>;
717}
718
719#[derive(Clone)]
720pub struct SimpleGreeter {
721    hop: Surface,
722    transport: Surface,
723}
724
725impl SimpleGreeter {
726    pub fn new(hop: Surface, transport: Surface) -> Self {
727        Self { hop, transport }
728    }
729}
730
731#[async_trait]
732impl HyperGreeter for SimpleGreeter {
733    async fn greet(&self, stub: HyperwayStub) -> Result<Greet, SpaceErr> {
734        Ok(Greet {
735            surface: stub.remote,
736            agent: stub.agent,
737            hop: self.hop.clone(),
738            transport: self.transport.clone(),
739        })
740    }
741}
742
743#[async_trait]
744pub trait HyperAuthenticator: Send + Sync + Clone + Sized {
745    async fn auth(&self, knock: Knock) -> Result<HyperwayStub, SpaceErr>;
746}
747
748#[derive(Clone)]
749pub struct TokenAuthenticator {
750    pub token: Token,
751    pub agent: Agent,
752}
753
754impl TokenAuthenticator {
755    pub fn new(agent: Agent, token: Token) -> Self {
756        Self { agent, token }
757    }
758}
759
760#[async_trait]
761impl HyperAuthenticator for TokenAuthenticator {
762    async fn auth(&self, knock: Knock) -> Result<HyperwayStub, SpaceErr> {
763        if let Substance::Token(token) = &*knock.auth {
764            if *token == self.token {
765                Ok(HyperwayStub {
766                    agent: self.agent.clone(),
767                    remote: knock
768                        .remote
769                        .ok_or::<SpaceErr>("expected a remote entry selection".into())?,
770                })
771            } else {
772                Err(SpaceErr::new(500, "invalid token"))
773            }
774        } else {
775            Err(SpaceErr::new(500, "expected Subtance: Token"))
776        }
777    }
778}
779
780#[derive(Clone)]
781pub struct AnonHyperAuthenticator;
782
783impl AnonHyperAuthenticator {
784    pub fn new() -> Self {
785        Self {}
786    }
787}
788
789#[derive(Clone)]
790pub struct TokenAuthenticatorWithRemoteWhitelist {
791    pub token: Token,
792    pub agent: Agent,
793    pub whitelist: HashSet<Point>,
794}
795
796impl TokenAuthenticatorWithRemoteWhitelist {
797    pub fn new(agent: Agent, token: Token, whitelist: HashSet<Point>) -> Self {
798        Self {
799            agent,
800            token,
801            whitelist,
802        }
803    }
804}
805
806#[async_trait]
807impl HyperAuthenticator for TokenAuthenticatorWithRemoteWhitelist {
808    async fn auth(&self, knock: Knock) -> Result<HyperwayStub, SpaceErr> {
809        if let Substance::Token(token) = &*knock.auth {
810            if *token == self.token {
811                let remote = knock
812                    .remote
813                    .ok_or(SpaceErr::new(500, "expected a remote entry selection"))?;
814                if self.whitelist.contains(&remote) {
815                    Ok(HyperwayStub {
816                        agent: self.agent.clone(),
817                        remote,
818                    })
819                } else {
820                    Err(SpaceErr::new(500, "remote is not part of the whitelist"))
821                }
822            } else {
823                Err(SpaceErr::new(500, "invalid token"))
824            }
825        } else {
826            Err(SpaceErr::new(500, "expecting Substance: Token"))
827        }
828    }
829}
830
831#[async_trait]
832impl HyperAuthenticator for AnonHyperAuthenticator {
833    async fn auth(&self, req: Knock) -> Result<HyperwayStub, SpaceErr> {
834        let remote = req
835            .remote
836            .ok_or(SpaceErr::new(500, "required remote point request"))?;
837
838        Ok(HyperwayStub {
839            agent: Agent::Anonymous,
840            remote,
841        })
842    }
843}
844
845#[derive(Clone)]
846pub struct AnonHyperAuthenticatorAssignEndPoint {
847    pub logger: PointLogger,
848    pub remote_point_factory: Arc<dyn PointFactory>,
849}
850
851impl AnonHyperAuthenticatorAssignEndPoint {
852    pub fn new(remote_point_factory: Arc<dyn PointFactory>, logger: PointLogger) -> Self {
853        Self {
854            remote_point_factory,
855            logger,
856        }
857    }
858}
859
860#[async_trait]
861impl HyperAuthenticator for AnonHyperAuthenticatorAssignEndPoint {
862    async fn auth(&self, knock: Knock) -> Result<HyperwayStub, SpaceErr> {
863        let remote = self
864            .logger
865            .result(self.remote_point_factory.create().await)?
866            .to_surface();
867        Ok(HyperwayStub {
868            agent: Agent::Anonymous,
869            remote,
870        })
871    }
872}
873
874#[derive(Clone)]
875pub struct TokensFromHeavenHyperAuthenticatorAssignEndPoint {
876    pub logger: RootLogger,
877    pub tokens: Arc<DashMap<Token, HyperwayStub>>,
878}
879
880impl TokensFromHeavenHyperAuthenticatorAssignEndPoint {
881    pub fn new(tokens: Arc<DashMap<Token, HyperwayStub>>, logger: RootLogger) -> Self {
882        Self { logger, tokens }
883    }
884}
885
886#[async_trait]
887impl HyperAuthenticator for TokensFromHeavenHyperAuthenticatorAssignEndPoint {
888    async fn auth(&self, auth_req: Knock) -> Result<HyperwayStub, SpaceErr> {
889        match &*auth_req.auth {
890            Substance::Token(token) => {
891                if let Some((_, stub)) = self.tokens.remove(token) {
892                    return Ok(stub);
893                } else {
894                    return Err(SpaceErr::new(500, "invalid token"));
895                }
896            }
897            _ => {
898                return Err(SpaceErr::new(500, "expected Substance: Token"));
899            }
900        }
901    }
902}
903
904pub struct TokenDispensingHyperwayInterchange {
905    pub agent: Agent,
906    pub logger: PointLogger,
907    pub tokens: Arc<DashMap<Token, HyperwayStub>>,
908    pub lane_point_factory: Box<dyn PointFactory>,
909    pub remote_point_factory: Box<dyn PointFactory>,
910    pub interchange: HyperwayInterchange,
911}
912
913impl TokenDispensingHyperwayInterchange {
914    pub fn new(
915        agent: Agent,
916        router: Box<dyn HyperRouter>,
917        lane_point_factory: Box<dyn PointFactory>,
918        end_point_factory: Box<dyn PointFactory>,
919        logger: PointLogger,
920    ) -> Self {
921        let tokens = Arc::new(DashMap::new());
922        let authenticator = Box::new(TokensFromHeavenHyperAuthenticatorAssignEndPoint::new(
923            tokens.clone(),
924            logger.logger.clone(),
925        ));
926        let interchange = HyperwayInterchange::new(logger.clone());
927        Self {
928            agent,
929            tokens,
930            logger,
931            lane_point_factory,
932            remote_point_factory: end_point_factory,
933            interchange,
934        }
935    }
936
937    pub async fn dispense(&self) -> Result<(Token, HyperwayStub), SpaceErr> {
938        let token = Token::new_uuid();
939        let remote_point = self.remote_point_factory.create().await?.to_surface();
940        let lane_point = self.lane_point_factory.create().await?;
941        let logger = self.logger.point(lane_point);
942        let stub = HyperwayStub {
943            agent: self.agent.clone(),
944            remote: remote_point,
945        };
946        self.tokens.insert(token.clone(), stub.clone());
947        Ok((token, stub))
948    }
949}
950
951impl Deref for TokenDispensingHyperwayInterchange {
952    type Target = HyperwayInterchange;
953
954    fn deref(&self) -> &Self::Target {
955        &self.interchange
956    }
957}
958
959impl DerefMut for TokenDispensingHyperwayInterchange {
960    fn deref_mut(&mut self) -> &mut Self::Target {
961        &mut self.interchange
962    }
963}
964
965pub struct VersionGate {
966    selector: HyperGateSelector,
967}
968
969impl VersionGate {
970    pub fn new(selector: HyperGateSelector) -> Self {
971        Self { selector }
972    }
973    pub async fn unlock(&self, version: semver::Version) -> Result<HyperGateSelector, String> {
974        if version == *VERSION {
975            Ok(self.selector.clone())
976        } else {
977            Err("version mismatch".to_string())
978        }
979    }
980}
981
982#[async_trait]
983pub trait HyperGate: Send + Sync {
984    async fn knock(&self, knock: Knock) -> Result<HyperwayEndpoint, SpaceErr>;
985
986    async fn jump(
987        &self,
988        kind: InterchangeKind,
989        stub: HyperwayStub,
990    ) -> Result<HyperwayEndpoint, SpaceErr>;
991}
992
993pub struct HopRouter {
994    greet: Greet,
995    tx: mpsc::Sender<UltraWave>,
996}
997
998impl HopRouter {
999    fn to_hop(&self, mut wave: UltraWave) -> Result<UltraWave, SpaceErr> {
1000        wave.set_agent(self.greet.agent.clone());
1001        let mut transport = wave
1002            .wrap_in_transport(self.greet.surface.clone(), self.greet.transport.clone())
1003            .build()?
1004            .to_signal()?;
1005        let hop = transport
1006            .wrap_in_hop(Point::local_portal().to_surface(), self.greet.hop.clone())
1007            .build()?
1008            .to_ultra();
1009        Ok(hop)
1010    }
1011}
1012
1013#[async_trait]
1014impl Router for HopRouter {
1015    async fn route(&self, wave: UltraWave) {
1016        match self.to_hop(wave) {
1017            Ok(hop) => {
1018                self.tx.send(hop).await.unwrap_or_default();
1019            }
1020            Err(err) => {
1021                println!("{}", err.to_string());
1022            }
1023        }
1024    }
1025}
1026
1027pub struct HyperApi {
1028    greet: Greet,
1029    hyperway: HyperwayEndpoint,
1030    exchanger: Exchanger,
1031}
1032
1033impl HyperApi {
1034    pub fn new(hyperway: HyperwayEndpoint, greet: Greet, logger: PointLogger) -> Self {
1035        let exchanger = Exchanger::new(greet.surface.clone(), Default::default(), logger);
1036        Self {
1037            greet,
1038            hyperway,
1039            exchanger,
1040        }
1041    }
1042
1043    pub fn router(&self) -> HopRouter {
1044        HopRouter {
1045            greet: self.greet.clone(),
1046            tx: self.hyperway.tx.clone(),
1047        }
1048    }
1049
1050    pub fn transmitter(&self) -> ProtoTransmitter {
1051        let mut builder =
1052            ProtoTransmitterBuilder::new(Arc::new(self.router()), self.exchanger.clone());
1053        builder.agent = SetStrategy::Override(self.greet.agent.clone());
1054        builder.build()
1055    }
1056}
1057
1058#[derive(Clone)]
1059pub struct HyperGateSelector {
1060    map: Arc<DashMap<InterchangeKind, Arc<dyn HyperGate>>>,
1061}
1062
1063impl Default for HyperGateSelector {
1064    fn default() -> Self {
1065        Self::new(Arc::new(DashMap::new()))
1066    }
1067}
1068
1069impl HyperGateSelector {
1070    pub fn new(map: Arc<DashMap<InterchangeKind, Arc<dyn HyperGate>>>) -> Self {
1071        Self { map }
1072    }
1073
1074    pub fn add(&self, kind: InterchangeKind, gate: Arc<dyn HyperGate>) -> Result<(), SpaceErr> {
1075        if self.map.contains_key(&kind) {
1076            Err(format!("already have an interchange of kind: {}", kind.to_string()).into())
1077        } else {
1078            self.map.insert(kind, gate);
1079            Ok(())
1080        }
1081    }
1082}
1083
1084#[async_trait]
1085impl HyperGate for HyperGateSelector {
1086    async fn knock(&self, knock: Knock) -> Result<HyperwayEndpoint, SpaceErr> {
1087        if let Some(gate) = self.map.get(&knock.kind) {
1088            gate.value().knock(knock).await
1089        } else {
1090            Err(SpaceErr::new(
1091                500,
1092                format!("interchange not available: {}", knock.kind.to_string()).as_str(),
1093            ))
1094        }
1095    }
1096
1097    async fn jump(
1098        &self,
1099        kind: InterchangeKind,
1100        stub: HyperwayStub,
1101    ) -> Result<HyperwayEndpoint, SpaceErr> {
1102        self.map
1103            .get(&kind)
1104            .ok_or(SpaceErr::new(
1105                500,
1106                format!("interchange kind not available: {}", kind.to_string()).as_str(),
1107            ))?
1108            .value()
1109            .jump(kind, stub)
1110            .await
1111    }
1112}
1113
1114pub trait HyperwayConfigurator: Send + Sync {
1115    fn config(&self, greet: &Greet, hyperway: &mut Hyperway);
1116}
1117
1118pub struct DefaultHyperwayConfigurator;
1119
1120impl HyperwayConfigurator for DefaultHyperwayConfigurator {
1121    fn config(&self, greet: &Greet, hyperway: &mut Hyperway) {}
1122}
1123
1124#[derive(Clone)]
1125pub struct InterchangeGate<A, G, C>
1126where
1127    A: HyperAuthenticator,
1128    G: HyperGreeter,
1129    C: HyperwayConfigurator,
1130{
1131    logger: PointLogger,
1132    auth: A,
1133    greeter: G,
1134    interchange: Arc<HyperwayInterchange>,
1135    configurator: C,
1136}
1137impl<A, G, C> InterchangeGate<A, G, C>
1138where
1139    A: HyperAuthenticator,
1140    G: HyperGreeter,
1141    C: HyperwayConfigurator,
1142{
1143    pub fn new(
1144        auth: A,
1145        greeter: G,
1146        configurator: C,
1147        interchange: Arc<HyperwayInterchange>,
1148        logger: PointLogger,
1149    ) -> Self {
1150        Self {
1151            auth,
1152            greeter,
1153            configurator,
1154            interchange,
1155            logger,
1156        }
1157    }
1158}
1159
1160impl<A, G, C> InterchangeGate<A, G, C>
1161where
1162    A: HyperAuthenticator,
1163    G: HyperGreeter,
1164    C: HyperwayConfigurator,
1165{
1166    async fn enter(&self, greet: Greet) -> Result<HyperwayEndpoint, SpaceErr> {
1167        let mut hyperway = Hyperway::new(
1168            greet.surface.clone(),
1169            greet.agent.clone(),
1170            self.logger.clone(),
1171        );
1172        self.configurator.config(&greet, &mut hyperway);
1173
1174        self.interchange.add(hyperway).await;
1175
1176        let port = greet.surface.clone();
1177        let stub = HyperwayStub {
1178            agent: greet.agent.clone(),
1179            remote: greet.surface.clone(),
1180        };
1181
1182        let mut ext = self.logger.result_ctx(
1183            "InterchangeGate.enter",
1184            self.interchange.mount(stub, Some(greet.into())).await,
1185        )?;
1186
1187        let (drop_tx, drop_rx) = oneshot::channel();
1188        ext.drop_tx = Some(drop_tx);
1189
1190        let interchange = self.interchange.clone();
1191        tokio::spawn(async move {
1192            drop_rx.await;
1193            interchange.remove(port);
1194        });
1195
1196        Ok(ext)
1197    }
1198}
1199
1200#[async_trait]
1201impl<A, G, C> HyperGate for InterchangeGate<A, G, C>
1202where
1203    A: HyperAuthenticator,
1204    G: HyperGreeter,
1205    C: HyperwayConfigurator,
1206{
1207    async fn knock(&self, knock: Knock) -> Result<HyperwayEndpoint, SpaceErr> {
1208        let stub = self.auth.auth(knock).await?;
1209        let greet = self.greeter.greet(stub).await?;
1210        self.enter(greet).await
1211    }
1212
1213    async fn jump(
1214        &self,
1215        _kind: InterchangeKind,
1216        stub: HyperwayStub,
1217    ) -> Result<HyperwayEndpoint, SpaceErr> {
1218        let greet = self.greeter.greet(stub).await?;
1219        self.enter(greet).await
1220    }
1221}
1222
1223#[derive(Clone)]
1224pub struct MountInterchangeGate<A, G>
1225where
1226    A: HyperAuthenticator,
1227    G: HyperGreeter,
1228{
1229    logger: PointLogger,
1230    auth: A,
1231    greeter: G,
1232    interchange: Arc<HyperwayInterchange>,
1233}
1234
1235impl<A, G> MountInterchangeGate<A, G>
1236where
1237    A: HyperAuthenticator,
1238    G: HyperGreeter,
1239{
1240    pub fn new(
1241        auth: A,
1242        greeter: G,
1243        interchange: Arc<HyperwayInterchange>,
1244        logger: PointLogger,
1245    ) -> Self {
1246        Self {
1247            auth,
1248            greeter,
1249            interchange,
1250            logger,
1251        }
1252    }
1253
1254    async fn enter(&self, greet: Greet) -> Result<HyperwayEndpoint, SpaceErr> {
1255        let stub = HyperwayStub::new(greet.surface.clone(), greet.agent.clone());
1256        let ext = self
1257            .interchange
1258            .mount(stub.clone(), Some(greet.into()))
1259            .await?;
1260        Ok(ext)
1261    }
1262}
1263
1264#[async_trait]
1265impl<A, G> HyperGate for MountInterchangeGate<A, G>
1266where
1267    A: HyperAuthenticator,
1268    G: HyperGreeter,
1269{
1270    async fn knock(&self, knock: Knock) -> Result<HyperwayEndpoint, SpaceErr> {
1271        let stub = self.auth.auth(knock).await?;
1272        let greet = self.greeter.greet(stub).await?;
1273        let ext = self.enter(greet).await?;
1274        Ok(ext)
1275    }
1276
1277    async fn jump(
1278        &self,
1279        _kind: InterchangeKind,
1280        stub: HyperwayStub,
1281    ) -> Result<HyperwayEndpoint, SpaceErr> {
1282        let greet = self.greeter.greet(stub).await?;
1283        let ext = self.enter(greet).await?;
1284        Ok(ext)
1285    }
1286}
1287
1288pub struct HyperClient {
1289    tx: mpsc::Sender<UltraWave>,
1290    status_rx: watch::Receiver<HyperConnectionStatus>,
1291    to_client_listener_tx: broadcast::Sender<UltraWave>,
1292    logger: PointLogger,
1293    greet_rx: watch::Receiver<Option<Greet>>,
1294    exchanger: Option<Exchanger>,
1295}
1296
1297impl HyperClient {
1298    pub fn new(
1299        factory: Box<dyn HyperwayEndpointFactory>,
1300        logger: PointLogger,
1301    ) -> Result<HyperClient, SpaceErr> {
1302        Self::new_with_exchanger(factory, None, logger)
1303    }
1304
1305    pub fn new_with_exchanger(
1306        factory: Box<dyn HyperwayEndpointFactory>,
1307        exchanger: Option<Exchanger>,
1308        logger: PointLogger,
1309    ) -> Result<HyperClient, SpaceErr> {
1310        let (to_client_listener_tx, _) = broadcast::channel(1024);
1311        let (to_hyperway_tx, from_client_rx) = mpsc::channel(1024);
1312        let (status_watch_tx, mut status_rx) = watch::channel(HyperConnectionStatus::Pending);
1313
1314        let (status_mpsc_tx, mut status_mpsc_rx): (
1315            mpsc::Sender<HyperConnectionStatus>,
1316            mpsc::Receiver<HyperConnectionStatus>,
1317        ) = mpsc::channel(128);
1318
1319        tokio::spawn(async move {
1320            while let Some(status) = status_mpsc_rx.recv().await {
1321                let result = status_watch_tx.send(status.clone());
1322                if status == HyperConnectionStatus::Fatal {
1323                    break;
1324                }
1325                if status == HyperConnectionStatus::Closed {
1326                    break;
1327                }
1328                if let Err(_) = result {
1329                    break;
1330                }
1331            }
1332        });
1333
1334        let mut from_runner_rx = HyperClientRunner::new(
1335            factory,
1336            from_client_rx,
1337            status_mpsc_tx.clone(),
1338            logger.clone(),
1339        );
1340
1341        let (greet_tx, greet_rx) = watch::channel(None);
1342
1343        let mut client = Self {
1344            tx: to_hyperway_tx,
1345            status_rx: status_rx.clone(),
1346            to_client_listener_tx: to_client_listener_tx.clone(),
1347            logger: logger.clone(),
1348            greet_rx,
1349            exchanger: exchanger.clone(),
1350        };
1351
1352        {
1353            let logger = logger.clone();
1354            tokio::spawn(async move {
1355                while let Ok(_) = status_rx.changed().await {
1356                    let status = status_rx.borrow().clone();
1357                    //                    logger.info(format!("HyperClient status: {}", status.to_string()))
1358                }
1359            });
1360        }
1361
1362        {
1363            let logger = logger.clone();
1364            let status_tx = status_mpsc_tx.clone();
1365            tokio::spawn(async move {
1366                async fn relay(
1367                    mut from_runner_rx: mpsc::Receiver<UltraWave>,
1368                    to_client_listener_tx: broadcast::Sender<UltraWave>,
1369                    status_tx: mpsc::Sender<HyperConnectionStatus>,
1370                    greet_tx: watch::Sender<Option<Greet>>,
1371                    exchanger: Option<Exchanger>,
1372                    logger: PointLogger,
1373                ) -> Result<(), SpaceErr> {
1374                    if let Some(wave) = from_runner_rx.recv().await {
1375                        logger.track(&wave, || Tracker::new("client", "ReceiveReflected"));
1376
1377                        match wave.to_reflected() {
1378                            Ok(reflected) => {
1379                                if !reflected.core().status.is_success() {
1380                                    match reflected.core().status.as_u16() {
1381                                        400 => {
1382                                            status_tx
1383                                                .send(HyperConnectionStatus::Fatal)
1384                                                .await
1385                                                .unwrap_or_default();
1386                                            let err = "400: Bad Request: FATAL: something in the knock was incorrect";
1387                                            return Err(err.into());
1388                                        }
1389                                        401 => {
1390                                            status_tx
1391                                                .send(HyperConnectionStatus::Fatal)
1392                                                .await
1393                                                .unwrap_or_default();
1394                                            let err = "401: Unauthorized: FATAL: authentication failed (bad credentials?)";
1395                                            return Err(err.into());
1396                                        }
1397                                        403 => {
1398                                            status_tx
1399                                                .send(HyperConnectionStatus::Fatal)
1400                                                .await
1401                                                .unwrap_or_default();
1402                                            let err = "403: Forbidden: FATAL: authentication succeeded however the authenticated agent does not have permission to connect to this service";
1403                                            return Err(err.into());
1404                                        }
1405                                        408 => {
1406                                            status_tx
1407                                                .send(HyperConnectionStatus::Panic)
1408                                                .await
1409                                                .unwrap_or_default();
1410                                            let err = "408: Request Timeout: PANIC";
1411                                            return Err(err.into());
1412                                        }
1413                                        301 => {
1414                                            status_tx
1415                                                .send(HyperConnectionStatus::Fatal)
1416                                                .await
1417                                                .unwrap_or_default();
1418                                            let err = "301: Moved Permanently: FATAL: please update to new connection address";
1419                                            return Err(err.into());
1420                                        }
1421                                        503 => {
1422                                            status_tx
1423                                                .send(HyperConnectionStatus::Panic)
1424                                                .await
1425                                                .unwrap_or_default();
1426                                            let err =
1427                                                "503: Service Unavailable: PANIC: try again later";
1428                                            return Err(err.into());
1429                                        }
1430                                        _ => {
1431                                            status_tx
1432                                                .send(HyperConnectionStatus::Panic)
1433                                                .await
1434                                                .unwrap_or_default();
1435                                            let err = format!(
1436                                                "{}: {}: PANIC: expected 200",
1437                                                reflected.core().status.as_u16(),
1438                                                reflected.core().status.to_string()
1439                                            );
1440                                            return Err(err.into());
1441                                        }
1442                                    }
1443                                }
1444                                if let Substance::Greet(greet) = &reflected.core().body {
1445                                    greet_tx.send(Some(greet.clone()));
1446                                } else {
1447                                    status_tx
1448                                        .send(HyperConnectionStatus::Fatal)
1449                                        .await
1450                                        .unwrap_or_default();
1451                                    let err = "HyperClient expected first wave Substance to be a reflected Greeting";
1452                                    return Err(err.into());
1453                                }
1454                            }
1455                            Err(err) => {
1456                                status_tx
1457                                    .send(HyperConnectionStatus::Fatal)
1458                                    .await
1459                                    .unwrap_or_default();
1460                                let err = format!("HyperClient expected first wave Substance to be a reflected Greeting. Instead when attempting to convert to a reflected wave err occured: {}", err.to_string());
1461                                return Err(err.into());
1462                            }
1463                        }
1464                    }
1465
1466                    while let Some(wave) = from_runner_rx.recv().await {
1467                        if exchanger.is_some() {
1468                            if wave.is_directed() {
1469                                to_client_listener_tx.send(wave)?;
1470                            } else {
1471                                exchanger
1472                                    .as_ref()
1473                                    .unwrap()
1474                                    .reflected(wave.to_reflected()?)
1475                                    .await?;
1476                            }
1477                        } else {
1478                            to_client_listener_tx.send(wave)?;
1479                        }
1480                    }
1481                    Ok(())
1482                }
1483
1484                relay(
1485                    from_runner_rx,
1486                    to_client_listener_tx,
1487                    status_tx,
1488                    greet_tx,
1489                    exchanger,
1490                    logger.clone(),
1491                )
1492                .await
1493                .unwrap_or_default();
1494            });
1495        }
1496
1497        Ok(client)
1498    }
1499
1500    pub fn exchanger(&self) -> Option<Exchanger> {
1501        self.exchanger.clone()
1502    }
1503
1504    pub async fn transmitter_builder(&self) -> Result<ProtoTransmitterBuilder, SpaceErr> {
1505        self.wait_for_ready(Duration::from_secs(30)).await?;
1506        let mut builder = ProtoTransmitterBuilder::new(
1507            Arc::new(self.router()),
1508            self.exchanger
1509                .as_ref()
1510                .ok_or(SpaceErr::server_error(
1511                    "cannot create a transmitter on a client that does not have an exchanger",
1512                ))?
1513                .clone(),
1514        );
1515        let greet = self
1516            .get_greeting()
1517            .ok_or::<SpaceErr>("expected greeting to already be set in HyperClient".into())?;
1518        builder.agent = SetStrategy::Fill(greet.agent.clone());
1519        builder.from = SetStrategy::Fill(greet.surface.clone());
1520        Ok(builder)
1521    }
1522
1523    pub fn reset(&self) {
1524        let mut wave = DirectedProto::signal();
1525        wave.to(LOCAL_CLIENT_RUNNER.clone().to_surface());
1526        wave.method(ExtMethod::new("Reset").unwrap());
1527        let wave = wave.build().unwrap();
1528        let wave = wave.to_ultra();
1529        let tx = self.tx.clone();
1530        tokio::spawn(async move {
1531            tx.send(wave).await.unwrap_or_default();
1532        });
1533    }
1534
1535    pub async fn close(&self) {
1536        let mut wave = DirectedProto::signal();
1537        wave.from(LOCAL_CLIENT.clone().to_surface());
1538        wave.to(LOCAL_CLIENT_RUNNER.clone().to_surface());
1539        wave.method(ExtMethod::new("Close").unwrap());
1540        let wave = wave.build().unwrap();
1541        let wave = wave.to_ultra();
1542        let tx = self.tx.clone();
1543        tokio::spawn(async move {
1544            tx.send(wave).await.unwrap_or_default();
1545        });
1546    }
1547
1548    pub fn router(&self) -> TxRouter {
1549        TxRouter::new(self.tx.clone())
1550    }
1551
1552    pub fn rx(&self) -> broadcast::Receiver<UltraWave> {
1553        self.to_client_listener_tx.subscribe()
1554    }
1555
1556    pub fn get_greeting(&self) -> Option<Greet> {
1557        self.greet_rx.borrow().clone()
1558    }
1559
1560    pub async fn wait_for_greet(&self) -> Result<Greet, SpaceErr> {
1561        let mut greet_rx = self.greet_rx.clone();
1562        loop {
1563            let greet = greet_rx.borrow().clone();
1564            if greet.is_some() {
1565                return Ok(greet.unwrap());
1566            } else {
1567                greet_rx.changed().await?;
1568            }
1569        }
1570    }
1571
1572    pub async fn wait_for_ready(&self, duration: Duration) -> Result<(), SpaceErr> {
1573        let mut status_rx = self.status_rx.clone();
1574        let (rtn, mut rtn_rx) = oneshot::channel();
1575
1576        tokio::spawn(async move {
1577            loop {
1578                let status = status_rx.borrow().clone();
1579                match status {
1580                    HyperConnectionStatus::Ready => {
1581                        rtn.send(Ok(()));
1582                        break;
1583                    }
1584                    HyperConnectionStatus::Fatal => {
1585                        rtn.send(Err(SpaceErr::server_error(
1586                            "Fatal status from HyperClient while waiting for Ready",
1587                        )));
1588                        break;
1589                    }
1590                    _ => {}
1591                }
1592            }
1593        });
1594
1595        tokio::time::timeout(duration, rtn_rx).await??
1596    }
1597}
1598
1599#[derive(Clone, Eq, PartialEq)]
1600pub struct HyperConnectionDetails {
1601    pub status: HyperConnectionStatus,
1602    pub info: String,
1603}
1604
1605impl HyperConnectionDetails {
1606    pub fn new<S: ToString>(status: HyperConnectionStatus, info: S) -> Self {
1607        Self {
1608            status,
1609            info: info.to_string(),
1610        }
1611    }
1612}
1613
1614#[derive(Clone, strum_macros::Display, Eq, PartialEq)]
1615pub enum HyperConnectionStatus {
1616    Unknown,
1617    Pending,
1618    Connecting,
1619    Handshake,
1620    Auth,
1621    Ready,
1622    Panic,
1623    Fatal,
1624    Closed,
1625}
1626
1627pub enum HyperClientCall {
1628    Close,
1629}
1630
1631pub enum HyperConnectionErr {
1632    Fatal(String),
1633    Retry(String),
1634}
1635
1636impl ToString for HyperConnectionErr {
1637    fn to_string(&self) -> String {
1638        match self {
1639            HyperConnectionErr::Fatal(m) => format!("Fatal({})", m),
1640            HyperConnectionErr::Retry(m) => format!("Retry({})", m),
1641        }
1642    }
1643}
1644
1645impl From<SpaceErr> for HyperConnectionErr {
1646    fn from(err: SpaceErr) -> Self {
1647        HyperConnectionErr::Retry(err.to_string())
1648    }
1649}
1650
1651pub struct HyperClientRunner {
1652    ext: Option<HyperwayEndpoint>,
1653    factory: Box<dyn HyperwayEndpointFactory>,
1654    status_tx: mpsc::Sender<HyperConnectionStatus>,
1655    to_client_tx: mpsc::Sender<UltraWave>,
1656    from_client_rx: mpsc::Receiver<UltraWave>,
1657    logger: PointLogger,
1658}
1659
1660impl HyperClientRunner {
1661    pub fn new(
1662        factory: Box<dyn HyperwayEndpointFactory>,
1663        from_client_rx: mpsc::Receiver<UltraWave>,
1664        status_tx: mpsc::Sender<HyperConnectionStatus>,
1665        logger: PointLogger,
1666    ) -> mpsc::Receiver<UltraWave> {
1667        let (to_client_tx, from_runner_rx) = mpsc::channel(1024);
1668        let logger = logger.push_point("runner").unwrap();
1669        let runner = Self {
1670            ext: None,
1671            factory,
1672            to_client_tx,
1673            from_client_rx,
1674            status_tx,
1675            logger,
1676        };
1677
1678        tokio::spawn(async move {
1679            runner.start().await;
1680        });
1681
1682        from_runner_rx
1683    }
1684
1685    async fn start(mut self) {
1686        self.status_tx
1687            .send(HyperConnectionStatus::Pending)
1688            .await
1689            .unwrap_or_default();
1690
1691        loop {
1692            async fn connect(runner: &mut HyperClientRunner) -> Result<(), HyperConnectionErr> {
1693                if let Err(_) = runner
1694                    .status_tx
1695                    .send(HyperConnectionStatus::Connecting)
1696                    .await
1697                {
1698                    return Err(HyperConnectionErr::Fatal("can no longer update HyperClient status (probably due to previous Fatal status)".to_string()));
1699                }
1700                let (details_tx, mut details_rx): (
1701                    mpsc::Sender<HyperConnectionDetails>,
1702                    mpsc::Receiver<HyperConnectionDetails>,
1703                ) = mpsc::channel(1024);
1704                {
1705                    let logger = runner.logger.clone();
1706                    tokio::spawn(async move {
1707                        while let Some(detail) = details_rx.recv().await {
1708                            logger.info(format!("{} | {}", detail.status.to_string(), detail.info));
1709                        }
1710                    });
1711                }
1712                loop {
1713                    match runner.logger.result_ctx(
1714                        "connect",
1715                        tokio::time::timeout(
1716                            Duration::from_secs(60),
1717                            runner.factory.create(details_tx.clone()),
1718                        )
1719                        .await,
1720                    ) {
1721                        Ok(Ok(ext)) => {
1722                            runner.ext.replace(ext);
1723                            if let Err(_) =
1724                                runner.status_tx.send(HyperConnectionStatus::Ready).await
1725                            {
1726                                runner.ext.take();
1727                                return Err(HyperConnectionErr::Fatal("can no longer update HyperClient status (probably due to previous Fatal status)".to_string()));
1728                            }
1729
1730                            return Ok(());
1731                        }
1732                        Ok(Err(err)) => {
1733                            //runner.logger.error(format!("{}", err.to_string()));
1734                        }
1735                        Err(err) => {
1736                            runner.logger.error(format!("{}", err.to_string()));
1737                        }
1738                    }
1739                    // wait a little while before attempting to reconnect
1740                    // maybe add exponential backoff later
1741                    tokio::time::sleep(Duration::from_secs(1)).await;
1742                }
1743            }
1744
1745            async fn relay(runner: &mut HyperClientRunner) -> Result<(), SpaceErr> {
1746                let ext = runner
1747                    .ext
1748                    .as_mut()
1749                    .ok_or::<SpaceErr>("must reconnect".into())?;
1750
1751                loop {
1752                    tokio::select!(
1753                        wave = runner.from_client_rx.recv() => {
1754                                // message comes from client, therefore it should go towards ext (unless it's pointed to the runner)
1755                                match wave {
1756                                  Some(wave) => {
1757                                    if wave.is_directed() && wave.to().is_single() && wave.to().unwrap_single().point == *LOCAL_CLIENT_RUNNER
1758                                    {
1759                                        let method: ExtMethod = wave.to_directed().unwrap().core().method.clone().try_into().unwrap();
1760                                        if method.to_string() == "Reset".to_string() {
1761                                           return Err(SpaceErr::server_error("reset"));
1762                                        } else if method.to_string() == "Close".to_string(){
1763                                            runner.status_tx.send(HyperConnectionStatus::Closed).await;
1764                                            return Ok(());
1765                                        }
1766                                    } else {
1767                                        match ext.tx.send(wave).await {
1768                                            Ok(_) => {}
1769                                            Err(err) => {
1770                                                // wave gets lost... need to requeue it somehow...
1771                                                //                                    runner.to_client_tx.try_send(err.0);
1772                                                return Err(SpaceErr::server_error("ext failure"));
1773                                            }
1774                                        }
1775                                    }
1776                                      }
1777                                      None => {
1778                                        //runner.logger.warn("from_client_rx.recv() returned None");
1779                                        break;
1780                                      }
1781                                    }
1782                        }
1783
1784                        wave = ext.rx.recv() => {
1785                            match wave {
1786                                Some( wave ) => {
1787                                   runner.to_client_tx.send(wave).await;
1788                                }
1789                                None => {
1790                                   runner.logger.warn("client hyperway_endpoint has been closed.  This can happen if the client sender (tx) has been dropped.");
1791                                    break;
1792                                }
1793                            }
1794                        }
1795                    );
1796                }
1797
1798                //                runner.logger.warn("client relay interrupted");
1799
1800                Ok(())
1801            }
1802
1803            loop {
1804                match connect(&mut self).await {
1805                    Ok(_) => {}
1806                    Err(HyperConnectionErr::Fatal(message)) => {
1807                        // need to log the fatal error message somehow
1808                        self.status_tx
1809                            .send(HyperConnectionStatus::Fatal)
1810                            .await
1811                            .unwrap_or_default();
1812                        return;
1813                    }
1814                    Err(HyperConnectionErr::Retry(m)) => {
1815                        self.status_tx
1816                            .send(HyperConnectionStatus::Panic)
1817                            .await
1818                            .unwrap_or_default();
1819                    }
1820                }
1821
1822                match relay(&mut self).await {
1823                    Ok(_) => {
1824                        // natural end... this runner is ready to be dropped
1825                        break;
1826                    }
1827                    Err(err) => {
1828                        //self.logger.error(format!("{}", err.to_string()));
1829                        // some error occurred when relaying therefore we need to reconnect
1830                        self.ext = None;
1831                    }
1832                }
1833            }
1834        }
1835    }
1836}
1837
1838#[async_trait]
1839pub trait HyperwayEndpointFactory: Send + Sync {
1840    async fn create(
1841        &self,
1842        status_tx: mpsc::Sender<HyperConnectionDetails>,
1843    ) -> Result<HyperwayEndpoint, SpaceErr>;
1844}
1845
1846pub struct LocalHyperwayGateUnlocker {
1847    pub knock: Knock,
1848    pub gate: Arc<dyn HyperGate>,
1849}
1850
1851impl LocalHyperwayGateUnlocker {
1852    pub fn new(remote: Surface, gate: Arc<dyn HyperGate>) -> Self {
1853        let knock = Knock::new(InterchangeKind::Singleton, remote, Substance::Empty);
1854        Self { knock, gate }
1855    }
1856}
1857
1858#[async_trait]
1859impl HyperwayEndpointFactory for LocalHyperwayGateUnlocker {
1860    async fn create(
1861        &self,
1862        status_tx: mpsc::Sender<HyperConnectionDetails>,
1863    ) -> Result<HyperwayEndpoint, SpaceErr> {
1864        self.gate.knock(self.knock.clone()).await
1865    }
1866}
1867
1868pub struct LocalHyperwayGateJumper {
1869    pub kind: InterchangeKind,
1870    pub stub: HyperwayStub,
1871    pub gate: Arc<dyn HyperGate>,
1872}
1873
1874impl LocalHyperwayGateJumper {
1875    pub fn new(kind: InterchangeKind, stub: HyperwayStub, gate: Arc<dyn HyperGate>) -> Self {
1876        Self { kind, stub, gate }
1877    }
1878}
1879
1880#[async_trait]
1881impl HyperwayEndpointFactory for LocalHyperwayGateJumper {
1882    async fn create(
1883        &self,
1884        status_tx: mpsc::Sender<HyperConnectionDetails>,
1885    ) -> Result<HyperwayEndpoint, SpaceErr> {
1886        self.gate.jump(self.kind.clone(), self.stub.clone()).await
1887    }
1888}
1889
1890/*
1891pub struct DirectInterchangeMountHyperwayExtFactory {
1892    pub stub: HyperwayStub,
1893    pub interchange: Arc<HyperwayInterchange>,
1894}
1895
1896impl DirectInterchangeMountHyperwayExtFactory {
1897    pub fn new(stub: HyperwayStub, interchange: Arc<HyperwayInterchange>) -> Self {
1898        Self { stub, interchange }
1899    }
1900}
1901
1902#[async_trait]
1903impl HyperwayExtFactory for DirectInterchangeMountHyperwayExtFactory {
1904    async fn create(&self) -> Result<HyperwayExt, HyperConnectionErr> {
1905        match self.interchange.mount(self.stub.clone()).await {
1906            Ok(mount) => {
1907                let knock = Knock::new(
1908                    InterchangeKind::Singleton,
1909                    self.stub.remote.clone(),
1910                    Substance::Empty,
1911                );
1912                let wave: Wave<Ping> = knock.into();
1913                let wave = wave.to_ultra();
1914                mount.tx.send(wave).await;
1915                Ok(mount)
1916            }
1917            Err(_) => Err(HyperConnectionErr::Fatal(format!(
1918                "invalid mount point '{}'",
1919                self.stub.remote.to_string()
1920            ))),
1921        }
1922    }
1923}
1924
1925 */
1926
1927// connects two interchanges one local, the other via client
1928pub struct Bridge {
1929    client: HyperClient,
1930}
1931
1932impl Bridge {
1933    pub fn new(
1934        mut local_hyperway_endpoint: HyperwayEndpoint,
1935        remote_factory: Box<dyn HyperwayEndpointFactory>,
1936        logger: PointLogger,
1937    ) -> Result<Self, SpaceErr> {
1938        let client = HyperClient::new(remote_factory, logger)?;
1939        let client_router = client.router();
1940        let local_hyperway_endpoint_tx = local_hyperway_endpoint.tx.clone();
1941        tokio::spawn(async move {
1942            while let Some(wave) = local_hyperway_endpoint.rx.recv().await {
1943                client_router.route(wave).await;
1944            }
1945        });
1946
1947        let mut rx = client.rx();
1948        tokio::spawn(async move {
1949            while let Ok(wave) = rx.recv().await {
1950                local_hyperway_endpoint_tx.send(wave).await;
1951            }
1952        });
1953
1954        Ok(Self { client })
1955    }
1956
1957    pub fn reset(&self) {
1958        self.client.reset();
1959    }
1960
1961    pub fn close(&self) {
1962        self.client.close();
1963    }
1964
1965    pub fn status(&self) -> HyperConnectionStatus {
1966        self.client.status_rx.borrow().clone()
1967    }
1968}
1969
1970#[cfg(test)]
1971mod tests {
1972    use std::collections::HashMap;
1973    use std::str::FromStr;
1974    use std::sync::Arc;
1975
1976    use chrono::{DateTime, Utc};
1977    use dashmap::DashMap;
1978
1979    use cosmic_space::command::direct::create::PointFactoryU64;
1980    use cosmic_space::hyper::{InterchangeKind, Knock};
1981    use cosmic_space::point::Point;
1982    use cosmic_space::loc::Uuid;
1983    use cosmic_space::log::RootLogger;
1984    use cosmic_space::substance::Substance;
1985    use cosmic_space::wave::HyperWave;
1986
1987    use crate::{
1988        AnonHyperAuthenticator, HyperGate, HyperGateSelector, HyperRouter, HyperwayInterchange,
1989        InterchangeGate,
1990    };
1991
1992    #[no_mangle]
1993    pub(crate) extern "C" fn cosmic_uuid() -> String {
1994        uuid::Uuid::new_v4().to_string()
1995    }
1996
1997    #[no_mangle]
1998    pub(crate) extern "C" fn cosmic_timestamp() -> DateTime<Utc> {
1999        Utc::now()
2000    }
2001
2002    pub struct DummyRouter {}
2003
2004    #[async_trait]
2005    impl HyperRouter for DummyRouter {
2006        async fn route(&self, wave: HyperWave) {
2007            println!("received hyperwave!");
2008        }
2009    }
2010
2011    /*
2012    #[tokio::mem]
2013    async fn hyper_test() {
2014        let point = Point::from_str("mem").unwrap();
2015        let logger = RootLogger::default().point(point.clone());
2016        let interchange = Arc::new(HyperwayInterchange::new(
2017            logger.push("interchange").unwrap(),
2018        ));
2019
2020        let point_factory =
2021            PointFactoryU64::new(point.push("portals").unwrap(), "portal-".to_string());
2022        let auth = AnonHyperAuthenticator::new(
2023            Arc::new(point_factory),
2024            logger.logger.clone(),
2025        );
2026
2027        let gate = InterchangeGate::new(auth, interchange, logger.push("gate").unwrap());
2028
2029        let mut map = Arc::new(DashMap::new());
2030        map.insert(InterchangeKind::Cli, Box::new(gate));
2031
2032        let entry_router = HyperGateSelector::new(map);
2033
2034        let knock = Knock {
2035            kind: InterchangeKind::Cli,
2036            auth: Box::new(Substance::Empty),
2037            remote: Some(point.push("cli").unwrap()),
2038        };
2039
2040        entry_router.knock(knock).await.unwrap();
2041    }
2042
2043     */
2044}
2045
2046pub mod test_util {
2047    use std::collections::{HashMap, HashSet};
2048    use std::str::FromStr;
2049    use std::sync::Arc;
2050    use std::time::Duration;
2051
2052    use dashmap::DashMap;
2053    use lazy_static::lazy_static;
2054    use tokio::sync::{broadcast, mpsc, oneshot};
2055
2056    use cosmic_space::command::direct::create::PointFactoryU64;
2057    use cosmic_space::err::SpaceErr;
2058    use cosmic_space::hyper::{Greet, InterchangeKind, Knock};
2059    use cosmic_space::loc::{Layer, Surface, ToPoint, ToSurface};
2060    use cosmic_space::log::{PointLogger, RootLogger};
2061    use cosmic_space::point::Point;
2062    use cosmic_space::settings::Timeouts;
2063    use cosmic_space::substance::{Substance, Token};
2064    use cosmic_space::wave::core::cmd::CmdMethod;
2065    use cosmic_space::wave::core::ext::ExtMethod;
2066    use cosmic_space::wave::core::{Method, ReflectedCore};
2067    use cosmic_space::wave::exchange::asynch::{
2068        Exchanger, ProtoTransmitter, ProtoTransmitterBuilder, Router, TxRouter,
2069    };
2070    use cosmic_space::wave::exchange::SetStrategy;
2071    use cosmic_space::wave::{
2072        Agent, DirectedKind, DirectedProto, HyperWave, Pong, ReflectedKind, ReflectedProto,
2073        ReflectedWave, UltraWave, Wave,
2074    };
2075
2076    use crate::{
2077        AnonHyperAuthenticator, AnonHyperAuthenticatorAssignEndPoint, Bridge, HyperClient,
2078        HyperConnectionDetails, HyperConnectionErr, HyperGate, HyperGateSelector, HyperGreeter,
2079        Hyperlane, HyperRouter, Hyperway, HyperwayEndpoint, HyperwayEndpointFactory,
2080        HyperwayInterchange, HyperwayStub, InterchangeGate, LocalHyperwayGateJumper,
2081        LocalHyperwayGateUnlocker, MountInterchangeGate, TokenAuthenticatorWithRemoteWhitelist,
2082    };
2083
2084    lazy_static! {
2085        pub static ref LESS: Point = Point::from_str("space:users:less").expect("point");
2086        pub static ref FAE: Point = Point::from_str("space:users:fae").expect("point");
2087    }
2088
2089    #[derive(Clone)]
2090    pub struct SingleInterchangePlatform {
2091        pub interchange: Arc<HyperwayInterchange>,
2092        pub gate: Arc<HyperGateSelector>,
2093    }
2094
2095    impl SingleInterchangePlatform {
2096        pub async fn new() -> Self {
2097            let root_logger = RootLogger::default();
2098            let logger = root_logger.point(Point::from_str("point").unwrap());
2099            let interchange = Arc::new(HyperwayInterchange::new(
2100                logger.push_point("interchange").unwrap(),
2101            ));
2102
2103            interchange
2104                .add(Hyperway::new(
2105                    LESS.clone().to_surface(),
2106                    LESS.to_agent(),
2107                    Default::default(),
2108                ))
2109                .await;
2110            interchange
2111                .add(Hyperway::new(
2112                    FAE.clone().to_surface(),
2113                    FAE.to_agent(),
2114                    Default::default(),
2115                ))
2116                .await;
2117            let auth = AnonHyperAuthenticator::new();
2118            let gate = Arc::new(MountInterchangeGate::new(
2119                auth,
2120                TestGreeter::new(),
2121                interchange.clone(),
2122                logger.push_point("gate").unwrap(),
2123            ));
2124            let mut gates: Arc<DashMap<InterchangeKind, Arc<dyn HyperGate>>> =
2125                Arc::new(DashMap::new());
2126            gates.insert(InterchangeKind::Singleton, gate);
2127            let gate = Arc::new(HyperGateSelector::new(gates));
2128
2129            Self { interchange, gate }
2130        }
2131
2132        pub fn knock(&self, surface: Surface) -> Knock {
2133            Knock::new(InterchangeKind::Singleton, surface, Substance::Empty)
2134        }
2135
2136        pub fn local_hyperway_endpoint_factory(
2137            &self,
2138            port: Surface,
2139        ) -> Box<dyn HyperwayEndpointFactory> {
2140            Box::new(LocalHyperwayGateUnlocker::new(port, self.gate.clone()))
2141        }
2142    }
2143    pub struct LargeFrameTest {
2144        fae_factory: Box<dyn HyperwayEndpointFactory>,
2145        less_factory: Box<dyn HyperwayEndpointFactory>,
2146    }
2147
2148    impl LargeFrameTest {
2149        pub fn new(
2150            fae_factory: Box<dyn HyperwayEndpointFactory>,
2151            less_factory: Box<dyn HyperwayEndpointFactory>,
2152        ) -> Self {
2153            Self {
2154                fae_factory,
2155                less_factory,
2156            }
2157        }
2158
2159        pub async fn go(self) -> Result<(), SpaceErr> {
2160            let less_exchanger = Exchanger::new(
2161                LESS.push("exchanger").unwrap().to_surface(),
2162                Timeouts::default(),
2163                PointLogger::default(),
2164            );
2165            let fae_exchanger = Exchanger::new(
2166                FAE.push("exchanger").unwrap().to_surface(),
2167                Timeouts::default(),
2168                PointLogger::default(),
2169            );
2170
2171            let root_logger = RootLogger::default();
2172            let logger = root_logger.point(Point::from_str("less-client").unwrap());
2173            let less_client = HyperClient::new_with_exchanger(
2174                self.less_factory,
2175                Some(less_exchanger.clone()),
2176                logger,
2177            )
2178            .unwrap();
2179            let logger = root_logger.point(Point::from_str("fae-client").unwrap());
2180            let fae_client = HyperClient::new_with_exchanger(
2181                self.fae_factory,
2182                Some(fae_exchanger.clone()),
2183                logger,
2184            )
2185            .unwrap();
2186
2187            let mut less_rx = less_client.rx();
2188            let mut fae_rx = fae_client.rx();
2189
2190            let less_router = less_client.router();
2191            let less_transmitter =
2192                ProtoTransmitter::new(Arc::new(less_router), less_exchanger.clone());
2193
2194            let fae_router = fae_client.router();
2195            let fae_transmitter =
2196                ProtoTransmitter::new(Arc::new(fae_router), fae_exchanger.clone());
2197
2198            {
2199                let fae = FAE.clone();
2200                tokio::spawn(async move {
2201                    fae_client.wait_for_greet().await.unwrap();
2202                    let wave = fae_rx.recv().await.unwrap();
2203                    let mut reflected = ReflectedProto::new();
2204                    reflected.kind(ReflectedKind::Pong);
2205                    reflected.status(200u16);
2206                    reflected.to(wave.from().clone());
2207                    reflected.from(fae.to_surface());
2208                    reflected.intended(wave.to());
2209                    reflected.reflection_of(wave.id());
2210                    let wave = reflected.build().unwrap();
2211                    let wave = wave.to_ultra();
2212                    fae_transmitter.route(wave).await;
2213                });
2214            }
2215
2216            let (rtn, mut rtn_rx) = oneshot::channel();
2217            tokio::spawn(async move {
2218                less_client.wait_for_greet().await.unwrap();
2219                let mut hello = DirectedProto::ping();
2220                hello.to(FAE.clone().to_surface());
2221                hello.from(LESS.clone().to_surface());
2222                hello.method(ExtMethod::new("Hello").unwrap());
2223                let size = 3_000_000usize;
2224                let mut body = Vec::with_capacity(size);
2225                for _ in 0..size {
2226                    body.push(0u8);
2227                }
2228                hello.body(Substance::Bin(Arc::new(body)));
2229                let pong: Wave<Pong> = less_transmitter.direct(hello).await.unwrap();
2230                rtn.send(pong.core.status.as_u16() == 200u16);
2231            });
2232
2233            let result = tokio::time::timeout(Duration::from_secs(30), rtn_rx)
2234                .await
2235                .unwrap()
2236                .unwrap();
2237            assert!(result);
2238            Ok(())
2239        }
2240    }
2241
2242    pub struct WaveTest {
2243        fae_factory: Box<dyn HyperwayEndpointFactory>,
2244        less_factory: Box<dyn HyperwayEndpointFactory>,
2245    }
2246
2247    impl WaveTest {
2248        pub fn new(
2249            fae_factory: Box<dyn HyperwayEndpointFactory>,
2250            less_factory: Box<dyn HyperwayEndpointFactory>,
2251        ) -> Self {
2252            Self {
2253                fae_factory,
2254                less_factory,
2255            }
2256        }
2257
2258        pub async fn go(self) -> Result<(), SpaceErr> {
2259            let less_exchanger = Exchanger::new(
2260                LESS.push("exchanger").unwrap().to_surface(),
2261                Timeouts::default(),
2262                PointLogger::default(),
2263            );
2264            let fae_exchanger = Exchanger::new(
2265                FAE.push("exchanger").unwrap().to_surface(),
2266                Timeouts::default(),
2267                PointLogger::default(),
2268            );
2269
2270            let root_logger = RootLogger::default();
2271            let logger = root_logger.point(Point::from_str("less-client").unwrap());
2272            let less_client = HyperClient::new_with_exchanger(
2273                self.less_factory,
2274                Some(less_exchanger.clone()),
2275                logger,
2276            )
2277            .unwrap();
2278            let logger = root_logger.point(Point::from_str("fae-client").unwrap());
2279            let fae_client = HyperClient::new_with_exchanger(
2280                self.fae_factory,
2281                Some(fae_exchanger.clone()),
2282                logger,
2283            )
2284            .unwrap();
2285
2286            let mut less_rx = less_client.rx();
2287            let mut fae_rx = fae_client.rx();
2288
2289            let less_router = less_client.router();
2290            let less_transmitter =
2291                ProtoTransmitter::new(Arc::new(less_router), less_exchanger.clone());
2292
2293            let fae_router = fae_client.router();
2294            let fae_transmitter =
2295                ProtoTransmitter::new(Arc::new(fae_router), fae_exchanger.clone());
2296
2297            {
2298                let fae = FAE.clone();
2299                tokio::spawn(async move {
2300                    fae_client.wait_for_greet().await.unwrap();
2301                    let wave = fae_rx.recv().await.unwrap();
2302                    let mut reflected = ReflectedProto::new();
2303                    reflected.kind(ReflectedKind::Pong);
2304                    reflected.status(200u16);
2305                    reflected.to(wave.from().clone());
2306                    reflected.from(fae.to_surface());
2307                    reflected.intended(wave.to());
2308                    reflected.reflection_of(wave.id());
2309                    let wave = reflected.build().unwrap();
2310                    let wave = wave.to_ultra();
2311                    fae_transmitter.route(wave).await;
2312                });
2313            }
2314
2315            let (rtn, mut rtn_rx) = oneshot::channel();
2316            tokio::spawn(async move {
2317                less_client.wait_for_greet().await.unwrap();
2318                let mut hello = DirectedProto::ping();
2319                hello.to(FAE.clone().to_surface());
2320                hello.from(LESS.clone().to_surface());
2321                hello.method(ExtMethod::new("Hello").unwrap());
2322                hello.body(Substance::Empty);
2323                let pong: Wave<Pong> = less_transmitter.direct(hello).await.unwrap();
2324                rtn.send(pong.core.status.as_u16() == 200u16);
2325            });
2326
2327            let result = tokio::time::timeout(Duration::from_secs(30), rtn_rx)
2328                .await
2329                .unwrap()
2330                .unwrap();
2331            assert!(result);
2332            Ok(())
2333        }
2334    }
2335
2336    #[derive(Clone)]
2337    pub struct TestGreeter;
2338
2339    impl TestGreeter {
2340        pub fn new() -> Self {
2341            Self
2342        }
2343    }
2344
2345    #[async_trait]
2346    impl HyperGreeter for TestGreeter {
2347        async fn greet(&self, stub: HyperwayStub) -> Result<Greet, SpaceErr> {
2348            Ok(Greet {
2349                surface: stub.remote.clone(),
2350                agent: stub.agent.clone(),
2351                hop: Point::remote_endpoint()
2352                    .to_surface()
2353                    .with_layer(Layer::Core),
2354                transport: stub.remote.clone(),
2355            })
2356        }
2357    }
2358}
2359
2360#[cfg(test)]
2361pub mod test {
2362    use std::collections::{HashMap, HashSet};
2363    use std::str::FromStr;
2364    use std::sync::Arc;
2365    use std::time::Duration;
2366
2367    use dashmap::DashMap;
2368    use lazy_static::lazy_static;
2369    use tokio::sync::{broadcast, mpsc, oneshot};
2370
2371    use cosmic_space::command::direct::create::PointFactoryU64;
2372    use cosmic_space::err::SpaceErr;
2373    use cosmic_space::hyper::{Greet, InterchangeKind, Knock};
2374    use cosmic_space::loc::{Layer, Surface, ToPoint, ToSurface};
2375    use cosmic_space::log::RootLogger;
2376    use cosmic_space::point::Point;
2377    use cosmic_space::settings::Timeouts;
2378    use cosmic_space::substance::{Substance, Token};
2379    use cosmic_space::wave::core::cmd::CmdMethod;
2380    use cosmic_space::wave::core::ext::ExtMethod;
2381    use cosmic_space::wave::core::{Method, ReflectedCore};
2382    use cosmic_space::wave::exchange::asynch::{
2383        Exchanger, ProtoTransmitter, ProtoTransmitterBuilder, Router, TxRouter,
2384    };
2385    use cosmic_space::wave::exchange::SetStrategy;
2386    use cosmic_space::wave::{
2387        Agent, DirectedKind, DirectedProto, HyperWave, Pong, ReflectedKind, ReflectedProto,
2388        ReflectedWave, UltraWave, Wave,
2389    };
2390
2391    use crate::test_util::{FAE, LESS, SingleInterchangePlatform, TestGreeter, WaveTest};
2392    use crate::{
2393        AnonHyperAuthenticator, AnonHyperAuthenticatorAssignEndPoint, Bridge, HyperClient,
2394        HyperConnectionDetails, HyperConnectionErr, HyperGate, HyperGateSelector, HyperGreeter,
2395        Hyperlane, HyperRouter, Hyperway, HyperwayEndpoint, HyperwayEndpointFactory,
2396        HyperwayInterchange, HyperwayStub, InterchangeGate, LocalHyperwayGateJumper,
2397        LocalHyperwayGateUnlocker, MountInterchangeGate, TokenAuthenticatorWithRemoteWhitelist,
2398    };
2399
2400    pub struct TestRouter {}
2401
2402    #[async_trait]
2403    impl HyperRouter for TestRouter {
2404        async fn route(&self, wave: HyperWave) {
2405            println!("Test Router routing!");
2406            //    todo!()
2407        }
2408    }
2409
2410    fn hello_wave() -> UltraWave {
2411        let mut hello = DirectedProto::ping();
2412        hello.to(FAE.clone().to_surface());
2413        hello.from(LESS.clone().to_surface());
2414        hello.method(ExtMethod::new("Hello").unwrap());
2415        hello.body(Substance::Empty);
2416        let directed = hello.build().unwrap();
2417        let wave = directed.to_ultra();
2418        wave
2419    }
2420
2421    #[tokio::test]
2422    pub async fn test_hyperlane() {
2423        let hyperlane = Hyperlane::new("mem");
2424        let mut rx = hyperlane.rx(None).await;
2425        let wave = hello_wave();
2426        let wave_id = wave.id().clone();
2427        hyperlane.send(wave).await.unwrap();
2428        let wave = tokio::time::timeout(Duration::from_secs(5u64), rx.recv())
2429            .await
2430            .unwrap()
2431            .unwrap();
2432        assert_eq!(wave.id(), wave_id);
2433    }
2434
2435    #[tokio::test]
2436    pub async fn test_hyperway() {
2437        let hyperway = Hyperway::new(
2438            LESS.clone().to_surface(),
2439            LESS.to_agent(),
2440            Default::default(),
2441        );
2442        let wave = hello_wave();
2443        let wave_id = wave.id().clone();
2444        hyperway.outbound.send(wave).await;
2445        let wave = tokio::time::timeout(
2446            Duration::from_secs(5u64),
2447            hyperway.outbound.rx(None).await.recv(),
2448        )
2449        .await
2450        .unwrap()
2451        .unwrap();
2452
2453        let wave = hello_wave();
2454        let wave_id = wave.id().clone();
2455        hyperway.inbound.send(wave).await;
2456        let wave = tokio::time::timeout(
2457            Duration::from_secs(5u64),
2458            hyperway.inbound.rx(None).await.recv(),
2459        )
2460        .await
2461        .unwrap()
2462        .unwrap();
2463        assert_eq!(wave.id(), wave_id);
2464    }
2465
2466    /*
2467    #[tokio::mem]
2468    pub async fn test_hyperway_ext() {
2469        let hyperway = Hyperway::new(LESS.clone().to_port(), LESS.to_agent());
2470
2471        let mut ext = hyperway.mount().await;
2472        let wave = hello_wave();
2473        let wave_id = wave.id().clone();
2474        ext.tx.send(wave).await;
2475        let wave = tokio::time::timeout(
2476            Duration::from_secs(5u64),
2477            hyperway.inbound.rx().await.recv(),
2478        )
2479        .await
2480        .unwrap()
2481        .unwrap();
2482        assert_eq!(wave.id(), wave_id);
2483
2484        let wave = hello_wave();
2485        let wave_id = wave.id().clone();
2486        hyperway.outbound.send(wave).await;
2487        let wave = tokio::time::timeout(Duration::from_secs(5u64), ext.rx.recv())
2488            .await
2489            .unwrap()
2490            .unwrap();
2491        assert_eq!(wave.id(), wave_id);
2492    }
2493
2494     */
2495
2496    // disabled for now
2497    //#[tokio::test]
2498    pub async fn test_hyperclient() {
2499        pub struct TestFactory {
2500            pub hyperway: Hyperway,
2501        }
2502
2503        impl TestFactory {
2504            pub fn new() -> Self {
2505                let hyperway = Hyperway::new(
2506                    LESS.clone().to_surface(),
2507                    LESS.to_agent(),
2508                    Default::default(),
2509                );
2510                Self { hyperway }
2511            }
2512
2513            pub fn inbound_tx(&self) -> mpsc::Sender<UltraWave> {
2514                self.hyperway.inbound.tx()
2515            }
2516
2517            pub async fn inbound_rx(&self) -> mpsc::Receiver<UltraWave> {
2518                self.hyperway.inbound.rx(None).await
2519            }
2520
2521            pub async fn outbound_rx(&self) -> broadcast::Receiver<UltraWave> {
2522                self.hyperway.outbound.eavesdrop()
2523            }
2524
2525            pub fn outbound_tx(&self) -> mpsc::Sender<UltraWave> {
2526                self.hyperway.outbound.tx()
2527            }
2528        }
2529
2530        #[async_trait]
2531        impl HyperwayEndpointFactory for TestFactory {
2532            async fn create(
2533                &self,
2534                status_tx: mpsc::Sender<HyperConnectionDetails>,
2535            ) -> Result<HyperwayEndpoint, SpaceErr> {
2536                Ok(self.hyperway.hyperway_endpoint_far(None).await)
2537            }
2538        }
2539
2540        {
2541            let factory = Box::new(TestFactory::new());
2542            let mut inbound_rx = factory.inbound_rx().await;
2543            let root_logger = RootLogger::default();
2544            let logger = root_logger.point(Point::from_str("client").unwrap());
2545            let client = HyperClient::new(factory, logger).unwrap();
2546
2547            let client_listener_rx = client.rx();
2548
2549            client.reset();
2550
2551            let router = client.router();
2552            let wave = hello_wave();
2553            let wave_id = wave.id().clone();
2554            router.route(wave).await;
2555            let wave = tokio::time::timeout(Duration::from_secs(5u64), inbound_rx.recv())
2556                .await
2557                .unwrap()
2558                .unwrap();
2559            assert_eq!(wave.id(), wave_id);
2560        }
2561
2562        {
2563            let factory = Box::new(TestFactory::new());
2564            let outbound_tx = factory.outbound_tx();
2565            let root_logger = RootLogger::default();
2566            let logger = root_logger.point(Point::from_str("client").unwrap());
2567            let client = HyperClient::new(factory, logger).unwrap();
2568
2569            let mut client_listener_rx = client.rx();
2570
2571            let wave = hello_wave();
2572            let wave_id = wave.id().clone();
2573            outbound_tx.send(wave).await.unwrap();
2574            let wave = tokio::time::timeout(Duration::from_secs(5u64), client_listener_rx.recv())
2575                .await
2576                .unwrap()
2577                .unwrap();
2578            assert_eq!(wave.id(), wave_id);
2579        }
2580    }
2581
2582    #[tokio::test]
2583    pub async fn test_single_interchange() {
2584        let test = SingleInterchangePlatform::new().await;
2585        let less_factory = test.local_hyperway_endpoint_factory(LESS.to_surface());
2586        let fae_factory = test.local_hyperway_endpoint_factory(FAE.to_surface());
2587        let test = WaveTest::new(fae_factory, less_factory);
2588        test.go().await.unwrap();
2589    }
2590
2591    #[tokio::test]
2592    pub async fn test_dual_interchange() {
2593        let root_logger = RootLogger::default();
2594        let logger = root_logger.point(Point::from_str("point").unwrap());
2595        let interchange = Arc::new(HyperwayInterchange::new(
2596            logger.push_point("interchange").unwrap(),
2597        ));
2598
2599        interchange
2600            .add(Hyperway::new(
2601                LESS.clone().to_surface(),
2602                LESS.to_agent(),
2603                Default::default(),
2604            ))
2605            .await;
2606        interchange
2607            .add(Hyperway::new(
2608                FAE.clone().to_surface(),
2609                FAE.to_agent(),
2610                Default::default(),
2611            ))
2612            .await;
2613
2614        let auth = AnonHyperAuthenticator::new();
2615        let gate = Arc::new(MountInterchangeGate::new(
2616            auth,
2617            TestGreeter::new(),
2618            interchange.clone(),
2619            logger.push_point("gate").unwrap(),
2620        ));
2621        let mut gates: Arc<DashMap<InterchangeKind, Arc<dyn HyperGate>>> = Arc::new(DashMap::new());
2622        gates.insert(InterchangeKind::Singleton, gate);
2623        let gate = Arc::new(HyperGateSelector::new(gates));
2624
2625        let less_factory = Box::new(LocalHyperwayGateUnlocker::new(
2626            LESS.clone().to_surface(),
2627            gate.clone(),
2628        ));
2629
2630        let fae_factory = Box::new(LocalHyperwayGateUnlocker::new(
2631            FAE.clone().to_surface(),
2632            gate.clone(),
2633        ));
2634
2635        let less_exchanger = Exchanger::new(
2636            LESS.push("exchanger").unwrap().to_surface(),
2637            Timeouts::default(),
2638            Default::default(),
2639        );
2640        let fae_exchanger = Exchanger::new(
2641            FAE.push("exchanger").unwrap().to_surface(),
2642            Timeouts::default(),
2643            Default::default(),
2644        );
2645
2646        let root_logger = RootLogger::default();
2647        let logger = root_logger.point(Point::from_str("less-client").unwrap());
2648        let less_client =
2649            HyperClient::new_with_exchanger(less_factory, Some(less_exchanger.clone()), logger)
2650                .unwrap();
2651        let logger = root_logger.point(Point::from_str("fae-client").unwrap());
2652        let fae_client =
2653            HyperClient::new_with_exchanger(fae_factory, Some(fae_exchanger.clone()), logger)
2654                .unwrap();
2655
2656        let mut less_rx = less_client.rx();
2657        let mut fae_rx = fae_client.rx();
2658
2659        let less_router = less_client.router();
2660        let less_transmitter = ProtoTransmitter::new(Arc::new(less_router), less_exchanger.clone());
2661
2662        let fae_router = fae_client.router();
2663        let fae_transmitter = ProtoTransmitter::new(Arc::new(fae_router), fae_exchanger.clone());
2664
2665        {
2666            let fae = FAE.clone();
2667            tokio::spawn(async move {
2668                let wave = fae_rx.recv().await.unwrap();
2669                let mut reflected = ReflectedProto::new();
2670                reflected.kind(ReflectedKind::Pong);
2671                reflected.status(200u16);
2672                reflected.to(wave.from().clone());
2673                reflected.from(fae.to_surface());
2674                reflected.intended(wave.to());
2675                reflected.reflection_of(wave.id());
2676                let wave = reflected.build().unwrap();
2677                let wave = wave.to_ultra();
2678                fae_transmitter.route(wave).await;
2679            });
2680        }
2681
2682        let (rtn, mut rtn_rx) = oneshot::channel();
2683        tokio::spawn(async move {
2684            let mut hello = DirectedProto::ping();
2685            hello.to(FAE.clone().to_surface());
2686            hello.from(LESS.clone().to_surface());
2687            hello.method(ExtMethod::new("Hello").unwrap());
2688            hello.body(Substance::Empty);
2689            let pong: Wave<Pong> = less_transmitter.direct(hello).await.unwrap();
2690            rtn.send(pong.core.status.as_u16() == 200u16);
2691        });
2692
2693        let result = tokio::time::timeout(Duration::from_secs(5), rtn_rx)
2694            .await
2695            .unwrap()
2696            .unwrap();
2697        assert!(result);
2698    }
2699
2700    #[tokio::test]
2701    pub async fn test_bridge() {
2702        pub fn create(name: &str) -> (Arc<HyperwayInterchange>, Arc<dyn HyperGate>) {
2703            let root_logger = RootLogger::default();
2704            let logger = root_logger.point(Point::from_str(name).unwrap());
2705            let interchange = Arc::new(HyperwayInterchange::new(
2706                logger.push_point("interchange").unwrap(),
2707            ));
2708
2709            let auth = AnonHyperAuthenticator::new();
2710            let gate = Arc::new(MountInterchangeGate::new(
2711                auth,
2712                TestGreeter::new(),
2713                interchange.clone(),
2714                logger.push_point("gate").unwrap(),
2715            ));
2716            let mut gates: Arc<DashMap<InterchangeKind, Arc<dyn HyperGate>>> =
2717                Arc::new(DashMap::new());
2718            gates.insert(InterchangeKind::Singleton, gate);
2719            (interchange, Arc::new(HyperGateSelector::new(gates)))
2720        }
2721
2722        let (less_interchange, less_gate) = create("less");
2723        let (fae_interchange, fae_gate) = create("fae");
2724
2725        {
2726            let hyperway = Hyperway::new(
2727                FAE.to_surface().with_layer(Layer::Core),
2728                Agent::HyperUser,
2729                Default::default(),
2730            );
2731            less_interchange.add(hyperway).await;
2732            let access = Hyperway::new(
2733                LESS.to_surface().with_layer(Layer::Core),
2734                Agent::HyperUser,
2735                Default::default(),
2736            );
2737            less_interchange.add(access).await;
2738        }
2739        {
2740            let hyperway = Hyperway::new(
2741                LESS.to_surface().with_layer(Layer::Core),
2742                Agent::HyperUser,
2743                Default::default(),
2744            );
2745            fae_interchange.add(hyperway).await;
2746            let access = Hyperway::new(
2747                FAE.to_surface().with_layer(Layer::Core),
2748                Agent::HyperUser,
2749                Default::default(),
2750            );
2751            fae_interchange.add(access).await;
2752        }
2753
2754        let fae_endpoint_from_less = less_interchange
2755            .mount(
2756                HyperwayStub {
2757                    remote: FAE.to_surface().with_layer(Layer::Core),
2758                    agent: Agent::HyperUser,
2759                },
2760                None,
2761            )
2762            .await
2763            .unwrap();
2764        let fae_factory = Box::new(LocalHyperwayGateUnlocker::new(
2765            LESS.clone().to_surface(),
2766            fae_gate.clone(),
2767        ));
2768        let logger = RootLogger::default().point(Point::from_str("bridge").unwrap());
2769        let bridge = Bridge::new(fae_endpoint_from_less, fae_factory, logger);
2770
2771        let mut less_access = less_interchange
2772            .mount(
2773                HyperwayStub {
2774                    remote: LESS.to_surface().with_layer(Layer::Core),
2775                    agent: Agent::HyperUser,
2776                },
2777                None,
2778            )
2779            .await
2780            .unwrap();
2781        let mut fae_access = fae_interchange
2782            .mount(
2783                HyperwayStub {
2784                    remote: FAE.to_surface().with_layer(Layer::Core),
2785                    agent: Agent::HyperUser,
2786                },
2787                None,
2788            )
2789            .await
2790            .unwrap();
2791
2792        tokio::spawn(async move {
2793            while let Some(wave) = fae_access.rx.recv().await {
2794                if wave.is_directed() {
2795                    let directed = wave.to_directed().unwrap();
2796                    let reflection = directed.reflection().unwrap();
2797                    let reflection = reflection.make(
2798                        ReflectedCore::ok(),
2799                        FAE.to_surface().with_layer(Layer::Core),
2800                    );
2801                    fae_access.tx.send(reflection.to_ultra()).await.unwrap();
2802                }
2803            }
2804        });
2805
2806        let exchanger = Exchanger::new(LESS.to_surface(), Timeouts::default(), Default::default());
2807        let less_tx = less_access.tx.clone();
2808
2809        {
2810            let exchanger = exchanger.clone();
2811            tokio::spawn(async move {
2812                while let Some(wave) = less_access.rx.recv().await {
2813                    if wave.is_reflected() {
2814                        exchanger
2815                            .reflected(wave.to_reflected().unwrap())
2816                            .await
2817                            .unwrap();
2818                    }
2819                }
2820            });
2821        }
2822        let mut transmitter =
2823            ProtoTransmitterBuilder::new(Arc::new(TxRouter::new(less_tx.clone())), exchanger);
2824        transmitter.from = SetStrategy::Override(LESS.to_surface());
2825        transmitter.agent = SetStrategy::Override(Agent::HyperUser);
2826        let transmitter = transmitter.build();
2827        let mut wave = DirectedProto::ping();
2828        wave.method(Method::Cmd(CmdMethod::Bounce));
2829        wave.to(FAE.to_surface().with_layer(Layer::Core));
2830        let reply: Wave<Pong> =
2831            tokio::time::timeout(Duration::from_secs(5), transmitter.direct(wave))
2832                .await
2833                .unwrap()
2834                .unwrap();
2835        assert!(reply.core.status.is_success());
2836        assert_eq!(reply.core.body, Substance::Empty);
2837        assert_eq!(reply.to, LESS.to_surface());
2838        assert_eq!(reply.from, FAE.to_surface());
2839        println!("Ok");
2840    }
2841}