1use crate::loc::{ToPoint, ToSurface};
2use crate::log::{PointLogger, RootLogger, Trackable, Tracker};
3use crate::particle::traversal::Traversal;
4use crate::settings::Timeouts;
5use crate::wave::core::cmd::CmdMethod;
6use crate::wave::core::http2::StatusCode;
7use crate::wave::core::CoreBounce;
8use crate::wave::exchange::{
9 BroadTxRouter, DirectedHandlerShellDef, InCtxDef, ProtoTransmitterBuilderDef,
10 ProtoTransmitterDef, RootInCtxDef, SetStrategy,
11};
12use crate::wave::{
13 BounceBacks, BounceProto, DirectedKind, DirectedProto, DirectedWave, Echo,
14 FromReflectedAggregate, Handling, Pong, RecipientSelector, ReflectedAggregate, ReflectedProto,
15 ReflectedWave, Scope, UltraWave, Wave, WaveId,
16};
17use crate::{Agent, ReflectedCore, SpaceErr, Substance, Surface, ToSubstance};
18use alloc::borrow::Cow;
19use dashmap::{DashMap, DashSet};
20use std::collections::HashSet;
21use std::sync::Arc;
22use std::time::Duration;
23use tokio::sync::{mpsc, oneshot};
24use crate::point::Point;
25
26#[async_trait]
27impl Router for TxRouter {
28 async fn route(&self, wave: UltraWave) {
29 self.tx.send(wave).await;
30 }
31}
32
33#[async_trait]
34impl Router for BroadTxRouter {
35 async fn route(&self, wave: UltraWave) {
36 self.tx.send(wave);
37 }
38}
39
40#[async_trait]
41pub trait Router: Send + Sync {
42 async fn route(&self, wave: UltraWave);
43}
44
45#[async_trait]
46pub trait TraversalRouter: Send + Sync {
47 async fn traverse(&self, traversal: Traversal<UltraWave>) -> Result<(), SpaceErr>;
48}
49
50#[derive(Clone)]
51pub struct AsyncRouter {
52 pub router: Arc<dyn Router>,
53}
54
55impl AsyncRouter {
56 pub fn new(router: Arc<dyn Router>) -> Self {
57 Self { router }
58 }
59}
60
61#[async_trait]
62impl Router for AsyncRouter {
63 async fn route(&self, wave: UltraWave) {
64 self.router.route(wave).await
65 }
66}
67
68pub type ProtoTransmitter = ProtoTransmitterDef<AsyncRouter, Exchanger>;
69
70impl ProtoTransmitter {
71 pub fn new(router: Arc<dyn Router>, exchanger: Exchanger) -> ProtoTransmitter {
72 let router = AsyncRouter::new(router);
73 Self {
74 from: SetStrategy::None,
75 to: SetStrategy::None,
76 agent: SetStrategy::Fill(Agent::Anonymous),
77 scope: SetStrategy::Fill(Scope::None),
78 handling: SetStrategy::Fill(Handling::default()),
79 method: SetStrategy::None,
80 via: SetStrategy::None,
81 router,
82 exchanger,
83 }
84 }
85
86 pub async fn direct<D, W>(&self, wave: D) -> Result<W, SpaceErr>
87 where
88 W: FromReflectedAggregate,
89 D: Into<DirectedProto>,
90 {
91 let mut wave: DirectedProto = wave.into();
92
93 self.prep_direct(&mut wave);
94
95 let directed = wave.build()?;
96
97 match directed.bounce_backs() {
98 BounceBacks::None => {
99 self.router.route(directed.to_ultra()).await;
100 FromReflectedAggregate::from_reflected_aggregate(ReflectedAggregate::None)
101 }
102 _ => {
103 let reflected_rx = self.exchanger.exchange(&directed).await;
104 self.router.route(directed.to_ultra()).await;
105 let reflected_agg = reflected_rx.await?;
106 FromReflectedAggregate::from_reflected_aggregate(reflected_agg)
107 }
108 }
109 }
110
111 pub async fn ping<D>(&self, ping: D) -> Result<Wave<Pong>, SpaceErr>
112 where
113 D: Into<DirectedProto>,
114 {
115 let mut ping: DirectedProto = ping.into();
116 if let Some(DirectedKind::Ping) = ping.kind {
117 self.direct(ping).await
118 } else {
119 Err(SpaceErr::server_error("expected DirectedKind::Ping"))
120 }
121 }
122
123 pub async fn ripple<D>(&self, ripple: D) -> Result<Vec<Wave<Echo>>, SpaceErr>
124 where
125 D: Into<DirectedProto>,
126 {
127 let mut ripple: DirectedProto = ripple.into();
128 if let Some(DirectedKind::Ripple) = ripple.kind {
129 self.direct(ripple).await
130 } else {
131 Err(SpaceErr::server_error("expected DirectedKind::Ping"))
132 }
133 }
134
135 pub async fn signal<D>(&self, signal: D) -> Result<(), SpaceErr>
136 where
137 D: Into<DirectedProto>,
138 {
139 let mut signal: DirectedProto = signal.into();
140 if let Some(DirectedKind::Signal) = signal.kind {
141 self.direct(signal).await
142 } else {
143 Err(SpaceErr::server_error("expected DirectedKind::Ping"))
144 }
145 }
146
147 pub async fn bounce_from(&self, to: &Surface, from: &Surface) -> bool {
148 let mut directed = DirectedProto::ping();
149 directed.from(from.clone());
150 directed.to(to.clone());
151 directed.method(CmdMethod::Bounce);
152 match self.direct(directed).await {
153 Ok(pong) => {
154 let pong: Wave<Pong> = pong;
155 pong.is_ok()
156 }
157 Err(_) => false,
158 }
159 }
160
161 pub async fn bounce(&self, to: &Surface) -> bool {
162 let mut direct = DirectedProto::ping();
163 direct.to(to.clone());
164 direct.method(CmdMethod::Bounce);
165 match self.direct(direct).await {
166 Ok(pong) => {
167 let pong: Wave<Pong> = pong;
168 pong.is_ok()
169 }
170 Err(_) => false,
171 }
172 }
173
174 pub async fn route(&self, wave: UltraWave) {
175 self.router.route(wave).await
176 }
177
178 pub async fn reflect<W>(&self, wave: W) -> Result<(), SpaceErr>
179 where
180 W: Into<ReflectedProto>,
181 {
182 let mut wave: ReflectedProto = wave.into();
183
184 self.prep_reflect(&mut wave);
185
186 let wave = wave.build()?;
187 let wave = wave.to_ultra();
188 self.router.route(wave).await;
189
190 Ok(())
191 }
192}
193
194pub type ProtoTransmitterBuilder = ProtoTransmitterBuilderDef<AsyncRouter, Exchanger>;
195
196impl ProtoTransmitterBuilder {
197 pub fn new(router: Arc<dyn Router>, exchanger: Exchanger) -> ProtoTransmitterBuilder {
198 let router = AsyncRouter::new(router);
199 Self {
200 from: SetStrategy::None,
201 to: SetStrategy::None,
202 via: SetStrategy::None,
203 agent: SetStrategy::Fill(Agent::Anonymous),
204 scope: SetStrategy::Fill(Scope::None),
205 handling: SetStrategy::Fill(Handling::default()),
206 method: SetStrategy::None,
207 router,
208 exchanger,
209 }
210 }
211}
212
213pub type TraversalTransmitter = ProtoTransmitterDef<Arc<dyn TraversalRouter>, Exchanger>;
214
215impl TraversalTransmitter {
216 pub fn new(router: Arc<dyn TraversalRouter>, exchanger: Exchanger) -> Self {
217 Self {
218 agent: SetStrategy::None,
219 scope: SetStrategy::None,
220 handling: SetStrategy::None,
221 method: SetStrategy::None,
222 from: SetStrategy::None,
223 to: SetStrategy::None,
224 via: SetStrategy::None,
225 router,
226 exchanger,
227 }
228 }
229
230 pub async fn direct<W>(&self, traversal: Traversal<DirectedWave>) -> Result<W, SpaceErr>
231 where
232 W: FromReflectedAggregate,
233 {
234 match traversal.bounce_backs() {
235 BounceBacks::None => {
236 self.router.traverse(traversal.wrap()).await;
237 FromReflectedAggregate::from_reflected_aggregate(ReflectedAggregate::None)
238 }
239 _ => {
240 let reflected_rx = self.exchanger.exchange(&traversal.payload).await;
241 self.router.traverse(traversal.wrap()).await;
242 let reflected_agg = reflected_rx.await?;
243 FromReflectedAggregate::from_reflected_aggregate(reflected_agg)
244 }
245 }
246 }
247}
248
249pub type RootInCtx = RootInCtxDef<ProtoTransmitter>;
250
251pub type InCtx<'a, I> = InCtxDef<'a, I, ProtoTransmitter>;
252
253impl<'a, I> InCtx<'a, I> {
254 pub fn push_from(self, from: Surface) -> InCtx<'a, I> {
255 let mut transmitter = self.transmitter.clone();
256 transmitter.to_mut().from = SetStrategy::Override(from);
257 InCtx {
258 root: self.root,
259 input: self.input,
260 logger: self.logger.clone(),
261 transmitter,
262 }
263 }
264}
265
266#[async_trait]
267pub trait DirectedHandlerSelector {
268 fn select<'a>(&self, select: &'a RecipientSelector<'a>) -> Result<&dyn DirectedHandler, ()>;
269}
270
271#[async_trait]
272pub trait DirectedHandler: Send + Sync {
273 async fn handle(&self, ctx: RootInCtx) -> CoreBounce;
274
275 async fn bounce(&self, ctx: RootInCtx) -> CoreBounce {
276 CoreBounce::Reflected(ReflectedCore::ok())
277 }
278}
279
280#[derive(Clone)]
281pub struct TxRouter {
282 pub tx: mpsc::Sender<UltraWave>,
283}
284
285impl TxRouter {
286 pub fn new(tx: mpsc::Sender<UltraWave>) -> Self {
287 Self { tx }
288 }
289}
290
291#[derive(Clone)]
292pub struct Exchanger {
293 pub surface: Surface,
294 pub multis: Arc<DashMap<WaveId, mpsc::Sender<ReflectedWave>>>,
295 pub singles: Arc<DashMap<WaveId, oneshot::Sender<ReflectedAggregate>>>,
296 pub timeouts: Timeouts,
297 pub logger: PointLogger,
298 #[cfg(test)]
299 pub claimed: Arc<DashSet<String>>,
300}
301
302impl Exchanger {
303 pub fn new(surface: Surface, timeouts: Timeouts, logger: PointLogger) -> Self {
304 let logger = logger.point(surface.point.clone());
305 Self {
306 surface,
307 singles: Arc::new(DashMap::new()),
308 multis: Arc::new(DashMap::new()),
309 timeouts,
310 logger,
311 #[cfg(test)]
312 claimed: Arc::new(DashSet::new()),
313 }
314 }
315
316 pub fn with_surface(&self, surface: Surface) -> Self {
317 let logger = self.logger.point(surface.point.clone());
318 Self {
319 surface,
320 singles: self.singles.clone(),
321 multis: self.multis.clone(),
322 timeouts: self.timeouts.clone(),
323 logger,
324 #[cfg(test)]
325 claimed: self.claimed.clone(),
326 }
327 }
328
329 pub async fn reflected(&self, reflect: ReflectedWave) -> Result<(), SpaceErr> {
330 self.logger
331 .track(&reflect, || Tracker::new("exchange", "Reflected"));
332
333 if let Some(multi) = self.multis.get(reflect.reflection_of()) {
334 multi.value().send(reflect).await;
335 } else if let Some((_, tx)) = self.singles.remove(reflect.reflection_of()) {
336 #[cfg(test)]
337 self.claimed.insert(reflect.reflection_of().to_string());
338 tx.send(ReflectedAggregate::Single(reflect));
339 } else {
340 let reflect = reflect.to_ultra();
341 let kind = match &reflect {
342 UltraWave::Ping(_) => "Ping",
343 UltraWave::Pong(_) => "Pong",
344 UltraWave::Ripple(_) => "Ripple",
345 UltraWave::Echo(_) => "Echo",
346 UltraWave::Signal(_) => "Signal",
347 };
348 let reflect = reflect.to_reflected()?;
349
350 #[cfg(test)]
351 if self
352 .claimed
353 .contains(reflect.reflection_of().to_string().as_str())
354 {
355 return Err(SpaceErr::server_error(format!(
356 "Reflection already claimed for {} from: {} to: {} KIND: {} STATUS: {}",
357 reflect.reflection_of().to_short_string(),
358 reflect.from().to_string(),
359 reflect.to().to_string(),
360 kind,
361 reflect.core().status.to_string()
362 )));
363 }
364 return Err(SpaceErr::server_error(format!(
365 "Not expecting reflected message for {} from: {} to: {} KIND: {} STATUS: {}",
366 reflect.reflection_of().to_short_string(),
367 reflect.from().to_string(),
368 reflect.to().to_string(),
369 kind,
370 reflect.core().status.to_string()
371 )));
372 }
373 Ok(())
374 }
375
376 pub async fn exchange(&self, directed: &DirectedWave) -> oneshot::Receiver<ReflectedAggregate> {
377 let (tx, rx) = oneshot::channel();
378
379 let mut reflected = match directed.reflected_proto() {
380 BounceProto::Absorbed => {
381 return rx;
382 }
383 BounceProto::Reflected(reflected) => reflected,
384 };
385
386 reflected.from(self.surface.clone());
387
388 let reflection = directed.reflection().unwrap();
389
390 let timeout = self.timeouts.from(directed.handling().wait.clone());
391 self.singles.insert(directed.id().clone(), tx);
392 match directed.bounce_backs() {
393 BounceBacks::None => {
394 panic!("we already dealt with this")
395 }
396 BounceBacks::Single => {
397 let singles = self.singles.clone();
398 tokio::spawn(async move {
399 tokio::time::sleep(Duration::from_secs(timeout)).await;
400 let id = reflected.reflection_of.as_ref().unwrap();
401 if let Some((_, tx)) = singles.remove(id) {
402 reflected.status = Some(StatusCode::from_u16(408).unwrap());
403 reflected.body = Some(Substance::Empty);
404 reflected.intended = Some(reflection.intended);
405 let reflected = reflected.build().unwrap();
406 tx.send(ReflectedAggregate::Single(reflected));
407 }
408 });
409 }
410 BounceBacks::Count(count) => {
411 let (tx, mut rx) = mpsc::channel(count);
412 self.multis.insert(directed.id().clone(), tx);
413 let singles = self.singles.clone();
414 let id = directed.id().clone();
415 tokio::spawn(async move {
416 let mut agg = vec![];
417 loop {
418 if let Some(reflected) = rx.recv().await {
419 agg.push(reflected);
420 if count == agg.len() {
421 if let Some((_, tx)) = singles.remove(&id) {
422 tx.send(ReflectedAggregate::Multi(agg));
423 break;
424 }
425 }
426 } else {
427 if let Some((_, tx)) = singles.remove(&id) {
429 reflected.status = Some(StatusCode::from_u16(408).unwrap());
430 reflected.body = Some(Substance::Empty);
431 reflected.intended = Some(reflection.intended);
432 let reflected = reflected.build().unwrap();
433 tx.send(ReflectedAggregate::Multi(vec![reflected]));
434 break;
435 }
436 }
437 }
438 });
439
440 let id = directed.id().clone();
441 let multis = self.multis.clone();
442 tokio::spawn(async move {
443 tokio::time::sleep(Duration::from_secs(timeout)).await;
444 multis.remove(&id);
446 });
447 }
448 BounceBacks::Timer(wait) => {
449 let (tx, mut rx) = mpsc::channel(32);
450 self.multis.insert(directed.id().clone(), tx);
451 let singles = self.singles.clone();
452 let id = directed.id().clone();
453 tokio::spawn(async move {
454 let mut agg = vec![];
455 loop {
456 if let Some(reflected) = rx.recv().await {
457 agg.push(reflected);
458 } else {
459 if let Some((_, tx)) = singles.remove(&id) {
461 tx.send(ReflectedAggregate::Multi(agg));
462 break;
463 }
464 }
465 }
466 });
467
468 let id = directed.id().clone();
469 let multis = self.multis.clone();
470 let timeout = self.timeouts.from(wait);
471 tokio::spawn(async move {
472 tokio::time::sleep(Duration::from_secs(timeout)).await;
473 multis.remove(&id);
475 });
476 }
477 }
478
479 rx
480 }
481}
482
483impl Default for Exchanger {
484 fn default() -> Self {
485 Self::new(
486 Point::root().to_surface(),
487 Default::default(),
488 RootLogger::default().point(Point::root()),
489 )
490 }
491}
492
493pub type DirectedHandlerShell<D> = DirectedHandlerShellDef<D, ProtoTransmitterBuilder>;
494
495impl<D> DirectedHandlerShell<D>
496where
497 D: DirectedHandler,
498{
499 pub async fn handle(&self, wave: DirectedWave) {
500 let logger = self
501 .logger
502 .point(self.surface.clone().to_point())
503 .spanner(&wave);
504 let mut transmitter = self.builder.clone().build();
505 let reflection = wave.reflection();
506 let ctx = RootInCtx::new(wave, self.surface.clone(), logger, transmitter);
507 match self.handler.handle(ctx).await {
508 CoreBounce::Absorbed => {}
509 CoreBounce::Reflected(reflected) => {
510 let wave = reflection.unwrap().make(reflected, self.surface.clone());
511 let wave = wave.to_ultra();
512 let transmitter = self.builder.clone().build();
513 transmitter.route(wave).await;
514 }
515 }
516 }
517}
518
519impl RootInCtx {
520 pub fn push<'a, I>(&self) -> Result<InCtx<I>, SpaceErr>
521 where
522 Substance: ToSubstance<I>,
523 {
524 let input = match self.wave.to_substance_ref() {
525 Ok(input) => input,
526 Err(err) => return Err(err.into()),
527 };
528 Ok(InCtx {
529 root: self,
530 input,
531 logger: self.logger.clone(),
532 transmitter: Cow::Borrowed(&self.transmitter),
533 })
534 }
535}