Skip to main content

nexus_rt/
view.rs

1#![allow(clippy::type_complexity)]
2// Pipeline view scopes — borrow a projected view from an event.
3//
4// `.view::<V>()` opens a scope where steps operate on a read-only
5// view constructed from the event. `.end_view()` closes the scope
6// and the original event continues unchanged.
7//
8// Lifetime erasure: the view may borrow from the event (e.g.,
9// `OrderView<'a>` with `symbol: &'a str`). IntoRefStep resolves
10// against `StaticViewType` ('static stand-in). `with_view` bridges
11// the two via a scoped transmute — same pattern as std::thread::scope.
12
13use core::marker::PhantomData;
14
15use crate::pipeline::{ChainCall, IntoRefStep, PipelineChain, RefStepCall};
16use crate::world::World;
17
18// =============================================================================
19// View trait
20// =============================================================================
21
/// Associates a source type with a projected view via a marker.
///
/// `ViewType<'a>` is the real view with borrowed fields.
/// `StaticViewType` is the same struct with `'static`. Steps are resolved
/// against it at build time; while a scope runs, its steps receive the real
/// view reinterpreted as `&StaticViewType`.
///
/// # Examples
///
/// ```
/// use nexus_rt::View;
///
/// #[repr(C)]
/// struct OrderView<'a> { symbol: &'a str, qty: u64 }
/// struct NewOrder { symbol: String, qty: u64 }
///
/// struct AsOrderView;
/// // SAFETY: both associated types are `OrderView`; only the lifetime differs.
/// unsafe impl View<NewOrder> for AsOrderView {
///     type ViewType<'a> = OrderView<'a>;
///     type StaticViewType = OrderView<'static>;
///     fn view(source: &NewOrder) -> OrderView<'_> {
///         OrderView { symbol: &source.symbol, qty: source.qty }
///     }
/// }
/// ```
///
/// # Safety
///
/// `StaticViewType` must be layout-identical to `ViewType<'a>` for any `'a`.
/// They must be the same struct with different lifetime parameters. The
/// framework performs a pointer cast between them in `with_view()`.
///
/// For view structs with borrowed fields (e.g., `&'a str`), use `#[repr(C)]`
/// to guarantee layout stability across lifetime parameters. Rust does not
/// currently guarantee that `repr(Rust)` types with different lifetime
/// parameters have identical layouts, though all current compilers do so.
/// `#[repr(C)]` removes any theoretical risk.
///
/// Incorrect implementations (e.g., `StaticViewType` being a different struct)
/// cause undefined behavior. Use the `#[derive(View)]` macro when available
/// to generate correct implementations.
#[diagnostic::on_unimplemented(
    message = "`{Source}` cannot be viewed as `{Self}`",
    note = "implement `View<{Source}>` for your view marker type"
)]
pub unsafe trait View<Source> {
    /// The view type with the source borrow lifetime.
    type ViewType<'a>
    where
        Source: 'a;

    /// The same type with `'static` — for `IntoRefStep` trait resolution.
    /// Must be layout-identical to `ViewType<'a>` for any `'a`.
    type StaticViewType: 'static;

