1pub mod asynch;
2pub mod synch;
3
4use alloc::borrow::Cow;
5use std::marker::PhantomData;
6use std::ops::Deref;
7use std::sync::Arc;
8use std::time::Duration;
9
10use asynch::{
11 DirectedHandler, Exchanger, InCtx, ProtoTransmitter, ProtoTransmitterBuilder, RootInCtx, Router,
12};
13use dashmap::DashMap;
14use tokio::sync::{broadcast, mpsc, oneshot};
15
16use crate::config::bind::RouteSelector;
17use crate::loc::{Topic, ToPoint, ToSurface};
18use crate::log::{PointLogger, RootLogger, SpanLogger};
19use crate::settings::Timeouts;
20use crate::wave::core::cmd::CmdMethod;
21use crate::wave::core::http2::StatusCode;
22use crate::wave::core::{CoreBounce, Method};
23use crate::wave::exchange::asynch::AsyncRouter;
24use crate::wave::{
25 Bounce, BounceBacks, BounceProto, DirectedProto, DirectedWave, Echo, FromReflectedAggregate,
26 Handling, Pong, Recipients, RecipientSelector, ReflectedAggregate, ReflectedProto,
27 ReflectedWave, Scope, Session, ToRecipients, UltraWave, Wave, WaveId,
28};
29use crate::{Agent, ReflectedCore, SpaceErr, Substance, Surface, ToSubstance, wave};
30use crate::point::Point;
31
/// Groups a handler of type `D` with the builder, surface and logger it is
/// created alongside.
///
/// Neither `D` nor `T` is constrained by the struct itself; `Clone` is derived,
/// so the shell is cloneable whenever both type parameters are.
#[derive(Clone)]
pub struct DirectedHandlerShellDef<D, T> {
    /// Logger produced in `new` by scoping a `RootLogger` to `surface.point`.
    logger: PointLogger,
    /// The wrapped handler value.
    handler: D,
    /// The surface this shell was created for.
    surface: Surface,
    /// Builder value of type `T`.
    // NOTE(review): presumably a transmitter builder — confirm against the
    // concrete aliases in `asynch` / `synch`.
    builder: T,
}
39
40impl<D, T> DirectedHandlerShellDef<D, T>
41where
42 D: Sized,
43{
44 pub fn new(handler: D, builder: T, surface: Surface, logger: RootLogger) -> Self {
45 let logger = logger.point(surface.point.clone());
46 Self {
47 handler,
48 builder,
49 surface,
50 logger,
51 }
52 }
53}
54
/// Pairs a route selector with the handler value it is stored alongside.
pub struct InternalPipeline<H> {
    /// Selector stored with the handler.
    // NOTE(review): presumably decides which waves reach `handler` — confirm
    // against the code that consumes pipelines.
    pub selector: RouteSelector,
    /// The handler value of type `H`.
    pub handler: H,
}
59
60impl<H> InternalPipeline<H> {
61 pub fn new(selector: RouteSelector, mut handler: H) -> Self {
62 Self { selector, handler }
63 }
64}
65
/// Root context for one incoming directed wave.
///
/// `T` is the transmitter type carried with the context; it is not
/// constrained here.
pub struct RootInCtxDef<T> {
    /// The surface the wave was delivered to. The convenience responders
    /// (`not_found`, `timeout`, ...) also use it as the `from` of the
    /// reflected wave.
    pub to: Surface,
    /// The directed wave being handled.
    pub wave: DirectedWave,
    /// Optional session; `new` initialises it to `None`.
    pub session: Option<Session>,
    /// Span logger associated with this context.
    pub logger: SpanLogger,
    /// Transmitter value carried with the context.
    pub transmitter: T,
}
73
74impl<T> RootInCtxDef<T> {
75 pub fn new(wave: DirectedWave, to: Surface, logger: SpanLogger, transmitter: T) -> Self {
76 Self {
77 wave,
78 to,
79 logger,
80 session: None,
81 transmitter,
82 }
83 }
84
85 pub fn status(self, status: u16, from: Surface) -> Bounce<ReflectedWave> {
86 match self.wave {
87 DirectedWave::Ping(ping) => Bounce::Reflected(ReflectedWave::Pong(Wave::new(
88 Pong::new(
89 ReflectedCore::status(status),
90 ping.from.clone(),
91 self.to.clone().to_recipients(),
92 ping.id.clone(),
93 ),
94 from,
95 ))),
96 DirectedWave::Ripple(ripple) => Bounce::Reflected(ReflectedWave::Echo(Wave::new(
97 Echo::new(
98 ReflectedCore::status(status),
99 ripple.from.clone(),
100 ripple.to.clone(),
101 ripple.id.clone(),
102 ),
103 from,
104 ))),
105 DirectedWave::Signal(_) => Bounce::Absorbed,
106 }
107 }
108
109 pub fn err(self, status: u16, from: Surface, msg: String) -> Bounce<ReflectedWave> {
110 match self.wave {
111 DirectedWave::Ping(ping) => Bounce::Reflected(ReflectedWave::Pong(Wave::new(
112 Pong::new(
113 ReflectedCore::fail(status, msg),
114 ping.from.clone(),
115 self.to.clone().to_recipients(),
116 ping.id.clone(),
117 ),
118 from,
119 ))),
120 DirectedWave::Ripple(ripple) => Bounce::Reflected(ReflectedWave::Echo(Wave::new(
121 Echo::new(
122 ReflectedCore::fail(status, msg),
123 ripple.from.clone(),
124 ripple.to.clone(),
125 ripple.id.clone(),
126 ),
127 from,
128 ))),
129 DirectedWave::Signal(_) => Bounce::Absorbed,
130 }
131 }
132
133 pub fn not_found(self) -> Bounce<ReflectedWave> {
134 let to = self.to.clone();
135 let msg = format!(
136 "<{}>{}",
137 self.wave.core().method.to_string(),
138 self.wave.core().uri.path().to_string()
139 );
140 self.err(404, to, msg)
141 }
142
143 pub fn timeout(self) -> Bounce<ReflectedWave> {
144 let to = self.to.clone();
145 self.status(408, to)
146 }
147
148 pub fn bad_request(self) -> Bounce<ReflectedWave> {
149 let to = self.to.clone();
150 let msg = format!(
151 "<{}>{} -[ {} ]->",
152 self.wave.core().method.to_string(),
153 self.wave.core().uri.path().to_string(),
154 self.wave.core().body.kind().to_string()
155 );
156 self.err(400, to, msg)
157 }
158
159 pub fn server_error(self) -> Bounce<ReflectedWave> {
160 let to = self.to.clone();
161 self.status(500, to)
162 }
163
164 pub fn forbidden(self) -> Bounce<ReflectedWave> {
165 let to = self.to.clone();
166 let msg = format!(
167 "<{}>{} -[ {} ]->",
168 self.wave.core().method.to_string(),
169 self.wave.core().uri.path().to_string(),
170 self.wave.core().body.kind().to_string()
171 );
172 self.err(401, to, msg)
173 }
174
175 pub fn unavailable(self) -> Bounce<ReflectedWave> {
176 let to = self.to.clone();
177 self.status(503, to)
178 }
179
180 pub fn unauthorized(self) -> Bounce<ReflectedWave> {
181 let to = self.to.clone();
182 self.status(403, to)
183 }
184}
185
/// Borrowed view over a [`RootInCtxDef`] paired with a typed input `I`.
///
/// Dereferences to `I`, so fields and methods of the input are reachable
/// directly on the context.
///
/// The `T: Clone` bound is required by the `Cow<'a, T>` field.
pub struct InCtxDef<'a, I, T>
where
    T: Clone,
{
    /// The root context that holds the wave and the destination surface.
    root: &'a RootInCtxDef<T>,
    /// Transmitter for this context, either borrowed or owned.
    pub transmitter: Cow<'a, T>,
    /// The input value this context dereferences to.
    pub input: &'a I,
    /// Span logger for this context.
    pub logger: SpanLogger,
}
195
196impl<'a, I, T> Deref for InCtxDef<'a, I, T>
197where
198 T: Clone,
199{
200 type Target = I;
201
202 fn deref(&self) -> &Self::Target {
203 self.input
204 }
205}
206
207impl<'a, I, T> InCtxDef<'a, I, T>
208where
209 T: Clone,
210{
211 pub fn new(
212 root: &'a RootInCtxDef<T>,
213 input: &'a I,
214 tx: Cow<'a, T>,
215 logger: SpanLogger,
216 ) -> Self {
217 Self {
218 root,
219 input,
220 logger,
221 transmitter: tx,
222 }
223 }
224
225 pub fn from(&self) -> &Surface {
226 self.root.wave.from()
227 }
228
229 pub fn to(&self) -> &Surface {
230 &self.root.to
231 }
232
233 pub fn push(self) -> InCtxDef<'a, I, T> {
234 InCtxDef {
235 root: self.root,
236 input: self.input,
237 logger: self.logger.span(),
238 transmitter: self.transmitter.clone(),
239 }
240 }
241
242 pub fn push_input_ref<I2>(self, input: &'a I2) -> InCtxDef<'a, I2, T> {
243 InCtxDef {
244 root: self.root,
245 input,
246 logger: self.logger.clone(),
247 transmitter: self.transmitter.clone(),
248 }
249 }
250
251 pub fn wave(&self) -> &DirectedWave {
252 &self.root.wave
253 }
254
255 pub fn ok_body(self, substance: Substance) -> ReflectedCore {
263 self.root.wave.core().ok_body(substance)
264 }
265
266 pub fn not_found(self) -> ReflectedCore {
267 self.root.wave.core().not_found()
268 }
269
270 pub fn forbidden(self) -> ReflectedCore {
271 self.root.wave.core().forbidden()
272 }
273
274 pub fn bad_request(self) -> ReflectedCore {
275 self.root.wave.core().bad_request()
276 }
277
278 pub fn err(self, err: SpaceErr) -> ReflectedCore {
279 self.root.wave.core().err(err)
280 }
281}
282
/// Wrapper around a Tokio broadcast sender of [`UltraWave`]s.
// NOTE(review): the name suggests this acts as a router that publishes waves
// on the channel; the routing impl is not in this part of the file — confirm.
#[derive(Clone)]
pub struct BroadTxRouter {
    /// The broadcast sender being wrapped.
    pub tx: broadcast::Sender<UltraWave>,
}
287
288impl BroadTxRouter {
289 pub fn new(tx: broadcast::Sender<UltraWave>) -> Self {
290 Self { tx }
291 }
292}
293
/// Publicly configurable precursor of [`ProtoTransmitterDef`].
///
/// Every field is public so it can be set directly; `build` moves all of them
/// into a `ProtoTransmitterDef` unchanged.
#[derive(Clone)]
pub struct ProtoTransmitterBuilderDef<R, E> {
    /// Strategy for the agent applied to outgoing waves.
    pub agent: SetStrategy<Agent>,
    /// Strategy for the scope applied to outgoing waves.
    pub scope: SetStrategy<Scope>,
    /// Strategy for the handling applied to outgoing waves.
    pub handling: SetStrategy<Handling>,
    /// Strategy for the method applied to outgoing directed waves.
    pub method: SetStrategy<Method>,
    /// Strategy for the `via` surface applied to outgoing directed waves.
    pub via: SetStrategy<Surface>,
    /// Strategy for the `from` surface applied to outgoing waves.
    pub from: SetStrategy<Surface>,
    /// Strategy for the recipients applied to outgoing directed waves.
    pub to: SetStrategy<Recipients>,
    /// Router value of type `R`, passed through to the transmitter.
    pub router: R,
    /// Exchanger value of type `E`, passed through to the transmitter.
    pub exchanger: E,
}
306
307impl<R, E> ProtoTransmitterBuilderDef<R, E> {
308 pub fn build(self) -> ProtoTransmitterDef<R, E> {
309 ProtoTransmitterDef {
310 agent: self.agent,
311 scope: self.scope,
312 handling: self.handling,
313 method: self.method,
314 from: self.from,
315 to: self.to,
316 via: self.via,
317 router: self.router,
318 exchanger: self.exchanger,
319 }
320 }
321}
322
/// Transmitter state: a set of [`SetStrategy`] values applied to waves before
/// they are sent, plus the router and exchanger values carried with them.
///
/// `prep_direct` applies all seven strategies to a `DirectedProto`;
/// `prep_reflect` applies `from`, `agent`, `scope` and `handling` to a
/// `ReflectedProto`.
#[derive(Clone)]
pub struct ProtoTransmitterDef<R, E> {
    /// Strategy for the wave's agent.
    agent: SetStrategy<Agent>,
    /// Strategy for the wave's scope.
    scope: SetStrategy<Scope>,
    /// Strategy for the wave's handling.
    handling: SetStrategy<Handling>,
    /// Strategy for the wave's method (directed waves only).
    method: SetStrategy<Method>,
    /// Strategy for the wave's `from` surface.
    from: SetStrategy<Surface>,
    /// Strategy for the wave's recipients (directed waves only).
    to: SetStrategy<Recipients>,
    /// Strategy for the wave's `via` surface (directed waves only).
    via: SetStrategy<Surface>,
    /// Router value of type `R`; not used by the methods visible here.
    router: R,
    /// Exchanger value of type `E`; not used by the methods visible here.
    exchanger: E,
}
335
336impl<R, E> ProtoTransmitterDef<R, E> {
337 pub fn from_topic(&mut self, topic: Topic) -> Result<(), SpaceErr> {
338 self.from = match self.from.clone() {
339 SetStrategy::None => {
340 return Err(SpaceErr::server_error(
341 "cannot set Topic without first setting Surface",
342 ));
343 }
344 SetStrategy::Fill(from) => SetStrategy::Fill(from.with_topic(topic)),
345 SetStrategy::Override(from) => SetStrategy::Override(from.with_topic(topic)),
346 };
347 Ok(())
348 }
349
350 fn prep_direct(&self, wave: &mut DirectedProto) {
351 match &self.from {
352 SetStrategy::None => {}
353 SetStrategy::Fill(from) => wave.fill_from(from.clone()),
354 SetStrategy::Override(from) => wave.from(from.clone()),
355 }
356
357 match &self.to {
358 SetStrategy::None => {}
359 SetStrategy::Fill(to) => wave.fill_to(to.clone()),
360 SetStrategy::Override(to) => wave.to(to),
361 }
362
363 match &self.via {
364 SetStrategy::None => {}
365 SetStrategy::Fill(via) => wave.fill_via(via.clone()),
366 SetStrategy::Override(via) => wave.via(via),
367 }
368
369 match &self.agent {
370 SetStrategy::None => {}
371 SetStrategy::Fill(agent) => wave.fill_agent(agent),
372 SetStrategy::Override(agent) => wave.agent(agent.clone()),
373 }
374
375 match &self.scope {
376 SetStrategy::None => {}
377 SetStrategy::Fill(scope) => wave.fill_scope(scope),
378 SetStrategy::Override(scope) => wave.scope(scope.clone()),
379 }
380
381 match &self.handling {
382 SetStrategy::None => {}
383 SetStrategy::Fill(handling) => wave.fill_handling(handling),
384 SetStrategy::Override(handling) => wave.handling(handling.clone()),
385 }
386
387 match &self.method {
388 SetStrategy::None => {}
389 SetStrategy::Fill(method) => wave.fill_method(method),
390 SetStrategy::Override(handling) => wave.method(handling.clone()),
391 }
392 }
393
394 fn prep_reflect(&self, wave: &mut ReflectedProto) {
395 match &self.from {
396 SetStrategy::None => {}
397 SetStrategy::Fill(from) => wave.fill_from(from),
398 SetStrategy::Override(from) => wave.from(from.clone()),
399 }
400
401 match &self.agent {
402 SetStrategy::None => {}
403 SetStrategy::Fill(agent) => wave.fill_agent(agent),
404 SetStrategy::Override(agent) => wave.agent(agent.clone()),
405 }
406
407 match &self.scope {
408 SetStrategy::None => {}
409 SetStrategy::Fill(scope) => wave.fill_scope(scope),
410 SetStrategy::Override(scope) => wave.scope(scope.clone()),
411 }
412
413 match &self.handling {
414 SetStrategy::None => {}
415 SetStrategy::Fill(handling) => wave.fill_handling(handling),
416 SetStrategy::Override(handling) => wave.handling(handling.clone()),
417 }
418 }
419}
420
/// How a transmitter applies one of its configured values to an outgoing wave.
///
/// See `ProtoTransmitterDef::prep_direct` / `prep_reflect` for where each
/// variant is interpreted.
#[derive(Clone, strum_macros::Display)]
pub enum SetStrategy<T> {
    /// No value configured; the wave is left untouched.
    None,
    /// Applied through the wave's `fill_*` method.
    // NOTE(review): presumably only sets the value when the wave does not
    // already have one — confirm against the `fill_*` implementations.
    Fill(T),
    /// Applied through the wave's plain setter.
    // NOTE(review): presumably replaces any value already on the wave —
    // confirm against the setter implementations.
    Override(T),
}
432
433impl<T> SetStrategy<T> {
434 pub fn unwrap(self) -> Result<T, SpaceErr> {
435 match self {
436 SetStrategy::None => Err("cannot unwrap a SetStrategy::None".into()),
437 SetStrategy::Fill(t) => Ok(t),
438 SetStrategy::Override(t) => Ok(t),
439 }
440 }
441}
442
443impl SetStrategy<Surface> {
444 pub fn with_topic(self, topic: Topic) -> Result<Self, SpaceErr> {
445 match self {
446 SetStrategy::None => Err("cannot set topic if Strategy is None".into()),
447 SetStrategy::Fill(surface) => Ok(SetStrategy::Fill(surface.with_topic(topic))),
448 SetStrategy::Override(surface) => Ok(SetStrategy::Override(surface.with_topic(topic))),
449 }
450 }
451}