Skip to main content

nexus_rt/
view.rs

1#![allow(clippy::type_complexity)]
2// Pipeline view scopes — borrow a projected view from an event.
3//
4// `.view::<V>()` opens a scope where steps operate on a read-only
5// view constructed from the event. `.end_view()` closes the scope
6// and the original event continues unchanged.
7//
8// Lifetime erasure: the view may borrow from the event (e.g.,
9// `OrderView<'a>` with `symbol: &'a str`). IntoRefStep resolves
10// against `StaticViewType` ('static stand-in). `with_view` bridges
11// the two via a scoped transmute — same pattern as std::thread::scope.
12
13use core::marker::PhantomData;
14
15use crate::pipeline::{ChainCall, IntoRefStep, PipelineChain, RefStepCall};
16use crate::world::World;
17
18// =============================================================================
19// View trait
20// =============================================================================
21
22/// Associates a source type with a projected view via a marker.
23///
24/// `ViewType<'a>` is the real view with borrowed fields.
25/// `StaticViewType` is the same struct with `'static` — used only for
26/// step resolution at build time, never observed at runtime.
27///
28/// # Examples
29///
30/// ```
31/// use nexus_rt::View;
32///
33/// struct OrderView<'a> { symbol: &'a str, qty: u64 }
34/// struct NewOrder { symbol: String, qty: u64 }
35///
36/// struct AsOrderView;
37/// unsafe impl View<NewOrder> for AsOrderView {
38///     type ViewType<'a> = OrderView<'a>;
39///     type StaticViewType = OrderView<'static>;
40///     fn view(source: &NewOrder) -> OrderView<'_> {
41///         OrderView { symbol: &source.symbol, qty: source.qty }
42///     }
43/// }
44/// ```
45#[diagnostic::on_unimplemented(
46    message = "`{Source}` cannot be viewed as `{Self}`",
47    note = "implement `View<{Source}>` for your view marker type"
48)]
49/// # Safety
50///
51/// `StaticViewType` must be layout-identical to `ViewType<'a>` for any `'a`.
52/// They must be the same struct with different lifetime parameters. The
53/// framework performs a pointer cast between them in `with_view()`.
54///
55/// Incorrect implementations (e.g., `StaticViewType` being a different struct)
56/// cause undefined behavior. Use the `#[derive(View)]` macro when available
57/// to generate correct implementations.
58pub unsafe trait View<Source> {
59    /// The view type with the source borrow lifetime.
60    type ViewType<'a>
61    where
62        Source: 'a;
63
64    /// The same type with `'static` — for `IntoRefStep` trait resolution.
65    /// Must be layout-identical to `ViewType<'a>` for any `'a`.
66    type StaticViewType: 'static;
67
68    /// Construct the view from a borrowed source.
69    fn view(source: &Source) -> Self::ViewType<'_>;
70}
71
72// =============================================================================
73// with_view — scoped lifetime erasure
74// =============================================================================
75
76/// Constructs a view, erases its lifetime, runs a closure with `&StaticViewType`,
77/// then drops the view. The closure cannot leak the reference.
78///
79/// # Safety argument
80///
81/// The view borrows from `source` which is alive for this entire call.
82/// The transmute erases the borrow lifetime to `'static` so that
83/// `RefStepCall<StaticViewType>` (resolved at build time) can accept it.
84/// The closure boundary prevents the `'static` reference from escaping —
85/// it's dropped before this function returns.
86///
87/// This is the same pattern as `std::thread::scope` and `crossbeam::scope`.
88#[inline(always)]
89fn with_view<Source, V, R>(source: &Source, f: impl for<'a> FnOnce(&'a V::StaticViewType) -> R) -> R
90where
91    V: View<Source>,
92{
93    let view = V::view(source);
94    // SAFETY: ViewType<'a> and StaticViewType are the same struct with
95    // different lifetime parameters. The pointer cast is sound because:
96    // 1. The layouts are identical (same repr, same fields) — guaranteed
97    //    by the `unsafe trait View` contract.
98    // 2. `source` outlives this entire function, so the borrow is valid.
99    // 3. The `for<'a> FnOnce(&'a ...)` bound prevents the closure from
100    //    storing the reference — it must work for ANY lifetime, so it
101    //    cannot assume 'static.
102    // 4. The view is explicitly dropped after the closure returns.
103    let static_ref: &V::StaticViewType =
104        unsafe { &*(std::ptr::from_ref(&view) as *const V::StaticViewType) };
105    let result = f(static_ref);
106    drop(view);
107    result
108}
109
110// =============================================================================
111// ViewScope — builder for steps inside a view scope
112// =============================================================================
113
114/// Builder for steps inside a `.view::<V>()` scope.
115///
116/// `V` is the view marker (implements [`View<Out>`]). Steps resolve
117/// against `V::StaticViewType` via `IntoRefStep`.
118pub struct ViewScope<In, Out, V: View<Out>, PrevChain, InnerSteps> {
119    prev_chain: PrevChain,
120    inner: InnerSteps,
121    _marker: PhantomData<(fn(In) -> Out, V)>,
122}
123
124impl<In, Out, V: View<Out>, PrevChain> ViewScope<In, Out, V, PrevChain, ()> {
125    pub(crate) fn new(prev_chain: PrevChain) -> Self {
126        ViewScope {
127            prev_chain,
128            inner: (),
129            _marker: PhantomData,
130        }
131    }
132}
133
134// --- Combinators ---
135
136impl<In, Out, V: View<Out>, PrevChain, InnerSteps> ViewScope<In, Out, V, PrevChain, InnerSteps> {
137    /// Observe the view. Side effects via `Res`/`ResMut`.
138    /// Step signature: `fn(Params..., &ViewType) -> ()`.
139    pub fn tap<Params, S: IntoRefStep<V::StaticViewType, (), Params>>(
140        self,
141        f: S,
142        registry: &crate::world::Registry,
143    ) -> ViewScope<In, Out, V, PrevChain, (InnerSteps, ViewTap<S::Step>)> {
144        ViewScope {
145            prev_chain: self.prev_chain,
146            inner: (self.inner, ViewTap(f.into_ref_step(registry))),
147            _marker: PhantomData,
148        }
149    }
150
151    /// Observe the view without side effects (no Params).
152    /// Step signature: `fn(&ViewType)`.
153    pub fn inspect<S: IntoRefStep<V::StaticViewType, (), ()>>(
154        self,
155        f: S,
156        registry: &crate::world::Registry,
157    ) -> ViewScope<In, Out, V, PrevChain, (InnerSteps, ViewTap<S::Step>)> {
158        self.tap(f, registry)
159    }
160
161    /// Filter the event based on the view. Same as `guard` — returns
162    /// `bool` to accept/reject.
163    /// Step signature: `fn(Params..., &ViewType) -> bool`.
164    pub fn filter<Params, S: IntoRefStep<V::StaticViewType, bool, Params>>(
165        self,
166        f: S,
167        registry: &crate::world::Registry,
168    ) -> ViewScope<In, Out, V, PrevChain, (InnerSteps, ViewGuard<S::Step>)> {
169        self.guard(f, registry)
170    }
171
172    /// Guard the event based on the view.
173    /// Step signature: `fn(Params..., &ViewType) -> bool`.
174    pub fn guard<Params, S: IntoRefStep<V::StaticViewType, bool, Params>>(
175        self,
176        f: S,
177        registry: &crate::world::Registry,
178    ) -> ViewScope<In, Out, V, PrevChain, (InnerSteps, ViewGuard<S::Step>)> {
179        ViewScope {
180            prev_chain: self.prev_chain,
181            inner: (self.inner, ViewGuard(f.into_ref_step(registry))),
182            _marker: PhantomData,
183        }
184    }
185}
186
187// --- Step wrappers ---
188
189#[doc(hidden)]
190pub struct ViewTap<S>(S);
191
192#[doc(hidden)]
193pub struct ViewGuard<S>(S);
194
195// --- ViewSteps trait ---
196
197#[doc(hidden)]
198pub trait ViewSteps<V> {
199    fn run(&mut self, world: &mut World, view: &V) -> bool;
200}
201
202impl<V> ViewSteps<V> for () {
203    fn run(&mut self, _world: &mut World, _view: &V) -> bool {
204        true
205    }
206}
207
208impl<V, Prev: ViewSteps<V>, S: RefStepCall<V, Out = ()>> ViewSteps<V> for (Prev, ViewTap<S>) {
209    fn run(&mut self, world: &mut World, view: &V) -> bool {
210        if !self.0.run(world, view) {
211            return false;
212        }
213        self.1.0.call(world, view);
214        true
215    }
216}
217
218impl<V, Prev: ViewSteps<V>, S: RefStepCall<V, Out = bool>> ViewSteps<V> for (Prev, ViewGuard<S>) {
219    fn run(&mut self, world: &mut World, view: &V) -> bool {
220        if !self.0.run(world, view) {
221            return false;
222        }
223        self.1.0.call(world, view)
224    }
225}
226
227// =============================================================================
228// end_view
229// =============================================================================
230
231impl<In, Out, V, PrevChain, InnerSteps> ViewScope<In, Out, V, PrevChain, InnerSteps>
232where
233    PrevChain: ChainCall<In, Out = Out>,
234    V: View<Out>,
235    InnerSteps: ViewSteps<V::StaticViewType>,
236{
237    /// Close the view scope. The event passes through unchanged.
238    pub fn end_view(self) -> PipelineChain<In, Out, ViewNode<PrevChain, InnerSteps, V>> {
239        PipelineChain {
240            chain: ViewNode {
241                prev: self.prev_chain,
242                inner: self.inner,
243                _marker: PhantomData,
244            },
245            _marker: PhantomData,
246        }
247    }
248
249    /// Close the view scope. If any guard rejected, returns `None`.
250    pub fn end_view_guarded(
251        self,
252    ) -> PipelineChain<In, Option<Out>, ViewGuardedNode<PrevChain, InnerSteps, V>> {
253        PipelineChain {
254            chain: ViewGuardedNode {
255                prev: self.prev_chain,
256                inner: self.inner,
257                _marker: PhantomData,
258            },
259            _marker: PhantomData,
260        }
261    }
262}
263
264// =============================================================================
265// Chain nodes
266// =============================================================================
267
268#[doc(hidden)]
269pub struct ViewNode<Prev, Inner, V> {
270    prev: Prev,
271    inner: Inner,
272    _marker: PhantomData<V>,
273}
274
275impl<In, Out, Prev, Inner, V> ChainCall<In> for ViewNode<Prev, Inner, V>
276where
277    Prev: ChainCall<In, Out = Out>,
278    V: View<Out>,
279    Inner: ViewSteps<V::StaticViewType>,
280{
281    type Out = Out;
282
283    fn call(&mut self, world: &mut World, input: In) -> Out {
284        let event = self.prev.call(world, input);
285        with_view::<Out, V, ()>(&event, |view| {
286            self.inner.run(world, view);
287        });
288        event
289    }
290}
291
292#[doc(hidden)]
293pub struct ViewGuardedNode<Prev, Inner, V> {
294    prev: Prev,
295    inner: Inner,
296    _marker: PhantomData<V>,
297}
298
299impl<In, Out, Prev, Inner, V> ChainCall<In> for ViewGuardedNode<Prev, Inner, V>
300where
301    Prev: ChainCall<In, Out = Out>,
302    V: View<Out>,
303    Inner: ViewSteps<V::StaticViewType>,
304{
305    type Out = Option<Out>;
306
307    fn call(&mut self, world: &mut World, input: In) -> Option<Out> {
308        let event = self.prev.call(world, input);
309        let pass = with_view::<Out, V, bool>(&event, |view| self.inner.run(world, view));
310        if pass { Some(event) } else { None }
311    }
312}
313
314// =============================================================================
315// PipelineBuilder / PipelineChain integration
316// =============================================================================
317
318impl<In> crate::pipeline::PipelineBuilder<In> {
319    /// Open a view scope as the first pipeline step.
320    pub fn view<V: View<In>>(self) -> ViewScope<In, In, V, crate::pipeline::IdentityNode, ()> {
321        ViewScope::new(crate::pipeline::IdentityNode)
322    }
323}
324
325impl<In, Out, Chain: ChainCall<In, Out = Out>> PipelineChain<In, Out, Chain> {
326    /// Open a view scope. Steps inside operate on a read-only view
327    /// constructed from the pipeline's current event.
328    ///
329    /// `V` is a marker type implementing [`View<Out>`]. Inside the scope,
330    /// steps resolve against `V::StaticViewType` — borrowed views work
331    /// via lifetime erasure (same pattern as `std::thread::scope`).
332    pub fn view<V: View<Out>>(self) -> ViewScope<In, Out, V, Chain, ()> {
333        ViewScope::new(self.chain)
334    }
335}
336
337// =============================================================================
338// DagChain / DagArm integration
339// =============================================================================
340
341impl<In, Out, V, PrevChain, InnerSteps> ViewScope<In, Out, V, PrevChain, InnerSteps>
342where
343    PrevChain: ChainCall<In, Out = Out>,
344    V: View<Out>,
345    InnerSteps: ViewSteps<V::StaticViewType>,
346    Out: 'static,
347{
348    /// Close the view scope, returning a [`DagChain`](crate::dag::DagChain).
349    pub fn end_view_dag(self) -> crate::dag::DagChain<In, Out, ViewNode<PrevChain, InnerSteps, V>> {
350        crate::dag::DagChain {
351            chain: ViewNode {
352                prev: self.prev_chain,
353                inner: self.inner,
354                _marker: PhantomData,
355            },
356            _marker: PhantomData,
357        }
358    }
359
360    /// Close a guarded view scope, returning a [`DagChain`](crate::dag::DagChain).
361    pub fn end_view_dag_guarded(
362        self,
363    ) -> crate::dag::DagChain<In, Option<Out>, ViewGuardedNode<PrevChain, InnerSteps, V>> {
364        crate::dag::DagChain {
365            chain: ViewGuardedNode {
366                prev: self.prev_chain,
367                inner: self.inner,
368                _marker: PhantomData,
369            },
370            _marker: PhantomData,
371        }
372    }
373
374    /// Close the view scope, returning a [`DagArm`](crate::dag::DagArm).
375    pub fn end_view_arm(self) -> crate::dag::DagArm<In, Out, ViewNode<PrevChain, InnerSteps, V>> {
376        crate::dag::DagArm {
377            chain: ViewNode {
378                prev: self.prev_chain,
379                inner: self.inner,
380                _marker: PhantomData,
381            },
382            _marker: PhantomData,
383        }
384    }
385
386    /// Close a guarded view scope, returning a [`DagArm`](crate::dag::DagArm).
387    pub fn end_view_arm_guarded(
388        self,
389    ) -> crate::dag::DagArm<In, Option<Out>, ViewGuardedNode<PrevChain, InnerSteps, V>> {
390        crate::dag::DagArm {
391            chain: ViewGuardedNode {
392                prev: self.prev_chain,
393                inner: self.inner,
394                _marker: PhantomData,
395            },
396            _marker: PhantomData,
397        }
398    }
399}
400
401#[cfg(test)]
402mod tests {
403    use super::*;
404    use crate::{PipelineBuilder, Res, ResMut, Resource, WorldBuilder};
405
406    // -- Domain types --
407
408    struct AuditLog(Vec<String>);
409    impl Resource for AuditLog {}
410
411    struct RiskLimits {
412        max_qty: u64,
413    }
414    impl Resource for RiskLimits {}
415
416    // Borrowed view — the whole point
417    struct OrderView<'a> {
418        symbol: &'a str,
419        qty: u64,
420    }
421
422    struct NewOrderCommand {
423        source: String,
424        symbol: String,
425        qty: u64,
426        #[allow(dead_code)]
427        price: f64,
428    }
429
430    struct AmendOrderCommand {
431        #[allow(dead_code)]
432        order_id: u64,
433        symbol: String,
434        qty: u64,
435        #[allow(dead_code)]
436        price: f64,
437    }
438
439    // -- View marker + impls (zero-cost borrows) --
440
441    struct AsOrderView;
442
443    unsafe impl View<NewOrderCommand> for AsOrderView {
444        type ViewType<'a> = OrderView<'a>;
445        type StaticViewType = OrderView<'static>;
446        fn view(source: &NewOrderCommand) -> OrderView<'_> {
447            OrderView {
448                symbol: &source.symbol,
449                qty: source.qty,
450            }
451        }
452    }
453
454    unsafe impl View<AmendOrderCommand> for AsOrderView {
455        type ViewType<'a> = OrderView<'a>;
456        type StaticViewType = OrderView<'static>;
457        fn view(source: &AmendOrderCommand) -> OrderView<'_> {
458            OrderView {
459                symbol: &source.symbol,
460                qty: source.qty,
461            }
462        }
463    }
464
465    // -- Reusable steps (Params first, &View last) --
466
467    fn log_order(mut log: ResMut<AuditLog>, v: &OrderView) {
468        log.0.push(format!("{} qty={}", v.symbol, v.qty));
469    }
470
471    fn check_risk(limits: Res<RiskLimits>, v: &OrderView) -> bool {
472        v.qty <= limits.max_qty
473    }
474
475    // -- Tests --
476
477    #[test]
478    fn tap_observes_view() {
479        let mut wb = WorldBuilder::new();
480        wb.register(AuditLog(Vec::new()));
481        let mut world = wb.build();
482        let reg = world.registry();
483
484        let mut p = PipelineBuilder::<NewOrderCommand>::new()
485            .view::<AsOrderView>()
486            .tap(log_order, reg)
487            .end_view()
488            .then(|_cmd: NewOrderCommand| {}, reg);
489
490        p.run(
491            &mut world,
492            NewOrderCommand {
493                source: "test".into(),
494                symbol: "BTC".into(),
495                qty: 50,
496                price: 42000.0,
497            },
498        );
499
500        assert_eq!(world.resource::<AuditLog>().0, vec!["BTC qty=50"]);
501    }
502
503    #[test]
504    fn guard_rejects() {
505        let mut wb = WorldBuilder::new();
506        wb.register(AuditLog(Vec::new()));
507        wb.register(RiskLimits { max_qty: 100 });
508        let mut world = wb.build();
509        let reg = world.registry();
510
511        let mut p = PipelineBuilder::<NewOrderCommand>::new()
512            .view::<AsOrderView>()
513            .tap(log_order, reg)
514            .guard(check_risk, reg)
515            .end_view_guarded();
516
517        let result = p.run(
518            &mut world,
519            NewOrderCommand {
520                source: "a".into(),
521                symbol: "BTC".into(),
522                qty: 50,
523                price: 42000.0,
524            },
525        );
526        assert!(result.is_some());
527
528        let result = p.run(
529            &mut world,
530            NewOrderCommand {
531                source: "b".into(),
532                symbol: "ETH".into(),
533                qty: 200,
534                price: 3000.0,
535            },
536        );
537        assert!(result.is_none());
538
539        // Tap is before guard, so both events are logged.
540        // Steps after a rejecting guard short-circuit.
541        assert_eq!(world.resource::<AuditLog>().0.len(), 2);
542    }
543
544    #[test]
545    fn event_passes_through_unchanged() {
546        let mut wb = WorldBuilder::new();
547        wb.register(AuditLog(Vec::new()));
548        let mut world = wb.build();
549        let reg = world.registry();
550
551        fn sink(mut out: ResMut<AuditLog>, cmd: NewOrderCommand) {
552            out.0
553                .push(format!("sink: {} from {}", cmd.symbol, cmd.source));
554        }
555
556        let mut p = PipelineBuilder::<NewOrderCommand>::new()
557            .view::<AsOrderView>()
558            .tap(log_order, reg)
559            .end_view()
560            .then(sink, reg);
561
562        p.run(
563            &mut world,
564            NewOrderCommand {
565                source: "ops".into(),
566                symbol: "SOL".into(),
567                qty: 10,
568                price: 150.0,
569            },
570        );
571
572        let log = &world.resource::<AuditLog>().0;
573        assert_eq!(log[0], "SOL qty=10");
574        assert_eq!(log[1], "sink: SOL from ops");
575    }
576
577    #[test]
578    fn reusable_across_event_types() {
579        let mut wb = WorldBuilder::new();
580        wb.register(AuditLog(Vec::new()));
581        let mut world = wb.build();
582        let reg = world.registry();
583
584        let mut p_new = PipelineBuilder::<NewOrderCommand>::new()
585            .view::<AsOrderView>()
586            .tap(log_order, reg)
587            .end_view()
588            .then(|_: NewOrderCommand| {}, reg);
589
590        let mut p_amend = PipelineBuilder::<AmendOrderCommand>::new()
591            .view::<AsOrderView>()
592            .tap(log_order, reg) // SAME function
593            .end_view()
594            .then(|_: AmendOrderCommand| {}, reg);
595
596        p_new.run(
597            &mut world,
598            NewOrderCommand {
599                source: "a".into(),
600                symbol: "BTC".into(),
601                qty: 50,
602                price: 42000.0,
603            },
604        );
605        p_amend.run(
606            &mut world,
607            AmendOrderCommand {
608                order_id: 123,
609                symbol: "ETH".into(),
610                qty: 25,
611                price: 3000.0,
612            },
613        );
614
615        let log = &world.resource::<AuditLog>().0;
616        assert_eq!(log[0], "BTC qty=50");
617        assert_eq!(log[1], "ETH qty=25");
618    }
619
620    #[test]
621    fn multiple_taps_in_scope() {
622        let mut wb = WorldBuilder::new();
623        wb.register(AuditLog(Vec::new()));
624        let mut world = wb.build();
625        let reg = world.registry();
626
627        fn log_symbol(mut log: ResMut<AuditLog>, v: &OrderView) {
628            log.0.push(format!("symbol: {}", v.symbol));
629        }
630        fn log_qty(mut log: ResMut<AuditLog>, v: &OrderView) {
631            log.0.push(format!("qty: {}", v.qty));
632        }
633
634        let mut p = PipelineBuilder::<NewOrderCommand>::new()
635            .view::<AsOrderView>()
636            .tap(log_symbol, reg)
637            .tap(log_qty, reg)
638            .end_view()
639            .then(|_: NewOrderCommand| {}, reg);
640
641        p.run(
642            &mut world,
643            NewOrderCommand {
644                source: "a".into(),
645                symbol: "BTC".into(),
646                qty: 50,
647                price: 42000.0,
648            },
649        );
650
651        let log = &world.resource::<AuditLog>().0;
652        assert_eq!(log[0], "symbol: BTC");
653        assert_eq!(log[1], "qty: 50");
654    }
655
656    #[test]
657    fn sequential_views() {
658        struct SymbolView<'a> {
659            symbol: &'a str,
660        }
661        struct QtyView {
662            qty: u64,
663        }
664
665        struct AsSymbolView;
666        unsafe impl View<NewOrderCommand> for AsSymbolView {
667            type ViewType<'a> = SymbolView<'a>;
668            type StaticViewType = SymbolView<'static>;
669            fn view(source: &NewOrderCommand) -> SymbolView<'_> {
670                SymbolView {
671                    symbol: &source.symbol,
672                }
673            }
674        }
675
676        struct AsQtyView;
677        unsafe impl View<NewOrderCommand> for AsQtyView {
678            type ViewType<'a> = QtyView;
679            type StaticViewType = QtyView;
680            fn view(source: &NewOrderCommand) -> QtyView {
681                QtyView { qty: source.qty }
682            }
683        }
684
685        fn log_sym(mut log: ResMut<AuditLog>, v: &SymbolView) {
686            log.0.push(format!("sym: {}", v.symbol));
687        }
688        fn log_qty_view(mut log: ResMut<AuditLog>, v: &QtyView) {
689            log.0.push(format!("qty: {}", v.qty));
690        }
691
692        let mut wb = WorldBuilder::new();
693        wb.register(AuditLog(Vec::new()));
694        let mut world = wb.build();
695        let reg = world.registry();
696
697        let mut p = PipelineBuilder::<NewOrderCommand>::new()
698            .view::<AsSymbolView>()
699            .tap(log_sym, reg)
700            .end_view()
701            .view::<AsQtyView>()
702            .tap(log_qty_view, reg)
703            .end_view()
704            .then(|_: NewOrderCommand| {}, reg);
705
706        p.run(
707            &mut world,
708            NewOrderCommand {
709                source: "a".into(),
710                symbol: "BTC".into(),
711                qty: 50,
712                price: 42000.0,
713            },
714        );
715
716        let log = &world.resource::<AuditLog>().0;
717        assert_eq!(log[0], "sym: BTC");
718        assert_eq!(log[1], "qty: 50");
719    }
720
721    // -- DAG tests --
722
723    #[test]
724    fn dag_view_tap() {
725        use crate::{DagBuilder, Handler};
726
727        let mut wb = WorldBuilder::new();
728        wb.register(AuditLog(Vec::new()));
729        let mut world = wb.build();
730        let reg = world.registry();
731
732        let dag = DagBuilder::<NewOrderCommand>::new()
733            .root(|cmd: NewOrderCommand| cmd, reg)
734            .view::<AsOrderView>()
735            .tap(log_order, reg)
736            .end_view_dag()
737            .then(|_cmd: &NewOrderCommand| {}, reg);
738
739        let mut handler = dag.build();
740        handler.run(
741            &mut world,
742            NewOrderCommand {
743                source: "test".into(),
744                symbol: "BTC".into(),
745                qty: 50,
746                price: 42000.0,
747            },
748        );
749
750        assert_eq!(world.resource::<AuditLog>().0, vec!["BTC qty=50"]);
751    }
752
753    #[test]
754    fn dag_view_guard() {
755        use crate::{DagBuilder, Handler};
756
757        let mut wb = WorldBuilder::new();
758        wb.register(AuditLog(Vec::new()));
759        wb.register(RiskLimits { max_qty: 100 });
760        let mut world = wb.build();
761        let reg = world.registry();
762
763        fn sink(mut log: ResMut<AuditLog>, val: &Option<NewOrderCommand>) {
764            if val.is_some() {
765                log.0.push("accepted".into());
766            } else {
767                log.0.push("rejected".into());
768            }
769        }
770
771        let dag = DagBuilder::<NewOrderCommand>::new()
772            .root(|cmd: NewOrderCommand| cmd, reg)
773            .view::<AsOrderView>()
774            .tap(log_order, reg)
775            .guard(check_risk, reg)
776            .end_view_dag_guarded()
777            .then(sink, reg);
778
779        let mut handler = dag.build();
780        handler.run(
781            &mut world,
782            NewOrderCommand {
783                source: "a".into(),
784                symbol: "BTC".into(),
785                qty: 50,
786                price: 42000.0,
787            },
788        );
789        handler.run(
790            &mut world,
791            NewOrderCommand {
792                source: "b".into(),
793                symbol: "ETH".into(),
794                qty: 200,
795                price: 3000.0,
796            },
797        );
798
799        let log = &world.resource::<AuditLog>().0;
800        assert_eq!(log[0], "BTC qty=50");
801        assert_eq!(log[1], "accepted");
802        assert_eq!(log[2], "ETH qty=200");
803        assert_eq!(log[3], "rejected");
804    }
805
806    #[test]
807    fn inspect_no_params() {
808        let mut wb = WorldBuilder::new();
809        wb.register(AuditLog(Vec::new()));
810        let mut world = wb.build();
811        let reg = world.registry();
812
813        // inspect takes no Params — just &View
814        fn just_print(v: &OrderView) {
815            assert!(!v.symbol.is_empty());
816        }
817
818        let mut p = PipelineBuilder::<NewOrderCommand>::new()
819            .view::<AsOrderView>()
820            .inspect(just_print, reg)
821            .tap(log_order, reg)
822            .end_view()
823            .then(|_: NewOrderCommand| {}, reg);
824
825        p.run(
826            &mut world,
827            NewOrderCommand {
828                source: "a".into(),
829                symbol: "BTC".into(),
830                qty: 50,
831                price: 42000.0,
832            },
833        );
834
835        assert_eq!(world.resource::<AuditLog>().0, vec!["BTC qty=50"]);
836    }
837
838    #[test]
839    fn filter_rejects() {
840        let mut wb = WorldBuilder::new();
841        wb.register(AuditLog(Vec::new()));
842        wb.register(RiskLimits { max_qty: 100 });
843        let mut world = wb.build();
844        let reg = world.registry();
845
846        let mut p = PipelineBuilder::<NewOrderCommand>::new()
847            .view::<AsOrderView>()
848            .tap(log_order, reg)
849            .filter(check_risk, reg)
850            .end_view_guarded();
851
852        let result = p.run(
853            &mut world,
854            NewOrderCommand {
855                source: "a".into(),
856                symbol: "BTC".into(),
857                qty: 50,
858                price: 42000.0,
859            },
860        );
861        assert!(result.is_some());
862
863        let result = p.run(
864            &mut world,
865            NewOrderCommand {
866                source: "b".into(),
867                symbol: "ETH".into(),
868                qty: 200,
869                price: 3000.0,
870            },
871        );
872        assert!(result.is_none());
873    }
874
875    #[test]
876    fn guard_short_circuits_subsequent_tap() {
877        // guard BEFORE tap — if guard rejects, tap should NOT fire
878        let mut wb = WorldBuilder::new();
879        wb.register(AuditLog(Vec::new()));
880        wb.register(RiskLimits { max_qty: 100 });
881        let mut world = wb.build();
882        let reg = world.registry();
883
884        let mut p = PipelineBuilder::<NewOrderCommand>::new()
885            .view::<AsOrderView>()
886            .guard(check_risk, reg) // guard FIRST
887            .tap(log_order, reg) // tap AFTER — should NOT fire on rejection
888            .end_view_guarded();
889
890        // Accepted: guard passes, tap fires
891        let result = p.run(
892            &mut world,
893            NewOrderCommand {
894                source: "a".into(),
895                symbol: "BTC".into(),
896                qty: 50,
897                price: 42000.0,
898            },
899        );
900        assert!(result.is_some());
901        assert_eq!(world.resource::<AuditLog>().0.len(), 1);
902
903        // Rejected: guard fails, tap does NOT fire (short-circuit)
904        let result = p.run(
905            &mut world,
906            NewOrderCommand {
907                source: "b".into(),
908                symbol: "ETH".into(),
909                qty: 200,
910                price: 3000.0,
911            },
912        );
913        assert!(result.is_none());
914        assert_eq!(world.resource::<AuditLog>().0.len(), 1); // still 1, not 2
915    }
916
917    // -- Multi-field view tests -----------------------------------------------
918
919    struct FullOrderView<'a> {
920        source: &'a str,
921        symbol: &'a str,
922        qty: u64,
923        price: f64,
924    }
925
926    struct AsFullOrderView;
927    unsafe impl View<NewOrderCommand> for AsFullOrderView {
928        type ViewType<'a> = FullOrderView<'a>;
929        type StaticViewType = FullOrderView<'static>;
930        fn view(source: &NewOrderCommand) -> FullOrderView<'_> {
931            FullOrderView {
932                source: &source.source,
933                symbol: &source.symbol,
934                qty: source.qty,
935                price: source.price,
936            }
937        }
938    }
939
940    #[test]
941    fn view_with_multiple_borrowed_fields() {
942        let mut wb = WorldBuilder::new();
943        wb.register(AuditLog(Vec::new()));
944        let mut world = wb.build();
945        let reg = world.registry();
946
947        fn log_full(mut log: ResMut<AuditLog>, v: &FullOrderView) {
948            log.0.push(format!(
949                "{} {} qty={} px={}",
950                v.source, v.symbol, v.qty, v.price
951            ));
952        }
953
954        let mut p = PipelineBuilder::<NewOrderCommand>::new()
955            .view::<AsFullOrderView>()
956            .tap(log_full, reg)
957            .end_view()
958            .then(|_: NewOrderCommand| {}, reg);
959
960        p.run(
961            &mut world,
962            NewOrderCommand {
963                source: "desk-a".into(),
964                symbol: "BTC".into(),
965                qty: 50,
966                price: 42000.0,
967            },
968        );
969
970        assert_eq!(
971            world.resource::<AuditLog>().0,
972            vec!["desk-a BTC qty=50 px=42000"]
973        );
974    }
975
976    // -- Non-Copy type tests --------------------------------------------------
977
978    struct Payload {
979        data: Vec<u8>,
980        tag: String,
981    }
982
983    struct PayloadView<'a> {
984        data: &'a [u8],
985        tag: &'a str,
986    }
987
988    struct AsPayloadView;
989    unsafe impl View<Payload> for AsPayloadView {
990        type ViewType<'a> = PayloadView<'a>;
991        type StaticViewType = PayloadView<'static>;
992        fn view(source: &Payload) -> PayloadView<'_> {
993            PayloadView {
994                data: &source.data,
995                tag: &source.tag,
996            }
997        }
998    }
999
1000    #[test]
1001    fn view_of_non_copy_types() {
1002        let mut wb = WorldBuilder::new();
1003        wb.register(AuditLog(Vec::new()));
1004        let mut world = wb.build();
1005        let reg = world.registry();
1006
1007        fn log_payload(mut log: ResMut<AuditLog>, v: &PayloadView) {
1008            log.0.push(format!("tag={} len={}", v.tag, v.data.len()));
1009        }
1010
1011        let mut p = PipelineBuilder::<Payload>::new()
1012            .view::<AsPayloadView>()
1013            .tap(log_payload, reg)
1014            .end_view()
1015            .then(|_: Payload| {}, reg);
1016
1017        p.run(
1018            &mut world,
1019            Payload {
1020                data: vec![1, 2, 3],
1021                tag: "test".into(),
1022            },
1023        );
1024
1025        assert_eq!(world.resource::<AuditLog>().0, vec!["tag=test len=3"]);
1026    }
1027
1028    #[test]
1029    fn view_guard_preserves_non_copy_event() {
1030        let mut wb = WorldBuilder::new();
1031        wb.register(AuditLog(Vec::new()));
1032        let mut world = wb.build();
1033        let reg = world.registry();
1034
1035        fn check_tag(v: &PayloadView) -> bool {
1036            v.tag == "accept"
1037        }
1038
1039        let mut p = PipelineBuilder::<Payload>::new()
1040            .view::<AsPayloadView>()
1041            .guard(check_tag, reg)
1042            .end_view_guarded();
1043
1044        let accepted = p.run(
1045            &mut world,
1046            Payload {
1047                data: vec![1],
1048                tag: "accept".into(),
1049            },
1050        );
1051        assert!(accepted.is_some());
1052        assert_eq!(accepted.unwrap().data, vec![1]);
1053
1054        let rejected = p.run(
1055            &mut world,
1056            Payload {
1057                data: vec![2],
1058                tag: "reject".into(),
1059            },
1060        );
1061        assert!(rejected.is_none());
1062    }
1063
1064    #[test]
1065    fn view_guard_inside_view() {
1066        // Guard inside view scope — tests guarded view with accept/reject
1067        let mut wb = WorldBuilder::new();
1068        wb.register(AuditLog(Vec::new()));
1069        let mut world = wb.build();
1070        let reg = world.registry();
1071
1072        fn accept_tag(v: &PayloadView) -> bool {
1073            v.tag == "accept"
1074        }
1075
1076        let mut p = PipelineBuilder::<Payload>::new()
1077            .view::<AsPayloadView>()
1078            .guard(accept_tag, reg)
1079            .end_view_guarded();
1080
1081        // Wrong tag — guarded out
1082        let result = p.run(
1083            &mut world,
1084            Payload {
1085                data: vec![1],
1086                tag: "reject".into(),
1087            },
1088        );
1089        assert!(result.is_none());
1090
1091        // Right tag — passes through
1092        let result = p.run(
1093            &mut world,
1094            Payload {
1095                data: vec![1, 2, 3],
1096                tag: "accept".into(),
1097            },
1098        );
1099        assert!(result.is_some());
1100        assert_eq!(result.unwrap().data, vec![1, 2, 3]);
1101    }
1102
1103    #[test]
1104    fn view_tap_with_world_resources() {
1105        // View tap step reads a World resource
1106        let mut wb = WorldBuilder::new();
1107        wb.register(AuditLog(Vec::new()));
1108        let mut world = wb.build();
1109        let reg = world.registry();
1110
1111        fn log_order(mut log: ResMut<AuditLog>, v: &OrderView) {
1112            log.0.push(format!("{}:{}", v.symbol, v.qty));
1113        }
1114
1115        let mut p = PipelineBuilder::<NewOrderCommand>::new()
1116            .view::<AsOrderView>()
1117            .tap(log_order, reg)
1118            .end_view();
1119
1120        p.run(
1121            &mut world,
1122            NewOrderCommand {
1123                source: "test".into(),
1124                symbol: "BTC".into(),
1125                qty: 100,
1126                price: 50000.0,
1127            },
1128        );
1129        p.run(
1130            &mut world,
1131            NewOrderCommand {
1132                source: "test".into(),
1133                symbol: "ETH".into(),
1134                qty: 50,
1135                price: 3000.0,
1136            },
1137        );
1138
1139        assert_eq!(world.resource::<AuditLog>().0, vec!["BTC:100", "ETH:50"]);
1140    }
1141
1142    #[test]
1143    fn view_repeated_dispatch() {
1144        // Stress: same pipeline dispatched many times — no leaks, no drift
1145        let mut wb = WorldBuilder::new();
1146        wb.register(AuditLog(Vec::new()));
1147        let mut world = wb.build();
1148        let reg = world.registry();
1149
1150        fn count(mut log: ResMut<AuditLog>, _v: &OrderView) {
1151            log.0.push("hit".into());
1152        }
1153
1154        let mut p = PipelineBuilder::<NewOrderCommand>::new()
1155            .view::<AsOrderView>()
1156            .tap(count, reg)
1157            .end_view();
1158
1159        for _ in 0..100 {
1160            p.run(
1161                &mut world,
1162                NewOrderCommand {
1163                    source: "stress".into(),
1164                    symbol: "X".into(),
1165                    qty: 1,
1166                    price: 1.0,
1167                },
1168            );
1169        }
1170
1171        assert_eq!(world.resource::<AuditLog>().0.len(), 100);
1172    }
1173}