    /// Construct the view from a borrowed source.
    fn view(source: &Source) -> Self::ViewType<'_>;
}
77
78// =============================================================================
79// with_view — scoped lifetime erasure
80// =============================================================================
81
82/// Constructs a view, erases its lifetime, runs a closure with `&StaticViewType`,
83/// then drops the view. The closure cannot leak the reference.
84///
85/// # Safety argument
86///
87/// The view borrows from `source` which is alive for this entire call.
88/// The transmute erases the borrow lifetime to `'static` so that
89/// `RefStepCall<StaticViewType>` (resolved at build time) can accept it.
90/// The closure boundary prevents the `'static` reference from escaping —
91/// it's dropped before this function returns.
92///
93/// This is the same pattern as `std::thread::scope` and `crossbeam::scope`.
94#[inline(always)]
95fn with_view<Source, V, R>(source: &Source, f: impl for<'a> FnOnce(&'a V::StaticViewType) -> R) -> R
96where
97    V: View<Source>,
98{
99    let view = V::view(source);
100    // SAFETY: ViewType<'a> and StaticViewType are the same struct with
101    // different lifetime parameters. The pointer cast is sound because:
102    // 1. The layouts are identical (same repr, same fields) — guaranteed
103    //    by the `unsafe trait View` contract.
104    // 2. `source` outlives this entire function, so the borrow is valid.
105    // 3. The `for<'a> FnOnce(&'a ...)` bound prevents the closure from
106    //    storing the reference — it must work for ANY lifetime, so it
107    //    cannot assume 'static.
108    // 4. The view is explicitly dropped after the closure returns.
109    let static_ref: &V::StaticViewType =
110        unsafe { &*(std::ptr::from_ref(&view) as *const V::StaticViewType) };
111    let result = f(static_ref);
112    drop(view);
113    result
114}
115
116// =============================================================================
117// ViewScope — builder for steps inside a view scope
118// =============================================================================
119
120/// Builder for steps inside a `.view::<V>()` scope.
121///
122/// `V` is the view marker (implements [`View<Out>`]). Steps resolve
123/// against `V::StaticViewType` via `IntoRefStep`.
124pub struct ViewScope<In, Out, V: View<Out>, PrevChain, InnerSteps> {
125    prev_chain: PrevChain,
126    inner: InnerSteps,
127    _marker: PhantomData<(fn(In) -> Out, V)>,
128}
129
130impl<In, Out, V: View<Out>, PrevChain> ViewScope<In, Out, V, PrevChain, ()> {
131    pub(crate) fn new(prev_chain: PrevChain) -> Self {
132        ViewScope {
133            prev_chain,
134            inner: (),
135            _marker: PhantomData,
136        }
137    }
138}
139
140// --- Combinators ---
141
142impl<In, Out, V: View<Out>, PrevChain, InnerSteps> ViewScope<In, Out, V, PrevChain, InnerSteps> {
143    /// Observe the view. Side effects via `Res`/`ResMut`.
144    /// Step signature: `fn(Params..., &ViewType) -> ()`.
145    pub fn tap<Params, S: IntoRefStep<V::StaticViewType, (), Params>>(
146        self,
147        f: S,
148        registry: &crate::world::Registry,
149    ) -> ViewScope<In, Out, V, PrevChain, (InnerSteps, ViewTap<S::Step>)> {
150        ViewScope {
151            prev_chain: self.prev_chain,
152            inner: (self.inner, ViewTap(f.into_ref_step(registry))),
153            _marker: PhantomData,
154        }
155    }
156
157    /// Observe the view without side effects (no Params).
158    /// Step signature: `fn(&ViewType)`.
159    pub fn inspect<S: IntoRefStep<V::StaticViewType, (), ()>>(
160        self,
161        f: S,
162        registry: &crate::world::Registry,
163    ) -> ViewScope<In, Out, V, PrevChain, (InnerSteps, ViewTap<S::Step>)> {
164        self.tap(f, registry)
165    }
166
167    /// Filter the event based on the view. Same as `guard` — returns
168    /// `bool` to accept/reject.
169    /// Step signature: `fn(Params..., &ViewType) -> bool`.
170    pub fn filter<Params, S: IntoRefStep<V::StaticViewType, bool, Params>>(
171        self,
172        f: S,
173        registry: &crate::world::Registry,
174    ) -> ViewScope<In, Out, V, PrevChain, (InnerSteps, ViewGuard<S::Step>)> {
175        self.guard(f, registry)
176    }
177
178    /// Guard the event based on the view.
179    /// Step signature: `fn(Params..., &ViewType) -> bool`.
180    pub fn guard<Params, S: IntoRefStep<V::StaticViewType, bool, Params>>(
181        self,
182        f: S,
183        registry: &crate::world::Registry,
184    ) -> ViewScope<In, Out, V, PrevChain, (InnerSteps, ViewGuard<S::Step>)> {
185        ViewScope {
186            prev_chain: self.prev_chain,
187            inner: (self.inner, ViewGuard(f.into_ref_step(registry))),
188            _marker: PhantomData,
189        }
190    }
191}
192
193// --- Step wrappers ---
194
/// Wraps a resolved step that observes the view and returns `()`.
#[doc(hidden)]
pub struct ViewTap<S>(S);
197
/// Wraps a resolved step that inspects the view and returns a `bool` verdict.
#[doc(hidden)]
pub struct ViewGuard<S>(S);
200
201// --- ViewSteps trait ---
202
203#[doc(hidden)]
204pub trait ViewSteps<V> {
205    fn run(&mut self, world: &mut World, view: &V) -> bool;
206}
207
208impl<V> ViewSteps<V> for () {
209    fn run(&mut self, _world: &mut World, _view: &V) -> bool {
210        true
211    }
212}
213
214impl<V, Prev: ViewSteps<V>, S: RefStepCall<V, Out = ()>> ViewSteps<V> for (Prev, ViewTap<S>) {
215    fn run(&mut self, world: &mut World, view: &V) -> bool {
216        if !self.0.run(world, view) {
217            return false;
218        }
219        self.1.0.call(world, view);
220        true
221    }
222}
223
224impl<V, Prev: ViewSteps<V>, S: RefStepCall<V, Out = bool>> ViewSteps<V> for (Prev, ViewGuard<S>) {
225    fn run(&mut self, world: &mut World, view: &V) -> bool {
226        if !self.0.run(world, view) {
227            return false;
228        }
229        self.1.0.call(world, view)
230    }
231}
232
233// =============================================================================
234// end_view
235// =============================================================================
236
237impl<In, Out, V, PrevChain, InnerSteps> ViewScope<In, Out, V, PrevChain, InnerSteps>
238where
239    PrevChain: ChainCall<In, Out = Out>,
240    V: View<Out>,
241    InnerSteps: ViewSteps<V::StaticViewType>,
242{
243    /// Close the view scope. The event passes through unchanged.
244    pub fn end_view(self) -> PipelineChain<In, Out, ViewNode<PrevChain, InnerSteps, V>> {
245        PipelineChain {
246            chain: ViewNode {
247                prev: self.prev_chain,
248                inner: self.inner,
249                _marker: PhantomData,
250            },
251            _marker: PhantomData,
252        }
253    }
254
255    /// Close the view scope. If any guard rejected, returns `None`.
256    pub fn end_view_guarded(
257        self,
258    ) -> PipelineChain<In, Option<Out>, ViewGuardedNode<PrevChain, InnerSteps, V>> {
259        PipelineChain {
260            chain: ViewGuardedNode {
261                prev: self.prev_chain,
262                inner: self.inner,
263                _marker: PhantomData,
264            },
265            _marker: PhantomData,
266        }
267    }
268}
269
270// =============================================================================
271// Chain nodes
272// =============================================================================
273
/// Chain node for a closed, unguarded view scope.
#[doc(hidden)]
pub struct ViewNode<Prev, Inner, V> {
    /// Chain that produces the event.
    prev: Prev,
    /// Steps to run against the view of that event.
    inner: Inner,
    /// Records the view marker `V`.
    _marker: PhantomData<V>,
}
280
281impl<In, Out, Prev, Inner, V> ChainCall<In> for ViewNode<Prev, Inner, V>
282where
283    Prev: ChainCall<In, Out = Out>,
284    V: View<Out>,
285    Inner: ViewSteps<V::StaticViewType>,
286{
287    type Out = Out;
288
289    fn call(&mut self, world: &mut World, input: In) -> Out {
290        let event = self.prev.call(world, input);
291        with_view::<Out, V, ()>(&event, |view| {
292            self.inner.run(world, view);
293        });
294        event
295    }
296}
297
/// Chain node for a closed, guarded view scope.
#[doc(hidden)]
pub struct ViewGuardedNode<Prev, Inner, V> {
    /// Chain that produces the event.
    prev: Prev,
    /// Steps whose verdict decides whether the event continues.
    inner: Inner,
    /// Records the view marker `V`.
    _marker: PhantomData<V>,
}
304
305impl<In, Out, Prev, Inner, V> ChainCall<In> for ViewGuardedNode<Prev, Inner, V>
306where
307    Prev: ChainCall<In, Out = Out>,
308    V: View<Out>,
309    Inner: ViewSteps<V::StaticViewType>,
310{
311    type Out = Option<Out>;
312
313    fn call(&mut self, world: &mut World, input: In) -> Option<Out> {
314        let event = self.prev.call(world, input);
315        let pass = with_view::<Out, V, bool>(&event, |view| self.inner.run(world, view));
316        if pass { Some(event) } else { None }
317    }
318}
319
320// =============================================================================
321// PipelineBuilder / PipelineChain integration
322// =============================================================================
323
324impl<In> crate::pipeline::PipelineBuilder<In> {
325    /// Open a view scope as the first pipeline step.
326    pub fn view<V: View<In>>(self) -> ViewScope<In, In, V, crate::pipeline::IdentityNode, ()> {
327        ViewScope::new(crate::pipeline::IdentityNode)
328    }
329}
330
331impl<In, Out, Chain: ChainCall<In, Out = Out>> PipelineChain<In, Out, Chain> {
332    /// Open a view scope. Steps inside operate on a read-only view
333    /// constructed from the pipeline's current event.
334    ///
335    /// `V` is a marker type implementing [`View<Out>`]. Inside the scope,
336    /// steps resolve against `V::StaticViewType` — borrowed views work
337    /// via lifetime erasure (same pattern as `std::thread::scope`).
338    pub fn view<V: View<Out>>(self) -> ViewScope<In, Out, V, Chain, ()> {
339        ViewScope::new(self.chain)
340    }
341}
342
343// =============================================================================
344// DagChain / DagArm integration
345// =============================================================================
346
347impl<In, Out, V, PrevChain, InnerSteps> ViewScope<In, Out, V, PrevChain, InnerSteps>
348where
349    PrevChain: ChainCall<In, Out = Out>,
350    V: View<Out>,
351    InnerSteps: ViewSteps<V::StaticViewType>,
352    Out: 'static,
353{
354    /// Close the view scope, returning a [`DagChain`](crate::dag::DagChain).
355    pub fn end_view_dag(self) -> crate::dag::DagChain<In, Out, ViewNode<PrevChain, InnerSteps, V>> {
356        crate::dag::DagChain {
357            chain: ViewNode {
358                prev: self.prev_chain,
359                inner: self.inner,
360                _marker: PhantomData,
361            },
362            _marker: PhantomData,
363        }
364    }
365
366    /// Close a guarded view scope, returning a [`DagChain`](crate::dag::DagChain).
367    pub fn end_view_dag_guarded(
368        self,
369    ) -> crate::dag::DagChain<In, Option<Out>, ViewGuardedNode<PrevChain, InnerSteps, V>> {
370        crate::dag::DagChain {
371            chain: ViewGuardedNode {
372                prev: self.prev_chain,
373                inner: self.inner,
374                _marker: PhantomData,
375            },
376            _marker: PhantomData,
377        }
378    }
379
380    /// Close the view scope, returning a [`DagArm`](crate::dag::DagArm).
381    pub fn end_view_arm(self) -> crate::dag::DagArm<In, Out, ViewNode<PrevChain, InnerSteps, V>> {
382        crate::dag::DagArm {
383            chain: ViewNode {
384                prev: self.prev_chain,
385                inner: self.inner,
386                _marker: PhantomData,
387            },
388            _marker: PhantomData,
389        }
390    }
391
392    /// Close a guarded view scope, returning a [`DagArm`](crate::dag::DagArm).
393    pub fn end_view_arm_guarded(
394        self,
395    ) -> crate::dag::DagArm<In, Option<Out>, ViewGuardedNode<PrevChain, InnerSteps, V>> {
396        crate::dag::DagArm {
397            chain: ViewGuardedNode {
398                prev: self.prev_chain,
399                inner: self.inner,
400                _marker: PhantomData,
401            },
402            _marker: PhantomData,
403        }
404    }
405}
406
407#[cfg(test)]
408mod tests {
409    use super::*;
410    use crate::{PipelineBuilder, Res, ResMut, Resource, WorldBuilder};
411
412    // -- Domain types --
413
414    struct AuditLog(Vec<String>);
415    impl Resource for AuditLog {}
416
417    struct RiskLimits {
418        max_qty: u64,
419    }
420    impl Resource for RiskLimits {}
421
422    // Borrowed view — the whole point
423    struct OrderView<'a> {
424        symbol: &'a str,
425        qty: u64,
426    }
427
428    struct NewOrderCommand {
429        source: String,
430        symbol: String,
431        qty: u64,
432        #[allow(dead_code)]
433        price: f64,
434    }
435
436    struct AmendOrderCommand {
437        #[allow(dead_code)]
438        order_id: u64,
439        symbol: String,
440        qty: u64,
441        #[allow(dead_code)]
442        price: f64,
443    }
444
445    // -- View marker + impls (zero-cost borrows) --
446
447    struct AsOrderView;
448
449    unsafe impl View<NewOrderCommand> for AsOrderView {
450        type ViewType<'a> = OrderView<'a>;
451        type StaticViewType = OrderView<'static>;
452        fn view(source: &NewOrderCommand) -> OrderView<'_> {
453            OrderView {
454                symbol: &source.symbol,
455                qty: source.qty,
456            }
457        }
458    }
459
460    unsafe impl View<AmendOrderCommand> for AsOrderView {
461        type ViewType<'a> = OrderView<'a>;
462        type StaticViewType = OrderView<'static>;
463        fn view(source: &AmendOrderCommand) -> OrderView<'_> {
464            OrderView {
465                symbol: &source.symbol,
466                qty: source.qty,
467            }
468        }
469    }
470
471    // -- Reusable steps (Params first, &View last) --
472
473    fn log_order(mut log: ResMut<AuditLog>, v: &OrderView) {
474        log.0.push(format!("{} qty={}", v.symbol, v.qty));
475    }
476
477    fn check_risk(limits: Res<RiskLimits>, v: &OrderView) -> bool {
478        v.qty <= limits.max_qty
479    }
480
481    // -- Tests --
482
483    #[test]
484    fn tap_observes_view() {
485        let mut wb = WorldBuilder::new();
486        wb.register(AuditLog(Vec::new()));
487        let mut world = wb.build();
488        let reg = world.registry();
489
490        let mut p = PipelineBuilder::<NewOrderCommand>::new()
491            .view::<AsOrderView>()
492            .tap(log_order, reg)
493            .end_view()
494            .then(|_cmd: NewOrderCommand| {}, reg);
495
496        p.run(
497            &mut world,
498            NewOrderCommand {
499                source: "test".into(),
500                symbol: "BTC".into(),
501                qty: 50,
502                price: 42000.0,
503            },
504        );
505
506        assert_eq!(world.resource::<AuditLog>().0, vec!["BTC qty=50"]);
507    }
508
509    #[test]
510    fn guard_rejects() {
511        let mut wb = WorldBuilder::new();
512        wb.register(AuditLog(Vec::new()));
513        wb.register(RiskLimits { max_qty: 100 });
514        let mut world = wb.build();
515        let reg = world.registry();
516
517        let mut p = PipelineBuilder::<NewOrderCommand>::new()
518            .view::<AsOrderView>()
519            .tap(log_order, reg)
520            .guard(check_risk, reg)
521            .end_view_guarded();
522
523        let result = p.run(
524            &mut world,
525            NewOrderCommand {
526                source: "a".into(),
527                symbol: "BTC".into(),
528                qty: 50,
529                price: 42000.0,
530            },
531        );
532        assert!(result.is_some());
533
534        let result = p.run(
535            &mut world,
536            NewOrderCommand {
537                source: "b".into(),
538                symbol: "ETH".into(),
539                qty: 200,
540                price: 3000.0,
541            },
542        );
543        assert!(result.is_none());
544
545        // Tap is before guard, so both events are logged.
546        // Steps after a rejecting guard short-circuit.
547        assert_eq!(world.resource::<AuditLog>().0.len(), 2);
548    }
549
550    #[test]
551    fn event_passes_through_unchanged() {
552        let mut wb = WorldBuilder::new();
553        wb.register(AuditLog(Vec::new()));
554        let mut world = wb.build();
555        let reg = world.registry();
556
557        fn sink(mut out: ResMut<AuditLog>, cmd: NewOrderCommand) {
558            out.0
559                .push(format!("sink: {} from {}", cmd.symbol, cmd.source));
560        }
561
562        let mut p = PipelineBuilder::<NewOrderCommand>::new()
563            .view::<AsOrderView>()
564            .tap(log_order, reg)
565            .end_view()
566            .then(sink, reg);
567
568        p.run(
569            &mut world,
570            NewOrderCommand {
571                source: "ops".into(),
572                symbol: "SOL".into(),
573                qty: 10,
574                price: 150.0,
575            },
576        );
577
578        let log = &world.resource::<AuditLog>().0;
579        assert_eq!(log[0], "SOL qty=10");
580        assert_eq!(log[1], "sink: SOL from ops");
581    }
582
583    #[test]
584    fn reusable_across_event_types() {
585        let mut wb = WorldBuilder::new();
586        wb.register(AuditLog(Vec::new()));
587        let mut world = wb.build();
588        let reg = world.registry();
589
590        let mut p_new = PipelineBuilder::<NewOrderCommand>::new()
591            .view::<AsOrderView>()
592            .tap(log_order, reg)
593            .end_view()
594            .then(|_: NewOrderCommand| {}, reg);
595
596        let mut p_amend = PipelineBuilder::<AmendOrderCommand>::new()
597            .view::<AsOrderView>()
598            .tap(log_order, reg) // SAME function
599            .end_view()
600            .then(|_: AmendOrderCommand| {}, reg);
601
602        p_new.run(
603            &mut world,
604            NewOrderCommand {
605                source: "a".into(),
606                symbol: "BTC".into(),
607                qty: 50,
608                price: 42000.0,
609            },
610        );
611        p_amend.run(
612            &mut world,
613            AmendOrderCommand {
614                order_id: 123,
615                symbol: "ETH".into(),
616                qty: 25,
617                price: 3000.0,
618            },
619        );
620
621        let log = &world.resource::<AuditLog>().0;
622        assert_eq!(log[0], "BTC qty=50");
623        assert_eq!(log[1], "ETH qty=25");
624    }
625
626    #[test]
627    fn multiple_taps_in_scope() {
628        let mut wb = WorldBuilder::new();
629        wb.register(AuditLog(Vec::new()));
630        let mut world = wb.build();
631        let reg = world.registry();
632
633        fn log_symbol(mut log: ResMut<AuditLog>, v: &OrderView) {
634            log.0.push(format!("symbol: {}", v.symbol));
635        }
636        fn log_qty(mut log: ResMut<AuditLog>, v: &OrderView) {
637            log.0.push(format!("qty: {}", v.qty));
638        }
639
640        let mut p = PipelineBuilder::<NewOrderCommand>::new()
641            .view::<AsOrderView>()
642            .tap(log_symbol, reg)
643            .tap(log_qty, reg)
644            .end_view()
645            .then(|_: NewOrderCommand| {}, reg);
646
647        p.run(
648            &mut world,
649            NewOrderCommand {
650                source: "a".into(),
651                symbol: "BTC".into(),
652                qty: 50,
653                price: 42000.0,
654            },
655        );
656
657        let log = &world.resource::<AuditLog>().0;
658        assert_eq!(log[0], "symbol: BTC");
659        assert_eq!(log[1], "qty: 50");
660    }
661
662    #[test]
663    fn sequential_views() {
664        struct SymbolView<'a> {
665            symbol: &'a str,
666        }
667        struct QtyView {
668            qty: u64,
669        }
670
671        struct AsSymbolView;
672        unsafe impl View<NewOrderCommand> for AsSymbolView {
673            type ViewType<'a> = SymbolView<'a>;
674            type StaticViewType = SymbolView<'static>;
675            fn view(source: &NewOrderCommand) -> SymbolView<'_> {
676                SymbolView {
677                    symbol: &source.symbol,
678                }
679            }
680        }
681
682        struct AsQtyView;
683        unsafe impl View<NewOrderCommand> for AsQtyView {
684            type ViewType<'a> = QtyView;
685            type StaticViewType = QtyView;
686            fn view(source: &NewOrderCommand) -> QtyView {
687                QtyView { qty: source.qty }
688            }
689        }
690
691        fn log_sym(mut log: ResMut<AuditLog>, v: &SymbolView) {
692            log.0.push(format!("sym: {}", v.symbol));
693        }
694        fn log_qty_view(mut log: ResMut<AuditLog>, v: &QtyView) {
695            log.0.push(format!("qty: {}", v.qty));
696        }
697
698        let mut wb = WorldBuilder::new();
699        wb.register(AuditLog(Vec::new()));
700        let mut world = wb.build();
701        let reg = world.registry();
702
703        let mut p = PipelineBuilder::<NewOrderCommand>::new()
704            .view::<AsSymbolView>()
705            .tap(log_sym, reg)
706            .end_view()
707            .view::<AsQtyView>()
708            .tap(log_qty_view, reg)
709            .end_view()
710            .then(|_: NewOrderCommand| {}, reg);
711
712        p.run(
713            &mut world,
714            NewOrderCommand {
715                source: "a".into(),
716                symbol: "BTC".into(),
717                qty: 50,
718                price: 42000.0,
719            },
720        );
721
722        let log = &world.resource::<AuditLog>().0;
723        assert_eq!(log[0], "sym: BTC");
724        assert_eq!(log[1], "qty: 50");
725    }
726
727    // -- DAG tests --
728
729    #[test]
730    fn dag_view_tap() {
731        use crate::{DagBuilder, Handler};
732
733        let mut wb = WorldBuilder::new();
734        wb.register(AuditLog(Vec::new()));
735        let mut world = wb.build();
736        let reg = world.registry();
737
738        let dag = DagBuilder::<NewOrderCommand>::new()
739            .root(|cmd: NewOrderCommand| cmd, reg)
740            .view::<AsOrderView>()
741            .tap(log_order, reg)
742            .end_view_dag()
743            .then(|_cmd: &NewOrderCommand| {}, reg);
744
745        let mut handler = dag.build();
746        handler.run(
747            &mut world,
748            NewOrderCommand {
749                source: "test".into(),
750                symbol: "BTC".into(),
751                qty: 50,
752                price: 42000.0,
753            },
754        );
755
756        assert_eq!(world.resource::<AuditLog>().0, vec!["BTC qty=50"]);
757    }
758
759    #[test]
760    fn dag_view_guard() {
761        use crate::{DagBuilder, Handler};
762
763        let mut wb = WorldBuilder::new();
764        wb.register(AuditLog(Vec::new()));
765        wb.register(RiskLimits { max_qty: 100 });
766        let mut world = wb.build();
767        let reg = world.registry();
768
769        fn sink(mut log: ResMut<AuditLog>, val: &Option<NewOrderCommand>) {
770            if val.is_some() {
771                log.0.push("accepted".into());
772            } else {
773                log.0.push("rejected".into());
774            }
775        }
776
777        let dag = DagBuilder::<NewOrderCommand>::new()
778            .root(|cmd: NewOrderCommand| cmd, reg)
779            .view::<AsOrderView>()
780            .tap(log_order, reg)
781            .guard(check_risk, reg)
782            .end_view_dag_guarded()
783            .then(sink, reg);
784
785        let mut handler = dag.build();
786        handler.run(
787            &mut world,
788            NewOrderCommand {
789                source: "a".into(),
790                symbol: "BTC".into(),
791                qty: 50,
792                price: 42000.0,
793            },
794        );
795        handler.run(
796            &mut world,
797            NewOrderCommand {
798                source: "b".into(),
799                symbol: "ETH".into(),
800                qty: 200,
801                price: 3000.0,
802            },
803        );
804
805        let log = &world.resource::<AuditLog>().0;
806        assert_eq!(log[0], "BTC qty=50");
807        assert_eq!(log[1], "accepted");
808        assert_eq!(log[2], "ETH qty=200");
809        assert_eq!(log[3], "rejected");
810    }
811
812    #[test]
813    fn inspect_no_params() {
814        let mut wb = WorldBuilder::new();
815        wb.register(AuditLog(Vec::new()));
816        let mut world = wb.build();
817        let reg = world.registry();
818
819        // inspect takes no Params — just &View
820        fn just_print(v: &OrderView) {
821            assert!(!v.symbol.is_empty());
822        }
823
824        let mut p = PipelineBuilder::<NewOrderCommand>::new()
825            .view::<AsOrderView>()
826            .inspect(just_print, reg)
827            .tap(log_order, reg)
828            .end_view()
829            .then(|_: NewOrderCommand| {}, reg);
830
831        p.run(
832            &mut world,
833            NewOrderCommand {
834                source: "a".into(),
835                symbol: "BTC".into(),
836                qty: 50,
837                price: 42000.0,
838            },
839        );
840
841        assert_eq!(world.resource::<AuditLog>().0, vec!["BTC qty=50"]);
842    }
843
844    #[test]
845    fn filter_rejects() {
846        let mut wb = WorldBuilder::new();
847        wb.register(AuditLog(Vec::new()));
848        wb.register(RiskLimits { max_qty: 100 });
849        let mut world = wb.build();
850        let reg = world.registry();
851
852        let mut p = PipelineBuilder::<NewOrderCommand>::new()
853            .view::<AsOrderView>()
854            .tap(log_order, reg)
855            .filter(check_risk, reg)
856            .end_view_guarded();
857
858        let result = p.run(
859            &mut world,
860            NewOrderCommand {
861                source: "a".into(),
862                symbol: "BTC".into(),
863                qty: 50,
864                price: 42000.0,
865            },
866        );
867        assert!(result.is_some());
868
869        let result = p.run(
870            &mut world,
871            NewOrderCommand {
872                source: "b".into(),
873                symbol: "ETH".into(),
874                qty: 200,
875                price: 3000.0,
876            },
877        );
878        assert!(result.is_none());
879    }
880
881    #[test]
882    fn guard_short_circuits_subsequent_tap() {
883        // guard BEFORE tap — if guard rejects, tap should NOT fire
884        let mut wb = WorldBuilder::new();
885        wb.register(AuditLog(Vec::new()));
886        wb.register(RiskLimits { max_qty: 100 });
887        let mut world = wb.build();
888        let reg = world.registry();
889
890        let mut p = PipelineBuilder::<NewOrderCommand>::new()
891            .view::<AsOrderView>()
892            .guard(check_risk, reg) // guard FIRST
893            .tap(log_order, reg) // tap AFTER — should NOT fire on rejection
894            .end_view_guarded();
895
896        // Accepted: guard passes, tap fires
897        let result = p.run(
898            &mut world,
899            NewOrderCommand {
900                source: "a".into(),
901                symbol: "BTC".into(),
902                qty: 50,
903                price: 42000.0,
904            },
905        );
906        assert!(result.is_some());
907        assert_eq!(world.resource::<AuditLog>().0.len(), 1);
908
909        // Rejected: guard fails, tap does NOT fire (short-circuit)
910        let result = p.run(
911            &mut world,
912            NewOrderCommand {
913                source: "b".into(),
914                symbol: "ETH".into(),
915                qty: 200,
916                price: 3000.0,
917            },
918        );
919        assert!(result.is_none());
920        assert_eq!(world.resource::<AuditLog>().0.len(), 1); // still 1, not 2
921    }
922
923    // -- Multi-field view tests -----------------------------------------------
924
925    struct FullOrderView<'a> {
926        source: &'a str,
927        symbol: &'a str,
928        qty: u64,
929        price: f64,
930    }
931
932    struct AsFullOrderView;
933    unsafe impl View<NewOrderCommand> for AsFullOrderView {
934        type ViewType<'a> = FullOrderView<'a>;
935        type StaticViewType = FullOrderView<'static>;
936        fn view(source: &NewOrderCommand) -> FullOrderView<'_> {
937            FullOrderView {
938                source: &source.source,
939                symbol: &source.symbol,
940                qty: source.qty,
941                price: source.price,
942            }
943        }
944    }
945
946    #[test]
947    fn view_with_multiple_borrowed_fields() {
948        let mut wb = WorldBuilder::new();
949        wb.register(AuditLog(Vec::new()));
950        let mut world = wb.build();
951        let reg = world.registry();
952
953        fn log_full(mut log: ResMut<AuditLog>, v: &FullOrderView) {
954            log.0.push(format!(
955                "{} {} qty={} px={}",
956                v.source, v.symbol, v.qty, v.price
957            ));
958        }
959
960        let mut p = PipelineBuilder::<NewOrderCommand>::new()
961            .view::<AsFullOrderView>()
962            .tap(log_full, reg)
963            .end_view()
964            .then(|_: NewOrderCommand| {}, reg);
965
966        p.run(
967            &mut world,
968            NewOrderCommand {
969                source: "desk-a".into(),
970                symbol: "BTC".into(),
971                qty: 50,
972                price: 42000.0,
973            },
974        );
975
976        assert_eq!(
977            world.resource::<AuditLog>().0,
978            vec!["desk-a BTC qty=50 px=42000"]
979        );
980    }
981
982    // -- Non-Copy type tests --------------------------------------------------
983
    /// Test-only event type whose fields are both heap-owned (`Vec<u8>` and
    /// `String`), so the event is neither `Copy` nor trivially cheap to
    /// duplicate.
    struct Payload {
        /// Owned byte buffer.
        data: Vec<u8>,
        /// Owned label; the guard tests below compare it against "accept".
        tag: String,
    }
