#![allow(clippy::type_complexity)]
use core::marker::PhantomData;
use crate::pipeline::{ChainCall, IntoRefStep, PipelineChain, RefStepCall};
use crate::world::World;
/// Describes how a marker type projects a borrowed view out of a `Source` value.
///
/// The implementor is only a marker: `view` has no `self` parameter and builds the
/// view purely from `&Source`. One marker may implement `View` for several
/// `Source` types.
///
/// # Safety
///
/// Code in this module reinterprets a `&Self::ViewType<'_>` as a
/// `&Self::StaticViewType` through a raw pointer cast (see `with_view`).
/// Implementors must therefore guarantee that `StaticViewType` is exactly
/// `ViewType<'a>` with every lifetime replaced by `'static`, so that both types
/// have identical size, alignment and field layout.
#[diagnostic::on_unimplemented(
message = "`{Source}` cannot be viewed as `{Self}`",
note = "implement `View<{Source}>` for your view marker type"
)]
pub unsafe trait View<Source> {
/// The view type, parameterised by the lifetime of the borrow of `Source`.
type ViewType<'a>
where
Source: 'a;
/// The `'static` spelling of the view type; this is the type steps inside a
/// view scope receive by reference.
type StaticViewType: 'static;
/// Builds the view from a shared borrow of `source`.
fn view(source: &Source) -> Self::ViewType<'_>;
}
/// Builds the view `V` over `source`, lends it to `f` as `&V::StaticViewType`,
/// and returns whatever `f` returns.
///
/// The view is created before `f` runs and explicitly dropped after `f`
/// returns, so it never outlives the borrow of `source`.
///
/// The higher-ranked bound on `f` means `R` cannot name the lifetime of the
/// reference handed to the closure, so the reference itself cannot be returned.
///
/// NOTE(review): fields *inside* the view are typed with `'static` lifetimes
/// while `f` runs. A closure that copies such a field out (for example a
/// `&'static str`) would keep a pointer into `source` past its lifetime.
/// Callers in this file only use `R = ()` and `R = bool`; confirm that step
/// functions cannot stash borrowed fields either.
#[inline(always)]
fn with_view<Source, V, R>(source: &Source, f: impl for<'a> FnOnce(&'a V::StaticViewType) -> R) -> R
where
    V: View<Source>,
{
    let view = V::view(source);

    // Debug-only sanity check of the `View` contract: the two spellings of the
    // view type must at least agree on size and alignment before one is
    // reinterpreted as the other. Both sides are compile-time constants.
    debug_assert_eq!(
        core::mem::size_of_val(&view),
        core::mem::size_of::<V::StaticViewType>(),
        "`View::StaticViewType` must have the same size as `View::ViewType`"
    );
    debug_assert_eq!(
        core::mem::align_of_val(&view),
        core::mem::align_of::<V::StaticViewType>(),
        "`View::StaticViewType` must have the same alignment as `View::ViewType`"
    );

    // SAFETY: `View` is an `unsafe trait`; the cast relies on its implementor
    // guaranteeing that `StaticViewType` is `ViewType<'_>` with lifetimes
    // replaced by `'static`, i.e. the same layout. The pointer comes from a
    // live local (`view`), so it is non-null, aligned and valid for reads, and
    // the reference created here is only used by `f`, which returns before
    // `view` is dropped below.
    let static_ref: &V::StaticViewType =
        unsafe { &*core::ptr::from_ref(&view).cast::<V::StaticViewType>() };
    let result = f(static_ref);
    // Drop the view explicitly so it is clear the borrow of `source` ends here,
    // after the closure has finished with `static_ref`.
    drop(view);
    result
}
/// Builder state for a view scope: the chain built so far plus the steps that
/// will run against the view `V` of that chain's output.
///
/// `In` is the pipeline input type, `Out` the output of `PrevChain` (the value
/// being viewed), and `InnerSteps` the accumulated steps. The scope starts with
/// `InnerSteps = ()` and each `tap`/`guard` wraps it as `(InnerSteps, step)`.
pub struct ViewScope<In, Out, V: View<Out>, PrevChain, InnerSteps> {
/// The chain that was built before the scope was opened.
prev_chain: PrevChain,
/// Steps registered inside the scope so far, nested left-to-right.
inner: InnerSteps,
/// Ties the otherwise unused `In`, `Out` and `V` parameters to the type
/// without storing values of them.
_marker: PhantomData<(fn(In) -> Out, V)>,
}
impl<In, Out, V, PrevChain> ViewScope<In, Out, V, PrevChain, ()>
where
    V: View<Out>,
{
    /// Opens an empty view scope on top of `prev_chain`.
    ///
    /// The step list starts out as `()`; steps are appended by the builder
    /// methods on `ViewScope`.
    pub(crate) fn new(prev_chain: PrevChain) -> Self {
        Self {
            inner: (),
            _marker: PhantomData,
            prev_chain,
        }
    }
}
impl<In, Out, V, PrevChain, InnerSteps> ViewScope<In, Out, V, PrevChain, InnerSteps>
where
    V: View<Out>,
{
    /// Appends an observing step to the scope.
    ///
    /// `f` is resolved against `registry` into a step over `&V::StaticViewType`
    /// that returns `()`, and is stored wrapped in `ViewTap` after the steps
    /// already registered.
    pub fn tap<Params, S: IntoRefStep<V::StaticViewType, (), Params>>(
        self,
        f: S,
        registry: &crate::world::Registry,
    ) -> ViewScope<In, Out, V, PrevChain, (InnerSteps, ViewTap<S::Step>)> {
        let Self {
            prev_chain, inner, ..
        } = self;
        let step = ViewTap(f.into_ref_step(registry));
        ViewScope {
            prev_chain,
            inner: (inner, step),
            _marker: PhantomData,
        }
    }

    /// Same as [`tap`](Self::tap), but only accepts steps whose `Params` is `()`.
    pub fn inspect<S: IntoRefStep<V::StaticViewType, (), ()>>(
        self,
        f: S,
        registry: &crate::world::Registry,
    ) -> ViewScope<In, Out, V, PrevChain, (InnerSteps, ViewTap<S::Step>)> {
        self.tap(f, registry)
    }

    /// Alias for [`guard`](Self::guard); forwards to it unchanged.
    pub fn filter<Params, S: IntoRefStep<V::StaticViewType, bool, Params>>(
        self,
        f: S,
        registry: &crate::world::Registry,
    ) -> ViewScope<In, Out, V, PrevChain, (InnerSteps, ViewGuard<S::Step>)> {
        self.guard(f, registry)
    }

    /// Appends a predicate step to the scope.
    ///
    /// `f` is resolved against `registry` into a step over `&V::StaticViewType`
    /// that returns `bool`, and is stored wrapped in `ViewGuard` after the steps
    /// already registered.
    pub fn guard<Params, S: IntoRefStep<V::StaticViewType, bool, Params>>(
        self,
        f: S,
        registry: &crate::world::Registry,
    ) -> ViewScope<In, Out, V, PrevChain, (InnerSteps, ViewGuard<S::Step>)> {
        let Self {
            prev_chain, inner, ..
        } = self;
        let step = ViewGuard(f.into_ref_step(registry));
        ViewScope {
            prev_chain,
            inner: (inner, step),
            _marker: PhantomData,
        }
    }
}
/// Wrapper marking a resolved step as a tap: its `ViewSteps` impl calls the
/// step and always lets processing continue.
#[doc(hidden)]
pub struct ViewTap<S>(S);
/// Wrapper marking a resolved step as a guard: its `ViewSteps` impl uses the
/// step's `bool` result as the continue/stop decision.
#[doc(hidden)]
pub struct ViewGuard<S>(S);
/// A list of steps that can be run against a borrowed view of type `V`.
///
/// Implemented for `()` (the empty list) and for `(Prev, step)` pairs, so a
/// scope's steps form a nested tuple.
#[doc(hidden)]
pub trait ViewSteps<V> {
/// Runs the steps in registration order against `view`.
///
/// Returns `true` if every step let processing continue, `false` as soon as
/// one step stopped it.
fn run(&mut self, world: &mut World, view: &V) -> bool;
}
/// The empty step list: base case of the nested-tuple recursion.
impl<V> ViewSteps<V> for () {
/// Does nothing and reports that processing may continue.
fn run(&mut self, _world: &mut World, _view: &V) -> bool {
true
}
}
/// A step list ending in a tap.
impl<V, Prev, S> ViewSteps<V> for (Prev, ViewTap<S>)
where
    Prev: ViewSteps<V>,
    S: RefStepCall<V, Out = ()>,
{
    /// Runs the earlier steps first; the tap is only invoked when they all let
    /// processing continue. The tap itself never stops processing, so the
    /// result is whatever the earlier steps decided.
    fn run(&mut self, world: &mut World, view: &V) -> bool {
        let proceed = self.0.run(world, view);
        if proceed {
            self.1.0.call(world, view);
        }
        proceed
    }
}
/// A step list ending in a guard.
impl<V, Prev, S> ViewSteps<V> for (Prev, ViewGuard<S>)
where
    Prev: ViewSteps<V>,
    S: RefStepCall<V, Out = bool>,
{
    /// Runs the earlier steps first and, only if they all let processing
    /// continue, evaluates the guard; the guard's answer becomes the result.
    fn run(&mut self, world: &mut World, view: &V) -> bool {
        // `&&` short-circuits, so the guard is skipped once an earlier step
        // has already stopped processing.
        self.0.run(world, view) && self.1.0.call(world, view)
    }
}
impl<In, Out, V, PrevChain, InnerSteps> ViewScope<In, Out, V, PrevChain, InnerSteps>
where
    PrevChain: ChainCall<In, Out = Out>,
    V: View<Out>,
    InnerSteps: ViewSteps<V::StaticViewType>,
{
    /// Closes the scope and returns to the pipeline builder.
    ///
    /// The resulting chain still outputs `Out`: the scope's steps run against
    /// the view and the viewed value is passed on.
    pub fn end_view(self) -> PipelineChain<In, Out, ViewNode<PrevChain, InnerSteps, V>> {
        let Self {
            prev_chain: prev,
            inner,
            ..
        } = self;
        let chain = ViewNode {
            prev,
            inner,
            _marker: PhantomData,
        };
        PipelineChain {
            chain,
            _marker: PhantomData,
        }
    }

    /// Closes the scope and returns to the pipeline builder, with the scope's
    /// verdict reflected in the output type.
    ///
    /// The resulting chain outputs `Option<Out>`: `Some` when the steps let
    /// processing continue, `None` otherwise.
    pub fn end_view_guarded(
        self,
    ) -> PipelineChain<In, Option<Out>, ViewGuardedNode<PrevChain, InnerSteps, V>> {
        let Self {
            prev_chain: prev,
            inner,
            ..
        } = self;
        let chain = ViewGuardedNode {
            prev,
            inner,
            _marker: PhantomData,
        };
        PipelineChain {
            chain,
            _marker: PhantomData,
        }
    }
}
/// Chain node produced by closing a view scope without guarding: runs the
/// scope's steps against the view and forwards the viewed value unchanged.
#[doc(hidden)]
pub struct ViewNode<Prev, Inner, V> {
/// The chain that produces the value to be viewed.
prev: Prev,
/// The steps registered inside the scope.
inner: Inner,
/// Records the view marker type without storing a value of it.
_marker: PhantomData<V>,
}
impl<In, Out, Prev, Inner, V> ChainCall<In> for ViewNode<Prev, Inner, V>
where
    Prev: ChainCall<In, Out = Out>,
    V: View<Out>,
    Inner: ViewSteps<V::StaticViewType>,
{
    type Out = Out;

    /// Runs the upstream chain, runs the scope's steps against a view of its
    /// output, then returns that output.
    ///
    /// The steps' boolean verdict is deliberately discarded here: a guard that
    /// fails still stops the later steps of the scope, but the event is
    /// forwarded regardless.
    fn call(&mut self, world: &mut World, input: In) -> Out {
        let event = self.prev.call(world, input);
        let steps = &mut self.inner;
        with_view::<Out, V, ()>(&event, |view| {
            let _verdict = steps.run(world, view);
        });
        event
    }
}
/// Chain node produced by closing a view scope in guarded mode: runs the
/// scope's steps against the view and outputs the viewed value wrapped in an
/// `Option` that reflects the steps' verdict.
#[doc(hidden)]
pub struct ViewGuardedNode<Prev, Inner, V> {
/// The chain that produces the value to be viewed.
prev: Prev,
/// The steps registered inside the scope.
inner: Inner,
/// Records the view marker type without storing a value of it.
_marker: PhantomData<V>,
}
impl<In, Out, Prev, Inner, V> ChainCall<In> for ViewGuardedNode<Prev, Inner, V>
where
    Prev: ChainCall<In, Out = Out>,
    V: View<Out>,
    Inner: ViewSteps<V::StaticViewType>,
{
    type Out = Option<Out>;

    /// Runs the upstream chain, then the scope's steps against a view of its
    /// output. Returns `Some(output)` when the steps let processing continue
    /// and `None` when one of them stopped it.
    fn call(&mut self, world: &mut World, input: In) -> Option<Out> {
        let event = self.prev.call(world, input);
        let steps = &mut self.inner;
        let accepted = with_view::<Out, V, bool>(&event, |view| steps.run(world, view));
        accepted.then_some(event)
    }
}
impl<In> crate::pipeline::PipelineBuilder<In> {
/// Opens a view scope directly on the pipeline input.
///
/// No chain has been built yet, so the scope is seeded with
/// `IdentityNode` as its upstream chain and the viewed type is `In` itself.
pub fn view<V: View<In>>(self) -> ViewScope<In, In, V, crate::pipeline::IdentityNode, ()> {
ViewScope::new(crate::pipeline::IdentityNode)
}
}
impl<In, Out, Chain: ChainCall<In, Out = Out>> PipelineChain<In, Out, Chain> {
/// Opens a view scope on the current output of the chain.
///
/// The chain built so far becomes the scope's upstream chain; steps added to
/// the scope see the view `V` of its `Out` value.
pub fn view<V: View<Out>>(self) -> ViewScope<In, Out, V, Chain, ()> {
ViewScope::new(self.chain)
}
}
/// Scope terminators that hand the finished view node to the DAG builders.
///
/// NOTE(review): this block additionally requires `Out: 'static`; the reason is
/// not visible here and presumably comes from the DAG types — confirm.
impl<In, Out, V, PrevChain, InnerSteps> ViewScope<In, Out, V, PrevChain, InnerSteps>
where
    PrevChain: ChainCall<In, Out = Out>,
    V: View<Out>,
    InnerSteps: ViewSteps<V::StaticViewType>,
    Out: 'static,
{
    /// Closes the scope into a `DagChain` that still outputs `Out`.
    pub fn end_view_dag(self) -> crate::dag::DagChain<In, Out, ViewNode<PrevChain, InnerSteps, V>> {
        let Self {
            prev_chain: prev,
            inner,
            ..
        } = self;
        let chain = ViewNode {
            prev,
            inner,
            _marker: PhantomData,
        };
        crate::dag::DagChain {
            chain,
            _marker: PhantomData,
        }
    }

    /// Closes the scope into a `DagChain` that outputs `Option<Out>`, reflecting
    /// the verdict of the scope's steps.
    pub fn end_view_dag_guarded(
        self,
    ) -> crate::dag::DagChain<In, Option<Out>, ViewGuardedNode<PrevChain, InnerSteps, V>> {
        let Self {
            prev_chain: prev,
            inner,
            ..
        } = self;
        let chain = ViewGuardedNode {
            prev,
            inner,
            _marker: PhantomData,
        };
        crate::dag::DagChain {
            chain,
            _marker: PhantomData,
        }
    }

    /// Closes the scope into a `DagArm` that still outputs `Out`.
    pub fn end_view_arm(self) -> crate::dag::DagArm<In, Out, ViewNode<PrevChain, InnerSteps, V>> {
        let Self {
            prev_chain: prev,
            inner,
            ..
        } = self;
        let chain = ViewNode {
            prev,
            inner,
            _marker: PhantomData,
        };
        crate::dag::DagArm {
            chain,
            _marker: PhantomData,
        }
    }

    /// Closes the scope into a `DagArm` that outputs `Option<Out>`, reflecting
    /// the verdict of the scope's steps.
    pub fn end_view_arm_guarded(
        self,
    ) -> crate::dag::DagArm<In, Option<Out>, ViewGuardedNode<PrevChain, InnerSteps, V>> {
        let Self {
            prev_chain: prev,
            inner,
            ..
        } = self;
        let chain = ViewGuardedNode {
            prev,
            inner,
            _marker: PhantomData,
        };
        crate::dag::DagArm {
            chain,
            _marker: PhantomData,
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{PipelineBuilder, Res, ResMut, Resource, WorldBuilder};
// World resource the test steps append their log lines to.
struct AuditLog(Vec<String>);
impl Resource for AuditLog {}
// World resource holding the quantity ceiling read by `check_risk`.
struct RiskLimits {
max_qty: u64,
}
impl Resource for RiskLimits {}
// Borrowed projection shared by both command types: symbol and quantity only.
struct OrderView<'a> {
symbol: &'a str,
qty: u64,
}
// Owned event used as the pipeline input in most tests.
struct NewOrderCommand {
source: String,
symbol: String,
qty: u64,
// Not part of `OrderView`; the lint allowance is kept as in the original.
#[allow(dead_code)]
price: f64,
}
// Second owned event type, used to show one view marker serving two sources.
struct AmendOrderCommand {
// Only ever written by the tests, never read.
#[allow(dead_code)]
order_id: u64,
symbol: String,
qty: u64,
// Only ever written by the tests, never read.
#[allow(dead_code)]
price: f64,
}
// View marker projecting `OrderView` out of either command type.
struct AsOrderView;
// SAFETY: `StaticViewType` is `OrderView<'a>` with `'a` replaced by `'static`,
// i.e. the same struct, so both spellings share one layout.
unsafe impl View<NewOrderCommand> for AsOrderView {
type ViewType<'a> = OrderView<'a>;
type StaticViewType = OrderView<'static>;
fn view(source: &NewOrderCommand) -> OrderView<'_> {
OrderView {
symbol: &source.symbol,
qty: source.qty,
}
}
}
// SAFETY: same reasoning as above; only the source type differs.
unsafe impl View<AmendOrderCommand> for AsOrderView {
type ViewType<'a> = OrderView<'a>;
type StaticViewType = OrderView<'static>;
fn view(source: &AmendOrderCommand) -> OrderView<'_> {
OrderView {
symbol: &source.symbol,
qty: source.qty,
}
}
}
// Tap step: appends "<symbol> qty=<qty>" to the audit log.
fn log_order(mut log: ResMut<AuditLog>, v: &OrderView) {
log.0.push(format!("{} qty={}", v.symbol, v.qty));
}
// Guard step: passes when the viewed quantity does not exceed the limit.
fn check_risk(limits: Res<RiskLimits>, v: &OrderView) -> bool {
v.qty <= limits.max_qty
}
/// A single tap inside a view scope sees the projected fields of the event.
#[test]
fn tap_observes_view() {
    let mut builder = WorldBuilder::new();
    builder.register(AuditLog(Vec::new()));
    let mut world = builder.build();

    let order = NewOrderCommand {
        source: "test".into(),
        symbol: "BTC".into(),
        qty: 50,
        price: 42000.0,
    };

    let registry = world.registry();
    let mut pipeline = PipelineBuilder::<NewOrderCommand>::new()
        .view::<AsOrderView>()
        .tap(log_order, registry)
        .end_view()
        .then(|_cmd: NewOrderCommand| {}, registry);
    pipeline.run(&mut world, order);

    assert_eq!(world.resource::<AuditLog>().0, vec!["BTC qty=50"]);
}
/// A guarded scope yields `Some` for an order within the limit and `None` for
/// one above it. The tap precedes the guard, so both orders are logged.
#[test]
fn guard_rejects() {
    let mut builder = WorldBuilder::new();
    builder.register(AuditLog(Vec::new()));
    builder.register(RiskLimits { max_qty: 100 });
    let mut world = builder.build();

    let within_limit = NewOrderCommand {
        source: "a".into(),
        symbol: "BTC".into(),
        qty: 50,
        price: 42000.0,
    };
    let over_limit = NewOrderCommand {
        source: "b".into(),
        symbol: "ETH".into(),
        qty: 200,
        price: 3000.0,
    };

    let registry = world.registry();
    let mut pipeline = PipelineBuilder::<NewOrderCommand>::new()
        .view::<AsOrderView>()
        .tap(log_order, registry)
        .guard(check_risk, registry)
        .end_view_guarded();

    let accepted = pipeline.run(&mut world, within_limit);
    assert!(accepted.is_some());

    let rejected = pipeline.run(&mut world, over_limit);
    assert!(rejected.is_none());

    assert_eq!(world.resource::<AuditLog>().0.len(), 2);
}
/// After an unguarded view scope the owned event reaches the next step intact,
/// including a field (`source`) that the view does not expose.
#[test]
fn event_passes_through_unchanged() {
    fn sink(mut out: ResMut<AuditLog>, cmd: NewOrderCommand) {
        out.0
            .push(format!("sink: {} from {}", cmd.symbol, cmd.source));
    }

    let mut builder = WorldBuilder::new();
    builder.register(AuditLog(Vec::new()));
    let mut world = builder.build();

    let order = NewOrderCommand {
        source: "ops".into(),
        symbol: "SOL".into(),
        qty: 10,
        price: 150.0,
    };

    let registry = world.registry();
    let mut pipeline = PipelineBuilder::<NewOrderCommand>::new()
        .view::<AsOrderView>()
        .tap(log_order, registry)
        .end_view()
        .then(sink, registry);
    pipeline.run(&mut world, order);

    let entries = &world.resource::<AuditLog>().0;
    assert_eq!(entries[0], "SOL qty=10");
    assert_eq!(entries[1], "sink: SOL from ops");
}
/// The same view marker and the same tap function work for two different
/// event types.
#[test]
fn reusable_across_event_types() {
    let mut builder = WorldBuilder::new();
    builder.register(AuditLog(Vec::new()));
    let mut world = builder.build();

    let new_order = NewOrderCommand {
        source: "a".into(),
        symbol: "BTC".into(),
        qty: 50,
        price: 42000.0,
    };
    let amend_order = AmendOrderCommand {
        order_id: 123,
        symbol: "ETH".into(),
        qty: 25,
        price: 3000.0,
    };

    let registry = world.registry();
    let mut new_pipeline = PipelineBuilder::<NewOrderCommand>::new()
        .view::<AsOrderView>()
        .tap(log_order, registry)
        .end_view()
        .then(|_: NewOrderCommand| {}, registry);
    let mut amend_pipeline = PipelineBuilder::<AmendOrderCommand>::new()
        .view::<AsOrderView>()
        .tap(log_order, registry)
        .end_view()
        .then(|_: AmendOrderCommand| {}, registry);

    new_pipeline.run(&mut world, new_order);
    amend_pipeline.run(&mut world, amend_order);

    let entries = &world.resource::<AuditLog>().0;
    assert_eq!(entries[0], "BTC qty=50");
    assert_eq!(entries[1], "ETH qty=25");
}
/// Several taps in one scope run in the order they were registered.
#[test]
fn multiple_taps_in_scope() {
    fn log_symbol(mut log: ResMut<AuditLog>, v: &OrderView) {
        log.0.push(format!("symbol: {}", v.symbol));
    }
    fn log_qty(mut log: ResMut<AuditLog>, v: &OrderView) {
        log.0.push(format!("qty: {}", v.qty));
    }

    let mut builder = WorldBuilder::new();
    builder.register(AuditLog(Vec::new()));
    let mut world = builder.build();

    let order = NewOrderCommand {
        source: "a".into(),
        symbol: "BTC".into(),
        qty: 50,
        price: 42000.0,
    };

    let registry = world.registry();
    let mut pipeline = PipelineBuilder::<NewOrderCommand>::new()
        .view::<AsOrderView>()
        .tap(log_symbol, registry)
        .tap(log_qty, registry)
        .end_view()
        .then(|_: NewOrderCommand| {}, registry);
    pipeline.run(&mut world, order);

    let entries = &world.resource::<AuditLog>().0;
    assert_eq!(entries[0], "symbol: BTC");
    assert_eq!(entries[1], "qty: 50");
}
/// Two view scopes with different markers can be chained back to back on the
/// same event; one view borrows from the event, the other is fully owned.
#[test]
fn sequential_views() {
    struct SymbolView<'a> {
        symbol: &'a str,
    }
    struct AsSymbolView;
    // SAFETY: `SymbolView<'static>` is `SymbolView<'a>` with the lifetime
    // replaced, so the layouts are identical.
    unsafe impl View<NewOrderCommand> for AsSymbolView {
        type ViewType<'a> = SymbolView<'a>;
        type StaticViewType = SymbolView<'static>;
        fn view(source: &NewOrderCommand) -> SymbolView<'_> {
            SymbolView {
                symbol: &source.symbol,
            }
        }
    }

    struct QtyView {
        qty: u64,
    }
    struct AsQtyView;
    // SAFETY: both associated types are the very same lifetime-free type.
    unsafe impl View<NewOrderCommand> for AsQtyView {
        type ViewType<'a> = QtyView;
        type StaticViewType = QtyView;
        fn view(source: &NewOrderCommand) -> QtyView {
            QtyView { qty: source.qty }
        }
    }

    fn log_sym(mut log: ResMut<AuditLog>, v: &SymbolView) {
        log.0.push(format!("sym: {}", v.symbol));
    }
    fn log_qty_view(mut log: ResMut<AuditLog>, v: &QtyView) {
        log.0.push(format!("qty: {}", v.qty));
    }

    let mut builder = WorldBuilder::new();
    builder.register(AuditLog(Vec::new()));
    let mut world = builder.build();

    let order = NewOrderCommand {
        source: "a".into(),
        symbol: "BTC".into(),
        qty: 50,
        price: 42000.0,
    };

    let registry = world.registry();
    let mut pipeline = PipelineBuilder::<NewOrderCommand>::new()
        .view::<AsSymbolView>()
        .tap(log_sym, registry)
        .end_view()
        .view::<AsQtyView>()
        .tap(log_qty_view, registry)
        .end_view()
        .then(|_: NewOrderCommand| {}, registry);
    pipeline.run(&mut world, order);

    let entries = &world.resource::<AuditLog>().0;
    assert_eq!(entries[0], "sym: BTC");
    assert_eq!(entries[1], "qty: 50");
}
/// A view scope opened on a DAG chain and closed with `end_view_dag` runs its
/// tap against the projected view.
#[test]
fn dag_view_tap() {
    use crate::{DagBuilder, Handler};

    let mut builder = WorldBuilder::new();
    builder.register(AuditLog(Vec::new()));
    let mut world = builder.build();

    let order = NewOrderCommand {
        source: "test".into(),
        symbol: "BTC".into(),
        qty: 50,
        price: 42000.0,
    };

    let registry = world.registry();
    let mut handler = DagBuilder::<NewOrderCommand>::new()
        .root(|cmd: NewOrderCommand| cmd, registry)
        .view::<AsOrderView>()
        .tap(log_order, registry)
        .end_view_dag()
        .then(|_cmd: &NewOrderCommand| {}, registry)
        .build();
    handler.run(&mut world, order);

    assert_eq!(world.resource::<AuditLog>().0, vec!["BTC qty=50"]);
}
/// A guarded view scope in a DAG hands `Option<event>` to the next step:
/// `Some` for the order within the limit, `None` for the one above it.
#[test]
fn dag_view_guard() {
    use crate::{DagBuilder, Handler};

    fn sink(mut log: ResMut<AuditLog>, val: &Option<NewOrderCommand>) {
        if val.is_some() {
            log.0.push("accepted".into());
        } else {
            log.0.push("rejected".into());
        }
    }

    let mut builder = WorldBuilder::new();
    builder.register(AuditLog(Vec::new()));
    builder.register(RiskLimits { max_qty: 100 });
    let mut world = builder.build();

    let within_limit = NewOrderCommand {
        source: "a".into(),
        symbol: "BTC".into(),
        qty: 50,
        price: 42000.0,
    };
    let over_limit = NewOrderCommand {
        source: "b".into(),
        symbol: "ETH".into(),
        qty: 200,
        price: 3000.0,
    };

    let registry = world.registry();
    let mut handler = DagBuilder::<NewOrderCommand>::new()
        .root(|cmd: NewOrderCommand| cmd, registry)
        .view::<AsOrderView>()
        .tap(log_order, registry)
        .guard(check_risk, registry)
        .end_view_dag_guarded()
        .then(sink, registry)
        .build();

    handler.run(&mut world, within_limit);
    handler.run(&mut world, over_limit);

    let entries = &world.resource::<AuditLog>().0;
    assert_eq!(entries[0], "BTC qty=50");
    assert_eq!(entries[1], "accepted");
    assert_eq!(entries[2], "ETH qty=200");
    assert_eq!(entries[3], "rejected");
}
/// `inspect` accepts a step that takes only the view (no world parameters) and
/// can be mixed with a regular tap in the same scope.
#[test]
fn inspect_no_params() {
    fn just_print(v: &OrderView) {
        assert!(!v.symbol.is_empty());
    }

    let mut builder = WorldBuilder::new();
    builder.register(AuditLog(Vec::new()));
    let mut world = builder.build();

    let order = NewOrderCommand {
        source: "a".into(),
        symbol: "BTC".into(),
        qty: 50,
        price: 42000.0,
    };

    let registry = world.registry();
    let mut pipeline = PipelineBuilder::<NewOrderCommand>::new()
        .view::<AsOrderView>()
        .inspect(just_print, registry)
        .tap(log_order, registry)
        .end_view()
        .then(|_: NewOrderCommand| {}, registry);
    pipeline.run(&mut world, order);

    assert_eq!(world.resource::<AuditLog>().0, vec!["BTC qty=50"]);
}
/// `filter` behaves like `guard`: the order within the limit passes, the one
/// above it is rejected.
#[test]
fn filter_rejects() {
    let mut builder = WorldBuilder::new();
    builder.register(AuditLog(Vec::new()));
    builder.register(RiskLimits { max_qty: 100 });
    let mut world = builder.build();

    let within_limit = NewOrderCommand {
        source: "a".into(),
        symbol: "BTC".into(),
        qty: 50,
        price: 42000.0,
    };
    let over_limit = NewOrderCommand {
        source: "b".into(),
        symbol: "ETH".into(),
        qty: 200,
        price: 3000.0,
    };

    let registry = world.registry();
    let mut pipeline = PipelineBuilder::<NewOrderCommand>::new()
        .view::<AsOrderView>()
        .tap(log_order, registry)
        .filter(check_risk, registry)
        .end_view_guarded();

    let accepted = pipeline.run(&mut world, within_limit);
    assert!(accepted.is_some());

    let rejected = pipeline.run(&mut world, over_limit);
    assert!(rejected.is_none());
}
/// A tap registered after a guard only runs for events the guard accepts.
#[test]
fn guard_short_circuits_subsequent_tap() {
    let mut builder = WorldBuilder::new();
    builder.register(AuditLog(Vec::new()));
    builder.register(RiskLimits { max_qty: 100 });
    let mut world = builder.build();

    let within_limit = NewOrderCommand {
        source: "a".into(),
        symbol: "BTC".into(),
        qty: 50,
        price: 42000.0,
    };
    let over_limit = NewOrderCommand {
        source: "b".into(),
        symbol: "ETH".into(),
        qty: 200,
        price: 3000.0,
    };

    let registry = world.registry();
    // Guard first, tap second: the tap sits behind the guard.
    let mut pipeline = PipelineBuilder::<NewOrderCommand>::new()
        .view::<AsOrderView>()
        .guard(check_risk, registry)
        .tap(log_order, registry)
        .end_view_guarded();

    let accepted = pipeline.run(&mut world, within_limit);
    assert!(accepted.is_some());
    assert_eq!(world.resource::<AuditLog>().0.len(), 1);

    let rejected = pipeline.run(&mut world, over_limit);
    assert!(rejected.is_none());
    // Still one entry: the rejected order never reached the tap.
    assert_eq!(world.resource::<AuditLog>().0.len(), 1);
}
// View exposing every field of `NewOrderCommand`, two of them by reference.
struct FullOrderView<'a> {
source: &'a str,
symbol: &'a str,
qty: u64,
price: f64,
}
// View marker for `FullOrderView`.
struct AsFullOrderView;
// SAFETY: `FullOrderView<'static>` is `FullOrderView<'a>` with the lifetime
// replaced, so both spellings share one layout.
unsafe impl View<NewOrderCommand> for AsFullOrderView {
type ViewType<'a> = FullOrderView<'a>;
type StaticViewType = FullOrderView<'static>;
fn view(source: &NewOrderCommand) -> FullOrderView<'_> {
FullOrderView {
source: &source.source,
symbol: &source.symbol,
qty: source.qty,
price: source.price,
}
}
}
/// A view holding more than one borrowed field is projected correctly.
#[test]
fn view_with_multiple_borrowed_fields() {
    fn log_full(mut log: ResMut<AuditLog>, v: &FullOrderView) {
        log.0.push(format!(
            "{} {} qty={} px={}",
            v.source, v.symbol, v.qty, v.price
        ));
    }

    let mut builder = WorldBuilder::new();
    builder.register(AuditLog(Vec::new()));
    let mut world = builder.build();

    let order = NewOrderCommand {
        source: "desk-a".into(),
        symbol: "BTC".into(),
        qty: 50,
        price: 42000.0,
    };

    let registry = world.registry();
    let mut pipeline = PipelineBuilder::<NewOrderCommand>::new()
        .view::<AsFullOrderView>()
        .tap(log_full, registry)
        .end_view()
        .then(|_: NewOrderCommand| {}, registry);
    pipeline.run(&mut world, order);

    assert_eq!(
        world.resource::<AuditLog>().0,
        vec!["desk-a BTC qty=50 px=42000"]
    );
}
// Owned event with heap-allocated fields (neither field is `Copy`).
struct Payload {
data: Vec<u8>,
tag: String,
}
// Borrowed projection of `Payload`: a byte slice and a string slice.
struct PayloadView<'a> {
data: &'a [u8],
tag: &'a str,
}
// View marker for `PayloadView`.
struct AsPayloadView;
// SAFETY: `PayloadView<'static>` is `PayloadView<'a>` with the lifetime
// replaced, so both spellings share one layout.
unsafe impl View<Payload> for AsPayloadView {
type ViewType<'a> = PayloadView<'a>;
type StaticViewType = PayloadView<'static>;
fn view(source: &Payload) -> PayloadView<'_> {
PayloadView {
data: &source.data,
tag: &source.tag,
}
}
}
/// A view can borrow from heap-backed fields (`Vec<u8>`, `String`) of the event.
#[test]
fn view_of_non_copy_types() {
    fn log_payload(mut log: ResMut<AuditLog>, v: &PayloadView) {
        log.0.push(format!("tag={} len={}", v.tag, v.data.len()));
    }

    let mut builder = WorldBuilder::new();
    builder.register(AuditLog(Vec::new()));
    let mut world = builder.build();

    let payload = Payload {
        data: vec![1, 2, 3],
        tag: "test".into(),
    };

    let registry = world.registry();
    let mut pipeline = PipelineBuilder::<Payload>::new()
        .view::<AsPayloadView>()
        .tap(log_payload, registry)
        .end_view()
        .then(|_: Payload| {}, registry);
    pipeline.run(&mut world, payload);

    assert_eq!(world.resource::<AuditLog>().0, vec!["tag=test len=3"]);
}
/// When the guard accepts, the owned event comes back intact inside `Some`;
/// when it rejects, the result is `None`.
#[test]
fn view_guard_preserves_non_copy_event() {
    fn check_tag(v: &PayloadView) -> bool {
        v.tag == "accept"
    }

    let mut builder = WorldBuilder::new();
    builder.register(AuditLog(Vec::new()));
    let mut world = builder.build();

    let good = Payload {
        data: vec![1],
        tag: "accept".into(),
    };
    let bad = Payload {
        data: vec![2],
        tag: "reject".into(),
    };

    let registry = world.registry();
    let mut pipeline = PipelineBuilder::<Payload>::new()
        .view::<AsPayloadView>()
        .guard(check_tag, registry)
        .end_view_guarded();

    let accepted = pipeline.run(&mut world, good);
    assert!(accepted.is_some());
    assert_eq!(accepted.unwrap().data, vec![1]);

    let rejected = pipeline.run(&mut world, bad);
    assert!(rejected.is_none());
}
/// Same guard as the previous test but with the rejection first: a rejected
/// event does not affect the pipeline's handling of the next, accepted one.
#[test]
fn view_guard_inside_view() {
    fn accept_tag(v: &PayloadView) -> bool {
        v.tag == "accept"
    }

    let mut builder = WorldBuilder::new();
    builder.register(AuditLog(Vec::new()));
    let mut world = builder.build();

    let bad = Payload {
        data: vec![1],
        tag: "reject".into(),
    };
    let good = Payload {
        data: vec![1, 2, 3],
        tag: "accept".into(),
    };

    let registry = world.registry();
    let mut pipeline = PipelineBuilder::<Payload>::new()
        .view::<AsPayloadView>()
        .guard(accept_tag, registry)
        .end_view_guarded();

    let rejected = pipeline.run(&mut world, bad);
    assert!(rejected.is_none());

    let accepted = pipeline.run(&mut world, good);
    assert!(accepted.is_some());
    assert_eq!(accepted.unwrap().data, vec![1, 2, 3]);
}
/// A tap that writes to a world resource accumulates one entry per dispatched
/// event; the pipeline here ends at `end_view` with no further step.
#[test]
fn view_tap_with_world_resources() {
    fn log_compact(mut log: ResMut<AuditLog>, v: &OrderView) {
        log.0.push(format!("{}:{}", v.symbol, v.qty));
    }

    let mut builder = WorldBuilder::new();
    builder.register(AuditLog(Vec::new()));
    let mut world = builder.build();

    let first = NewOrderCommand {
        source: "test".into(),
        symbol: "BTC".into(),
        qty: 100,
        price: 50000.0,
    };
    let second = NewOrderCommand {
        source: "test".into(),
        symbol: "ETH".into(),
        qty: 50,
        price: 3000.0,
    };

    let registry = world.registry();
    let mut pipeline = PipelineBuilder::<NewOrderCommand>::new()
        .view::<AsOrderView>()
        .tap(log_compact, registry)
        .end_view();

    pipeline.run(&mut world, first);
    pipeline.run(&mut world, second);

    assert_eq!(world.resource::<AuditLog>().0, vec!["BTC:100", "ETH:50"]);
}
/// Dispatching 100 events through the same pipeline invokes the tap exactly
/// 100 times.
#[test]
fn view_repeated_dispatch() {
    fn count(mut log: ResMut<AuditLog>, _v: &OrderView) {
        log.0.push("hit".into());
    }

    let mut builder = WorldBuilder::new();
    builder.register(AuditLog(Vec::new()));
    let mut world = builder.build();

    let registry = world.registry();
    let mut pipeline = PipelineBuilder::<NewOrderCommand>::new()
        .view::<AsOrderView>()
        .tap(count, registry)
        .end_view();

    for _ in 0..100 {
        let order = NewOrderCommand {
            source: "stress".into(),
            symbol: "X".into(),
            qty: 1,
            price: 1.0,
        };
        pipeline.run(&mut world, order);
    }

    assert_eq!(world.resource::<AuditLog>().0.len(), 100);
}
}