1#![allow(warnings)]
2
3#[macro_use]
4extern crate async_trait;
5#[macro_use]
6extern crate lazy_static;
7
8use std::cell::{Cell, RefCell};
9use std::collections::{HashMap, HashSet};
10use std::future::Future;
11use std::marker::PhantomData;
12use std::ops::{Deref, DerefMut};
13use std::str::FromStr;
14use std::sync::atomic::{AtomicU16, Ordering};
15use std::sync::Arc;
16use std::time::Duration;
17
18use dashmap::DashMap;
19use futures::future::select_all;
20use futures::FutureExt;
21use tokio::io::AsyncWriteExt;
22use tokio::select;
23use tokio::sync::mpsc::error::{SendError, SendTimeoutError, TrySendError};
24use tokio::sync::mpsc::Receiver;
25use tokio::sync::oneshot::Sender;
26use tokio::sync::{broadcast, mpsc, Mutex, oneshot, RwLock, watch};
27
28use cosmic_space::command::direct::create::{PointFactoryU64, PointSegTemplate};
29use cosmic_space::err::SpaceErr;
30use cosmic_space::frame::PrimitiveFrame;
31use cosmic_space::hyper::{Greet, HyperSubstance, InterchangeKind, Knock};
32use cosmic_space::loc::{Layer, PointFactory, Surface, ToPoint, ToSurface, Version};
33use cosmic_space::log::{PointLogger, RootLogger, Tracker};
34use cosmic_space::particle::Status;
35use cosmic_space::point::Point;
36use cosmic_space::settings::Timeouts;
37use cosmic_space::substance::{FormErrs, Substance, SubstanceKind, Token};
38use cosmic_space::util::uuid;
39use cosmic_space::wave::core::ext::ExtMethod;
40use cosmic_space::wave::core::hyp::HypMethod;
41use cosmic_space::wave::core::Method;
42use cosmic_space::wave::exchange::asynch::{
43 Exchanger, ProtoTransmitter, ProtoTransmitterBuilder, Router, TxRouter,
44};
45use cosmic_space::wave::exchange::SetStrategy;
46use cosmic_space::wave::{
47 Agent, DirectedKind, DirectedProto, Handling, HyperWave, Ping, Pong, Reflectable,
48 ReflectedKind, ReflectedProto, ReflectedWave, UltraWave, Wave, WaveId, WaveKind,
49};
50use cosmic_space::VERSION;
51
52lazy_static! {
53 pub static ref LOCAL_CLIENT: Point = Point::from_str("LOCAL::client").expect("point");
54 pub static ref LOCAL_CLIENT_RUNNER: Point =
55 Point::from_str("LOCAL::client:runner").expect("point");
56 pub static ref HYPERLANE_INDEX: AtomicU16 = AtomicU16::new(0);
57}
58
59pub enum HyperwayKind {
60 Mount,
61 Ephemeral,
62}
63
64pub struct Hyperway {
65 pub remote: Surface,
66 outbound: Hyperlane,
67 inbound: Hyperlane,
68 logger: PointLogger,
69
70 #[cfg(test)]
71 pub diagnostic: HyperwayDiagnostic,
72}
73
74impl Hyperway {
75 pub fn new(remote: Surface, agent: Agent, logger: PointLogger) -> Self {
76 let logger = logger.point(remote.point.clone());
77 let mut inbound = Hyperlane::new(format!("{}<Inbound>", remote.to_string()));
78 inbound
79 .tx
80 .try_send(HyperlaneCall::Transform(Box::new(FromTransform::new(
81 remote.clone(),
82 ))));
83 inbound
84 .tx
85 .try_send(HyperlaneCall::Transform(Box::new(AgentTransform::new(
86 agent,
87 ))));
88 Self {
89 outbound: Hyperlane::new(format!("{}<Outbound>", remote.to_string())),
90 remote,
91 inbound,
92 logger,
93 #[cfg(test)]
94 diagnostic: HyperwayDiagnostic::new(),
95 }
96 }
97
98 pub fn transform_inbound(&self, transform: Box<dyn HyperTransform>) {
99 self.inbound
100 .tx
101 .try_send(HyperlaneCall::Transform(transform));
102 }
103
104 pub fn transform_to(&self, to: Surface) {
105 self.inbound
106 .tx
107 .try_send(HyperlaneCall::Transform(Box::new(ToTransform::new(to))));
108 }
109
110 pub async fn hyperway_endpoint_near(&self, init_wave: Option<UltraWave>) -> HyperwayEndpoint {
111 let drop_tx = None;
112
113 HyperwayEndpoint {
114 tx: self.outbound.tx(),
115 rx: self.inbound.rx(init_wave).await,
116 drop_tx,
117 logger: self.logger.clone(),
118 }
119 }
120
121 pub async fn hyperway_endpoint_near_with_drop_event(
122 &self,
123 drop_tx: oneshot::Sender<()>,
124 init_wave: Option<UltraWave>,
125 ) -> HyperwayEndpoint {
126 let drop_tx = Some(drop_tx);
127
128 HyperwayEndpoint {
129 tx: self.outbound.tx(),
130 rx: self.inbound.rx(init_wave).await,
131 drop_tx,
132 logger: self.logger.clone(),
133 }
134 }
135
136 pub async fn hyperway_endpoint_far(&self, init_wave: Option<UltraWave>) -> HyperwayEndpoint {
137 HyperwayEndpoint {
138 tx: self.inbound.tx(),
139 rx: self.outbound.rx(init_wave).await,
140 drop_tx: None,
141 logger: self.logger.clone(),
142 }
143 }
144
145 pub async fn hyperway_endpoint_far_drop_event(
146 &self,
147 init_wave: Option<UltraWave>,
148 drop_tx: oneshot::Sender<()>,
149 ) -> HyperwayEndpoint {
150 HyperwayEndpoint {
151 tx: self.inbound.tx(),
152 rx: self.outbound.rx(init_wave).await,
153 drop_tx: Some(drop_tx),
154 logger: self.logger.clone(),
155 }
156 }
157}
158
159#[cfg(test)]
160pub struct HyperwayDiagnostic {
161 pub replaced_ext: broadcast::Sender<Result<(), SpaceErr>>,
162}
163
164#[cfg(test)]
165impl HyperwayDiagnostic {
166 pub fn new() -> Self {
167 let (replaced_ext, _) = broadcast::channel(128);
168 Self { replaced_ext }
169 }
170}
171
172pub struct HyperwayEndpoint {
173 drop_tx: Option<oneshot::Sender<()>>,
174 pub tx: mpsc::Sender<UltraWave>,
175 pub rx: mpsc::Receiver<UltraWave>,
176 pub logger: PointLogger,
177}
178
179impl HyperwayEndpoint {
180 pub fn new(
181 tx: mpsc::Sender<UltraWave>,
182 rx: mpsc::Receiver<UltraWave>,
183 logger: PointLogger,
184 ) -> Self {
185 let drop_tx = None;
186 Self {
187 tx,
188 rx,
189 drop_tx,
190 logger,
191 }
192 }
193
194 pub fn new_with_drop(
195 tx: mpsc::Sender<UltraWave>,
196 rx: mpsc::Receiver<UltraWave>,
197 drop_tx: oneshot::Sender<()>,
198 logger: PointLogger,
199 ) -> Self {
200 let drop_tx = Some(drop_tx);
201 Self {
202 tx,
203 rx,
204 drop_tx,
205 logger,
206 }
207 }
208
209 pub fn connect(mut self, mut endpoint: HyperwayEndpoint) {
210 tokio::spawn(async move {
211 let logger = endpoint.logger.clone();
212 let end_tx = endpoint.tx.clone();
213 {
214 let my_tx = self.tx.clone();
215 tokio::spawn(async move {
216 let logger = endpoint.logger.push_mark("mux:tx").unwrap();
217 while let Some(wave) = endpoint.rx.recv().await {
218 logger.track(&wave, || Tracker::new("hyperway-endpoint", "Rx"));
219 match logger.result(my_tx.send(wave).await) {
220 Ok(_) => {}
221 Err(_) => break,
222 }
223 }
224 });
225 }
226
227 let logger = logger.push_mark("mux:rx").unwrap();
228 while let Some(wave) = self.rx.recv().await {
229 logger.track(&wave, || Tracker::new("hyperway-endpoint", "Tx"));
230 match logger.result(end_tx.send(wave).await) {
231 Ok(_) => {}
232 Err(_) => break,
233 }
234 }
235 });
236 }
237
238 pub fn add_drop_tx(&mut self, drop_tx: oneshot::Sender<()>) {
239 self.drop_tx.replace(drop_tx);
240 }
241
242 pub fn router(&self) -> TxRouter {
243 TxRouter::new(self.tx.clone())
244 }
245}
246
247impl Drop for HyperwayEndpoint {
248 fn drop(&mut self) {
249 match self.drop_tx.take() {
250 None => {}
251 Some(drop_tx) => {
252 drop_tx.send(());
253 }
254 }
255 }
256}
257
258#[derive(Clone)]
259pub struct HyperwayStub {
260 pub agent: Agent,
261 pub remote: Surface,
262}
263
264impl From<Greet> for HyperwayStub {
265 fn from(greet: Greet) -> Self {
266 Self {
267 agent: greet.agent,
268 remote: greet.surface,
269 }
270 }
271}
272
273impl HyperwayStub {
274 pub fn from_port(remote: Surface) -> Self {
275 Self {
276 agent: remote.to_agent(),
277 remote,
278 }
279 }
280
281 pub fn new(remote: Surface, agent: Agent) -> Self {
282 Self { agent, remote }
283 }
284}
285
286pub enum HyperwayInterchangeCall {
287 Wave(UltraWave),
288 Internal(Hyperway),
289 Remove(Surface),
290 Mount {
291 stub: HyperwayStub,
292 init_wave: Option<UltraWave>,
293 rtn: oneshot::Sender<Result<HyperwayEndpoint, SpaceErr>>,
294 },
295}
296
297pub enum HyperlaneCall {
298 Drain,
299 Ext(mpsc::Sender<UltraWave>),
300 ResetExt,
301 Wave(UltraWave),
302 Transform(Box<dyn HyperTransform>),
303}
304
305pub trait HyperTransform: Send + Sync {
306 fn filter(&self, wave: UltraWave) -> UltraWave;
307}
308
309#[derive(Clone)]
310pub struct AgentTransform {
311 agent: Agent,
312}
313
314impl AgentTransform {
315 pub fn new(agent: Agent) -> Self {
316 Self { agent }
317 }
318}
319
320impl HyperTransform for AgentTransform {
321 fn filter(&self, mut wave: UltraWave) -> UltraWave {
322 wave.set_agent(self.agent.clone());
323 wave
324 }
325}
326
327#[derive(Clone)]
328pub struct LayerTransform {
329 layer: Layer,
330}
331
332impl LayerTransform {
333 pub fn new(layer: Layer) -> Self {
334 Self { layer }
335 }
336}
337
338impl HyperTransform for LayerTransform {
339 fn filter(&self, mut wave: UltraWave) -> UltraWave {
340 let to = wave
341 .to()
342 .clone()
343 .to_single()
344 .unwrap()
345 .with_layer(self.layer.clone());
346 wave.set_to(to);
347 wave
348 }
349}
350
351#[derive(Clone)]
352pub struct TransportTransform {
353 transport_to: Surface,
354}
355
356impl TransportTransform {
357 pub fn new(transport_to: Surface) -> Self {
358 Self { transport_to }
359 }
360}
361
362impl HyperTransform for TransportTransform {
363 fn filter(&self, wave: UltraWave) -> UltraWave {
364 let from = wave.from().clone();
365 let transport = wave.wrap_in_transport(from, self.transport_to.clone());
366 let wave = transport.build().unwrap();
367 wave.to_ultra()
368 }
369}
370
371#[derive(Clone)]
372pub struct HopTransform {
373 hop_to: Surface,
374}
375
376impl HopTransform {
377 pub fn new(hop_to: Surface) -> Self {
378 Self { hop_to }
379 }
380}
381
382impl HyperTransform for HopTransform {
383 fn filter(&self, wave: UltraWave) -> UltraWave {
384 let signal = wave.to_signal().unwrap();
385 let from = signal.from.clone();
386 let wave = signal.wrap_in_hop(from, self.hop_to.clone());
387 let wave = wave.build().unwrap();
388 wave.to_ultra()
389 }
390}
391
392pub struct ToTransform {
393 to: Surface,
394}
395
396impl ToTransform {
397 pub fn new(to: Surface) -> Self {
398 Self { to }
399 }
400}
401
402impl HyperTransform for ToTransform {
403 fn filter(&self, mut wave: UltraWave) -> UltraWave {
404 wave.set_to(self.to.clone());
405 wave
406 }
407}
408
409pub struct FromTransform {
410 from: Surface,
411}
412
413impl FromTransform {
414 pub fn new(from: Surface) -> Self {
415 Self { from }
416 }
417}
418
419impl HyperTransform for FromTransform {
420 fn filter(&self, mut wave: UltraWave) -> UltraWave {
421 wave.set_from(self.from.clone());
422 wave
423 }
424}
425#[derive(Clone)]
426pub struct Hyperlane {
427 tx: mpsc::Sender<HyperlaneCall>,
428 #[cfg(test)]
429 eavesdrop_tx: broadcast::Sender<UltraWave>,
430 label: String,
431}
432
433impl Hyperlane {
434 pub fn new<S: ToString>(label: S) -> Self {
435 #[cfg(test)]
436 let (eavesdrop_tx, _) = broadcast::channel(16);
437
438 let label = format!(
439 "{}::{}",
440 label.to_string(),
441 HYPERLANE_INDEX.fetch_add(1, Ordering::Relaxed)
442 );
443
444 let (tx, mut rx) = mpsc::channel(1024);
445 {
446 let label = label.clone();
447 let tx = tx.clone();
448 #[cfg(test)]
449 let eavesdrop_tx = eavesdrop_tx.clone();
450
451 tokio::spawn(async move {
452 let mut ext = None;
453 let mut queue = vec![];
454 let mut transforms = vec![];
455 while let Some(call) = rx.recv().await {
456 match call {
457 HyperlaneCall::Ext(ext_tx) => {
458 ext.replace(ext_tx);
459 }
460 HyperlaneCall::Transform(filter) => {
461 transforms.push(filter);
462 }
463 HyperlaneCall::Wave(mut wave) => {
464 while queue.len() > 1024 {
465 queue.remove(0);
467 }
468 for transform in transforms.iter() {
469 wave = transform.filter(wave);
470 }
471 queue.push(wave);
472 }
473 HyperlaneCall::Drain => {
474 }
476 HyperlaneCall::ResetExt => {
477 ext = None;
478 }
479 }
480 if !queue.is_empty() {
481 if let Some(ext_tx) = ext.as_mut() {
482 for wave in queue.drain(..) {
483 #[cfg(test)]
484 let wave_cp = wave.clone();
485
486 match ext_tx.send(wave).await {
487 Ok(_) => {
488 #[cfg(test)]
489 eavesdrop_tx.send(wave_cp);
490 }
491 Err(err) => {
492 tx.send(HyperlaneCall::ResetExt).await;
493 tx.try_send(HyperlaneCall::Wave(err.0));
494 }
495 }
496 }
497 } else {
498 }
499 }
500 }
501 });
502 }
503
504 Self {
505 tx,
506 label,
507 #[cfg(test)]
508 eavesdrop_tx,
509 }
510 }
511
512 #[cfg(test)]
513 pub fn eavesdrop(&self) -> broadcast::Receiver<UltraWave> {
514 self.eavesdrop_tx.subscribe()
515 }
516
517 pub async fn send(&self, wave: UltraWave) -> Result<(), SpaceErr> {
518 Ok(self
519 .tx
520 .send_timeout(HyperlaneCall::Wave(wave), Duration::from_secs(5))
521 .await?)
522 }
523
524 pub fn tx(&self) -> mpsc::Sender<UltraWave> {
525 let (tx, mut rx) = mpsc::channel(1024);
526 let call_tx = self.tx.clone();
527 tokio::spawn(async move {
528 while let Some(wave) = rx.recv().await {
529 call_tx.send(HyperlaneCall::Wave(wave)).await;
530 }
531 });
532 tx
533 }
534
535 pub async fn rx(&self, init_wave: Option<UltraWave>) -> mpsc::Receiver<UltraWave> {
536 let (tx, rx) = mpsc::channel(1024);
537 if let Some(init_wave) = init_wave {
538 tx.send(init_wave).await;
539 }
540 self.tx.send(HyperlaneCall::Ext(tx)).await;
541 rx
542 }
543}
544
545pub struct HyperwayInterchange {
546 call_tx: mpsc::Sender<HyperwayInterchangeCall>,
547 logger: PointLogger,
548 singular_to: Option<Surface>,
549}
550
551impl HyperwayInterchange {
552 pub fn new(logger: PointLogger) -> Self {
553 let (call_tx, mut call_rx) = mpsc::channel(1024);
554
555 {
556 let call_tx = call_tx.clone();
557 let logger = logger.clone();
558 tokio::spawn(async move {
559 let mut hyperways = HashMap::new();
560 while let Some(call) = call_rx.recv().await {
561 match call {
562 HyperwayInterchangeCall::Internal(hyperway) => {
563 let mut rx = hyperway.inbound.rx(None).await;
564 hyperways.insert(hyperway.remote.clone(), hyperway);
565 let call_tx = call_tx.clone();
566 let logger = logger.clone();
567 tokio::spawn(async move {
568 while let Some(wave) = rx.recv().await {
569 call_tx
570 .send_timeout(
571 HyperwayInterchangeCall::Wave(wave),
572 Duration::from_secs(60u64),
573 )
574 .await;
575 }
576 });
577 }
578 HyperwayInterchangeCall::Remove(point) => {
579 hyperways.remove(&point);
580 }
581 HyperwayInterchangeCall::Wave(wave) => match wave.to().single_or() {
582 Ok(to) => match hyperways.get(&to) {
583 None => {
584 logger.warn(
585 format!("wave is addressed to hyperway that this interchagne does not have from: {} to: {} ",
586 wave.from().to_string(),
587 wave.to().to_string()
588 )
589 );
590 }
591 Some(hyperway) => {
592 hyperway.outbound.send(wave).await;
593 }
594 },
595 Err(_) => {
596 logger.warn("Hyperway Interchange cannot route Ripples, instead wrap in a Hop or Transport");
597 }
598 },
599 HyperwayInterchangeCall::Mount {
600 stub,
601 init_wave,
602 rtn,
603 } => match hyperways.get(&stub.remote) {
604 None => {
605 logger.error(format!(
606 "mount hyperway {} not found",
607 stub.remote.to_string()
608 ));
609 rtn.send(Err(format!(
610 "hyperway {} not found",
611 stub.remote.to_string()
612 )
613 .into()));
614 }
615 Some(hyperway) => {
616 let endpoint = hyperway.hyperway_endpoint_far(init_wave).await;
617 rtn.send(Ok(endpoint));
618 }
619 },
620 }
621 }
622 });
623 }
624
625 Self {
626 call_tx,
627 logger,
628 singular_to: None,
629 }
630 }
631
632 pub fn router(&self) -> Box<dyn Router> {
633 Box::new(OutboundRouter::new(
634 self.call_tx.clone(),
635 self.logger.clone(),
636 ))
637 }
638
639 pub fn point(&self) -> &Point {
640 &self.logger.point
641 }
642
643 pub async fn mount(
644 &self,
645 stub: HyperwayStub,
646 init_wave: Option<UltraWave>,
647 ) -> Result<HyperwayEndpoint, SpaceErr> {
648 let call_tx = self.call_tx.clone();
649 let (tx, rx) = oneshot::channel();
650 call_tx
651 .send(HyperwayInterchangeCall::Mount {
652 stub: stub.clone(),
653 init_wave,
654 rtn: tx,
655 })
656 .await;
657 rx.await?
658 }
659
660 pub fn singular_to(&mut self, to: Surface) {
661 self.singular_to.replace(to);
662 }
663
664 pub async fn add(&self, mut hyperway: Hyperway) {
665 if let Some(to) = self.singular_to.as_ref() {
666 hyperway.transform_to(to.clone());
667 }
668
669 self.call_tx
670 .send(HyperwayInterchangeCall::Internal(hyperway))
671 .await;
672 }
673
674 pub fn remove(&self, hyperway: Surface) {
675 let call_tx = self.call_tx.clone();
676 tokio::spawn(async move {
677 call_tx
678 .send(HyperwayInterchangeCall::Remove(hyperway))
679 .await;
680 });
681 }
682
683 pub async fn route(&self, wave: UltraWave) {
684 self.call_tx.send(HyperwayInterchangeCall::Wave(wave)).await;
685 }
686}
687
688#[async_trait]
689pub trait HyperRouter: Send + Sync {
690 async fn route(&self, wave: HyperWave);
691}
692
693pub struct OutboundRouter {
694 pub logger: PointLogger,
695 pub call_tx: mpsc::Sender<HyperwayInterchangeCall>,
696}
697
698impl OutboundRouter {
699 pub fn new(call_tx: mpsc::Sender<HyperwayInterchangeCall>, logger: PointLogger) -> Self {
700 Self { call_tx, logger }
701 }
702}
703
704#[async_trait]
705impl Router for OutboundRouter {
706 async fn route(&self, wave: UltraWave) {
707 self.logger
708 .track(&wave, || Tracker::new(format!("outbound:router"), "Route"));
709
710 self.call_tx.send(HyperwayInterchangeCall::Wave(wave)).await;
711 }
712}
713
714#[async_trait]
715pub trait HyperGreeter: Send + Sync + Clone + Sized {
716 async fn greet(&self, stub: HyperwayStub) -> Result<Greet, SpaceErr>;
717}
718
719#[derive(Clone)]
720pub struct SimpleGreeter {
721 hop: Surface,
722 transport: Surface,
723}
724
725impl SimpleGreeter {
726 pub fn new(hop: Surface, transport: Surface) -> Self {
727 Self { hop, transport }
728 }
729}
730
731#[async_trait]
732impl HyperGreeter for SimpleGreeter {
733 async fn greet(&self, stub: HyperwayStub) -> Result<Greet, SpaceErr> {
734 Ok(Greet {
735 surface: stub.remote,
736 agent: stub.agent,
737 hop: self.hop.clone(),
738 transport: self.transport.clone(),
739 })
740 }
741}
742
743#[async_trait]
744pub trait HyperAuthenticator: Send + Sync + Clone + Sized {
745 async fn auth(&self, knock: Knock) -> Result<HyperwayStub, SpaceErr>;
746}
747
748#[derive(Clone)]
749pub struct TokenAuthenticator {
750 pub token: Token,
751 pub agent: Agent,
752}
753
754impl TokenAuthenticator {
755 pub fn new(agent: Agent, token: Token) -> Self {
756 Self { agent, token }
757 }
758}
759
760#[async_trait]
761impl HyperAuthenticator for TokenAuthenticator {
762 async fn auth(&self, knock: Knock) -> Result<HyperwayStub, SpaceErr> {
763 if let Substance::Token(token) = &*knock.auth {
764 if *token == self.token {
765 Ok(HyperwayStub {
766 agent: self.agent.clone(),
767 remote: knock
768 .remote
769 .ok_or::<SpaceErr>("expected a remote entry selection".into())?,
770 })
771 } else {
772 Err(SpaceErr::new(500, "invalid token"))
773 }
774 } else {
775 Err(SpaceErr::new(500, "expected Subtance: Token"))
776 }
777 }
778}
779
780#[derive(Clone)]
781pub struct AnonHyperAuthenticator;
782
783impl AnonHyperAuthenticator {
784 pub fn new() -> Self {
785 Self {}
786 }
787}
788
789#[derive(Clone)]
790pub struct TokenAuthenticatorWithRemoteWhitelist {
791 pub token: Token,
792 pub agent: Agent,
793 pub whitelist: HashSet<Point>,
794}
795
796impl TokenAuthenticatorWithRemoteWhitelist {
797 pub fn new(agent: Agent, token: Token, whitelist: HashSet<Point>) -> Self {
798 Self {
799 agent,
800 token,
801 whitelist,
802 }
803 }
804}
805
806#[async_trait]
807impl HyperAuthenticator for TokenAuthenticatorWithRemoteWhitelist {
808 async fn auth(&self, knock: Knock) -> Result<HyperwayStub, SpaceErr> {
809 if let Substance::Token(token) = &*knock.auth {
810 if *token == self.token {
811 let remote = knock
812 .remote
813 .ok_or(SpaceErr::new(500, "expected a remote entry selection"))?;
814 if self.whitelist.contains(&remote) {
815 Ok(HyperwayStub {
816 agent: self.agent.clone(),
817 remote,
818 })
819 } else {
820 Err(SpaceErr::new(500, "remote is not part of the whitelist"))
821 }
822 } else {
823 Err(SpaceErr::new(500, "invalid token"))
824 }
825 } else {
826 Err(SpaceErr::new(500, "expecting Substance: Token"))
827 }
828 }
829}
830
831#[async_trait]
832impl HyperAuthenticator for AnonHyperAuthenticator {
833 async fn auth(&self, req: Knock) -> Result<HyperwayStub, SpaceErr> {
834 let remote = req
835 .remote
836 .ok_or(SpaceErr::new(500, "required remote point request"))?;
837
838 Ok(HyperwayStub {
839 agent: Agent::Anonymous,
840 remote,
841 })
842 }
843}
844
845#[derive(Clone)]
846pub struct AnonHyperAuthenticatorAssignEndPoint {
847 pub logger: PointLogger,
848 pub remote_point_factory: Arc<dyn PointFactory>,
849}
850
851impl AnonHyperAuthenticatorAssignEndPoint {
852 pub fn new(remote_point_factory: Arc<dyn PointFactory>, logger: PointLogger) -> Self {
853 Self {
854 remote_point_factory,
855 logger,
856 }
857 }
858}
859
860#[async_trait]
861impl HyperAuthenticator for AnonHyperAuthenticatorAssignEndPoint {
862 async fn auth(&self, knock: Knock) -> Result<HyperwayStub, SpaceErr> {
863 let remote = self
864 .logger
865 .result(self.remote_point_factory.create().await)?
866 .to_surface();
867 Ok(HyperwayStub {
868 agent: Agent::Anonymous,
869 remote,
870 })
871 }
872}
873
874#[derive(Clone)]
875pub struct TokensFromHeavenHyperAuthenticatorAssignEndPoint {
876 pub logger: RootLogger,
877 pub tokens: Arc<DashMap<Token, HyperwayStub>>,
878}
879
880impl TokensFromHeavenHyperAuthenticatorAssignEndPoint {
881 pub fn new(tokens: Arc<DashMap<Token, HyperwayStub>>, logger: RootLogger) -> Self {
882 Self { logger, tokens }
883 }
884}
885
886#[async_trait]
887impl HyperAuthenticator for TokensFromHeavenHyperAuthenticatorAssignEndPoint {
888 async fn auth(&self, auth_req: Knock) -> Result<HyperwayStub, SpaceErr> {
889 match &*auth_req.auth {
890 Substance::Token(token) => {
891 if let Some((_, stub)) = self.tokens.remove(token) {
892 return Ok(stub);
893 } else {
894 return Err(SpaceErr::new(500, "invalid token"));
895 }
896 }
897 _ => {
898 return Err(SpaceErr::new(500, "expected Substance: Token"));
899 }
900 }
901 }
902}
903
904pub struct TokenDispensingHyperwayInterchange {
905 pub agent: Agent,
906 pub logger: PointLogger,
907 pub tokens: Arc<DashMap<Token, HyperwayStub>>,
908 pub lane_point_factory: Box<dyn PointFactory>,
909 pub remote_point_factory: Box<dyn PointFactory>,
910 pub interchange: HyperwayInterchange,
911}
912
913impl TokenDispensingHyperwayInterchange {
914 pub fn new(
915 agent: Agent,
916 router: Box<dyn HyperRouter>,
917 lane_point_factory: Box<dyn PointFactory>,
918 end_point_factory: Box<dyn PointFactory>,
919 logger: PointLogger,
920 ) -> Self {
921 let tokens = Arc::new(DashMap::new());
922 let authenticator = Box::new(TokensFromHeavenHyperAuthenticatorAssignEndPoint::new(
923 tokens.clone(),
924 logger.logger.clone(),
925 ));
926 let interchange = HyperwayInterchange::new(logger.clone());
927 Self {
928 agent,
929 tokens,
930 logger,
931 lane_point_factory,
932 remote_point_factory: end_point_factory,
933 interchange,
934 }
935 }
936
937 pub async fn dispense(&self) -> Result<(Token, HyperwayStub), SpaceErr> {
938 let token = Token::new_uuid();
939 let remote_point = self.remote_point_factory.create().await?.to_surface();
940 let lane_point = self.lane_point_factory.create().await?;
941 let logger = self.logger.point(lane_point);
942 let stub = HyperwayStub {
943 agent: self.agent.clone(),
944 remote: remote_point,
945 };
946 self.tokens.insert(token.clone(), stub.clone());
947 Ok((token, stub))
948 }
949}
950
951impl Deref for TokenDispensingHyperwayInterchange {
952 type Target = HyperwayInterchange;
953
954 fn deref(&self) -> &Self::Target {
955 &self.interchange
956 }
957}
958
959impl DerefMut for TokenDispensingHyperwayInterchange {
960 fn deref_mut(&mut self) -> &mut Self::Target {
961 &mut self.interchange
962 }
963}
964
965pub struct VersionGate {
966 selector: HyperGateSelector,
967}
968
969impl VersionGate {
970 pub fn new(selector: HyperGateSelector) -> Self {
971 Self { selector }
972 }
973 pub async fn unlock(&self, version: semver::Version) -> Result<HyperGateSelector, String> {
974 if version == *VERSION {
975 Ok(self.selector.clone())
976 } else {
977 Err("version mismatch".to_string())
978 }
979 }
980}
981
982#[async_trait]
983pub trait HyperGate: Send + Sync {
984 async fn knock(&self, knock: Knock) -> Result<HyperwayEndpoint, SpaceErr>;
985
986 async fn jump(
987 &self,
988 kind: InterchangeKind,
989 stub: HyperwayStub,
990 ) -> Result<HyperwayEndpoint, SpaceErr>;
991}
992
993pub struct HopRouter {
994 greet: Greet,
995 tx: mpsc::Sender<UltraWave>,
996}
997
998impl HopRouter {
999 fn to_hop(&self, mut wave: UltraWave) -> Result<UltraWave, SpaceErr> {
1000 wave.set_agent(self.greet.agent.clone());
1001 let mut transport = wave
1002 .wrap_in_transport(self.greet.surface.clone(), self.greet.transport.clone())
1003 .build()?
1004 .to_signal()?;
1005 let hop = transport
1006 .wrap_in_hop(Point::local_portal().to_surface(), self.greet.hop.clone())
1007 .build()?
1008 .to_ultra();
1009 Ok(hop)
1010 }
1011}
1012
1013#[async_trait]
1014impl Router for HopRouter {
1015 async fn route(&self, wave: UltraWave) {
1016 match self.to_hop(wave) {
1017 Ok(hop) => {
1018 self.tx.send(hop).await.unwrap_or_default();
1019 }
1020 Err(err) => {
1021 println!("{}", err.to_string());
1022 }
1023 }
1024 }
1025}
1026
1027pub struct HyperApi {
1028 greet: Greet,
1029 hyperway: HyperwayEndpoint,
1030 exchanger: Exchanger,
1031}
1032
1033impl HyperApi {
1034 pub fn new(hyperway: HyperwayEndpoint, greet: Greet, logger: PointLogger) -> Self {
1035 let exchanger = Exchanger::new(greet.surface.clone(), Default::default(), logger);
1036 Self {
1037 greet,
1038 hyperway,
1039 exchanger,
1040 }
1041 }
1042
1043 pub fn router(&self) -> HopRouter {
1044 HopRouter {
1045 greet: self.greet.clone(),
1046 tx: self.hyperway.tx.clone(),
1047 }
1048 }
1049
1050 pub fn transmitter(&self) -> ProtoTransmitter {
1051 let mut builder =
1052 ProtoTransmitterBuilder::new(Arc::new(self.router()), self.exchanger.clone());
1053 builder.agent = SetStrategy::Override(self.greet.agent.clone());
1054 builder.build()
1055 }
1056}
1057
1058#[derive(Clone)]
1059pub struct HyperGateSelector {
1060 map: Arc<DashMap<InterchangeKind, Arc<dyn HyperGate>>>,
1061}
1062
1063impl Default for HyperGateSelector {
1064 fn default() -> Self {
1065 Self::new(Arc::new(DashMap::new()))
1066 }
1067}
1068
1069impl HyperGateSelector {
1070 pub fn new(map: Arc<DashMap<InterchangeKind, Arc<dyn HyperGate>>>) -> Self {
1071 Self { map }
1072 }
1073
1074 pub fn add(&self, kind: InterchangeKind, gate: Arc<dyn HyperGate>) -> Result<(), SpaceErr> {
1075 if self.map.contains_key(&kind) {
1076 Err(format!("already have an interchange of kind: {}", kind.to_string()).into())
1077 } else {
1078 self.map.insert(kind, gate);
1079 Ok(())
1080 }
1081 }
1082}
1083
1084#[async_trait]
1085impl HyperGate for HyperGateSelector {
1086 async fn knock(&self, knock: Knock) -> Result<HyperwayEndpoint, SpaceErr> {
1087 if let Some(gate) = self.map.get(&knock.kind) {
1088 gate.value().knock(knock).await
1089 } else {
1090 Err(SpaceErr::new(
1091 500,
1092 format!("interchange not available: {}", knock.kind.to_string()).as_str(),
1093 ))
1094 }
1095 }
1096
1097 async fn jump(
1098 &self,
1099 kind: InterchangeKind,
1100 stub: HyperwayStub,
1101 ) -> Result<HyperwayEndpoint, SpaceErr> {
1102 self.map
1103 .get(&kind)
1104 .ok_or(SpaceErr::new(
1105 500,
1106 format!("interchange kind not available: {}", kind.to_string()).as_str(),
1107 ))?
1108 .value()
1109 .jump(kind, stub)
1110 .await
1111 }
1112}
1113
1114pub trait HyperwayConfigurator: Send + Sync {
1115 fn config(&self, greet: &Greet, hyperway: &mut Hyperway);
1116}
1117
1118pub struct DefaultHyperwayConfigurator;
1119
1120impl HyperwayConfigurator for DefaultHyperwayConfigurator {
1121 fn config(&self, greet: &Greet, hyperway: &mut Hyperway) {}
1122}
1123
1124#[derive(Clone)]
1125pub struct InterchangeGate<A, G, C>
1126where
1127 A: HyperAuthenticator,
1128 G: HyperGreeter,
1129 C: HyperwayConfigurator,
1130{
1131 logger: PointLogger,
1132 auth: A,
1133 greeter: G,
1134 interchange: Arc<HyperwayInterchange>,
1135 configurator: C,
1136}
1137impl<A, G, C> InterchangeGate<A, G, C>
1138where
1139 A: HyperAuthenticator,
1140 G: HyperGreeter,
1141 C: HyperwayConfigurator,
1142{
1143 pub fn new(
1144 auth: A,
1145 greeter: G,
1146 configurator: C,
1147 interchange: Arc<HyperwayInterchange>,
1148 logger: PointLogger,
1149 ) -> Self {
1150 Self {
1151 auth,
1152 greeter,
1153 configurator,
1154 interchange,
1155 logger,
1156 }
1157 }
1158}
1159
1160impl<A, G, C> InterchangeGate<A, G, C>
1161where
1162 A: HyperAuthenticator,
1163 G: HyperGreeter,
1164 C: HyperwayConfigurator,
1165{
1166 async fn enter(&self, greet: Greet) -> Result<HyperwayEndpoint, SpaceErr> {
1167 let mut hyperway = Hyperway::new(
1168 greet.surface.clone(),
1169 greet.agent.clone(),
1170 self.logger.clone(),
1171 );
1172 self.configurator.config(&greet, &mut hyperway);
1173
1174 self.interchange.add(hyperway).await;
1175
1176 let port = greet.surface.clone();
1177 let stub = HyperwayStub {
1178 agent: greet.agent.clone(),
1179 remote: greet.surface.clone(),
1180 };
1181
1182 let mut ext = self.logger.result_ctx(
1183 "InterchangeGate.enter",
1184 self.interchange.mount(stub, Some(greet.into())).await,
1185 )?;
1186
1187 let (drop_tx, drop_rx) = oneshot::channel();
1188 ext.drop_tx = Some(drop_tx);
1189
1190 let interchange = self.interchange.clone();
1191 tokio::spawn(async move {
1192 drop_rx.await;
1193 interchange.remove(port);
1194 });
1195
1196 Ok(ext)
1197 }
1198}
1199
1200#[async_trait]
1201impl<A, G, C> HyperGate for InterchangeGate<A, G, C>
1202where
1203 A: HyperAuthenticator,
1204 G: HyperGreeter,
1205 C: HyperwayConfigurator,
1206{
1207 async fn knock(&self, knock: Knock) -> Result<HyperwayEndpoint, SpaceErr> {
1208 let stub = self.auth.auth(knock).await?;
1209 let greet = self.greeter.greet(stub).await?;
1210 self.enter(greet).await
1211 }
1212
1213 async fn jump(
1214 &self,
1215 _kind: InterchangeKind,
1216 stub: HyperwayStub,
1217 ) -> Result<HyperwayEndpoint, SpaceErr> {
1218 let greet = self.greeter.greet(stub).await?;
1219 self.enter(greet).await
1220 }
1221}
1222
1223#[derive(Clone)]
1224pub struct MountInterchangeGate<A, G>
1225where
1226 A: HyperAuthenticator,
1227 G: HyperGreeter,
1228{
1229 logger: PointLogger,
1230 auth: A,
1231 greeter: G,
1232 interchange: Arc<HyperwayInterchange>,
1233}
1234
1235impl<A, G> MountInterchangeGate<A, G>
1236where
1237 A: HyperAuthenticator,
1238 G: HyperGreeter,
1239{
1240 pub fn new(
1241 auth: A,
1242 greeter: G,
1243 interchange: Arc<HyperwayInterchange>,
1244 logger: PointLogger,
1245 ) -> Self {
1246 Self {
1247 auth,
1248 greeter,
1249 interchange,
1250 logger,
1251 }
1252 }
1253
1254 async fn enter(&self, greet: Greet) -> Result<HyperwayEndpoint, SpaceErr> {
1255 let stub = HyperwayStub::new(greet.surface.clone(), greet.agent.clone());
1256 let ext = self
1257 .interchange
1258 .mount(stub.clone(), Some(greet.into()))
1259 .await?;
1260 Ok(ext)
1261 }
1262}
1263
1264#[async_trait]
1265impl<A, G> HyperGate for MountInterchangeGate<A, G>
1266where
1267 A: HyperAuthenticator,
1268 G: HyperGreeter,
1269{
1270 async fn knock(&self, knock: Knock) -> Result<HyperwayEndpoint, SpaceErr> {
1271 let stub = self.auth.auth(knock).await?;
1272 let greet = self.greeter.greet(stub).await?;
1273 let ext = self.enter(greet).await?;
1274 Ok(ext)
1275 }
1276
1277 async fn jump(
1278 &self,
1279 _kind: InterchangeKind,
1280 stub: HyperwayStub,
1281 ) -> Result<HyperwayEndpoint, SpaceErr> {
1282 let greet = self.greeter.greet(stub).await?;
1283 let ext = self.enter(greet).await?;
1284 Ok(ext)
1285 }
1286}
1287
1288pub struct HyperClient {
1289 tx: mpsc::Sender<UltraWave>,
1290 status_rx: watch::Receiver<HyperConnectionStatus>,
1291 to_client_listener_tx: broadcast::Sender<UltraWave>,
1292 logger: PointLogger,
1293 greet_rx: watch::Receiver<Option<Greet>>,
1294 exchanger: Option<Exchanger>,
1295}
1296
1297impl HyperClient {
1298 pub fn new(
1299 factory: Box<dyn HyperwayEndpointFactory>,
1300 logger: PointLogger,
1301 ) -> Result<HyperClient, SpaceErr> {
1302 Self::new_with_exchanger(factory, None, logger)
1303 }
1304
1305 pub fn new_with_exchanger(
1306 factory: Box<dyn HyperwayEndpointFactory>,
1307 exchanger: Option<Exchanger>,
1308 logger: PointLogger,
1309 ) -> Result<HyperClient, SpaceErr> {
1310 let (to_client_listener_tx, _) = broadcast::channel(1024);
1311 let (to_hyperway_tx, from_client_rx) = mpsc::channel(1024);
1312 let (status_watch_tx, mut status_rx) = watch::channel(HyperConnectionStatus::Pending);
1313
1314 let (status_mpsc_tx, mut status_mpsc_rx): (
1315 mpsc::Sender<HyperConnectionStatus>,
1316 mpsc::Receiver<HyperConnectionStatus>,
1317 ) = mpsc::channel(128);
1318
1319 tokio::spawn(async move {
1320 while let Some(status) = status_mpsc_rx.recv().await {
1321 let result = status_watch_tx.send(status.clone());
1322 if status == HyperConnectionStatus::Fatal {
1323 break;
1324 }
1325 if status == HyperConnectionStatus::Closed {
1326 break;
1327 }
1328 if let Err(_) = result {
1329 break;
1330 }
1331 }
1332 });
1333
1334 let mut from_runner_rx = HyperClientRunner::new(
1335 factory,
1336 from_client_rx,
1337 status_mpsc_tx.clone(),
1338 logger.clone(),
1339 );
1340
1341 let (greet_tx, greet_rx) = watch::channel(None);
1342
1343 let mut client = Self {
1344 tx: to_hyperway_tx,
1345 status_rx: status_rx.clone(),
1346 to_client_listener_tx: to_client_listener_tx.clone(),
1347 logger: logger.clone(),
1348 greet_rx,
1349 exchanger: exchanger.clone(),
1350 };
1351
1352 {
1353 let logger = logger.clone();
1354 tokio::spawn(async move {
1355 while let Ok(_) = status_rx.changed().await {
1356 let status = status_rx.borrow().clone();
1357 }
1359 });
1360 }
1361
1362 {
1363 let logger = logger.clone();
1364 let status_tx = status_mpsc_tx.clone();
1365 tokio::spawn(async move {
1366 async fn relay(
1367 mut from_runner_rx: mpsc::Receiver<UltraWave>,
1368 to_client_listener_tx: broadcast::Sender<UltraWave>,
1369 status_tx: mpsc::Sender<HyperConnectionStatus>,
1370 greet_tx: watch::Sender<Option<Greet>>,
1371 exchanger: Option<Exchanger>,
1372 logger: PointLogger,
1373 ) -> Result<(), SpaceErr> {
1374 if let Some(wave) = from_runner_rx.recv().await {
1375 logger.track(&wave, || Tracker::new("client", "ReceiveReflected"));
1376
1377 match wave.to_reflected() {
1378 Ok(reflected) => {
1379 if !reflected.core().status.is_success() {
1380 match reflected.core().status.as_u16() {
1381 400 => {
1382 status_tx
1383 .send(HyperConnectionStatus::Fatal)
1384 .await
1385 .unwrap_or_default();
1386 let err = "400: Bad Request: FATAL: something in the knock was incorrect";
1387 return Err(err.into());
1388 }
1389 401 => {
1390 status_tx
1391 .send(HyperConnectionStatus::Fatal)
1392 .await
1393 .unwrap_or_default();
1394 let err = "401: Unauthorized: FATAL: authentication failed (bad credentials?)";
1395 return Err(err.into());
1396 }
1397 403 => {
1398 status_tx
1399 .send(HyperConnectionStatus::Fatal)
1400 .await
1401 .unwrap_or_default();
1402 let err = "403: Forbidden: FATAL: authentication succeeded however the authenticated agent does not have permission to connect to this service";
1403 return Err(err.into());
1404 }
1405 408 => {
1406 status_tx
1407 .send(HyperConnectionStatus::Panic)
1408 .await
1409 .unwrap_or_default();
1410 let err = "408: Request Timeout: PANIC";
1411 return Err(err.into());
1412 }
1413 301 => {
1414 status_tx
1415 .send(HyperConnectionStatus::Fatal)
1416 .await
1417 .unwrap_or_default();
1418 let err = "301: Moved Permanently: FATAL: please update to new connection address";
1419 return Err(err.into());
1420 }
1421 503 => {
1422 status_tx
1423 .send(HyperConnectionStatus::Panic)
1424 .await
1425 .unwrap_or_default();
1426 let err =
1427 "503: Service Unavailable: PANIC: try again later";
1428 return Err(err.into());
1429 }
1430 _ => {
1431 status_tx
1432 .send(HyperConnectionStatus::Panic)
1433 .await
1434 .unwrap_or_default();
1435 let err = format!(
1436 "{}: {}: PANIC: expected 200",
1437 reflected.core().status.as_u16(),
1438 reflected.core().status.to_string()
1439 );
1440 return Err(err.into());
1441 }
1442 }
1443 }
1444 if let Substance::Greet(greet) = &reflected.core().body {
1445 greet_tx.send(Some(greet.clone()));
1446 } else {
1447 status_tx
1448 .send(HyperConnectionStatus::Fatal)
1449 .await
1450 .unwrap_or_default();
1451 let err = "HyperClient expected first wave Substance to be a reflected Greeting";
1452 return Err(err.into());
1453 }
1454 }
1455 Err(err) => {
1456 status_tx
1457 .send(HyperConnectionStatus::Fatal)
1458 .await
1459 .unwrap_or_default();
1460 let err = format!("HyperClient expected first wave Substance to be a reflected Greeting. Instead when attempting to convert to a reflected wave err occured: {}", err.to_string());
1461 return Err(err.into());
1462 }
1463 }
1464 }
1465
1466 while let Some(wave) = from_runner_rx.recv().await {
1467 if exchanger.is_some() {
1468 if wave.is_directed() {
1469 to_client_listener_tx.send(wave)?;
1470 } else {
1471 exchanger
1472 .as_ref()
1473 .unwrap()
1474 .reflected(wave.to_reflected()?)
1475 .await?;
1476 }
1477 } else {
1478 to_client_listener_tx.send(wave)?;
1479 }
1480 }
1481 Ok(())
1482 }
1483
1484 relay(
1485 from_runner_rx,
1486 to_client_listener_tx,
1487 status_tx,
1488 greet_tx,
1489 exchanger,
1490 logger.clone(),
1491 )
1492 .await
1493 .unwrap_or_default();
1494 });
1495 }
1496
1497 Ok(client)
1498 }
1499
1500 pub fn exchanger(&self) -> Option<Exchanger> {
1501 self.exchanger.clone()
1502 }
1503
1504 pub async fn transmitter_builder(&self) -> Result<ProtoTransmitterBuilder, SpaceErr> {
1505 self.wait_for_ready(Duration::from_secs(30)).await?;
1506 let mut builder = ProtoTransmitterBuilder::new(
1507 Arc::new(self.router()),
1508 self.exchanger
1509 .as_ref()
1510 .ok_or(SpaceErr::server_error(
1511 "cannot create a transmitter on a client that does not have an exchanger",
1512 ))?
1513 .clone(),
1514 );
1515 let greet = self
1516 .get_greeting()
1517 .ok_or::<SpaceErr>("expected greeting to already be set in HyperClient".into())?;
1518 builder.agent = SetStrategy::Fill(greet.agent.clone());
1519 builder.from = SetStrategy::Fill(greet.surface.clone());
1520 Ok(builder)
1521 }
1522
1523 pub fn reset(&self) {
1524 let mut wave = DirectedProto::signal();
1525 wave.to(LOCAL_CLIENT_RUNNER.clone().to_surface());
1526 wave.method(ExtMethod::new("Reset").unwrap());
1527 let wave = wave.build().unwrap();
1528 let wave = wave.to_ultra();
1529 let tx = self.tx.clone();
1530 tokio::spawn(async move {
1531 tx.send(wave).await.unwrap_or_default();
1532 });
1533 }
1534
1535 pub async fn close(&self) {
1536 let mut wave = DirectedProto::signal();
1537 wave.from(LOCAL_CLIENT.clone().to_surface());
1538 wave.to(LOCAL_CLIENT_RUNNER.clone().to_surface());
1539 wave.method(ExtMethod::new("Close").unwrap());
1540 let wave = wave.build().unwrap();
1541 let wave = wave.to_ultra();
1542 let tx = self.tx.clone();
1543 tokio::spawn(async move {
1544 tx.send(wave).await.unwrap_or_default();
1545 });
1546 }
1547
1548 pub fn router(&self) -> TxRouter {
1549 TxRouter::new(self.tx.clone())
1550 }
1551
1552 pub fn rx(&self) -> broadcast::Receiver<UltraWave> {
1553 self.to_client_listener_tx.subscribe()
1554 }
1555
1556 pub fn get_greeting(&self) -> Option<Greet> {
1557 self.greet_rx.borrow().clone()
1558 }
1559
1560 pub async fn wait_for_greet(&self) -> Result<Greet, SpaceErr> {
1561 let mut greet_rx = self.greet_rx.clone();
1562 loop {
1563 let greet = greet_rx.borrow().clone();
1564 if greet.is_some() {
1565 return Ok(greet.unwrap());
1566 } else {
1567 greet_rx.changed().await?;
1568 }
1569 }
1570 }
1571
1572 pub async fn wait_for_ready(&self, duration: Duration) -> Result<(), SpaceErr> {
1573 let mut status_rx = self.status_rx.clone();
1574 let (rtn, mut rtn_rx) = oneshot::channel();
1575
1576 tokio::spawn(async move {
1577 loop {
1578 let status = status_rx.borrow().clone();
1579 match status {
1580 HyperConnectionStatus::Ready => {
1581 rtn.send(Ok(()));
1582 break;
1583 }
1584 HyperConnectionStatus::Fatal => {
1585 rtn.send(Err(SpaceErr::server_error(
1586 "Fatal status from HyperClient while waiting for Ready",
1587 )));
1588 break;
1589 }
1590 _ => {}
1591 }
1592 }
1593 });
1594
1595 tokio::time::timeout(duration, rtn_rx).await??
1596 }
1597}
1598
1599#[derive(Clone, Eq, PartialEq)]
1600pub struct HyperConnectionDetails {
1601 pub status: HyperConnectionStatus,
1602 pub info: String,
1603}
1604
1605impl HyperConnectionDetails {
1606 pub fn new<S: ToString>(status: HyperConnectionStatus, info: S) -> Self {
1607 Self {
1608 status,
1609 info: info.to_string(),
1610 }
1611 }
1612}
1613
1614#[derive(Clone, strum_macros::Display, Eq, PartialEq)]
1615pub enum HyperConnectionStatus {
1616 Unknown,
1617 Pending,
1618 Connecting,
1619 Handshake,
1620 Auth,
1621 Ready,
1622 Panic,
1623 Fatal,
1624 Closed,
1625}
1626
1627pub enum HyperClientCall {
1628 Close,
1629}
1630
1631pub enum HyperConnectionErr {
1632 Fatal(String),
1633 Retry(String),
1634}
1635
1636impl ToString for HyperConnectionErr {
1637 fn to_string(&self) -> String {
1638 match self {
1639 HyperConnectionErr::Fatal(m) => format!("Fatal({})", m),
1640 HyperConnectionErr::Retry(m) => format!("Retry({})", m),
1641 }
1642 }
1643}
1644
1645impl From<SpaceErr> for HyperConnectionErr {
1646 fn from(err: SpaceErr) -> Self {
1647 HyperConnectionErr::Retry(err.to_string())
1648 }
1649}
1650
1651pub struct HyperClientRunner {
1652 ext: Option<HyperwayEndpoint>,
1653 factory: Box<dyn HyperwayEndpointFactory>,
1654 status_tx: mpsc::Sender<HyperConnectionStatus>,
1655 to_client_tx: mpsc::Sender<UltraWave>,
1656 from_client_rx: mpsc::Receiver<UltraWave>,
1657 logger: PointLogger,
1658}
1659
1660impl HyperClientRunner {
1661 pub fn new(
1662 factory: Box<dyn HyperwayEndpointFactory>,
1663 from_client_rx: mpsc::Receiver<UltraWave>,
1664 status_tx: mpsc::Sender<HyperConnectionStatus>,
1665 logger: PointLogger,
1666 ) -> mpsc::Receiver<UltraWave> {
1667 let (to_client_tx, from_runner_rx) = mpsc::channel(1024);
1668 let logger = logger.push_point("runner").unwrap();
1669 let runner = Self {
1670 ext: None,
1671 factory,
1672 to_client_tx,
1673 from_client_rx,
1674 status_tx,
1675 logger,
1676 };
1677
1678 tokio::spawn(async move {
1679 runner.start().await;
1680 });
1681
1682 from_runner_rx
1683 }
1684
1685 async fn start(mut self) {
1686 self.status_tx
1687 .send(HyperConnectionStatus::Pending)
1688 .await
1689 .unwrap_or_default();
1690
1691 loop {
1692 async fn connect(runner: &mut HyperClientRunner) -> Result<(), HyperConnectionErr> {
1693 if let Err(_) = runner
1694 .status_tx
1695 .send(HyperConnectionStatus::Connecting)
1696 .await
1697 {
1698 return Err(HyperConnectionErr::Fatal("can no longer update HyperClient status (probably due to previous Fatal status)".to_string()));
1699 }
1700 let (details_tx, mut details_rx): (
1701 mpsc::Sender<HyperConnectionDetails>,
1702 mpsc::Receiver<HyperConnectionDetails>,
1703 ) = mpsc::channel(1024);
1704 {
1705 let logger = runner.logger.clone();
1706 tokio::spawn(async move {
1707 while let Some(detail) = details_rx.recv().await {
1708 logger.info(format!("{} | {}", detail.status.to_string(), detail.info));
1709 }
1710 });
1711 }
1712 loop {
1713 match runner.logger.result_ctx(
1714 "connect",
1715 tokio::time::timeout(
1716 Duration::from_secs(60),
1717 runner.factory.create(details_tx.clone()),
1718 )
1719 .await,
1720 ) {
1721 Ok(Ok(ext)) => {
1722 runner.ext.replace(ext);
1723 if let Err(_) =
1724 runner.status_tx.send(HyperConnectionStatus::Ready).await
1725 {
1726 runner.ext.take();
1727 return Err(HyperConnectionErr::Fatal("can no longer update HyperClient status (probably due to previous Fatal status)".to_string()));
1728 }
1729
1730 return Ok(());
1731 }
1732 Ok(Err(err)) => {
1733 }
1735 Err(err) => {
1736 runner.logger.error(format!("{}", err.to_string()));
1737 }
1738 }
1739 tokio::time::sleep(Duration::from_secs(1)).await;
1742 }
1743 }
1744
1745 async fn relay(runner: &mut HyperClientRunner) -> Result<(), SpaceErr> {
1746 let ext = runner
1747 .ext
1748 .as_mut()
1749 .ok_or::<SpaceErr>("must reconnect".into())?;
1750
1751 loop {
1752 tokio::select!(
1753 wave = runner.from_client_rx.recv() => {
1754 match wave {
1756 Some(wave) => {
1757 if wave.is_directed() && wave.to().is_single() && wave.to().unwrap_single().point == *LOCAL_CLIENT_RUNNER
1758 {
1759 let method: ExtMethod = wave.to_directed().unwrap().core().method.clone().try_into().unwrap();
1760 if method.to_string() == "Reset".to_string() {
1761 return Err(SpaceErr::server_error("reset"));
1762 } else if method.to_string() == "Close".to_string(){
1763 runner.status_tx.send(HyperConnectionStatus::Closed).await;
1764 return Ok(());
1765 }
1766 } else {
1767 match ext.tx.send(wave).await {
1768 Ok(_) => {}
1769 Err(err) => {
1770 return Err(SpaceErr::server_error("ext failure"));
1773 }
1774 }
1775 }
1776 }
1777 None => {
1778 break;
1780 }
1781 }
1782 }
1783
1784 wave = ext.rx.recv() => {
1785 match wave {
1786 Some( wave ) => {
1787 runner.to_client_tx.send(wave).await;
1788 }
1789 None => {
1790 runner.logger.warn("client hyperway_endpoint has been closed. This can happen if the client sender (tx) has been dropped.");
1791 break;
1792 }
1793 }
1794 }
1795 );
1796 }
1797
1798 Ok(())
1801 }
1802
1803 loop {
1804 match connect(&mut self).await {
1805 Ok(_) => {}
1806 Err(HyperConnectionErr::Fatal(message)) => {
1807 self.status_tx
1809 .send(HyperConnectionStatus::Fatal)
1810 .await
1811 .unwrap_or_default();
1812 return;
1813 }
1814 Err(HyperConnectionErr::Retry(m)) => {
1815 self.status_tx
1816 .send(HyperConnectionStatus::Panic)
1817 .await
1818 .unwrap_or_default();
1819 }
1820 }
1821
1822 match relay(&mut self).await {
1823 Ok(_) => {
1824 break;
1826 }
1827 Err(err) => {
1828 self.ext = None;
1831 }
1832 }
1833 }
1834 }
1835 }
1836}
1837
1838#[async_trait]
1839pub trait HyperwayEndpointFactory: Send + Sync {
1840 async fn create(
1841 &self,
1842 status_tx: mpsc::Sender<HyperConnectionDetails>,
1843 ) -> Result<HyperwayEndpoint, SpaceErr>;
1844}
1845
1846pub struct LocalHyperwayGateUnlocker {
1847 pub knock: Knock,
1848 pub gate: Arc<dyn HyperGate>,
1849}
1850
1851impl LocalHyperwayGateUnlocker {
1852 pub fn new(remote: Surface, gate: Arc<dyn HyperGate>) -> Self {
1853 let knock = Knock::new(InterchangeKind::Singleton, remote, Substance::Empty);
1854 Self { knock, gate }
1855 }
1856}
1857
1858#[async_trait]
1859impl HyperwayEndpointFactory for LocalHyperwayGateUnlocker {
1860 async fn create(
1861 &self,
1862 status_tx: mpsc::Sender<HyperConnectionDetails>,
1863 ) -> Result<HyperwayEndpoint, SpaceErr> {
1864 self.gate.knock(self.knock.clone()).await
1865 }
1866}
1867
1868pub struct LocalHyperwayGateJumper {
1869 pub kind: InterchangeKind,
1870 pub stub: HyperwayStub,
1871 pub gate: Arc<dyn HyperGate>,
1872}
1873
1874impl LocalHyperwayGateJumper {
1875 pub fn new(kind: InterchangeKind, stub: HyperwayStub, gate: Arc<dyn HyperGate>) -> Self {
1876 Self { kind, stub, gate }
1877 }
1878}
1879
1880#[async_trait]
1881impl HyperwayEndpointFactory for LocalHyperwayGateJumper {
1882 async fn create(
1883 &self,
1884 status_tx: mpsc::Sender<HyperConnectionDetails>,
1885 ) -> Result<HyperwayEndpoint, SpaceErr> {
1886 self.gate.jump(self.kind.clone(), self.stub.clone()).await
1887 }
1888}
1889
1890pub struct Bridge {
1929 client: HyperClient,
1930}
1931
1932impl Bridge {
1933 pub fn new(
1934 mut local_hyperway_endpoint: HyperwayEndpoint,
1935 remote_factory: Box<dyn HyperwayEndpointFactory>,
1936 logger: PointLogger,
1937 ) -> Result<Self, SpaceErr> {
1938 let client = HyperClient::new(remote_factory, logger)?;
1939 let client_router = client.router();
1940 let local_hyperway_endpoint_tx = local_hyperway_endpoint.tx.clone();
1941 tokio::spawn(async move {
1942 while let Some(wave) = local_hyperway_endpoint.rx.recv().await {
1943 client_router.route(wave).await;
1944 }
1945 });
1946
1947 let mut rx = client.rx();
1948 tokio::spawn(async move {
1949 while let Ok(wave) = rx.recv().await {
1950 local_hyperway_endpoint_tx.send(wave).await;
1951 }
1952 });
1953
1954 Ok(Self { client })
1955 }
1956
1957 pub fn reset(&self) {
1958 self.client.reset();
1959 }
1960
1961 pub fn close(&self) {
1962 self.client.close();
1963 }
1964
1965 pub fn status(&self) -> HyperConnectionStatus {
1966 self.client.status_rx.borrow().clone()
1967 }
1968}
1969
1970#[cfg(test)]
1971mod tests {
1972 use std::collections::HashMap;
1973 use std::str::FromStr;
1974 use std::sync::Arc;
1975
1976 use chrono::{DateTime, Utc};
1977 use dashmap::DashMap;
1978
1979 use cosmic_space::command::direct::create::PointFactoryU64;
1980 use cosmic_space::hyper::{InterchangeKind, Knock};
1981 use cosmic_space::point::Point;
1982 use cosmic_space::loc::Uuid;
1983 use cosmic_space::log::RootLogger;
1984 use cosmic_space::substance::Substance;
1985 use cosmic_space::wave::HyperWave;
1986
1987 use crate::{
1988 AnonHyperAuthenticator, HyperGate, HyperGateSelector, HyperRouter, HyperwayInterchange,
1989 InterchangeGate,
1990 };
1991
1992 #[no_mangle]
1993 pub(crate) extern "C" fn cosmic_uuid() -> String {
1994 uuid::Uuid::new_v4().to_string()
1995 }
1996
1997 #[no_mangle]
1998 pub(crate) extern "C" fn cosmic_timestamp() -> DateTime<Utc> {
1999 Utc::now()
2000 }
2001
2002 pub struct DummyRouter {}
2003
2004 #[async_trait]
2005 impl HyperRouter for DummyRouter {
2006 async fn route(&self, wave: HyperWave) {
2007 println!("received hyperwave!");
2008 }
2009 }
2010
2011 }
2045
2046pub mod test_util {
2047 use std::collections::{HashMap, HashSet};
2048 use std::str::FromStr;
2049 use std::sync::Arc;
2050 use std::time::Duration;
2051
2052 use dashmap::DashMap;
2053 use lazy_static::lazy_static;
2054 use tokio::sync::{broadcast, mpsc, oneshot};
2055
2056 use cosmic_space::command::direct::create::PointFactoryU64;
2057 use cosmic_space::err::SpaceErr;
2058 use cosmic_space::hyper::{Greet, InterchangeKind, Knock};
2059 use cosmic_space::loc::{Layer, Surface, ToPoint, ToSurface};
2060 use cosmic_space::log::{PointLogger, RootLogger};
2061 use cosmic_space::point::Point;
2062 use cosmic_space::settings::Timeouts;
2063 use cosmic_space::substance::{Substance, Token};
2064 use cosmic_space::wave::core::cmd::CmdMethod;
2065 use cosmic_space::wave::core::ext::ExtMethod;
2066 use cosmic_space::wave::core::{Method, ReflectedCore};
2067 use cosmic_space::wave::exchange::asynch::{
2068 Exchanger, ProtoTransmitter, ProtoTransmitterBuilder, Router, TxRouter,
2069 };
2070 use cosmic_space::wave::exchange::SetStrategy;
2071 use cosmic_space::wave::{
2072 Agent, DirectedKind, DirectedProto, HyperWave, Pong, ReflectedKind, ReflectedProto,
2073 ReflectedWave, UltraWave, Wave,
2074 };
2075
2076 use crate::{
2077 AnonHyperAuthenticator, AnonHyperAuthenticatorAssignEndPoint, Bridge, HyperClient,
2078 HyperConnectionDetails, HyperConnectionErr, HyperGate, HyperGateSelector, HyperGreeter,
2079 Hyperlane, HyperRouter, Hyperway, HyperwayEndpoint, HyperwayEndpointFactory,
2080 HyperwayInterchange, HyperwayStub, InterchangeGate, LocalHyperwayGateJumper,
2081 LocalHyperwayGateUnlocker, MountInterchangeGate, TokenAuthenticatorWithRemoteWhitelist,
2082 };
2083
2084 lazy_static! {
2085 pub static ref LESS: Point = Point::from_str("space:users:less").expect("point");
2086 pub static ref FAE: Point = Point::from_str("space:users:fae").expect("point");
2087 }
2088
2089 #[derive(Clone)]
2090 pub struct SingleInterchangePlatform {
2091 pub interchange: Arc<HyperwayInterchange>,
2092 pub gate: Arc<HyperGateSelector>,
2093 }
2094
2095 impl SingleInterchangePlatform {
2096 pub async fn new() -> Self {
2097 let root_logger = RootLogger::default();
2098 let logger = root_logger.point(Point::from_str("point").unwrap());
2099 let interchange = Arc::new(HyperwayInterchange::new(
2100 logger.push_point("interchange").unwrap(),
2101 ));
2102
2103 interchange
2104 .add(Hyperway::new(
2105 LESS.clone().to_surface(),
2106 LESS.to_agent(),
2107 Default::default(),
2108 ))
2109 .await;
2110 interchange
2111 .add(Hyperway::new(
2112 FAE.clone().to_surface(),
2113 FAE.to_agent(),
2114 Default::default(),
2115 ))
2116 .await;
2117 let auth = AnonHyperAuthenticator::new();
2118 let gate = Arc::new(MountInterchangeGate::new(
2119 auth,
2120 TestGreeter::new(),
2121 interchange.clone(),
2122 logger.push_point("gate").unwrap(),
2123 ));
2124 let mut gates: Arc<DashMap<InterchangeKind, Arc<dyn HyperGate>>> =
2125 Arc::new(DashMap::new());
2126 gates.insert(InterchangeKind::Singleton, gate);
2127 let gate = Arc::new(HyperGateSelector::new(gates));
2128
2129 Self { interchange, gate }
2130 }
2131
2132 pub fn knock(&self, surface: Surface) -> Knock {
2133 Knock::new(InterchangeKind::Singleton, surface, Substance::Empty)
2134 }
2135
2136 pub fn local_hyperway_endpoint_factory(
2137 &self,
2138 port: Surface,
2139 ) -> Box<dyn HyperwayEndpointFactory> {
2140 Box::new(LocalHyperwayGateUnlocker::new(port, self.gate.clone()))
2141 }
2142 }
2143 pub struct LargeFrameTest {
2144 fae_factory: Box<dyn HyperwayEndpointFactory>,
2145 less_factory: Box<dyn HyperwayEndpointFactory>,
2146 }
2147
2148 impl LargeFrameTest {
2149 pub fn new(
2150 fae_factory: Box<dyn HyperwayEndpointFactory>,
2151 less_factory: Box<dyn HyperwayEndpointFactory>,
2152 ) -> Self {
2153 Self {
2154 fae_factory,
2155 less_factory,
2156 }
2157 }
2158
2159 pub async fn go(self) -> Result<(), SpaceErr> {
2160 let less_exchanger = Exchanger::new(
2161 LESS.push("exchanger").unwrap().to_surface(),
2162 Timeouts::default(),
2163 PointLogger::default(),
2164 );
2165 let fae_exchanger = Exchanger::new(
2166 FAE.push("exchanger").unwrap().to_surface(),
2167 Timeouts::default(),
2168 PointLogger::default(),
2169 );
2170
2171 let root_logger = RootLogger::default();
2172 let logger = root_logger.point(Point::from_str("less-client").unwrap());
2173 let less_client = HyperClient::new_with_exchanger(
2174 self.less_factory,
2175 Some(less_exchanger.clone()),
2176 logger,
2177 )
2178 .unwrap();
2179 let logger = root_logger.point(Point::from_str("fae-client").unwrap());
2180 let fae_client = HyperClient::new_with_exchanger(
2181 self.fae_factory,
2182 Some(fae_exchanger.clone()),
2183 logger,
2184 )
2185 .unwrap();
2186
2187 let mut less_rx = less_client.rx();
2188 let mut fae_rx = fae_client.rx();
2189
2190 let less_router = less_client.router();
2191 let less_transmitter =
2192 ProtoTransmitter::new(Arc::new(less_router), less_exchanger.clone());
2193
2194 let fae_router = fae_client.router();
2195 let fae_transmitter =
2196 ProtoTransmitter::new(Arc::new(fae_router), fae_exchanger.clone());
2197
2198 {
2199 let fae = FAE.clone();
2200 tokio::spawn(async move {
2201 fae_client.wait_for_greet().await.unwrap();
2202 let wave = fae_rx.recv().await.unwrap();
2203 let mut reflected = ReflectedProto::new();
2204 reflected.kind(ReflectedKind::Pong);
2205 reflected.status(200u16);
2206 reflected.to(wave.from().clone());
2207 reflected.from(fae.to_surface());
2208 reflected.intended(wave.to());
2209 reflected.reflection_of(wave.id());
2210 let wave = reflected.build().unwrap();
2211 let wave = wave.to_ultra();
2212 fae_transmitter.route(wave).await;
2213 });
2214 }
2215
2216 let (rtn, mut rtn_rx) = oneshot::channel();
2217 tokio::spawn(async move {
2218 less_client.wait_for_greet().await.unwrap();
2219 let mut hello = DirectedProto::ping();
2220 hello.to(FAE.clone().to_surface());
2221 hello.from(LESS.clone().to_surface());
2222 hello.method(ExtMethod::new("Hello").unwrap());
2223 let size = 3_000_000usize;
2224 let mut body = Vec::with_capacity(size);
2225 for _ in 0..size {
2226 body.push(0u8);
2227 }
2228 hello.body(Substance::Bin(Arc::new(body)));
2229 let pong: Wave<Pong> = less_transmitter.direct(hello).await.unwrap();
2230 rtn.send(pong.core.status.as_u16() == 200u16);
2231 });
2232
2233 let result = tokio::time::timeout(Duration::from_secs(30), rtn_rx)
2234 .await
2235 .unwrap()
2236 .unwrap();
2237 assert!(result);
2238 Ok(())
2239 }
2240 }
2241
2242 pub struct WaveTest {
2243 fae_factory: Box<dyn HyperwayEndpointFactory>,
2244 less_factory: Box<dyn HyperwayEndpointFactory>,
2245 }
2246
2247 impl WaveTest {
2248 pub fn new(
2249 fae_factory: Box<dyn HyperwayEndpointFactory>,
2250 less_factory: Box<dyn HyperwayEndpointFactory>,
2251 ) -> Self {
2252 Self {
2253 fae_factory,
2254 less_factory,
2255 }
2256 }
2257
2258 pub async fn go(self) -> Result<(), SpaceErr> {
2259 let less_exchanger = Exchanger::new(
2260 LESS.push("exchanger").unwrap().to_surface(),
2261 Timeouts::default(),
2262 PointLogger::default(),
2263 );
2264 let fae_exchanger = Exchanger::new(
2265 FAE.push("exchanger").unwrap().to_surface(),
2266 Timeouts::default(),
2267 PointLogger::default(),
2268 );
2269
2270 let root_logger = RootLogger::default();
2271 let logger = root_logger.point(Point::from_str("less-client").unwrap());
2272 let less_client = HyperClient::new_with_exchanger(
2273 self.less_factory,
2274 Some(less_exchanger.clone()),
2275 logger,
2276 )
2277 .unwrap();
2278 let logger = root_logger.point(Point::from_str("fae-client").unwrap());
2279 let fae_client = HyperClient::new_with_exchanger(
2280 self.fae_factory,
2281 Some(fae_exchanger.clone()),
2282 logger,
2283 )
2284 .unwrap();
2285
2286 let mut less_rx = less_client.rx();
2287 let mut fae_rx = fae_client.rx();
2288
2289 let less_router = less_client.router();
2290 let less_transmitter =
2291 ProtoTransmitter::new(Arc::new(less_router), less_exchanger.clone());
2292
2293 let fae_router = fae_client.router();
2294 let fae_transmitter =
2295 ProtoTransmitter::new(Arc::new(fae_router), fae_exchanger.clone());
2296
2297 {
2298 let fae = FAE.clone();
2299 tokio::spawn(async move {
2300 fae_client.wait_for_greet().await.unwrap();
2301 let wave = fae_rx.recv().await.unwrap();
2302 let mut reflected = ReflectedProto::new();
2303 reflected.kind(ReflectedKind::Pong);
2304 reflected.status(200u16);
2305 reflected.to(wave.from().clone());
2306 reflected.from(fae.to_surface());
2307 reflected.intended(wave.to());
2308 reflected.reflection_of(wave.id());
2309 let wave = reflected.build().unwrap();
2310 let wave = wave.to_ultra();
2311 fae_transmitter.route(wave).await;
2312 });
2313 }
2314
2315 let (rtn, mut rtn_rx) = oneshot::channel();
2316 tokio::spawn(async move {
2317 less_client.wait_for_greet().await.unwrap();
2318 let mut hello = DirectedProto::ping();
2319 hello.to(FAE.clone().to_surface());
2320 hello.from(LESS.clone().to_surface());
2321 hello.method(ExtMethod::new("Hello").unwrap());
2322 hello.body(Substance::Empty);
2323 let pong: Wave<Pong> = less_transmitter.direct(hello).await.unwrap();
2324 rtn.send(pong.core.status.as_u16() == 200u16);
2325 });
2326
2327 let result = tokio::time::timeout(Duration::from_secs(30), rtn_rx)
2328 .await
2329 .unwrap()
2330 .unwrap();
2331 assert!(result);
2332 Ok(())
2333 }
2334 }
2335
2336 #[derive(Clone)]
2337 pub struct TestGreeter;
2338
2339 impl TestGreeter {
2340 pub fn new() -> Self {
2341 Self
2342 }
2343 }
2344
2345 #[async_trait]
2346 impl HyperGreeter for TestGreeter {
2347 async fn greet(&self, stub: HyperwayStub) -> Result<Greet, SpaceErr> {
2348 Ok(Greet {
2349 surface: stub.remote.clone(),
2350 agent: stub.agent.clone(),
2351 hop: Point::remote_endpoint()
2352 .to_surface()
2353 .with_layer(Layer::Core),
2354 transport: stub.remote.clone(),
2355 })
2356 }
2357 }
2358}
2359
2360#[cfg(test)]
2361pub mod test {
2362 use std::collections::{HashMap, HashSet};
2363 use std::str::FromStr;
2364 use std::sync::Arc;
2365 use std::time::Duration;
2366
2367 use dashmap::DashMap;
2368 use lazy_static::lazy_static;
2369 use tokio::sync::{broadcast, mpsc, oneshot};
2370
2371 use cosmic_space::command::direct::create::PointFactoryU64;
2372 use cosmic_space::err::SpaceErr;
2373 use cosmic_space::hyper::{Greet, InterchangeKind, Knock};
2374 use cosmic_space::loc::{Layer, Surface, ToPoint, ToSurface};
2375 use cosmic_space::log::RootLogger;
2376 use cosmic_space::point::Point;
2377 use cosmic_space::settings::Timeouts;
2378 use cosmic_space::substance::{Substance, Token};
2379 use cosmic_space::wave::core::cmd::CmdMethod;
2380 use cosmic_space::wave::core::ext::ExtMethod;
2381 use cosmic_space::wave::core::{Method, ReflectedCore};
2382 use cosmic_space::wave::exchange::asynch::{
2383 Exchanger, ProtoTransmitter, ProtoTransmitterBuilder, Router, TxRouter,
2384 };
2385 use cosmic_space::wave::exchange::SetStrategy;
2386 use cosmic_space::wave::{
2387 Agent, DirectedKind, DirectedProto, HyperWave, Pong, ReflectedKind, ReflectedProto,
2388 ReflectedWave, UltraWave, Wave,
2389 };
2390
2391 use crate::test_util::{FAE, LESS, SingleInterchangePlatform, TestGreeter, WaveTest};
2392 use crate::{
2393 AnonHyperAuthenticator, AnonHyperAuthenticatorAssignEndPoint, Bridge, HyperClient,
2394 HyperConnectionDetails, HyperConnectionErr, HyperGate, HyperGateSelector, HyperGreeter,
2395 Hyperlane, HyperRouter, Hyperway, HyperwayEndpoint, HyperwayEndpointFactory,
2396 HyperwayInterchange, HyperwayStub, InterchangeGate, LocalHyperwayGateJumper,
2397 LocalHyperwayGateUnlocker, MountInterchangeGate, TokenAuthenticatorWithRemoteWhitelist,
2398 };
2399
2400 pub struct TestRouter {}
2401
2402 #[async_trait]
2403 impl HyperRouter for TestRouter {
2404 async fn route(&self, wave: HyperWave) {
2405 println!("Test Router routing!");
2406 }
2408 }
2409
2410 fn hello_wave() -> UltraWave {
2411 let mut hello = DirectedProto::ping();
2412 hello.to(FAE.clone().to_surface());
2413 hello.from(LESS.clone().to_surface());
2414 hello.method(ExtMethod::new("Hello").unwrap());
2415 hello.body(Substance::Empty);
2416 let directed = hello.build().unwrap();
2417 let wave = directed.to_ultra();
2418 wave
2419 }
2420
2421 #[tokio::test]
2422 pub async fn test_hyperlane() {
2423 let hyperlane = Hyperlane::new("mem");
2424 let mut rx = hyperlane.rx(None).await;
2425 let wave = hello_wave();
2426 let wave_id = wave.id().clone();
2427 hyperlane.send(wave).await.unwrap();
2428 let wave = tokio::time::timeout(Duration::from_secs(5u64), rx.recv())
2429 .await
2430 .unwrap()
2431 .unwrap();
2432 assert_eq!(wave.id(), wave_id);
2433 }
2434
2435 #[tokio::test]
2436 pub async fn test_hyperway() {
2437 let hyperway = Hyperway::new(
2438 LESS.clone().to_surface(),
2439 LESS.to_agent(),
2440 Default::default(),
2441 );
2442 let wave = hello_wave();
2443 let wave_id = wave.id().clone();
2444 hyperway.outbound.send(wave).await;
2445 let wave = tokio::time::timeout(
2446 Duration::from_secs(5u64),
2447 hyperway.outbound.rx(None).await.recv(),
2448 )
2449 .await
2450 .unwrap()
2451 .unwrap();
2452
2453 let wave = hello_wave();
2454 let wave_id = wave.id().clone();
2455 hyperway.inbound.send(wave).await;
2456 let wave = tokio::time::timeout(
2457 Duration::from_secs(5u64),
2458 hyperway.inbound.rx(None).await.recv(),
2459 )
2460 .await
2461 .unwrap()
2462 .unwrap();
2463 assert_eq!(wave.id(), wave_id);
2464 }
2465
2466 pub async fn test_hyperclient() {
2499 pub struct TestFactory {
2500 pub hyperway: Hyperway,
2501 }
2502
2503 impl TestFactory {
2504 pub fn new() -> Self {
2505 let hyperway = Hyperway::new(
2506 LESS.clone().to_surface(),
2507 LESS.to_agent(),
2508 Default::default(),
2509 );
2510 Self { hyperway }
2511 }
2512
2513 pub fn inbound_tx(&self) -> mpsc::Sender<UltraWave> {
2514 self.hyperway.inbound.tx()
2515 }
2516
2517 pub async fn inbound_rx(&self) -> mpsc::Receiver<UltraWave> {
2518 self.hyperway.inbound.rx(None).await
2519 }
2520
2521 pub async fn outbound_rx(&self) -> broadcast::Receiver<UltraWave> {
2522 self.hyperway.outbound.eavesdrop()
2523 }
2524
2525 pub fn outbound_tx(&self) -> mpsc::Sender<UltraWave> {
2526 self.hyperway.outbound.tx()
2527 }
2528 }
2529
2530 #[async_trait]
2531 impl HyperwayEndpointFactory for TestFactory {
2532 async fn create(
2533 &self,
2534 status_tx: mpsc::Sender<HyperConnectionDetails>,
2535 ) -> Result<HyperwayEndpoint, SpaceErr> {
2536 Ok(self.hyperway.hyperway_endpoint_far(None).await)
2537 }
2538 }
2539
2540 {
2541 let factory = Box::new(TestFactory::new());
2542 let mut inbound_rx = factory.inbound_rx().await;
2543 let root_logger = RootLogger::default();
2544 let logger = root_logger.point(Point::from_str("client").unwrap());
2545 let client = HyperClient::new(factory, logger).unwrap();
2546
2547 let client_listener_rx = client.rx();
2548
2549 client.reset();
2550
2551 let router = client.router();
2552 let wave = hello_wave();
2553 let wave_id = wave.id().clone();
2554 router.route(wave).await;
2555 let wave = tokio::time::timeout(Duration::from_secs(5u64), inbound_rx.recv())
2556 .await
2557 .unwrap()
2558 .unwrap();
2559 assert_eq!(wave.id(), wave_id);
2560 }
2561
2562 {
2563 let factory = Box::new(TestFactory::new());
2564 let outbound_tx = factory.outbound_tx();
2565 let root_logger = RootLogger::default();
2566 let logger = root_logger.point(Point::from_str("client").unwrap());
2567 let client = HyperClient::new(factory, logger).unwrap();
2568
2569 let mut client_listener_rx = client.rx();
2570
2571 let wave = hello_wave();
2572 let wave_id = wave.id().clone();
2573 outbound_tx.send(wave).await.unwrap();
2574 let wave = tokio::time::timeout(Duration::from_secs(5u64), client_listener_rx.recv())
2575 .await
2576 .unwrap()
2577 .unwrap();
2578 assert_eq!(wave.id(), wave_id);
2579 }
2580 }
2581
2582 #[tokio::test]
2583 pub async fn test_single_interchange() {
2584 let test = SingleInterchangePlatform::new().await;
2585 let less_factory = test.local_hyperway_endpoint_factory(LESS.to_surface());
2586 let fae_factory = test.local_hyperway_endpoint_factory(FAE.to_surface());
2587 let test = WaveTest::new(fae_factory, less_factory);
2588 test.go().await.unwrap();
2589 }
2590
2591 #[tokio::test]
2592 pub async fn test_dual_interchange() {
2593 let root_logger = RootLogger::default();
2594 let logger = root_logger.point(Point::from_str("point").unwrap());
2595 let interchange = Arc::new(HyperwayInterchange::new(
2596 logger.push_point("interchange").unwrap(),
2597 ));
2598
2599 interchange
2600 .add(Hyperway::new(
2601 LESS.clone().to_surface(),
2602 LESS.to_agent(),
2603 Default::default(),
2604 ))
2605 .await;
2606 interchange
2607 .add(Hyperway::new(
2608 FAE.clone().to_surface(),
2609 FAE.to_agent(),
2610 Default::default(),
2611 ))
2612 .await;
2613
2614 let auth = AnonHyperAuthenticator::new();
2615 let gate = Arc::new(MountInterchangeGate::new(
2616 auth,
2617 TestGreeter::new(),
2618 interchange.clone(),
2619 logger.push_point("gate").unwrap(),
2620 ));
2621 let mut gates: Arc<DashMap<InterchangeKind, Arc<dyn HyperGate>>> = Arc::new(DashMap::new());
2622 gates.insert(InterchangeKind::Singleton, gate);
2623 let gate = Arc::new(HyperGateSelector::new(gates));
2624
2625 let less_factory = Box::new(LocalHyperwayGateUnlocker::new(
2626 LESS.clone().to_surface(),
2627 gate.clone(),
2628 ));
2629
2630 let fae_factory = Box::new(LocalHyperwayGateUnlocker::new(
2631 FAE.clone().to_surface(),
2632 gate.clone(),
2633 ));
2634
2635 let less_exchanger = Exchanger::new(
2636 LESS.push("exchanger").unwrap().to_surface(),
2637 Timeouts::default(),
2638 Default::default(),
2639 );
2640 let fae_exchanger = Exchanger::new(
2641 FAE.push("exchanger").unwrap().to_surface(),
2642 Timeouts::default(),
2643 Default::default(),
2644 );
2645
2646 let root_logger = RootLogger::default();
2647 let logger = root_logger.point(Point::from_str("less-client").unwrap());
2648 let less_client =
2649 HyperClient::new_with_exchanger(less_factory, Some(less_exchanger.clone()), logger)
2650 .unwrap();
2651 let logger = root_logger.point(Point::from_str("fae-client").unwrap());
2652 let fae_client =
2653 HyperClient::new_with_exchanger(fae_factory, Some(fae_exchanger.clone()), logger)
2654 .unwrap();
2655
2656 let mut less_rx = less_client.rx();
2657 let mut fae_rx = fae_client.rx();
2658
2659 let less_router = less_client.router();
2660 let less_transmitter = ProtoTransmitter::new(Arc::new(less_router), less_exchanger.clone());
2661
2662 let fae_router = fae_client.router();
2663 let fae_transmitter = ProtoTransmitter::new(Arc::new(fae_router), fae_exchanger.clone());
2664
2665 {
2666 let fae = FAE.clone();
2667 tokio::spawn(async move {
2668 let wave = fae_rx.recv().await.unwrap();
2669 let mut reflected = ReflectedProto::new();
2670 reflected.kind(ReflectedKind::Pong);
2671 reflected.status(200u16);
2672 reflected.to(wave.from().clone());
2673 reflected.from(fae.to_surface());
2674 reflected.intended(wave.to());
2675 reflected.reflection_of(wave.id());
2676 let wave = reflected.build().unwrap();
2677 let wave = wave.to_ultra();
2678 fae_transmitter.route(wave).await;
2679 });
2680 }
2681
2682 let (rtn, mut rtn_rx) = oneshot::channel();
2683 tokio::spawn(async move {
2684 let mut hello = DirectedProto::ping();
2685 hello.to(FAE.clone().to_surface());
2686 hello.from(LESS.clone().to_surface());
2687 hello.method(ExtMethod::new("Hello").unwrap());
2688 hello.body(Substance::Empty);
2689 let pong: Wave<Pong> = less_transmitter.direct(hello).await.unwrap();
2690 rtn.send(pong.core.status.as_u16() == 200u16);
2691 });
2692
2693 let result = tokio::time::timeout(Duration::from_secs(5), rtn_rx)
2694 .await
2695 .unwrap()
2696 .unwrap();
2697 assert!(result);
2698 }
2699
2700 #[tokio::test]
2701 pub async fn test_bridge() {
2702 pub fn create(name: &str) -> (Arc<HyperwayInterchange>, Arc<dyn HyperGate>) {
2703 let root_logger = RootLogger::default();
2704 let logger = root_logger.point(Point::from_str(name).unwrap());
2705 let interchange = Arc::new(HyperwayInterchange::new(
2706 logger.push_point("interchange").unwrap(),
2707 ));
2708
2709 let auth = AnonHyperAuthenticator::new();
2710 let gate = Arc::new(MountInterchangeGate::new(
2711 auth,
2712 TestGreeter::new(),
2713 interchange.clone(),
2714 logger.push_point("gate").unwrap(),
2715 ));
2716 let mut gates: Arc<DashMap<InterchangeKind, Arc<dyn HyperGate>>> =
2717 Arc::new(DashMap::new());
2718 gates.insert(InterchangeKind::Singleton, gate);
2719 (interchange, Arc::new(HyperGateSelector::new(gates)))
2720 }
2721
2722 let (less_interchange, less_gate) = create("less");
2723 let (fae_interchange, fae_gate) = create("fae");
2724
2725 {
2726 let hyperway = Hyperway::new(
2727 FAE.to_surface().with_layer(Layer::Core),
2728 Agent::HyperUser,
2729 Default::default(),
2730 );
2731 less_interchange.add(hyperway).await;
2732 let access = Hyperway::new(
2733 LESS.to_surface().with_layer(Layer::Core),
2734 Agent::HyperUser,
2735 Default::default(),
2736 );
2737 less_interchange.add(access).await;
2738 }
2739 {
2740 let hyperway = Hyperway::new(
2741 LESS.to_surface().with_layer(Layer::Core),
2742 Agent::HyperUser,
2743 Default::default(),
2744 );
2745 fae_interchange.add(hyperway).await;
2746 let access = Hyperway::new(
2747 FAE.to_surface().with_layer(Layer::Core),
2748 Agent::HyperUser,
2749 Default::default(),
2750 );
2751 fae_interchange.add(access).await;
2752 }
2753
2754 let fae_endpoint_from_less = less_interchange
2755 .mount(
2756 HyperwayStub {
2757 remote: FAE.to_surface().with_layer(Layer::Core),
2758 agent: Agent::HyperUser,
2759 },
2760 None,
2761 )
2762 .await
2763 .unwrap();
2764 let fae_factory = Box::new(LocalHyperwayGateUnlocker::new(
2765 LESS.clone().to_surface(),
2766 fae_gate.clone(),
2767 ));
2768 let logger = RootLogger::default().point(Point::from_str("bridge").unwrap());
2769 let bridge = Bridge::new(fae_endpoint_from_less, fae_factory, logger);
2770
2771 let mut less_access = less_interchange
2772 .mount(
2773 HyperwayStub {
2774 remote: LESS.to_surface().with_layer(Layer::Core),
2775 agent: Agent::HyperUser,
2776 },
2777 None,
2778 )
2779 .await
2780 .unwrap();
2781 let mut fae_access = fae_interchange
2782 .mount(
2783 HyperwayStub {
2784 remote: FAE.to_surface().with_layer(Layer::Core),
2785 agent: Agent::HyperUser,
2786 },
2787 None,
2788 )
2789 .await
2790 .unwrap();
2791
2792 tokio::spawn(async move {
2793 while let Some(wave) = fae_access.rx.recv().await {
2794 if wave.is_directed() {
2795 let directed = wave.to_directed().unwrap();
2796 let reflection = directed.reflection().unwrap();
2797 let reflection = reflection.make(
2798 ReflectedCore::ok(),
2799 FAE.to_surface().with_layer(Layer::Core),
2800 );
2801 fae_access.tx.send(reflection.to_ultra()).await.unwrap();
2802 }
2803 }
2804 });
2805
2806 let exchanger = Exchanger::new(LESS.to_surface(), Timeouts::default(), Default::default());
2807 let less_tx = less_access.tx.clone();
2808
2809 {
2810 let exchanger = exchanger.clone();
2811 tokio::spawn(async move {
2812 while let Some(wave) = less_access.rx.recv().await {
2813 if wave.is_reflected() {
2814 exchanger
2815 .reflected(wave.to_reflected().unwrap())
2816 .await
2817 .unwrap();
2818 }
2819 }
2820 });
2821 }
2822 let mut transmitter =
2823 ProtoTransmitterBuilder::new(Arc::new(TxRouter::new(less_tx.clone())), exchanger);
2824 transmitter.from = SetStrategy::Override(LESS.to_surface());
2825 transmitter.agent = SetStrategy::Override(Agent::HyperUser);
2826 let transmitter = transmitter.build();
2827 let mut wave = DirectedProto::ping();
2828 wave.method(Method::Cmd(CmdMethod::Bounce));
2829 wave.to(FAE.to_surface().with_layer(Layer::Core));
2830 let reply: Wave<Pong> =
2831 tokio::time::timeout(Duration::from_secs(5), transmitter.direct(wave))
2832 .await
2833 .unwrap()
2834 .unwrap();
2835 assert!(reply.core.status.is_success());
2836 assert_eq!(reply.core.body, Substance::Empty);
2837 assert_eq!(reply.to, LESS.to_surface());
2838 assert_eq!(reply.from, FAE.to_surface());
2839 println!("Ok");
2840 }
2841}