1#![allow(clippy::type_complexity)]
2use core::marker::PhantomData;
14
15use crate::pipeline::{ChainCall, IntoRefStep, PipelineChain, RefStepCall};
16use crate::world::World;
17
18#[diagnostic::on_unimplemented(
46 message = "`{Source}` cannot be viewed as `{Self}`",
47 note = "implement `View<{Source}>` for your view marker type"
48)]
49pub unsafe trait View<Source> {
59 type ViewType<'a>
61 where
62 Source: 'a;
63
64 type StaticViewType: 'static;
67
68 fn view(source: &Source) -> Self::ViewType<'_>;
70}
71
72#[inline(always)]
89fn with_view<Source, V, R>(source: &Source, f: impl for<'a> FnOnce(&'a V::StaticViewType) -> R) -> R
90where
91 V: View<Source>,
92{
93 let view = V::view(source);
94 let static_ref: &V::StaticViewType =
104 unsafe { &*(std::ptr::from_ref(&view) as *const V::StaticViewType) };
105 let result = f(static_ref);
106 drop(view);
107 result
108}
109
110pub struct ViewScope<In, Out, V: View<Out>, PrevChain, InnerSteps> {
119 prev_chain: PrevChain,
120 inner: InnerSteps,
121 _marker: PhantomData<(fn(In) -> Out, V)>,
122}
123
124impl<In, Out, V: View<Out>, PrevChain> ViewScope<In, Out, V, PrevChain, ()> {
125 pub(crate) fn new(prev_chain: PrevChain) -> Self {
126 ViewScope {
127 prev_chain,
128 inner: (),
129 _marker: PhantomData,
130 }
131 }
132}
133
134impl<In, Out, V: View<Out>, PrevChain, InnerSteps> ViewScope<In, Out, V, PrevChain, InnerSteps> {
137 pub fn tap<Params, S: IntoRefStep<V::StaticViewType, (), Params>>(
140 self,
141 f: S,
142 registry: &crate::world::Registry,
143 ) -> ViewScope<In, Out, V, PrevChain, (InnerSteps, ViewTap<S::Step>)> {
144 ViewScope {
145 prev_chain: self.prev_chain,
146 inner: (self.inner, ViewTap(f.into_ref_step(registry))),
147 _marker: PhantomData,
148 }
149 }
150
151 pub fn inspect<S: IntoRefStep<V::StaticViewType, (), ()>>(
154 self,
155 f: S,
156 registry: &crate::world::Registry,
157 ) -> ViewScope<In, Out, V, PrevChain, (InnerSteps, ViewTap<S::Step>)> {
158 self.tap(f, registry)
159 }
160
161 pub fn filter<Params, S: IntoRefStep<V::StaticViewType, bool, Params>>(
165 self,
166 f: S,
167 registry: &crate::world::Registry,
168 ) -> ViewScope<In, Out, V, PrevChain, (InnerSteps, ViewGuard<S::Step>)> {
169 self.guard(f, registry)
170 }
171
172 pub fn guard<Params, S: IntoRefStep<V::StaticViewType, bool, Params>>(
175 self,
176 f: S,
177 registry: &crate::world::Registry,
178 ) -> ViewScope<In, Out, V, PrevChain, (InnerSteps, ViewGuard<S::Step>)> {
179 ViewScope {
180 prev_chain: self.prev_chain,
181 inner: (self.inner, ViewGuard(f.into_ref_step(registry))),
182 _marker: PhantomData,
183 }
184 }
185}
186
187#[doc(hidden)]
190pub struct ViewTap<S>(S);
191
192#[doc(hidden)]
193pub struct ViewGuard<S>(S);
194
195#[doc(hidden)]
198pub trait ViewSteps<V> {
199 fn run(&mut self, world: &mut World, view: &V) -> bool;
200}
201
202impl<V> ViewSteps<V> for () {
203 fn run(&mut self, _world: &mut World, _view: &V) -> bool {
204 true
205 }
206}
207
208impl<V, Prev: ViewSteps<V>, S: RefStepCall<V, Out = ()>> ViewSteps<V> for (Prev, ViewTap<S>) {
209 fn run(&mut self, world: &mut World, view: &V) -> bool {
210 if !self.0.run(world, view) {
211 return false;
212 }
213 self.1.0.call(world, view);
214 true
215 }
216}
217
218impl<V, Prev: ViewSteps<V>, S: RefStepCall<V, Out = bool>> ViewSteps<V> for (Prev, ViewGuard<S>) {
219 fn run(&mut self, world: &mut World, view: &V) -> bool {
220 if !self.0.run(world, view) {
221 return false;
222 }
223 self.1.0.call(world, view)
224 }
225}
226
227impl<In, Out, V, PrevChain, InnerSteps> ViewScope<In, Out, V, PrevChain, InnerSteps>
232where
233 PrevChain: ChainCall<In, Out = Out>,
234 V: View<Out>,
235 InnerSteps: ViewSteps<V::StaticViewType>,
236{
237 pub fn end_view(self) -> PipelineChain<In, Out, ViewNode<PrevChain, InnerSteps, V>> {
239 PipelineChain {
240 chain: ViewNode {
241 prev: self.prev_chain,
242 inner: self.inner,
243 _marker: PhantomData,
244 },
245 _marker: PhantomData,
246 }
247 }
248
249 pub fn end_view_guarded(
251 self,
252 ) -> PipelineChain<In, Option<Out>, ViewGuardedNode<PrevChain, InnerSteps, V>> {
253 PipelineChain {
254 chain: ViewGuardedNode {
255 prev: self.prev_chain,
256 inner: self.inner,
257 _marker: PhantomData,
258 },
259 _marker: PhantomData,
260 }
261 }
262}
263
264#[doc(hidden)]
269pub struct ViewNode<Prev, Inner, V> {
270 prev: Prev,
271 inner: Inner,
272 _marker: PhantomData<V>,
273}
274
275impl<In, Out, Prev, Inner, V> ChainCall<In> for ViewNode<Prev, Inner, V>
276where
277 Prev: ChainCall<In, Out = Out>,
278 V: View<Out>,
279 Inner: ViewSteps<V::StaticViewType>,
280{
281 type Out = Out;
282
283 fn call(&mut self, world: &mut World, input: In) -> Out {
284 let event = self.prev.call(world, input);
285 with_view::<Out, V, ()>(&event, |view| {
286 self.inner.run(world, view);
287 });
288 event
289 }
290}
291
292#[doc(hidden)]
293pub struct ViewGuardedNode<Prev, Inner, V> {
294 prev: Prev,
295 inner: Inner,
296 _marker: PhantomData<V>,
297}
298
299impl<In, Out, Prev, Inner, V> ChainCall<In> for ViewGuardedNode<Prev, Inner, V>
300where
301 Prev: ChainCall<In, Out = Out>,
302 V: View<Out>,
303 Inner: ViewSteps<V::StaticViewType>,
304{
305 type Out = Option<Out>;
306
307 fn call(&mut self, world: &mut World, input: In) -> Option<Out> {
308 let event = self.prev.call(world, input);
309 let pass = with_view::<Out, V, bool>(&event, |view| self.inner.run(world, view));
310 if pass { Some(event) } else { None }
311 }
312}
313
314impl<In> crate::pipeline::PipelineBuilder<In> {
319 pub fn view<V: View<In>>(self) -> ViewScope<In, In, V, crate::pipeline::IdentityNode, ()> {
321 ViewScope::new(crate::pipeline::IdentityNode)
322 }
323}
324
325impl<In, Out, Chain: ChainCall<In, Out = Out>> PipelineChain<In, Out, Chain> {
326 pub fn view<V: View<Out>>(self) -> ViewScope<In, Out, V, Chain, ()> {
333 ViewScope::new(self.chain)
334 }
335}
336
337impl<In, Out, V, PrevChain, InnerSteps> ViewScope<In, Out, V, PrevChain, InnerSteps>
342where
343 PrevChain: ChainCall<In, Out = Out>,
344 V: View<Out>,
345 InnerSteps: ViewSteps<V::StaticViewType>,
346 Out: 'static,
347{
348 pub fn end_view_dag(self) -> crate::dag::DagChain<In, Out, ViewNode<PrevChain, InnerSteps, V>> {
350 crate::dag::DagChain {
351 chain: ViewNode {
352 prev: self.prev_chain,
353 inner: self.inner,
354 _marker: PhantomData,
355 },
356 _marker: PhantomData,
357 }
358 }
359
360 pub fn end_view_dag_guarded(
362 self,
363 ) -> crate::dag::DagChain<In, Option<Out>, ViewGuardedNode<PrevChain, InnerSteps, V>> {
364 crate::dag::DagChain {
365 chain: ViewGuardedNode {
366 prev: self.prev_chain,
367 inner: self.inner,
368 _marker: PhantomData,
369 },
370 _marker: PhantomData,
371 }
372 }
373
374 pub fn end_view_arm(self) -> crate::dag::DagArm<In, Out, ViewNode<PrevChain, InnerSteps, V>> {
376 crate::dag::DagArm {
377 chain: ViewNode {
378 prev: self.prev_chain,
379 inner: self.inner,
380 _marker: PhantomData,
381 },
382 _marker: PhantomData,
383 }
384 }
385
386 pub fn end_view_arm_guarded(
388 self,
389 ) -> crate::dag::DagArm<In, Option<Out>, ViewGuardedNode<PrevChain, InnerSteps, V>> {
390 crate::dag::DagArm {
391 chain: ViewGuardedNode {
392 prev: self.prev_chain,
393 inner: self.inner,
394 _marker: PhantomData,
395 },
396 _marker: PhantomData,
397 }
398 }
399}
400
401#[cfg(test)]
402mod tests {
403 use super::*;
404 use crate::{PipelineBuilder, Res, ResMut, Resource, WorldBuilder};
405
406 struct AuditLog(Vec<String>);
409 impl Resource for AuditLog {}
410
411 struct RiskLimits {
412 max_qty: u64,
413 }
414 impl Resource for RiskLimits {}
415
416 struct OrderView<'a> {
418 symbol: &'a str,
419 qty: u64,
420 }
421
422 struct NewOrderCommand {
423 source: String,
424 symbol: String,
425 qty: u64,
426 #[allow(dead_code)]
427 price: f64,
428 }
429
430 struct AmendOrderCommand {
431 #[allow(dead_code)]
432 order_id: u64,
433 symbol: String,
434 qty: u64,
435 #[allow(dead_code)]
436 price: f64,
437 }
438
439 struct AsOrderView;
442
443 unsafe impl View<NewOrderCommand> for AsOrderView {
444 type ViewType<'a> = OrderView<'a>;
445 type StaticViewType = OrderView<'static>;
446 fn view(source: &NewOrderCommand) -> OrderView<'_> {
447 OrderView {
448 symbol: &source.symbol,
449 qty: source.qty,
450 }
451 }
452 }
453
454 unsafe impl View<AmendOrderCommand> for AsOrderView {
455 type ViewType<'a> = OrderView<'a>;
456 type StaticViewType = OrderView<'static>;
457 fn view(source: &AmendOrderCommand) -> OrderView<'_> {
458 OrderView {
459 symbol: &source.symbol,
460 qty: source.qty,
461 }
462 }
463 }
464
465 fn log_order(mut log: ResMut<AuditLog>, v: &OrderView) {
468 log.0.push(format!("{} qty={}", v.symbol, v.qty));
469 }
470
471 fn check_risk(limits: Res<RiskLimits>, v: &OrderView) -> bool {
472 v.qty <= limits.max_qty
473 }
474
475 #[test]
478 fn tap_observes_view() {
479 let mut wb = WorldBuilder::new();
480 wb.register(AuditLog(Vec::new()));
481 let mut world = wb.build();
482 let reg = world.registry();
483
484 let mut p = PipelineBuilder::<NewOrderCommand>::new()
485 .view::<AsOrderView>()
486 .tap(log_order, reg)
487 .end_view()
488 .then(|_cmd: NewOrderCommand| {}, reg);
489
490 p.run(
491 &mut world,
492 NewOrderCommand {
493 source: "test".into(),
494 symbol: "BTC".into(),
495 qty: 50,
496 price: 42000.0,
497 },
498 );
499
500 assert_eq!(world.resource::<AuditLog>().0, vec!["BTC qty=50"]);
501 }
502
503 #[test]
504 fn guard_rejects() {
505 let mut wb = WorldBuilder::new();
506 wb.register(AuditLog(Vec::new()));
507 wb.register(RiskLimits { max_qty: 100 });
508 let mut world = wb.build();
509 let reg = world.registry();
510
511 let mut p = PipelineBuilder::<NewOrderCommand>::new()
512 .view::<AsOrderView>()
513 .tap(log_order, reg)
514 .guard(check_risk, reg)
515 .end_view_guarded();
516
517 let result = p.run(
518 &mut world,
519 NewOrderCommand {
520 source: "a".into(),
521 symbol: "BTC".into(),
522 qty: 50,
523 price: 42000.0,
524 },
525 );
526 assert!(result.is_some());
527
528 let result = p.run(
529 &mut world,
530 NewOrderCommand {
531 source: "b".into(),
532 symbol: "ETH".into(),
533 qty: 200,
534 price: 3000.0,
535 },
536 );
537 assert!(result.is_none());
538
539 assert_eq!(world.resource::<AuditLog>().0.len(), 2);
542 }
543
544 #[test]
545 fn event_passes_through_unchanged() {
546 let mut wb = WorldBuilder::new();
547 wb.register(AuditLog(Vec::new()));
548 let mut world = wb.build();
549 let reg = world.registry();
550
551 fn sink(mut out: ResMut<AuditLog>, cmd: NewOrderCommand) {
552 out.0
553 .push(format!("sink: {} from {}", cmd.symbol, cmd.source));
554 }
555
556 let mut p = PipelineBuilder::<NewOrderCommand>::new()
557 .view::<AsOrderView>()
558 .tap(log_order, reg)
559 .end_view()
560 .then(sink, reg);
561
562 p.run(
563 &mut world,
564 NewOrderCommand {
565 source: "ops".into(),
566 symbol: "SOL".into(),
567 qty: 10,
568 price: 150.0,
569 },
570 );
571
572 let log = &world.resource::<AuditLog>().0;
573 assert_eq!(log[0], "SOL qty=10");
574 assert_eq!(log[1], "sink: SOL from ops");
575 }
576
577 #[test]
578 fn reusable_across_event_types() {
579 let mut wb = WorldBuilder::new();
580 wb.register(AuditLog(Vec::new()));
581 let mut world = wb.build();
582 let reg = world.registry();
583
584 let mut p_new = PipelineBuilder::<NewOrderCommand>::new()
585 .view::<AsOrderView>()
586 .tap(log_order, reg)
587 .end_view()
588 .then(|_: NewOrderCommand| {}, reg);
589
590 let mut p_amend = PipelineBuilder::<AmendOrderCommand>::new()
591 .view::<AsOrderView>()
592 .tap(log_order, reg) .end_view()
594 .then(|_: AmendOrderCommand| {}, reg);
595
596 p_new.run(
597 &mut world,
598 NewOrderCommand {
599 source: "a".into(),
600 symbol: "BTC".into(),
601 qty: 50,
602 price: 42000.0,
603 },
604 );
605 p_amend.run(
606 &mut world,
607 AmendOrderCommand {
608 order_id: 123,
609 symbol: "ETH".into(),
610 qty: 25,
611 price: 3000.0,
612 },
613 );
614
615 let log = &world.resource::<AuditLog>().0;
616 assert_eq!(log[0], "BTC qty=50");
617 assert_eq!(log[1], "ETH qty=25");
618 }
619
620 #[test]
621 fn multiple_taps_in_scope() {
622 let mut wb = WorldBuilder::new();
623 wb.register(AuditLog(Vec::new()));
624 let mut world = wb.build();
625 let reg = world.registry();
626
627 fn log_symbol(mut log: ResMut<AuditLog>, v: &OrderView) {
628 log.0.push(format!("symbol: {}", v.symbol));
629 }
630 fn log_qty(mut log: ResMut<AuditLog>, v: &OrderView) {
631 log.0.push(format!("qty: {}", v.qty));
632 }
633
634 let mut p = PipelineBuilder::<NewOrderCommand>::new()
635 .view::<AsOrderView>()
636 .tap(log_symbol, reg)
637 .tap(log_qty, reg)
638 .end_view()
639 .then(|_: NewOrderCommand| {}, reg);
640
641 p.run(
642 &mut world,
643 NewOrderCommand {
644 source: "a".into(),
645 symbol: "BTC".into(),
646 qty: 50,
647 price: 42000.0,
648 },
649 );
650
651 let log = &world.resource::<AuditLog>().0;
652 assert_eq!(log[0], "symbol: BTC");
653 assert_eq!(log[1], "qty: 50");
654 }
655
656 #[test]
657 fn sequential_views() {
658 struct SymbolView<'a> {
659 symbol: &'a str,
660 }
661 struct QtyView {
662 qty: u64,
663 }
664
665 struct AsSymbolView;
666 unsafe impl View<NewOrderCommand> for AsSymbolView {
667 type ViewType<'a> = SymbolView<'a>;
668 type StaticViewType = SymbolView<'static>;
669 fn view(source: &NewOrderCommand) -> SymbolView<'_> {
670 SymbolView {
671 symbol: &source.symbol,
672 }
673 }
674 }
675
676 struct AsQtyView;
677 unsafe impl View<NewOrderCommand> for AsQtyView {
678 type ViewType<'a> = QtyView;
679 type StaticViewType = QtyView;
680 fn view(source: &NewOrderCommand) -> QtyView {
681 QtyView { qty: source.qty }
682 }
683 }
684
685 fn log_sym(mut log: ResMut<AuditLog>, v: &SymbolView) {
686 log.0.push(format!("sym: {}", v.symbol));
687 }
688 fn log_qty_view(mut log: ResMut<AuditLog>, v: &QtyView) {
689 log.0.push(format!("qty: {}", v.qty));
690 }
691
692 let mut wb = WorldBuilder::new();
693 wb.register(AuditLog(Vec::new()));
694 let mut world = wb.build();
695 let reg = world.registry();
696
697 let mut p = PipelineBuilder::<NewOrderCommand>::new()
698 .view::<AsSymbolView>()
699 .tap(log_sym, reg)
700 .end_view()
701 .view::<AsQtyView>()
702 .tap(log_qty_view, reg)
703 .end_view()
704 .then(|_: NewOrderCommand| {}, reg);
705
706 p.run(
707 &mut world,
708 NewOrderCommand {
709 source: "a".into(),
710 symbol: "BTC".into(),
711 qty: 50,
712 price: 42000.0,
713 },
714 );
715
716 let log = &world.resource::<AuditLog>().0;
717 assert_eq!(log[0], "sym: BTC");
718 assert_eq!(log[1], "qty: 50");
719 }
720
721 #[test]
724 fn dag_view_tap() {
725 use crate::{DagBuilder, Handler};
726
727 let mut wb = WorldBuilder::new();
728 wb.register(AuditLog(Vec::new()));
729 let mut world = wb.build();
730 let reg = world.registry();
731
732 let dag = DagBuilder::<NewOrderCommand>::new()
733 .root(|cmd: NewOrderCommand| cmd, reg)
734 .view::<AsOrderView>()
735 .tap(log_order, reg)
736 .end_view_dag()
737 .then(|_cmd: &NewOrderCommand| {}, reg);
738
739 let mut handler = dag.build();
740 handler.run(
741 &mut world,
742 NewOrderCommand {
743 source: "test".into(),
744 symbol: "BTC".into(),
745 qty: 50,
746 price: 42000.0,
747 },
748 );
749
750 assert_eq!(world.resource::<AuditLog>().0, vec!["BTC qty=50"]);
751 }
752
753 #[test]
754 fn dag_view_guard() {
755 use crate::{DagBuilder, Handler};
756
757 let mut wb = WorldBuilder::new();
758 wb.register(AuditLog(Vec::new()));
759 wb.register(RiskLimits { max_qty: 100 });
760 let mut world = wb.build();
761 let reg = world.registry();
762
763 fn sink(mut log: ResMut<AuditLog>, val: &Option<NewOrderCommand>) {
764 if val.is_some() {
765 log.0.push("accepted".into());
766 } else {
767 log.0.push("rejected".into());
768 }
769 }
770
771 let dag = DagBuilder::<NewOrderCommand>::new()
772 .root(|cmd: NewOrderCommand| cmd, reg)
773 .view::<AsOrderView>()
774 .tap(log_order, reg)
775 .guard(check_risk, reg)
776 .end_view_dag_guarded()
777 .then(sink, reg);
778
779 let mut handler = dag.build();
780 handler.run(
781 &mut world,
782 NewOrderCommand {
783 source: "a".into(),
784 symbol: "BTC".into(),
785 qty: 50,
786 price: 42000.0,
787 },
788 );
789 handler.run(
790 &mut world,
791 NewOrderCommand {
792 source: "b".into(),
793 symbol: "ETH".into(),
794 qty: 200,
795 price: 3000.0,
796 },
797 );
798
799 let log = &world.resource::<AuditLog>().0;
800 assert_eq!(log[0], "BTC qty=50");
801 assert_eq!(log[1], "accepted");
802 assert_eq!(log[2], "ETH qty=200");
803 assert_eq!(log[3], "rejected");
804 }
805
806 #[test]
807 fn inspect_no_params() {
808 let mut wb = WorldBuilder::new();
809 wb.register(AuditLog(Vec::new()));
810 let mut world = wb.build();
811 let reg = world.registry();
812
813 fn just_print(v: &OrderView) {
815 assert!(!v.symbol.is_empty());
816 }
817
818 let mut p = PipelineBuilder::<NewOrderCommand>::new()
819 .view::<AsOrderView>()
820 .inspect(just_print, reg)
821 .tap(log_order, reg)
822 .end_view()
823 .then(|_: NewOrderCommand| {}, reg);
824
825 p.run(
826 &mut world,
827 NewOrderCommand {
828 source: "a".into(),
829 symbol: "BTC".into(),
830 qty: 50,
831 price: 42000.0,
832 },
833 );
834
835 assert_eq!(world.resource::<AuditLog>().0, vec!["BTC qty=50"]);
836 }
837
838 #[test]
839 fn filter_rejects() {
840 let mut wb = WorldBuilder::new();
841 wb.register(AuditLog(Vec::new()));
842 wb.register(RiskLimits { max_qty: 100 });
843 let mut world = wb.build();
844 let reg = world.registry();
845
846 let mut p = PipelineBuilder::<NewOrderCommand>::new()
847 .view::<AsOrderView>()
848 .tap(log_order, reg)
849 .filter(check_risk, reg)
850 .end_view_guarded();
851
852 let result = p.run(
853 &mut world,
854 NewOrderCommand {
855 source: "a".into(),
856 symbol: "BTC".into(),
857 qty: 50,
858 price: 42000.0,
859 },
860 );
861 assert!(result.is_some());
862
863 let result = p.run(
864 &mut world,
865 NewOrderCommand {
866 source: "b".into(),
867 symbol: "ETH".into(),
868 qty: 200,
869 price: 3000.0,
870 },
871 );
872 assert!(result.is_none());
873 }
874
875 #[test]
876 fn guard_short_circuits_subsequent_tap() {
877 let mut wb = WorldBuilder::new();
879 wb.register(AuditLog(Vec::new()));
880 wb.register(RiskLimits { max_qty: 100 });
881 let mut world = wb.build();
882 let reg = world.registry();
883
884 let mut p = PipelineBuilder::<NewOrderCommand>::new()
885 .view::<AsOrderView>()
886 .guard(check_risk, reg) .tap(log_order, reg) .end_view_guarded();
889
890 let result = p.run(
892 &mut world,
893 NewOrderCommand {
894 source: "a".into(),
895 symbol: "BTC".into(),
896 qty: 50,
897 price: 42000.0,
898 },
899 );
900 assert!(result.is_some());
901 assert_eq!(world.resource::<AuditLog>().0.len(), 1);
902
903 let result = p.run(
905 &mut world,
906 NewOrderCommand {
907 source: "b".into(),
908 symbol: "ETH".into(),
909 qty: 200,
910 price: 3000.0,
911 },
912 );
913 assert!(result.is_none());
914 assert_eq!(world.resource::<AuditLog>().0.len(), 1); }
916
917 struct FullOrderView<'a> {
920 source: &'a str,
921 symbol: &'a str,
922 qty: u64,
923 price: f64,
924 }
925
926 struct AsFullOrderView;
927 unsafe impl View<NewOrderCommand> for AsFullOrderView {
928 type ViewType<'a> = FullOrderView<'a>;
929 type StaticViewType = FullOrderView<'static>;
930 fn view(source: &NewOrderCommand) -> FullOrderView<'_> {
931 FullOrderView {
932 source: &source.source,
933 symbol: &source.symbol,
934 qty: source.qty,
935 price: source.price,
936 }
937 }
938 }
939
940 #[test]
941 fn view_with_multiple_borrowed_fields() {
942 let mut wb = WorldBuilder::new();
943 wb.register(AuditLog(Vec::new()));
944 let mut world = wb.build();
945 let reg = world.registry();
946
947 fn log_full(mut log: ResMut<AuditLog>, v: &FullOrderView) {
948 log.0.push(format!(
949 "{} {} qty={} px={}",
950 v.source, v.symbol, v.qty, v.price
951 ));
952 }
953
954 let mut p = PipelineBuilder::<NewOrderCommand>::new()
955 .view::<AsFullOrderView>()
956 .tap(log_full, reg)
957 .end_view()
958 .then(|_: NewOrderCommand| {}, reg);
959
960 p.run(
961 &mut world,
962 NewOrderCommand {
963 source: "desk-a".into(),
964 symbol: "BTC".into(),
965 qty: 50,
966 price: 42000.0,
967 },
968 );
969
970 assert_eq!(
971 world.resource::<AuditLog>().0,
972 vec!["desk-a BTC qty=50 px=42000"]
973 );
974 }
975
976 struct Payload {
979 data: Vec<u8>,
980 tag: String,
981 }
982
983 struct PayloadView<'a> {
984 data: &'a [u8],
985 tag: &'a str,
986 }
987
988 struct AsPayloadView;
989 unsafe impl View<Payload> for AsPayloadView {
990 type ViewType<'a> = PayloadView<'a>;
991 type StaticViewType = PayloadView<'static>;
992 fn view(source: &Payload) -> PayloadView<'_> {
993 PayloadView {
994 data: &source.data,
995 tag: &source.tag,
996 }
997 }
998 }
999
1000 #[test]
1001 fn view_of_non_copy_types() {
1002 let mut wb = WorldBuilder::new();
1003 wb.register(AuditLog(Vec::new()));
1004 let mut world = wb.build();
1005 let reg = world.registry();
1006
1007 fn log_payload(mut log: ResMut<AuditLog>, v: &PayloadView) {
1008 log.0.push(format!("tag={} len={}", v.tag, v.data.len()));
1009 }
1010
1011 let mut p = PipelineBuilder::<Payload>::new()
1012 .view::<AsPayloadView>()
1013 .tap(log_payload, reg)
1014 .end_view()
1015 .then(|_: Payload| {}, reg);
1016
1017 p.run(
1018 &mut world,
1019 Payload {
1020 data: vec![1, 2, 3],
1021 tag: "test".into(),
1022 },
1023 );
1024
1025 assert_eq!(world.resource::<AuditLog>().0, vec!["tag=test len=3"]);
1026 }
1027
1028 #[test]
1029 fn view_guard_preserves_non_copy_event() {
1030 let mut wb = WorldBuilder::new();
1031 wb.register(AuditLog(Vec::new()));
1032 let mut world = wb.build();
1033 let reg = world.registry();
1034
1035 fn check_tag(v: &PayloadView) -> bool {
1036 v.tag == "accept"
1037 }
1038
1039 let mut p = PipelineBuilder::<Payload>::new()
1040 .view::<AsPayloadView>()
1041 .guard(check_tag, reg)
1042 .end_view_guarded();
1043
1044 let accepted = p.run(
1045 &mut world,
1046 Payload {
1047 data: vec![1],
1048 tag: "accept".into(),
1049 },
1050 );
1051 assert!(accepted.is_some());
1052 assert_eq!(accepted.unwrap().data, vec![1]);
1053
1054 let rejected = p.run(
1055 &mut world,
1056 Payload {
1057 data: vec![2],
1058 tag: "reject".into(),
1059 },
1060 );
1061 assert!(rejected.is_none());
1062 }
1063
1064 #[test]
1065 fn view_guard_inside_view() {
1066 let mut wb = WorldBuilder::new();
1068 wb.register(AuditLog(Vec::new()));
1069 let mut world = wb.build();
1070 let reg = world.registry();
1071
1072 fn accept_tag(v: &PayloadView) -> bool {
1073 v.tag == "accept"
1074 }
1075
1076 let mut p = PipelineBuilder::<Payload>::new()
1077 .view::<AsPayloadView>()
1078 .guard(accept_tag, reg)
1079 .end_view_guarded();
1080
1081 let result = p.run(
1083 &mut world,
1084 Payload {
1085 data: vec![1],
1086 tag: "reject".into(),
1087 },
1088 );
1089 assert!(result.is_none());
1090
1091 let result = p.run(
1093 &mut world,
1094 Payload {
1095 data: vec![1, 2, 3],
1096 tag: "accept".into(),
1097 },
1098 );
1099 assert!(result.is_some());
1100 assert_eq!(result.unwrap().data, vec![1, 2, 3]);
1101 }
1102
1103 #[test]
1104 fn view_tap_with_world_resources() {
1105 let mut wb = WorldBuilder::new();
1107 wb.register(AuditLog(Vec::new()));
1108 let mut world = wb.build();
1109 let reg = world.registry();
1110
1111 fn log_order(mut log: ResMut<AuditLog>, v: &OrderView) {
1112 log.0.push(format!("{}:{}", v.symbol, v.qty));
1113 }
1114
1115 let mut p = PipelineBuilder::<NewOrderCommand>::new()
1116 .view::<AsOrderView>()
1117 .tap(log_order, reg)
1118 .end_view();
1119
1120 p.run(
1121 &mut world,
1122 NewOrderCommand {
1123 source: "test".into(),
1124 symbol: "BTC".into(),
1125 qty: 100,
1126 price: 50000.0,
1127 },
1128 );
1129 p.run(
1130 &mut world,
1131 NewOrderCommand {
1132 source: "test".into(),
1133 symbol: "ETH".into(),
1134 qty: 50,
1135 price: 3000.0,
1136 },
1137 );
1138
1139 assert_eq!(world.resource::<AuditLog>().0, vec!["BTC:100", "ETH:50"]);
1140 }
1141
1142 #[test]
1143 fn view_repeated_dispatch() {
1144 let mut wb = WorldBuilder::new();
1146 wb.register(AuditLog(Vec::new()));
1147 let mut world = wb.build();
1148 let reg = world.registry();
1149
1150 fn count(mut log: ResMut<AuditLog>, _v: &OrderView) {
1151 log.0.push("hit".into());
1152 }
1153
1154 let mut p = PipelineBuilder::<NewOrderCommand>::new()
1155 .view::<AsOrderView>()
1156 .tap(count, reg)
1157 .end_view();
1158
1159 for _ in 0..100 {
1160 p.run(
1161 &mut world,
1162 NewOrderCommand {
1163 source: "stress".into(),
1164 symbol: "X".into(),
1165 qty: 1,
1166 price: 1.0,
1167 },
1168 );
1169 }
1170
1171 assert_eq!(world.resource::<AuditLog>().0.len(), 100);
1172 }
1173}