1#![allow(clippy::type_complexity)]
2use core::marker::PhantomData;
14
15use crate::pipeline::{ChainCall, IntoRefStep, PipelineChain, RefStepCall};
16use crate::world::World;
17
18#[diagnostic::on_unimplemented(
46 message = "`{Source}` cannot be viewed as `{Self}`",
47 note = "implement `View<{Source}>` for your view marker type"
48)]
49pub unsafe trait View<Source> {
65 type ViewType<'a>
67 where
68 Source: 'a;
69
70 type StaticViewType: 'static;
73
74 fn view(source: &Source) -> Self::ViewType<'_>;
76}
77
78#[inline(always)]
95fn with_view<Source, V, R>(source: &Source, f: impl for<'a> FnOnce(&'a V::StaticViewType) -> R) -> R
96where
97 V: View<Source>,
98{
99 let view = V::view(source);
100 let static_ref: &V::StaticViewType =
110 unsafe { &*(std::ptr::from_ref(&view) as *const V::StaticViewType) };
111 let result = f(static_ref);
112 drop(view);
113 result
114}
115
116pub struct ViewScope<In, Out, V: View<Out>, PrevChain, InnerSteps> {
125 prev_chain: PrevChain,
126 inner: InnerSteps,
127 _marker: PhantomData<(fn(In) -> Out, V)>,
128}
129
130impl<In, Out, V: View<Out>, PrevChain> ViewScope<In, Out, V, PrevChain, ()> {
131 pub(crate) fn new(prev_chain: PrevChain) -> Self {
132 ViewScope {
133 prev_chain,
134 inner: (),
135 _marker: PhantomData,
136 }
137 }
138}
139
140impl<In, Out, V: View<Out>, PrevChain, InnerSteps> ViewScope<In, Out, V, PrevChain, InnerSteps> {
143 pub fn tap<Params, S: IntoRefStep<V::StaticViewType, (), Params>>(
146 self,
147 f: S,
148 registry: &crate::world::Registry,
149 ) -> ViewScope<In, Out, V, PrevChain, (InnerSteps, ViewTap<S::Step>)> {
150 ViewScope {
151 prev_chain: self.prev_chain,
152 inner: (self.inner, ViewTap(f.into_ref_step(registry))),
153 _marker: PhantomData,
154 }
155 }
156
157 pub fn inspect<S: IntoRefStep<V::StaticViewType, (), ()>>(
160 self,
161 f: S,
162 registry: &crate::world::Registry,
163 ) -> ViewScope<In, Out, V, PrevChain, (InnerSteps, ViewTap<S::Step>)> {
164 self.tap(f, registry)
165 }
166
167 pub fn filter<Params, S: IntoRefStep<V::StaticViewType, bool, Params>>(
171 self,
172 f: S,
173 registry: &crate::world::Registry,
174 ) -> ViewScope<In, Out, V, PrevChain, (InnerSteps, ViewGuard<S::Step>)> {
175 self.guard(f, registry)
176 }
177
178 pub fn guard<Params, S: IntoRefStep<V::StaticViewType, bool, Params>>(
181 self,
182 f: S,
183 registry: &crate::world::Registry,
184 ) -> ViewScope<In, Out, V, PrevChain, (InnerSteps, ViewGuard<S::Step>)> {
185 ViewScope {
186 prev_chain: self.prev_chain,
187 inner: (self.inner, ViewGuard(f.into_ref_step(registry))),
188 _marker: PhantomData,
189 }
190 }
191}
192
193#[doc(hidden)]
196pub struct ViewTap<S>(S);
197
198#[doc(hidden)]
199pub struct ViewGuard<S>(S);
200
201#[doc(hidden)]
204pub trait ViewSteps<V> {
205 fn run(&mut self, world: &mut World, view: &V) -> bool;
206}
207
208impl<V> ViewSteps<V> for () {
209 fn run(&mut self, _world: &mut World, _view: &V) -> bool {
210 true
211 }
212}
213
214impl<V, Prev: ViewSteps<V>, S: RefStepCall<V, Out = ()>> ViewSteps<V> for (Prev, ViewTap<S>) {
215 fn run(&mut self, world: &mut World, view: &V) -> bool {
216 if !self.0.run(world, view) {
217 return false;
218 }
219 self.1.0.call(world, view);
220 true
221 }
222}
223
224impl<V, Prev: ViewSteps<V>, S: RefStepCall<V, Out = bool>> ViewSteps<V> for (Prev, ViewGuard<S>) {
225 fn run(&mut self, world: &mut World, view: &V) -> bool {
226 if !self.0.run(world, view) {
227 return false;
228 }
229 self.1.0.call(world, view)
230 }
231}
232
233impl<In, Out, V, PrevChain, InnerSteps> ViewScope<In, Out, V, PrevChain, InnerSteps>
238where
239 PrevChain: ChainCall<In, Out = Out>,
240 V: View<Out>,
241 InnerSteps: ViewSteps<V::StaticViewType>,
242{
243 pub fn end_view(self) -> PipelineChain<In, Out, ViewNode<PrevChain, InnerSteps, V>> {
245 PipelineChain {
246 chain: ViewNode {
247 prev: self.prev_chain,
248 inner: self.inner,
249 _marker: PhantomData,
250 },
251 _marker: PhantomData,
252 }
253 }
254
255 pub fn end_view_guarded(
257 self,
258 ) -> PipelineChain<In, Option<Out>, ViewGuardedNode<PrevChain, InnerSteps, V>> {
259 PipelineChain {
260 chain: ViewGuardedNode {
261 prev: self.prev_chain,
262 inner: self.inner,
263 _marker: PhantomData,
264 },
265 _marker: PhantomData,
266 }
267 }
268}
269
270#[doc(hidden)]
275pub struct ViewNode<Prev, Inner, V> {
276 prev: Prev,
277 inner: Inner,
278 _marker: PhantomData<V>,
279}
280
281impl<In, Out, Prev, Inner, V> ChainCall<In> for ViewNode<Prev, Inner, V>
282where
283 Prev: ChainCall<In, Out = Out>,
284 V: View<Out>,
285 Inner: ViewSteps<V::StaticViewType>,
286{
287 type Out = Out;
288
289 fn call(&mut self, world: &mut World, input: In) -> Out {
290 let event = self.prev.call(world, input);
291 with_view::<Out, V, ()>(&event, |view| {
292 self.inner.run(world, view);
293 });
294 event
295 }
296}
297
298#[doc(hidden)]
299pub struct ViewGuardedNode<Prev, Inner, V> {
300 prev: Prev,
301 inner: Inner,
302 _marker: PhantomData<V>,
303}
304
305impl<In, Out, Prev, Inner, V> ChainCall<In> for ViewGuardedNode<Prev, Inner, V>
306where
307 Prev: ChainCall<In, Out = Out>,
308 V: View<Out>,
309 Inner: ViewSteps<V::StaticViewType>,
310{
311 type Out = Option<Out>;
312
313 fn call(&mut self, world: &mut World, input: In) -> Option<Out> {
314 let event = self.prev.call(world, input);
315 let pass = with_view::<Out, V, bool>(&event, |view| self.inner.run(world, view));
316 if pass { Some(event) } else { None }
317 }
318}
319
320impl<In> crate::pipeline::PipelineBuilder<In> {
325 pub fn view<V: View<In>>(self) -> ViewScope<In, In, V, crate::pipeline::IdentityNode, ()> {
327 ViewScope::new(crate::pipeline::IdentityNode)
328 }
329}
330
331impl<In, Out, Chain: ChainCall<In, Out = Out>> PipelineChain<In, Out, Chain> {
332 pub fn view<V: View<Out>>(self) -> ViewScope<In, Out, V, Chain, ()> {
339 ViewScope::new(self.chain)
340 }
341}
342
343impl<In, Out, V, PrevChain, InnerSteps> ViewScope<In, Out, V, PrevChain, InnerSteps>
348where
349 PrevChain: ChainCall<In, Out = Out>,
350 V: View<Out>,
351 InnerSteps: ViewSteps<V::StaticViewType>,
352 Out: 'static,
353{
354 pub fn end_view_dag(self) -> crate::dag::DagChain<In, Out, ViewNode<PrevChain, InnerSteps, V>> {
356 crate::dag::DagChain {
357 chain: ViewNode {
358 prev: self.prev_chain,
359 inner: self.inner,
360 _marker: PhantomData,
361 },
362 _marker: PhantomData,
363 }
364 }
365
366 pub fn end_view_dag_guarded(
368 self,
369 ) -> crate::dag::DagChain<In, Option<Out>, ViewGuardedNode<PrevChain, InnerSteps, V>> {
370 crate::dag::DagChain {
371 chain: ViewGuardedNode {
372 prev: self.prev_chain,
373 inner: self.inner,
374 _marker: PhantomData,
375 },
376 _marker: PhantomData,
377 }
378 }
379
380 pub fn end_view_arm(self) -> crate::dag::DagArm<In, Out, ViewNode<PrevChain, InnerSteps, V>> {
382 crate::dag::DagArm {
383 chain: ViewNode {
384 prev: self.prev_chain,
385 inner: self.inner,
386 _marker: PhantomData,
387 },
388 _marker: PhantomData,
389 }
390 }
391
392 pub fn end_view_arm_guarded(
394 self,
395 ) -> crate::dag::DagArm<In, Option<Out>, ViewGuardedNode<PrevChain, InnerSteps, V>> {
396 crate::dag::DagArm {
397 chain: ViewGuardedNode {
398 prev: self.prev_chain,
399 inner: self.inner,
400 _marker: PhantomData,
401 },
402 _marker: PhantomData,
403 }
404 }
405}
406
407#[cfg(test)]
408mod tests {
409 use super::*;
410 use crate::{PipelineBuilder, Res, ResMut, Resource, WorldBuilder};
411
412 struct AuditLog(Vec<String>);
415 impl Resource for AuditLog {}
416
417 struct RiskLimits {
418 max_qty: u64,
419 }
420 impl Resource for RiskLimits {}
421
422 struct OrderView<'a> {
424 symbol: &'a str,
425 qty: u64,
426 }
427
428 struct NewOrderCommand {
429 source: String,
430 symbol: String,
431 qty: u64,
432 #[allow(dead_code)]
433 price: f64,
434 }
435
436 struct AmendOrderCommand {
437 #[allow(dead_code)]
438 order_id: u64,
439 symbol: String,
440 qty: u64,
441 #[allow(dead_code)]
442 price: f64,
443 }
444
445 struct AsOrderView;
448
449 unsafe impl View<NewOrderCommand> for AsOrderView {
450 type ViewType<'a> = OrderView<'a>;
451 type StaticViewType = OrderView<'static>;
452 fn view(source: &NewOrderCommand) -> OrderView<'_> {
453 OrderView {
454 symbol: &source.symbol,
455 qty: source.qty,
456 }
457 }
458 }
459
460 unsafe impl View<AmendOrderCommand> for AsOrderView {
461 type ViewType<'a> = OrderView<'a>;
462 type StaticViewType = OrderView<'static>;
463 fn view(source: &AmendOrderCommand) -> OrderView<'_> {
464 OrderView {
465 symbol: &source.symbol,
466 qty: source.qty,
467 }
468 }
469 }
470
471 fn log_order(mut log: ResMut<AuditLog>, v: &OrderView) {
474 log.0.push(format!("{} qty={}", v.symbol, v.qty));
475 }
476
477 fn check_risk(limits: Res<RiskLimits>, v: &OrderView) -> bool {
478 v.qty <= limits.max_qty
479 }
480
481 #[test]
484 fn tap_observes_view() {
485 let mut wb = WorldBuilder::new();
486 wb.register(AuditLog(Vec::new()));
487 let mut world = wb.build();
488 let reg = world.registry();
489
490 let mut p = PipelineBuilder::<NewOrderCommand>::new()
491 .view::<AsOrderView>()
492 .tap(log_order, reg)
493 .end_view()
494 .then(|_cmd: NewOrderCommand| {}, reg);
495
496 p.run(
497 &mut world,
498 NewOrderCommand {
499 source: "test".into(),
500 symbol: "BTC".into(),
501 qty: 50,
502 price: 42000.0,
503 },
504 );
505
506 assert_eq!(world.resource::<AuditLog>().0, vec!["BTC qty=50"]);
507 }
508
509 #[test]
510 fn guard_rejects() {
511 let mut wb = WorldBuilder::new();
512 wb.register(AuditLog(Vec::new()));
513 wb.register(RiskLimits { max_qty: 100 });
514 let mut world = wb.build();
515 let reg = world.registry();
516
517 let mut p = PipelineBuilder::<NewOrderCommand>::new()
518 .view::<AsOrderView>()
519 .tap(log_order, reg)
520 .guard(check_risk, reg)
521 .end_view_guarded();
522
523 let result = p.run(
524 &mut world,
525 NewOrderCommand {
526 source: "a".into(),
527 symbol: "BTC".into(),
528 qty: 50,
529 price: 42000.0,
530 },
531 );
532 assert!(result.is_some());
533
534 let result = p.run(
535 &mut world,
536 NewOrderCommand {
537 source: "b".into(),
538 symbol: "ETH".into(),
539 qty: 200,
540 price: 3000.0,
541 },
542 );
543 assert!(result.is_none());
544
545 assert_eq!(world.resource::<AuditLog>().0.len(), 2);
548 }
549
550 #[test]
551 fn event_passes_through_unchanged() {
552 let mut wb = WorldBuilder::new();
553 wb.register(AuditLog(Vec::new()));
554 let mut world = wb.build();
555 let reg = world.registry();
556
557 fn sink(mut out: ResMut<AuditLog>, cmd: NewOrderCommand) {
558 out.0
559 .push(format!("sink: {} from {}", cmd.symbol, cmd.source));
560 }
561
562 let mut p = PipelineBuilder::<NewOrderCommand>::new()
563 .view::<AsOrderView>()
564 .tap(log_order, reg)
565 .end_view()
566 .then(sink, reg);
567
568 p.run(
569 &mut world,
570 NewOrderCommand {
571 source: "ops".into(),
572 symbol: "SOL".into(),
573 qty: 10,
574 price: 150.0,
575 },
576 );
577
578 let log = &world.resource::<AuditLog>().0;
579 assert_eq!(log[0], "SOL qty=10");
580 assert_eq!(log[1], "sink: SOL from ops");
581 }
582
583 #[test]
584 fn reusable_across_event_types() {
585 let mut wb = WorldBuilder::new();
586 wb.register(AuditLog(Vec::new()));
587 let mut world = wb.build();
588 let reg = world.registry();
589
590 let mut p_new = PipelineBuilder::<NewOrderCommand>::new()
591 .view::<AsOrderView>()
592 .tap(log_order, reg)
593 .end_view()
594 .then(|_: NewOrderCommand| {}, reg);
595
596 let mut p_amend = PipelineBuilder::<AmendOrderCommand>::new()
597 .view::<AsOrderView>()
598 .tap(log_order, reg) .end_view()
600 .then(|_: AmendOrderCommand| {}, reg);
601
602 p_new.run(
603 &mut world,
604 NewOrderCommand {
605 source: "a".into(),
606 symbol: "BTC".into(),
607 qty: 50,
608 price: 42000.0,
609 },
610 );
611 p_amend.run(
612 &mut world,
613 AmendOrderCommand {
614 order_id: 123,
615 symbol: "ETH".into(),
616 qty: 25,
617 price: 3000.0,
618 },
619 );
620
621 let log = &world.resource::<AuditLog>().0;
622 assert_eq!(log[0], "BTC qty=50");
623 assert_eq!(log[1], "ETH qty=25");
624 }
625
626 #[test]
627 fn multiple_taps_in_scope() {
628 let mut wb = WorldBuilder::new();
629 wb.register(AuditLog(Vec::new()));
630 let mut world = wb.build();
631 let reg = world.registry();
632
633 fn log_symbol(mut log: ResMut<AuditLog>, v: &OrderView) {
634 log.0.push(format!("symbol: {}", v.symbol));
635 }
636 fn log_qty(mut log: ResMut<AuditLog>, v: &OrderView) {
637 log.0.push(format!("qty: {}", v.qty));
638 }
639
640 let mut p = PipelineBuilder::<NewOrderCommand>::new()
641 .view::<AsOrderView>()
642 .tap(log_symbol, reg)
643 .tap(log_qty, reg)
644 .end_view()
645 .then(|_: NewOrderCommand| {}, reg);
646
647 p.run(
648 &mut world,
649 NewOrderCommand {
650 source: "a".into(),
651 symbol: "BTC".into(),
652 qty: 50,
653 price: 42000.0,
654 },
655 );
656
657 let log = &world.resource::<AuditLog>().0;
658 assert_eq!(log[0], "symbol: BTC");
659 assert_eq!(log[1], "qty: 50");
660 }
661
662 #[test]
663 fn sequential_views() {
664 struct SymbolView<'a> {
665 symbol: &'a str,
666 }
667 struct QtyView {
668 qty: u64,
669 }
670
671 struct AsSymbolView;
672 unsafe impl View<NewOrderCommand> for AsSymbolView {
673 type ViewType<'a> = SymbolView<'a>;
674 type StaticViewType = SymbolView<'static>;
675 fn view(source: &NewOrderCommand) -> SymbolView<'_> {
676 SymbolView {
677 symbol: &source.symbol,
678 }
679 }
680 }
681
682 struct AsQtyView;
683 unsafe impl View<NewOrderCommand> for AsQtyView {
684 type ViewType<'a> = QtyView;
685 type StaticViewType = QtyView;
686 fn view(source: &NewOrderCommand) -> QtyView {
687 QtyView { qty: source.qty }
688 }
689 }
690
691 fn log_sym(mut log: ResMut<AuditLog>, v: &SymbolView) {
692 log.0.push(format!("sym: {}", v.symbol));
693 }
694 fn log_qty_view(mut log: ResMut<AuditLog>, v: &QtyView) {
695 log.0.push(format!("qty: {}", v.qty));
696 }
697
698 let mut wb = WorldBuilder::new();
699 wb.register(AuditLog(Vec::new()));
700 let mut world = wb.build();
701 let reg = world.registry();
702
703 let mut p = PipelineBuilder::<NewOrderCommand>::new()
704 .view::<AsSymbolView>()
705 .tap(log_sym, reg)
706 .end_view()
707 .view::<AsQtyView>()
708 .tap(log_qty_view, reg)
709 .end_view()
710 .then(|_: NewOrderCommand| {}, reg);
711
712 p.run(
713 &mut world,
714 NewOrderCommand {
715 source: "a".into(),
716 symbol: "BTC".into(),
717 qty: 50,
718 price: 42000.0,
719 },
720 );
721
722 let log = &world.resource::<AuditLog>().0;
723 assert_eq!(log[0], "sym: BTC");
724 assert_eq!(log[1], "qty: 50");
725 }
726
727 #[test]
730 fn dag_view_tap() {
731 use crate::{DagBuilder, Handler};
732
733 let mut wb = WorldBuilder::new();
734 wb.register(AuditLog(Vec::new()));
735 let mut world = wb.build();
736 let reg = world.registry();
737
738 let dag = DagBuilder::<NewOrderCommand>::new()
739 .root(|cmd: NewOrderCommand| cmd, reg)
740 .view::<AsOrderView>()
741 .tap(log_order, reg)
742 .end_view_dag()
743 .then(|_cmd: &NewOrderCommand| {}, reg);
744
745 let mut handler = dag.build();
746 handler.run(
747 &mut world,
748 NewOrderCommand {
749 source: "test".into(),
750 symbol: "BTC".into(),
751 qty: 50,
752 price: 42000.0,
753 },
754 );
755
756 assert_eq!(world.resource::<AuditLog>().0, vec!["BTC qty=50"]);
757 }
758
759 #[test]
760 fn dag_view_guard() {
761 use crate::{DagBuilder, Handler};
762
763 let mut wb = WorldBuilder::new();
764 wb.register(AuditLog(Vec::new()));
765 wb.register(RiskLimits { max_qty: 100 });
766 let mut world = wb.build();
767 let reg = world.registry();
768
769 fn sink(mut log: ResMut<AuditLog>, val: &Option<NewOrderCommand>) {
770 if val.is_some() {
771 log.0.push("accepted".into());
772 } else {
773 log.0.push("rejected".into());
774 }
775 }
776
777 let dag = DagBuilder::<NewOrderCommand>::new()
778 .root(|cmd: NewOrderCommand| cmd, reg)
779 .view::<AsOrderView>()
780 .tap(log_order, reg)
781 .guard(check_risk, reg)
782 .end_view_dag_guarded()
783 .then(sink, reg);
784
785 let mut handler = dag.build();
786 handler.run(
787 &mut world,
788 NewOrderCommand {
789 source: "a".into(),
790 symbol: "BTC".into(),
791 qty: 50,
792 price: 42000.0,
793 },
794 );
795 handler.run(
796 &mut world,
797 NewOrderCommand {
798 source: "b".into(),
799 symbol: "ETH".into(),
800 qty: 200,
801 price: 3000.0,
802 },
803 );
804
805 let log = &world.resource::<AuditLog>().0;
806 assert_eq!(log[0], "BTC qty=50");
807 assert_eq!(log[1], "accepted");
808 assert_eq!(log[2], "ETH qty=200");
809 assert_eq!(log[3], "rejected");
810 }
811
812 #[test]
813 fn inspect_no_params() {
814 let mut wb = WorldBuilder::new();
815 wb.register(AuditLog(Vec::new()));
816 let mut world = wb.build();
817 let reg = world.registry();
818
819 fn just_print(v: &OrderView) {
821 assert!(!v.symbol.is_empty());
822 }
823
824 let mut p = PipelineBuilder::<NewOrderCommand>::new()
825 .view::<AsOrderView>()
826 .inspect(just_print, reg)
827 .tap(log_order, reg)
828 .end_view()
829 .then(|_: NewOrderCommand| {}, reg);
830
831 p.run(
832 &mut world,
833 NewOrderCommand {
834 source: "a".into(),
835 symbol: "BTC".into(),
836 qty: 50,
837 price: 42000.0,
838 },
839 );
840
841 assert_eq!(world.resource::<AuditLog>().0, vec!["BTC qty=50"]);
842 }
843
844 #[test]
845 fn filter_rejects() {
846 let mut wb = WorldBuilder::new();
847 wb.register(AuditLog(Vec::new()));
848 wb.register(RiskLimits { max_qty: 100 });
849 let mut world = wb.build();
850 let reg = world.registry();
851
852 let mut p = PipelineBuilder::<NewOrderCommand>::new()
853 .view::<AsOrderView>()
854 .tap(log_order, reg)
855 .filter(check_risk, reg)
856 .end_view_guarded();
857
858 let result = p.run(
859 &mut world,
860 NewOrderCommand {
861 source: "a".into(),
862 symbol: "BTC".into(),
863 qty: 50,
864 price: 42000.0,
865 },
866 );
867 assert!(result.is_some());
868
869 let result = p.run(
870 &mut world,
871 NewOrderCommand {
872 source: "b".into(),
873 symbol: "ETH".into(),
874 qty: 200,
875 price: 3000.0,
876 },
877 );
878 assert!(result.is_none());
879 }
880
881 #[test]
882 fn guard_short_circuits_subsequent_tap() {
883 let mut wb = WorldBuilder::new();
885 wb.register(AuditLog(Vec::new()));
886 wb.register(RiskLimits { max_qty: 100 });
887 let mut world = wb.build();
888 let reg = world.registry();
889
890 let mut p = PipelineBuilder::<NewOrderCommand>::new()
891 .view::<AsOrderView>()
892 .guard(check_risk, reg) .tap(log_order, reg) .end_view_guarded();
895
896 let result = p.run(
898 &mut world,
899 NewOrderCommand {
900 source: "a".into(),
901 symbol: "BTC".into(),
902 qty: 50,
903 price: 42000.0,
904 },
905 );
906 assert!(result.is_some());
907 assert_eq!(world.resource::<AuditLog>().0.len(), 1);
908
909 let result = p.run(
911 &mut world,
912 NewOrderCommand {
913 source: "b".into(),
914 symbol: "ETH".into(),
915 qty: 200,
916 price: 3000.0,
917 },
918 );
919 assert!(result.is_none());
920 assert_eq!(world.resource::<AuditLog>().0.len(), 1); }
922
923 struct FullOrderView<'a> {
926 source: &'a str,
927 symbol: &'a str,
928 qty: u64,
929 price: f64,
930 }
931
932 struct AsFullOrderView;
933 unsafe impl View<NewOrderCommand> for AsFullOrderView {
934 type ViewType<'a> = FullOrderView<'a>;
935 type StaticViewType = FullOrderView<'static>;
936 fn view(source: &NewOrderCommand) -> FullOrderView<'_> {
937 FullOrderView {
938 source: &source.source,
939 symbol: &source.symbol,
940 qty: source.qty,
941 price: source.price,
942 }
943 }
944 }
945
946 #[test]
947 fn view_with_multiple_borrowed_fields() {
948 let mut wb = WorldBuilder::new();
949 wb.register(AuditLog(Vec::new()));
950 let mut world = wb.build();
951 let reg = world.registry();
952
953 fn log_full(mut log: ResMut<AuditLog>, v: &FullOrderView) {
954 log.0.push(format!(
955 "{} {} qty={} px={}",
956 v.source, v.symbol, v.qty, v.price
957 ));
958 }
959
960 let mut p = PipelineBuilder::<NewOrderCommand>::new()
961 .view::<AsFullOrderView>()
962 .tap(log_full, reg)
963 .end_view()
964 .then(|_: NewOrderCommand| {}, reg);
965
966 p.run(
967 &mut world,
968 NewOrderCommand {
969 source: "desk-a".into(),
970 symbol: "BTC".into(),
971 qty: 50,
972 price: 42000.0,
973 },
974 );
975
976 assert_eq!(
977 world.resource::<AuditLog>().0,
978 vec!["desk-a BTC qty=50 px=42000"]
979 );
980 }
981
982 struct Payload {
985 data: Vec<u8>,
986 tag: String,
987 }
988
989 struct PayloadView<'a> {
990 data: &'a [u8],
991 tag: &'a str,
992 }
993
994 struct AsPayloadView;
995 unsafe impl View<Payload> for AsPayloadView {
996 type ViewType<'a> = PayloadView<'a>;
997 type StaticViewType = PayloadView<'static>;
998 fn view(source: &Payload) -> PayloadView<'_> {
999 PayloadView {
1000 data: &source.data,
1001 tag: &source.tag,
1002 }
1003 }
1004 }
1005
1006 #[test]
1007 fn view_of_non_copy_types() {
1008 let mut wb = WorldBuilder::new();
1009 wb.register(AuditLog(Vec::new()));
1010 let mut world = wb.build();
1011 let reg = world.registry();
1012
1013 fn log_payload(mut log: ResMut<AuditLog>, v: &PayloadView) {
1014 log.0.push(format!("tag={} len={}", v.tag, v.data.len()));
1015 }
1016
1017 let mut p = PipelineBuilder::<Payload>::new()
1018 .view::<AsPayloadView>()
1019 .tap(log_payload, reg)
1020 .end_view()
1021 .then(|_: Payload| {}, reg);
1022
1023 p.run(
1024 &mut world,
1025 Payload {
1026 data: vec![1, 2, 3],
1027 tag: "test".into(),
1028 },
1029 );
1030
1031 assert_eq!(world.resource::<AuditLog>().0, vec!["tag=test len=3"]);
1032 }
1033
1034 #[test]
1035 fn view_guard_preserves_non_copy_event() {
1036 let mut wb = WorldBuilder::new();
1037 wb.register(AuditLog(Vec::new()));
1038 let mut world = wb.build();
1039 let reg = world.registry();
1040
1041 fn check_tag(v: &PayloadView) -> bool {
1042 v.tag == "accept"
1043 }
1044
1045 let mut p = PipelineBuilder::<Payload>::new()
1046 .view::<AsPayloadView>()
1047 .guard(check_tag, reg)
1048 .end_view_guarded();
1049
1050 let accepted = p.run(
1051 &mut world,
1052 Payload {
1053 data: vec![1],
1054 tag: "accept".into(),
1055 },
1056 );
1057 assert!(accepted.is_some());
1058 assert_eq!(accepted.unwrap().data, vec![1]);
1059
1060 let rejected = p.run(
1061 &mut world,
1062 Payload {
1063 data: vec![2],
1064 tag: "reject".into(),
1065 },
1066 );
1067 assert!(rejected.is_none());
1068 }
1069
1070 #[test]
1071 fn view_guard_inside_view() {
1072 let mut wb = WorldBuilder::new();
1074 wb.register(AuditLog(Vec::new()));
1075 let mut world = wb.build();
1076 let reg = world.registry();
1077
1078 fn accept_tag(v: &PayloadView) -> bool {
1079 v.tag == "accept"
1080 }
1081
1082 let mut p = PipelineBuilder::<Payload>::new()
1083 .view::<AsPayloadView>()
1084 .guard(accept_tag, reg)
1085 .end_view_guarded();
1086
1087 let result = p.run(
1089 &mut world,
1090 Payload {
1091 data: vec![1],
1092 tag: "reject".into(),
1093 },
1094 );
1095 assert!(result.is_none());
1096
1097 let result = p.run(
1099 &mut world,
1100 Payload {
1101 data: vec![1, 2, 3],
1102 tag: "accept".into(),
1103 },
1104 );
1105 assert!(result.is_some());
1106 assert_eq!(result.unwrap().data, vec![1, 2, 3]);
1107 }
1108
1109 #[test]
1110 fn view_tap_with_world_resources() {
1111 let mut wb = WorldBuilder::new();
1113 wb.register(AuditLog(Vec::new()));
1114 let mut world = wb.build();
1115 let reg = world.registry();
1116
1117 fn log_order(mut log: ResMut<AuditLog>, v: &OrderView) {
1118 log.0.push(format!("{}:{}", v.symbol, v.qty));
1119 }
1120
1121 let mut p = PipelineBuilder::<NewOrderCommand>::new()
1122 .view::<AsOrderView>()
1123 .tap(log_order, reg)
1124 .end_view();
1125
1126 p.run(
1127 &mut world,
1128 NewOrderCommand {
1129 source: "test".into(),
1130 symbol: "BTC".into(),
1131 qty: 100,
1132 price: 50000.0,
1133 },
1134 );
1135 p.run(
1136 &mut world,
1137 NewOrderCommand {
1138 source: "test".into(),
1139 symbol: "ETH".into(),
1140 qty: 50,
1141 price: 3000.0,
1142 },
1143 );
1144
1145 assert_eq!(world.resource::<AuditLog>().0, vec!["BTC:100", "ETH:50"]);
1146 }
1147
1148 #[test]
1149 fn view_repeated_dispatch() {
1150 let mut wb = WorldBuilder::new();
1152 wb.register(AuditLog(Vec::new()));
1153 let mut world = wb.build();
1154 let reg = world.registry();
1155
1156 fn count(mut log: ResMut<AuditLog>, _v: &OrderView) {
1157 log.0.push("hit".into());
1158 }
1159
1160 let mut p = PipelineBuilder::<NewOrderCommand>::new()
1161 .view::<AsOrderView>()
1162 .tap(count, reg)
1163 .end_view();
1164
1165 for _ in 0..100 {
1166 p.run(
1167 &mut world,
1168 NewOrderCommand {
1169 source: "stress".into(),
1170 symbol: "X".into(),
1171 qty: 1,
1172 price: 1.0,
1173 },
1174 );
1175 }
1176
1177 assert_eq!(world.resource::<AuditLog>().0.len(), 100);
1178 }
1179}