988
    /// Borrowed projection of a `Payload`: both fields are references into
    /// the event rather than copies.
    struct PayloadView<'a> {
        /// Slice over the payload's `data` buffer.
        data: &'a [u8],
        /// String slice over the payload's `tag`.
        tag: &'a str,
    }
993
994    struct AsPayloadView;
995    unsafe impl View<Payload> for AsPayloadView {
996        type ViewType<'a> = PayloadView<'a>;
997        type StaticViewType = PayloadView<'static>;
998        fn view(source: &Payload) -> PayloadView<'_> {
999            PayloadView {
1000                data: &source.data,
1001                tag: &source.tag,
1002            }
1003        }
1004    }
1005
1006    #[test]
1007    fn view_of_non_copy_types() {
1008        let mut wb = WorldBuilder::new();
1009        wb.register(AuditLog(Vec::new()));
1010        let mut world = wb.build();
1011        let reg = world.registry();
1012
1013        fn log_payload(mut log: ResMut<AuditLog>, v: &PayloadView) {
1014            log.0.push(format!("tag={} len={}", v.tag, v.data.len()));
1015        }
1016
1017        let mut p = PipelineBuilder::<Payload>::new()
1018            .view::<AsPayloadView>()
1019            .tap(log_payload, reg)
1020            .end_view()
1021            .then(|_: Payload| {}, reg);
1022
1023        p.run(
1024            &mut world,
1025            Payload {
1026                data: vec![1, 2, 3],
1027                tag: "test".into(),
1028            },
1029        );
1030
1031        assert_eq!(world.resource::<AuditLog>().0, vec!["tag=test len=3"]);
1032    }
1033
1034    #[test]
1035    fn view_guard_preserves_non_copy_event() {
1036        let mut wb = WorldBuilder::new();
1037        wb.register(AuditLog(Vec::new()));
1038        let mut world = wb.build();
1039        let reg = world.registry();
1040
1041        fn check_tag(v: &PayloadView) -> bool {
1042            v.tag == "accept"
1043        }
1044
1045        let mut p = PipelineBuilder::<Payload>::new()
1046            .view::<AsPayloadView>()
1047            .guard(check_tag, reg)
1048            .end_view_guarded();
1049
1050        let accepted = p.run(
1051            &mut world,
1052            Payload {
1053                data: vec![1],
1054                tag: "accept".into(),
1055            },
1056        );
1057        assert!(accepted.is_some());
1058        assert_eq!(accepted.unwrap().data, vec![1]);
1059
1060        let rejected = p.run(
1061            &mut world,
1062            Payload {
1063                data: vec![2],
1064                tag: "reject".into(),
1065            },
1066        );
1067        assert!(rejected.is_none());
1068    }
1069
1070    #[test]
1071    fn view_guard_inside_view() {
1072        // Guard inside view scope — tests guarded view with accept/reject
1073        let mut wb = WorldBuilder::new();
1074        wb.register(AuditLog(Vec::new()));
1075        let mut world = wb.build();
1076        let reg = world.registry();
1077
1078        fn accept_tag(v: &PayloadView) -> bool {
1079            v.tag == "accept"
1080        }
1081
1082        let mut p = PipelineBuilder::<Payload>::new()
1083            .view::<AsPayloadView>()
1084            .guard(accept_tag, reg)
1085            .end_view_guarded();
1086
1087        // Wrong tag — guarded out
1088        let result = p.run(
1089            &mut world,
1090            Payload {
1091                data: vec![1],
1092                tag: "reject".into(),
1093            },
1094        );
1095        assert!(result.is_none());
1096
1097        // Right tag — passes through
1098        let result = p.run(
1099            &mut world,
1100            Payload {
1101                data: vec![1, 2, 3],
1102                tag: "accept".into(),
1103            },
1104        );
1105        assert!(result.is_some());
1106        assert_eq!(result.unwrap().data, vec![1, 2, 3]);
1107    }
1108
1109    #[test]
1110    fn view_tap_with_world_resources() {
1111        // View tap step reads a World resource
1112        let mut wb = WorldBuilder::new();
1113        wb.register(AuditLog(Vec::new()));
1114        let mut world = wb.build();
1115        let reg = world.registry();
1116
1117        fn log_order(mut log: ResMut<AuditLog>, v: &OrderView) {
1118            log.0.push(format!("{}:{}", v.symbol, v.qty));
1119        }
1120
1121        let mut p = PipelineBuilder::<NewOrderCommand>::new()
1122            .view::<AsOrderView>()
1123            .tap(log_order, reg)
1124            .end_view();
1125
1126        p.run(
1127            &mut world,
1128            NewOrderCommand {
1129                source: "test".into(),
1130                symbol: "BTC".into(),
1131                qty: 100,
1132                price: 50000.0,
1133            },
1134        );
1135        p.run(
1136            &mut world,
1137            NewOrderCommand {
1138                source: "test".into(),
1139                symbol: "ETH".into(),
1140                qty: 50,
1141                price: 3000.0,
1142            },
1143        );
1144
1145        assert_eq!(world.resource::<AuditLog>().0, vec!["BTC:100", "ETH:50"]);
1146    }
1147
1148    #[test]
1149    fn view_repeated_dispatch() {
1150        // Stress: same pipeline dispatched many times — no leaks, no drift
1151        let mut wb = WorldBuilder::new();
1152        wb.register(AuditLog(Vec::new()));
1153        let mut world = wb.build();
1154        let reg = world.registry();
1155
1156        fn count(mut log: ResMut<AuditLog>, _v: &OrderView) {
1157            log.0.push("hit".into());
1158        }
1159
1160        let mut p = PipelineBuilder::<NewOrderCommand>::new()
1161            .view::<AsOrderView>()
1162            .tap(count, reg)
1163            .end_view();
1164
1165        for _ in 0..100 {
1166            p.run(
1167                &mut world,
1168                NewOrderCommand {
1169                    source: "stress".into(),
1170                    symbol: "X".into(),
1171                    qty: 1,
1172                    price: 1.0,
1173                },
1174            );
1175        }
1176
1177        assert_eq!(world.resource::<AuditLog>().0.len(), 100);
1178    }
1179}